Asked by eth*_*nny — tags: java, apache-kafka, apache-flink, flink-streaming
I'm looking at some Kafka topics that produce ~30K messages/second. I have a Flink topology set up to read one of them, aggregate a bit (5-second window), and then (eventually) write to a DB.
When I run the topology and strip everything down to just the read -> aggregate step, I only get ~30K messages per minute. Backpressure isn't occurring anywhere.
What am I doing wrong?
It looks like I'm only getting ~1.5 MB/s. Nowhere near the 100 MB/s mentioned.
Current code path:
DataStream<byte[]> dataStream4 = env.addSource(new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties())).setParallelism(1);
DataStream<Tuple4<Long, Long, Integer, String>> ds4 = dataStream4.rebalance().flatMap(new mapper2("data_4")).setParallelism(4);
public class mapper2 implements FlatMapFunction<byte[], Tuple4<Long, Long, Integer, String>> {
    private String mapId;

    public mapper2(String mapId) {
        this.mapId = mapId;
    }

    @Override
    public void flatMap(byte[] bytes, Collector<Tuple4<Long, Long, Integer, String>> collector) throws Exception {
        TimeData timeData = (TimeData) ts_thriftDecoder.fromBytes(bytes);
        Tuple4<Long, Long, Integer, String> tuple4 = new Tuple4<>();
        tuple4.f0 = timeData.getId();
        tuple4.f1 = timeData.getOtherId();
        tuple4.f2 = timeData.getSections().size();
        tuple4.f3 = mapId;
        collector.collect(tuple4);
    }
}
From the code, I see two components that could potentially cause the performance issue: the FlinkKafkaConsumer and the Thrift deserialization in the flatMap.
To understand where the bottleneck is, I would first measure Flink's raw read performance from the Kafka topic.
So, could you run the following code on your cluster?
public class RawKafka {
    private static final Logger LOG = LoggerFactory.getLogger(RawKafka.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        DataStream<byte[]> dataStream4 = env.addSource(new FlinkKafkaConsumer081<>("data_4",
                new RawSchema(), parameterTool.getProperties())).setParallelism(1);

        dataStream4.flatMap(new FlatMapFunction<byte[], Integer>() {
            long received = 0;
            long logfreq = 50000;
            long lastLog = -1;
            long lastElements = 0;

            @Override
            public void flatMap(byte[] element, Collector<Integer> collector) throws Exception {
                received++;
                if (received % logfreq == 0) {
                    long now = System.currentTimeMillis();
                    if (lastLog == -1) {
                        // init (first interval)
                        lastLog = now;
                        lastElements = received;
                    } else {
                        // throughput for the last "logfreq" elements
                        long timeDiff = now - lastLog;
                        long elementDiff = received - lastElements;
                        double ex = (1000 / (double) timeDiff);
                        LOG.info("During the last {} ms, we received {} elements. That's {} elements/second/core. GB received {}",
                                timeDiff, elementDiff, elementDiff * ex, (received * 2500) / 1024 / 1024 / 1024);
                        // reinit
                        lastLog = now;
                        lastElements = received;
                    }
                }
            }
        });
        env.execute("Raw kafka throughput");
    }
}
This code measures the time between batches of 50k elements from Kafka and logs the number of elements read so far. On my local machine I got a throughput of ~330k elements/second/core:
16:09:34,028 INFO RawKafka - During the last 88 ms, we received 30000 elements. That's 340909.0909090909 elements/second/core. GB received 0
16:09:34,028 INFO RawKafka - During the last 86 ms, we received 30000 elements. That's 348837.20930232556 elements/second/core. GB received 0
16:09:34,028 INFO RawKafka - During the last 85 ms, we received 30000 elements. That's 352941.17647058825 elements/second/core. GB received 0
16:09:34,028 INFO RawKafka - During the last 88 ms, we received 30000 elements. That's 340909.0909090909 elements/second/core. GB received 0
16:09:34,030 INFO RawKafka - During the last 90 ms, we received 30000 elements. That's 333333.3333333333 elements/second/core. GB received 0
16:09:34,030 INFO RawKafka - During the last 91 ms, we received 30000 elements. That's 329670.3296703297 elements/second/core. GB received 0
16:09:34,030 INFO RawKafka - During the last 85 ms, we received 30000 elements. That's 352941.17647058825 elements/second/core. GB received 0
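As a sanity check, the rates in these log lines can be reproduced from the raw counts and time deltas with the same computation the logging flatMap uses, elements/second = elementDiff * (1000 / timeDiff):

```java
public class ThroughputCheck {
    // Same computation as the LOG.info line in the flatMap above:
    // elements/second/core = elementDiff * (1000 / timeDiff)
    static double rate(long elementDiff, long timeDiffMs) {
        return elementDiff * (1000 / (double) timeDiffMs);
    }

    public static void main(String[] args) {
        // First log line: 30000 elements in 88 ms
        System.out.println(rate(30000, 88)); // ~340909.09, matching the log
        // Fastest interval: 30000 elements in 85 ms
        System.out.println(rate(30000, 85)); // ~352941.18, matching the log
    }
}
```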
I would be very interested to see the read throughput you get from Kafka.
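If the raw read rate from Kafka turns out to be high, the next suspect is the Thrift decode inside mapper2. A minimal sketch for timing the decode in isolation, outside Flink — the decode body here is a placeholder, since ts_thriftDecoder and the message format aren't shown in the question, and the 2500-byte payload matches the per-message size assumed in the GB estimate above:

```java
import java.util.Arrays;

public class DecodeBenchmark {
    // Placeholder standing in for ts_thriftDecoder.fromBytes(bytes);
    // swap in the real Thrift decode call to get meaningful numbers.
    static long decode(byte[] bytes) {
        long sum = 0;
        for (byte b : bytes) sum += b;
        return sum;
    }

    // Returns decoded messages per second for n decodes of the given payload.
    static double measure(byte[] payload, int n) {
        long sink = 0; // keep a result live so the JIT can't drop the loop
        long start = System.nanoTime();
        for (int i = 0; i < n; i++) {
            sink += decode(payload);
        }
        double elapsedSec = Math.max(System.nanoTime() - start, 1) / 1e9;
        if (sink == Long.MIN_VALUE) System.out.print(""); // consume sink
        return n / elapsedSec;
    }

    public static void main(String[] args) {
        byte[] payload = new byte[2500]; // ~2500 bytes/message, as assumed above
        Arrays.fill(payload, (byte) 1);
        measure(payload, 100_000); // warm-up run for the JIT
        System.out.printf("~%.0f decodes/second%n", measure(payload, 1_000_000));
    }
}
```

Comparing that number against the raw read rate tells you whether deserialization or the source itself is the limiting step.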