Elasticsearch/dataflow - 约 60 个并发连接后连接超时

Kaz*_*uki 0 python elasticsearch google-cloud-platform google-cloud-dataflow apache-beam

我们在 Elastic Cloud 上托管 elatsicsearch 集群并从数据流 (GCP) 调用它。工作在开发中运行良好,但当我们部署到产品时,我们在客户端看到大量连接超时。

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "main.py", line 159, in process
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/client/utils.py", line 152, in _wrapped
    return func(*args, params=params, headers=headers, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/client/__init__.py", line 1617, in search
    body=body,
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/transport.py", line 390, in perform_request
    raise e
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/transport.py", line 365, in perform_request
    timeout=timeout,
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/connection/http_urllib3.py", line 258, in perform_request
    raise ConnectionError("N/A", str(e), e)
elasticsearch.exceptions.ConnectionError: ConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fe5d04e5690>: Failed to establish a new connection: [Errno 110] Connection timed out) caused by: NewConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fe5d04e5690>: Failed to establish a new connection: [Errno 110] Connection timed out)
Run Code Online (Sandbox Code Playgroud)

我将 elasticsearch 客户端中的超时设置增加到 300 秒,如下所示,但似乎没有帮助。

self.elasticsearch = Elasticsearch([es_host], http_auth=http_auth, timeout=300)
Run Code Online (Sandbox Code Playgroud)

查看部署https://cloud.elastic.co/deployments/ /metrics CPU 和内存使用率非常低(低于 10%),搜索响应时间也为 200 毫秒。这里的瓶颈可能是什么?我们如何避免这种超时?

如下日志所示,大多数请求因连接超时而失败,而成功的请求很快就会收到响应:

在此输入图像描述

我尝试通过 ssh 进入虚拟机,但遇到连接错误。netstat显示大约有 60 个到弹性搜索 IP 地址的 ESTABLISHED 连接。当我从虚拟机卷曲到弹性搜索地址时,我能够重现超时。我可以很好地卷曲到其他 URL。另外,我可以从本地很好地卷曲到elasticsearch,所以问题只是虚拟机和elasticsaerch服务器之间的连接。

dataflow(计算引擎)或ElasticSearch对并发连接数有限制吗?我在网上找不到任何信息。

Pab*_*blo 5

我对 ElasticSearch 连接器做了一些研究。您可能需要尝试遵循两个原则来确保您的连接器尽可能高效。

注意按照其他答案中的建议设置最大工作线程数可能不会有太大帮助(目前) - 让我们提高 Beam/Elastic 集群资源的利用率,如果我们开始达到其中任何一个的限制,那么我们可以考虑限制工人数量 - 但现在,您可以尝试改进您的连接器。

使用对外部服务的批量请求

您提供的代码对进入 DoFn 的每个元素发出单独的搜索请求。正如您所指出的,这工作正常,但它会导致您的管道花费太多时间等待每个元素的外部请求 - 因此您等待往返的时间将为 O(n)。

很高兴,Elasticsearch客户端有一个msearch方法,它应该允许您批量执行搜索。你可以这样做:

class PredictionFn(beam.DoFn):
    def __init__(self, ...):
      self.buffer = []
    ...
    def process(self, element):
        self.buffer.append(element)
        if len(self.buffer) > BATCH_SIZE:
          return self.flush()

    def flush(self):
        result = []

        # Perform the search requests for user ids
        user_ids = [uid for cid, did, uid in self.buffer]
        user_ids_request = self._build_uid_reqs(user_ids)

        resp = es.msearch(body=user_ids_request)

        user_id_and_device_id_lists = []
        for r, elm in zip(resp['responses'], self.buffer):
          if len(r["hits"]["hits"]) == 0:
            continue
          # Get new device_id_list
          user_id_and_device_id_lists.append((elm[2],  # User ID
                                              device_id_list))
          

        device_id_lists = [elm[1] for elm in user_id_and_device_id_lists]
        device_ids_request = self._build_device_id_reqs(device_id_lists)

        resp = es.msearch(body=device_ids_request)

        resp = self.elasticsearch.search(index="sessions", body={"query": {"match": {"userId": user_id }}})
        # Handle the result, output anything necessary

    def _build_uid_reqs(self, uids):
      # Relying on this answer: /sf/ask/1998237741/
      res = []
      for uid in uids:
        res.append(json.dumps({'index': 'sessions'}))  # Request HEAD
        res.append(json.dumps({"query": {"match": {"userId": uid }}}))  # Request BODY

      return '\n'.join(res)
Run Code Online (Sandbox Code Playgroud)

重用客户端,因为它是线程安全的

客户Elasticsearch也是线程安全的

因此,您可以执行以下操作,而不是每次都创建一个新的:

class PredictionFn(beam.DoFn):
    CLIENT = None

    def init_elasticsearch(self):
        if PredictionFn.CLIENT is not None:
          return PredictionFn.CLIENT
        es_host = fetch_host()
        http_auth = fetch_auth()
        PredictionFn.CLIENT = Elasticsearch([es_host], http_auth=http_auth, 
            timeout=300, sniff_on_connection_fail=True,
            retry_on_timeout=True, max_retries=2,
            maxsize=5) # 5 connections per client
        return PredictionFn.CLIENT
Run Code Online (Sandbox Code Playgroud)

这应该确保您为每个工作人员保留一个客户端,并且您不会创建太多与 ElasticSearch 的连接 - 从而不会收到拒绝消息。

让我知道这两个是否有帮助,或者我们是否需要尝试进一步的改进!