1
0
Fork 0

Async ProtocolStreamWriter (#112)

pull/116/head
Stephan Unverwerth 2016-10-25 23:50:36 +02:00 committed by Felix Becker
parent cd3bf18fe2
commit 6806ba94e0
2 changed files with 68 additions and 16 deletions

View File

@ -4,12 +4,24 @@ declare(strict_types = 1);
namespace LanguageServer; namespace LanguageServer;
use LanguageServer\Protocol\Message; use LanguageServer\Protocol\Message;
use Sabre\Event\{
Loop,
Promise
};
use RuntimeException; use RuntimeException;
class ProtocolStreamWriter implements ProtocolWriter class ProtocolStreamWriter implements ProtocolWriter
{ {
/**
* @var resource $output
*/
private $output; private $output;
/**
* @var array $messages
*/
private $messages = [];
/** /**
* @param resource $output * @param resource $output
*/ */
@ -22,26 +34,58 @@ class ProtocolStreamWriter implements ProtocolWriter
* Sends a Message to the client * Sends a Message to the client
* *
* @param Message $msg * @param Message $msg
* @return void * @return Promise Resolved when the message has been fully written out to the output stream
*/ */
public function write(Message $msg) public function write(Message $msg)
{ {
$data = (string)$msg; // if the message queue is currently empty, register a write handler.
$msgSize = strlen($data); if (empty($this->messages)) {
$totalBytesWritten = 0; Loop\addWriteStream($this->output, function () {
$this->flush();
});
}
while ($totalBytesWritten < $msgSize) { $promise = new Promise();
error_clear_last(); $this->messages[] = [
$bytesWritten = @fwrite($this->output, substr($data, $totalBytesWritten)); 'message' => (string)$msg,
if ($bytesWritten === false) { 'promise' => $promise
$error = error_get_last(); ];
if ($error !== null) { return $promise;
throw new RuntimeException('Could not write message: ' . error_get_last()['message']); }
/**
* Writes pending messages to the output stream.
*
* @return void
*/
private function flush()
{
$keepWriting = true;
while ($keepWriting) {
$message = $this->messages[0]['message'];
$promise = $this->messages[0]['promise'];
$bytesWritten = @fwrite($this->output, $message);
if ($bytesWritten > 0) {
$message = substr($message, $bytesWritten);
}
// Determine if this message was completely sent
if (strlen($message) === 0) {
array_shift($this->messages);
// This was the last message in the queue, remove the write handler.
if (count($this->messages) === 0) {
Loop\removeWriteStream($this->output);
$keepWriting = false;
}
$promise->fulfill();
} else { } else {
throw new RuntimeException('Could not write message'); $this->messages[0]['message'] = $message;
} $keepWriting = false;
} }
$totalBytesWritten += $bytesWritten;
} }
} }
} }

View File

@ -7,6 +7,7 @@ use PHPUnit\Framework\TestCase;
use LanguageServer\ProtocolStreamWriter; use LanguageServer\ProtocolStreamWriter;
use LanguageServer\Protocol\Message; use LanguageServer\Protocol\Message;
use AdvancedJsonRpc\{Request as RequestBody}; use AdvancedJsonRpc\{Request as RequestBody};
use Sabre\Event\Loop;
class ProtocolStreamWriterTest extends TestCase class ProtocolStreamWriterTest extends TestCase
{ {
@ -21,7 +22,14 @@ class ProtocolStreamWriterTest extends TestCase
$msg = new Message(new RequestBody(1, 'aMethod', ['arg' => str_repeat('X', 100000)])); $msg = new Message(new RequestBody(1, 'aMethod', ['arg' => str_repeat('X', 100000)]));
$msgString = (string)$msg; $msgString = (string)$msg;
$writer->write($msg); $promise = $writer->write($msg);
$promise->then(function () {
Loop\stop();
}, function () {
Loop\stop();
});
Loop\run();
fclose($writeHandle); fclose($writeHandle);