有一个如下所示的数据框,它有一个不干净的列'id',它应该是数字列
id, name
1, A
2, B
3, C
tt, D
4, E
5, F
de, G
Run Code Online (Sandbox Code Playgroud)
是否有一种简洁的方法来删除行,因为tt和de不是数值
tt,D
de,G
Run Code Online (Sandbox Code Playgroud)
使数据帧干净?
id, name
1, A
2, B
3, C
4, E
5, F
Run Code Online (Sandbox Code Playgroud) 我读到Kafka不再需要zookeeper,所以我不想在docker-compose中使用zookeeper。但我不知道哪个卡夫卡图像可以在没有动物园管理员的情况下工作。谁能给出提示吗?
在启动 Kafka-Connect 时,我看到了很多警告
10:33:56.706 [DistributedHerder] WARN org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'config.storage.topic' was supplied but isn't a known config.
10:33:56.707 [DistributedHerder] WARN org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'group.id' was supplied but isn't a known config.
10:33:56.708 [DistributedHerder] WARN org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'status.storage.topic' was supplied but isn't a known config.
10:33:56.709 [DistributedHerder] WARN org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'internal.key.converter.schemas.enable' was supplied but isn't a known config.
10:33:56.710 [DistributedHerder] WARN org.apache.kafka.clients.admin.AdminClientConfig - The configuration 'config.storage.replication.factor' was supplied but isn't a known config.
10:33:56.710 [DistributedHerder] …Run Code Online (Sandbox Code Playgroud) 关于生成代理键,第一步是获取distinct,然后为每个元组构建增量键.
因此,我使用Java Set来获取不同的元素,并且它不在堆空间中.然后,我使用Flink的distinct(),它完全有效.
我能问一下这有什么不同吗?
另一个相关的问题是,Flink能否在mapper中生成代理键?
我在 中看到了以下常量ConsumerGroupState。什么情况会导致消费者组处于 DEAD 或 UNKNOWN 状态?
UNKNOWN("Unknown"),
PREPARING_REBALANCE("PreparingRebalance"),
COMPLETING_REBALANCE("CompletingRebalance"),
STABLE("Stable"),
DEAD("Dead"),
EMPTY("Empty");
Run Code Online (Sandbox Code Playgroud)
我认为 EMPTY 意味着成员是空的。PREPARING_REBALANCE 表示新消费者加入/旧消费者离开,COMPLETING_REBALANCE 与之类似。
我的目标是找到一个表明消费者组处于非活动状态的状态,据我了解,我可以简单地采用 EMPTY 状态。
通常我将 json 字符串解析为 json 对象,而不是直接操作 json 字符串。例如,像这样的 json 字符串
{"number": "1234567"}
Run Code Online (Sandbox Code Playgroud)
如果我必须在末尾添加 000
...
{...,"number" : "1234567000",...}
....
Run Code Online (Sandbox Code Playgroud)
我将使用 jackson 将其解析为 Json 对象或 POJO
我了解 Json 对象或 POJO 的可读性透视解析要好得多,但我对性能感到好奇。在这种情况下,如果我直接操作json字符串,我必须使用正则表达式提取数字属性,并在末尾添加000,如果数据很多,这比解析为Json对象要昂贵得多?因为字符串对象基本上创建了一个新的字符串对象?
编辑:基于@Itai Steinherz的链接,我还在JS中做了一个基准,它显示json解析更好 https://jsbench.me/93jr1w6k5b/1
我发现删除 arun只会将状态从 更改active为deleted,因为如果通过deleted.
是否可以run从 UI 中删除 a以节省空间?删除运行时,是否也删除了与运行对应的工件?
如果没有,可以通过休息调用删除运行吗?
"errors.deadletterqueue.topic.name"适用于源连接器吗?我使用JDBC 接收器连接器进行了测试,它可以工作,但我没有找到有序列化错误的记录进入死信队列。
我使用Debezium Connector for MongoDB,apache-kafka-connect版本是 2.4.0。
其余错误处理配置:
"errors.tolerance": "all",
"errors.log.enable": "false",
"errors.deadletterqueue.topic.name": "test-dlq",
"errors.deadletterqueue.context.headers.enable": "true"
Run Code Online (Sandbox Code Playgroud) 我的源连接器抛出
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Error while forwarding register schema request to the master; error code: 50003
或者
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Master not known
我发现当模式注册表的主更改并且我在 k8s 上的同一服务下有两个模式注册表副本时会发生这种情况。
最上面的异常是org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
如何增加容忍度,以便连接器可以重试更多次,直到选出新的主节点?
我读到Docker (Compose) clientconnectstokakatooearly,但它没有给出要检查的命令。
我应该如何配置我的 kafka 代理,以便在 Zookeeper 未准备好时重试?由于 kafka 代理尚未准备好,我的模式注册表也失败。
我的 docker-compose 文件:
zookeeper:
image: confluentinc/cp-zookeeper:5.5.3
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-server:5.5.3
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
CONFLUENT_METRICS_ENABLE: 'true'
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
schema-registry:
image: confluentinc/cp-schema-registry:5.5.3
hostname: schema-registry
container_name: schema-registry
depends_on:
- zookeeper …Run Code Online (Sandbox Code Playgroud) 使用joinHint和joinWithTiny,joinWithHuge有什么区别?
关于joinHint,我们可以使用BROADCAST_HASH_FIRST:提示第一个连接输入比第二个小得多.REPARTITION_HASH_FIRST:提示第一个连接输入比第二个小一点.
同时,我们也可以使用joinWithHuge和joinWithTiny
它们是一样的吗?所以joinWithTiny正在使用BROADCAST_HASH_FIRST?
利用这些的好处是Flink作业节省了检查加入数据大小的时间?
我想使用MockAdminClient我的 Kafka 应用程序进行单元测试。如何MockAdminClient使用 进行导入maven?
更新:
我的kafka-client相关的pom.xml
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
通过引用maven的测试,我发现了这样的 How can I reference unit test classes of a maven dependency in my java project?
我有一个清单
val result = MutableList(N) { 0 }
Run Code Online (Sandbox Code Playgroud)
我想为列表中的元素增加 1 result
这有效
for (item in result.withIndex()) {
result[item.index] = result[item.index] + 1
}
Run Code Online (Sandbox Code Playgroud)
但是有更多的 kotlin 方法吗?喜欢it-> it+=1?我得到 val 不能使用这个重新分配
apache-kafka ×7
apache-flink ×2
java ×2
dead-letter ×1
docker ×1
jackson ×1
json ×1
kotlin ×1
mlflow ×1
pandas ×1
python ×1
string ×1