小编Omr*_*nor的帖子

Apache Flink - 在作业中无法识别自定义Java选项

我已将以下行添加到flink-conf.yaml:

env.java.opts:" - Ddy.props.path =/PATH/TO/PROPS/FILE"

当启动jobmanager(jobmanager.sh启动集群)时,我在日志中看到jvm选项确实被识别

2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -  JVM Options:
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Xms256m
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Xmx256m
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -XX:MaxPermSize=256m
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Ddy.props.path=/srv/dy/stream-aggregators/aggregators.conf
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Dlog.file=/srv/flink-1.2.0/log/flink-flink-jobmanager-0-flinkvm-master.log
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Dlog4j.configuration=file:/srv/flink-1.2.0/conf/log4j.properties
2017-02-20 12:19:23,536 INFO  org.apache.flink.runtime.jobmanager.JobManager                -     -Dlogback.configurationFile=file:/srv/flink-1.2.0/conf/logback.xml
Run Code Online (Sandbox Code Playgroud)

但是当我运行flink作业(flink run -d PROG.JAR)时,System.getProperty("dy.props.path")返回null(当打印系统属性时,我发现它确实不存在.)

问题是 - 如何设置flink-job代码中可用的系统属性?

java apache-flink flink-streaming

8
推荐指数
1
解决办法
1590
查看次数

kafka-connect 在分布式模式下返回 409

我正在运行 kafka-connect 分布式设置。

我正在使用单机/进程设置(仍处于分布式模式)进行测试,效果很好,现在我正在使用 3 个节点(和 3 个连接进程),日志不包含错误,但是当我提交 s3-connector 时通过rest-api请求,它返回:{"error_code":409,"message":"Cannot complete request because of a conflicting operation (e.g. worker rebalance)"}

当我停止其中一个节点上的 kafka-connect 进程时,我实际上可以提交作业并且一切正常。

我的集群中有 3 个代理,主题的分区号是 32。

这是我尝试启动的连接器:

{
    "name": "s3-sink-new-2",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "32",
        "topics": "rawEventsWithoutAttribution5",
        "s3.region": "us-east-1",
        "s3.bucket.name": "dy-raw-collection",
        "s3.part.size": "64000000",
        "flush.size": "10000",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
        "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
        "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
        "partition.duration.ms": "60000",
        "path.format": "\'year\'=YYYY/\'month\'=MM/\'day\'=dd/\'hour\'=HH",
        "locale": "US",
        "timezone": "GMT",
        "timestamp.extractor": "RecordField",
        "timestamp.field": "procTimestamp",
        "name": "s3-sink-new-2"
    }
}
Run Code Online (Sandbox Code Playgroud)

日志中没有任何内容表明有问题,我真的迷失在这里。

apache-kafka apache-kafka-connect confluent-platform

4
推荐指数
1
解决办法
1936
查看次数