我可能需要和卡夫卡一起工作,我对它很陌生.我知道有些Kafka制作人会将日志(在Kafka中称为事件或消息或记录)发布到Kafka主题.
我需要通过消费者阅读卡夫卡主题.我是否需要首先设置消费者API然后我可以使用SparkStreaming Context(PySpark)进行流式传输,或者我可以直接使用KafkaUtils模块来读取kafka主题?
如果我需要设置Kafka消费者应用程序,我该怎么做?请您分享正确文档的链接.
提前致谢!!
apache-kafka apache-spark spark-streaming kafka-consumer-api pyspark
当我传递参数 staging、temp 和输出 GCS 存储桶位置时,数据流作业失败并出现以下异常。
爪哇代码:
final String[] used = Arrays.copyOf(args, args.length + 1);
used[used.length - 1] = "--project=OVERWRITTEN"; final T options =
PipelineOptionsFactory.fromArgs(used).withValidation().as(clazz);
options.setProject(PROJECT_ID);
options.setStagingLocation("gs://abc/staging/");
options.setTempLocation("gs://abc/temp");
options.setRunner(DataflowRunner.class);
options.setGcpTempLocation("gs://abc");
Run Code Online (Sandbox Code Playgroud)
错误:
INFO: Staging pipeline description to gs://ups-heat-dev- tmp/mniazstaging_ingest_validation/staging/
May 10, 2018 11:56:35 AM org.apache.beam.runners.dataflow.util.PackageUtil tryStagePackage
INFO: Uploading <42088 bytes, hash E7urYrjAOjwy6_5H-UoUxA> to gs://ups-heat-dev-tmp/mniazstaging_ingest_validation/staging/pipeline-E7urYrjAOjwy6_5H-UoUxA.pb
Dataflow SDK version: 2.4.0
May 10, 2018 11:56:38 AM org.apache.beam.runners.dataflow.DataflowRunner run
INFO: Printed job specification to gs://ups-heat-dev-tmp/mniazstaging_ingest_validation/templates/DataValidationPipeline
May 10, 2018 11:56:40 AM org.apache.beam.runners.dataflow.DataflowRunner run
INFO: Template successfully created. …Run Code Online (Sandbox Code Playgroud) 我想加载流数据,然后添加一个键,然后按键计数。
当我尝试使用流方法(无界数据)加载和按键分组大数据时,Apache Beam Dataflow 管道出现内存错误。因为似乎数据是按分组累积的,并且不会在触发每个窗口时更早地触发数据。
如果我减小元素大小(元素数量不会改变),它会起作用!因为实际上 group-by step 等待所有数据被分组,然后触发所有新的窗口数据。
我对两者进行了测试:
梁版本 2.11.0 和 scio 版本 0.7.4
梁版本 2.6.0 和 scio 版本 0.6.1
如您所见,数据是按组累积的,不会发出。
val windowedData = data.applyKvTransform(
Window.into[myt](
Sessions.withGapDuration(Duration.millis(1)))
.triggering(
Repeatedly.forever(AfterFirst.of(
AfterPane.elementCountAtLeast(10),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1)))
).orFinally(AfterWatermark.pastEndOfWindow())
).withAllowedLateness(Duration.standardSeconds(100))
.discardingFiredPanes()
)
Run Code Online (Sandbox Code Playgroud)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$KeyCommitTooLargeException: Commit request for stage S2 and …Run Code Online (Sandbox Code Playgroud) 当我尝试运行测试管道时,它会引发错误
这是创建测试管道的源代码:
val p: TestPipeline = TestPipeline.create()
Run Code Online (Sandbox Code Playgroud)
这是错误:
java.lang.IllegalStateException:您的 TestPipeline 声明是否缺少 @Rule 注释?用法:@Rule public final Transient TestPipeline pipeline = TestPipeline.create();
apache-beam ×2
apache-kafka ×1
apache-spark ×1
java ×1
pipeline ×1
pyspark ×1
scala ×1
spotify-scio ×1