小编Sae*_*ham的帖子

来自Kafka Consumer的Spark Streaming

我可能需要和卡夫卡一起工作,我对它很陌生.我知道有些Kafka制作人会将日志(在Kafka中称为事件或消息或记录)发布到Kafka主题.

我需要通过消费者阅读卡夫卡主题.我是否需要首先设置消费者API然后我可以使用SparkStreaming Context(PySpark)进行流式传输,或者我可以直接使用KafkaUtils模块来读取kafka主题?

如果我需要设置Kafka消费者应用程序,我该怎么做?请您分享正确文档的链接.

提前致谢!!

apache-kafka apache-spark spark-streaming kafka-consumer-api pyspark

5
推荐指数
1
解决办法
817
查看次数

设置 templateLocation 参数时数据流作业运行失败

当我传递参数 staging、temp 和输出 GCS 存储桶位置时,数据流作业失败并出现以下异常。

爪哇代码:

final String[] used = Arrays.copyOf(args, args.length + 1); 
used[used.length - 1] = "--project=OVERWRITTEN"; final T options = 
PipelineOptionsFactory.fromArgs(used).withValidation().as(clazz); 
options.setProject(PROJECT_ID); 
options.setStagingLocation("gs://abc/staging/"); 
options.setTempLocation("gs://abc/temp"); 
options.setRunner(DataflowRunner.class); 
options.setGcpTempLocation("gs://abc");
Run Code Online (Sandbox Code Playgroud)

错误:

INFO: Staging pipeline description to gs://ups-heat-dev- tmp/mniazstaging_ingest_validation/staging/
May 10, 2018 11:56:35 AM org.apache.beam.runners.dataflow.util.PackageUtil tryStagePackage
INFO: Uploading <42088 bytes, hash E7urYrjAOjwy6_5H-UoUxA> to gs://ups-heat-dev-tmp/mniazstaging_ingest_validation/staging/pipeline-E7urYrjAOjwy6_5H-UoUxA.pb
Dataflow SDK version: 2.4.0
May 10, 2018 11:56:38 AM org.apache.beam.runners.dataflow.DataflowRunner run
INFO: Printed job specification to gs://ups-heat-dev-tmp/mniazstaging_ingest_validation/templates/DataValidationPipeline
May 10, 2018 11:56:40 AM org.apache.beam.runners.dataflow.DataflowRunner run
INFO: Template successfully created. …
Run Code Online (Sandbox Code Playgroud)

google-cloud-platform google-cloud-dataflow

5
推荐指数
1
解决办法
2154
查看次数

带窗口的 GroupByKey 后,Beam 管道不产生任何输出,并且出现内存错误

目的:

我想加载流数据,然后添加一个键,然后按键计数。

问题:

当我尝试使用流方法(无界数据)加载和按键分组大数据时,Apache Beam Dataflow 管道出现内存错误。因为似乎数据是按分组累积的,并且不会在触发每个窗口时更早地触发数据。

如果我减小元素大小(元素数量不会改变),它会起作用!因为实际上 group-by step 等待所有数据被分组,然后触发所有新的窗口数据。

我对两者进行了测试:

梁版本 2.11.0 和 scio 版本 0.7.4

梁版本 2.6.0 和 scio 版本 0.6.1

重新生成错误的方法:

  1. 阅读包含文件名的 Pubsub 消息
  2. 从 GCS 读取并加载相关文件作为逐行迭代器
  3. 逐行展平(因此它生成大约 10,000 个)元素
  4. 向元素添加时间戳(当前即时时间)
  5. 创建我的数据的键值(使用一些从 1 到 10 的随机整数键)
  6. Apply window with triggering(在行很小且没有内存问题的情况下会触发大约50次)
  7. 每个键计数(按键分组然后组合它们)
  8. 最后,我们应该有大约 50 * 10 个元素代表按窗口和键的计数(当行大小足够小时测试成功)

管道的可视化(步骤 4 到 7):

在此处输入图片说明

group-by-key 步骤的总结:

在此处输入图片说明

如您所见,数据是按组累积的,不会发出。

窗口代码在这里:

val windowedData = data.applyKvTransform(
  Window.into[myt](
    Sessions.withGapDuration(Duration.millis(1)))
    .triggering(
      Repeatedly.forever(AfterFirst.of(
        AfterPane.elementCountAtLeast(10),
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1)))

      ).orFinally(AfterWatermark.pastEndOfWindow())

    ).withAllowedLateness(Duration.standardSeconds(100))
    .discardingFiredPanes()

)
Run Code Online (Sandbox Code Playgroud)

错误:

org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException: Commit request for stage S2 and …
Run Code Online (Sandbox Code Playgroud)

google-cloud-dataflow apache-beam spotify-scio

3
推荐指数
1
解决办法
1840
查看次数

在 Scala 中初始化 Apache Beam 测试管道失败

当我尝试运行测试管道时,它会引发错误

这是创建测试管道的源代码:

val p: TestPipeline = TestPipeline.create()
Run Code Online (Sandbox Code Playgroud)

这是错误:

java.lang.IllegalStateException:您的 TestPipeline 声明是否缺少 @Rule 注释?用法:@Rule public final Transient TestPipeline pipeline = TestPipeline.create();

java pipeline scala apache-beam

3
推荐指数
1
解决办法
658
查看次数