小编ajm*_*rie的帖子

使用PySpark从Neo4j中提取数据

我的时间序列当前存储为Neo4j服务器实例版本2.3.6中的图形(使用时间树结构,类似于)(仅限REST接口,无Bolt).我想要做的是使用PySpark以分布式方式对这些时间序列进行一些分析.

现在,我知道将Spark与Neo4j连接的现有项目,特别是这里列出的项目.这些问题是他们专注于创建一个使用图形的界面.在我的情况下,图形是不相关的,因为我的Neo4j Cypher查询旨在生成值数组.下游的一切都是关于处理这些数组作为时间序列; 再次,而不是图表.

我的问题是:有没有人使用PySpark成功并行查询仅支持REST的Neo4j实例,如果是的话,你是怎么做到的?该py2neo库似乎是一个不错的人选,直到我实现了连接对象不能跨分区(或者如果可以,我不知道如何)共享.现在我正在考虑让我的Spark作业在Neo4j服务器上运行独立的REST查询,但我想看看社区如何解决这个问题.

最好的,Aurélien

python neo4j pyspark

12
推荐指数
1
解决办法
479
查看次数

处理Dask分布式异常的方法

我在使用DaskDistributed开发数据分析管道方面取得了很大成功.然而,我仍然期待改进的一件事是我处理异常的方式.

现在,如果,我写下面的内容

def my_function (value):
    return 1 / value

results = (dask.bag
    .from_sequence(range(-10, 10))
    .map(my_function))

print(results.compute())
Run Code Online (Sandbox Code Playgroud)

...然后在运行程序时,我得到一个很长很长的追溯列表(每个工人一个,我猜).最相关的部分是

distributed.utils - ERROR - division by zero
Traceback (most recent call last):
  File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/utils.py", line 193, in f
    result[0] = yield gen.maybe_future(func(*args, **kwargs))
  File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/Users/ajmazurie/test/.env/pyenv-3.6.0-default/lib/python3.6/site-packages/distributed/client.py", line 1473, in …
Run Code Online (Sandbox Code Playgroud)

python dask

8
推荐指数
1
解决办法
1107
查看次数

标签 统计

python ×2

dask ×1

neo4j ×1

pyspark ×1