需要帮助我有这个
service {
name = "nginx"
tags = [ "nginx", "web", "urlprefix-/nginx" ]
port = "http"
check {
type = "tcp"
interval = "10s"
timeout = "2s"
}
}
Run Code Online (Sandbox Code Playgroud)
如果返回 200 响应(如 localhost:8080/test/index.html),我如何为特定 URI 添加运行状况
我是python的新手,我想创建一个switch case,其中case可以将间隔作为条件,例如:
switch = {
1..<21: do one stuff,
21...31: do another
}
Run Code Online (Sandbox Code Playgroud)
我怎样才能达到这个结果?
我希望能够根据消息密钥的密钥将Kafkastream中的所有记录发送到另一个主题。例如 Kafka中的流包含名称作为键和记录作为值。我想根据记录的关键将这些记录分散到不同的主题
数据:(jhon-> {jhonsRecord}),(sean-> {seansRecord}),(mary-> {marysRecord}),(jhon-> {jhonsRecord2}),预期
下面是我现在执行此操作的方式,但是由于名称列表比较笨拙,因此速度很慢。另外,即使有一些名字的记录,我也需要遍历整个列表。请提出修复建议
for( String name : names )
{
recordsByName.filterNot(( k, v ) -> k.equalsIgnoreCase(name)).to(name);
}
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 KafkaTool 连接到 Kafka。我收到一个错误:连接到集群时出错。未能创建新的 KafkaAdminClient
Kafka 和 Zookeeper 托管在 Docker 中。我运行下一个命令
docker network create kafka
docker run --network=kafka -d --name zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 confluentinc/cp-zookeeper:latest
docker run --network=kafka -d -p 9092:9092 --name kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka:latest
Run Code Online (Sandbox Code Playgroud)
为什么 KafkaTool 无法连接到托管在 Docker 中的 Kafka?
我有一个带有字符串的列表的函数,我必须在列表中找到以“ b”开头的第二个元素。
例如:
second_elemnt_starting_with_b(["b", "a", "bb"]) => "bb"
Run Code Online (Sandbox Code Playgroud) 我们有特定的主题,只有在条件consumeEnabled=true 时才需要消费消息。所以,它应该像这样工作:
应用程序正在使用消息,但随后 consumerEnabled 变为 false 无需考虑的情况。
请用 Spring Kafka 和/或 Kafka Java 客户端定义实现决策的最佳方式
尝试将主题“name:localtopic”映射到索引“name:indexoftopic”,它在弹性搜索“localtopic和indexoftopic”中创建两个新索引,并且主题数据仅在主题名称索引“localtopic”中可见,连接器中没有显示错误(分布式模式)
我的配置是
"config" : {
"connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max" : "1",
"topics" : "localtopic",
"topic.index.map" : "localtopic:indexoftopic",
"connection.url" : "aws elasticsearch url",
"type.name" : "event",
"key.ignore" : "false",
"schema.ignore" : "true",
"schemas.enable" : "false",
"transforms" : "InsertKey,extractKey",
"transforms.InsertKey.type" : "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.InsertKey.fields" : "event-id",
"transforms.extractKey.type" : "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field" : "event-id"
}
Run Code Online (Sandbox Code Playgroud)
索引名称:indexoftopic是在elasticsearch中创建的,但数据是通过index_name:localtopic kafkaversion:2.3连接器版本:5 elasticsearch版本:3.2.0看到的
即使在日志 INFO --topics.regex = "" 中,我也不知道 ihis 选项,任何人都可以建议。怎么用这个???
Spring Boot (version 2.2) application with Spring Kafka (version 2.4) 无法与官方执行的Bitnami Docker Kafka (version 2)建立连接docker-compose.yml
version: '2'
services:
zookeeper:
image: 'bitnami/zookeeper:3'
ports:
- '2181:2181'
volumes:
- 'zookeeper_data:/bitnami'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:2'
ports:
- '9092:9092'
volumes:
- 'kafka_data:/bitnami'
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
volumes:
zookeeper_data:
driver: local
kafka_data:
driver: local
Run Code Online (Sandbox Code Playgroud)
Spring 应用程序不断产生以下警告:
[kafka-admin-client-thread | adminclient-1] WARN o.apache.kafka.clients.NetworkClient.initiateConnect - [AdminClient clientId=adminclient-1] Error connecting to node 2228a9a3b8c5:9092 (id: 1001 rack: null) java.net.UnknownHostException: 2228a9a3b8c5
Run Code Online (Sandbox Code Playgroud)
或者
[kafka-admin-client-thread | …
Run Code Online (Sandbox Code Playgroud) 我已经使用kafka-python
库编写了一个 python 脚本,它将消息写入和读取到kafka
. 我写消息没有任何问题;kafka
我可以使用控制台工具检索它们。但我无法使用我的 python 脚本读取它们。我的消费者有一个 for ,它冻结在迭代的第一行并且永远不会返回。这是我的代码:
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"my-topic",
bootstrap_servers="localhost:9092"),
value_deserializer=lambda v: json.dumps(v).encode("utf-8")
)
for msg in consumer:
print(type(msg))
Run Code Online (Sandbox Code Playgroud)
消费者被创建并完全订阅;我可以看到它my-topic
列在其属性的主题列表中_client
。
任何想法?
我正在创建一个包含字符串和地图作为字段的 avro 类。我可以通过 maven 生成 avro 类,并且我能够在 localhost:8081 中创建一个注册表
.avsc 文件:
{
"type":"record",
"name":"AvroClass",
"namespace":"belliPack.avro",
"fields":[
{
"name":"title",
"type":"string"
},
{
"name":"map",
"type": {"type": "map", "values": "double"}
}
]
}
Run Code Online (Sandbox Code Playgroud)
模式注册表返回: $ curl -X GET http://localhost:8081/subjects/teste1-value/versions/1
{"subject":"teste1-value","version":1,"id":42,"schema":"{"type":"record","name":"AvroClass","namespace":"belliPack.avro","fields":[{"name":"title","type":"string"},{"name":"map","type":{"type":"map","values":"double"}}]}"}
Run Code Online (Sandbox Code Playgroud)
我的卡夫卡制作人课程是:
public KafkaProducer<String, AvroClass> createKafkaProducer() {
String bootstrapServer = "127.0.0.1:9092";
String schemaRegistryURL = "127.0.0.1:8081";
//create Producer properties
Properties properties = new Properties();
//kafka documentation>producer configs
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
properties.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,schemaRegistryURL);
//create producer
KafkaProducer<String, AvroClass> producer = new KafkaProducer<>(properties);
return producer;
} …
Run Code Online (Sandbox Code Playgroud) apache-kafka ×7
java ×3
python ×3
docker ×2
python-3.x ×2
avro ×1
consul ×1
kafka-python ×1
nginx ×1
nomad ×1
spring-kafka ×1