1
0
Fork 0
observables
Felix Becker 2017-01-26 21:15:33 +01:00
parent 47b5b6709c
commit f43c41739e
4 changed files with 94 additions and 32 deletions

View File

@ -37,7 +37,8 @@
"webmozart/glob": "^4.1",
"sabre/uri": "^2.0",
"JetBrains/phpstorm-stubs": "dev-master",
"composer/composer": "^1.3"
"composer/composer": "^1.3",
"reactivex/rxphp": "^1.5"
},
"repositories": [
{

View File

@ -122,40 +122,66 @@ class LanguageServer extends AdvancedJsonRpc\Dispatcher
$this->shutdown();
$this->exit();
});
$this->protocolReader->on('message', function (Message $msg) {
coroutine(function () use ($msg) {
// Map from request ID to subscription
$subscriptions = [];
$this->protocolReader->on('message', function (Message $msg) use ($subscriptions) {
// Ignore responses, this is the handler for requests and notifications
if (AdvancedJsonRpc\Response::isResponse($msg->body)) {
return;
}
if ($msg->body->method === '$/cancelRequest') {
if (!isset($subscriptions[$msg->body->id])) {
return;
}
// Express that we are not interested anymore in the observable
$subscriptions[$msg->body->id]->dispose();
return;
}
// The result object that is built through JSON patches
$result = null;
$error = null;
try {
// Invoke the method handler to get a result
$result = yield $this->dispatch($msg->body);
} catch (AdvancedJsonRpc\Error $e) {
// If a ResponseError is thrown, send it back in the Response
$error = $e;
} catch (Throwable $e) {
// If an unexpected error occured, send back an INTERNAL_ERROR error response
$obs = $this->dispatch($msg->body);
} catch (\Throwable $e) {
$obs = Observable::error($e);
}
// Notifications dont need further acting
if (AdvancedJsonRpc\Notification::isNotification($msg->body)) {
return;
}
if (!($obs instanceof ObservableInterface)) {
$obs = Observable::just($obs);
}
$subscriptions[$msg->body->id] = $obs->subscribe(new CallbackObserver(
function (JSONPatch $patch) use (&$result) {
$this->protocolWriter->write(
new Message(
new AdvancedJsonRpc\Notification(
'$/partialResult', ['id' => $msg->body->id, 'patch' => $patch]
)
)
);
// Apply path to result object for BC
$patch->apply($result);
},
function (\Exception $error) use ($msg) {
if (!($error instanceof AdvancedJsonRpc\Error)) {
$error = new AdvancedJsonRpc\Error(
(string)$e,
(string)$error,
AdvancedJsonRpc\ErrorCode::INTERNAL_ERROR,
null,
$e
$error
);
}
// Only send a Response for a Request
// Notifications do not send Responses
if (AdvancedJsonRpc\Request::isRequest($msg->body)) {
if ($error !== null) {
$responseBody = new AdvancedJsonRpc\ErrorResponse($msg->body->id, $error);
} else {
$responseBody = new AdvancedJsonRpc\SuccessResponse($msg->body->id, $result);
// If an unexpected error occured, send back an INTERNAL_ERROR error response
$this->protocolWriter->write(new Message(new AdvancedJsonRpc\ErrorResponse($msg->body->id, $error)));
},
function () use ($msg, &$result) {
// Return the built result object for BC
$this->protocolWriter->write(new Message(new AdvancedJsonRpc\SuccessResponse($msg->body->id, $result)));
}
$this->protocolWriter->write(new Message($responseBody));
}
})->otherwise('\\LanguageServer\\crash');
));
});
$this->protocolWriter = $writer;
$this->client = new LanguageClient($reader, $writer);

View File

@ -62,10 +62,31 @@ class Workspace
* The workspace symbol request is sent from the client to the server to list project-wide symbols matching the query string.
*
* @param string $query
* @return Promise <SymbolInformation[]>
* @return Observable <SymbolInformation[]>
*/
public function symbol(string $query): Promise
public function symbol(string $query): Observable
{
(
$this->index->isStaticComplete()
? observableFromEvent($this->index->once('static-complete', function () {
})
: Observable::empty()
)
return Observable::create(function (ObserverInterface $observer) {
// Wait until indexing for definitions finished
if (!$this->index->isStaticComplete()) {
$this->index->once('static-complete', function () {
});
}
$observer->onNext(42);
$observer->onCompleted();
return new CallbackDisposable(function () {
});
});
return coroutine(function () use ($query) {
// Wait until indexing for definitions finished
if (!$this->index->isStaticComplete()) {

View File

@ -93,6 +93,20 @@ function waitForEvent(EmitterInterface $emitter, string $event): Promise
return $p;
}
/**
* Returns a promise that is fulfilled once the passed event was triggered on the passed EventEmitter
*
* @param EmitterInterface $emitter
* @param string $event
* @return Promise
*/
function observableFromEvent(EmitterInterface $emitter, string $event): Promise
{
Observable::create()
$emitter->once($event, [$p, 'fulfill']);
return $p;
}
/**
* Returns the closest node of a specific type
*