标签: apache-beam

Apache Beam和Apache Nifi之间的区别

Apache Beam和Apache Nifi有哪些用例?它们似乎都是数据流引擎.如果两者都有相似的用例,哪两个更好?

apache-nifi apache-beam

9
推荐指数
1
解决办法
5331
查看次数

Apache Beam BigQueryIO 写入缓慢

我的 Beam 管道正在写入未分区的 BigQuery 目标表。PCollection 由数百万个 TableRow 组成。如果我使用 DirectRunner 运行 BigQueryIO,它显然会首先为 BigQueryWriteTemp 临时文件夹中的每条记录创建一个临时文件。这显然表现不佳。我在这里做错了吗?这是一个正常的批处理作业,而不是流式处理。(使用 DataflowRunner 运行的相同作业似乎没有这样做)

myrows.apply("WriteToBigQuery",
                BigQueryIO.writeTableRows().to(BQ_TARGET_TABLE)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
Run Code Online (Sandbox Code Playgroud)

这是我们看到的日志。这些文件中的每一个都包含一个 TableRow。DataflowRunner 上的相同似乎只创建了大约 3 个文件。

2017-08-14 11:43:49 INFO  TableRowWriter:63 - Opening TableRowWriter to gs://my-bucket/tmp/BigQueryWriteTemp/4836c162e29d43f58c4f5cc55b1b3bb3/59668b03-a1e8-4288-a049-3472e7cb6333.
2017-08-14 11:43:49 INFO  TableRowWriter:63 - Opening TableRowWriter to gs://my-bucket/tmp/BigQueryWriteTemp/4836c162e29d43f58c4f5cc55b1b3bb3/feeb454b-799e-4d77-bd12-dec313cdadc2.
2017-08-14 11:43:49 INFO  TableRowWriter:63 - Opening TableRowWriter to gs://my-bucket/tmp/BigQueryWriteTemp/4836c162e29d43f58c4f5cc55b1b3bb3/3c63db33-787f-4215-a425-3446d92157ed.
2017-08-14 11:43:49 INFO  TableRowWriter:63 - Opening TableRowWriter to gs://my-bucket/tmp/BigQueryWriteTemp/4836c162e29d43f58c4f5cc55b1b3bb3/87d55556-e012-4bef-8856-69efd4c5ab26.
2017-08-14 11:43:49 INFO  TableRowWriter:63 - Opening TableRowWriter to gs://my-bucket/tmp/BigQueryWriteTemp/4836c162e29d43f58c4f5cc55b1b3bb3/5e6bfe94-b1c9-49cb-b0c7-a768d78d85f3.
2017-08-14 11:43:49 INFO  TableRowWriter:63 - Opening TableRowWriter to gs://my-bucket/tmp/BigQueryWriteTemp/4836c162e29d43f58c4f5cc55b1b3bb3/b236948b-bdf0-4bfe-9d26-4e67c8904320.
2017-08-14 11:43:49 …
Run Code Online (Sandbox Code Playgroud)

google-bigquery apache-beam

9
推荐指数
1
解决办法
484
查看次数

如何使用 Beam 读取大型 CSV?

我想弄清楚如何使用 Apache Beam 读取大型 CSV 文件。我所说的“大”是指几 GB(因此一次将整个 CSV 读入内存是不切实际的)。

到目前为止,我已经尝试了以下选项:

  • 使用 TextIO.read():这不好,因为引用的 CSV 字段可能包含换行符。此外,这会尝试一次将整个文件读入内存。
  • 编写一个 DoFn,将文件作为流读取并发出记录(例如使用 commons-csv)。但是,这仍然一次读取整个文件。
  • 尝试使用此处所述的 SplittableDoFn 。我的目标是让它逐渐将记录作为无界 PCollection 发出 - 基本上,将我的文件变成记录流。但是,(1) 很难正确计算 (2) 由于 ParDo 创建了多个线程,因此它需要一些复杂的同步,并且 (3) 我生成的 PCollection 仍然不是无限的。
  • 尝试创建我自己的 UnboundedSource。这似乎非常复杂且记录不足(除非我遗漏了什么?)。

Beam 是否提供了任何简单的方法来让我以我想要的方式解析文件,而不必在继续下一个转换之前将整个文件读入内存?

apache-beam

9
推荐指数
1
解决办法
2424
查看次数

数据流,使用客户提供的加密密钥加载文件

尝试使用CSEK加载GCS文件时,我收到数据流错误

[ERROR] The target object is encrypted by a customer-supplied encryption key
Run Code Online (Sandbox Code Playgroud)

我打算尝试在数据流方面进行AES解密,但我发现如果没有传递加密密钥,我甚至无法获取该文件.

是否有另一种方法可以从数据流中加载CSEK加密的Google云端存储文件?例如,使用谷歌云存储API,获取流句柄然后将其传递给数据流?

    // Fails
    p.apply("Read from source", TextIO.read().from("gs://my_bucket/myfile")).apply(..); 
Run Code Online (Sandbox Code Playgroud)

google-cloud-storage google-cloud-platform google-cloud-dataflow apache-beam

9
推荐指数
1
解决办法
576
查看次数

输出类型中beam.ParDo 和beam.Map 的区别?

我正在使用 Apache-Beam 运行一些数据转换,包括从 txt、csv 和不同数据源中提取数据。我注意到的一件事是使用beam.Mapbeam.ParDo时的结果差异

在下一个示例中:

我正在读取 csv 数据,在第一种情况下,使用beam.ParDo将其传递给DoFn,它提取第一个元素,即日期,然后打印它。第二种情况,我直接用beam.Map做同样的事情,然后打印出来。

class Printer(beam.DoFn):
    def process(self,data_item):
        print data_item

class DateExtractor(beam.DoFn):
    def process(self,data_item):
        return (str(data_item).split(','))[0]

data_from_source = (p
                    | 'ReadMyFile 01' >> ReadFromText('./input/data.csv')
                    | 'Splitter using beam.ParDo 01' >> beam.ParDo(DateExtractor())
                    | 'Printer the data 01' >> beam.ParDo(Printer())
                    )

copy_of_the_data =  (p
                    | 'ReadMyFile 02' >> ReadFromText('./input/data.csv')
                    | 'Splitter using beam.Map 02' >> beam.Map(lambda record: (record.split(','))[0])
                    | 'Printer the data 02' >> beam.ParDo(Printer())
                    ) …
Run Code Online (Sandbox Code Playgroud)

python-2.7 apache-beam apache-beam-io

9
推荐指数
1
解决办法
5592
查看次数

应用TensorFlow Transform来转换/缩放生产中的要素

概观

我按照以下指南编写了TF Records,我曾经在那里tf.Transform预处理我的功能.现在,我想部署我的模型,我需要在真实的实时数据上应用这个预处理功能.

我的方法

首先,假设我有2个功能:

features = ['amount', 'age']
Run Code Online (Sandbox Code Playgroud)

我有transform_fn来自Apache Beam,来自working_dir=gs://path-to-transform-fn/

然后我使用以下方法加载转换函数:

tf_transform_output = tft.TFTransformOutput(working_dir)

我认为在生产中服务的最简单方法是获取一系列处理过的数据,然后调用model.predict()(我使用的是Keras模型).

要做到这一点,我认为transform_raw_features()方法正是我所需要的.

但是,似乎在构建架构之后:

raw_features = {}
for k in features:
    raw_features.update({k: tf.constant(1)})

print(tf_transform_output.transform_raw_features(raw_features))
Run Code Online (Sandbox Code Playgroud)

我明白了:

AttributeError: 'Tensor' object has no attribute 'indices'
Run Code Online (Sandbox Code Playgroud)

现在,我假设发生了这种情况,因为我tf.VarLenFeature()在我定义架构时使用了preprocessing_fn.

def preprocessing_fn(inputs):
    outputs = inputs.copy()

    for _ in features:
        outputs[_] = tft.scale_to_z_score(outputs[_])
Run Code Online (Sandbox Code Playgroud)

我使用以下方法构建元数据:

RAW_DATA_FEATURE_SPEC = {}
for _ in features:
    RAW_DATA_FEATURE_SPEC[_] = tf.VarLenFeature(dtype=tf.float32)
    RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec(RAW_DATA_FEATURE_SPEC))
Run Code Online (Sandbox Code Playgroud)

所以简而言之,给一本字典:

d = …

python tensorflow tensorflow-serving apache-beam tensorflow-transform

9
推荐指数
1
解决办法
568
查看次数

Dockerized Apache Beam 返回“未提供 id”

我在使用 dockerized Apache Beam 时遇到了问题。当尝试运行容器时,我只收到"No id provided."消息,仅此而已。这是代码和文件:

Dockerfile

FROM apache/beam_python3.8_sdk:latest
RUN apt update
RUN apt install -y wget curl unzip git
COPY ./ /root/data_analysis/
WORKDIR /root/data_analysis
RUN python3 -m pip install -r data_analysis/beam/requirements.txt
ENV PYTHONPATH=/root/data_analysis
ENV WORKER_ID=1
CMD python3 data_analysis/analysis.py
Run Code Online (Sandbox Code Playgroud)

代码analysis.py

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run():
    options = PipelineOptions(["--runner=DirectRunner"])

    with beam.Pipeline(options=options) as p:
        p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x-1) | beam.Map(print)

if __name__ == "__main__":
    run()
Run Code Online (Sandbox Code Playgroud)

命令:

% …
Run Code Online (Sandbox Code Playgroud)

apache docker apache-beam python-3.8

9
推荐指数
1
解决办法
3046
查看次数

编码器问题Apache Beam和CombineFn

我们正在使用Apache Beam和DirectRunner作为跑步者构建管道.我们目前正在尝试一个简单的管道,我们:

  1. 从Google Cloud Pub/Sub中提取数据(目前使用模拟器在本地运行)
  2. 反序列化为Java对象
  3. 使用固定窗口1分钟的窗口事件
  4. 使用自定义CombineFn将这些窗口从事件转换为事件列表来组合这些窗口.

管道代码:

pipeline
.apply(PubsubIO.<String>read().topic(options.getTopic()).withCoder(StringUtf8Coder.of()))

.apply("ParseEvent", ParDo.of(new ParseEventFn()))

.apply("WindowOneMinute",Window.<Event>into(FixedWindows.of(Duration.standardMinutes(1))))              

.apply("CombineEvents", Combine.globally(new CombineEventsFn()));
Run Code Online (Sandbox Code Playgroud)

ParseEvent函数:

    static class ParseEventFn extends DoFn<String, Event> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String json = c.element();
            c.output(gson.fromJson(json, Event.class));
        }
    }
Run Code Online (Sandbox Code Playgroud)

CombineEvents功能:

public static class CombineEventsFn extends CombineFn<Event, CombineEventsFn.Accum, EventListWrapper> {
        public static class Accum {
            EventListWrapper eventListWrapper = new EventListWrapper();
        }

        @Override
        public Accum createAccumulator() {
            return new Accum();
        }

        @Override
        public Accum addInput(Accum accumulator, Event …
Run Code Online (Sandbox Code Playgroud)

java google-cloud-platform google-cloud-dataflow apache-beam

8
推荐指数
2
解决办法
2864
查看次数

Apache Beam - 使用无界PCollection进行集成测试

我们正在为Apache Beam管道构建集成测试,并且遇到了一些问题.有关背景信息,请参见

有关我们管道的详情:

  • 我们使用PubsubIO我们的数据源(无界PCollection)
  • 中间变换包括自定义CombineFn和非常简单的窗口/触发策略
  • 我们最终的变换JdbcIO,用org.neo4j.jdbc.Driver写的Neo4j

目前的测试方法:

  • 在运行测试的计算机上运行Google Cloud的Pub/Sub模拟器
  • 构建内存中的Neo4j数据库并将其URI传递给我们的管道选项
  • 通过调用运行管道 OurPipeline.main(TestPipeline.convertToArgs(options)
  • 使用Google Cloud的Java Pub/Sub客户端库将消息发布到测试主题(使用Pub/Sub模拟器),该主题PubsubIO将从
  • 数据应该流经管道并最终命中我们的内存中的Neo4j实例
  • 在Neo4j中对这些数据的存在做出简单的断言

这是一个简单的集成测试,它将验证我们的整个管道是否按预期运行.

我们目前面临的问题是,当我们运行我们的管道时,它会阻塞.我们正在使用DirectRunnerpipeline.run()( pipeline.run().waitUntilFinish()),但测试似乎在运行管道后挂起.因为这是一个无限制的PCollection(在流模式下运行),管道不会终止,因此不会到达它之后的任何代码.

所以,我有几个问题:

1)有没有办法运行管道然后稍后手动停止?

2)有没有办法异步运行管道?理想情况下,它会启动管道(然后将继续轮询Pub/Sub以获取数据),然后转到负责发布到Pub/Sub的代码.

3)这种集成测试方法是否合理,或者是否有更好的方法可能更直接?这里的任何信息/指导将不胜感激.

如果我能提供任何额外的代码/背景,请告诉我 - 谢谢!

java integration-testing google-cloud-pubsub google-cloud-dataflow apache-beam

8
推荐指数
1
解决办法
961
查看次数

数据流管道-“处理在步骤&lt;STEP_NAME&gt;中停留了至少&lt;TIME&gt;,而没有在状态完成中输出或完成...”

我的团队开发的数据流管道突然开始卡住,停止处理事件。他们的工作日志充满警告消息,指出一个特定步骤被卡住了。奇怪的是,失败的步骤是不同的,一个是BigQuery输出,另一个是Cloud Storage输出。

以下是我们收到的日志消息:

对于BigQuery输出:

Processing stuck in step <STEP_NAME>/StreamingInserts/StreamingWriteTables/StreamingWrite for at least <TIME> without outputting or completing in state finish
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
  at java.util.concurrent.FutureTask.get(FutureTask.java:191)
  at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:765)
  at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:829)
  at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:131)
  at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:103)
  at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn$DoFnInvoker.invokeFinishBundle(Unknown Source)
Run Code Online (Sandbox Code Playgroud)

对于云存储输出:

Processing stuck in step <STEP_NAME>/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles for at least <TIME> without outputting or completing in state process
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
  at java.util.concurrent.FutureTask.get(FutureTask.java:191)
  at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(AbstractGoogleAsyncWriteChannel.java:421)
  at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.close(AbstractGoogleAsyncWriteChannel.java:287)
  at org.apache.beam.sdk.io.FileBasedSink$Writer.close(FileBasedSink.java:1007)
  at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:726)
  at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
Run Code Online (Sandbox Code Playgroud)

所有应用程序都已被排干并重新部署,但是一段时间(3到4个小时)后发生了相同的事情。其中一些已经运行了40多个天,而他们突然进入了代码库,而没有对代码进行任何更改。

我想寻求帮助以了解此问题的原因。这些是存在这些问题的某些Dataflow作业的以下ID:

卡在BigQuery输出中: …

google-cloud-dataflow apache-beam

8
推荐指数
1
解决办法
1665
查看次数