Promises for async writer
parent
7bb4c47d31
commit
1d949b9282
|
@ -4,17 +4,23 @@ declare(strict_types = 1);
|
||||||
namespace LanguageServer;
|
namespace LanguageServer;
|
||||||
|
|
||||||
use LanguageServer\Protocol\Message;
|
use LanguageServer\Protocol\Message;
|
||||||
use Sabre\Event\Loop;
|
use Sabre\Event\{
|
||||||
|
Loop,
|
||||||
|
Promise
|
||||||
|
};
|
||||||
use RuntimeException;
|
use RuntimeException;
|
||||||
|
|
||||||
class ProtocolStreamWriter implements ProtocolWriter
|
class ProtocolStreamWriter implements ProtocolWriter
|
||||||
{
|
{
|
||||||
|
/**
|
||||||
|
* @var resource $output
|
||||||
|
*/
|
||||||
private $output;
|
private $output;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @var string $buffer
|
* @var array $messages
|
||||||
*/
|
*/
|
||||||
private $buffer;
|
private $messages = [];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param resource $output
|
* @param resource $output
|
||||||
|
@ -22,21 +28,6 @@ class ProtocolStreamWriter implements ProtocolWriter
|
||||||
public function __construct($output)
|
public function __construct($output)
|
||||||
{
|
{
|
||||||
$this->output = $output;
|
$this->output = $output;
|
||||||
Loop\addWriteStream($this->output, function () {
|
|
||||||
error_clear_last();
|
|
||||||
$bytesWritten = @fwrite($this->output, $this->buffer);
|
|
||||||
if ($bytesWritten === false) {
|
|
||||||
$error = error_get_last();
|
|
||||||
if ($error !== null) {
|
|
||||||
throw new RuntimeException('Could not write message: ' . error_get_last()['message']);
|
|
||||||
} else {
|
|
||||||
throw new RuntimeException('Could not write message');
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if ($bytesWritten > 0) {
|
|
||||||
$this->buffer = substr($this->buffer, $bytesWritten);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -47,6 +38,58 @@ class ProtocolStreamWriter implements ProtocolWriter
|
||||||
*/
|
*/
|
||||||
public function write(Message $msg)
|
public function write(Message $msg)
|
||||||
{
|
{
|
||||||
$this->buffer .= $msg;
|
// if the message queue is currently empty, register a write handler.
|
||||||
|
if (empty($this->messages)) {
|
||||||
|
Loop\addWriteStream($this->output, [$this, 'writeData']);
|
||||||
|
}
|
||||||
|
|
||||||
|
$promise = new Promise();
|
||||||
|
$this->messages[] = [
|
||||||
|
'message' => (string)$msg,
|
||||||
|
'promise' => $promise
|
||||||
|
];
|
||||||
|
return $promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes pending messages to the output stream.
|
||||||
|
* Must be public to be able to be used as a callback.
|
||||||
|
*
|
||||||
|
* @return void
|
||||||
|
*/
|
||||||
|
public function writeData() {
|
||||||
|
$message = $this->messages[0]['message'];
|
||||||
|
$promise = $this->messages[0]['promise'];
|
||||||
|
|
||||||
|
error_clear_last();
|
||||||
|
$bytesWritten = @fwrite($this->output, $message);
|
||||||
|
|
||||||
|
if ($bytesWritten === false) {
|
||||||
|
$error = error_get_last();
|
||||||
|
$promise->reject($error);
|
||||||
|
if ($error !== null) {
|
||||||
|
throw new RuntimeException('Could not write message: ' . error_get_last()['message']);
|
||||||
|
} else {
|
||||||
|
throw new RuntimeException('Could not write message');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if ($bytesWritten > 0) {
|
||||||
|
$message = substr($message, $bytesWritten);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Determine if this message was completely sent
|
||||||
|
if (strlen($message) === 0) {
|
||||||
|
array_shift($this->messages);
|
||||||
|
|
||||||
|
// This was the last message in the queue, remove the write handler.
|
||||||
|
if (count($this->messages) === 0) {
|
||||||
|
Loop\removeWriteStream($this->output);
|
||||||
|
}
|
||||||
|
|
||||||
|
$promise->fulfill();
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
$this->messages[0]['message'] = $message;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ use PHPUnit\Framework\TestCase;
|
||||||
use LanguageServer\ProtocolStreamWriter;
|
use LanguageServer\ProtocolStreamWriter;
|
||||||
use LanguageServer\Protocol\Message;
|
use LanguageServer\Protocol\Message;
|
||||||
use AdvancedJsonRpc\{Request as RequestBody};
|
use AdvancedJsonRpc\{Request as RequestBody};
|
||||||
|
use Sabre\Event\Loop;
|
||||||
|
|
||||||
class ProtocolStreamWriterTest extends TestCase
|
class ProtocolStreamWriterTest extends TestCase
|
||||||
{
|
{
|
||||||
|
@ -21,7 +22,14 @@ class ProtocolStreamWriterTest extends TestCase
|
||||||
$msg = new Message(new RequestBody(1, 'aMethod', ['arg' => str_repeat('X', 100000)]));
|
$msg = new Message(new RequestBody(1, 'aMethod', ['arg' => str_repeat('X', 100000)]));
|
||||||
$msgString = (string)$msg;
|
$msgString = (string)$msg;
|
||||||
|
|
||||||
$writer->write($msg);
|
$promise = $writer->write($msg);
|
||||||
|
$promise->then(function() {
|
||||||
|
Loop\stop();
|
||||||
|
}, function() {
|
||||||
|
Loop\stop();
|
||||||
|
});
|
||||||
|
|
||||||
|
Loop\run();
|
||||||
|
|
||||||
fclose($writeHandle);
|
fclose($writeHandle);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue