我是 kafka 的新手,我已经在本地安装了带有默认配置的 kafka 10。现在,我面临一个问题。我正在从控制台生产者那里产生消息。如果消息大约为 4096 字节。然后,它被控制台消费者很好地消费。但是当我将消息大小从 4096 字节增加时。然后,在使用它时消息被截断到大约 4096 字节。我不明白这个问题。
怎么了 ?a) 消息是否发布不完整。b) 消息是否被不完全消费。
注意:我没有对默认设置进行任何更改,我使用的是控制台生产者和消费者。
请任何人帮忙
我正在尝试使用 Kafka connect 使用独立模式写入数据。我将数据写入的主题是具有多个分区。但是,数据仅写入其中一个分区。当我启动多个消费者控制台时,数据仅打印到其中一个。另一个消费者控制台只有在第一个控制台关闭后才能获取任何数据。我无法弄清楚需要在配置文件中进行哪些更改才能使其写入多个分区。
这是standalone.properties
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=1000
rest.port=8084
Run Code Online (Sandbox Code Playgroud)
连接文件源.properties:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test4.txt
topic=consumer_group
Run Code Online (Sandbox Code Playgroud)
现在我使用以下命令来运行连接器:
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
Run Code Online (Sandbox Code Playgroud)
使用以下命令启动消费者控制台:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic consumer_group --from-beginning --consumer-property group.id=new-consumer-group
Run Code Online (Sandbox Code Playgroud)
它只将数据打印到消费者控制台之一。但是,如果我使用生产者控制台而不是 Kafka 连接来编写消息,那么我可以看到多个消费者上的消息(以循环方式),应该是这样。但是使用Kafka connect,它只是将所有数据写入单个分区,同一组中的其他消费者必须闲置。需要更改什么才能写入循环系统中的所有分区?
我正在尝试创建以下工作流程
虽然我可以按照 hive Metastore 所需的格式排列 nginx 日志(仅限空格或逗号分隔的必填字段),但我想知道这是否可以在不触及 nginx 日志格式的情况下完成
这两种方法都需要自定义实现,并且关于如何进行相同操作的文档很少。
哪一种是实现这一目标的正确方法?是否有任何示例可用于解析 nginx 日志输出/任何源数据,同时使用 kafka 连接将其写入主题。我正在使用独立的文件连接器。
我按照此处提到的教程重新设置了我的 kafka 制作人:https : //www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-14-04
我正在使用 cron 和以下带有 IP 的服务器上的脚本向生产者推送一些事件:1.2.3.4
#!/usr/bin/env python
import threading, logging, time
import multiprocessing
import requests
import datetime
import json
from kafka import KafkaProducer
class CheckApis():
apisList = {"a": "https://test.eng.com/"}
kafkaProducer = "1.2.3.4:9092"
kafkaTopic = "sometopic"
producer = KafkaProducer(bootstrap_servers=kafkaProducer)
for key, value in apisList.items():
headers = {};
response = requests.request("GET", value, headers=headers)
message = {"app_name": key, "status": response.status_code, "message": "none", "timestamp": str(datetime.datetime.utcnow())}
producer.send(kafkaTopic, json.dumps(message).encode('utf-8'));
print (response.text)
print (response.status_code)
producer.close()
Run Code Online (Sandbox Code Playgroud)
这很有效,我可以使用以下命令查看推送的事件:
~/kafka/bin/kafka-console-consumer.sh --zookeeper 1.2.3.4:2181 --topic sometopic …Run Code Online (Sandbox Code Playgroud) python node.js apache-kafka kafka-consumer-api apache-kafka-connect
我正在使用这个debezium-examples
我在jdbc-sink.json 中添加了"topics.regex": "CID1122.(.*)"如下
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics.regex": "CID1122.(.*)",
"connection.url": "jdbc:mysql://mysql:3306/inventory?verifyServerCertificate=false",
"connection.user": "root",
"connection.password": "debezium",
"auto.create": "true",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"name": "jdbc-sink",
"insert.mode": "upsert",
"pk.fields": "id,companyId",
"pk.mode": "record_value"
}
}
Run Code Online (Sandbox Code Playgroud)
Kafka 主题列表是
CID1122.department
CID1122.designation
CID1122.employee
Run Code Online (Sandbox Code Playgroud)
我面对卡夫卡 java.lang.NullPointerException
connect_1 | 2019-01-30 06:14:47,302 INFO || Checking MySql dialect for existence of table "CID1122"."employee" [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect_1 | 2019-01-30 06:14:47,303 INFO || Using MySql dialect table "CID1122"."employee" absent [io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect]
connect_1 | 2019-01-30 …Run Code Online (Sandbox Code Playgroud) regex jdbc apache-kafka apache-kafka-connect confluent-platform
我已经阅读了他们的博客并理解了他们的例子。 https://www.confluent.io/blog/apache-kafka-to-amazon-s3-exactly-once/
但我正在努力解决我所拥有的这种情况。我目前的配置是:
"flush.size": "50",
"rotate.interval.ms": "-1",
"rotate.schedule.interval.ms": "300000",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "YYYY/MM/dd/HH",
"timestamp.extractor": "Wallclock"
Run Code Online (Sandbox Code Playgroud)
根据我对配置的了解。连接器将50在300000ms(5 分钟)后提交记录文件或文件,以先到者为准。如果连接器将文件上传到 s3 但未能提交到 Kafka,由于我设置了轮换计划间隔,Kafka 如何重新上传将覆盖 s3 文件的相同记录?这不会导致 s3 重复吗?
amazon-s3 apache-kafka apache-kafka-connect confluent-platform