I am trying out a simple example of reading data from a Kafka topic into Apache Beam. Here is the relevant snippet:
with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | 'Read from Kafka' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:29092'},
            topics=['test'])
        | 'Print' >> beam.Map(print))
With the Beam pipeline snippet above, I don't see any messages coming in. Kafka is running locally in a Docker container, and I can use kafkacat from the host (outside the container) to publish and subscribe to messages, so I assume there is no problem on that side.
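For reference, one detail worth checking (an assumption about the cause, prompted by the "Seeking to LATEST offset" lines in the logs below) is that the consumer may only pick up messages published after the pipeline starts. A minimal sketch that also reads messages already on the topic, by adding 'auto.offset.reset' to the consumer config, might look like this:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline(options=pipeline_options) as pipeline:
    _ = (
        pipeline
        | 'Read from Kafka' >> ReadFromKafka(
            consumer_config={
                'bootstrap.servers': 'localhost:29092',
                # Start from the beginning of the topic instead of only new messages.
                'auto.offset.reset': 'earliest',
            },
            topics=['test'])
        | 'Print' >> beam.Map(print))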
It looks like Beam is able to connect to Kafka and gets notified of new messages, because I see the offsets change in the Beam logs when I publish data with kafkacat:
INFO:root:severity: INFO
timestamp {
seconds: 1612886861
nanos: 534000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, groupId=Reader-0_offset_consumer_1692125327_none] Seeking to LATEST offset of partition test-0"
log_location: "org.apache.kafka.clients.consumer.internals.SubscriptionState"
thread: "22"
INFO:root:severity: INFO
timestamp {
seconds: 1612886861
nanos: 537000000
}
message: "[Consumer clientId=consumer-Reader-0_offset_consumer_1692125327_none-3, …Run Code Online (Sandbox Code Playgroud) 我正在使用 Apache-Beam 运行一些数据转换,包括从 txt、csv 和不同数据源中提取数据。我注意到的一件事是使用beam.Map和beam.ParDo时的结果差异
I am running some data transformations with Apache Beam, including extracting data from txt, csv, and other data sources. One thing I have noticed is the difference in results when using beam.Map versus beam.ParDo.

In the following example, I am reading CSV data. In the first case I pass it with beam.ParDo to a DoFn, which extracts the first element, the date, and then prints it. In the second case I do the same thing directly with beam.Map and print the result.
class Printer(beam.DoFn):
    def process(self, data_item):
        print data_item

class DateExtractor(beam.DoFn):
    def process(self, data_item):
        return (str(data_item).split(','))[0]

data_from_source = (p
                    | 'ReadMyFile 01' >> ReadFromText('./input/data.csv')
                    | 'Splitter using beam.ParDo 01' >> beam.ParDo(DateExtractor())
                    | 'Printer the data 01' >> beam.ParDo(Printer())
                    )
copy_of_the_data = (p
                    | 'ReadMyFile 02' >> ReadFromText('./input/data.csv')
                    | 'Splitter using beam.Map 02' >> beam.Map(lambda record: (record.split(','))[0])
                    | 'Printer the data 02' >> beam.ParDo(Printer())
                    ) …
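For what it's worth, the difference most likely comes from what process() returns: a DoFn is expected to return an iterable of output elements, so returning a plain string makes the ParDo emit each character as a separate element, while beam.Map wraps its return value as exactly one element. A minimal sketch of the generator-style variant (same CSV input assumed) would be:

import apache_beam as beam

class DateExtractor(beam.DoFn):
    def process(self, data_item):
        # Yield one element per record; returning the string itself would be
        # iterated character by character by the runner.
        yield str(data_item).split(',')[0]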
I would like to take advantage of the new BigQuery feature of time-partitioned tables, but I am not sure whether this is possible in the 1.6 version of the Dataflow SDK.

Looking at the BigQuery JSON API, creating a partitioned table requires passing in a
"timePartitioning": { "type": "DAY" }
option, but the com.google.cloud.dataflow.sdk.io.BigQueryIO interface only allows specifying a TableReference.
I thought that maybe I could pre-create the table and sneak a partition decorator in via the BigQueryIO.Write.toTableReference lambda..? Has anyone else had success creating/writing partitioned tables via Dataflow?
This seems similar to setting the table expiration time, which is also not currently available.
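As a point of comparison only (this uses the much newer Beam Python SDK, not the Dataflow SDK 1.6 the question asks about, and the table and schema names are made up), the same timePartitioning block can nowadays be forwarded through additional_bq_parameters:

import apache_beam as beam

# 'rows' is assumed to be a PCollection of dicts matching the schema below.
rows | beam.io.WriteToBigQuery(
    'my-project:my_dataset.my_table',
    schema='ts:TIMESTAMP, value:INTEGER',
    additional_bq_parameters={'timePartitioning': {'type': 'DAY'}})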
I am trying to read from Pub/Sub using the following code:
Read<String> pubsub = PubsubIO.<String>read()
    .topic("projects/<projectId>/topics/<topic>")
    .subscription("projects/<projectId>/subscriptions/<subscription>")
    .withCoder(StringUtf8Coder.of())
    .withAttributes(new SimpleFunction<PubsubMessage, String>() {
        @Override
        public String apply(PubsubMessage input) {
            LOG.info("hola " + input.getAttributeMap());
            return new String(input.getMessage());
        }
    });

PCollection<String> pps = p.apply(pubsub)
    .apply(
        Window.<String>into(
            FixedWindows.of(Duration.standardSeconds(15))));

pps.apply("printdata", ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        LOG.info("hola amigo " + c.element());
        c.output(c.element());
    }
}));
Compared to what I receive in NodeJS, I only get the message that would be contained in the data field. How can I get the ackId field (which I could later use to acknowledge the message)? The attribute map I am printing is null. Is there another way to acknowledge all messages without having to work out the ackId?
TextIO.read() and AvroIO.read() (as well as some other Beam IOs) by default don't perform very well in current Apache Beam runners when reading a filepattern that expands into a very large number of files - for example, 1M files.
How can I read such a large number of files efficiently?
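One workaround, sketched here in Python under the assumption that the same issue applies to the Python SDK's ReadFromText, is to treat the filepattern as data and let ReadAllFromText expand and read the matched files in a distributed way rather than through a single monolithic source:

import apache_beam as beam

# Hypothetical filepattern that expands to a very large number of files.
PATTERN = 'gs://my-bucket/logs/part-*.txt'

with beam.Pipeline() as p:
    lines = (p
             | 'Patterns' >> beam.Create([PATTERN])
             | 'ReadAll' >> beam.io.ReadAllFromText())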
I want to make POST requests from a DoFn in an Apache Beam pipeline running on Dataflow.

For that I have created a client that instantiates an HttpClosableClient configured on a PoolingHttpClientConnectionManager.

However, I am instantiating a client for every element that I process.

How can I set up a persistent client that is shared by all of my elements?

And is there another class I should be using for parallel, high-throughput HTTP requests?
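The classes mentioned in the question are from the Java SDK, but the reuse pattern itself is SDK-agnostic: create the client once per DoFn instance instead of once per element. A minimal Python sketch (the endpoint and the use of requests.Session are assumptions for illustration):

import apache_beam as beam
import requests  # assumed to be installed on the workers

class PostFn(beam.DoFn):
    def setup(self):
        # Runs once per DoFn instance, so the connection pool inside the
        # session is reused across all elements processed by that instance.
        self.session = requests.Session()

    def process(self, element):
        response = self.session.post('https://example.com/api', json=element)  # hypothetical endpoint
        yield response.status_code

    def teardown(self):
        self.session.close()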
Writing a PCollection of serialized protobuf messages to text files and reading them back should be easy, but after several attempts I have not succeeded. Any comments would be appreciated.
// definition of proto.
syntax = "proto3";
package test;
message PhoneNumber {
  string number = 1;
  string country = 2;
}
I have the Python code below, which implements a simple Beam pipeline that turns the text into serialized protobufs.
# Test python code
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import phone_pb2
class ToProtoFn(beam.DoFn):
    def process(self, element):
        phone = phone_pb2.PhoneNumber()
        phone.number, phone.country = element.strip().split(',')
        yield phone.SerializeToString()

with beam.Pipeline(options=PipelineOptions()) as p:
    lines = (p
             | beam.Create(["123-456-789,us", "345-567-789,ca"])
             | beam.ParDo(ToProtoFn())
             | beam.io.WriteToText('/Users/greeness/data/phone-pb'))
The pipeline runs successfully and produces a file with this content:
$ cat ~/data/phone-pb-00000-of-00001
123-456-789us
345-567-789ca …
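Raw protobuf bytes don't sit well in a line-oriented text sink (a newline byte in a payload would split a record, and the non-printable field tags are simply invisible in the output above). One workaround, sketched here as an assumption rather than the author's eventual solution, is to base64-encode each serialized message so the bytes survive the text round trip:

import base64
import apache_beam as beam
import phone_pb2  # generated from the proto definition above

class ToProtoFn(beam.DoFn):
    def process(self, element):
        phone = phone_pb2.PhoneNumber()
        phone.number, phone.country = element.strip().split(',')
        # base64 keeps the binary payload intact inside a text file.
        yield base64.b64encode(phone.SerializeToString()).decode('ascii')

def parse_line(line):
    phone = phone_pb2.PhoneNumber()
    phone.ParseFromString(base64.b64decode(line))
    return phone

# Reading back (same path prefix as in the question):
# p | beam.io.ReadFromText('/Users/greeness/data/phone-pb*') | beam.Map(parse_line)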
I am trying to create a Google Dataflow template, but I can't seem to find a way to do it without producing the following exception:

WARNING: Size estimation of the source failed: RuntimeValueProvider{propertyName=inputFile, default=null}
java.lang.IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: RuntimeValueProvider{propertyName=inputFile, default=null}
at org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:234)
at org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:218)
at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:78)
at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:53)
at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:40)
at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:37)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:453)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:392)
at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:170)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:680)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:174)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
at org.apache.beam.examples.MyMinimalWordCount.main(MyMinimalWordCount.java:69)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
at java.lang.Thread.run(Thread.java:748)
I can use Beam's …
I'm setting up a slow-changing lookup Map in my Apache Beam pipeline. It continuously updates the lookup map. For each key in the lookup map, I retrieve the latest value in the global window with accumulating mode. But it always hits this exception:
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Duplicate values for mykey
Is anything wrong with this code snippet?
If I use .discardingFiredPanes() instead, I will lose information in the last emit.
pipeline
    .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1L)))
    .apply(
        Window.<Long>into(new GlobalWindows())
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()))
            .accumulatingFiredPanes())
    .apply(new ReadSlowChangingTable())
    .apply(Latest.perKey())
    .apply(View.asMap()); …
See the code snippet below. I want ["metric1", "metric2"] to be the input of RunTask.process. However, it is run twice, with "metric1" and "metric2" separately.

def run():
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    p = beam.Pipeline(options=pipeline_options)

    root = p | 'Get source' >> beam.Create([
        "source_name"  # maybe ["source_name"] makes more sense since my process function takes an array as an input?
    ])

    metric1 = root | "compute1" >> beam.ParDo(RunLongCompute(myarg="1"))  # let's say it returns ["metic1"]
    metric2 = root | "compute2" >> beam.ParDo(RunLongCompute(myarg="2"))  # let's say it returns ["metic2"]
    metric3 = (metric1, metric2) | beam.Flatten() | beam.ParDo(RunTask())  # I want ["metric1", "metric2"] …
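Flatten only merges the two PCollections element by element, so RunTask still receives one metric at a time. If a single list input really is the goal, one option (a sketch of the assumed intent, using Beam's built-in ToList combiner) is to collect the flattened collection first:

import apache_beam as beam

metric3 = ((metric1, metric2)
           | beam.Flatten()
           | beam.combiners.ToList()   # single element, e.g. ["metric1", "metric2"]
           | beam.ParDo(RunTask()))    # RunTask.process now sees the whole list once

Note that the order of the elements produced by ToList is not guaranteed.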