我有一个基本的流处理流程,看起来像
master topic -> my processing in a mapper/filter -> output topics
Run Code Online (Sandbox Code Playgroud)
我想知道处理"坏消息"的最佳方法.这可能是我无法正确反序列化的消息,或者处理/过滤逻辑可能以某种意外的方式失败(我没有外部依赖,所以不应该有这种类型的瞬态错误).
我正在考虑将所有处理/过滤代码包装在try catch中,如果出现异常,则路由到"错误主题".然后我可以研究该消息并对其进行修改或修改我的代码,然后将其重播为master.如果我让任何异常传播,则流似乎被卡住并且不再拾取消息.
为了完整性,这里是我的代码(伪ish):
class Document {
// Fields
}
class AnalysedDocument {
Document document;
String rawValue;
Exception exception;
Analysis analysis;
// All being well
AnalysedDocument(Document document, Analysis analysis) {...}
// Analysis failed
AnalysedDocument(Document document, Exception exception) {...}
// Deserialisation failed
AnalysedDocument(String rawValue, Exception exception) {...}
}
KStreamBuilder builder = new KStreamBuilder();
KStream<String, AnalysedPolecatDocument> analysedDocumentStream = builder
.stream(Serdes.String(), Serdes.String(), "master")
.mapValues(new ValueMapper<String, AnalysedDocument>() { …Run Code Online (Sandbox Code Playgroud) 如何从单个主题创建多个流?当我做这样的事情时:
KStreamBuilder builder = new KStreamBuilder();
builder.stream(Serdes.String(), Serdes.String(), "master")
/* Filtering logic */
.to(Serdes.String(), Serdes.String(), "output1");
builder.stream(Serdes.String(), Serdes.String(), "master")
/* Filtering logic */
.to(Serdes.String(), Serdes.String(), "output2");
KafkaStreams streams = new KafkaStreams(builder, /* config */);
streams.start();
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic master has already been registered by another source.
at org.apache.kafka.streams.processor.TopologyBuilder.addSource(TopologyBuilder.java:347)
at org.apache.kafka.streams.kstream.KStreamBuilder.stream(KStreamBuilder.java:92)
Run Code Online (Sandbox Code Playgroud)
我是否需要为"master"中的每个流创建另一个KafkaStream实例?
我有一个主要主题和多个谓词,每个谓词都有一个与之关联的输出主题.我想将每条记录发送到其谓词解析为true的所有主题.我使用Luwak来测试记录满足哪个谓词(使用这个库,你用一个谓词列表评估一个文件,它告诉你哪些匹配 - 即我只调用一次得到满意的谓词列表).
我正在尝试使用Kafka Streams,但KStream上似乎没有合适的方法(KStream#branch仅将记录路由到单个主题).
一种可能的方法如下:
Stream from master
Map the values into a format with the original content and the list of matching predicates
Stream to an intermediate with-matches topic
For each predicate/output topic
Stream from intermediate with-matches topic
Filter "does list of matches predicates contain predicate ID"
Map the values to just the original content
Stream to corresponding output topic
Run Code Online (Sandbox Code Playgroud)
然而,这样一个中间话题似乎"笨拙".有更好的建议吗?
我在用:
我希望在OSGi容器(apache-servicemix-4.4.1-fuse-06-03)中运行一个Akka项目,但是在安装依赖库时遇到一些麻烦.我已安装akka-actor并config如下:
osgi:install -s mvn:com.typesafe.akka/akka-actor/2.1-SNAPSHOTosgi:install -s mvn:com.typesafe/config/0.4.1但我无法安装scala-library(需要v2.9.2).我尝试创建自己的捆绑使用maven-bundle-plugin但无效,并已谷歌搜索多年.
任何帮助将不胜感激.
我希望有一个主题(日志保留7天)和几个较小的主题,过滤的语料库具有较小的日志保留(2天).这可能吗?
注意:我正在使用Kafka v0.10.1.1.
我想从kafka主题中定期读取一批消息,或者当读取的消息数达到一定数量时,将它们作为批处理发送到下游系统.目前,我的kafka拓扑结构由处理器终止,处理器保存消息,然后使用punctuate方法逐步处理批处理.
但是,我不确定这是完美的,因为如果应用程序在调用punctuate方法之前崩溃,我认为一些消息会丢失(即消费者认为它已经完成了它们但它们不会出现在下游系统中) .
batchQueue = new LinkedBlockingQueue<String>(batchSize);
KStream<String, String> inputStream = builder
.stream(Serdes.String(), Serdes.String(), "source-topic")
.process(new ProcessorSupplier<String, String>() {
@Override
public Processor<String, String> get() {
return new AbstractProcessor<String, Wrapper>() {
@Override
public void init(ProcessorContext context) {
super.init(context);
context.schedule(flushPeriod);
}
@Override
public void process(String key, String value) {
batchQueue.add(value);
if (batchQueue.remainingCapacity() == 0) {
processQueue();
}
}
@Override
public void punctuate(long timestamp) {
processQueue();
context().commit();
}
}
@Override
public void close() {}
};
}
});
Run Code Online (Sandbox Code Playgroud)
什么(如果有的话)是决定Clojure核心中参数函数顺序的规则?
map并filter期望数据结构作为最后一个参数.assoc并select-keys期望数据结构作为第一个参数.map并filter期望函数作为第一个参数.update-in期望函数作为最后一个参数的函数.这可能会在使用线程宏时造成痛苦(我知道我可以使用as->),那么这些决定背后的原因是什么?我也很高兴知道我的功能可以尽可能地与伟人所写的一致.
我们的数据库使用 Elasticsearch v5.6.12。我们经常使用批量 REST api 更新它。有时单个请求不会改变任何东西(即 Elasticsearch 已经是最新的文档的值)。如何检测这些实例?
我看到了这个(https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html),但我不确定它是否适用于我们的情况。
我刚刚开始与Akka合作,以便在OSGi容器中使用它(apache-servicemix-4.4.1-fuse-06-03).人们发现哪种版本的Akka最适合使用.在撰写本文时,似乎有3种选择:
思考?
我的代码:
(ns model.document
(:gen-class
:name model.document
:implements java.io.Serializable
:state "state"
:init "init"
:constructors {[String String String] []}
:methods [[getContent [] String]
[getTitle [] String]
[getUrl [] String]]))
(defn -init [content title url]
[[] (atom {:content content
:title title
:url url})])
(defn- get-field [this k]
(@(.state this) k))
(defn getContent [this]
(get-field this :content))
(defn getTitle [this]
(get-field this :title))
(defn getUrl [this]
(get-field this :url))
Run Code Online (Sandbox Code Playgroud)
它的用途是:
(ns classification-server.classifier
(:require [model.document :refer :all]))
(new model.document "my-content" "my-title" "my-url")
Run Code Online (Sandbox Code Playgroud)
我得到了无益的帮助:
Caused by: …Run Code Online (Sandbox Code Playgroud)