当我尝试导入apache beam时,我收到以下错误.
>>> import apache_beam
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/toor/pfff/local/lib/python2.7/site-packages/apache_beam/__init__.py", line 78, in <module>
from apache_beam import io
File "/home/toor/pfff/local/lib/python2.7/site-packages/apache_beam/io/__init__.py", line 21, in <module>
...
from apitools.base.protorpclite import messages
File "/home/toor/pfff/local/lib/python2.7/site-packages/apitools/base/protorpclite/messages.py", line 1165, in <module>
class Field(six.with_metaclass(_FieldMeta, object)):
TypeError: Error when calling the metaclass bases
metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases
Run Code Online (Sandbox Code Playgroud)
我正在一个新的虚拟环境中工作,我已经通过安装带有pip的google-cloud-dataflow安装了apache_beam,因为我需要能够在google云平台上运行的版本.
pip install google-cloud-dataflow
我不知道如何解决这个错误.为了完整起见,我在Windows上的ubuntu上使用bash上的python 2.7.12.我的同事在Windows上的ubuntu上运行bash时遇到了同样的错误,而直接在Windows上运行工作正常.
安装的软件包版本是: …
二者DoFn并PTransform是定义操作的装置PCollection.我们怎么知道何时使用?
我正在研究通过Google Dataflow/Apache Beam处理来自Web用户会话的日志,并且需要将用户的日志(流式传输)与上个月用户会话的历史记录相结合.
我看过以下方法:
element的processElement(ProcessContext processContext)我的理解是,通过加载的数据.withSideInputs(pCollectionView)需要适合内存.我知道我可以将所有单个用户的会话历史记录放入内存,但不是所有会话历史记录.
我的问题是,是否有一种方法可以从仅与当前用户会话相关的侧输入加载/流式传输数据?
我想象一个parDo函数,它将通过指定用户的ID从侧面输入加载用户的历史会话.但只有当前用户的历史会话才适合内存; 通过侧输入加载所有历史会话将太大.
一些伪代码来说明:
public static class MetricFn extends DoFn<LogLine, String> {
final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;
public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
this.pHistoryView = historyView;
}
@Override
public void processElement(ProcessContext processContext) throws Exception {
Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);
final LogLine currentLogLine = processContext.element();
final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
processContext.output(outputMetric);
}
}
Run Code Online (Sandbox Code Playgroud) 我正在尝试一个简单的示例,将 Kafka 主题的数据读取到 Apache Beam 中。这是相关的片段:
with beam.Pipeline(options=pipeline_options) as pipeline:
_ = (
pipeline
| 'Read from Kafka' >> ReadFromKafka(
consumer_config={'bootstrap.servers': 'localhost:29092'},
topics=['test'])
| 'Print' >> beam.Map(print))
Run Code Online (Sandbox Code Playgroud)
使用上面的 Beam 管道片段,我没有看到任何消息传入。Kafka 在 Docker 容器中本地运行,我可以kafkacat从主机(容器外部)使用它来发布和订阅消息。所以,我想这方面没有问题。
看来 Beam 能够连接到 Kafka 并收到新消息的通知,因为我在发布数据时看到 Beam 日志中的偏移量发生了变化kafkacat:
INFO:root:severity: INFO
timestamp {
seconds: 1612886861
nanos: 534000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"
INFO:root:severity: INFO
timestamp {
seconds: 1612886861
nanos: 537000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, …Run Code Online (Sandbox Code Playgroud) 我有一个大约90GB的大型导入文件,由我用Java编写的数据流处理.使用PipelineOptionsFactory的默认设置,我的工作需要很长时间才能完成.如何增加工人数量以提高绩效?
谢谢
我们试图在Apache Beam管道上使用固定窗口(使用DirectRunner).我们的流程如下:
CombineFn,将每个Events 窗口组合成一个List<Event>List<Event>管道代码:
pipeline
// Read from pubsub topic to create unbounded PCollection
.apply(PubsubIO
.<String>read()
.topic(options.getTopic())
.withCoder(StringUtf8Coder.of())
)
// Deserialize JSON into Event object
.apply("ParseEvent", ParDo
.of(new ParseEventFn())
)
// Window events with a fixed window size of 5 seconds
.apply("Window", Window
.<Event>into(FixedWindows
.of(Duration.standardSeconds(5))
)
)
// Group events by window
.apply("CombineEvents", Combine
.globally(new CombineEventsFn())
.withoutDefaults()
)
// Log grouped events
.apply("LogEvent", ParDo
.of(new LogEventFn()) …Run Code Online (Sandbox Code Playgroud) 我正在使用Flink 1.4.1和Beam 2.3.0,并且想知道是否可以在Flink WebUI(或任何地方)中提供指标,如Dataflow WebUI中那样?
我用过像这样的柜台:
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
...
Counter elementsRead = Metrics.counter(getClass(), "elements_read");
...
elementsRead.inc();
Run Code Online (Sandbox Code Playgroud)
但我无法"elements_read"在Flink WebUI中找到任何可用的计数(任务指标或累加器).我认为在BEAM-773之后这将是直截了当的.
Apache Beam 似乎拒绝识别 Kotlin 的Iterable. 这是一个示例代码:
@ProcessElement
fun processElement(
@Element input: KV<String, Iterable<String>>, receiver: OutputReceiver<String>
) {
val output = input.key + "|" + input.value.toString()
println("output: $output")
receiver.output(output)
}
Run Code Online (Sandbox Code Playgroud)
我收到以下奇怪的错误:
java.lang.IllegalArgumentException:
...PrintString, @ProcessElement processElement(KV, OutputReceiver), @ProcessElement processElement(KV, OutputReceiver):
@Element argument must have type org.apache.beam.sdk.values.KV<java.lang.String, java.lang.Iterable<? extends java.lang.String>>
Run Code Online (Sandbox Code Playgroud)
果然,如果我替换Iterable为java.lang.Iterable,相同的代码就可以正常工作。我究竟做错了什么?
依赖版本:
1.3.212.11.0这是一个包含完整代码和堆栈跟踪的要点:
更新:
经过一番反复试验,我发现虽然List<String>抛出了类似的异常但MutableList<String>实际上有效:
class PrintString: DoFn<KV<String, MutableList<String>>, String>() {
@ProcessElement
fun processElement(
@Element input: KV<String, …Run Code Online (Sandbox Code Playgroud) 假设我有一个PCollection<Foo>并且我想将它写入多个BigQuery表,为每个表选择一个可能不同的表Foo.
如何使用Apache Beam BigQueryIOAPI 执行此操作?
尝试使用CSEK加载GCS文件时,我收到数据流错误
[ERROR] The target object is encrypted by a customer-supplied encryption key
Run Code Online (Sandbox Code Playgroud)
我打算尝试在数据流方面进行AES解密,但我发现如果没有传递加密密钥,我甚至无法获取该文件.
是否有另一种方法可以从数据流中加载CSEK加密的Google云端存储文件?例如,使用谷歌云存储API,获取流句柄然后将其传递给数据流?
// Fails
p.apply("Read from source", TextIO.read().from("gs://my_bucket/myfile")).apply(..);
Run Code Online (Sandbox Code Playgroud) google-cloud-storage google-cloud-platform google-cloud-dataflow apache-beam
apache-beam ×10
apache-flink ×2
java ×2
apache-kafka ×1
kotlin ×1
metrics ×1
python ×1
python-2.7 ×1