I am using Kafka/Confluent (3.2.0) to capture changes from a MongoDB instance we run.
The source process is managed by a Debezium source connector built on the Kafka Connect source API, and it is deployed on our system on Mesos (DC/OS) by extending the Confluent Connect Docker image. Kafka itself is deployed on the same DC/OS using the framework version.
Since some of our messages are larger than the default size limit, I changed these parameters of the Kafka installation:
• replica.fetch.max.bytes
• message.max.bytes
both set to 4 MB.
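For reference, a minimal sketch of those two settings as plain broker properties (with the DC/OS Kafka framework they are set through the service's broker configuration rather than by editing a file, so this fragment is only illustrative):
# Illustrative server.properties fragment; 4194304 bytes = 4 MB
replica.fetch.max.bytes=4194304
message.max.bytes=4194304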
I then start the connector using this Docker image:
docker run -d --rm --net=bridge --name=kafka-connect-mongodb \
  -e CONNECT_BOOTSTRAP_SERVERS=${KAFKA_BOOTSTRAP_SERVERS} \
  -e CONNECT_REST_PORT=${CONNECT_REST_PORT} \
  -e CONNECT_GROUP_ID="mongo-to-kafka-source-connector" \
  -e CONNECT_CONFIG_STORAGE_TOPIC="${CONFIG.TOPIC}" \
  -e CONNECT_OFFSET_STORAGE_TOPIC="${OFFSETS.TOPIC}" \
  -e CONNECT_STATUS_STORAGE_TOPIC="${STATUS.TOPIC}" \
  -e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="${SCHEMA_REGISTRY_LISTENERS}" \
  -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="${SCHEMA_REGISTRY_LISTENERS}" \
  -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="${CONNECTOR_HOME}" \
  -e CONNECT_LOG4J_ROOT_LOGLEVEL=INFO \
  -e CONNECT_MAX_REQUEST_SIZE=4194304 \
  -e KAFKA_MAX_REQUEST_SIZE=4194304 \
  mongodb-source-connector:1.1
I changed the default producer max.request.size by passing both KAFKA_MAX_REQUEST_SIZE and CONNECT_MAX_REQUEST_SIZE, and the log correctly shows the value changed to 4 MB.
The problem appears when I start pulling from MongoDB. To do that, I run this POST:
curl -X POST \
  http://hostname:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "source_connector",
    "config": {
      "tasks.max": "1",
      "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
      "mongodb.hosts": "mongodbhost:27017",
      "mongodb.name": "replica",
      "collection.whitelist": "db[.]table",
      "max.request.size": "4194304"
    }
  }'
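Once the POST returns, the Connect REST API can also be used to confirm that the connector and its task actually started (same host and port as above):
# Show the connector state and any task-level errors
curl http://hostname:8083/connectors/source_connector/status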
But then the log says:
[2017-10-09 12:22:56,036] INFO ProducerConfig values:
acks = all
batch.size = 16384
block.on.buffer.full = false
bootstrap.servers = [PLAINTEXT://172.17.0.3:9093]
buffer.memory = 33554432
client.id =
compression.type = none
connections.max.idle.ms = 540000
interceptor.classes = null
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 9223372036854775807
max.in.flight.requests.per.connection = 1
max.request.size = 1048576
metadata.fetch.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.ms = 50
request.timeout.ms = 2147483647
retries = 2147483647
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
timeout.ms = 30000
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
So when the source process starts, the default max.request.size value is used.
Here is the full log.
I don't understand what I'm missing.
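For reference, a quick way to check which value the worker's producer actually picked up, assuming the container name used above:
# Print the max.request.size lines from the worker's ProducerConfig dump
docker logs kafka-connect-mongodb 2>&1 | grep "max.request.size"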
The IRC chat helped me out: I have to specify both KAFKA_PRODUCER_MAX_REQUEST_SIZE and CONNECT_PRODUCER_MAX_REQUEST_SIZE when starting the Docker image.
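Concretely, this is a sketch of the corrected command: the same docker run as above, with the two non-producer-prefixed size variables replaced by the names from the answer (the Confluent Connect image should map CONNECT_PRODUCER_MAX_REQUEST_SIZE to the producer.max.request.size worker setting, which is what the Connect source producer reads):
# Same command as before, but with the producer-prefixed size variables
docker run -d --rm --net=bridge --name=kafka-connect-mongodb \
  -e CONNECT_BOOTSTRAP_SERVERS=${KAFKA_BOOTSTRAP_SERVERS} \
  -e CONNECT_REST_PORT=${CONNECT_REST_PORT} \
  -e CONNECT_GROUP_ID="mongo-to-kafka-source-connector" \
  -e CONNECT_CONFIG_STORAGE_TOPIC="${CONFIG.TOPIC}" \
  -e CONNECT_OFFSET_STORAGE_TOPIC="${OFFSETS.TOPIC}" \
  -e CONNECT_STATUS_STORAGE_TOPIC="${STATUS.TOPIC}" \
  -e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="${SCHEMA_REGISTRY_LISTENERS}" \
  -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="${SCHEMA_REGISTRY_LISTENERS}" \
  -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="${CONNECTOR_HOME}" \
  -e CONNECT_LOG4J_ROOT_LOGLEVEL=INFO \
  -e KAFKA_PRODUCER_MAX_REQUEST_SIZE=4194304 \
  -e CONNECT_PRODUCER_MAX_REQUEST_SIZE=4194304 \
  mongodb-source-connector:1.1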