Hi, I want to use the Airflow HiveOperator to execute a Hive query and write the results to a file. I don't want to use INSERT OVERWRITE here.
hive_ex = HiveOperator(
    task_id='hive-ex',
    hql='/sql/hive-ex.sql',
    hiveconfs={
        'DAY': '{{ ds }}',
        'YESTERDAY': '{{ yesterday_ds }}',
        'OUTPUT': '{{ file_path }}' + 'csv',
    },
    dag=dag,
)
What is the best way to do this?
I know how to do it with the BashOperator, but I'd like to know whether the same thing can be done with the HiveOperator:
hive_ex = BashOperator(
    task_id='hive-ex',
    bash_command='hive -f hive.sql -DAY={{ ds }} >> {{ file_path }}/file_{{ ds }}.json',
    dag=dag,
)
I'm getting this error when running a Kafka Streams application to compute an aggregated count.
Exception in thread "KStreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_2] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000002
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:220)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:491)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:431)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:346)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:405)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1029)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
Here is the code I'm running:
final KStream<String, EventsAvro> stream = builder.stream("events_topic");
KStream<Integer, Long> events = stream.map((k, v) -> new KeyValue<Integer, Long>(v.getPageId(), v.getUserId()));
KGroupedStream<Integer, Long> groupedStream = events.groupByKey(Grouped.with(Serdes.Integer(), Serdes.Long()));
KTable<Windowed<Integer>, Long> windowedCount = groupedStream
        .windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(ofMillis(5L))) …
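A commonly reported cause of this error is a serde mismatch on the windowed state store: after `windowedBy`, the store keys are `Windowed<Integer>`, serialized with window metadata appended to the 4-byte integer, so anything that reads those bytes back with a plain `IntegerDeserializer` (for example, leftover changelog or state data from an earlier run of the topology with different key types) fails the strict size check. A minimal Python sketch of the size mismatch; the exact byte layout here is an assumption for illustration only:

```python
import struct

# IntegerDeserializer insists on exactly 4 bytes (a big-endian int32).
plain_key = struct.pack(">i", 42)
assert len(plain_key) == 4

# A windowed store key carries extra window metadata (illustrated here
# as an 8-byte window-start timestamp) beyond the raw integer, so the
# stored bytes are longer than 4 and a plain integer deserializer
# raises "Size of data received by IntegerDeserializer is not 4".
windowed_key = plain_key + struct.pack(">q", 1_572_000_000_000)
assert len(windowed_key) != 4
```

If stale state is the culprit, resetting the application (a new `application.id` or the streams reset tool) so the store and changelog are rebuilt with the current serdes is the usual remedy.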