input = $input; Loop\addReadStream($this->input, function() { if (feof($this->input)) { throw new RuntimeException('Stream is closed'); } while (($c = fgetc($this->input)) !== false && $c !== '') { $this->buffer .= $c; switch ($this->parsingMode) { case ParsingMode::HEADERS: if ($this->buffer === "\r\n") { $this->parsingMode = ParsingMode::BODY; $this->contentLength = (int)$this->headers['Content-Length']; $this->buffer = ''; } else if (substr($this->buffer, -2) === "\r\n") { $parts = explode(':', $this->buffer); $this->headers[$parts[0]] = trim($parts[1]); $this->buffer = ''; } break; case ParsingMode::BODY: if (strlen($this->buffer) === $this->contentLength) { if (isset($this->listener)) { $msg = new Message(MessageBody::parse($this->buffer), $this->headers); $listener = $this->listener; $listener($msg); } $this->parsingMode = ParsingMode::HEADERS; $this->headers = []; $this->buffer = ''; } break; } } }); } /** * @param callable $listener Is called with a Message object * @return void */ public function onMessage(callable $listener) { $this->listener = $listener; } }