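I have read several articles that benchmark the performance of stream processing engines such as Spark Streaming, Storm, and Flink. In the evaluation sections, the criteria are the 99th percentile and throughput. For example, Apache Kafka sends data at roughly 100,000 events per second, the three engines act as the stream processors, and their performance is described in terms of 99th-percentile latency and throughput.
Can anyone clarify these two criteria for me?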
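As a rough illustration of the two criteria: 99th-percentile latency is the value that 99% of the measured per-event latencies do not exceed, and throughput is the number of events processed per unit of time. The sketch below is not taken from any of the benchmarks; it assumes latencies have been collected in milliseconds and uses the nearest-rank percentile definition. The class and method names (`LatencyStats`, `percentile`, `throughputPerSecond`) are hypothetical.

```java
import java.util.Arrays;

// Hypothetical helper, for illustration only.
final class LatencyStats {

    // Nearest-rank percentile: the smallest sample such that at least
    // p percent of all samples are less than or equal to it.
    static long percentile(long[] latenciesMs, double p) {
        if (latenciesMs.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        long[] sorted = latenciesMs.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p * sorted.length / 100.0);
        return sorted[Math.max(rank, 1) - 1];
    }

    // Events processed per second over the measured interval.
    static double throughputPerSecond(long eventCount, long elapsedMs) {
        return eventCount * 1000.0 / elapsedMs;
    }

    public static void main(String[] args) {
        long[] latencies = new long[100];
        for (int i = 0; i < latencies.length; i++) {
            latencies[i] = i + 1; // synthetic samples: 1..100 ms
        }
        System.out.println("p99 latency (ms): " + percentile(latencies, 99));
        System.out.println("throughput (events/s): " + throughputPerSecond(1_000_000, 10_000));
    }
}
```

A percentile is usually reported instead of the mean because a few very slow events can hide behind a good average.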
I wrote a Kafka Streams application and I want to deploy it on a Kafka cluster. So I built a jar file and ran it with the following command:
java -jar KafkaProcessing-1.0-SNAPSHOT-jar-with-dependencies.jar testTopic kafka1:9092,kafka2:9092 zookeeper1:2181,zookeeper2:2181 output
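It runs correctly, but the job runs on the machine where I executed the command above! I thought that when I specified BOOTSTRAP-SERVERS, the computation would automatically be done on the cluster, not on the host machine!
So my question is: how do I submit a Kafka Streams job to the Kafka cluster? Spark and Flink, for example, provide the commands spark-submit and flink run to deploy applications on a cluster.
I read a string of bytes and I want to convert it to hexadecimal. I know the data in general looks like this: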
aa 01 00 1c 1e 38 5a 19 26 fc 00 7a e1 48 00 00 ff bf 00 58 01 2c 00 00 00 64 25 ff
I use the following code to convert the string to hex:
byte[] temp = record.value().getBytes();
StringBuffer result = new StringBuffer();
for (byte b : temp) {
    result.append(String.format("%02X ", b));
    result.append(" "); // delimiter
}
System.out.println(result);
The output is:
EF BF BD 01 00 1C 1E 39 5A 18 40 EF BF BD 00 38 51 EF BF BD 00 00 EF BF BD …
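One plausible reading of this output, offered as an assumption rather than a confirmed diagnosis: EF BF BD is the UTF-8 encoding of the replacement character U+FFFD, which Java substitutes when bytes that are not valid UTF-8 (such as a lone 0xAA) are decoded into a String. The sketch below reproduces that effect with the standard library only; `HexDump` and `toHex` are hypothetical names. If the record value really is binary, reading it as a byte[] (for example with Kafka's `ByteArrayDeserializer`) instead of a String would avoid the lossy decode.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only.
final class HexDump {

    // Formats each byte as two uppercase hex digits, separated by single spaces.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < bytes.length; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02X", bytes[i]));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // First four bytes of the expected data from the question.
        byte[] raw = {(byte) 0xAA, 0x01, 0x00, 0x1C};

        // Hex of the raw bytes: nothing is lost.
        System.out.println(toHex(raw));

        // Round trip through a String: 0xAA is not valid UTF-8 on its own,
        // so it is replaced by U+FFFD, which re-encodes as EF BF BD.
        String decoded = new String(raw, StandardCharsets.UTF_8);
        System.out.println(toHex(decoded.getBytes(StandardCharsets.UTF_8)));
    }
}
```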
Using Apache Flink version 1.3.2 and Cassandra 3.11, I wrote a simple program that writes data to Cassandra with the Apache Flink Cassandra connector. Here is the code:
final Collection<String> collection = new ArrayList<>(50);
for (int i = 1; i <= 50; ++i) {
    collection.add("element " + i);
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<UUID, String>> dataStream = env
        .fromCollection(collection)
        .map(new MapFunction<String, Tuple2<UUID, String>>() {
            final String mapped = " mapped ";
            String[] splitted;

            @Override
            public Tuple2<UUID, String> map(String s) throws Exception {
                splitted = s.split("\\s+");
                return new Tuple2<>(
                        UUID.randomUUID(),
                        splitted[0] + mapped + splitted[1]
                );
            }
        });
dataStream.print();
CassandraSink.addSink(dataStream)
        .setQuery("INSERT INTO test.phases (id, …
I have a string in the following format:
[{"HostName":"taskmanager1","Rack":"/default-rack","State":"RUNNING","NodeId":"taskmanager1:45454","NodeHTTPAddress":"taskmanager1:8042","LastHealthUpdate":1519568501615,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024},{"HostName":"datanode2","Rack":"/default-rack","State":"RUNNING","NodeId":"datanode2:45454","NodeHTTPAddress":"datanode2:8042","LastHealthUpdate":1519260876106,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024},{"HostName":"taskmanager3","Rack":"/default-rack","State":"RUNNING","NodeId":"taskmanager3:45454","NodeHTTPAddress":"taskmanager3:8042","LastHealthUpdate":1519568502251,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024},{"HostName":"datanode3","Rack":"/default-rack","State":"RUNNING","NodeId":"datanode3:45454","NodeHTTPAddress":"datanode3:8042","LastHealthUpdate":1519260871527,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024},{"HostName":"taskmanager2","Rack":"/default-rack","State":"RUNNING","NodeId":"taskmanager2:45454","NodeHTTPAddress":"taskmanager2:8042","LastHealthUpdate":1519568502259,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024},{"HostName":"datanode1","Rack":"/default-rack","State":"RUNNING","NodeId":"datanode1:45454","NodeHTTPAddress":"datanode1:8042","LastHealthUpdate":1519260875647,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024}]
I want to split it into multiple (here, 6) JSON objects, but my pattern does not split it the way I need.
I want something like this:
{"HostName":"taskmanager1","Rack":"/default-rack","State":"RUNNING","NodeId":"taskmanager1:45454","NodeHTTPAddress":"taskmanager1:8042","LastHealthUpdate":1519568501615,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024},
{"HostName":"datanode2","Rack":"/default-rack","State":"RUNNING","NodeId":"datanode2:45454","NodeHTTPAddress":"datanode2:8042","LastHealthUpdate":1519260876106,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024},
{"HostName":"taskmanager3","Rack":"/default-rack","State":"RUNNING","NodeId":"taskmanager3:45454","NodeHTTPAddress":"taskmanager3:8042","LastHealthUpdate":1519568502251,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024},
{"HostName":"datanode3","Rack":"/default-rack","State":"RUNNING","NodeId":"datanode3:45454","NodeHTTPAddress":"datanode3:8042","LastHealthUpdate":1519260871527,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024},
{"HostName":"taskmanager2","Rack":"/default-rack","State":"RUNNING","NodeId":"taskmanager2:45454","NodeHTTPAddress":"taskmanager2:8042","LastHealthUpdate":1519568502259,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024},
{"HostName":"datanode1","Rack":"/default-rack","State":"RUNNING","NodeId":"datanode1:45454","NodeHTTPAddress":"datanode1:8042","LastHealthUpdate":1519260875647,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024}
Using the code:
List<String> res = Arrays.asList(temp.replace('[', ' ').replace(']', ' ').trim().split(","));
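it splits on every , character, and using the pattern split("},\\}") also removes the } and { characters.
How can I split it into the desired JSON objects?
Using the Java pattern (\\{.+}) groups the whole string.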
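One way to keep the braces is to split only on commas that sit between a closing and an opening brace, using lookbehind and lookahead so that the braces themselves are not consumed. This is a minimal sketch under stated assumptions: the objects are flat as in the sample above, and no string value contains the sequence "},{". The names `JsonArraySplitter` and `splitObjects` are hypothetical. For arbitrary JSON, a real JSON parser would be the more robust choice.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, for illustration only.
final class JsonArraySplitter {

    // Strips the outer [ and ] and splits on commas that directly follow '}'
    // and directly precede '{'. The lookarounds match positions only, so the
    // braces stay in the resulting pieces.
    static List<String> splitObjects(String jsonArray) {
        String body = jsonArray.trim();
        if (body.startsWith("[")) {
            body = body.substring(1);
        }
        if (body.endsWith("]")) {
            body = body.substring(0, body.length() - 1);
        }
        return Arrays.asList(body.split("(?<=\\}),(?=\\{)"));
    }

    public static void main(String[] args) {
        String temp = "[{\"HostName\":\"taskmanager1\",\"NumContainers\":0},"
                + "{\"HostName\":\"datanode2\",\"NumContainers\":0}]";
        for (String object : splitObjects(temp)) {
            System.out.println(object);
        }
    }
}
```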
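Suppose I have a data stream that contains event-time data. I want to collect the input stream in 8-millisecond windows and reduce the data of each window. I use the following code to do that: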
aggregatedTuple
        .keyBy(0).timeWindow(Time.milliseconds(8))
        .reduce(new ReduceFunction<Tuple2<Long, JSONObject>>()
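Point: the key of the data stream is the processing-time timestamp in milliseconds, rounded down to the nearest multiple of 8; for example, 1531569851297 maps to 1531569851296.
But data may arrive late and fall into the wrong window. For example, suppose I set the window size to 8 milliseconds. The best case is when data enters the Flink engine in order, or at least with a delay smaller than the window size (8 milliseconds). But suppose a record whose event time (which is also a field in the stream) arrives with a delay of 30 milliseconds. It will then fall into the wrong window, and I think that if I check each record's event time as it is about to enter the window, I can filter out such late data. So I have two questions: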
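The key mapping and the lateness check described above can be sketched in plain Java, independent of Flink. This is only an illustration of the arithmetic, not the asker's code and not a Flink API: `WindowAlignment`, `windowStart`, and `isLate` are hypothetical names, and the delay threshold is an assumed parameter.

```java
// Hypothetical helper, for illustration only.
final class WindowAlignment {

    static final long WINDOW_MS = 8;

    // Rounds a millisecond timestamp down to the nearest multiple of the window size.
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_MS);
    }

    // True when the record arrived more than maxDelayMs after its event time.
    static boolean isLate(long eventTimeMs, long arrivalTimeMs, long maxDelayMs) {
        return arrivalTimeMs - eventTimeMs > maxDelayMs;
    }

    public static void main(String[] args) {
        // The example from the text: 1531569851297 maps to 1531569851296.
        System.out.println(windowStart(1531569851297L));

        // A record that arrives 30 ms after its event time exceeds an 8 ms limit.
        System.out.println(isLate(1531569851297L, 1531569851327L, WINDOW_MS));
    }
}
```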
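I have a 3-node ZooKeeper cluster, version 3.4.11, and a 2-node Kafka cluster, version 0.11.3. We wrote a producer that sends messages to specific topics and partitions of the Kafka cluster (I have done this before, and the producer has been tested). Here is the broker configuration: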
broker.id=1
listeners=PLAINTEXT://node1:9092
num.partitions=24
delete.topic.enable=true
default.replication.factor=2
log.dirs=/data
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181
log.retention.hours=168
zookeeper.session.timeout.ms=40000
zookeeper.connection.timeout.ms=10000
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=2
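At first there were no topics on the brokers; they are created automatically. When I start the producer, the Kafka cluster behaves strangely:
1- It creates all the topics, but although data is produced at a rate of 10 KB per second, in less than a minute the log directory of each broker grows from zero to 9.0 GB of data! And all brokers shut down (because the log directory runs out of capacity).
2- Right when data production starts, I try to consume the data with the console consumer, but it only reports the error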
WARN Error while fetching metadata with correlation id 2 : {Topic1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
3- Here is the error that appears repeatedly in the broker logs:
INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: Topic6-6. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
WARN Newly rolled segment file 00000000000000000000.log already exists; deleting it first (kafka.log.Log) …