标签: apache-flink

在实践中(非理论),小批量与实时流之间有什么区别?

在实践中(非理论),小批量与实时流之间有什么区别?从理论上讲,我理解迷你批量是在给定的时间范围内批量生成的,而实时流式更像是在数据到达时做某事但是我最大的问题是为什么不使用epsilon时间框架(比如说一毫秒)或我想了解为什么一个人比其他人更有效的解决方案?

我最近遇到了一个例子,其中迷你批处理(Apache Spark)用于欺诈检测,实时流(Apache Flink)用于欺诈预防.有人还评论说小批量不是防止欺诈的有效解决方案(因为目标是防止交易发生)现在我想知道为什么这对迷你批次(Spark)不会那么有效?为什么以1毫秒的延迟运行迷你批处理无效?批处理是一种在任何地方使用的技术,包括操作系统和内核TCP/IP堆栈,其中磁盘或网络的数据确实被缓冲,那么说一个比其他更有效的令人信服的因素是什么?

data-processing stream-processing batch-processing apache-spark apache-flink

13
推荐指数
3
解决办法
5398
查看次数

确切一次和至少一次保证之间的差异

我正在研究分布式系统并参考这个老问题:stackoverflow链接

我真的无法理解完全一次,至少一次和最多一次保证之间的区别,我在Kafka,Flink和Storm以及Cassandra中也读到了这些概念.例如有人说Flink更好,因为只有一次保证,而Storm只有至少一次.

我知道,一次性模式对延迟更好,但同时对于容错更糟糕吗?如果我没有重复,如何恢复流?然后......如果这是一个真正的问题,为什么一次保证被认为比其他保证更好?

有人可以给我更好的定义吗?

cassandra apache-kafka apache-storm apache-flink

13
推荐指数
2
解决办法
6588
查看次数

使用Amazon S3运行Apache Flink

有人成功使用Apache Flink 0.9来处理存储在AWS S3上的数据吗?我发现他们正在使用自己的S3FileSystem而不是Hadoop中的一个...而且看起来它不起作用.我把以下路径s3://bucket.s3.amazonaws.com/文件夹失败,但出现以下异常:

java.io.IOException:无法建立与Amazon S3的连接:com.amazonaws.services.s3.model.AmazonS3Exception:我们计算的请求签名与您提供的签名不匹配.检查您的密钥和签名方法.(服务:Amazon S3;状态代码:403;

hadoop amazon-s3 apache-flink

11
推荐指数
1
解决办法
2577
查看次数

从IDE运行Flink时,如何启动Flink作业管理器Web界面

我想从IDE本地启动它时访问Flink Web界面.

我需要这个,因为我想访问Flink的计数器(累加器).

apache-flink

11
推荐指数
2
解决办法
2275
查看次数

如何计算流中的唯一单词?

有没有办法用Flink Streaming计算流中唯一单词的数量?结果将是一个不断增加的数字流.

apache-flink flink-streaming

11
推荐指数
1
解决办法
2097
查看次数

如何将流数据与Dataflow/Beam中的大型历史数据集相结合

我正在研究通过Google Dataflow/Apache Beam处理来自Web用户会话的日志,并且需要将用户的日志(流式传输)与上个月用户会话的历史记录相结合.

我看过以下方法:

  1. 使用30天的固定窗口:最有可能放入大窗口以适应内存,我不需要更新用户的历史记录,只需参考它
  2. 使用CoGroupByKey连接两个数据集,但这两个数据集必须具有相同的窗口大小(https://cloud.google.com/dataflow/model/group-by-key#join),这在我的案例(24小时对30天)
  3. 使用侧输入检索用户的会话历史对于一个给定elementprocessElement(ProcessContext processContext)

我的理解是,通过加载的数据.withSideInputs(pCollectionView)需要适合内存.我知道我可以将所有单个用户的会话历史记录放入内存,但不是所有会话历史记录.

我的问题是,是否有一种方法可以从仅与当前用户会话相关的侧输入加载/流式传输数据?

我想象一个parDo函数,它将通过指定用户的ID从侧面输入加载用户的历史会话.但只有当前用户的历史会话才适合内存; 通过侧输入加载所有历史会话将太大.

一些伪代码来说明:

public static class MetricFn extends DoFn<LogLine, String> {

    final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;

    public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
        this.pHistoryView = historyView;
    }

    @Override
    public void processElement(ProcessContext processContext) throws Exception {
        Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);

        final LogLine currentLogLine = processContext.element();
        final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
        final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
        processContext.output(outputMetric);
    }
}
Run Code Online (Sandbox Code Playgroud)

google-cloud-dataflow apache-flink apache-beam

11
推荐指数
1
解决办法
2137
查看次数

无法将替换解析为值:AWS Lambda中的$ {akka.stream.materializer}

我有一个java应用程序,我正在使用它Flink Api.所以基本上我正在尝试用代码做的是创建两个记录很少的数据集,然后将它们作为两个表和必要的字段注册.

 DataSet<Company> comp = env.fromElements(
                new Company("Aux", 1),
                new Company("Comp2", 2),
                new Company("Comp3", 3));

        DataSet<Employee> emp = env.fromElements(
                new Employee("Kula", 1),
                new Employee("Ish", 1),
                new Employee("Kula", 3));


        tEnv.registerDataSet("Employee", emp, "name, empId");
        tEnv.registerDataSet("Company", comp, "cName, empId");
Run Code Online (Sandbox Code Playgroud)

然后我尝试使用以下方法连接这两个表Table API:

Table anotherJoin = tEnv.sql("SELECT Employee.name, Employee.empId, Company.cName FROM " +
                "Employee RIGHT JOIN Company on Employee.empId = Company.empId");
Run Code Online (Sandbox Code Playgroud)

而我只是在控制台上打印结果.在我的机器上完美地工作.我创建了一个fat-jar使用maven-shade-pluginwith依赖项,我正在尝试在AWS中执行它Lambda.

因此,当我尝试在那里执行它时,我会被抛出以下异常(我只发布前几行):

reference.conf @ file:/var/task/reference.conf:804:无法将替换解析为值:$ {akka.stream.materializer}:com.typesafe.config.ConfigException $ UnresolvedSubstitution com.typesafe.config.ConfigException $ …

java amazon-web-services akka aws-lambda apache-flink

11
推荐指数
2
解决办法
4254
查看次数

如何在Apache Flink中使用Scala XML?

我试图在Flink中使用Scala XML库来解析XML,但我无法使其工作.请注意,我需要在相同的处理函数中对我的代码使用序列化和非序列化(字符串)版本.

我尝试过不同的解决方案,它们总是在IntelliJ中工作,但是当我在Flink集群上运行时却没有.他们总是回归不同java.lang.LinkageError: com/sun/org/apache/xerces/internal/jaxp/SAXParserImpl$JAXPSAXParser; 我尝试了很多东西,但我仍然得到类似于这个的错误.

这是我的Flink Job的样子:

object StreamingJob {
  import org.apache.flink.streaming.api.scala._

  val l = List(
    """<ciao>ciao</ciao>""",
  )

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // set up kafka section excluded
    env.setParallelism(10)

    val stream = env.fromCollection(l)

    stream
      .uid("process")
      .map(new Processor)
      .print

    env.execute("Flink-TEST")
  }
}
Run Code Online (Sandbox Code Playgroud)

这是我的处理功能的一个例子:

import javax.xml.parsers.{SAXParser, SAXParserFactory}
import org.apache.flink.api.common.functions.MapFunction
import scala.xml.{Elem, XML}
import scala.xml.factory.XMLLoader

class Processor extends MapFunction[String, String] {
  override def map(translatedMessage: String): String = {
    val xml = Processor.xmlLoader.loadString(translatedMessage)
    xml.toString
  }
} …
Run Code Online (Sandbox Code Playgroud)

scala maven scala-xml maven-shade-plugin apache-flink

11
推荐指数
1
解决办法
205
查看次数

Flink:在CoFlatMapFunction中共享状态

卡住了一下CoFlatMapFunction.如果我把它放在DataStream之前的窗口上似乎工作正常但是如果放在窗口的"应用"功能之后就失败了.

我正在测试两个流,主要的"功能"在flatMap1不断摄取数据和控制流"模型"时flatMap2根据请求更改模型.

我能够设置并看到b0/b1正确设置flatMap2,但flatMap1始终看到b0和b1在初始化时设置为0.

我错过了一些明显的东西吗?

public static class applyModel implements CoFlatMapFunction<Features, Model, EnrichedFeatures> {
    private static final long serialVersionUID = 1L;

    Double b0;
    Double b1;

    public applyModel(){
        b0=0.0;
        b1=0.0;
    }

    @Override
    public void flatMap1(Features value, Collector<EnrichedFeatures> out) {
        System.out.print("Main: " + this + "\n");
    }

    @Override
    public void flatMap2(Model value, Collector<EnrichedFeatures> out) {
        System.out.print("Old Model: " + this + "\n");
        b0 = value.getB0();
        b1 = value.getB1();
        System.out.print("New Model: " …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

10
推荐指数
1
解决办法
1319
查看次数

flink - 使用匕首注射 - 不可序列化?

我使用Flink(最新通过git)从kafka流到cassandra.为了简化单元测试我通过Dagger添加依赖注入.

ObjectGraph似乎正在正确设置自己,但是'内部对象'被Flink标记为"不可序列化".如果我直接包含这些对象,它们可以工作 - 那么有什么区别?

有问题的类实现了MapFunction@Inject一个用于cassandra的模块和一个用于读取配置文件的模块.

有没有办法建立这个,所以我可以使用后期绑定或Flink使这不可能?


编辑:

fwiw - 依赖注入(通过匕首)和RichMapFunction不能共存.Dagger不允许您包含任何在其定义中扩展的对象.

进一步:

通过Dagger Lazy <t>实例化的对象也不会序列化.

线程"main"中的异常org.apache.flink.api.common.InvalidProgramException:对象com.someapp.SaveMap@2e029d61不可序列化
...
引起:java.io.NotSerializableException:dagger.internal.LazyBinding $ 1

java serialization dagger apache-flink

10
推荐指数
1
解决办法
5254
查看次数