1
0
Fork 0
observables
Felix Becker 2017-01-27 18:56:08 +01:00
parent f43c41739e
commit 9c590e38da
5 changed files with 178 additions and 156 deletions

View File

@ -38,7 +38,8 @@
"sabre/uri": "^2.0",
"JetBrains/phpstorm-stubs": "dev-master",
"composer/composer": "^1.3",
"reactivex/rxphp": "^1.5"
"reactivex/rxphp": "^1.5",
"gamringer/php-json-patch": "^1.0"
},
"repositories": [
{

View File

@ -17,16 +17,19 @@ use LanguageServer\Protocol\{
use LanguageServer\FilesFinder\{FilesFinder, ClientFilesFinder, FileSystemFilesFinder};
use LanguageServer\ContentRetriever\{ContentRetriever, ClientContentRetriever, FileSystemContentRetriever};
use LanguageServer\Index\{DependenciesIndex, GlobalIndex, Index, ProjectIndex, StubsIndex};
use AdvancedJsonRpc;
use AdvancedJsonRpc as JsonRpc;
use Sabre\Event\{Loop, Promise};
use function Sabre\Event\coroutine;
use Rx\{Observable, CallbackObserver, ObservableInterface};
use gamringer\JSONPatch\{Patch, Operation};
use gamringer\JSONPointer\Pointer;
use Exception;
use Throwable;
use Webmozart\PathUtil\Path;
use Webmozart\Glob\Glob;
use Sabre\Uri;
class LanguageServer extends AdvancedJsonRpc\Dispatcher
class LanguageServer extends JsonRpc\Dispatcher
{
/**
* Handles textDocument/* method calls
@ -124,22 +127,23 @@ class LanguageServer extends AdvancedJsonRpc\Dispatcher
});
// Map from request ID to subscription
$subscriptions = [];
$this->protocolReader->on('message', function (Message $msg) use ($subscriptions) {
$this->protocolReader->on('message', function (Message $msg) use (&$subscriptions) {
// Ignore responses, this is the handler for requests and notifications
if (AdvancedJsonRpc\Response::isResponse($msg->body)) {
if (JsonRpc\Response::isResponse($msg->body)) {
return;
}
if ($msg->body->method === '$/cancelRequest') {
if (!isset($subscriptions[$msg->body->id])) {
if (!isset($subscriptions[$msg->body->params->id])) {
return;
}
// Express that we are not interested anymore in the observable
$subscriptions[$msg->body->id]->dispose();
$subscriptions[$msg->body->params->id]->dispose();
unset($msg->body->params->id);
return;
}
// The result object that is built through JSON patches
$result = null;
$error = null;
$pointer = new Pointer($result);
try {
// Invoke the method handler to get a result
$obs = $this->dispatch($msg->body);
@ -147,39 +151,35 @@ class LanguageServer extends AdvancedJsonRpc\Dispatcher
$obs = Observable::error($e);
}
// Notifications dont need further acting
if (AdvancedJsonRpc\Notification::isNotification($msg->body)) {
if (JsonRpc\Notification::isNotification($msg->body)) {
return;
}
if (!($obs instanceof ObservableInterface)) {
$obs = Observable::just($obs);
$obs = Observable::just(new JSONPatch('replace', '/', $obs));
}
$subscriptions[$msg->body->id] = $obs->subscribe(new CallbackObserver(
function (JSONPatch $patch) use (&$result) {
$this->protocolWriter->write(
new Message(
new AdvancedJsonRpc\Notification(
'$/partialResult', ['id' => $msg->body->id, 'patch' => $patch]
)
)
);
function (Operation\Appliable $operation) use ($pointer) {
$this->protocolWriter->write(new Message(new JsonRpc\Notification('$/partialResult', [
'id' => $msg->body->id,
'patch' => [$operation]
])));
// Apply path to result object for BC
$patch->apply($result);
$operation->apply($pointer);
},
function (\Exception $error) use ($msg) {
if (!($error instanceof AdvancedJsonRpc\Error)) {
$error = new AdvancedJsonRpc\Error(
(string)$error,
AdvancedJsonRpc\ErrorCode::INTERNAL_ERROR,
null,
$error
);
if (!($error instanceof JsonRpc\Error)) {
$error = new JsonRpc\Error((string)$error, JsonRpc\ErrorCode::INTERNAL_ERROR, null, $error);
}
// If an unexpected error occured, send back an INTERNAL_ERROR error response
$this->protocolWriter->write(new Message(new AdvancedJsonRpc\ErrorResponse($msg->body->id, $error)));
$this->protocolWriter->write(new Message(new JsonRpc\ErrorResponse($msg->body->id, $error)));
},
function () use ($msg, &$result) {
// Return the built result object for BC
$this->protocolWriter->write(new Message(new AdvancedJsonRpc\SuccessResponse($msg->body->id, $result)));
function () use ($msg, &$result, &$subscriptions) {
// Return the complete result object for BC
$this->protocolWriter->write(new Message(new JsonRpc\SuccessResponse($msg->body->id, $result)));
if (isset($subscriptions[$msg->body->id]) {
$subscriptions[$msg->body->id]->dispose();
unset($subscriptions[$msg->body->id]);
}
}
));
});

View File

@ -260,40 +260,55 @@ class TextDocument
*
* @param TextDocumentIdentifier $textDocument The text document
* @param Position $position The position inside the text document
* @return Promise <Location|Location[]>
* @return Observable Will emit JSON Patch operations that eventually result in Location[]
*/
public function definition(TextDocumentIdentifier $textDocument, Position $position): Promise
public function definition(TextDocumentIdentifier $textDocument, Position $position): Observable
{
return coroutine(function () use ($textDocument, $position) {
$document = yield $this->documentLoader->getOrLoad($textDocument->uri);
$node = $document->getNodeAtPosition($position);
if ($node === null) {
return [];
}
// Handle definition nodes
$fqn = DefinitionResolver::getDefinedFqn($node);
while (true) {
if ($fqn) {
$def = $this->index->getDefinition($fqn);
} else {
// Handle reference nodes
$def = $this->definitionResolver->resolveReferenceNodeToDefinition($node);
return $this->documentLoader->getOrLoad($textDocument->uri)
->flatMap(function (PhpDocument $document) {
$node = $document->getNodeAtPosition($position);
if ($node === null) {
return Observable::empty();
}
// If no result was found and we are still indexing, try again after the index was updated
if ($def !== null || $this->index->isComplete()) {
break;
}
yield waitForEvent($this->index, 'definition-added');
}
if (
$def === null
|| $def->symbolInformation === null
|| Uri\parse($def->symbolInformation->location->uri)['scheme'] === 'phpstubs'
) {
return [];
}
return $def->symbolInformation->location;
});
// Handle definition nodes
$fqn = DefinitionResolver::getDefinedFqn($node);
return Observable::merge(
observableFromEvent($this->index, 'definition-added')->throttle(100),
observableFromEvent($this->index, 'complete')->take(1)
)
// Repeat the following logic as long as the index is not complete
->takeWhile(function () {
return !$this->index->isComplete();
})
// Try to find the definition on each event
->map(function () use ($fqn) {
if ($fqn) {
return $this->index->getDefinition($fqn);
} else {
// Handle reference nodes
return $this->definitionResolver->resolveReferenceNodeToDefinition($node);
}
});
})
// Ignore events where definitions were not found
->filter(function (Definition $def) {
return $def === null;
})
// If we found one definition, complete
->take(1)
// Check if we actually have a Definition with location info and Definition is not part of stubs
->filter(function (Definition $def) {
return (
$def->symbolInformation === null
|| Uri\parse($def->symbolInformation->location->uri)['scheme'] === 'phpstubs'
);
})
// Turn the found Definition into an add operation for the location
->map(function (Definition $def) {
return new Operation\Add('/-', $def->symbolInformation->location);
})
// Initialize with empty array
->startWith(new Operation\Replace('/', []));
}
/**

View File

@ -7,6 +7,7 @@ use LanguageServer\{LanguageClient, Project, PhpDocumentLoader};
use LanguageServer\Index\{ProjectIndex, DependenciesIndex, Index};
use LanguageServer\Protocol\{SymbolInformation, SymbolDescriptor, ReferenceInformation, DependencyReference, Location};
use Sabre\Event\Promise;
use Rx\Observable;
use function Sabre\Event\coroutine;
use function LanguageServer\waitForEvent;
@ -62,44 +63,31 @@ class Workspace
* The workspace symbol request is sent from the client to the server to list project-wide symbols matching the query string.
*
* @param string $query
* @return Observable <SymbolInformation[]>
* @return Observable Will yield JSON Patch Operations that eventually result in SymbolInformation[]
*/
public function symbol(string $query): Observable
{
(
$this->index->isStaticComplete()
? observableFromEvent($this->index->once('static-complete', function () {
})
: Observable::empty()
)
return Observable::create(function (ObserverInterface $observer) {
// Wait until indexing for definitions finished
if (!$this->index->isStaticComplete()) {
$this->index->once('static-complete', function () {
});
}
$observer->onNext(42);
$observer->onCompleted();
return new CallbackDisposable(function () {
});
});
return coroutine(function () use ($query) {
// Wait until indexing for definitions finished
if (!$this->index->isStaticComplete()) {
yield waitForEvent($this->index, 'static-complete');
}
$symbols = [];
foreach ($this->index->getDefinitions() as $fqn => $definition) {
if ($query === '' || stripos($fqn, $query) !== false) {
$symbols[] = $definition->symbolInformation;
return Observable::just(null)
// Wait for indexing event if not yet finished
->flatMap(function () {
if (!$this->index->isStaticComplete()) {
return observableFromEvent($this->index, 'static-complete')->take(1);
}
}
return $symbols;
});
})
// Get definitions from complete index
->flatMap(function () {
return Observable::fromArray($this->index->getDefinitions());
})
// Filter by matching FQN to query
->filter(function (Definition $def) use ($query) {
return $query === '' || stripos($def->fqn, $query) !== false;
})
// Send each SymbolInformation
->map(function (Definition $def) use ($query) {
return new Operation\Add('/-', $def->symbolInformation);
})
// Initialize with an empty array
->startWith(new Operation\Replace('/', []));
}
/**
@ -107,74 +95,82 @@ class Workspace
*
* @param SymbolDescriptor $query Partial metadata about the symbol that is being searched for.
* @param string[] $files An optional list of files to restrict the search to.
* @return ReferenceInformation[]
* @return Observable ReferenceInformation[]
*/
public function xreferences($query, array $files = null): Promise
public function xreferences($query, array $files = null): Observable
{
return coroutine(function () use ($query, $files) {
if ($this->composerLock === null) {
return [];
}
// Wait until indexing finished
if (!$this->index->isComplete()) {
yield waitForEvent($this->index, 'complete');
}
/** Map from URI to array of referenced FQNs in dependencies */
$refs = [];
// Get all references TO dependencies
$fqns = isset($query->fqsen) ? [$query->fqsen] : array_values($this->dependenciesIndex->getDefinitions());
foreach ($fqns as $fqn) {
foreach ($this->sourceIndex->getReferenceUris($fqn) as $uri) {
if (!isset($refs[$uri])) {
$refs[$uri] = [];
}
if (array_search($uri, $refs[$uri]) === false) {
$refs[$uri][] = $fqn;
return Observable::just(null)
->flatMap(function () {
if ($this->composerLock === null) {
return Observable::empty();
}
// Wait until indexing finished
if (!$this->index->isComplete()) {
return observableFromEvent($this->index, 'complete')->take(1);
}
})
// Get all definition FQNs in dependencies
->flatMap(function () {
if (isset($query->fqsen)) {
$fqns = [$this->dependenciesIndex->getDefinition($query->fqsen)];
} else {
$fqns = $this->dependenciesIndex->getDefinitions();
}
return Observable::fromArray($fqns);
})
// Get all URIs in the project source that reference those definitions
->flatMap(function (Definition $def) {
return Observable::fromArray($this->sourceIndex->getReferenceUris($fqn));
})
->distinct()
->flatMap(function (string $uri) {
return $this->documentLoader->getOrLoad($uri);
$symbol = new SymbolDescriptor;
$symbol->fqsen = $fqn;
foreach (get_object_vars($def->symbolInformation) as $prop => $val) {
$symbol->$prop = $val;
}
// Find out package name
preg_match('/\/vendor\/([^\/]+\/[^\/]+)\//', $def->symbolInformation->location->uri, $matches);
$packageName = $matches[1];
foreach (array_merge($this->composerLock->packages, $this->composerLock->{'packages-dev'}) as $package) {
if ($package->name === $packageName) {
$symbol->package = $package;
break;
}
}
}
$refInfos = [];
foreach ($refs as $uri => $fqns) {
foreach ($fqns as $fqn) {
$def = $this->dependenciesIndex->getDefinition($fqn);
$symbol = new SymbolDescriptor;
$symbol->fqsen = $fqn;
foreach (get_object_vars($def->symbolInformation) as $prop => $val) {
$symbol->$prop = $val;
}
// Find out package name
preg_match('/\/vendor\/([^\/]+\/[^\/]+)\//', $def->symbolInformation->location->uri, $matches);
$packageName = $matches[1];
foreach ($this->composerLock->packages as $package) {
if ($package->name === $packageName) {
$symbol->package = $package;
})
// If there was no FQSEN provided, check if query attributes match
if (!isset($query->fqsen)) {
$matches = true;
foreach (get_object_vars($query) as $prop => $val) {
if ($query->$prop != $symbol->$prop) {
$matches = false;
break;
}
}
// If there was no FQSEN provided, check if query attributes match
if (!isset($query->fqsen)) {
$matches = true;
foreach (get_object_vars($query) as $prop => $val) {
if ($query->$prop != $symbol->$prop) {
$matches = false;
break;
}
}
if (!$matches) {
continue;
}
}
$doc = yield $this->documentLoader->getOrLoad($uri);
foreach ($doc->getReferenceNodesByFqn($fqn) as $node) {
$refInfo = new ReferenceInformation;
$refInfo->reference = Location::fromNode($node);
$refInfo->symbol = $symbol;
$refInfos[] = $refInfo;
if (!$matches) {
continue;
}
}
foreach ($doc->getReferenceNodesByFqn($fqn) as $node) {
$refInfo = new ReferenceInformation;
$refInfo->reference = Location::fromNode($node);
$refInfo->symbol = $symbol;
$refInfos[] = $refInfo;
}
})
->flatMap(function (PhpDocument $doc) use ($fqn) {
})
$refInfos = [];
foreach ($refs as $uri => $fqns) {
foreach ($fqns as $fqn) {
}
}
return $refInfos;
});
})
->startWith(new Operation\Replace('/', []));
}
/**

View File

@ -79,6 +79,13 @@ function timeout($seconds = 0): Promise
return $promise;
}
function observableTimeout($seconds = 0)
{
return Observable::create(function (ObserverInterface $observer) use ($seconds) {
Loop\setTimeout([$observer, 'onCompleted'], $seconds);
});
}
/**
* Returns a promise that is fulfilled once the passed event was triggered on the passed EventEmitter
*
@ -94,7 +101,7 @@ function waitForEvent(EmitterInterface $emitter, string $event): Promise
}
/**
* Returns a promise that is fulfilled once the passed event was triggered on the passed EventEmitter
* Returns an Observable that emits every event. For once semantics use ->take(1)
*
* @param EmitterInterface $emitter
* @param string $event
@ -102,9 +109,12 @@ function waitForEvent(EmitterInterface $emitter, string $event): Promise
*/
function observableFromEvent(EmitterInterface $emitter, string $event): Promise
{
Observable::create()
$emitter->once($event, [$p, 'fulfill']);
return $p;
Observable::create(function (Observer $observer) use ($emitter, $event) {
$emitter->on($event, [$observer, 'onNext']);
return new CallbackDisposable(function () use ($event, $observer) {
return $emitter->removeListener($event, [$observer, 'onNext']);
});
});
}
/**