小编Mat*_*Sax的帖子

什么是Apache Beam?

我正在浏览Apache帖子,发现了一个名为Beam的新术语.任何人都可以解释Apache Beam究竟是什么?我试图谷歌,但无法得到一个明确的答案.

apache-beam

50
推荐指数
2
解决办法
2万
查看次数

测试风暴螺栓和喷口

这是关于用Java编写的风暴拓扑中的单元测试螺栓和喷口的一般性问题.

单元测试(JUnit?)螺栓喷嘴的推荐做法和指南是什么?

例如,我可以为a编写一个JUnit测试Bolt,但是如果没有完全理解框架(如a的生命周期Bolt)和序列化含义,很容易犯错误的基于构造函数的非可序列化成员变量的创建.在JUnit中,此测试将通过,但在拓扑中,它将无法工作.我完全可以想象有许多测试点需要考虑(例如序列化和生命周期的这个例子).

因此,是否建议您使用基于JUnit的单元测试,运行小型模拟拓扑(LocalMode?)并测试该拓扑下的Bolt(或Spout)隐含合约?或者,使用JUnit是否可以,但其含义是我们必须仔细模拟Bolt的生命周期(创建它,调用prepare(),模拟Config等)?在这种情况下,要测试的被测类(Bolt/Spout)的一般测试点是什么?

在创建适当的单元测试方面,其他开发人员做了什么?

我注意到有一个拓扑测试API(参见:https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java).是否更好地使用某些API,并为每个人站起来"测试拓扑" Bolt&Spout(并验证Bolt必须提供的隐式合同,例如 - 它的声明输出)?

谢谢

junit unit-testing apache-storm

48
推荐指数
3
解决办法
2万
查看次数

风暴与三叉戟:何时不使用三叉戟?

我正在使用Storm,它适用于很多用例.最近我看了一下Trident,它是Storm的高级抽象.它支持一次性处理,使状态处理更容易.

但现在我想知道..为什么我不能总是使用Trident而不是Storm?

到目前为止我读到的内容:

  • Trident批量处理消息,因此吞吐时间可能更长.
  • Trident尚无法在拓扑中处理循环.

使用Trident而不是Storm时还有其他缺点吗?因为现在,我认为上面列出的缺点是微不足道的.

Trident无法实现哪些用例?


后果:

自从我问到我的公司决定先去三叉戟这个问题.当出现性能问题时,我们只会使用纯粹的Storm.可悲的是,这不是一个积极的决定,它只是成为默认行为(当时我不在身边).

他们的假设是,在大多数用例中,我们需要状态或仅一次处理,否则我们将在不久的将来需要它.我理解他们的推理是因为从Storm转到Trident或者回来并不是一个简单的转换,但在我个人看来,没有状态的流处理的概念并没有被所有人理解,这是使用Trident的主要原因.

trident apache-storm

39
推荐指数
2
解决办法
2万
查看次数

使用Kafka的Streams API处理错误消息

我有一个基本的流处理流程,看起来像

master topic -> my processing in a mapper/filter -> output topics
Run Code Online (Sandbox Code Playgroud)

我想知道处理"坏消息"的最佳方法.这可能是我无法正确反序列化的消息,或者处理/过滤逻辑可能以某种意外的方式失败(我没有外部依赖,所以不应该有这种类型的瞬态错误).

我正在考虑将所有处理/过滤代码包装在try catch中,如果出现异常,则路由到"错误主题".然后我可以研究该消息并对其进行修改或修改我的代码,然后将其重播为master.如果我让任何异常传播,则流似乎被卡住并且不再拾取消息.

  • 这种方法被认为是最佳做法吗?
  • 有没有方便的Kafka溪流来处理这个问题?我不认为有DLQ的概念......
  • 什么是阻止卡夫卡干扰"坏消息"的替代方法?
  • 有哪些替代错误处理方法?

为了完整性,这里是我的代码(伪ish):

class Document {
    // Fields
}

class AnalysedDocument {

    Document document;
    String rawValue;
    Exception exception;
    Analysis analysis;

    // All being well
    AnalysedDocument(Document document, Analysis analysis) {...}

    // Analysis failed
    AnalysedDocument(Document document, Exception exception) {...}

    // Deserialisation failed
    AnalysedDocument(String rawValue, Exception exception) {...}
}

KStreamBuilder builder = new KStreamBuilder();
KStream<String, AnalysedPolecatDocument> analysedDocumentStream = builder
    .stream(Serdes.String(), Serdes.String(), "master")
    .mapValues(new ValueMapper<String, AnalysedDocument>() { …
Run Code Online (Sandbox Code Playgroud)

error-handling apache-kafka apache-kafka-streams

31
推荐指数
2
解决办法
2万
查看次数

Kafka Streams API:KStream到KTable

我有一个Kafka主题,我发送位置事件(key = user_id,value = user_location).我能够阅读并处理它KStream:

KStreamBuilder builder = new KStreamBuilder();

KStream<String, Location> locations = builder
        .stream("location_topic")
        .map((k, v) -> {
            // some processing here, omitted form clarity
            Location location = new Location(lat, lon);
            return new KeyValue<>(k, location);
        });
Run Code Online (Sandbox Code Playgroud)

这很有效,但我想拥有KTable每个用户的最后一个已知位置.我怎么能这样做?

我能够写一个中间主题并从中读取:

// write to intermediate topic
locations.to(Serdes.String(), new LocationSerde(), "location_topic_aux");

// build KTable from intermediate topic
KTable<String, Location> table = builder.table("location_topic_aux", "store");
Run Code Online (Sandbox Code Playgroud)

有没有一种简单的方法来获得KTable一个KStream?这是我第一个使用Kafka Streams的应用程序,所以我可能会遗漏一些明显的东西.

apache-kafka-streams

31
推荐指数
1
解决办法
1万
查看次数

Kafka Consumer获得关键值对

我目前正在与Kafka和Flink合作,我在我的本地PC上运行了kafka,并创建了一个正在消费的主题.

Desktop\kafka\bin\windows> kafka-console-consumer.bat --zookeeper localhost:2181 -topic test

但它只是检索消息, 在此输入图像描述

有没有办法获得有关该消息的更多细节?让我们说时间?键?我检查了kafka文档,但我没有找到关于这个主题的东西

apache-kafka kafka-consumer-api

29
推荐指数
2
解决办法
3万
查看次数

Kafka如何为每个主题存储偏移量?

在轮询Kafka时,我使用该subscribe()功能订阅了多个主题.现在,我想设置的偏离,我想从每个主题阅读,而无需每次重新订阅后seek(),并poll()从一个话题.在轮询数据之前seek(),是否会迭代地调用每个主题名称实现结果?如何在Kafka中准确存储偏移量?

我每个主题都有一个分区,只有一个消费者可以阅读所有主题.

java apache-kafka kafka-consumer-api

16
推荐指数
1
解决办法
1万
查看次数

监视工作人员在apache风暴中崩溃

在集群中运行时,如果发生错误,工作人员通常会死亡(JVM关闭).它可能是由许多因素引起的,大多数时候它是一个挑战(暴风雨的最大困难?),找出导致崩溃的原因.

当然,风暴管理员重新启动死亡工人,并且在风暴集群中活跃度相当不错,工作人员崩溃仍然是一个混乱我们应该避免,因为它增加了开销,延迟(可能很长,直到工人被发现死亡和重生)如果您没有设计拓扑来防止这种情况,则会导致数据丢失.

是否有一种简单的方法/工具/方法来检查风暴工人崩溃的时间和原因?它们没有显示在storm-ui中(而显示了监督者),并且所有东西都需要手动监视(例如jstack + JVM opts).

以下是一些可能发生的情况:

  • 超时和许多可能的原因:java垃圾收集速度慢,网络错误,超时配置错误.我们从管理程序日志本地获得的唯一输出是"state:timeout"或"state:disallowed",这很差.此外,当一名工人死亡时,关于storm-ui的统计数据将重新启动.当你害怕超时时,你最终会使用长时间的,这对于实时处理来说似乎不是一个好的解决方案.
  • 具有意外行为的高背压,饥饿的工人心跳以及例如引发超时.Acking似乎是处理背压的唯一方法,需要根据您的负载精心制作螺栓.不是acking似乎是不行,因为它确实会使工作人员崩溃并最终得到不好的结果(处理的数据比压力下的acking拓扑更少?).
  • 代码运行时异常,有时不会在storm-ui中显示,需要手动检查应用程序日志(最简单的情况).
  • 可以通过JVM转储找到的内存泄漏.

jvm apache-storm

15
推荐指数
1
解决办法
1276
查看次数

Spark vs Flink内存不足

我已经构建了Spark和Flink k-means应用程序.我的测试用例是在3节点集群上对100万个点进行聚类.

当内存中的瓶颈开始时,Flink开始外包到磁盘并且工作缓慢但工作正常.但是,如果内存已满并且再次启动(无限循环?),Spark会丢失执行程序.

我试着在邮件列表的帮助下自定义内存设置,谢谢.但Spark仍然无效.

是否有必要设置任何配置?我的意思是Flink工作的内存很低,Spark也必须能够; 或不?

memory apache-spark apache-flink

14
推荐指数
1
解决办法
2384
查看次数

Storm UI:执行和处理延迟之间的差异

我一直在努力记录当前项目的所有Storm指标的含义.

在此过程中,我从该组响应以及github收集了数据.

虽然有些指标非常自我解释,但我对一些螺栓指标感到困惑.

例如,Process Latency和Execute Latency之间有什么区别?

通过此Google论坛上的帖子,我收集了以下信息:

  • 列表项处理延迟=调用ack时的时间戳 - 执行传递元组时的时间戳

  • 列表项执行延迟=执行函数结束时的时间戳 - 执行传递时的时间戳元组(来源:http://goo.gl/3KRAl)

  • 列表项处理延迟是直到元组被激活的时间,执行延迟是元组执行所花费的时间(来源:http://goo.gl/m0fTC)

根据我在风暴用户界面中看到的内容,我的执行延迟几乎总是大于进程延迟.怎么会这样?任何人都可以帮我确定两种延迟的确切定义吗?

提前致谢!

metrics real-time apache-storm

13
推荐指数
1
解决办法
6093
查看次数