我正在尝试连接一个表和一个流,并创建另一个表,如下所示:
CREATE TABLE table_fx_latest AS
SELECT t1.currencyid,
t1.maxtimestamp,
t2.midprice
FROM stream_fx2 t2 LEFT JOIN table_fx_latest3 t1
ON t1.currencyid = t2.currencyid AND
t1.timestamp = t2.maxtimestamp
GROUP BY t1.currencyid,
t1.maxtimestamp,
t2.midprice;
Run Code Online (Sandbox Code Playgroud)
但是报告了以下错误:
Cannot RUN execution plan for this statement, CreateTableAsSelect{name=TABLE_FX_LATEST_PRICE6, query=Query{queryBody=QuerySpecification{select=Select{distinct=false, selectItems=[T1.CURRENCYID T1_CURRENCYID, T1.MAXTIMESTAMP MAXTIMESTAMP, T2.MIDPRICE MIDPRICE]}, from=Join{type=LEFT, left=AliasedRelation{relation=STREAM_FX2, alias=T2}, right=AliasedRelation{relation=TABLE_FX_LATEST3, alias=T1}, criteria=Optional[JoinOn{((T1.CURRENCYID = T2.CURRENCYID) AND (T2.TIMESTAMP = T1.MAXTIMESTAMP))}]}, =null, where=null, groupBy=Optional[GroupBy{isDistinct=false, groupingElements=[SimpleGroupBy{columns=[T1.CURRENCYID]}, SimpleGroupBy{columns=[T1.MAXTIMESTAMP]}, SimpleGroupBy{columns=[T2.MIDPRICE]}]}], having=null, orderBy=[], limit=null}, orderBy=[]}, notExists=false, properties={}}
Caused by: io.confluent.ksql.parser.tree.LogicalBinaryExpression cannot be cast to io.confluent.ksql.parser.tree.ComparisonExpression
Run Code Online (Sandbox Code Playgroud)
这是stream_fx2流和 …
我的主题之一是将string-json作为键-{“ city”:“ X”,“ id”:22}。在我的ksql语句中,我想将其提取到2个不同的字段而不是一个字段中,以便稍后进行过滤和加入。在文档中,它似乎允许我仅将整个字符串粘贴到键中,而不是允许将其格式化为JSON(就像FORMAT_VALUE一样),请参见下文。
VALUE_FORMAT(必需)指定主题中消息值的序列化格式。支持的格式:JSON,DELIMITED和AVRO
KEY将Kafka主题中的消息密钥与KSQL流中的一列相关联。
我从针对由Kafka主题定义的KTable的KSQL查询中获得意外结果。KTABLE是“交易”,并且由压缩主题“ localhost.dbo.TradeHistory”支持。它应该包含由TradeId键控的股票交易的最新信息。主题的键是TradeId。每笔交易都有一个AccountId,我正在尝试构造一个查询以获取按帐户分组的交易金额的总和。
ksql> create table Trades(TradeId int, AccountId int, Spn int, Amount double) with (KAFKA_TOPIC = 'localhost.dbo.TradeHistory', VALUE_FORMAT = 'JSON', KEY = 'TradeId');
...
ksql> describe extended Trades;
Name : TRADES
Type : TABLE
Key field : TRADEID
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : JSON
Kafka topic : localhost.dbo.TradeHistory (partitions: 1, replication: 1)
Field | Type
---------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
TRADEID | INTEGER
ACCOUNTID | …Run Code Online (Sandbox Code Playgroud) 我是stackoverflow的新手,所以让我知道如果我在这里发布这个问题我有什么不对.
我已经尝试找到答案,但无法在网站上找到KSQL JOIN相关问题,所以我发布了这个.我已经尝试了不同的方法来运行此查询,但我一直得到空指针异常,所以在此处发布.
我有两个kafka avro主题交易和费用,但数据有很多空白,以清楚我已创建以下主题和表与修剪数据.DEAL_STREAM和EXPENSE_TABLE
ksql> describe EXPENSE_TABLE;
Run Code Online (Sandbox Code Playgroud)
结果:
Field | Type
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
KSQL_COL_0 | VARCHAR(STRING)
KSQL_COL_1 | VARCHAR(STRING)
KSQL_COL_2 | VARCHAR(STRING)
Run Code Online (Sandbox Code Playgroud)
和
ksql> describe deal_stream;
Run Code Online (Sandbox Code Playgroud)
结果:
Field | Type
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
KSQL_COL_0 | VARCHAR(STRING)
KSQL_COL_1 | VARCHAR(STRING)
KSQL_COL_2 | VARCHAR(STRING)
Run Code Online (Sandbox Code Playgroud)
当我执行以下Query时,它给我空指针异常.我尝试了以下查询.
1:
ksql> CREATE STREAM deal_expense_new AS SELECT td.KSQL_COL_0 , te.KSQL_COL_1 FROM deal_stream td LEFT JOIN expense_table te ON td.KSQL_COL_0 = te.KSQL_COL_0;
Run Code Online (Sandbox Code Playgroud)
2: …
我有一个Kafka主题,大约有300万条记录。我想从中选择具有特定参数的单个记录。我一直在尝试使用Lenses进行查询,但是无法形成正确的查询。以下是1条消息的记录内容。
{
"header": {
"schemaVersionNo": "1",
},
"payload": {
"modifiedDate": 1552334325212,
"createdDate": 1552334325212,
"createdBy": "A",
"successful": true,
"source_order_id": "3411976933214",
}
}
Run Code Online (Sandbox Code Playgroud)
现在,我想过滤出具有特定source_order_id的记录,但无法找出正确的方法。我们已经尝试通过镜头以及Kafka Tool。
我们在镜头中尝试过的示例查询如下:
SELECT * FROM `TEST`
WHERE _vtype='JSON' AND _ktype='BYTES'
AND _sample=2 AND _sampleWindow=200 AND payload.createdBy='fms'
Run Code Online (Sandbox Code Playgroud)
此查询有效,但是,如果我们尝试使用如下所示的源ID,则会出现错误:
SELECT * FROM `TEST`
WHERE _vtype='JSON' AND _ktype='BYTES'
AND _sample=2 AND _sampleWindow=200 AND payload.source_order_id='3411976911924'
Error : "Invalid syntax at line=3 and column=41.Invalid syntax for 'payload.source_order_id'. Field 'payload' resolves to primitive type STRING.
Run Code Online (Sandbox Code Playgroud)
通过自定义使用者使用所有300万条记录,然后对其进行遍历,这对我来说似乎并不是一种优化的方法,因此,寻找针对这种用例的任何可用解决方案。