Posted by use*_*180

Using the Airflow HiveOperator and writing the output to a text file

Hi, I want to execute a Hive query with the Airflow HiveOperator and write the result to a file. I don't want to use INSERT OVERWRITE here.

hive_ex = HiveOperator(
    task_id='hive-ex',
    hql='/sql/hive-ex.sql',
    hiveconfs={
        'DAY': '{{ ds }}',
        'YESTERDAY': '{{ yesterday_ds }}',
        'OUTPUT': '{{ file_path }}'+'csv',
    },
    dag=dag
)

What is the best way to do this?

I know how to do this with the BashOperator, but I'd like to know whether it's possible with the HiveOperator:

hive_ex = BashOperator(
    task_id='hive-ex',
    bash_command='hive -f hive.sql -DAY={{ ds }} '
                 '>> {{ file_path }}/file_{{ ds }}.json',
    dag=dag
)

python airflow airflow-scheduler

5 votes · 1 answer · 5144 views

SerializationException: Size of data received by IntegerDeserializer is not 4

I get this error when running a Kafka Streams job that computes an aggregated count.

Exception in thread "KStreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_2] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000002
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:220)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:204)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:491)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:431)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:346)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:405)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1029)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:883)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4

Here is the code I'm running:

final KStream<String, EventsAvro> stream = builder.stream("events_topic");

KStream<Integer, Long> events = stream.map((k, v) -> new KeyValue<>(v.getPageId(), v.getUserId()));

KGroupedStream<Integer, Long> groupedStream = events.groupByKey(Grouped.with(Serdes.Integer(), Serdes.Long()));
KTable<Windowed<Integer>, Long> windowedCount = groupedStream
        .windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(ofMillis(5L))) …
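A likely cause (an inference from the stack trace, not confirmed by the snippet): something is reading the windowed aggregation's keys back with a plain IntegerDeserializer, but a time-windowed key is serialized as the 4-byte key followed by the 8-byte window start timestamp, i.e. 12 bytes, and IntegerDeserializer insists on exactly 4. A minimal, self-contained simulation of that byte-layout mismatch (Python here purely to illustrate the layout; `deserialize_int` mimics the deserializer's size check):

```python
import struct

def deserialize_int(data: bytes) -> int:
    # Mimics Kafka's IntegerDeserializer: it requires exactly 4 bytes.
    if len(data) != 4:
        raise ValueError("Size of data received by IntegerDeserializer is not 4")
    return struct.unpack(">i", data)[0]

# A plain Integer key: 4 big-endian bytes.
plain_key = struct.pack(">i", 42)
print(deserialize_int(plain_key))  # 42

# A time-windowed key: the key bytes plus the 8-byte window start timestamp.
windowed_key = struct.pack(">iq", 42, 1_560_000_000_000)
print(len(windowed_key))  # 12
try:
    deserialize_int(windowed_key)
except ValueError as e:
    print(e)  # Size of data received by IntegerDeserializer is not 4
```

The usual remedy is to make sure whatever reads the windowed result (a downstream consumer, or explicit serdes passed via `Materialized`) uses a windowed serde such as `WindowedSerdes.timeWindowedSerdeFrom(Integer.class)` rather than `Serdes.Integer()`.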

apache-kafka apache-kafka-streams

4 votes · 1 answer · 5412 views