Zam*_*rif 0 apache-kafka confluent ksql
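I am new to Stack Overflow, so please let me know if I am doing anything wrong by posting this question here.
I searched for an answer but could not find any KSQL JOIN related questions on the site, so I am posting this. I have tried several ways of running this query, but I keep getting a NullPointerException.
I have two Kafka Avro topics, deal and expense, but the data contains a lot of whitespace. To clean it up, I created the following stream and table with trimmed data: DEAL_STREAM and EXPENSE_TABLE.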
ksql> describe EXPENSE_TABLE;
Result:
Field | Type
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
KSQL_COL_0 | VARCHAR(STRING)
KSQL_COL_1 | VARCHAR(STRING)
KSQL_COL_2 | VARCHAR(STRING)
and
ksql> describe deal_stream;
Result:
Field | Type
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
KSQL_COL_0 | VARCHAR(STRING)
KSQL_COL_1 | VARCHAR(STRING)
KSQL_COL_2 | VARCHAR(STRING)
When I execute the queries below, I get a NullPointerException. I tried the following three variants.
1:
ksql> CREATE STREAM deal_expense_new AS SELECT td.KSQL_COL_0 , te.KSQL_COL_1 FROM deal_stream td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0;
2:
ksql> CREATE STREAM deal_expense_new AS SELECT td.KSQL_COL_0 AS KSQL_COL_0 , te.KSQL_COL_1 FROM deal_stream td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0;
3:
CREATE STREAM deal_expense_trimmed AS SELECT td.KSQL_COL_0 AS KSQL_COL_0 , te.KSQL_COL_1 FROM deal_stream td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0 where td.KSQL_COL_0 IS NOT NULL;
Error:
> Message Stream created and running ksql> Exception in thread
> "ksql_query_CSAS_DEAL_EXPENSE_NEW-01b2596a-3d2a-4d41-a823-0e345ec727fa-StreamThread-115"
> java.lang.NullPointerException at
> io.confluent.ksql.structured.SchemaKStream.lambda$selectKey$3(SchemaKStream.java:248)
> at
> org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:159)
> at
> org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:156)
> at
> org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
> at
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
> at
> org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:169)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
> at
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
> at
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
> at
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
> at
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:221)
> at
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
> at
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:422)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:924)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:804)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:756)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:726)
> Exception in thread
> "ksql_query_CSAS_DEAL_EXPENSE_NEW-01b2596a-3d2a-4d41-a823-0e345ec727fa-StreamThread-116"
> java.lang.NullPointerException at
> io.confluent.ksql.structured.SchemaKStream.lambda$selectKey$3(SchemaKStream.java:248)
> at
> org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:159)
> at
> org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:156)
> at
> org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
> at
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
> at
> org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:169)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
> at
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
> at
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
> at
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
> at
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:221)
> at
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
> at
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:422)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:924)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:804)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:756)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:726)
> Exception in thread
> "ksql_query_CSAS_DEAL_EXPENSE_NEW-01b2596a-3d2a-4d41-a823-0e345ec727fa-StreamThread-113"
> java.lang.NullPointerException at
> io.confluent.ksql.structured.SchemaKStream.lambda$selectKey$3(SchemaKStream.java:248)
> at
> org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:159)
> at
> org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:156)
> at
> org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
> at
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
> at
> org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:169)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
> at
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
> at
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
> at
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
> at
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:221)
> at
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
> at
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:422)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:924)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:804)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:756)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:726)
> Exception in thread
> "ksql_query_CSAS_DEAL_EXPENSE_NEW-01b2596a-3d2a-4d41-a823-0e345ec727fa-StreamThread-114"
> java.lang.NullPointerException at
> io.confluent.ksql.structured.SchemaKStream.lambda$selectKey$3(SchemaKStream.java:248)
> at
> org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:159)
> at
> org.apache.kafka.streams.kstream.internals.KStreamImpl$2.apply(KStreamImpl.java:156)
> at
> org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
> at
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
> at
> org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:169)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
> at
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
> at
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
> at
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
> at
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:221)
> at
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
> at
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:422)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:924)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:804)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:756)
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:726)
小智 6
This bug should be fixed in the latest master. The fix will be included in the next monthly release. Here is the GitHub issue: https://github.com/confluentinc/ksql/issues/521
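If you cannot upgrade right away, one workaround that may be worth trying is to drop rows with a null join column in an intermediate stream and then join from that stream. This is only a sketch under an assumption: that the NullPointerException in `selectKey` is triggered by rows whose `KSQL_COL_0` is null, which the stack trace is consistent with but does not prove. The names `deal_stream_nonnull` and `deal_expense_nonnull` are hypothetical and do not appear in the question. Depending on your CLI version, you may need to enter each statement on a single line, as in the question.

```sql
-- Assumption: the NPE comes from re-keying rows whose join column is NULL.
-- deal_stream_nonnull is a hypothetical intermediate stream name.
CREATE STREAM deal_stream_nonnull AS SELECT KSQL_COL_0, KSQL_COL_1, KSQL_COL_2 FROM deal_stream WHERE KSQL_COL_0 IS NOT NULL;

-- Same join as in the question, but reading from the filtered stream.
-- deal_expense_nonnull is a hypothetical output stream name.
CREATE STREAM deal_expense_nonnull AS SELECT td.KSQL_COL_0 AS KSQL_COL_0, te.KSQL_COL_1 FROM deal_stream_nonnull td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0;
```

If the exception still appears with the filtered stream, the null is probably coming from somewhere else (for example, records whose whole value is null), and the fix referenced in the GitHub issue is the safer route.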