标签: apache-beam

导入apache_beam元类冲突

当我尝试导入apache beam时,我收到以下错误.

>>> import apache_beam
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/toor/pfff/local/lib/python2.7/site-packages/apache_beam/__init__.py", line 78, in <module>
    from apache_beam import io
  File "/home/toor/pfff/local/lib/python2.7/site-packages/apache_beam/io/__init__.py", line 21, in <module>
    ...
    from apitools.base.protorpclite import messages
  File "/home/toor/pfff/local/lib/python2.7/site-packages/apitools/base/protorpclite/messages.py", line 1165, in <module>
    class Field(six.with_metaclass(_FieldMeta, object)):
TypeError: Error when calling the metaclass bases
    metaclass conflict: the metaclass of a derived class must be a (non-strict) subclass of the metaclasses of all its bases
Run Code Online (Sandbox Code Playgroud)

我正在一个新的虚拟环境中工作,我已经通过安装带有pip的google-cloud-dataflow安装了apache_beam,因为我需要能够在google云平台上运行的版本.

pip install google-cloud-dataflow

我不知道如何解决这个错误.为了完整起见,我在Windows上的ubuntu上使用bash上的python 2.7.12.我的同事在Windows上的ubuntu上运行bash时遇到了同样的错误,而直接在Windows上运行工作正常.

安装的软件包版本是: …

python-2.7 google-cloud-dataflow apache-beam

12
推荐指数
1
解决办法
1420
查看次数

Apache Beam:DoFn与PTransform

二者DoFnPTransform是定义操作的装置PCollection.我们怎么知道何时使用?

google-cloud-dataflow apache-beam

12
推荐指数
1
解决办法
1514
查看次数

如何将流数据与Dataflow/Beam中的大型历史数据集相结合

我正在研究通过Google Dataflow/Apache Beam处理来自Web用户会话的日志,并且需要将用户的日志(流式传输)与上个月用户会话的历史记录相结合.

我看过以下方法:

  1. 使用30天的固定窗口:最有可能放入大窗口以适应内存,我不需要更新用户的历史记录,只需参考它
  2. 使用CoGroupByKey连接两个数据集,但这两个数据集必须具有相同的窗口大小(https://cloud.google.com/dataflow/model/group-by-key#join),这在我的案例(24小时对30天)
  3. 使用侧输入检索用户的会话历史对于一个给定elementprocessElement(ProcessContext processContext)

我的理解是,通过加载的数据.withSideInputs(pCollectionView)需要适合内存.我知道我可以将所有单个用户的会话历史记录放入内存,但不是所有会话历史记录.

我的问题是,是否有一种方法可以从仅与当前用户会话相关的侧输入加载/流式传输数据?

我想象一个parDo函数,它将通过指定用户的ID从侧面输入加载用户的历史会话.但只有当前用户的历史会话才适合内存; 通过侧输入加载所有历史会话将太大.

一些伪代码来说明:

public static class MetricFn extends DoFn<LogLine, String> {

    final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;

    public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
        this.pHistoryView = historyView;
    }

    @Override
    public void processElement(ProcessContext processContext) throws Exception {
        Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);

        final LogLine currentLogLine = processContext.element();
        final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
        final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
        processContext.output(outputMetric);
    }
}
Run Code Online (Sandbox Code Playgroud)

google-cloud-dataflow apache-flink apache-beam

11
推荐指数
1
解决办法
2137
查看次数

Apache Beam Python SDK ReadFromKafka 未收到数据

我正在尝试一个简单的示例,将 Kafka 主题的数据读取到 Apache Beam 中。这是相关的片段:

  with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | 'Read from Kafka' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:29092'},
            topics=['test'])
        | 'Print' >> beam.Map(print))
Run Code Online (Sandbox Code Playgroud)

使用上面的 Beam 管道片段,我没有看到任何消息传入。Kafka 在 Docker 容器中本地运行,我可以kafkacat从主机(容器外部)使用它来发布和订阅消息。所以,我想这方面没有问题。

看来 Beam 能够连接到 Kafka 并收到新消息的通知,因为我在发布数据时看到 Beam 日志中的偏移量发生了变化kafkacat

INFO:root:severity: INFO
timestamp {
  seconds: 1612886861
  nanos: 534000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"

INFO:root:severity: INFO
timestamp {
  seconds: 1612886861
  nanos: 537000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, …
Run Code Online (Sandbox Code Playgroud)

python apache-kafka apache-beam apache-beam-io

11
推荐指数
0
解决办法
2992
查看次数

如何为数据流指定工作者数量?

我有一个大约90GB的大型导入文件,由我用Java编写的数据流处理.使用PipelineOptionsFactory的默认设置,我的工作需要很长时间才能完成.如何增加工人数量以提高绩效?

谢谢

google-cloud-dataflow apache-beam

10
推荐指数
0
解决办法
1065
查看次数

使用Apache Beam进行窗口化 - 固定Windows似乎不会关闭?

我们试图在Apache Beam管道上使用固定窗口(使用DirectRunner).我们的流程如下:

  1. 从pub/sub中提取数据
  2. 将JSON反序列化为Java对象
  3. 窗口事件w /固定窗口5秒
  4. 使用自定义CombineFn,将每个Events 窗口组合成一个List<Event>
  5. 为了测试,只需输出结果 List<Event>

管道代码:

    pipeline
                // Read from pubsub topic to create unbounded PCollection
                .apply(PubsubIO
                    .<String>read()
                    .topic(options.getTopic())
                    .withCoder(StringUtf8Coder.of())
                )

                // Deserialize JSON into Event object
                .apply("ParseEvent", ParDo
                    .of(new ParseEventFn())
                )

                // Window events with a fixed window size of 5 seconds
                .apply("Window", Window
                    .<Event>into(FixedWindows
                        .of(Duration.standardSeconds(5))
                    )
                )

                // Group events by window
                .apply("CombineEvents", Combine
                    .globally(new CombineEventsFn())
                    .withoutDefaults()
                )

                // Log grouped events
                .apply("LogEvent", ParDo
                    .of(new LogEventFn()) …
Run Code Online (Sandbox Code Playgroud)

java google-cloud-dataflow apache-beam

10
推荐指数
1
解决办法
4015
查看次数

Apache光束计数器/度量标准在Flink WebUI中不可用

我正在使用Flink 1.4.1和Beam 2.3.0,并且想知道是否可以在Flink WebUI(或任何地方)中提供指标,如Dataflow WebUI中那样?

我用过像这样的柜台:

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
...
Counter elementsRead = Metrics.counter(getClass(), "elements_read");
...
elementsRead.inc();
Run Code Online (Sandbox Code Playgroud)

但我无法"elements_read"在Flink WebUI中找到任何可用的计数(任务指标或累加器).我认为在BEAM-773之后这将是直截了当的.

java metrics apache-flink apache-beam

10
推荐指数
1
解决办法
757
查看次数

Apache Beam 不支持 Kotlin Iterable?

Apache Beam 似乎拒绝识别 Kotlin 的Iterable. 这是一个示例代码:

@ProcessElement
fun processElement(
    @Element input: KV<String, Iterable<String>>, receiver: OutputReceiver<String>
) {
    val output = input.key + "|" + input.value.toString()
    println("output: $output")
    receiver.output(output)
}
Run Code Online (Sandbox Code Playgroud)

我收到以下奇怪的错误:

java.lang.IllegalArgumentException:
   ...PrintString, @ProcessElement processElement(KV, OutputReceiver), @ProcessElement processElement(KV, OutputReceiver):
   @Element argument must have type org.apache.beam.sdk.values.KV<java.lang.String, java.lang.Iterable<? extends java.lang.String>>
Run Code Online (Sandbox Code Playgroud)

果然,如果我替换Iterablejava.lang.Iterable,相同的代码就可以正常工作。我究竟做错了什么?

依赖版本:

  • 科特林-jvm: 1.3.21
  • org.apache.beam: 2.11.0

这是一个包含完整代码和堆栈跟踪的要点:

更新

经过一番反复试验,我发现虽然List<String>抛出了类似的异常但MutableList<String>实际上有效:

class PrintString: DoFn<KV<String, MutableList<String>>, String>() {
    @ProcessElement
    fun processElement(
        @Element input: KV<String, …
Run Code Online (Sandbox Code Playgroud)

kotlin google-cloud-dataflow apache-beam

10
推荐指数
1
解决办法
773
查看次数

在Apache Beam中为不同的BigQuery表写入不同的值

假设我有一个PCollection<Foo>并且我想将它写入多个BigQuery表,为每个表选择一个可能不同的表Foo.

如何使用Apache Beam BigQueryIOAPI 执行此操作?

google-bigquery google-cloud-dataflow apache-beam

9
推荐指数
1
解决办法
2439
查看次数

数据流,使用客户提供的加密密钥加载文件

尝试使用CSEK加载GCS文件时,我收到数据流错误

[ERROR] The target object is encrypted by a customer-supplied encryption key
Run Code Online (Sandbox Code Playgroud)

我打算尝试在数据流方面进行AES解密,但我发现如果没有传递加密密钥,我甚至无法获取该文件.

是否有另一种方法可以从数据流中加载CSEK加密的Google云端存储文件?例如,使用谷歌云存储API,获取流句柄然后将其传递给数据流?

    // Fails
    p.apply("Read from source", TextIO.read().from("gs://my_bucket/myfile")).apply(..); 
Run Code Online (Sandbox Code Playgroud)

google-cloud-storage google-cloud-platform google-cloud-dataflow apache-beam

9
推荐指数
1
解决办法
576
查看次数