标签: apache-kafka

检查Kafka队列是否为空

现在我有将几百条消息写入 kafka 队列的功能。但是当所有这些消息都被消耗掉时,我还需要执行额外的功能。有没有办法在 kafka 队列上放置一个侦听器,以便在它被清空时得到通知?

clojure apache-kafka clj-kafka

0
推荐指数
1
解决办法
1937
查看次数

用于 Kafka 的 Confluent Python API

我在使用官方 Confluent Kafka Python API 时遇到错误:

我订阅:

kafka_consumer.subscribe(topics=["my-avro-topic"], on_assign=on_assign_callback, on_revoke=on_revoke_callback)
Run Code Online (Sandbox Code Playgroud)

使用回调:

def on_assign_callback(consumer, topic_partitions):
for topic_partition in topic_partitions:
    print("without position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
                                   topic_partition.offset, topic_partition.error))

topic_partitions_with_offsets = consumer.position(topic_partitions)
print("assigned to {}->{} partitions".format(len(topic_partitions), len(topic_partitions_with_offsets)))

for topic_partition in topic_partitions_with_offsets:
    print("with position. topic={}. partition={}. offset={}. error={}".format(topic_partition.topic, topic_partition.partition,
                                   topic_partition.offset, topic_partition.error))
Run Code Online (Sandbox Code Playgroud)

产生控制台输出:

without position. topic=my-avro-topic. partition=0. offset=-1001. error=None
assigned to 1->1 partitions
with position. topic=my-avro-topic. partition=0. offset=-1001. error=KafkaError{code=_UNKNOWN_PARTITION,val=-190,str="(null)"}
Run Code Online (Sandbox Code Playgroud)

有人可以解释一下吗?为什么我会收到未知分区的回调通知?类似的代码使用 Java API 可以完美运行。

python apache-kafka kafka-consumer-api confluent-platform

0
推荐指数
1
解决办法
2512
查看次数

Kafka Connect 不输出 JSON

我正在使用 JDBC Kafka 连接器将数据从数据库读取到 Kafka。这有效,但它总是以 Avro 格式输出数据,即使我已经指定它应该使用 JSON。我知道这样做是因为当我在 python 中使用来自该主题的消息时,我会在每条消息的顶部看到模式。

我像这样运行连接器:

/usr/bin/connect-standalone /etc/schema-registry/connect-json-standalone.properties /etc/kafka-connect-jdbc/view.properties
Run Code Online (Sandbox Code Playgroud)

connect-json-standalone.properties 文件的内容是:

bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schema.registry.url=http://localhost:8081
key.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schema.registry.url=http://localhost:8081
value.converter.schemas.enable=true

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets
Run Code Online (Sandbox Code Playgroud)

/etc/kafka-connect-jdbc/view.properties 的内容是:

name=view-small-jdbc-daily
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:teradata://domain.com/charset=UTF8,DBS_PORT=1025,DATABASE=test,USER=***,PASSWORD=***,LOB_SUPPORT=OFF
mode=bulk
table.whitelist=test_table
topic.prefix=view5-
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect

0
推荐指数
1
解决办法
4058
查看次数

如何在 Kafka-Connect API 中设置 max.poll.records

我正在使用 confluent-3.0.1 平台并构建一个 Kafka-Elasticsearch 连接器。为此,我扩展了 SinkConnector 和 SinkTask(Kafka 连接 API)以从 Kafka 获取数据。

作为此代码的一部分,我正在扩展 SinkConnector 的 taskConfigs 方法以返回“max.poll.records”以一次仅获取 100 条记录。但它不起作用,我同时获取所有记录,但我没有在规定的时间内提交偏移量。请任何人帮我配置“max.poll.records”

 public List<Map<String, String>> taskConfigs(int maxTasks) {
    ArrayList<Map<String, String>> configs = new ArrayList<Map<String, String>>();
    for (int i = 0; i < maxTasks; i++) {
      Map<String, String> config = new HashMap<String, String>();
      config.put(ConfigurationConstants.CLUSTER_NAME, clusterName);
      config.put(ConfigurationConstants.HOSTS, hosts);
      config.put(ConfigurationConstants.BULK_SIZE, bulkSize);
      config.put(ConfigurationConstants.IDS, elasticSearchIds);
      config.put(ConfigurationConstants.TOPICS_SATELLITE_DATA, topics);
      config.put(ConfigurationConstants.PUBLISH_TOPIC, topicTopublish);
      config.put(ConfigurationConstants.TYPES, elasticSearchTypes);
      config.put("max.poll.records", "100");

      configs.add(config);
    }
    return configs;
  }
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect

0
推荐指数
1
解决办法
6005
查看次数

消息在 Apache Kafka 中被截断

我是 kafka 的新手,我已经在本地安装了带有默认配置的 kafka 10。现在,我面临一个问题。我正在从控制台生产者那里产生消息。如果消息大约为 4096 字节。然后,它被控制台消费者很好地消费。但是当我将消息大小从 4096 字节增加时。然后,在使用它时消息被截断到大约 4096 字节。我不明白这个问题。

怎么了 ?a) 消息是否发布不完整。b) 消息是否被不完全消费。

注意:我没有对默认设置进行任何更改,我使用的是控制台生产者和消费者。

请任何人帮忙

apache-kafka apache-kafka-connect

0
推荐指数
1
解决办法
2292
查看次数

无法运行Kafka-在cmd中没有输出,并且在Git Bash中没有这样的文件或目录错误

我正在本地设置Kafka。我已经解压缩了2.12版本并启动了zookeeper。Zookeeper已启动并正在运行,但是当我尝试使用命令启动kafka时.\bin\windows\kafka-server-start.bat .\config\server.properties,它没有显示任何输出。

我也使用命令在git bash中尝试过bin/kafka-server-start.sh config/server.properties,它给出了错误

/c/kafka/kafka_2.12-2.1.0/bin/kafka-run-class.sh:第306行:C:\ Program:没有此类文件或目录。

您能为我提供解决方案吗?

apache-kafka

0
推荐指数
1
解决办法
1473
查看次数

为什么我不能在Kafka中创建主题?

按照Apache Kafka快速入门指南,我完成了以下步骤:

  • 下载并解压缩 kafka_2.11-2.1.0.tgz
  • cd kafka_2.11-2.1.0
  • bin/zookeeper-server-start.sh config/zookeeper.properties
  • 在另一个终端的相同目录下, bin/kafka-server-start.sh config/server.properties
  • bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testecho $?之后运行,表明该命令以状态退出0

现在是关键时刻。该指南说:

现在,如果我们运行list topic命令,便可以看到该主题:

bin/kafka-topics.sh --list --zookeeper localhost:2181
> test
Run Code Online (Sandbox Code Playgroud)

但是,我没有从该命令获得任何输出,也没有正在测试的软件正在尝试在计算机上发送消息。 "test"主题崩溃,因为它找到该主题的0个分区。

我也有一个Ruby程序,可以将有关主题的消息发送到Kafka "test"。它失败并重试,并且我在Kafka日志中看到了此时创建的主题,并且可以向其发送消息。但是即使那样,列出主题的命令也不会返回任何内容。

为什么不能显式创建主题?为什么我不能列出按需创建的主题?我该如何解决?


日志

这是我在日志中看到的内容:https : //gist.github.com/nathanl/bea7a45a056b2d44146947ec88c29185

macos apache-kafka apache-zookeeper

0
推荐指数
1
解决办法
1531
查看次数

需要根据特定关键字过滤掉Kafka记录

我有一个Kafka主题,大约有300万条记录。我想从中选择具有特定参数的单个记录。我一直在尝试使用Lenses进行查询,但是无法形成正确的查询。以下是1条消息的记录内容。

{
  "header": {
    "schemaVersionNo": "1",
  },
  "payload": {
    "modifiedDate": 1552334325212,
    "createdDate": 1552334325212,
    "createdBy": "A",
    "successful": true,
    "source_order_id": "3411976933214",
  }
}
Run Code Online (Sandbox Code Playgroud)

现在,我想过滤出具有特定source_order_id的记录,但无法找出正确的方法。我们已经尝试通过镜头以及Kafka Tool。

我们在镜头中尝试过的示例查询如下:

SELECT * FROM `TEST`
WHERE _vtype='JSON' AND _ktype='BYTES'
AND _sample=2 AND _sampleWindow=200 AND payload.createdBy='fms'
Run Code Online (Sandbox Code Playgroud)

此查询有效,但是,如果我们尝试使用如下所示的源ID,则会出现错误:

SELECT * FROM `TEST`
WHERE _vtype='JSON' AND _ktype='BYTES'
AND _sample=2 AND _sampleWindow=200 AND payload.source_order_id='3411976911924'



 Error : "Invalid syntax at line=3 and column=41.Invalid syntax for 'payload.source_order_id'. Field 'payload' resolves to primitive type STRING.
Run Code Online (Sandbox Code Playgroud)

通过自定义使用者使用所有300万条记录,然后对其进行遍历,这对我来说似乎并不是一种优化的方法,因此,寻找针对这种用例的任何可用解决方案。

lenses apache-kafka ksql

0
推荐指数
1
解决办法
394
查看次数

如何通过REST API从数据库流式传输数据?

我将大量数据存储在Postres数据库中,我需要使用Django通过REST API将数据发送到客户端。要求是分块发送数据,并且不要一次将整个内容加载到内存中。我知道Django中有一个StreamingHttpResponse类,我将对此进行探讨。但是还有其他更好的选择吗?我听说过Kafka和Spark用于流式应用程序,但是我检查过的这两个教程往往涉及流式实时数据(例如与Twitter数据进行交互等)。但是是否可以使用这两种方法从数据库中流式传输数据?如果是,我如何将其与REST集成在一起,以便客户端可以与其交互?任何线索将不胜感激。谢谢。

database streaming apache-kafka django-rest-framework apache-spark

0
推荐指数
1
解决办法
130
查看次数

How to get message Apache Kafka on client JS?

Can Apache Kafka send message from server broker to client side (JS) directly?

Or I should use socket pipe?

javascript apache-kafka kafka-consumer-api

0
推荐指数
1
解决办法
54
查看次数