Tag: apache-beam-io

Apache Beam Python SDK ReadFromKafka not receiving data

I'm trying a simple example that reads data from a Kafka topic into Apache Beam. Here is the relevant snippet:

  with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | 'Read from Kafka' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:29092'},
            topics=['test'])
        | 'Print' >> beam.Map(print))

With the Beam pipeline snippet above, I don't see any messages coming in. Kafka is running locally in a Docker container, and I can use kafkacat from the host (outside the container) to publish and subscribe to messages, so I assume there is no problem on that side.

It looks like Beam is able to connect to Kafka and gets notified of new messages, because I see the offsets change in the Beam logs when I publish data with kafkacat:

INFO:root:severity: INFO
timestamp {
  seconds: 1612886861
  nanos: 534000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"

INFO:root:severity: INFO
timestamp {
  seconds: 1612886861
  nanos: 537000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, …
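Two things worth checking here are whether the pipeline is running in streaming mode and where the consumer starts reading: the log above shows the consumer seeking to the LATEST offset, so messages published before the job started may simply be skipped. A hedged sketch of the kind of configuration often tried in this situation, reusing the same broker and topic; the auto.offset.reset entry and max_num_records are illustrative additions, not something the question confirms:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

pipeline_options = PipelineOptions()
# ReadFromKafka is an unbounded source, so streaming mode is usually needed.
pipeline_options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=pipeline_options) as pipeline:
  _ = (
      pipeline
      | 'Read from Kafka' >> ReadFromKafka(
          consumer_config={
              'bootstrap.servers': 'localhost:29092',
              # Read from the beginning of the topic instead of LATEST so
              # that messages published before the job started are also seen.
              'auto.offset.reset': 'earliest',
          },
          topics=['test'],
          max_num_records=10)  # bound the read for a quick local test
      | 'Print' >> beam.Map(print))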

python apache-kafka apache-beam apache-beam-io

11 votes · 0 answers · 2992 views

Difference between beam.ParDo and beam.Map in output type?

I'm running some data transformations with Apache Beam, including extracting data from txt, csv, and other data sources. One thing I noticed is the difference in results when using beam.Map versus beam.ParDo.

In the following example:

I'm reading csv data. In the first case I pass it with beam.ParDo to a DoFn, which extracts the first element (the date) and then prints it. In the second case I do the same thing directly with beam.Map, then print it.

class Printer(beam.DoFn):
    def process(self,data_item):
        print data_item

class DateExtractor(beam.DoFn):
    def process(self,data_item):
        return (str(data_item).split(','))[0]

data_from_source = (p
                    | 'ReadMyFile 01' >> ReadFromText('./input/data.csv')
                    | 'Splitter using beam.ParDo 01' >> beam.ParDo(DateExtractor())
                    | 'Printer the data 01' >> beam.ParDo(Printer())
                    )

copy_of_the_data =  (p
                    | 'ReadMyFile 02' >> ReadFromText('./input/data.csv')
                    | 'Splitter using beam.Map 02' >> beam.Map(lambda record: (record.split(','))[0])
                    | 'Printer the data 02' >> beam.ParDo(Printer())
                    ) …
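The difference comes down to how the return value is treated: beam.Map emits exactly one output element per input (whatever the callable returns), while beam.ParDo treats whatever process() returns as an iterable of output elements. Because DateExtractor.process returns a string, ParDo iterates over that string and emits it one character at a time. A minimal Python 3 sketch of the distinction, with made-up input rows:

import apache_beam as beam

class DateExtractor(beam.DoFn):
    def process(self, data_item):
        # Yield (or return a list) so ParDo emits the whole date string as
        # one element instead of one character at a time.
        yield str(data_item).split(',')[0]

with beam.Pipeline() as p:
    rows = p | beam.Create(['2019-01-01,a,b', '2019-01-02,c,d'])  # made-up rows

    # ParDo: each item yielded by process() becomes an output element.
    _ = rows | 'Via ParDo' >> beam.ParDo(DateExtractor()) | 'Print 1' >> beam.Map(print)

    # Map: the return value itself is the single output element.
    _ = rows | 'Via Map' >> beam.Map(lambda record: record.split(',')[0]) | 'Print 2' >> beam.Map(print)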

python-2.7 apache-beam apache-beam-io

9 votes · 1 answer · 5592 views

Creating/writing to a partitioned BigQuery table via Google Cloud Dataflow

I wanted to take advantage of the new BigQuery feature of time-partitioned tables, but am unsure whether this is possible in version 1.6 of the Dataflow SDK.

Looking at the BigQuery JSON API, to create a partitioned table one needs to pass in a

"timePartitioning": { "type": "DAY" }

option, but the com.google.cloud.dataflow.sdk.io.BigQueryIO interface only allows specifying a TableReference.

I thought maybe I could pre-create the table and sneak a partition decorator in via the BigQueryIO.Write.toTableReference lambda..? Has anyone else had success creating/writing to partitioned tables via Dataflow?

This seems similar to setting the table expiration time, which is also not currently available.
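This does not settle what the 1.6 Dataflow SDK can do, but to illustrate where that timePartitioning block ends up in later SDKs, here is a hedged sketch using the Beam Python SDK's WriteToBigQuery and its additional_bq_parameters argument; the table name and schema are made up:

import apache_beam as beam

with beam.Pipeline() as p:
    _ = (p
         | beam.Create([{'value': 1}, {'value': 2}])
         | beam.io.WriteToBigQuery(
             'my-project:my_dataset.my_partitioned_table',
             schema='value:INTEGER',
             # The same JSON block as in the BigQuery API, passed through to the load job.
             additional_bq_parameters={'timePartitioning': {'type': 'DAY'}},
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))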

google-bigquery google-cloud-dataflow apache-beam-io

8 votes · 2 answers · 4379 views

Acknowledging Google Pub/Sub messages in Apache Beam

I'm trying to read from Pub/Sub using the following code:

Read<String> pubsub = PubsubIO.<String>read()
        .topic("projects/<projectId>/topics/<topic>")
        .subscription("projects/<projectId>/subscriptions/<subscription>")
        .withCoder(StringUtf8Coder.of())
        .withAttributes(new SimpleFunction<PubsubMessage, String>() {
            @Override
            public String apply(PubsubMessage input) {
                LOG.info("hola " + input.getAttributeMap());
                return new String(input.getMessage());
            }
        });
PCollection<String> pps = p.apply(pubsub)
        .apply(
                Window.<String>into(
                    FixedWindows.of(Duration.standardSeconds(15))));
pps.apply("printdata",ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        LOG.info("hola amigo "+c.element());
        c.output(c.element());
    }
  }));

Compared with what I receive on NodeJS, I'm getting the message that would be contained in the data field. How can I get the ackId field (which I could later use to acknowledge the message)? The attribute map I'm printing is null. Is there some other way to acknowledge all messages without having to figure out the ackId?

java google-cloud-pubsub apache-beam apache-beam-io

5 votes · 1 answer · 1494 views

How do I improve performance of TextIO or AvroIO when reading a very large number of files?

TextIO.read() and AvroIO.read() (as well as some other Beam IO's) by default don't perform very well in current Apache Beam runners when reading a filepattern that expands into a very large number of files - for example, 1M files.

How can I read such a large number of files efficiently?
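Much of the cost usually comes from expanding the filepattern and setting up all the reads in one place. One approach sometimes used in the Python SDK (shown here only for illustration; the Java SDK has analogous readAll()/hint-style options) is to treat the filepatterns themselves as data, so that expansion and reading are distributed across workers. A rough sketch with a made-up pattern:

import apache_beam as beam
from apache_beam.io.textio import ReadAllFromText

with beam.Pipeline() as p:
    _ = (p
         | beam.Create(['gs://my-bucket/logs/*.txt'])  # made-up filepattern
         | ReadAllFromText()  # expands and reads the matched files in parallel
         | beam.Map(print))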

google-cloud-dataflow apache-beam apache-beam-io

5 votes · 1 answer · 1254 views

HTTP client in a DoFn

I want to make POST requests from a DoFn in an Apache Beam pipeline running on Dataflow.

For that, I created a client that instantiates an HttpClosableClient configured on a PoolingHttpClientConnectionManager.

However, I'm instantiating a client for every element that I process.

How can I set up a persistent client that is used by all of my elements?

And is there another class I should be using for parallel, high-speed HTTP requests?
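One way to avoid building a client per element is to tie it to the DoFn lifecycle: create it once per DoFn instance in a setup hook and reuse it for every element that instance processes (the Java SDK exposes this through @Setup/@Teardown methods on the DoFn). The question's stack is Java's Apache HttpClient, so the following is only an illustrative sketch of the same lifecycle idea in the Python SDK, using DoFn.setup()/teardown() and a requests.Session; the endpoint is made up and requests is assumed to be available on the workers:

import apache_beam as beam
import requests  # assumed available in the worker environment

class PostFn(beam.DoFn):
    def setup(self):
        # Called once per DoFn instance (not per element), so the session
        # and its connection pool are reused across elements and bundles.
        self.session = requests.Session()

    def process(self, element):
        resp = self.session.post('https://example.com/api', json=element)  # made-up endpoint
        yield resp.status_code

    def teardown(self):
        self.session.close()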

http google-cloud-dataflow apache-beam apache-beam-io

5 votes · 1 answer · 2454 views

Reading and writing serialized protobufs in Beam

I assumed that writing a PCollection of serialized protobuf messages to a text file and reading them back should be easy, but after several attempts I have not succeeded. Any comments would be appreciated.

// definition of proto.

syntax = "proto3";
package test;
message PhoneNumber {
  string number = 1;
  string country = 2;
}

I have the Python code below, which implements a simple Beam pipeline that writes text as serialized protobufs.

# Test python code
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import phone_pb2

class ToProtoFn(beam.DoFn):
  def process(self, element):
    phone = phone_pb2.PhoneNumber()
    phone.number, phone.country = element.strip().split(',')
    yield phone.SerializeToString()

with beam.Pipeline(options=PipelineOptions()) as p:
  lines = (p 
      | beam.Create(["123-456-789,us", "345-567-789,ca"])
      | beam.ParDo(ToProtoFn())
      | beam.io.WriteToText('/Users/greeness/data/phone-pb'))

The pipeline runs successfully and produces a file with the following content:

$ cat ~/data/phone-pb-00000-of-00001 


123-456-789us


345-567-789ca …
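For context, a serialized protobuf is arbitrary binary data, while WriteToText/ReadFromText are newline-delimited text transforms, so the records come back mangled (as the cat output above suggests). A hedged sketch of one common workaround, writing the serialized messages to a binary-safe container such as TFRecord files instead; the paths are illustrative and the same phone_pb2 module is assumed:

import apache_beam as beam
from apache_beam.io.tfrecordio import ReadFromTFRecord, WriteToTFRecord
import phone_pb2

class ToProtoFn(beam.DoFn):
  def process(self, element):
    phone = phone_pb2.PhoneNumber()
    phone.number, phone.country = element.strip().split(',')
    yield phone.SerializeToString()

# Write the serialized messages as binary TFRecord records.
with beam.Pipeline() as p:
  _ = (p
      | beam.Create(["123-456-789,us", "345-567-789,ca"])
      | beam.ParDo(ToProtoFn())
      | WriteToTFRecord('/tmp/phone-pb'))  # illustrative path

# Read them back and parse each record into a PhoneNumber message.
with beam.Pipeline() as p:
  _ = (p
      | ReadFromTFRecord('/tmp/phone-pb-*')
      | beam.Map(phone_pb2.PhoneNumber.FromString)
      | beam.Map(print))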

python protocol-buffers apache-beam apache-beam-io

5 votes · 1 answer · 3573 views

Error creating a Dataflow template with TextIO and ValueProvider

I'm trying to create a Google Dataflow template, but I can't seem to find a way to do it without producing the following exception:

WARNING: Size estimation of the source failed: RuntimeValueProvider{propertyName=inputFile, default=null}
java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=inputFile, default=null}
        at org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:234)
        at org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:218)
        at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:78)
        at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:53)
        at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:40)
        at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:37)
        at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:453)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
        at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
        at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
        at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:392)
        at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:170)
        at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:680)
        at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:174)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
        at org.apache.beam.examples.MyMinimalWordCount.main(MyMinimalWordCount.java:69)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
        at java.lang.Thread.run(Thread.java:748)

I can use Beam's …

java google-cloud-dataflow apache-beam apache-beam-io

5 votes · 1 answer · 5018 views

How to solve Duplicate values exception when I create PCollectionView<Map<String,String>>

I'm setting up a slow-changing lookup Map in my Apache Beam pipeline. It is continuously updated; for each key in the lookup map, I retrieve the latest value in the global window with accumulating mode. But it always fails with this exception:

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Duplicate values for mykey

Is anything wrong with this code snippet?

If I use .discardingFiredPanes() instead, I will lose information in the last emit.

pipeline
  .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1L)))
  .apply(
      Window.<Long>into(new GlobalWindows())
         .triggering(Repeatedly.forever(
             AfterProcessingTime.pastFirstElementInPane()))
         .accumulatingFiredPanes())
  .apply(new ReadSlowChangingTable())
  .apply(Latest.perKey())
  .apply(View.asMap()); …

dataflow google-cloud-dataflow apache-beam apache-beam-io

5 votes · 1 answer · 1332 views

How to merge two results and pass them to the next step in an apache-beam pipeline

See the code snippet below. I want ["metric1", "metric2"] to be the input of RunTask.process. However, it runs twice, once with "metric1" and once with "metric2".

def run():
  
  pipeline_options = PipelineOptions(pipeline_args)
  pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
  p = beam.Pipeline(options=pipeline_options)

  root = p | 'Get source' >> beam.Create([
      "source_name" # maybe ["source_name"] makes more sense since my process function takes an array as an input?
  ])

  metric1 = root | "compute1" >> beam.ParDo(RunLongCompute(myarg="1")) # let's say it returns ["metric1"]
  metric2 = root | "compute2" >> beam.ParDo(RunLongCompute(myarg="2")) # let's say it returns ["metric2"]

  metric3 = (metric1, metric2) | beam.Flatten() | beam.ParDo(RunTask()) # I want ["metric1", "metric2"] …
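Flatten only merges the two PCollections; their elements are still handed to RunTask.process one at a time, which is why it runs once per metric. To pass both metrics to a single call, they first need to be collapsed into one element, for example with beam.combiners.ToList() (or by passing them as side inputs). A hedged sketch reusing the names from the snippet; note that ToList gives no ordering guarantee for the resulting list:

  merged = (metric1, metric2) | beam.Flatten()
  as_single_list = merged | beam.combiners.ToList()  # one element, e.g. ["metric1", "metric2"]
  metric3 = as_single_list | beam.ParDo(RunTask())   # RunTask.process receives the whole list once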

python google-cloud-dataflow apache-beam apache-beam-io

5 votes · 1 answer · 4157 views