标签: ksql

如何加入多个Kafka主题?

所以我有...

  • 第一个具有通用应用程序日志(log4j)的主题。存储诸如HTTP API请求/响应和警告,异常等之类的东西。一个逻辑业务请求可以有多个日志关联。(这些日志在几秒钟之内发生)
  • 第二个主题包含来自上述业务请求的命令,其他服务将对其执行操作。(这些命令也会在几秒钟之内发生,但可能距原始请求只有几分钟的时间)
  • 第三个主题包含由其他服务的操作生成的事件。(大多数事件会在几秒钟内完成,但有些事件最多可能需要3-5天才能收到)

因此,单个逻辑业务请求可以具有由微服务彼此传递的uuid关联的多个日志,命令和事件。

那么,可以用来阅读这3个主题并将它们全部组合为一个json文档,然后将它们转储给Elasticsearch的技术/模式有哪些?

流媒体?

apache-kafka apache-kafka-streams ksql

5
推荐指数
1
解决办法
5150
查看次数

汇编4.1.0 - > KSQL:STREAM-TABLE join - >表数据null

步骤1:运行生产者以创建样本数据

./bin/kafka-avro-console-producer \
         --broker-list localhost:9092 --topic stream-test-topic \
         --property schema.registry.url=http://localhost:8081 \
         --property value.schema='{"type":"record","name":"dealRecord","fields":[{"name":"DEAL_ID","type":"string"},{"name":"DEAL_EXPENSE_CODE","type":"string"},{"name":"DEAL_BRANCH","type":"string"}]}'
Run Code Online (Sandbox Code Playgroud)

样本数据 :

{"DEAL_ID":"deal002", "DEAL_EXPENSE_CODE":"EXP002", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal003", "DEAL_EXPENSE_CODE":"EXP003", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal004", "DEAL_EXPENSE_CODE":"EXP004", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal005", "DEAL_EXPENSE_CODE":"EXP005", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal006", "DEAL_EXPENSE_CODE":"EXP006", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal007", "DEAL_EXPENSE_CODE":"EXP001", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal008", "DEAL_EXPENSE_CODE":"EXP002", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal009", "DEAL_EXPENSE_CODE":"EXP003", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal010", "DEAL_EXPENSE_CODE":"EXP004", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal011", "DEAL_EXPENSE_CODE":"EXP005", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal012", "DEAL_EXPENSE_CODE":"EXP006", "DEAL_BRANCH":"AMSTERDAM"}
Run Code Online (Sandbox Code Playgroud)

第2步:打开另一个终端并运行使用者来测试数据.

./bin/kafka-avro-console-consumer --topic stream-test-topic \
         --bootstrap-server localhost:9092 \
         --property schema.registry.url=http://localhost:8081 \
         --from-beginning
Run Code Online (Sandbox Code Playgroud)

步骤3:打开另一个终端并运行生产者.

./bin/kafka-avro-console-producer \
         --broker-list localhost:9092 --topic expense-test-topic \
--property "parse.key=true" \
--property "key.separator=:" \
--property schema.registry.url=http://localhost:8081 \
--property key.schema='"string"' \ …
Run Code Online (Sandbox Code Playgroud)

apache-kafka ksql

5
推荐指数
1
解决办法
986
查看次数

创建KSQL流:如何从复杂的json中提取值

我正在尝试在Apache/KAFKA KSQL中创建一个流主题包含(有点复杂的JSON)

{
  "agreement_id": "dd8afdbe-59cf-4272-b640-b14a24d8234c",
  "created_at": "2018-02-17 16:00:00.000Z",
  "id": "6db276a8-2efe-4495-9908-4d3fc4cc16fa",
  "event_type": "data",
  "total_charged_amount": {
    "tax_free_amount": null,
    "tax_amounts": [],
    "tax_included_amount": {
      "amount": 0.0241,
      "currency": "EUR"
    }
  }
  "used_service_units": [
    {
      "amount": 2412739,
      "currency": null,
      "unit_of_measure": "bytes"
    }
  ]
}
Run Code Online (Sandbox Code Playgroud)

现在创建一个流很容易就像event_type和created_at这样简单的东西.那会是这样的

CREATE STREAM tstream (event_type varchar, created_at varchar) WITH (kafka_topic='usage_events', value_format='json');

但现在我需要访问used_service_units ....我想在上面的JSON中提取"金额"

我该怎么做?

CREATE STREAM usage (event_type varchar,create_at varchar, used_service_units[0].amount int) WITH (kafka_topic='usage_events', value_format='json');
Run Code Online (Sandbox Code Playgroud)

结果是

line 1:78: mismatched input '[' expecting {'ADD', 'APPROXIMATE', ...
Run Code Online (Sandbox Code Playgroud)

如果我改为创建一个像这样的流

CREATE STREAM usage (event_type varchar,create_at varchar, …
Run Code Online (Sandbox Code Playgroud)

json apache-kafka ksql

5
推荐指数
1
解决办法
1604
查看次数

如何使用kafka中的主题创建包含大量JSON字段的KSQL Stream?

我将一个长JSON字符串传递给kafka主题,例如:

{
    "glossary": {
        "title": "example glossary",
        "GlossDiv": {
            "title": "S",
            "GlossList": {
                "GlossEntry": {
                    "ID": "SGML",
                    "SortAs": "SGML",
                    "GlossTerm": "Standard Generalized Markup Language",
                    "Acronym": "SGML",
                    "Abbrev": "ISO 8879:1986",
                    "GlossDef": {
                        "para": "A meta-markup language, used to create markup languages such as DocBook.",
                        "GlossSeeAlso": ["GML", "XML"]
                    },
                    "GlossSee": "markup"
                }
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

并且想要从kafka主题创建流,其中所有字段都指定了KSQL中的每个字段,例如:

 CREATE STREAM pageviews_original (*) WITH \
(kafka_topic='pageviews', value_format='JSON');
Run Code Online (Sandbox Code Playgroud)

apache-kafka confluent ksql

5
推荐指数
1
解决办法
417
查看次数

Kafka时差最后两个记录,KSQL还是其他?

因此,我正在评估Kafka。在我们的用例中,必须创建一个包含从一个事件到下一个事件的“经过的时间”的新主题,这主要是因为传感器将向Kafka报告“开”或“关”。因此,具有时间戳,传感器名称和状态,可以创建持续时间为“ on”和“ off”状态的新主题。

  1. 这在KSQL中可行吗?
  2. 还是应该真的让消费者或流处理器来解决这个问题?

我的数据是这样的:

{ 2019:02:15 00:00:00, sensor1, off}
{ 2019:02:15 00:00:30, sensor1, on} 
Run Code Online (Sandbox Code Playgroud)

得到结果

{ 2019:02:15 00:30:00, sensor1, off, 30sec }. 
Run Code Online (Sandbox Code Playgroud)

本质上,必须组合多个传感器的状态才能确定机器的组合状态。数百甚至不是最终工厂中的数千个传感器

apache-kafka apache-kafka-streams ksql

5
推荐指数
1
解决办法
201
查看次数

使用KSQL计算所有条目

是否可以使用KSQL来计算特定列的条目,GROUP BY而不是通过应用程序流的所有条目聚合?

我正在寻找这样的东西:

| Count all | Count id1 | count id2 |
| ---245----|----150----|----95-----|
Run Code Online (Sandbox Code Playgroud)

或者在KSQL中更像这样:

[some timestamp] | Count all | 245   
[some timestamp] | Count id1 | 150   
[some timestamp] | Count id2 | 95   
.   
.   
.   
Run Code Online (Sandbox Code Playgroud)

谢谢
- 蒂姆

apache-kafka apache-kafka-streams ksql

4
推荐指数
1
解决办法
2103
查看次数

从KSQL表读取数据

也许这是一个初学者的问题,但是建议使用什么方式读取KSQL中生成的数据?

假设我进行了一些流处理并将数据写入KSQL表。现在,我想通过Spring应用程序访问此数据(例如,通过websocket散布一些实时数据)。我的第一个猜测是使用Spring Kafka并订阅基本主题。还是应该使用Kafka Streams?

另一个用例是进行流处理并将结果写入Redis存储(例如,对于始终返回当前值的Web服务)。这里的方法是什么?

谢谢!

apache-kafka apache-kafka-streams spring-kafka apache-kafka-connect ksql

4
推荐指数
1
解决办法
1204
查看次数

Kafka ksql简单连接不起作用

我在流和表中重新键入了数据,我使用了Confluent 4.1

1)创建流

   CREATE STREAM session_details_stream (Media varchar ,SessionIdTime varchar,SessionIdSeq long) with (kafka_topic = 'sessionDetails', value_format = 'json');
Run Code Online (Sandbox Code Playgroud)

2)创建重新密钥流,因为这个脚本不起作用,但在此之前它的工作原理,为什么?

CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM  partition by root;
Run Code Online (Sandbox Code Playgroud)

然后我创建下一个脚本s

CREATE STREAM session_details_stream_update as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM  partition by SessionIdTime;
CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,root from session_details_stream_update  partition by root;
Run Code Online (Sandbox Code Playgroud)

session_details_stream_rekeyed的结果是正常的:

ksql> select * from session_details_stream_rekeyed;
      1526411486488 | 2018-02-05T15:16:07.113+02:001| tex | 2018-02-05T15:16:07.113+02:001 | 1 | 2018-02-05T15:16:07.113+02:001
Run Code Online (Sandbox Code Playgroud)

3)创建主题流;

 CREATE STREAM voip_details_stream …
Run Code Online (Sandbox Code Playgroud)

apache-kafka ksql

2
推荐指数
1
解决办法
587
查看次数

你能从远程主机运行KSQL吗?

我在我的集​​群的一个节点上运行了confluent-ksql-server.我们可以让ksql由kafka集群外的特定主机/机器连接吗?

PS-这是为开发人员提供ksql访问权限

谢谢 !

apache-kafka confluent ksql

2
推荐指数
1
解决办法
228
查看次数

Apache Kafka的流加入示例?

我正在寻找一个使用Kafka Streams的示例,说明如何执行这种操作,即将一个带客户表和一个地址表连接起来并将数据下沉到ES:

顾客

+------+------------+----------------+-----------------------+
| id   | first_name | last_name      | email                 |
+------+------------+----------------+-----------------------+
| 1001 | Sally      | Thomas         | sally.thomas@acme.com |
| 1002 | George     | Bailey         | gbailey@foobar.com    |
| 1003 | Edward     | Davidson       | ed@walker.com         |
| 1004 | Anne       | Kim            | annek@noanswer.org    |
+------+------------+----------------+-----------------------+
Run Code Online (Sandbox Code Playgroud)

地址

+----+-------------+---------------------------+------------+--------------+-------+----------+
| id | customer_id | street                    | city       | state        | zip   | type     |
+----+-------------+---------------------------+------------+--------------+-------+----------+
| 10 |        1001 | 3183 Moore Avenue         | Euless     | …
Run Code Online (Sandbox Code Playgroud)

java elasticsearch apache-kafka apache-kafka-streams ksql

2
推荐指数
2
解决办法
174
查看次数