小编cri*_*007的帖子

游牧者:健康检查

需要帮助我有这个

service {
  name = "nginx"
  tags = [ "nginx", "web", "urlprefix-/nginx" ]
  port = "http"
  check {
    type = "tcp"
    interval = "10s"
    timeout = "2s"
  }
}
Run Code Online (Sandbox Code Playgroud)

如果返回 200 响应(如 localhost:8080/test/index.html),我如何为特定 URI 添加运行状况

nginx consul nomad

1
推荐指数
1
解决办法
1856
查看次数

python-如何在案例中使用间隔创建python的切换案例?

我是python的新手,我想创建一个switch case,其中case可以将间隔作为条件,例如:

switch = {
    1..<21: do one stuff,
    21...31: do another
}
Run Code Online (Sandbox Code Playgroud)

我怎样才能达到这个结果?

python switch-statement python-3.x

1
推荐指数
1
解决办法
62
查看次数

使用kafka流基于消息密钥将消息发送到主题

我希望能够根据消息密钥的密钥将Kafkastream中的所有记录发送到另一个主题。例如 Kafka中的流包含名称作为键和记录作为值。我想根据记录的关键将这些记录分散到不同的主题

数据:(jhon-> {jhonsRecord}),(sean-> {seansRecord}),(mary-> {marysRecord}),(jhon-> {jhonsRecord2}),预期

  • topic1:名称:jhon->(jhon-> {jhonsRecord}),(jhon-> {jhonsRecord2})
  • topic2:sean->(sean-> {seansRecord})
  • topic3:mary->(mary-> {marysRecord})

下面是我现在执行此操作的方式,但是由于名称列表比较笨拙,因此速度很慢。另外,即使有一些名字的记录,我也需要遍历整个列表。请提出修复建议

    for( String name : names )
    {
        recordsByName.filterNot(( k, v ) -> k.equalsIgnoreCase(name)).to(name);
    } 
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
162
查看次数

KafkaTool:无法连接到 Kafka 集群

我正在尝试使用 KafkaTool 连接到 Kafka。我收到一个错误:连接到集群时出错。未能创建新的 KafkaAdminClient

Kafka 和 Zookeeper 托管在 Docker 中。我运行下一个命令

   docker network create kafka
   docker run --network=kafka -d --name zookeeper -e ZOOKEEPER_CLIENT_PORT=2181 confluentinc/cp-zookeeper:latest
   docker run --network=kafka -d -p 9092:9092 --name kafka -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 confluentinc/cp-kafka:latest
Run Code Online (Sandbox Code Playgroud)

KafkaTool 的设置 在此处输入图片说明

在此处输入图片说明

为什么 KafkaTool 无法连接到托管在 Docker 中的 Kafka?

apache-kafka docker apache-zookeeper

1
推荐指数
1
解决办法
2880
查看次数

如何返回列表中以“ b”开头的第二个元素

我有一个带有字符串的列表的函数,我必须在列表中找到以“ b”开头的第二个元素。

例如:

second_elemnt_starting_with_b(["b", "a", "bb"]) => "bb"
Run Code Online (Sandbox Code Playgroud)

python python-3.x

1
推荐指数
1
解决办法
77
查看次数

仅当某些条件为真时才使用来自 Kafka 的消息

我们有特定的主题,只有在条件consumeEnabled=true 时才需要消费消息。所以,它应该像这样工作:

  1. 如果应用程序正在启动并且consumeEnabled=true,则将分区分配给消费者并使用来自主题的消息。
  2. 如果应用程序正在启动并且consumeEnabled=false,则不要将分区分配给消费者,也不要使用来自主题的消息。
  3. 如果应用程序已经以consumeEnabled=false 运行,但在运行时属性变为consumeEnabled=true,则在运行时将分区分配给消费者并使用来自主题的消息。

应用程序正在使用消息,但随后 consumerEnabled 变为 false 无需考虑的情况。

请用 Spring Kafka 和/或 Kafka Java 客户端定义实现决策的最佳方式

java apache-kafka spring-kafka

1
推荐指数
1
解决办法
982
查看次数

无法将主题映射到 kafka elasticsearch 连接器中的指定索引

尝试将主题“name:localtopic”映射到索引“name:indexoftopic”,它在弹性搜索“localtopic和indexoftopic”中创建两个新索引,并且主题数据仅在主题名称索引“localtopic”中可见,连接器中没有显示错误(分布式模式)

我的配置是

 "config" : {
  "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max" : "1",
  "topics" : "localtopic", 
  "topic.index.map" : "localtopic:indexoftopic",
  "connection.url" : "aws elasticsearch url",
  "type.name" : "event",
  "key.ignore" : "false",
  "schema.ignore" : "true",
  "schemas.enable" : "false",
  "transforms" : "InsertKey,extractKey",
  "transforms.InsertKey.type" : "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.InsertKey.fields" : "event-id",
  "transforms.extractKey.type" : "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKey.field" : "event-id"
 }
Run Code Online (Sandbox Code Playgroud)

索引名称:indexoftopic是在elasticsearch中创建的,但数据是通过index_name:localtopic kafkaversion:2.3连接器版本:5 elasticsearch版本:3.2.0看到的

即使在日志 INFO --topics.regex = "" 中,我也不知道 ihis 选项,任何人都可以建议。怎么用这个???

elasticsearch apache-kafka apache-kafka-connect

1
推荐指数
1
解决办法
1969
查看次数

从 Spring Boot 应用程序连接到本地 Bitnami Docker Kafka 时出错

Spring Boot (version 2.2) application with Spring Kafka (version 2.4) 无法与官方执行的Bitnami Docker Kafka (version 2)建立连接docker-compose.yml

version: '2'

services:
  zookeeper:
    image: 'bitnami/zookeeper:3'
    ports:
      - '2181:2181'
    volumes:
      - 'zookeeper_data:/bitnami'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:2'
    ports:
      - '9092:9092'
    volumes:
      - 'kafka_data:/bitnami'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local
Run Code Online (Sandbox Code Playgroud)

Spring 应用程序不断产生以下警告:

[kafka-admin-client-thread | adminclient-1] WARN  o.apache.kafka.clients.NetworkClient.initiateConnect - [AdminClient clientId=adminclient-1] Error connecting to node 2228a9a3b8c5:9092 (id: 1001 rack: null) java.net.UnknownHostException: 2228a9a3b8c5
Run Code Online (Sandbox Code Playgroud)

或者

[kafka-admin-client-thread | …
Run Code Online (Sandbox Code Playgroud)

apache-kafka docker

1
推荐指数
1
解决办法
1352
查看次数

使用 kafka-python 检索主题中的消息

我已经使用kafka-python库编写了一个 python 脚本,它将消息写入和读取到kafka. 我写消息没有任何问题;kafka我可以使用控制台工具检索它们。但我无法使用我的 python 脚本读取它们。我的消费者有一个 for ,它冻结在迭代的第一行并且永远不会返回。这是我的代码:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "my-topic",
    bootstrap_servers="localhost:9092"),
    value_deserializer=lambda v: json.dumps(v).encode("utf-8")
)

for msg in consumer:
    print(type(msg))
Run Code Online (Sandbox Code Playgroud)

消费者被创建并完全订阅;我可以看到它my-topic列在其属性的主题列表中_client

任何想法?

python apache-kafka kafka-consumer-api kafka-python

1
推荐指数
1
解决办法
9556
查看次数

错误序列化 Avro 消息 - Kafka Schema Registry

我正在创建一个包含字符串和地图作为字段的 avro 类。我可以通过 maven 生成 avro 类,并且我能够在 localhost:8081 中创建一个注册表

.avsc 文件:

    {
"type":"record",
"name":"AvroClass",
"namespace":"belliPack.avro",
"fields":[
{
"name":"title",
"type":"string"
},
{
"name":"map",
"type": {"type": "map", "values": "double"}
}
]
}
Run Code Online (Sandbox Code Playgroud)

模式注册表返回: $ curl -X GET http://localhost:8081/subjects/teste1-value/versions/1

{"subject":"teste1-value","version":1,"id":42,"schema":"{"type":"record","name":"AvroClass","namespace":"belliPack.avro","fields":[{"name":"title","type":"string"},{"name":"map","type":{"type":"map","values":"double"}}]}"}
Run Code Online (Sandbox Code Playgroud)

我的卡夫卡制作人课程是:

public KafkaProducer<String, AvroClass> createKafkaProducer() {
    String bootstrapServer = "127.0.0.1:9092";
    String schemaRegistryURL = "127.0.0.1:8081";

    //create Producer properties
    Properties properties = new Properties();
    //kafka documentation>producer configs
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    properties.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,schemaRegistryURL);

    //create producer
    KafkaProducer<String, AvroClass> producer = new KafkaProducer<>(properties);
    return producer;
} …
Run Code Online (Sandbox Code Playgroud)

java avro apache-kafka confluent-schema-registry

1
推荐指数
1
解决办法
3033
查看次数