小编Hun*_*orn的帖子

使用pandas删除一列中的非数字行

有一个如下所示的数据框,它有一个不干净的列'id',它应该是数字列

id, name
1,  A
2,  B
3,  C
tt, D
4,  E
5,  F
de, G
Run Code Online (Sandbox Code Playgroud)

是否有一种简洁的方法来删除行,因为tt和de不是数值

tt,D
de,G
Run Code Online (Sandbox Code Playgroud)

使数据帧干净?

id, name
1,  A
2,  B
3,  C
4,  E
5,  F
Run Code Online (Sandbox Code Playgroud)

python pandas

37
推荐指数
4
解决办法
5万
查看次数

无需 Zookeeper 即可运行的 Kafka docker 镜像

我读到Kafka不再需要zookeeper,所以我不想在docker-compose中使用zookeeper。但我不知道哪个卡夫卡图像可以在没有动物园管理员的情况下工作。谁能给出提示吗?

apache-kafka

31
推荐指数
6
解决办法
2万
查看次数

Kafka 连接:提供了配置 XXX 但不是 AdminClientConfig 中的已知配置

在启动 Kafka-Connect 时,我看到了很多警告

10:33:56.706 [DistributedHerder] WARN  org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'config.storage.topic' was supplied but isn't a known config.
10:33:56.707 [DistributedHerder] WARN  org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'group.id' was supplied but isn't a known config.
10:33:56.708 [DistributedHerder] WARN  org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'status.storage.topic' was supplied but isn't a known config.
10:33:56.709 [DistributedHerder] WARN  org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'internal.key.converter.schemas.enable' was supplied but isn't a known config.
10:33:56.710 [DistributedHerder] WARN  org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'config.storage.replication.factor' was supplied but isn't a known config.
10:33:56.710 [DistributedHerder] …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-connect

17
推荐指数
1
解决办法
2万
查看次数

Flink在distinct()中使用了什么魔法?如何生成代理键?

关于生成代理键,第一步是获取distinct,然后为每个元组构建增量键.

因此,我使用Java Set来获取不同的元素,并且它不在堆空间中.然后,我使用Flink的distinct(),它完全有效.

我能问一下这有什么不同吗?

另一个相关的问题是,Flink能否在mapper中生成代理键?

apache-flink

7
推荐指数
1
解决办法
931
查看次数

Kafka ConsumerGroupState解释

我在 中看到了以下常量ConsumerGroupState。什么情况会导致消费者组处于 DEAD 或 UNKNOWN 状态?

UNKNOWN("Unknown"),
PREPARING_REBALANCE("PreparingRebalance"),
COMPLETING_REBALANCE("CompletingRebalance"),
STABLE("Stable"),
DEAD("Dead"),
EMPTY("Empty");
Run Code Online (Sandbox Code Playgroud)

我认为 EMPTY 意味着成员是空的。PREPARING_REBALANCE 表示新消费者加入/旧消费者离开,COMPLETING_REBALANCE 与之类似。

我的目标是找到一个表明消费者组处于非活动状态的状态,据我了解,我可以简单地采用 EMPTY 状态。

apache-kafka

7
推荐指数
1
解决办法
6355
查看次数

我应该将 json 字符串解析为 json 对象还是直接操作字符串

通常我将 json 字符串解析为 json 对象,而不是直接操作 json 字符串。例如,像这样的 json 字符串

{"number": "1234567"}

Run Code Online (Sandbox Code Playgroud)

如果我必须在末尾添加 000

...
{...,"number" : "1234567000",...}
....
Run Code Online (Sandbox Code Playgroud)

我将使用 jackson 将其解析为 Json 对象或 POJO

我了解 Json 对象或 POJO 的可读性透视解析要好得多,但我对性能感到好奇。在这种情况下,如果我直接操作json字符串,我必须使用正则表达式提取数字属性,并在末尾添加000,如果数据很多,这比解析为Json对象要昂贵得多?因为字符串对象基本上创建了一个新的字符串对象?

编辑:基于@Itai Steinherz的链接,我还在JS中做了一个基准,它显示json解析更好 https://jsbench.me/93jr1w6k5b/1

java string json jackson

6
推荐指数
1
解决办法
1471
查看次数

从 UI 中删除 mlflow 实验中的运行,因此后端存储中不存在运行

我发现删除 arun只会将状态从 更改activedeleted,因为如果通过deleted.

是否可以run从 UI 中删除 a以节省空间?删除运行时,是否也删除了与运行对应的工件?

如果没有,可以通过休息调用删除运行吗?

mlflow

5
推荐指数
2
解决办法
1401
查看次数

error.deadletterqueue.topic.name 是否适用于源连接器

"errors.deadletterqueue.topic.name"适用于源连接器吗?我使用JDBC 接收器连接器进行了测试,它可以工作,但我没有找到有序列化错误的记录进入死信队列。

我使用Debezium Connector for MongoDB版本是 2.4.0。

其余错误处理配置:

"errors.tolerance": "all",
"errors.log.enable": "false",
"errors.deadletterqueue.topic.name": "test-dlq",
"errors.deadletterqueue.context.headers.enable": "true"
Run Code Online (Sandbox Code Playgroud)

dead-letter apache-kafka apache-kafka-connect

5
推荐指数
1
解决办法
4515
查看次数

当模式注册表的主更改时连接器失败

我的源连接器抛出 Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Error while forwarding register schema request to the master; error code: 50003

或者 Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Master not known

我发现当模式注册表的主更改并且我在 k8s 上的同一服务下有两个模式注册表副本时会发生这种情况。

最上面的异常是org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler

如何增加容忍度,以便连接器可以重试更多次,直到选出新的主节点?

apache-kafka apache-kafka-connect confluent-schema-registry

5
推荐指数
1
解决办法
1679
查看次数

docker-compose kafka等待zookeeper和schema-registry等待kafka

我读到Docker (Compose) clientconnectstokakatooearly,但它没有给出要检查的命令。

我应该如何配置我的 kafka 代理,以便在 Zookeeper 未准备好时重试?由于 kafka 代理尚未准备好,我的模式注册表也失败。

我的 docker-compose 文件:

  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.3
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:5.5.3
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper …
Run Code Online (Sandbox Code Playgroud)

apache-kafka docker confluent-schema-registry

5
推荐指数
1
解决办法
2733
查看次数

JoinWithTiny,JoinWithHuge和joinHint的区别和好处

使用joinHint和joinWithTiny,joinWithHuge有什么区别?

关于joinHint,我们可以使用BROADCAST_HASH_FIRST:提示第一个连接输入比第二个小得多.REPARTITION_HASH_FIRST:提示第一个连接输入比第二个小一点.

同时,我们也可以使用joinWithHuge和joinWithTiny

它们是一样的吗?所以joinWithTiny正在使用BROADCAST_HASH_FIRST?

利用这些的好处是Flink作业节省了检查加入数据大小的时间?

apache-flink

4
推荐指数
1
解决办法
165
查看次数

导入 Kafka MockAdminClient 以使用 Maven 对我的 Kafka 应用程序进行单元测试

我想使用MockAdminClient我的 Kafka 应用程序进行单元测试。如何MockAdminClient使用 进行导入maven

https://github.com/apache/kafka/blob/2.3.0/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java

更新:

我的kafka-client相关的pom.xml

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)

通过引用maven的测试,我发现了这样的 How can I reference unit test classes of a maven dependency in my java project?

java apache-kafka

2
推荐指数
1
解决办法
1213
查看次数

以 Kotlin 方式将 MutableList 中的所有元素增加 1

我有一个清单

val result = MutableList(N) { 0 }
Run Code Online (Sandbox Code Playgroud)

我想为列表中的元素增加 1 result

这有效

for (item in result.withIndex()) {
                result[item.index] = result[item.index] + 1
}
Run Code Online (Sandbox Code Playgroud)

但是有更多的 kotlin 方法吗?喜欢it-> it+=1?我得到 val 不能使用这个重新分配

kotlin

0
推荐指数
1
解决办法
104
查看次数