标签: stream-processing

Akka Stream Kafka vs Kafka Streams

我目前正在与Akka Stream Kafka合作与kafka互动,我很惊讶与Kafka Streams有什么不同.

我知道基于Akka的方法实现了反应性规范并处理了kafka流似乎缺乏的背压和功能.

使用kafka流比akka溪流kafka有什么好处?

scala stream-processing typesafe akka-stream apache-kafka-streams

36
推荐指数
2
解决办法
1万
查看次数

Apache Spark和Apache Apex有什么区别?

Apache Apex - 是一个开源的企业级统一流和批处理平台.它在GE Predix平台中用于物联网.这两个平台之间的主要区别是什么?

问题

  1. 从数据科学的角度来看,它与Spark的不同之处是什么?
  2. Apache Apex是否提供Spark MLlib等功能?如果我们必须在Apache apex上构建可扩展的ML模型,该怎么做以及使用哪种语言?
  3. 数据科学家是否必须学习Java来构建可扩展的ML模型?它有像pyspark这样的python API吗?
  4. Apache Apex可以与Spark集成吗?我们可以在Apex之上使用Spark MLlib构建ML模型吗?

machine-learning stream-processing apache-spark pyspark apache-apex

16
推荐指数
1
解决办法
7868
查看次数

真实世界模拟TIS-100

最近的游戏TIS-100以一个相当有趣的机器架构为中心,其中CPU由"节点"组成,它们可以与相邻的邻居通信.遗憾的是,我找不到参考手册的官方链接供公众讨论,但总的来说,每个节点都支持一个非常简单的ISA,但并行运行在同一个时钟上.每个节点有两个寄存器,一个ACC寄存器和一个BCK辅助寄存器.参考手册说CPU是为流处理而设计的.

这让我感到非常有趣且可能有用.这种架构是否在现实世界中使用?它有点像时钟处理器和FPGA之间的混合.

cpu-architecture stream-processing isa tis-100

14
推荐指数
2
解决办法
1917
查看次数

在实践中(非理论),小批量与实时流之间有什么区别?

在实践中(非理论),小批量与实时流之间有什么区别?从理论上讲,我理解迷你批量是在给定的时间范围内批量生成的,而实时流式更像是在数据到达时做某事但是我最大的问题是为什么不使用epsilon时间框架(比如说一毫秒)或我想了解为什么一个人比其他人更有效的解决方案?

我最近遇到了一个例子,其中迷你批处理(Apache Spark)用于欺诈检测,实时流(Apache Flink)用于欺诈预防.有人还评论说小批量不是防止欺诈的有效解决方案(因为目标是防止交易发生)现在我想知道为什么这对迷你批次(Spark)不会那么有效?为什么以1毫秒的延迟运行迷你批处理无效?批处理是一种在任何地方使用的技术,包括操作系统和内核TCP/IP堆栈,其中磁盘或网络的数据确实被缓冲,那么说一个比其他更有效的令人信服的因素是什么?

data-processing stream-processing batch-processing apache-spark apache-flink

13
推荐指数
3
解决办法
5398
查看次数

akka stream asyncBoundary vs mapAsync

我想明白之间的差别asyncBoundarymapAsync.从一目了然,我猜他们应该是一样的.但是,当我运行代码时,它看起来asyncBoundary比性能更快mapAsync

这是代码

implicit val system = ActorSystem("sourceDemo")
implicit val materializer = ActorMaterializer()


Source(1 to 100).mapAsync(100)(t => Future {t + 1}).mapAsync(100)(t => Future {t * 2}).map(println).to(Sink.ignore).run()
Source(1 to 100).map(_ + 1).withAttributes(Attributes.asyncBoundary).map(_ * 2).map(t => println("async boundary", t)).to(Sink.ignore).run()
Run Code Online (Sandbox Code Playgroud)

输出:异步边界总是比mayAsync完成更快.

从描述asyncBoundary(https://doc.akka.io/docs/akka-stream-and-http-experimental/current/scala/stream-flows-and-basics.html)的文档中,我看到它正在运行在不同的CPU上,但mapAsync是使用Future的多线程.未来也是异步的.

请问有关这两个API的更多说明吗?

scala stream-processing akka akka-stream

10
推荐指数
1
解决办法
1120
查看次数

Apache Apex与Apache Storm有何不同?

Apache Apex看起来与Apache Storm类似.

  • 用户在两个平台上构建应用程序/拓扑作为定向非循环图(DAG).Apex使用运营商/流,Storm使用spouts/streams/bolt.
  • 它们都是实时处理数据而不是批处理.
  • 两者似乎都具有高吞吐量和低延迟

因此,一目了然,两者看起来都很相似,但我并没有完全发挥作用.有人可以解释一下有哪些主要区别?换句话说,我何时应该使用一个而不是另一个?

stream-processing bigdata apache-storm apache-apex

9
推荐指数
1
解决办法
1240
查看次数

服务器CPU和GPU与LAMP

我试图找出更多关于使用HipHop运行php应用程序甚至是c ++编译的php应用程序时可以使用的硬件.我想设置一个微服务器并使用GPU来帮助CPU处理请求......

任何人?

php linux stream-processing

8
推荐指数
1
解决办法
8675
查看次数

同步来自多个数据源的数据

我们的团队正在尝试建立一个预测性维护系统,其任务是查看一组事件并预测这些事件是否描绘了一组已知异常。

我们正处于设计阶段,当前的系统设计如下:

  • 这些事件可能发生在物联网系统的多个来源(例如云平台,边缘设备或任何中间平台)上
  • 这些事件由数据源推送到消息队列系统中(当前,我们选择了Apache Kafka)。
  • 每个数据源都有其自己的队列(Kafka主题)。
  • 从队列中,数据被多个推理引擎(实际上是神经网络)消耗。
  • 根据功能集,推理引擎将订阅多个Kafka主题,并从这些主题中流式传输数据以连续输出推理。
  • 总体架构遵循单一责任原则,这意味着每个组件都将彼此分离并在单独的Docker容器中运行。

问题:

为了将一组事件分类为异常,这些事件必须在同一时间窗口内发生。例如,说有三个数据源将各自的事件推送到Kafka主题中,但是由于某些原因,数据未同步。因此,其中一个推理引擎会从每个kafka主题中提取最新条目,但是所提取数据中的相应事件并不属于同一时间窗口(例如1小时)。由于数据不同步,将导致无效的预测。

我们需要弄清楚如何确保按顺序推送来自所有三个源的数据,以便当推理引擎从多个kakfa主题请求条目(例如最后100个条目)时,每个主题中的对应条目都属于同一时间窗口?

synchronization distributed-system stream-processing apache-kafka iot

8
推荐指数
1
解决办法
222
查看次数

流处理器的 ArrowCircuit 实例,可能会阻塞

Control.Arrow.Operations.ArrowCircuit课程适用于:

可用于解释同步电路的箭头类型。

我想知道这里的同步是什么意思。我在维基百科了一下,他们说的是数字电子产品。我的电子设备非常生锈,所以这里有一个问题:所谓的异步流处理器的这种实例有什么问题(如果有的话):

data StreamProcessor a b = Get (a -> StreamProcessor a b) | 
                           Put b    (StreamProcessor a b) |
                           Halt

instance Category StreamProcessor where
    id = Get (\ x -> Put x id)
  
    Put c bc . ab = Put c (bc . ab)
    Get bbc . Put b ab = (bbc b) . ab
    Get bbc . Get aab = Get $ \ a -> (Get bbc) …
Run Code Online (Sandbox Code Playgroud)

haskell functional-programming circuit stream-processing typeclass

8
推荐指数
1
解决办法
228
查看次数

懒惰地从大文件中提取线条

我试图通过Clojure的大(> 1GB)文件中的行号抓取5行.我快到了,但看到了一些奇怪的事情,我想知道发生了什么.

到目前为止我有:

(defn multi-nth [values indices]
  (map (partial nth values) indices))

(defn read-lines [file indices]
  (with-open [rdr (clojure.java.io/reader file)]
    (let [lines (line-seq rdr)]
      (multi-nth lines indices))))
Run Code Online (Sandbox Code Playgroud)

现在,(read-lines "my-file" [0])工作没有问题.但是,传入[0 1]给我以下堆栈跟踪:

java.lang.RuntimeException: java.io.IOException: Stream closed
        Util.java:165 clojure.lang.Util.runtimeException
      LazySeq.java:51 clojure.lang.LazySeq.sval
      LazySeq.java:60 clojure.lang.LazySeq.seq
         Cons.java:39 clojure.lang.Cons.next
          RT.java:769 clojure.lang.RT.nthFrom
          RT.java:742 clojure.lang.RT.nth
         core.clj:832 clojure.core/nth
         AFn.java:163 clojure.lang.AFn.applyToHelper
         AFn.java:151 clojure.lang.AFn.applyTo
         core.clj:602 clojure.core/apply
        core.clj:2341 clojure.core/partial[fn]
      RestFn.java:408 clojure.lang.RestFn.invoke
        core.clj:2430 clojure.core/map[fn]
Run Code Online (Sandbox Code Playgroud)

在我从文件中读取第二行之前,似乎关闭了流.有趣的是,如果我手动从文件中提取一行,那么(nth lines 200)multi-nth调用适用于所有值<= 200.

知道发生了什么事吗?

clojure stream-processing lazy-evaluation

7
推荐指数
2
解决办法
2690
查看次数