当我尝试导入apache beam时,我收到以下错误.
>>> import apache_beam
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/toor/pfff/local/lib/python2.7/site-packages/apache_beam/__init__.py", line 78, in <module>
from apache_beam import io
File "/home/toor/pfff/local/lib/python2.7/site-packages/apache_beam/io/__init__.py", line 21, in <module>
...
from apitools.base.protorpclite import messages
File "/home/toor/pfff/local/lib/python2.7/site-packages/apitools/base/protorpclite/messages.py", line 1165, in <module>
class Field(six.with_metaclass(_FieldMeta, object)):
TypeError: Error when calling the metaclass bases
metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases
Run Code Online (Sandbox Code Playgroud)
我正在一个新的虚拟环境中工作,我已经通过安装带有pip的google-cloud-dataflow安装了apache_beam,因为我需要能够在google云平台上运行的版本.
pip install google-cloud-dataflow
我不知道如何解决这个错误.为了完整起见,我在Windows上的ubuntu上使用bash上的python 2.7.12.我的同事在Windows上的ubuntu上运行bash时遇到了同样的错误,而直接在Windows上运行工作正常.
安装的软件包版本是: …
二者DoFn并PTransform是定义操作的装置PCollection.我们怎么知道何时使用?
我正在设置一个数据流作业,为此作业,工作人员需要访问私有的bitbucket存储库以安装库来处理数据。为了授予对数据流工作者的访问权限,我设置了一对SSH密钥(公共和私有)。我设法将私钥添加到我的数据流工作者上。当尝试通过git + ssh pip安装软件包时,出现错误Host key verification failed。
我试图.ssh/known_hosts在dataflow worker上查找文件,但是在普通VM上并不那么简单。
另外,我可以通过以下命令自行设置它,但效果不佳:
mkdir -p ~/.ssh
chmod 0700 ~/.ssh
ssh-keyscan bitbucket.org > ~/.ssh/known_hosts
Run Code Online (Sandbox Code Playgroud)
我仍然收到Host key verification failed错误。
建议的替代解决方案是运行此问题,ssh-keygen -R bitbucket.org但随后出现以下错误:
mkstemp: No such file or directory
对于Dataflow Python SDK,您需要使用来打包代码setup.py。工人启动时要执行的所有命令都用编写subprocess.Popen。命令列表如下:
CUSTOM_COMMANDS = [
# decrypt key encrypted key in repository via gcloud kms
['gcloud', '-v'],
['gcloud', 'kms', 'decrypt', '--location', 'global', '--keyring',
'bitbucketpackages', '--key', 'package', '--plaintext-file',
'bb_package_key_decrypted', '--ciphertext-file', 'bb_package_key'],
['chmod', '700', 'bb_package_key_decrypted'],
# …Run Code Online (Sandbox Code Playgroud) 阅读Google的Dataflow API,我的印象是它与Apache Storm的功能非常相似.通过流水线流实时数据处理.除非我完全忽略了这一点,否则我不希望在如何执行彼此写入的管道上建立桥梁,而是期待与Google不同的东西,而不是重新发明轮子.Apache Storm已经很好地放置并可用于任何编程语言.做这样的事情的真正价值是什么?
我正在研究通过Google Dataflow/Apache Beam处理来自Web用户会话的日志,并且需要将用户的日志(流式传输)与上个月用户会话的历史记录相结合.
我看过以下方法:
element的processElement(ProcessContext processContext)我的理解是,通过加载的数据.withSideInputs(pCollectionView)需要适合内存.我知道我可以将所有单个用户的会话历史记录放入内存,但不是所有会话历史记录.
我的问题是,是否有一种方法可以从仅与当前用户会话相关的侧输入加载/流式传输数据?
我想象一个parDo函数,它将通过指定用户的ID从侧面输入加载用户的历史会话.但只有当前用户的历史会话才适合内存; 通过侧输入加载所有历史会话将太大.
一些伪代码来说明:
public static class MetricFn extends DoFn<LogLine, String> {
final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;
public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
this.pHistoryView = historyView;
}
@Override
public void processElement(ProcessContext processContext) throws Exception {
Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);
final LogLine currentLogLine = processContext.element();
final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
processContext.output(outputMetric);
}
}
Run Code Online (Sandbox Code Playgroud) 嗨,我正在尝试运行一个管道,我正在计算发布到pubsub的消息与30秒心跳*之间的差异*(10K流,每个心跳每30秒).我不关心100%的数据完整性,但我想了解PubsubIO的水印启发式(以及我是否可以调整它),以确定我是否能够以足够低的损失忽略后期数据.
*注意,pubsub主题提供了[可能需要几天]持久性,以防我们不得不取消管道,因此启发式工作与积压订阅很有效.
有人可以解释如何计算水印(假设使用了timestamplabel()),以及如何调整水印(如果有的话)?
我有一个大约90GB的大型导入文件,由我用Java编写的数据流处理.使用PipelineOptionsFactory的默认设置,我的工作需要很长时间才能完成.如何增加工人数量以提高绩效?
谢谢
我刚刚开始使用Google Data Flow,我编写了一个简单的流程,从云存储中读取CSV文件.其中一个步骤涉及调用Web服务以丰富结果.有问题的Web服务在批量发送多个100个请求时表现更好.
在查看API时,我没有看到将PCollection的100个元素聚合到单个Par.do执行中的好方法.然后需要拆分结果以处理写入BigQuery表的流的最后一步.
不确定我是否需要使用窗口是我想要的.我看到的大多数窗口示例都更适合在给定时间段内进行计数.
我是该项目的新手,我正在尝试在Dataflow和数据库之间创建一个连接器.
文档明确指出我应该使用Source和Sink但我看到很多人直接使用与PInput或PDone相关联的PTransform.
源/接收器API处于实验阶段(使用PTransform解释所有示例),但似乎更容易将其与自定义运行器集成(例如:spark).
如果我参考代码,则使用这两种方法.我看不到任何使用PTransform API会更有趣的用例.
Source/Sink API是否应该重新设计PTranform API?
我是否遗漏了能明确区分这两种方法的东西?
Source/Sink API是否足够稳定,被认为是编码输入和输出的好方法?
谢谢你的建议!
我们试图在Apache Beam管道上使用固定窗口(使用DirectRunner).我们的流程如下:
CombineFn,将每个Events 窗口组合成一个List<Event>List<Event>管道代码:
pipeline
// Read from pubsub topic to create unbounded PCollection
.apply(PubsubIO
.<String>read()
.topic(options.getTopic())
.withCoder(StringUtf8Coder.of())
)
// Deserialize JSON into Event object
.apply("ParseEvent", ParDo
.of(new ParseEventFn())
)
// Window events with a fixed window size of 5 seconds
.apply("Window", Window
.<Event>into(FixedWindows
.of(Duration.standardSeconds(5))
)
)
// Group events by window
.apply("CombineEvents", Combine
.globally(new CombineEventsFn())
.withoutDefaults()
)
// Log grouped events
.apply("LogEvent", ParDo
.of(new LogEventFn()) …Run Code Online (Sandbox Code Playgroud)