我正在浏览Apache帖子,发现了一个名为Beam的新术语.任何人都可以解释Apache Beam究竟是什么?我试图谷歌,但无法得到一个明确的答案.
这是关于用Java编写的风暴拓扑中的单元测试螺栓和喷口的一般性问题.
单元测试(JUnit?)螺栓和喷嘴的推荐做法和指南是什么?
例如,我可以为a编写一个JUnit测试Bolt,但是如果没有完全理解框架(如a的生命周期Bolt)和序列化含义,很容易犯错误的基于构造函数的非可序列化成员变量的创建.在JUnit中,此测试将通过,但在拓扑中,它将无法工作.我完全可以想象有许多测试点需要考虑(例如序列化和生命周期的这个例子).
因此,是否建议您使用基于JUnit的单元测试,运行小型模拟拓扑(LocalMode?)并测试该拓扑下的Bolt(或Spout)隐含合约?或者,使用JUnit是否可以,但其含义是我们必须仔细模拟Bolt的生命周期(创建它,调用prepare(),模拟Config等)?在这种情况下,要测试的被测类(Bolt/Spout)的一般测试点是什么?
在创建适当的单元测试方面,其他开发人员做了什么?
我注意到有一个拓扑测试API(参见:https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java).是否更好地使用某些API,并为每个人站起来"测试拓扑" Bolt&Spout(并验证Bolt必须提供的隐式合同,例如 - 它的声明输出)?
谢谢
我正在使用Storm,它适用于很多用例.最近我看了一下Trident,它是Storm的高级抽象.它支持一次性处理,使状态处理更容易.
但现在我想知道..为什么我不能总是使用Trident而不是Storm?
到目前为止我读到的内容:
使用Trident而不是Storm时还有其他缺点吗?因为现在,我认为上面列出的缺点是微不足道的.
Trident无法实现哪些用例?
后果:
自从我问到我的公司决定先去三叉戟这个问题.当出现性能问题时,我们只会使用纯粹的Storm.可悲的是,这不是一个积极的决定,它只是成为默认行为(当时我不在身边).
他们的假设是,在大多数用例中,我们需要状态或仅一次处理,否则我们将在不久的将来需要它.我理解他们的推理是因为从Storm转到Trident或者回来并不是一个简单的转换,但在我个人看来,没有状态的流处理的概念并没有被所有人理解,这是使用Trident的主要原因.
我有一个基本的流处理流程,看起来像
master topic -> my processing in a mapper/filter -> output topics
Run Code Online (Sandbox Code Playgroud)
我想知道处理"坏消息"的最佳方法.这可能是我无法正确反序列化的消息,或者处理/过滤逻辑可能以某种意外的方式失败(我没有外部依赖,所以不应该有这种类型的瞬态错误).
我正在考虑将所有处理/过滤代码包装在try catch中,如果出现异常,则路由到"错误主题".然后我可以研究该消息并对其进行修改或修改我的代码,然后将其重播为master.如果我让任何异常传播,则流似乎被卡住并且不再拾取消息.
为了完整性,这里是我的代码(伪ish):
class Document {
// Fields
}
class AnalysedDocument {
Document document;
String rawValue;
Exception exception;
Analysis analysis;
// All being well
AnalysedDocument(Document document, Analysis analysis) {...}
// Analysis failed
AnalysedDocument(Document document, Exception exception) {...}
// Deserialisation failed
AnalysedDocument(String rawValue, Exception exception) {...}
}
KStreamBuilder builder = new KStreamBuilder();
KStream<String, AnalysedPolecatDocument> analysedDocumentStream = builder
.stream(Serdes.String(), Serdes.String(), "master")
.mapValues(new ValueMapper<String, AnalysedDocument>() { …Run Code Online (Sandbox Code Playgroud) 我有一个Kafka主题,我发送位置事件(key = user_id,value = user_location).我能够阅读并处理它KStream:
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Location> locations = builder
.stream("location_topic")
.map((k, v) -> {
// some processing here, omitted form clarity
Location location = new Location(lat, lon);
return new KeyValue<>(k, location);
});
Run Code Online (Sandbox Code Playgroud)
这很有效,但我想拥有KTable每个用户的最后一个已知位置.我怎么能这样做?
我能够写一个中间主题并从中读取:
// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");
// build KTable from intermediate topic
KTable<String, Location> table = builder.table("location_topic_aux", "store");
Run Code Online (Sandbox Code Playgroud)
有没有一种简单的方法来获得KTable一个KStream?这是我第一个使用Kafka Streams的应用程序,所以我可能会遗漏一些明显的东西.
我目前正在与Kafka和Flink合作,我在我的本地PC上运行了kafka,并创建了一个正在消费的主题.
Desktop\kafka\bin\windows> kafka-console-consumer.bat --zookeeper localhost:2181 -topic test
有没有办法获得有关该消息的更多细节?让我们说时间?键?我检查了kafka文档,但我没有找到关于这个主题的东西
在轮询Kafka时,我使用该subscribe()功能订阅了多个主题.现在,我想设置的偏离,我想从每个主题阅读,而无需每次重新订阅后seek(),并poll()从一个话题.在轮询数据之前seek(),是否会迭代地调用每个主题名称来实现结果?如何在Kafka中准确存储偏移量?
我每个主题都有一个分区,只有一个消费者可以阅读所有主题.
在集群中运行时,如果发生错误,工作人员通常会死亡(JVM关闭).它可能是由许多因素引起的,大多数时候它是一个挑战(暴风雨的最大困难?),找出导致崩溃的原因.
当然,风暴管理员重新启动死亡工人,并且在风暴集群中活跃度相当不错,工作人员崩溃仍然是一个混乱我们应该避免,因为它增加了开销,延迟(可能很长,直到工人被发现死亡和重生)如果您没有设计拓扑来防止这种情况,则会导致数据丢失.
是否有一种简单的方法/工具/方法来检查风暴工人崩溃的时间和原因?它们没有显示在storm-ui中(而显示了监督者),并且所有东西都需要手动监视(例如jstack + JVM opts).
以下是一些可能发生的情况:
我已经构建了Spark和Flink k-means应用程序.我的测试用例是在3节点集群上对100万个点进行聚类.
当内存中的瓶颈开始时,Flink开始外包到磁盘并且工作缓慢但工作正常.但是,如果内存已满并且再次启动(无限循环?),Spark会丢失执行程序.
我试着在邮件列表的帮助下自定义内存设置,谢谢.但Spark仍然无效.
是否有必要设置任何配置?我的意思是Flink工作的内存很低,Spark也必须能够; 或不?
我一直在努力记录当前项目的所有Storm指标的含义.
在此过程中,我从该组响应以及github收集了数据.
虽然有些指标非常自我解释,但我对一些螺栓指标感到困惑.
例如,Process Latency和Execute Latency之间有什么区别?
通过此Google论坛上的帖子,我收集了以下信息:
列表项处理延迟=调用ack时的时间戳 - 执行传递元组时的时间戳
列表项执行延迟=执行函数结束时的时间戳 - 执行传递时的时间戳元组(来源:http://goo.gl/3KRAl)
和
根据我在风暴用户界面中看到的内容,我的执行延迟几乎总是大于进程延迟.怎么会这样?任何人都可以帮我确定两种延迟的确切定义吗?
提前致谢!
apache-storm ×4
apache-kafka ×3
apache-beam ×1
apache-flink ×1
apache-spark ×1
java ×1
junit ×1
jvm ×1
memory ×1
metrics ×1
real-time ×1
trident ×1
unit-testing ×1