未来模式(多线程模式)

未来模式

客户端提供一个「未来」或「异步」的模式。这允许批量处理请求(并行发送到群集),这会对性能和吞吐量产生巨大影响。

PHP 基本上是单线程的, 但是 libcurl 库提供了「多接口」的功能。 这允许像 PHP这样的语言通过提供一批要处理的请求来获得并发。 批处理由底层多线程 libcurl 库并发执行,然后将相应返回给 PHP。

在单线程环境中,执行 n 请求的时间,是这些执行 n 请求延迟的总和。 使用多接口,执行 n 请求的时间是最慢请求的延迟 (假设有足够的句柄可用于并行执行所有请求)。

此外,多接口允许同时向不同主机发出请求,这意味着 Elasticsearch-PHP 客户端可以更有效的利用整个集群。

使用 Future 模式

虽然使用这种模式相对简单,但它在代码中引入了更多的职责。为了开启 future 模式,在客户端选项中添加 future 参数,并将值设置为 lazy

  1. $client = ClientBuilder::create()->build();
  2. $params = [
  3. 'index' => 'test',
  4. 'type' => 'test',
  5. 'id' => 1,
  6. 'client' => [
  7. 'future' => 'lazy'
  8. ]
  9. ];
  10. $future = $client->get($params);

这将会返回一个 future 对象,而不是一个真正的响应数据。future 对象是待处理对象,它看起来就像占位符。你可以把 future 对象当做普通对象在代码中使用。当你需要响应数据,你可以通过解析 future 对象获得。如果 future 对象已被解析(由于其它操作),可以立即使用响应数据。如果 future 对象没有被解析完成,那么解析动作将会产生阻塞,直到解析完成。

事实上,这意味着你可以通过设置 future: lazy 键值对来构造一个批量请求队列,而返回的 future 对象直到解析完成,程序才会继续执行。无论什么时候,所有的请求以并行的方式发送到集群,以异步的方式返回给 curl。

这听起来很复杂,但由于 RingPHP 的 FutureArray 接口,使其变得很简单,这使 future 对象看起来像一个简单的关联数组。例如:

  1. $client = ClientBuilder::create()->build();
  2. $params = [
  3. 'index' => 'test',
  4. 'type' => 'test',
  5. 'id' => 1,
  6. 'client' => [
  7. 'future' => 'lazy'
  8. ]
  9. ];
  10. $future = $client->get($params);
  11. $doc = $future['_source']; // 此调用将产生阻塞并迫使 future 被解析

像普通返回一样,以关联数组的形式与 Future 模式交互,会导致 Future 模式解析出特定值(进而解析出所有待处理的请求和值)。可用模式如下:

  1. $client = ClientBuilder::create()->build();
  2. $futures = [];
  3. for ($i = 0; $i < 1000; $i++) {
  4. $params = [
  5. 'index' => 'test',
  6. 'type' => 'test',
  7. 'id' => $i,
  8. 'client' => [
  9. 'future' => 'lazy'
  10. ]
  11. ];
  12. $futures[] = $client->get($params); //请求入队列
  13. }
  14. foreach ($futures as $future) {
  15. // 访问Future模式的值,必要时会触发解析
  16. echo $future['_source'];
  17. }

队列中的请求将会并行执行,并在执行之后给请求对应的 $future 变量赋值。默认每次批量执行100个请求。

如果你希望强制 Future 模式解析出返回值,但又不需要在当下使用,你可以调用 wait() 方法强制 Future 模式解析该请求:

  1. $client = ClientBuilder::create()->build();
  2. $futures = [];
  3. for ($i = 0; $i < 1000; $i++) {
  4. $params = [
  5. 'index' => 'test',
  6. 'type' => 'test',
  7. 'id' => $i,
  8. 'client' => [
  9. 'future' => 'lazy'
  10. ]
  11. ];
  12. $futures[] = $client->get($params); //请求入队列
  13. }
  14. //wait() 方法强制 Future 模式执行底层请求
  15. $futures[999]->wait();

修改批次大小

默认的批次大小是100,也就是说每当有100个请求在排队,客户端就会开始处理 Future 对象(比如初始化一个 curl_multi 调用)。 批次的大小可以按照你的喜好来修改。 通过设置 Handler 的 max_handles 来修改。

  1. $handlerParams = [
  2. 'max_handles' => 500
  3. ];
  4. $defaultHandler = ClientBuilder::defaultHandler($handlerParams);
  5. $client = ClientBuilder::create()
  6. ->setHandler($defaultHandler)
  7. ->build();

上面的设置会让客户端每500个请求进行一次批量处理。要注意的是,不论这个批次是否被填满,当开始处理一个 Future 对象时会导致整个批次被都处理。比如,只有499个请求在这个批次中…但是最后一个 Future 对象的直接处理(也就是获取结果)会导致整个批次都被直接处理(可能会等待比较久,也可以用这个特性来触发批处理):

  1. $handlerParams = [
  2. 'max_handles' => 500
  3. ];
  4. $defaultHandler = ClientBuilder::defaultHandler($handlerParams);
  5. $client = ClientBuilder::create()
  6. ->setHandler($defaultHandler)
  7. ->build();
  8. $futures = [];
  9. for ($i = 0; $i < 499; $i++) {
  10. $params = [
  11. 'index' => 'test',
  12. 'type' => 'test',
  13. 'id' => $i,
  14. 'client' => [
  15. 'future' => 'lazy'
  16. ]
  17. ];
  18. $futures[] = $client->get($params); // 构建一个请求队列
  19. }
  20. // 处理了最后一个 Future 对象,前面的也都会被处理
  21. $body = $future[499]['body'];

混沌的分批

一个批处理中可以包含各种类型的请求。比如,可以在一个批处理中加入 Get 请求、Index 请求和 Search 请求,这依然能被正常处理。

  1. $client = ClientBuilder::create()->build();
  2. $futures = [];
  3. $params = [
  4. 'index' => 'test',
  5. 'type' => 'test',
  6. 'id' => 1,
  7. 'client' => [
  8. 'future' => 'lazy'
  9. ]
  10. ];
  11. $futures['getRequest'] = $client->get($params); // 第一个请求 Get
  12. $params = [
  13. 'index' => 'test',
  14. 'type' => 'test',
  15. 'id' => 2,
  16. 'body' => [
  17. 'field' => 'value'
  18. ],
  19. 'client' => [
  20. 'future' => 'lazy'
  21. ]
  22. ];
  23. $futures['indexRequest'] = $client->index($params); // 第二个请求 Index
  24. $params = [
  25. 'index' => 'test',
  26. 'type' => 'test',
  27. 'body' => [
  28. 'query' => [
  29. 'match' => [
  30. 'field' => 'value'
  31. ]
  32. ]
  33. ],
  34. 'client' => [
  35. 'future' => 'lazy'
  36. ]
  37. ];
  38. $futures['searchRequest'] = $client->search($params); // 第三个请求 Search
  39. // 处理 Future,也就是获取其中一个结果,整个批次都会被处理...这里会阻塞,直到批处理请求完成才会返回
  40. $searchResults = $futures['searchRequest']['hits'];
  41. // 这里会立即返回值,因为已经在前面批处理中完成了请求
  42. $doc = $futures['getRequest']['_source'];

Future 模式注意事项

在使用 future 模式时,有几点需要注意。最重要也最明显的是:需要自己处理将要遇到的问题。这点通常比较细微,但有时会引入意想不到的并发症。

例如,如果你手动使用 wait(),你可能需要调用 wait() 多次,如果存在重试的话。这是因为每次重试都会引入另一层包裹的 futures,并且每个都需要被解决掉才能得到最终结果。

但是,如果通过 ArrayInterface 访问值,则不需要这样做 (例如: $response['hits']['hits']),因为 FutureArrayInterface 将自动地并且完全地解析 future 提供的值。

另外一个需要注意的是,某些 APIs 将失去 「助手」功能。 例如: "exists" APIs (如: $client→exists(), $client→indices()→exists, $client→indices→templateExists() 等) 通常在正常操作下返回 true 或 false。

在运行 future 模式时,future 的延展取决于你的应用,这就意味着客户端无法再检查响应并返回简单的 true / false。相反地,你将看到 Elasticsearch 原始返回并须要采取适当的措施。

这也适用于 ping()