So I have...
So a single logical business request can have multiple logs, commands, and events associated by a uuid that the microservices pass to one another.
So what techniques/patterns could be used to read these 3 topics, combine them all into one JSON document, and then dump that to Elasticsearch?
Streaming?
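One pattern, sketched below in KSQL, is to declare a stream over each topic and join them on the correlating uuid; the topic and column names here are illustrative assumptions, not taken from the question:

```sql
-- Sketch only: topic and column names are assumptions.
-- Declare a stream over each of the correlated topics.
CREATE STREAM logs_s (request_uuid VARCHAR, log_payload VARCHAR)
  WITH (kafka_topic='logs', value_format='JSON');
CREATE STREAM commands_s (request_uuid VARCHAR, command_payload VARCHAR)
  WITH (kafka_topic='commands', value_format='JSON');

-- Stream-stream joins in KSQL require a time window.
CREATE STREAM logs_with_commands AS
  SELECT l.request_uuid, l.log_payload, c.command_payload
  FROM logs_s l
  JOIN commands_s c WITHIN 1 HOUR
    ON l.request_uuid = c.request_uuid;
```

The third topic can be joined the same way, and the resulting topic can then be picked up by the Kafka Connect Elasticsearch sink connector.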
Step 1: Run a producer to create sample data
./bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic stream-test-topic \
--property schema.registry.url=http://localhost:8081 \
--property value.schema='{"type":"record","name":"dealRecord","fields":[{"name":"DEAL_ID","type":"string"},{"name":"DEAL_EXPENSE_CODE","type":"string"},{"name":"DEAL_BRANCH","type":"string"}]}'
Sample data:
{"DEAL_ID":"deal002", "DEAL_EXPENSE_CODE":"EXP002", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal003", "DEAL_EXPENSE_CODE":"EXP003", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal004", "DEAL_EXPENSE_CODE":"EXP004", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal005", "DEAL_EXPENSE_CODE":"EXP005", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal006", "DEAL_EXPENSE_CODE":"EXP006", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal007", "DEAL_EXPENSE_CODE":"EXP001", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal008", "DEAL_EXPENSE_CODE":"EXP002", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal009", "DEAL_EXPENSE_CODE":"EXP003", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal010", "DEAL_EXPENSE_CODE":"EXP004", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal011", "DEAL_EXPENSE_CODE":"EXP005", "DEAL_BRANCH":"AMSTERDAM"}
{"DEAL_ID":"deal012", "DEAL_EXPENSE_CODE":"EXP006", "DEAL_BRANCH":"AMSTERDAM"}
Step 2: Open another terminal and run a consumer to test the data.
./bin/kafka-avro-console-consumer --topic stream-test-topic \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--from-beginning
Step 3: Open another terminal and run the producer.
./bin/kafka-avro-console-producer \
--broker-list localhost:9092 --topic expense-test-topic \
--property "parse.key=true" \
--property "key.separator=:" \
--property schema.registry.url=http://localhost:8081 \
--property key.schema='"string"' \ …

I am trying to create a stream in Apache Kafka KSQL from a topic containing (somewhat complex) JSON:
{
"agreement_id": "dd8afdbe-59cf-4272-b640-b14a24d8234c",
"created_at": "2018-02-17 16:00:00.000Z",
"id": "6db276a8-2efe-4495-9908-4d3fc4cc16fa",
"event_type": "data",
"total_charged_amount": {
"tax_free_amount": null,
"tax_amounts": [],
"tax_included_amount": {
"amount": 0.0241,
"currency": "EUR"
}
},
"used_service_units": [
{
"amount": 2412739,
"currency": null,
"unit_of_measure": "bytes"
}
]
}
Now, creating a stream for simple things like event_type and created_at is easy. That would be something like:
CREATE STREAM tstream (event_type varchar, created_at varchar) WITH (kafka_topic='usage_events', value_format='json');
But now I need to access used_service_units.... I want to extract the "amount" in the JSON above.
How do I do that?
CREATE STREAM usage (event_type varchar,create_at varchar, used_service_units[0].amount int) WITH (kafka_topic='usage_events', value_format='json');
results in
line 1:78: mismatched input '[' expecting {'ADD', 'APPROXIMATE', ...
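One option, sketched below, is to declare the nested shape in the DDL and index into the array in a derived query rather than in the column list. This is an assumption-laden sketch: STRUCT/ARRAY column types require KSQL 5.0 or later, and the array index base has changed between releases, so `[0]` may need to be `[1]` on your version:

```sql
-- Sketch: declare the array of structs, then index it in a derived stream.
CREATE STREAM usage (
    event_type VARCHAR,
    created_at VARCHAR,
    used_service_units ARRAY<STRUCT<
        amount INT,
        currency VARCHAR,
        unit_of_measure VARCHAR>>
  ) WITH (kafka_topic='usage_events', value_format='JSON');

CREATE STREAM usage_amounts AS
  SELECT event_type,
         used_service_units[0]->amount AS amount
  FROM usage;
```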
If I instead create a stream like this:
CREATE STREAM usage (event_type varchar,create_at varchar, …

I am passing a long JSON string to a Kafka topic, e.g.:
{
"glossary": {
"title": "example glossary",
"GlossDiv": {
"title": "S",
"GlossList": {
"GlossEntry": {
"ID": "SGML",
"SortAs": "SGML",
"GlossTerm": "Standard Generalized Markup Language",
"Acronym": "SGML",
"Abbrev": "ISO 8879:1986",
"GlossDef": {
"para": "A meta-markup language, used to create markup languages such as DocBook.",
"GlossSeeAlso": ["GML", "XML"]
},
"GlossSee": "markup"
}
}
}
}
}
and want to create a stream from the Kafka topic with every one of the fields specified in KSQL, e.g.:
CREATE STREAM pageviews_original (*) WITH \
(kafka_topic='pageviews', value_format='JSON');
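As far as I know, KSQL has no `(*)` wildcard that infers the schema from the topic; each field has to be declared explicitly. For nested JSON, a sketch (column names mirror the sample document; version-dependent STRUCT support is assumed) looks like:

```sql
-- Sketch: declare the nesting explicitly with STRUCT columns.
CREATE STREAM pageviews_original (
    glossary STRUCT<
        title VARCHAR,
        GlossDiv STRUCT<title VARCHAR>>
  ) WITH (kafka_topic='pageviews', value_format='JSON');

-- Nested fields are then reached with the '->' operator:
SELECT glossary->GlossDiv->title FROM pageviews_original;
```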
So I am evaluating Kafka. For our use case, a new topic must be created containing the "elapsed time" from one event to the next, mainly because sensors will report "on" or "off" to Kafka. So, given a timestamp, sensor name, and state, a new topic can be created holding the durations of the "on" and "off" states.
My data looks like this:
{ 2019:02:15 00:00:00, sensor1, off}
{ 2019:02:15 00:00:30, sensor1, on}
and the result should be
{ 2019:02:15 00:30:00, sensor1, off, 30sec }.
Essentially, the states of multiple sensors will have to be combined to determine the combined state of a machine. Hundreds, if not thousands, of sensors in the final factory.
Is it possible to use KSQL to count the entries of a specific column with GROUP BY, rather than aggregating over all entries that flow through the application?
I am looking for something like this:
| Count all | Count id1 | count id2 |
| ---245----|----150----|----95-----|
Or, more likely something like this in KSQL:
[some timestamp] | Count all | 245
[some timestamp] | Count id1 | 150
[some timestamp] | Count id2 | 95
.
.
.
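Per-key counts map onto GROUP BY fairly directly in KSQL. A sketch, assuming an input stream with an `id` column (all names are illustrative; if your KSQL version rejects grouping by a literal, project a constant column first):

```sql
-- Count per id over a tumbling window.
CREATE TABLE counts_per_id AS
  SELECT id, COUNT(*) AS cnt
  FROM input_stream
  WINDOW TUMBLING (SIZE 1 MINUTE)
  GROUP BY id;

-- Overall count: group everything onto one constant key.
CREATE TABLE counts_all AS
  SELECT 'all' AS k, COUNT(*) AS cnt
  FROM input_stream
  WINDOW TUMBLING (SIZE 1 MINUTE)
  GROUP BY 'all';
```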
Thanks
- Tim
Maybe this is a beginner question, but what is the recommended way to read data produced in KSQL?
Suppose I do some stream processing and write the data to a KSQL table. Now I want to access this data from a Spring application (e.g. to fan out some live data via a websocket). My first guess was to use Spring Kafka and subscribe to the underlying topic. Or should I use Kafka Streams instead?
Another use case would be to do stream processing and write the results to a Redis store (e.g. for a web service that always returns the current value). What would be the approach here?
Thanks!
apache-kafka apache-kafka-streams spring-kafka apache-kafka-connect ksql
I rekeyed the data in a stream and a table; I am using Confluent 4.1.
1) Create the stream:
CREATE STREAM session_details_stream (Media varchar ,SessionIdTime varchar,SessionIdSeq long) with (kafka_topic = 'sessionDetails', value_format = 'json');
2) Create the rekeyed stream. This script does not work, although it worked before; why?
CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM partition by root;
Then I create the next scripts:
CREATE STREAM session_details_stream_update as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM partition by SessionIdTime;
CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,root from session_details_stream_update partition by root;
The result of session_details_stream_rekeyed is fine:
ksql> select * from session_details_stream_rekeyed;
1526411486488 | 2018-02-05T15:16:07.113+02:001| tex | 2018-02-05T15:16:07.113+02:001 | 1 | 2018-02-05T15:16:07.113+02:001
3) Create the topic stream;
CREATE STREAM voip_details_stream …

I have confluent-ksql-server running on one node of my cluster. Can KSQL be connected to from a specific host/machine outside the Kafka cluster?
PS - this is to give developers access to KSQL.
Thanks!
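A sketch, assuming a default Confluent layout: the KSQL server exposes a REST API, so remote access is a matter of binding its listener to a non-loopback interface and pointing the remote CLI at it. The property and flag shown follow the Confluent docs; paths and host names are illustrative:

```
# etc/ksql/ksql-server.properties on the server node:
listeners=http://0.0.0.0:8088

# then, from a developer machine outside the cluster:
./bin/ksql http://<ksql-server-host>:8088
```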
I am looking for an example of how to do this kind of thing using Kafka Streams, i.e. join a customer table with an address table and sink the data to ES:
+------+------------+----------------+-----------------------+
| id | first_name | last_name | email |
+------+------------+----------------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Davidson | ed@walker.com |
| 1004 | Anne | Kim | annek@noanswer.org |
+------+------------+----------------+-----------------------+
Run Code Online (Sandbox Code Playgroud)
+----+-------------+---------------------------+------------+--------------+-------+----------+
| id | customer_id | street | city | state | zip | type |
+----+-------------+---------------------------+------------+--------------+-------+----------+
| 10 | 1001        | 3183 Moore Avenue         | Euless     | …
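The question asks for Kafka Streams; in KSQL terms (which compiles to the same Streams topology underneath) the enrichment can be sketched as a rekey plus stream-table join. Table and column names mirror the sample data; everything else is an assumption:

```sql
-- Customers as a table (latest value per id), addresses as a stream.
CREATE TABLE customers_t (id BIGINT, first_name VARCHAR,
                          last_name VARCHAR, email VARCHAR)
  WITH (kafka_topic='customers', value_format='JSON', key='id');

CREATE STREAM addresses_s (id BIGINT, customer_id BIGINT, street VARCHAR,
                           city VARCHAR, state VARCHAR, zip VARCHAR,
                           type VARCHAR)
  WITH (kafka_topic='addresses', value_format='JSON');

-- Rekey the address stream by customer_id so it can join the table.
CREATE STREAM addresses_by_customer AS
  SELECT * FROM addresses_s PARTITION BY customer_id;

CREATE STREAM customer_with_address AS
  SELECT c.first_name, c.last_name, c.email,
         a.street, a.city, a.state, a.zip
  FROM addresses_by_customer a
  LEFT JOIN customers_t c ON a.customer_id = c.id;
```

The enriched output topic can then be indexed by the Kafka Connect Elasticsearch sink connector.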