diff --git a/src/Sherlock/common/tmp/RollingCurl/Request.php b/src/Sherlock/common/tmp/RollingCurl/Request.php new file mode 100644 index 0000000..37d2efc --- /dev/null +++ b/src/Sherlock/common/tmp/RollingCurl/Request.php @@ -0,0 +1,277 @@ +setUrl($url); + $this->setMethod($method); + } + + /** + * You may wish to store some "extra" info with this request, you can put any of that here. + * + * @param mixed $extraInfo + * @return \RollingCurl\Request + */ + public function setExtraInfo($extraInfo) + { + $this->extraInfo = $extraInfo; + return $this; + } + + /** + * @return mixed + */ + public function getExtraInfo() + { + return $this->extraInfo; + } + + /** + * @param array $headers + * @return \RollingCurl\Request + */ + public function setHeaders($headers) + { + $this->headers = $headers; + return $this; + } + + /** + * @return array + */ + public function getHeaders() + { + return $this->headers; + } + + /** + * @param string $method + * @return \RollingCurl\Request + */ + public function setMethod($method) + { + $this->method = $method; + return $this; + } + + /** + * @return string + */ + public function getMethod() + { + return $this->method; + } + + /** + * @param array $options + * @throws \InvalidArgumentException + * @return \RollingCurl\Request + */ + public function setOptions($options) + { + if (!is_array($options)) { + throw new \InvalidArgumentException("options must be an array"); + } + $this->options = $options; + return $this; + } + + /** + * @param array $options + * @throws \InvalidArgumentException + * @return \RollingCurl\Request + */ + public function addOptions($options) + { + if (!is_array($options)) { + throw new \InvalidArgumentException("options must be an array"); + } + $this->options = $options + $this->options; + return $this; + } + + /** + * @return array + */ + public function getOptions() + { + return $this->options; + } + + /** + * @param string $postData + * @return \RollingCurl\Request + */ + public function setPostData($postData) + { + $this->postData = $postData; + return $this; + } + + /** + * @return string + */ + public function getPostData() + { + return $this->postData; + } + + /** + * @param int $responseErrno + * @return \RollingCurl\Request + */ + public function setResponseErrno($responseErrno) + { + $this->responseErrno = $responseErrno; + return $this; + } + + /** + * @return int + */ + public function getResponseErrno() + { + return $this->responseErrno; + } + + /** + * @param string $responseError + * @return \RollingCurl\Request + */ + public function setResponseError($responseError) + { + $this->responseError = $responseError; + return $this; + } + + /** + * @return string + */ + public function getResponseError() + { + return $this->responseError; + } + + /** + * @param array $responseInfo + * @return \RollingCurl\Request + */ + public function setResponseInfo($responseInfo) + { + $this->responseInfo = $responseInfo; + return $this; + } + + /** + * @return array + */ + public function getResponseInfo() + { + return $this->responseInfo; + } + + /** + * @param string $responseText + * @return \RollingCurl\Request + */ + public function setResponseText($responseText) + { + $this->responseText = $responseText; + return $this; + } + + /** + * @return string + */ + public function getResponseText() + { + return $this->responseText; + } + + /** + * @param string $url + * @return \RollingCurl\Request + */ + public function setUrl($url) + { + $this->url = $url; + return $this; + } + + /** + * @return string + */ + public function getUrl() + { + return $this->url; + } + + + +} diff --git a/src/Sherlock/common/tmp/RollingCurl/RollingCurl.php b/src/Sherlock/common/tmp/RollingCurl/RollingCurl.php new file mode 100644 index 0000000..aa57a53 --- /dev/null +++ b/src/Sherlock/common/tmp/RollingCurl/RollingCurl.php @@ -0,0 +1,478 @@ + 1, + CURLOPT_FOLLOWLOCATION => 1, + CURLOPT_MAXREDIRS => 5, + CURLOPT_CONNECTTIMEOUT => 30, + CURLOPT_TIMEOUT => 30, + ); + + /** + * @var array + */ + private $headers = array(); + + /** + * @var Request[] + * + * Requests queued to be processed + */ + private $pendingRequests = array(); + + /** + * @var Request[] + * + * Requests currently being processed by curl + */ + private $activeRequests = array(); + + /** + * @var Request[] + * + * All processed requests + */ + private $completedRequests = array(); + + + /** + * Add a request to the request queue + * + * @param Request $request + * @return RollingCurl + */ + public function add(Request $request) + { + $this->pendingRequests[] = $request; + return $this; + } + + /** + * Create new Request and add it to the request queue + * + * @param string $url + * @param string $method + * @param array|string $postData + * @param array $headers + * @param array $options + * @return RollingCurl + */ + public function request($url, $method = "GET", $postData = null, $headers = null, $options = null) + { + $newRequest = new Request($url, $method); + if ($postData) { + $newRequest->setPostData($postData); + } + if ($headers) { + $newRequest->setHeaders($headers); + } + if ($options) { + $newRequest->setOptions($options); + } + return $this->add($newRequest); + } + + /** + * Perform GET request + * + * @param string $url + * @param array $headers + * @param array $options + * @return RollingCurl + */ + public function get($url, $headers = null, $options = null) + { + return $this->request($url, "GET", null, $headers, $options); + } + + /** + * Perform POST request + * + * @param string $url + * @param array|string $postData + * @param array $headers + * @param array $options + * @return RollingCurl + */ + public function post($url, $postData = null, $headers = null, $options = null) + { + return $this->request($url, "POST", $postData, $headers, $options); + } + + /** + * Perform PUT request + * + * @param string $url + * @param null $putData + * @param array $headers + * @param array $options + * @return RollingCurl + */ + public function put($url, $putData = null, $headers = null, $options = null) + { + return $this->request($url, "PUT", $putData, $headers, $options); + } + + /** + * Perform DELETE request + * + * @param string $url + * @param array $headers + * @param array $options + * @return RollingCurl + */ + public function delete($url, $headers = null, $options = null) + { + return $this->request($url, "DELETE", null, $headers, $options); + } + + /** + * Run all queued requests + * + * @return void + */ + public function execute() + { + + $master = curl_multi_init(); + + // start the first batch of requests + $firstBatch = $this->getNextPendingRequests($this->getSimultaneousLimit()); + + // what a silly "error" + if (count($firstBatch) == 0) { + return; + } + + foreach ($firstBatch as $request) { + // setup the curl request, queue it up, and put it in the active array + $ch = curl_init(); + $options = $this->prepareRequestOptions($request); + curl_setopt_array($ch, $options); + curl_multi_add_handle($master, $ch); + $this->activeRequests[(string)$ch] = $request; + } + + do { + + while (($execrun = curl_multi_exec($master, $running)) == CURLM_CALL_MULTI_PERFORM); + + if ($execrun != CURLM_OK) { + // todo: throw exception + break; + } + + // a request was just completed -- find out which one + while ($transfer = curl_multi_info_read($master)) { + + // get the request object back and put the curl response into it + $key = (string)$transfer['handle']; + $request = $this->activeRequests[$key]; + $request->setResponseText(curl_multi_getcontent($transfer['handle'])); + $request->setResponseErrno(curl_errno($transfer['handle'])); + $request->setResponseError(curl_error($transfer['handle'])); + $request->setResponseInfo(curl_getinfo($transfer['handle'])); + + // if there is a callback, run it + if (is_callable($this->callback)) { + $callback = $this->callback; + $callback($request, $this); + } + + // remove the request from the list of active requests + unset($this->activeRequests[$key]); + + // move the request to the completed set + $this->completedRequests[] = $request; + + // start a new request (it's important to do this before removing the old one) + if ($nextRequest = $this->getNextPendingRequest()) { + // setup the curl request, queue it up, and put it in the active array + $ch = curl_init(); + $options = $this->prepareRequestOptions($nextRequest); + curl_setopt_array($ch, $options); + curl_multi_add_handle($master, $ch); + $this->activeRequests[(string)$ch] = $nextRequest; + } + + // remove the curl handle that just completed + curl_multi_remove_handle($master, $transfer['handle']); + + } + + if ($running) { + curl_multi_select($master, $this->timeout); + } + + // keep the loop going as long as multi_exec says it is running + } while ($running); + + curl_multi_close($master); + } + + + /** + * Helper function to gather all the curl options: global, inferred, and per request + * + * @param Request $request + * @return array + */ + private function prepareRequestOptions(Request $request) + { + + // options for this entire curl object + $options = $this->getOptions(); + + // set the request URL + $options[CURLOPT_URL] = $request->getUrl(); + + // set the request method + $options[CURLOPT_CUSTOMREQUEST] = $request->getMethod(); + + // posting data w/ this request? + if ($request->getPostData()) { + $options[CURLOPT_POST] = 1; + $options[CURLOPT_POSTFIELDS] = $request->getPostData(); + } + + // if the request has headers, use those, or if there are global headers, use those + if ($request->getHeaders()) { + $options[CURLOPT_HEADER] = 0; + $options[CURLOPT_HTTPHEADER] = $request->getHeaders(); + } elseif ($this->getHeaders()) { + $options[CURLOPT_HEADER] = 0; + $options[CURLOPT_HTTPHEADER] = $this->getHeaders(); + } + + // if the request has options set, use those and have them take precedence + if ($request->getOptions()) { + $options = $request->getOptions() + $options; + } + + return $options; + } + + /** + * Define an anonymous callback to handle the response: + * + * $rc = new RollingCurl() + * $rc->setCallback(function($response, $info, $request, $rolling_curl) { + * // process + * }); + * + * Function should take four parameters: $response, $info, $request, $rolling_callback. + * $response is response body + * $info is additional curl info + * $request is the original request + * $rolling_curl is the current instance of the RollingCurl (useful if you want to requeue a URL) + * + * @param \Closure $callback + * @return RollingCurl + */ + public function setCallback(\Closure $callback) + { + $this->callback = $callback; + return $this; + } + + /** + * @return \Closure + */ + public function getCallback() + { + return $this->callback; + } + + /** + * @param array $headers + * @throws \InvalidArgumentException + * @return RollingCurl + */ + public function setHeaders($headers) + { + if (!is_array($headers)) { + throw new \InvalidArgumentException("headers must be an array"); + } + $this->headers = $headers; + return $this; + } + + /** + * @return array + */ + public function getHeaders() + { + return $this->headers; + } + + /** + * @param array $options + * @throws \InvalidArgumentException + * @return RollingCurl + */ + public function setOptions($options) + { + if (!is_array($options)) { + throw new \InvalidArgumentException("options must be an array"); + } + $this->options = $options; + return $this; + } + + /** + * Override and add options + * + * @param array $options + * @throws \InvalidArgumentException + * @return RollingCurl + */ + public function addOptions($options) + { + if (!is_array($options)) { + throw new \InvalidArgumentException("options must be an array"); + } + $this->options = $options + $this->options; + return $this; + } + + /** + * @return array + */ + public function getOptions() + { + return $this->options; + } + + /** + * @param int $timeout + * @throws \InvalidArgumentException + * @return RollingCurl + */ + public function setTimeout($timeout) + { + if (!is_int($timeout) || $timeout < 0) { + throw new \InvalidArgumentException("Timeout must be an int >= 0"); + } + $this->timeout = $timeout; + return $this; + } + + /** + * @return float + */ + public function getTimeout() + { + return $this->timeout; + } + + /** + * Set the limit for how many cURL requests will be execute simultaneously. + * + * Please be mindful that if you set this too high, requests are likely to fail + * more frequently or automated software may perceive you as a DOS attack and + * automatically block further requests. + * + * @param int $count + * @throws \InvalidArgumentException + * @return RollingCurl + */ + public function setSimultaneousLimit($count) + { + if (!is_int($count) || $count < 2) { + throw new \InvalidArgumentException("setSimultaneousLimit count must be an int >= 2"); + } + $this->simultaneousLimit = $count; + return $this; + } + + /** + * @return int + */ + public function getSimultaneousLimit() + { + return $this->simultaneousLimit; + } + + /** + * Return the next $limit pending requests (may return nothing) + * + * @param int $limit + * @return Request[] + */ + public function getNextPendingRequests($limit = 1) + { + return array_splice($this->pendingRequests, 0, $limit); + } + + /** + * Return the next pending requests (may return nothing) + * + * @return Request|null + */ + public function getNextPendingRequest() + { + $next = $this->getNextPendingRequests(); + if (count($next)) { + return $next[0]; + } + return null; + } + + /** + * @return Request[] + */ + public function getCompletedRequests() + { + return $this->completedRequests; + } + +} diff --git a/src/Sherlock/requests/BatchCommand.php b/src/Sherlock/requests/BatchCommand.php new file mode 100644 index 0000000..6248053 --- /dev/null +++ b/src/Sherlock/requests/BatchCommand.php @@ -0,0 +1,122 @@ +commands = $commands; + } + + } + + /** + * @param \Sherlock\requests\Command $command + */ + public function addCommand($command) + { + $this->commands[] = $command; + } + + /** + * + */ + public function clearCommands() + { + $this->commands = array(); + } + + /** + * Fill all commands that don't have an index set + * + * @param $index + */ + public function fillIndex($index) + { + /** @param Command $value */ + $map = function ($value) use ($index) { + $value->index = $index; + }; + + array_map($map, $this->commands); + } + + /** + * Fill all commands that don't have a type set + * + * @param $type + */ + public function fillType($type) + { + + /** @param Command $value */ + $map = function ($value) use ($type) { + $value->type = $type; + }; + + array_map($map, $this->commands); + } + + + /** + * + */ + public function rewind() + { + reset($this->commands); + } + + /** + * @return Command + */ + public function current() + { + return current($this->commands); + } + + /** + * @return mixed + */ + public function key() + { + return key($this->commands); + } + + /** + * @return Command|void + */ + public function next() + { + return next($this->commands); + } + + /** + * @return bool + */ + public function valid() + { + return false !== current($this->commands); + } + + +} \ No newline at end of file diff --git a/src/Sherlock/requests/BatchCommandInterface.php b/src/Sherlock/requests/BatchCommandInterface.php new file mode 100644 index 0000000..47ed552 --- /dev/null +++ b/src/Sherlock/requests/BatchCommandInterface.php @@ -0,0 +1,16 @@ +index; + + if (isset($this->type) && $this->type !== null) { + $uri .= '/' .$this->type; + } + + if (isset($this->id) && $this->id !== null) { + $uri .= '/' .$this->id; + } + + return $uri; + } + + /** + * @return string + */ + public function getAction() + { + return $this->action; + } + + /** + * @return string + */ + public function getData() + { + return $this->data; + } + +} diff --git a/src/Sherlock/requests/CommandInterface.php b/src/Sherlock/requests/CommandInterface.php new file mode 100644 index 0000000..80b5d8e --- /dev/null +++ b/src/Sherlock/requests/CommandInterface.php @@ -0,0 +1,29 @@ +batch instanceof BatchCommand) { + Analog::error("Cannot add a new document to an external BatchCommandInterface"); + throw new exceptions\RuntimeException("Cannot add a new document to an external BatchCommandInterface"); + } + + $command = new Command(); + if (is_array($value)) { + $command->data = json_encode($value, true); + + } elseif (is_string($value)) { + $command->data = $value; + } + + if ($id !== null) { + $command->id = $id; + $command->action = 'put'; + } else { + $command->action = 'post'; + } + + //Only doing this because typehinting is wonky without it... + if ($this->batch instanceof BatchCommand) { + $this->batch->addCommand($command); + } + + return $this; + } + + /** + * Accepts an array of Commands or a BatchCommand + * @param array|BatchCommand $values + * @return $this + * @throws \Sherlock\common\exceptions\BadMethodCallException + */ + public function documents($values) { - if (is_array($value)) - $this->params['document'] = $value; - elseif (is_string($value)) - $this->params['document'] = json_decode($value,true); + if ($values instanceof BatchCommandInterface) { + $this->batch = $values; + } elseif (is_array($values)) { + + $isBatch = true; + $batch = new BatchCommand(); + + /** + * @param mixed $value + */ + $map = function ($value) use ($isBatch, $batch) { + if (!$value instanceof Command) { + $isBatch = false; + } else { + $batch->addCommand($value); + } + }; + + array_map($map, $values); + + if (!$isBatch) { + Analog::error("If an array is supplied, all elements must be a Command object."); + throw new exceptions\BadMethodCallException("If an array is supplied, all elements must be a Command object."); + } + + $this->batch = $batch; + + } else { + Analog::error("Documents method only accepts arrays of Commands or BatchCommandInterface objects"); + throw new exceptions\BadMethodCallException("Documents method only accepts arrays of Commands or BatchCommandInterface objects"); + } return $this; + } /** * Perform the indexing operation * - * @throws \Sherlock\common\exceptions\RuntimeException + * @throws exceptions\RuntimeException * @return \Sherlock\responses\IndexResponse */ public function execute() { - \Analog\Analog::log("IndexDocumentRequest->execute() - ".print_r($this->params, true), \Analog\Analog::DEBUG); + Analog::debug("IndexDocumentRequest->execute() - ".print_r($this->params, true)); - foreach (array('document', 'index', 'type') as $key) { + foreach (array('index', 'type') as $key) { if (!isset($this->params[$key])) { - \Analog\Analog::log($key." cannot be empty.", \Analog\Analog::ERROR); - throw new \Sherlock\common\exceptions\RuntimeException($key." cannot be empty."); + Analog::error($key." cannot be empty."); + throw new exceptions\RuntimeException($key." cannot be empty."); } } foreach (array('index', 'type') as $key) { if (count($this->params[$key]) > 1) { - \Analog\Analog::log("Only one ".$key." may be inserted into at a time.", \Analog\Analog::ERROR); - throw new \Sherlock\common\exceptions\RuntimeException("Only one ".$key." may be inserted into at a time."); + Analog::error("Only one ".$key." may be inserted into at a time."); + throw new exceptions\RuntimeException("Only one ".$key." may be inserted into at a time."); } } - //If an id is supplied, this is a put with id, otherwise post without - if (isset($this->params['id'])) { - $id = $this->params['id']; - $this->_action = 'put'; - } else { - $id = ''; - $this->_action = 'post'; + //if this is an internal Sherlock BatchCommand, make sure index/types/action are filled + if ($this->batch instanceof BatchCommand) { + $this->batch->fillIndex($this->params['index'][0]); + $this->batch->fillType($this->params['type'][0]); } - $uri = '/'.$this->params['index'][0].'/'.$this->params['type'][0].'/'.$id; - - //required since PHP doesn't allow argument differences between - //parent and children under Strict - $this->_uri = $uri; - $this->_data = json_encode($this->params['document'], JSON_FORCE_OBJECT); - return parent::execute(); } diff --git a/src/Sherlock/requests/IndexRequest.php b/src/Sherlock/requests/IndexRequest.php index 15094aa..0232ef9 100644 --- a/src/Sherlock/requests/IndexRequest.php +++ b/src/Sherlock/requests/IndexRequest.php @@ -8,6 +8,7 @@ namespace Sherlock\requests; +use Analog\Analog; use Sherlock\common\exceptions; use Sherlock\wrappers; @@ -190,48 +191,44 @@ public function settings($settings, $merge = true) * Delete an index * * @return \Sherlock\responses\IndexResponse - * @throws \Sherlock\common\exceptions\RuntimeException + * @throws exceptions\RuntimeException */ public function delete() { - \Analog\Analog::log("IndexRequest->execute() - ".print_r($this->params, true), \Analog\Analog::DEBUG); + Analog::debug("IndexRequest->execute() - ".print_r($this->params, true)); if (!isset($this->params['index'])) - throw new \Sherlock\common\exceptions\RuntimeException("Index cannot be empty."); + throw new exceptions\RuntimeException("Index cannot be empty."); $index = implode(',', $this->params['index']); - $uri = '/'.$index; - //required since PHP doesn't allow argument differences between - //parent and children under Strict - $this->_uri = $uri; - $this->_data = null; - $this->_action = 'delete'; + $command = new Command(); + $command->index = $index; + $command->action = 'delete'; + + $this->batch->clearCommands(); + $this->batch->addCommand($command); $ret = parent::execute(); - return $ret; + return $ret[0]; } /** * Create an index * * @return \Sherlock\responses\IndexResponse - * @throws \Sherlock\common\exceptions\RuntimeException + * @throws exceptions\RuntimeException */ public function create() { - \Analog\Analog::log("IndexRequest->create() - ".print_r($this->params, true), \Analog\Analog::DEBUG); + Analog::log("IndexRequest->create() - ".print_r($this->params, true), Analog::DEBUG); if (!isset($this->params['index'])) - throw new \Sherlock\common\exceptions\RuntimeException("Index cannot be empty."); + throw new exceptions\RuntimeException("Index cannot be empty."); $index = implode(',', $this->params['index']); - $uri = '/'.$index; - - - //Final JSON should be object properties, not an array. So we need to iterate //through the array members and merge into an associative array. @@ -243,17 +240,20 @@ public function create() "mappings" => $mappings); - //force JSON object when encoding because we may have empty parameters - $this->_data = json_encode($body, JSON_FORCE_OBJECT); - $this->_action = 'put'; - $this->_uri = $uri; + $command = new Command(); + $command->index = $index; + $command->action = 'put'; + $command->data = json_encode($body, JSON_FORCE_OBJECT); + + $this->batch->clearCommands(); + $this->batch->addCommand($command); /** * @var \Sherlock\responses\IndexResponse */ $ret = parent::execute(); - return $ret; + return $ret[0]; } /** @@ -262,31 +262,36 @@ public function create() * @todo allow updating settings of all indices * * @return \Sherlock\responses\IndexResponse - * @throws \Sherlock\common\exceptions\RuntimeException + * @throws exceptions\RuntimeException */ public function updateSettings() { - \Analog\Analog::log("IndexRequest->updateSettings() - ".print_r($this->params, true), \Analog\Analog::DEBUG); + Analog::log("IndexRequest->updateSettings() - ".print_r($this->params, true), Analog::DEBUG); if (!isset($this->params['index'])) { - \Analog\Analog::log("Index cannot be empty.", \Analog\Analog::ERROR); - throw new \Sherlock\common\exceptions\RuntimeException("Index cannot be empty."); + Analog::log("Index cannot be empty.", Analog::ERROR); + throw new exceptions\RuntimeException("Index cannot be empty."); } $index = implode(',', $this->params['index']); - $uri = '/'.$index.'/_settings'; $body = array("index" => $this->params['indexSettings']); - $this->_uri = $uri; - $this->_data = json_encode($body); - $this->_action = 'put'; + $command = new Command(); + $command->index = $index; + $command->id = '_settings'; + $command->action = 'put'; + $command->data = json_encode($body, JSON_FORCE_OBJECT); + + $this->batch->clearCommands(); + $this->batch->addCommand($command); + $ret = parent::execute(); - return $ret; + return $ret[0]; } @@ -294,43 +299,47 @@ public function updateSettings() * Update/add the Mapping of an index * * @return \Sherlock\responses\IndexResponse - * @throws \Sherlock\common\exceptions\RuntimeException + * @throws exceptions\RuntimeException */ public function updateMapping() { - \Analog\Analog::log("IndexRequest->updateMapping() - ".print_r($this->params, true), \Analog\Analog::DEBUG); + Analog::log("IndexRequest->updateMapping() - ".print_r($this->params, true), Analog::DEBUG); if (!isset($this->params['index'])) { - \Analog\Analog::log("Index cannot be empty.", \Analog\Analog::ERROR); - throw new \Sherlock\common\exceptions\RuntimeException("Index cannot be empty."); + Analog::log("Index cannot be empty.", Analog::ERROR); + throw new exceptions\RuntimeException("Index cannot be empty."); } if (count($this->params['indexMappings']) > 1) { - \Analog\Analog::log("May only update one mapping at a time.", \Analog\Analog::ERROR); - throw new \Sherlock\common\exceptions\RuntimeException("May only update one mapping at a time."); + Analog::log("May only update one mapping at a time.", Analog::ERROR); + throw new exceptions\RuntimeException("May only update one mapping at a time."); } if (!isset($this->params['type'])) { - \Analog\Analog::log("Type must be specified.", \Analog\Analog::ERROR); - throw new \Sherlock\common\exceptions\RuntimeException("Type must be specified."); + Analog::log("Type must be specified.", Analog::ERROR); + throw new exceptions\RuntimeException("Type must be specified."); } if (count($this->params['type']) > 1) { - \Analog\Analog::log("Only one type may be updated at a time.", \Analog\Analog::ERROR); - throw new \Sherlock\common\exceptions\RuntimeException("Only one type may be updated at a time."); + Analog::log("Only one type may be updated at a time.", Analog::ERROR); + throw new exceptions\RuntimeException("Only one type may be updated at a time."); } $index = implode(',', $this->params['index']); - $uri = '/'.$index.'/'.$this->params['type'][0].'/_mapping'; $body = $this->params['indexMappings']; + $command = new Command(); + $command->index = $index; + $command->type = $this->params['type'][0]; + $command->id = '_mapping'; + $command->action = 'put'; + $command->data = json_encode($body, JSON_FORCE_OBJECT); - $this->_uri = $uri; - $this->_data = json_encode($body, JSON_FORCE_OBJECT); - $this->_action = 'put'; + $this->batch->clearCommands(); + $this->batch->addCommand($command); $ret = parent::execute(); - return $ret; + return $ret[0]; } } diff --git a/src/Sherlock/requests/Request.php b/src/Sherlock/requests/Request.php index 23334e7..84b1cdf 100644 --- a/src/Sherlock/requests/Request.php +++ b/src/Sherlock/requests/Request.php @@ -10,9 +10,11 @@ use Sherlock\common\events\Events; use Sherlock\common\events\RequestEvent; use Sherlock\common\exceptions; +use Sherlock\common\tmp\RollingCurl; use Sherlock\responses\IndexResponse; use Analog\Analog; use Guzzle\Http\Client; +use Sherlock\responses\Response; /** * Base class for various requests. @@ -28,20 +30,11 @@ class Request //required since PHP doesn't allow argument differences between //parent and children under Strict - /* - * @var string - */ - protected $_uri; - - /* - * @var string + /** + * @var BatchCommandInterface */ - protected $_data; + protected $batch; - /* - * @var string - */ - protected $_action; /** * @param \Symfony\Component\EventDispatcher\EventDispatcher $dispatcher @@ -50,18 +43,18 @@ class Request public function __construct($dispatcher) { if (!isset($dispatcher)) { - \Analog\Analog::log("An Event Dispatcher must be injected into all Request objects", \Analog\Analog::ERROR); + Analog::log("An Event Dispatcher must be injected into all Request objects", Analog::ERROR); throw new exceptions\BadMethodCallException("An Event Dispatcher must be injected into all Request objects"); } - $this->dispatcher = $dispatcher; + $this->batch = new BatchCommand(); } /** * Execute the Request, performs on the actual transport layer * - * @throws \Sherlock\common\exceptions\RuntimeException + * @throws exceptions\RuntimeException * @throws \Sherlock\common\exceptions\BadResponseException * @throws \Sherlock\common\exceptions\ClientErrorResponseException * @return \Sherlock\responses\Response @@ -71,12 +64,7 @@ public function execute() $reflector = new \ReflectionClass(get_class($this)); $class = $reflector->getShortName(); - \Analog\Analog::log("Request->execute()", \Analog\Analog::DEBUG); - - if (!isset($this->_uri)) { - \Analog\Analog::log("Request URI must be set.", \Analog\Analog::ERROR); - throw new \Sherlock\common\exceptions\RuntimeException("Request URI must be set."); - } + Analog::debug("Request->execute()"); //construct a requestEvent and dispatch it with the "request.preexecute" event //This will, among potentially other things, populate the $node variable with @@ -100,50 +88,89 @@ public function execute() throw new exceptions\RuntimeException("Request requires a port to connect to"); } - $path = 'http://'.$this->node['host'].':'.$this->node['port'].$this->_uri; + $path = 'http://'.$this->node['host'].':'.$this->node['port']; + + Analog::debug("Request->commands: ".print_r($this->batch,true)); + - \Analog\Analog::log("Request->_uri: ".$this->_uri, \Analog\Analog::DEBUG); - \Analog\Analog::log("Request->_data: ".$this->_data, \Analog\Analog::DEBUG); - \Analog\Analog::log("Request->_action: ".$this->_action, \Analog\Analog::DEBUG); - $client = new Client(); + $rolling = new RollingCurl\RollingCurl(); - $action = $this->_action; - try { - $response = $client->$action($path, null, $this->_data)->send(); + $window = 10; + $counter = 0; - } catch (\Guzzle\Http\Exception\ClientErrorResponseException $e) { - Analog::log("Request->execute() - ClientErrorResponseException - Request failed from ".$class, Analog::ERROR); - Analog::log(print_r($e->getMessage(), true), Analog::ERROR); - Analog::log(print_r($e->getResponse()->getBody(true), true), Analog::ERROR); + /** @var BatchCommandInterface $batch */ + $batch = $this->batch; - throw new \Sherlock\common\exceptions\ClientErrorResponseException($e->getResponse()->getBody(true), $e->getCode(), $e); - } catch (\Guzzle\Http\Exception\ServerErrorResponseException $e) { - Analog::log("Request->execute() - ServerErrorResponseException - Request failed from ".$class, Analog::ERROR); - Analog::log(print_r($e->getMessage(), true), Analog::ERROR); - Analog::log(print_r($e->getResponse()->getBody(true), true), Analog::ERROR); + //prefill our buffer with a full window + //the rest will be streamed by our callback closure + foreach ($batch as $request) { - throw new \Sherlock\common\exceptions\ClientErrorResponseException($e->getResponse()->getBody(true), $e->getCode(), $e); - } catch (\Guzzle\Http\Exception\BadResponseException $e) { - Analog::log("Request->execute() - BadResponseException - Request failed from ".$class, Analog::ERROR); - Analog::log(print_r($e->getMessage(), true), Analog::ERROR); - Analog::log(print_r($e->getResponse()->getBody(true), true), Analog::ERROR); + /** @var CommandInterface $req */ + $req = $request; + $action = $req->getAction(); - throw new \Sherlock\common\exceptions\BadResponseException($e->getResponse()->getBody(true), $e->getCode(), $e); - } catch (\Exception $e) { - Analog::log("Request->execute() - Exception - Request failed from ".$class, Analog::ERROR); - Analog::log(print_r($e, true), Analog::ERROR); + if ($action == 'put' || $action == 'post') { + $rolling->$action($path.$req->getURI(), $req->getData()); + } else { + $rolling->$action($path.$req->getURI()); + } - throw new \Sherlock\common\exceptions\RuntimeException($e->getMessage(), $e->getCode(), $e); + if ($counter > $window) { + break; + } } + /** + * @param RollingCurl\Request $request + * @param RollingCurl\RollingCurl $rolling + */ + $callback = function (RollingCurl\Request $request, RollingCurl\RollingCurl $rolling) use ($batch, $path) { + + //a curl handle just finished, advance the iterator one and add to the queue + //First check to see if there are any left to process (aka valid) + if ($batch->valid()) { + + //advance + $batch->next(); + + //make sure we haven't hit the end + if ($batch->valid()) { + + $data = $batch->current(); + + $action = $data->getAction(); + + if ($action == 'put' || $action == 'post') { + $rolling->$action($path.$data->getURI(), $data->getData()); + } else { + $rolling->$action($path.$data->getURI()); + } + } + } + }; + + $rolling->setSimultaneousLimit($window); + $rolling->setCallback($callback); + + $rolling->execute(); + $ret = $rolling->getCompletedRequests(); + + //This is kinda gross... - if ($class == 'SearchRequest') - $ret = new \Sherlock\responses\QueryResponse($response); - elseif ($class == 'IndexRequest') - $ret = new \Sherlock\responses\IndexResponse($response); - elseif ($class == 'IndexDocumentRequest') - $ret = new \Sherlock\responses\IndexResponse($response); - - return $ret; + $returnResponse = 'Response'; + if ($class == 'SearchRequest') { + $returnResponse = '\Sherlock\responses\QueryResponse'; + } elseif ($class == 'IndexRequest') { + $returnResponse = '\Sherlock\responses\IndexResponse'; + } elseif ($class == 'IndexDocumentRequest') { + $returnResponse = '\Sherlock\responses\IndexResponse'; + } + + $finalResponse = array(); + foreach ($ret as $response) { + $finalResponse[] = new $returnResponse($response); + } + + return $finalResponse; } } diff --git a/src/Sherlock/requests/SearchRequest.php b/src/Sherlock/requests/SearchRequest.php index d98f561..a1523ba 100644 --- a/src/Sherlock/requests/SearchRequest.php +++ b/src/Sherlock/requests/SearchRequest.php @@ -172,42 +172,51 @@ public function facets($facets) */ public function execute() { - \Analog\Analog::log("SearchRequest->execute() - ".print_r($this->params, true), \Analog\Analog::DEBUG); + Analog::debug("SearchRequest->execute() - ".print_r($this->params, true)); $finalQuery = $this->composeFinalQuery(); - if (isset($this->params['index'])) + if (isset($this->params['index'])) { $index = implode(',', $this->params['index']); - else + } else { $index = ''; + } - if (isset($this->params['type'])) + if (isset($this->params['type'])) { $type = implode(',', $this->params['type']); - else + } else { $type = ''; + } - if (isset($this->params['search_type'])) + if (isset($this->params['search_type'])) { $queryParams[] = $this->params['search_type']; + } - if (isset($this->params['routing'])) + if (isset($this->params['routing'])) { $queryParams[] = $this->params['routing']; + } if (isset($queryParams)) { $queryParams = '?' . implode("&", $queryParams); - } else + } else { $queryParams = ''; + } + + $body = array($finalQuery); - $uri = '/'.$index.'/'.$type.'/_search'.$queryParams; - //required since PHP doesn't allow argument differences between - //parent and children under Strict - $this->_uri = $uri; - $this->_data = $finalQuery; + $command = new Command(); + $command->index = $index; + $command->type = $type; + $command->id = '_search'.$queryParams; + $command->action = 'post'; + $command->data = json_encode($body); - //Guzzle doesn't allow GET with request body, use post - $this->_action = 'post'; + $this->batch->clearCommands(); + $this->batch->addCommand($command); - return parent::execute(); + $ret = parent::execute(); + return $ret[0]; } /** diff --git a/src/Sherlock/responses/IndexResponse.php b/src/Sherlock/responses/IndexResponse.php index e81106f..31d137f 100644 --- a/src/Sherlock/responses/IndexResponse.php +++ b/src/Sherlock/responses/IndexResponse.php @@ -7,6 +7,10 @@ namespace Sherlock\responses; +/** + * Class IndexResponse + * @package Sherlock\responses + */ class IndexResponse extends Response { /** @@ -19,6 +23,9 @@ class IndexResponse extends Response */ public $acknowledged; + /** + * @param \Sherlock\common\tmp\RollingCurl\Request $response + */ public function __construct($response) { parent::__construct($response); diff --git a/src/Sherlock/responses/QueryResponse.php b/src/Sherlock/responses/QueryResponse.php index 7cc4737..ca70ab9 100644 --- a/src/Sherlock/responses/QueryResponse.php +++ b/src/Sherlock/responses/QueryResponse.php @@ -36,7 +36,7 @@ class QueryResponse extends Response implements \IteratorAggregate, \Countable public $hits; /** - * @param \Guzzle\Http\Message\Response $response + * @param \Sherlock\common\tmp\RollingCurl\Request $response * @throws \Sherlock\common\exceptions\BadMethodCallException */ public function __construct($response) diff --git a/src/Sherlock/responses/Response.php b/src/Sherlock/responses/Response.php index 03ebde8..66693a7 100644 --- a/src/Sherlock/responses/Response.php +++ b/src/Sherlock/responses/Response.php @@ -6,8 +6,15 @@ */ namespace Sherlock\responses; + +use Analog\Analog; use Guzzle\Http\Message; +use Sherlock\common\exceptions\BadMethodCallException; +/** + * Class Response + * @package Sherlock\responses + */ class Response { /** @@ -16,17 +23,18 @@ class Response public $responseData; /** - * @param \Guzzle\Http\Message\Response $response - * @throws \Sherlock\common\exceptions\BadMethodCallException + * @param \Sherlock\common\tmp\RollingCurl\Request $response + * @throws BadMethodCallException */ public function __construct($response) { if (!isset($response)) { - \Analog\Analog::log("Response must be set in constructor.",\Analog\Analog::ERROR); - throw new \Sherlock\common\exceptions\BadMethodCallException("Response must be set in constructor."); + Analog::error("Response must be set in constructor."); + throw new BadMethodCallException("Response must be set in constructor."); } - //\Analog\Analog::log("Response->__construct() : ".print_r($this->responseData,true),\Analog\Analog::DEBUG); - $this->responseData = $response->json(); + $this->responseData = json_decode($response->getResponseText(), true); + + Analog::debug("Response:".print_r($response, true)); } } diff --git a/tests/IndexOperationsTest.php b/tests/IndexOperationsTest.php new file mode 100644 index 0000000..1f90c26 --- /dev/null +++ b/tests/IndexOperationsTest.php @@ -0,0 +1,71 @@ +object = new Sherlock\Sherlock(); + $this->object->addNode('localhost', '9200'); + } + + /** + * Tears down the fixture, for example, closes a network connection. + * This method is called after a test is executed. + */ + protected function tearDown() + { + + } + + + function assertThrowsException($exception_name, $code) { + $e = null; + try{ + $code(); + }catch (\Exception $e) { + // No more code, we only want to catch the exception in $e + } + + $this->assertInstanceOf($exception_name, $e); + } + + + public function testCreateDelete() + { + $sherlock = $this->object; + + $index = $sherlock->index('testindexoperations'); + + $response = $index->create(); + $this->assertEquals(true, $response->ok); + + + $response = $index->delete(); + $this->assertEquals(true, $response->ok); + + } + + + +} \ No newline at end of file diff --git a/tests/IndexingTest.php b/tests/IndexingTest.php index dc8197f..a74df30 100644 --- a/tests/IndexingTest.php +++ b/tests/IndexingTest.php @@ -6,7 +6,7 @@ */ namespace Sherlock\tests; -use Sherlock\Sherlock; +use Sherlock; /** @@ -25,7 +25,7 @@ class IndexingTest extends \PHPUnit_Framework_TestCase */ protected function setUp() { - $this->object = new \Sherlock\sherlock; + $this->object = new Sherlock\Sherlock(); $this->object->addNode('localhost', '9200'); } @@ -63,8 +63,27 @@ public function testAddDoc() $doc = $sherlock->document()->index('testindexing')->type('tweet')->document(array("field" => "test")); $response = $doc->execute(); - $this->assertInstanceOf('\sherlock\responses\IndexResponse', $response); - $this->assertEquals(true, $response->ok); + + $this->assertInstanceOf('\Sherlock\responses\IndexResponse', $response[0]); + $this->assertEquals(true, $response[0]->ok); + } + public function testAddBatchDoc() + { + $sherlock = $this->object; + + $doc = $sherlock->document()->index('testindexing')->type('tweet'); + + for ($i = 0; $i < 2000; $i++) { + $doc->document('{"field":"test"}'); + } + + $response = $doc->execute(); + + $this->assertInstanceOf('\Sherlock\responses\IndexResponse', $response[0]); + $this->assertEquals(true, $response[0]->ok); + + } + } \ No newline at end of file diff --git a/tests/MappingTest.php b/tests/MappingTest.php index 26d7d45..88cf13b 100644 --- a/tests/MappingTest.php +++ b/tests/MappingTest.php @@ -18,7 +18,7 @@ class MappingTest extends \PHPUnit_Framework_TestCase */ protected function setUp() { - $this->object = new \Sherlock\sherlock; + $this->object = new Sherlock(); $this->object->addNode('localhost', '9200'); } diff --git a/tests/QueryTest.php b/tests/QueryTest.php index 3518f01..b60fe43 100644 --- a/tests/QueryTest.php +++ b/tests/QueryTest.php @@ -38,7 +38,7 @@ public function __construct() */ protected function setUp() { - $this->object = new \Sherlock\sherlock; + $this->object = new Sherlock(); $this->object->addNode('localhost', '9200'); } @@ -102,9 +102,8 @@ public function testBool() $this->assertEquals($expectedData, $data); $resp = $req->execute(); - - - + + } /**