小编Ram*_*man的帖子

Kafka Streams:标点与流程

在流应用程序中的单个任务中,以下两个方法是否独立运行(意味着“process”方法正在处理来自上游源的传入消息,“punctuate”方法也可以根据指定的时间表并行运行,并且WALL_CLOCK_TIME 作为 PunctuationType?)还是它们共享相同的线程,因此它是在给定时间运行的线程,如果是这样,如果 process 方法不断从上游源获取消息,那么 punctuate 方法将永远不会被调用?

  • Processor.process(K key, V value)
    使用给定的键和值处理记录。

  • ProcessorContext.schedule(long interval, PunctuationType type, Punctuator callback)
    为处理器安排周期性操作。

另外,请说明在 punctuate 方法中分区 id 值为 -1 是什么意思。punctuate 方法不是特定于任何分区吗?

  • int ProcessorContext.partition()
    返回当前输入记录的分区id;如果它不可用,则可能是 -1(例如,如果从标点调用中调用此方法)

apache-kafka apache-kafka-streams

7
推荐指数
1
解决办法
4288
查看次数

subprocess.Popen-没有这样的文件或目录

码:

import subprocess

process = subprocess.Popen('echo 5')
Run Code Online (Sandbox Code Playgroud)

错误:

Traceback (most recent call last):
  File "test.py", line 3, in <module>
    process = subprocess.Popen('echo 5')
  File "/usr/lib64/python2.6/subprocess.py", line 642, in __init__
    errread, errwrite)
  File "/usr/lib64/python2.6/subprocess.py", line 1238, in _execute_child
    raise child_exception
OSError: [Errno 2] No such file or directory
Run Code Online (Sandbox Code Playgroud)

有人可以告知上述代码有什么问题吗?

python subprocess

3
推荐指数
1
解决办法
5776
查看次数

Kafka Stream:优雅关闭

如果我们在后台启动 KafkaStream 应用程序(例如 Linux),是否有办法从外部向应用程序发出信号,以启动正常关闭?

apache-kafka apache-kafka-streams

3
推荐指数
1
解决办法
8825
查看次数

NiFi:ReplaceTextWithMapping处理器

我有以下插入语句:

  1. insert into temp1 values (test1, test2)

  2. insert into temp2 values (test3)

预期成绩:

  1. insert into temp1 values (100, 200)

  2. insert into temp2 values (300)

从本质上讲,我想更换一次查询的文字test1test2与价值100200分别和第二个查询替换test3与价值300。有人可以为上述用例提供映射文件帮助吗?

我尝试了以下操作,但没有任何效果。

搜索值(RegEx)替换值

(1)(.*values.*)(.*test1)(.*,)(.*test2) -> $2 val1 $4 val2

(2)(.*values.*)(.*test1) -> $2 val3
Run Code Online (Sandbox Code Playgroud)

regex apache-nifi

2
推荐指数
1
解决办法
484
查看次数

Kafka Stream:KTable 物化

如何识别主题的 KTable 实现何时完成?

例如,假设 KTable 有几百万行。伪代码如下:

KTable<String, String> kt = kgroupedStream.groupByKey(..).reduce(..); //Assume this produces few million rows
Run Code Online (Sandbox Code Playgroud)

在某个时间点,我想安排一个线程来调用以下写入主题的线程: kt.toStream().to("output_topic_name");

我想确保所有数据都作为上述调用的一部分写入。此外,一旦调用上述“to”方法,是否可以在下一个计划中调用它,或者第一个调用是否始终保持活动状态?

后续问题:

约束
1) 好的,我看到一旦 kafkastream 启动,kstream 和 ktable 是无界/无限的。但是,ktable 实现(到压缩的主题)不会在指定时间段内为同一键发送多个条目。

因此,除非压缩过程尝试清理这些并仅保留最新的,否则下游应用程序将使用从主题查询的相同键的所有可用条目,从而导致重复。即使压缩过程进行了某种程度的清理,也总是不可能在给定的时间点,随着压缩过程的追赶,某些键具有多个条目。

我假设 KTable 在 RocksDB 中只有一个给定键的记录。如果我们有办法安排实现,这将有助于避免重复。此外,减少在主题中持久化的数据量(增加存储量),增加网络流量,压缩过程的额外开销以清理它。

2) 也许 ReadOnlyKeyValueStore 将允许从存储中进行受控检索,但它仍然缺乏调度键、值和写入主题的方法,这需要额外的编码。

是否可以改进 API 以允许受控实现?

apache-kafka apache-kafka-streams

2
推荐指数
1
解决办法
3323
查看次数

Kafka Streams:内部主题分区

Kafka 版本:1.0.0
假设流应用程序使用低级处理器 API,它维护状态并从具有 10 个分区的主题中读取。请澄清内部主题是否应使用相同数量的分区创建,或者是根据代理默认值创建的。如果是后者,如果我们需要增加内部主题的分区,有什么办法吗?

apache-kafka apache-kafka-streams

2
推荐指数
1
解决办法
1992
查看次数

Kafka Streams:ConsumerRebalanceListener 实现

您能否告知以下类需要如何在流配置中注册?

public class MyConsumerRebalanceListener implements ConsumerRebalanceListener {

  static final Logger oLogger = Logger.getLogger(StreamConsumerRebalanceListener.class);

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    for (TopicPartition p : partitions) {
        oLogger.info(p + " partitions has been assigned to the stream instance");
    }

  }

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    for (TopicPartition p : partitions) {
        oLogger.warn(p + " partitions has been removed from the stream instance");
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-kafka-streams

2
推荐指数
1
解决办法
1290
查看次数

Kafka Stream:消费者提交频率

有了至少一次的保证,我知道发生故障的情况下可能会重复。但是,
1)Kafka Stream库执行提交的频率如何?
2)除了上述内容,用户是否还需要考虑提交?
3)是否有关于执行提交频率的最佳实践?

apache-kafka apache-kafka-streams

1
推荐指数
1
解决办法
659
查看次数