From 6806ba94e0371b7f6ec5a4e7f2cf8b0820dec17d Mon Sep 17 00:00:00 2001 From: Stephan Unverwerth Date: Tue, 25 Oct 2016 23:50:36 +0200 Subject: [PATCH] Async ProtocolStreamWriter (#112) --- src/ProtocolStreamWriter.php | 74 ++++++++++++++++++++++++------ tests/ProtocolStreamWriterTest.php | 10 +++- 2 files changed, 68 insertions(+), 16 deletions(-) diff --git a/src/ProtocolStreamWriter.php b/src/ProtocolStreamWriter.php index 2ac3579..41d7afc 100644 --- a/src/ProtocolStreamWriter.php +++ b/src/ProtocolStreamWriter.php @@ -4,12 +4,24 @@ declare(strict_types = 1); namespace LanguageServer; use LanguageServer\Protocol\Message; +use Sabre\Event\{ + Loop, + Promise +}; use RuntimeException; class ProtocolStreamWriter implements ProtocolWriter { + /** + * @var resource $output + */ private $output; + /** + * @var array $messages + */ + private $messages = []; + /** * @param resource $output */ @@ -22,26 +34,58 @@ class ProtocolStreamWriter implements ProtocolWriter * Sends a Message to the client * * @param Message $msg - * @return void + * @return Promise Resolved when the message has been fully written out to the output stream */ public function write(Message $msg) { - $data = (string)$msg; - $msgSize = strlen($data); - $totalBytesWritten = 0; + // if the message queue is currently empty, register a write handler. + if (empty($this->messages)) { + Loop\addWriteStream($this->output, function () { + $this->flush(); + }); + } - while ($totalBytesWritten < $msgSize) { - error_clear_last(); - $bytesWritten = @fwrite($this->output, substr($data, $totalBytesWritten)); - if ($bytesWritten === false) { - $error = error_get_last(); - if ($error !== null) { - throw new RuntimeException('Could not write message: ' . error_get_last()['message']); - } else { - throw new RuntimeException('Could not write message'); - } + $promise = new Promise(); + $this->messages[] = [ + 'message' => (string)$msg, + 'promise' => $promise + ]; + return $promise; + } + + /** + * Writes pending messages to the output stream. + * + * @return void + */ + private function flush() + { + $keepWriting = true; + while ($keepWriting) { + $message = $this->messages[0]['message']; + $promise = $this->messages[0]['promise']; + + $bytesWritten = @fwrite($this->output, $message); + + if ($bytesWritten > 0) { + $message = substr($message, $bytesWritten); + } + + // Determine if this message was completely sent + if (strlen($message) === 0) { + array_shift($this->messages); + + // This was the last message in the queue, remove the write handler. + if (count($this->messages) === 0) { + Loop\removeWriteStream($this->output); + $keepWriting = false; + } + + $promise->fulfill(); + } else { + $this->messages[0]['message'] = $message; + $keepWriting = false; } - $totalBytesWritten += $bytesWritten; } } } diff --git a/tests/ProtocolStreamWriterTest.php b/tests/ProtocolStreamWriterTest.php index 90ef32d..b481c3b 100644 --- a/tests/ProtocolStreamWriterTest.php +++ b/tests/ProtocolStreamWriterTest.php @@ -7,6 +7,7 @@ use PHPUnit\Framework\TestCase; use LanguageServer\ProtocolStreamWriter; use LanguageServer\Protocol\Message; use AdvancedJsonRpc\{Request as RequestBody}; +use Sabre\Event\Loop; class ProtocolStreamWriterTest extends TestCase { @@ -21,7 +22,14 @@ class ProtocolStreamWriterTest extends TestCase $msg = new Message(new RequestBody(1, 'aMethod', ['arg' => str_repeat('X', 100000)])); $msgString = (string)$msg; - $writer->write($msg); + $promise = $writer->write($msg); + $promise->then(function () { + Loop\stop(); + }, function () { + Loop\stop(); + }); + + Loop\run(); fclose($writeHandle);