Apache Beam和Apache Nifi有哪些用例?它们似乎都是数据流引擎.如果两者都有相似的用例,哪两个更好?
我的 Beam 管道正在写入未分区的 BigQuery 目标表。PCollection 由数百万个 TableRow 组成。如果我使用 DirectRunner 运行 BigQueryIO,它显然会首先为 BigQueryWriteTemp 临时文件夹中的每条记录创建一个临时文件。这显然表现不佳。我在这里做错了吗?这是一个正常的批处理作业,而不是流式处理。(使用 DataflowRunner 运行的相同作业似乎没有这样做)
myrows.apply("WriteToBigQuery",
                BigQueryIO.writeTableRows().to(BQ_TARGET_TABLE)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
这是我们看到的日志。这些文件中的每一个都包含一个 TableRow。DataflowRunner 上的相同似乎只创建了大约 3 个文件。
2017-08-14 11:43:49 INFO  TableRowWriter:63 - Opening TableRowWriter to gs://my-bucket/tmp/BigQueryWriteTemp/4836c162e29d43f58c4f5cc55b1b3bb3/59668b03-a1e8-4288-a049-3472e7cb6333.
2017-08-14 11:43:49 INFO  TableRowWriter:63 - Opening TableRowWriter to gs://my-bucket/tmp/BigQueryWriteTemp/4836c162e29d43f58c4f5cc55b1b3bb3/feeb454b-799e-4d77-bd12-dec313cdadc2.
2017-08-14 11:43:49 INFO  TableRowWriter:63 - Opening TableRowWriter to gs://my-bucket/tmp/BigQueryWriteTemp/4836c162e29d43f58c4f5cc55b1b3bb3/3c63db33-787f-4215-a425-3446d92157ed.
2017-08-14 11:43:49 INFO  TableRowWriter:63 - Opening TableRowWriter to gs://my-bucket/tmp/BigQueryWriteTemp/4836c162e29d43f58c4f5cc55b1b3bb3/87d55556-e012-4bef-8856-69efd4c5ab26.
2017-08-14 11:43:49 INFO  TableRowWriter:63 - Opening TableRowWriter to gs://my-bucket/tmp/BigQueryWriteTemp/4836c162e29d43f58c4f5cc55b1b3bb3/5e6bfe94-b1c9-49cb-b0c7-a768d78d85f3.
2017-08-14 11:43:49 INFO  TableRowWriter:63 - Opening TableRowWriter to gs://my-bucket/tmp/BigQueryWriteTemp/4836c162e29d43f58c4f5cc55b1b3bb3/b236948b-bdf0-4bfe-9d26-4e67c8904320.
2017-08-14 11:43:49 …我想弄清楚如何使用 Apache Beam 读取大型 CSV 文件。我所说的“大”是指几 GB(因此一次将整个 CSV 读入内存是不切实际的)。
到目前为止,我已经尝试了以下选项:
Beam 是否提供了任何简单的方法来让我以我想要的方式解析文件,而不必在继续下一个转换之前将整个文件读入内存?
尝试使用CSEK加载GCS文件时,我收到数据流错误
[ERROR] The target object is encrypted by a customer-supplied encryption key
我打算尝试在数据流方面进行AES解密,但我发现如果没有传递加密密钥,我甚至无法获取该文件.
是否有另一种方法可以从数据流中加载CSEK加密的Google云端存储文件?例如,使用谷歌云存储API,获取流句柄然后将其传递给数据流?
    // Fails
    p.apply("Read from source", TextIO.read().from("gs://my_bucket/myfile")).apply(..); 
google-cloud-storage google-cloud-platform google-cloud-dataflow apache-beam
我正在使用 Apache-Beam 运行一些数据转换,包括从 txt、csv 和不同数据源中提取数据。我注意到的一件事是使用beam.Map和beam.ParDo时的结果差异
在下一个示例中:
我正在读取 csv 数据,在第一种情况下,使用beam.ParDo将其传递给DoFn,它提取第一个元素,即日期,然后打印它。第二种情况,我直接用beam.Map做同样的事情,然后打印出来。
class Printer(beam.DoFn):
    def process(self,data_item):
        print data_item
class DateExtractor(beam.DoFn):
    def process(self,data_item):
        return (str(data_item).split(','))[0]
data_from_source = (p
                    | 'ReadMyFile 01' >> ReadFromText('./input/data.csv')
                    | 'Splitter using beam.ParDo 01' >> beam.ParDo(DateExtractor())
                    | 'Printer the data 01' >> beam.ParDo(Printer())
                    )
copy_of_the_data =  (p
                    | 'ReadMyFile 02' >> ReadFromText('./input/data.csv')
                    | 'Splitter using beam.Map 02' >> beam.Map(lambda record: (record.split(','))[0])
                    | 'Printer the data 02' >> beam.ParDo(Printer())
                    ) …我按照以下指南编写了TF Records,我曾经在那里tf.Transform预处理我的功能.现在,我想部署我的模型,我需要在真实的实时数据上应用这个预处理功能.
首先,假设我有2个功能:
features = ['amount', 'age']
我有transform_fn来自Apache Beam,来自working_dir=gs://path-to-transform-fn/
然后我使用以下方法加载转换函数:
tf_transform_output = tft.TFTransformOutput(working_dir)
我认为在生产中服务的最简单方法是获取一系列处理过的数据,然后调用model.predict()(我使用的是Keras模型).
要做到这一点,我认为transform_raw_features()方法正是我所需要的.
但是,似乎在构建架构之后:
raw_features = {}
for k in features:
    raw_features.update({k: tf.constant(1)})
print(tf_transform_output.transform_raw_features(raw_features))
我明白了:
AttributeError: 'Tensor' object has no attribute 'indices'
现在,我假设发生了这种情况,因为我tf.VarLenFeature()在我定义架构时使用了preprocessing_fn.
def preprocessing_fn(inputs):
    outputs = inputs.copy()
    for _ in features:
        outputs[_] = tft.scale_to_z_score(outputs[_])
我使用以下方法构建元数据:
RAW_DATA_FEATURE_SPEC = {}
for _ in features:
    RAW_DATA_FEATURE_SPEC[_] = tf.VarLenFeature(dtype=tf.float32)
    RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec(RAW_DATA_FEATURE_SPEC))
所以简而言之,给一本字典:
d = …
python tensorflow tensorflow-serving apache-beam tensorflow-transform
我在使用 dockerized Apache Beam 时遇到了问题。当尝试运行容器时,我只收到"No id provided."消息,仅此而已。这是代码和文件:
Dockerfile
FROM apache/beam_python3.8_sdk:latest
RUN apt update
RUN apt install -y wget curl unzip git
COPY ./ /root/data_analysis/
WORKDIR /root/data_analysis
RUN python3 -m pip install -r data_analysis/beam/requirements.txt
ENV PYTHONPATH=/root/data_analysis
ENV WORKER_ID=1
CMD python3 data_analysis/analysis.py
代码analysis.py:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
def run():
    options = PipelineOptions(["--runner=DirectRunner"])
    with beam.Pipeline(options=options) as p:
        p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x-1) | beam.Map(print)
if __name__ == "__main__":
    run()
命令:
% …我们正在使用Apache Beam和DirectRunner作为跑步者构建管道.我们目前正在尝试一个简单的管道,我们:
CombineFn将这些窗口从事件转换为事件列表来组合这些窗口.管道代码:
pipeline
.apply(PubsubIO.<String>read().topic(options.getTopic()).withCoder(StringUtf8Coder.of()))
.apply("ParseEvent", ParDo.of(new ParseEventFn()))
.apply("WindowOneMinute",Window.<Event>into(FixedWindows.of(Duration.standardMinutes(1))))              
.apply("CombineEvents", Combine.globally(new CombineEventsFn()));
ParseEvent函数:
    static class ParseEventFn extends DoFn<String, Event> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String json = c.element();
            c.output(gson.fromJson(json, Event.class));
        }
    }
CombineEvents功能:
public static class CombineEventsFn extends CombineFn<Event, CombineEventsFn.Accum, EventListWrapper> {
        public static class Accum {
            EventListWrapper eventListWrapper = new EventListWrapper();
        }
        @Override
        public Accum createAccumulator() {
            return new Accum();
        }
        @Override
        public Accum addInput(Accum accumulator, Event …java google-cloud-platform google-cloud-dataflow apache-beam
我们正在为Apache Beam管道构建集成测试,并且遇到了一些问题.有关背景信息,请参见
有关我们管道的详情:
PubsubIO我们的数据源(无界PCollection)CombineFn和非常简单的窗口/触发策略JdbcIO,用org.neo4j.jdbc.Driver写的Neo4j目前的测试方法:
OurPipeline.main(TestPipeline.convertToArgs(options)PubsubIO将从这是一个简单的集成测试,它将验证我们的整个管道是否按预期运行.
我们目前面临的问题是,当我们运行我们的管道时,它会阻塞.我们正在使用DirectRunner和pipeline.run()(不 pipeline.run().waitUntilFinish()),但测试似乎在运行管道后挂起.因为这是一个无限制的PCollection(在流模式下运行),管道不会终止,因此不会到达它之后的任何代码.
所以,我有几个问题:
1)有没有办法运行管道然后稍后手动停止?
2)有没有办法异步运行管道?理想情况下,它会启动管道(然后将继续轮询Pub/Sub以获取数据),然后转到负责发布到Pub/Sub的代码.
3)这种集成测试方法是否合理,或者是否有更好的方法可能更直接?这里的任何信息/指导将不胜感激.
如果我能提供任何额外的代码/背景,请告诉我 - 谢谢!
java integration-testing google-cloud-pubsub google-cloud-dataflow apache-beam
我的团队开发的数据流管道突然开始卡住,停止处理事件。他们的工作日志充满警告消息,指出一个特定步骤被卡住了。奇怪的是,失败的步骤是不同的,一个是BigQuery输出,另一个是Cloud Storage输出。
以下是我们收到的日志消息:
对于BigQuery输出:
Processing stuck in step <STEP_NAME>/StreamingInserts/StreamingWriteTables/StreamingWrite for at least <TIME> without outputting or completing in state finish
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
  at java.util.concurrent.FutureTask.get(FutureTask.java:191)
  at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:765)
  at org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:829)
  at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:131)
  at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:103)
  at org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn$DoFnInvoker.invokeFinishBundle(Unknown Source)
对于云存储输出:
Processing stuck in step <STEP_NAME>/WriteFiles/WriteShardedBundlesToTempFiles/WriteShardsIntoTempFiles for at least <TIME> without outputting or completing in state process
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
  at java.util.concurrent.FutureTask.get(FutureTask.java:191)
  at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.waitForCompletionAndThrowIfUploadFailed(AbstractGoogleAsyncWriteChannel.java:421)
  at com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel.close(AbstractGoogleAsyncWriteChannel.java:287)
  at org.apache.beam.sdk.io.FileBasedSink$Writer.close(FileBasedSink.java:1007)
  at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:726)
  at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
所有应用程序都已被排干并重新部署,但是一段时间(3到4个小时)后发生了相同的事情。其中一些已经运行了40多个天,而他们突然进入了代码库,而没有对代码进行任何更改。
我想寻求帮助以了解此问题的原因。这些是存在这些问题的某些Dataflow作业的以下ID:
卡在BigQuery输出中: …
apache-beam ×10
java ×2
apache ×1
apache-nifi ×1
docker ×1
python ×1
python-2.7 ×1
python-3.8 ×1
tensorflow ×1