我有一个与StateStore交互的处理器,用于过滤并对消息执行复杂的逻辑.在process(key,value)
我context.forward(key,value)
用来发送我需要的键和值的方法中.出于调试目的,我也打印出来.
我有一个KStream mergedStream
,它来自另外两个流的连接.我想将处理器应用于该流的记录.我实现了这个:mergedStream.process(myprocessor,"stateStoreName")
当我启动这个程序时,我可以看到要打印到我的控制台的正确值.但是,如果我使用主题上mergedStream.to("topic")
的值将mergedStream发送到主题,则不是我在处理器中转发的那个,而是原始的.
我使用kafka-streams 0.10.1.0.
将我在处理器中转发的值转换为另一个流的最佳方法是什么?
是否可以将Processor API与KStream DSL创建的流混合使用?
我有一个在 EC2 上运行的 Java 进程,我想在进程出现故障或处于不良状态时在 Cloudwatch 中设置警报(例如,在过去 10 秒左右不向 Cloudwatch 发送心跳)。
做这个的最好方式是什么 ?我认为我需要自定义指标,但没有找到任何专门监视流程的文档。
如果需要,我可以使用 AWS SDK。
我有一个长时间运行的过程,它监听事件并进行一些密集的处理.
目前我Executors.newFixedThreadPool(x)
用来限制同时运行的作业数量,但是根据一天中的时间和其他各种因素,我希望能够动态增加或减少并发线程的数量.
如果我减少并发线程的数量,我希望当前正在运行的作业能够很好地完成.
是否有一个Java库可以让我控制并动态增加或减少线程池中运行的并发线程数?(该类必须实现ExecutorService).
我必须自己实施吗?
java multithreading executorservice threadpool threadpoolexecutor