在流应用程序中的单个任务中,以下两个方法是否独立运行(意味着“process”方法正在处理来自上游源的传入消息,“punctuate”方法也可以根据指定的时间表并行运行,并且WALL_CLOCK_TIME 作为 PunctuationType?)还是它们共享相同的线程,因此它是在给定时间运行的线程,如果是这样,如果 process 方法不断从上游源获取消息,那么 punctuate 方法将永远不会被调用?
Processor.process(K key, V value)
使用给定的键和值处理记录。
ProcessorContext.schedule(long interval, PunctuationType type, Punctuator callback)
为处理器安排周期性操作。
另外,请说明在 punctuate 方法中分区 id 值为 -1 是什么意思。punctuate 方法不是特定于任何分区吗?
码:
import subprocess
process = subprocess.Popen('echo 5')
Run Code Online (Sandbox Code Playgroud)
错误:
Traceback (most recent call last):
File "test.py", line 3, in <module>
process = subprocess.Popen('echo 5')
File "/usr/lib64/python2.6/subprocess.py", line 642, in __init__
errread, errwrite)
File "/usr/lib64/python2.6/subprocess.py", line 1238, in _execute_child
raise child_exception
OSError: [Errno 2] No such file or directory
Run Code Online (Sandbox Code Playgroud)
有人可以告知上述代码有什么问题吗?
如果我们在后台启动 KafkaStream 应用程序(例如 Linux),是否有办法从外部向应用程序发出信号,以启动正常关闭?
我有以下插入语句:
insert into temp1 values (test1, test2)
insert into temp2 values (test3)
预期成绩:
insert into temp1 values (100, 200)
insert into temp2 values (300)
从本质上讲,我想更换一次查询的文字test1
,test2
与价值100
,200
分别和第二个查询替换test3
与价值300
。有人可以为上述用例提供映射文件帮助吗?
我尝试了以下操作,但没有任何效果。
搜索值(RegEx)替换值
(1)(.*values.*)(.*test1)(.*,)(.*test2) -> $2 val1 $4 val2
(2)(.*values.*)(.*test1) -> $2 val3
Run Code Online (Sandbox Code Playgroud) 如何识别主题的 KTable 实现何时完成?
例如,假设 KTable 有几百万行。伪代码如下:
KTable<String, String> kt = kgroupedStream.groupByKey(..).reduce(..); //Assume this produces few million rows
Run Code Online (Sandbox Code Playgroud)
在某个时间点,我想安排一个线程来调用以下写入主题的线程: kt.toStream().to("output_topic_name");
我想确保所有数据都作为上述调用的一部分写入。此外,一旦调用上述“to”方法,是否可以在下一个计划中调用它,或者第一个调用是否始终保持活动状态?
后续问题:
约束
1) 好的,我看到一旦 kafkastream 启动,kstream 和 ktable 是无界/无限的。但是,ktable 实现(到压缩的主题)不会在指定时间段内为同一键发送多个条目。
因此,除非压缩过程尝试清理这些并仅保留最新的,否则下游应用程序将使用从主题查询的相同键的所有可用条目,从而导致重复。即使压缩过程进行了某种程度的清理,也总是不可能在给定的时间点,随着压缩过程的追赶,某些键具有多个条目。
我假设 KTable 在 RocksDB 中只有一个给定键的记录。如果我们有办法安排实现,这将有助于避免重复。此外,减少在主题中持久化的数据量(增加存储量),增加网络流量,压缩过程的额外开销以清理它。
2) 也许 ReadOnlyKeyValueStore 将允许从存储中进行受控检索,但它仍然缺乏调度键、值和写入主题的方法,这需要额外的编码。
是否可以改进 API 以允许受控实现?
Kafka 版本:1.0.0
假设流应用程序使用低级处理器 API,它维护状态并从具有 10 个分区的主题中读取。请澄清内部主题是否应使用相同数量的分区创建,或者是根据代理默认值创建的。如果是后者,如果我们需要增加内部主题的分区,有什么办法吗?
您能否告知以下类需要如何在流配置中注册?
public class MyConsumerRebalanceListener implements ConsumerRebalanceListener {
static final Logger oLogger = Logger.getLogger(StreamConsumerRebalanceListener.class);
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition p : partitions) {
oLogger.info(p + " partitions has been assigned to the stream instance");
}
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
for (TopicPartition p : partitions) {
oLogger.warn(p + " partitions has been removed from the stream instance");
}
}
}
Run Code Online (Sandbox Code Playgroud) 有了至少一次的保证,我知道发生故障的情况下可能会重复。但是,
1)Kafka Stream库执行提交的频率如何?
2)除了上述内容,用户是否还需要考虑提交?
3)是否有关于执行提交频率的最佳实践?