Cassandra在阻止同步请求的多个进程中同步执行

Luc*_*chi 1 python database cql multiprocessing cassandra

我有一个应用程序,它读取一系列包含道路车辆通道日志的XML文件.然后,应用程序处理每个记录,转换一些信息以匹配数据库列并将其插入到cassandra数据库中(在远程服务器中运行单个节点[它在内部网络中,因此连接不是真正的问题]) .在数据库中插入数据之后,每个文件的进程继续读取此数据并生成摘要表的信息,这使得信息可以在应用程序的不相关部分中进行深入分析.

我正在使用多处理来并行处理许多XML文件,而我遇到的麻烦就是与cassandra服务器进行通信.示意性地,该过程如下:

  1. 从XML文件中读取记录
  2. 流程记录的数据
  3. 将处理后的数据插入数据库(使用.execute_async(query))
  4. 重复1到3,直到XMl文件结束
  5. 等待我所做的所有插入查询的响应
  6. 从数据库中读取数据
  7. 处理读取数据
  8. 将已处理的数据插入摘要表中

现在,这在多个并行进程中运行顺畅,直到一个进程进入步骤6,其请求(使用.execute(query),即我将等待响应)始终面临超时.我收到的错误是:

 Process ProcessoImportacaoPNCT-1:
Traceback (most recent call last):
  File "C:\Users\Lucas\Miniconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\ImportacaoArquivosPNCT.py", line 231, in run
    core.CalculoIndicadoresPNCT.processa_equipamento(sessao_cassandra, equipamento, data, sentido, faixa)
  File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\CalculoIndicadoresPNCT.py", line 336, in processa_equipamento
    desvio_medias(sessao_cassandra, equipamento, data_referencia, sentido, faixa)
  File "C:\Users\Lucas\PycharmProjects\novo_importador\app\core\CalculoIndicadoresPNCT.py", line 206, in desvio_medias
    veiculos = sessao_cassandra.execute(sql_pronto)
  File "C:\Users\Lucas\Miniconda\lib\site-packages\cassandra\cluster.py", line 1594, in execute
    result = future.result(timeout)
  File "C:\Users\Lucas\Miniconda\lib\site-packages\cassandra\cluster.py", line 3296, in result
    raise self._final_exception
ReadTimeout: code=1200 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 0 responses." info={'received_responses': 0, 'required_responses': 1, 'consistency': 'ONE'}
Run Code Online (Sandbox Code Playgroud)

我已经将服务器中的超时更改为荒谬的时间(例如500000000毫秒),并且我还尝试在客户端中设置超时限制,.execute(query, timeout=3000)但仍然没有成功.

现在,当更多进程遇到同样的问题并且多个进程中的步骤1-3的强烈写入停止时,到达步骤6的最后进程已成功遵循该过程,这使我认为问题是cassandra优先考虑对于我每秒要求的数万个插入请求,要么忽略我的读取请求,要么将其放回到行中.

在我看来,解决这个问题的一种方法是,无论如何我都可以要求cassandra优先考虑我的读取请求,以便我可以继续处理,即使这意味着减慢其他进程.

现在,作为旁注,您可能认为我的流程建模不是最优的,我很乐意听到这方面的意见,但对于这个应用程序的实际情况,在我们的愿景中,这是最好的方法.所以我们实际上已经广泛地考虑过优化过程,但是(如果cassandra服务器可以处理它),这对我们的现实来说是最佳的.

所以,TL; DR:在执行成千上万的异步查询时,有没有办法优先考虑查询?如果没有,有没有办法以请求不超时的方式每秒执行成千上万的插入查询和读取查询?另外,你建议我做什么来解决这个问题?并行运行较少的进程显然是一个解决方案,但我试图避免.所以,很想听听每个人的想法.

在插入时存储数据所以我不需要再次读取它来进行汇总是不可能的,因为XML文件很大并且内存是个问题.

Jim*_*yer 5

我不知道如何优先读取查询.我相信内部Cassandra有独立的线程池用于读写操作,因此它们并行运行.如果没有看到您正在进行的架构和查询,很难说您是否正在进行非常昂贵的读取操作,或者如果系统如此淹没了写入,则无法跟上读取.

您可能希望在应用程序运行时尝试监视Cassandra中正在发生的事情.您可以使用几种工具来监控正在发生的事情.例如,如果你ssh到你的Cassandra节点并运行:

watch -n 1 nodetool tpstats
Run Code Online (Sandbox Code Playgroud)

这将显示线程池统计信息(每秒更新一次).您将能够查看队列是否正在填满或操作是否被阻止.如果任何"Dropped"计数器增加,那么这表示您没有足够的容量来执行您正在尝试的操作.如果是这种情况,则通过添加更多节点来添加容量,或者更改模式和方法,以便节点执行的工作量减少.

其他有用的东西要监控(在linux上使用watch -n 1来连续监控):

nodetool compactionstats
nodetool netstats
nodetool cfstats <keyspace.table name>
nodetool cfhistograms <keyspace> <table name>
Run Code Online (Sandbox Code Playgroud)

使用top命令和iostat等linux命令监视节点也很好,可以检查CPU利用率和磁盘利用率.

我的印象是你的单个节点没有足够的容量来完成你所提供的所有工作,所以要么你需要每单位时间处理更少的数据,要么添加更多的Cassandra节点以展开工作量.

由于分区行太多,我目前正面临自己的超时错误,因此我可能需要在分区键中添加基数以使每个分区的内容更小.