小编Soh*_*ani的帖子

99% 延迟和吞吐量的含义是什么

我读过一些文章,对 Spark Streaming、Storm 和 Flink 等流处理引擎的性能进行基准测试。在评估部分,标准是99%和吞吐量。例如,Apache Kafka 以每秒大约 100.000 个事件的速度发送数据,这三个引擎充当流处理器,它们的性能是使用 99% 的延迟和吞吐量来描述的。

谁能为我澄清这两个标准?

streaming spark-streaming apache-storm apache-flink

6
推荐指数
1
解决办法
3886
查看次数

如何在 Kafka 集群上部署 Kafka Streaming 应用程序

我编写了 Kafka Streaming 应用程序,我想将它部署在 Kafka 集群上。所以我构建了一个 jar 文件并使用以下命令运行它:

 java -jar KafkaProcessing-1.0-SNAPSHOT-jar-with-dependencies.jar testTopic kafka1:9092,kafka2:9092 zookeeper1:2181,zookeeper2:2181 output
Run Code Online (Sandbox Code Playgroud)

它运行正常,但作业正在我在命令上方运行的机器上运行!我想当我指定BOOTSTRAP-SERVERS它时会自动在集群上进行计算,而不是在主机上!

所以我的问题是如何在 kafka 集群上提交 Kafka 流作业?像 Spark 和 Flink 一样,提供命令spark-submitflink run在集群上部署应用程序。

apache-kafka apache-kafka-streams

5
推荐指数
1
解决办法
2992
查看次数

将字符串转换为十六进制代码时的java字符集解码问题

我读取了一串字节,我想将其转换为十六进制代码。我知道一般数据是:

aa 01 00 1c 1e 38 5a 19 26 fc 00 7a e1 48 00 00 ff bf 00 58 01 2c 00 00 00 64 25 ff

我使用以下代码将字符串转换为十六进制:

 byte[] temp = record.value().getBytes();
                    StringBuffer result = new StringBuffer();
                    for (byte b : temp) {
                        result.append(String.format("%02X ", b));
                        result.append(" "); // delimiter
                    }
                    System.out.println(result);
Run Code Online (Sandbox Code Playgroud)

输出是:

EF BF BD 01 00 1C 1E 39 5A 18 40 EF BF BD 00 38 51 EF BF BD 00 00 EF BF BD …

java unicode byte

4
推荐指数
1
解决办法
3807
查看次数

未找到org.apache.flink.streaming.api.scala.DataStream的Apache Flink-类文件

使用Apache Flink版本1.3.2和Cassandra 3.11,我编写了一个简单的代码,使用Apache Flink Cassandra连接器将数据写入Cassandra.以下是代码:

final Collection<String> collection = new ArrayList<>(50);
        for (int i = 1; i <= 50; ++i) {
            collection.add("element " + i);
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<UUID, String>> dataStream = env
                .fromCollection(collection)
                .map(new MapFunction<String, Tuple2<UUID, String>>() {

                    final String mapped = " mapped ";
                    String[] splitted;

                    @Override
                    public Tuple2<UUID, String> map(String s) throws Exception {
                        splitted = s.split("\\s+");
                        return new Tuple2(
                                UUID.randomUUID(),
                                splitted[0] + mapped + splitted[1]
                        );
                    }
                });
        dataStream.print();
        CassandraSink.addSink(dataStream)
                .setQuery("INSERT INTO test.phases (id, …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

2
推荐指数
1
解决办法
1447
查看次数

使用正则表达式在数组字符串中拆分JSON对象

我有以下格式的字符串:

[{"HostName":"taskmanager1","Rack":"/default-rack","State":"RUNNING","NodeId":"taskmanager1:45454","NodeHTTPAddress":"taskmanager1:8042","LastHealthUpdate":1519568501615,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024},{"HostName":"datanode2","Rack":"/default-rack","State":"RUNNING","NodeId":"datanode2:45454","NodeHTTPAddress":"datanode2:8042","LastHealthUpdate":1519260876106,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024},{"HostName":"taskmanager3","Rack":"/default-rack","State":"RUNNING","NodeId":"taskmanager3:45454","NodeHTTPAddress":"taskmanager3:8042","LastHealthUpdate":1519568502251,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024},{"HostName":"datanode3","Rack":"/default-rack","State":"RUNNING","NodeId":"datanode3:45454","NodeHTTPAddress":"datanode3:8042","LastHealthUpdate":1519260871527,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024},{"HostName":"taskmanager2","Rack":"/default-rack","State":"RUNNING","NodeId":"taskmanager2:45454","NodeHTTPAddress":"taskmanager2:8042","LastHealthUpdate":1519568502259,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024},{"HostName":"datanode1","Rack":"/default-rack","State":"RUNNING","NodeId":"datanode1:45454","NodeHTTPAddress":"datanode1:8042","LastHealthUpdate":1519260875647,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024}]
Run Code Online (Sandbox Code Playgroud)

我想将其拆分为多种(此处为6种)JSON格式,但是我的模式无法按需将其拆分。

我想要这样的东西:

{"HostName":"taskmanager1","Rack":"/default-rack","State":"RUNNING","NodeId":"taskmanager1:45454","NodeHTTPAddress":"taskmanager1:8042","LastHealthUpdate":1519568501615,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024},
{"HostName":"datanode2","Rack":"/default-rack","State":"RUNNING","NodeId":"datanode2:45454","NodeHTTPAddress":"datanode2:8042","LastHealthUpdate":1519260876106,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024},
{"HostName":"taskmanager3","Rack":"/default-rack","State":"RUNNING","NodeId":"taskmanager3:45454","NodeHTTPAddress":"taskmanager3:8042","LastHealthUpdate":1519568502251,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024},
{"HostName":"datanode3","Rack":"/default-rack","State":"RUNNING","NodeId":"datanode3:45454","NodeHTTPAddress":"datanode3:8042","LastHealthUpdate":1519260871527,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024}
,{"HostName":"taskmanager2","Rack":"/default-rack","State":"RUNNING","NodeId":"taskmanager2:45454","NodeHTTPAddress":"taskmanager2:8042","LastHealthUpdate":1519568502259,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024},
{"HostName":"datanode1","Rack":"/default-rack","State":"RUNNING","NodeId":"datanode1:45454","NodeHTTPAddress":"datanode1:8042","LastHealthUpdate":1519260875647,"HealthReport":"","NodeManagerVersion":"2.8.3","NumContainers":0,"UsedMemoryMB":0,"AvailableMemoryMB":1024}
Run Code Online (Sandbox Code Playgroud)

使用代码:

List<String> res = Arrays.asList(temp.replace('[', ' ').replace(']',' ').trim()).split(",");
Run Code Online (Sandbox Code Playgroud)

它将针对每个,字符进行拆分,并且使用该模式split("},\\}")还将删除}{字符。

如何将其拆分为制作Json对象的愿望?

使用Java模式(\\{.+})将整个字符串分组。

java regex json

1
推荐指数
1
解决办法
991
查看次数

如何在 Flink 流处理窗口中收集延迟数据

假设我有一个数据流,其中包含事件时间数据。我想在 8 毫秒的窗口时间内收集输入数据流并减少每个窗口数据。我使用以下代码来做到这一点:

aggregatedTuple
          .keyBy( 0).timeWindow(Time.milliseconds(8))
          .reduce(new ReduceFunction<Tuple2<Long, JSONObject>>()
Run Code Online (Sandbox Code Playgroud)

Point数据流的关键是处理时间的时间戳映射到处理毫秒的时间戳的后8个约数,例如1531569851297将映射到1531569851296

但数据流可能延迟到达并进入错误的窗口时间。例如,假设我将窗口时间设置为 8 毫秒。如果数据按顺序进入 Flink 引擎或至少延迟小于窗口时间(8 毫秒),这将是最好的情况。但假设数据流事件时间(也是数据流中的一个字段)已到达,延迟时间为 30 毫秒。所以它会进入错误的窗口,我想如果我检查每个数据流的事件时间,因为它想进入窗口,我可以过滤这么晚的数据。所以我有两个问题:

  • 如何在数据流想要进入窗口时过滤数据流并检查数据是否在窗口的正确时间戳创建?
  • 如何将如此晚的数据收集到变量中以对它们进行一些处理?

stream-processing windowing apache-flink

1
推荐指数
1
解决办法
2452
查看次数

Kafka在创建新主题时会产生大量额外数据

我有一个 3 节点 Zookeeper 集群版本 3.4.11 和 2 节点 Kafka 集群版本 0.11.3。我们编写了一个生产者,将消息发送到Kafka集群的特定主题和分区(我之前做过,并且对生产者进行了测试)。以下是经纪人配置:

broker.id=1
listeners=PLAINTEXT://node1:9092
num.partitions=24
delete.topic.enable=true
default.replication.factor=2
log.dirs=/data
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181
log.retention.hours=168
zookeeper.session.timeout.ms=40000
zookeeper.connection.timeout.ms=10000
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=2
Run Code Online (Sandbox Code Playgroud)

一开始,经纪人上没有主题,它们会自动创建。当我启动生产者时,Kafka 集群表现出奇怪的行为:

1-它创建了所有主题,但虽然生成数据的速率为每秒 10KB,但在不到一分钟的时间内,每个代理的日志目录从零数据变为 9.0 GB 数据!并且所有代理都关闭(因为缺乏日志目录容量)

2-就在开始生成数据时,我尝试使用控制台消费者使用数据,但它只是出错

WARN Error while fetching metadata with correlation id 2 : {Topic1=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
Run Code Online (Sandbox Code Playgroud)

3-以下是经纪商日志中反复出现的错误:

INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset-1} for Partition: Topic6-6. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
WARN Newly rolled segment file 00000000000000000000.log already exists; deleting it first (kafka.log.Log) …
Run Code Online (Sandbox Code Playgroud)

apache-kafka

0
推荐指数
1
解决办法
1921
查看次数