在实践中(非理论),小批量与实时流之间有什么区别?从理论上讲,我理解迷你批量是在给定的时间范围内批量生成的,而实时流式更像是在数据到达时做某事但是我最大的问题是为什么不使用epsilon时间框架(比如说一毫秒)或我想了解为什么一个人比其他人更有效的解决方案?
我最近遇到了一个例子,其中迷你批处理(Apache Spark)用于欺诈检测,实时流(Apache Flink)用于欺诈预防.有人还评论说小批量不是防止欺诈的有效解决方案(因为目标是防止交易发生)现在我想知道为什么这对迷你批次(Spark)不会那么有效?为什么以1毫秒的延迟运行迷你批处理无效?批处理是一种在任何地方使用的技术,包括操作系统和内核TCP/IP堆栈,其中磁盘或网络的数据确实被缓冲,那么说一个比其他更有效的令人信服的因素是什么?
data-processing stream-processing batch-processing apache-spark apache-flink
我正在研究分布式系统并参考这个老问题:stackoverflow链接
我真的无法理解完全一次,至少一次和最多一次保证之间的区别,我在Kafka,Flink和Storm以及Cassandra中也读到了这些概念.例如有人说Flink更好,因为只有一次保证,而Storm只有至少一次.
我知道,一次性模式对延迟更好,但同时对于容错更糟糕吗?如果我没有重复,如何恢复流?然后......如果这是一个真正的问题,为什么一次保证被认为比其他保证更好?
有人可以给我更好的定义吗?
有人成功使用Apache Flink 0.9来处理存储在AWS S3上的数据吗?我发现他们正在使用自己的S3FileSystem而不是Hadoop中的一个...而且看起来它不起作用.我把以下路径s3://bucket.s3.amazonaws.com/文件夹失败,但出现以下异常:
java.io.IOException:无法建立与Amazon S3的连接:com.amazonaws.services.s3.model.AmazonS3Exception:我们计算的请求签名与您提供的签名不匹配.检查您的密钥和签名方法.(服务:Amazon S3;状态代码:403;
我想从IDE本地启动它时访问Flink Web界面.
我需要这个,因为我想访问Flink的计数器(累加器).
有没有办法用Flink Streaming计算流中唯一单词的数量?结果将是一个不断增加的数字流.
我正在研究通过Google Dataflow/Apache Beam处理来自Web用户会话的日志,并且需要将用户的日志(流式传输)与上个月用户会话的历史记录相结合.
我看过以下方法:
element
的processElement(ProcessContext processContext)
我的理解是,通过加载的数据.withSideInputs(pCollectionView)
需要适合内存.我知道我可以将所有单个用户的会话历史记录放入内存,但不是所有会话历史记录.
我的问题是,是否有一种方法可以从仅与当前用户会话相关的侧输入加载/流式传输数据?
我想象一个parDo函数,它将通过指定用户的ID从侧面输入加载用户的历史会话.但只有当前用户的历史会话才适合内存; 通过侧输入加载所有历史会话将太大.
一些伪代码来说明:
public static class MetricFn extends DoFn<LogLine, String> {
final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;
public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
this.pHistoryView = historyView;
}
@Override
public void processElement(ProcessContext processContext) throws Exception {
Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);
final LogLine currentLogLine = processContext.element();
final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
processContext.output(outputMetric);
}
}
Run Code Online (Sandbox Code Playgroud) 我有一个java应用程序,我正在使用它Flink Api
.所以基本上我正在尝试用代码做的是创建两个记录很少的数据集,然后将它们作为两个表和必要的字段注册.
DataSet<Company> comp = env.fromElements(
new Company("Aux", 1),
new Company("Comp2", 2),
new Company("Comp3", 3));
DataSet<Employee> emp = env.fromElements(
new Employee("Kula", 1),
new Employee("Ish", 1),
new Employee("Kula", 3));
tEnv.registerDataSet("Employee", emp, "name, empId");
tEnv.registerDataSet("Company", comp, "cName, empId");
Run Code Online (Sandbox Code Playgroud)
然后我尝试使用以下方法连接这两个表Table API
:
Table anotherJoin = tEnv.sql("SELECT Employee.name, Employee.empId, Company.cName FROM " +
"Employee RIGHT JOIN Company on Employee.empId = Company.empId");
Run Code Online (Sandbox Code Playgroud)
而我只是在控制台上打印结果.这在我的机器上完美地工作.我创建了一个fat-jar
使用maven-shade-plugin
with依赖项,我正在尝试在AWS中执行它Lambda
.
因此,当我尝试在那里执行它时,我会被抛出以下异常(我只发布前几行):
reference.conf @ file:/var/task/reference.conf:804:无法将替换解析为值:$ {akka.stream.materializer}:com.typesafe.config.ConfigException $ UnresolvedSubstitution com.typesafe.config.ConfigException $ …
我试图在Flink中使用Scala XML库来解析XML,但我无法使其工作.请注意,我需要在相同的处理函数中对我的代码使用序列化和非序列化(字符串)版本.
我尝试过不同的解决方案,它们总是在IntelliJ中工作,但是当我在Flink集群上运行时却没有.他们总是回归不同java.lang.LinkageError: com/sun/org/apache/xerces/internal/jaxp/SAXParserImpl$JAXPSAXParser
; 我尝试了很多东西,但我仍然得到类似于这个的错误.
这是我的Flink Job的样子:
object StreamingJob {
import org.apache.flink.streaming.api.scala._
val l = List(
"""<ciao>ciao</ciao>""",
)
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// set up kafka section excluded
env.setParallelism(10)
val stream = env.fromCollection(l)
stream
.uid("process")
.map(new Processor)
.print
env.execute("Flink-TEST")
}
}
Run Code Online (Sandbox Code Playgroud)
这是我的处理功能的一个例子:
import javax.xml.parsers.{SAXParser, SAXParserFactory}
import org.apache.flink.api.common.functions.MapFunction
import scala.xml.{Elem, XML}
import scala.xml.factory.XMLLoader
class Processor extends MapFunction[String, String] {
override def map(translatedMessage: String): String = {
val xml = Processor.xmlLoader.loadString(translatedMessage)
xml.toString
}
} …
Run Code Online (Sandbox Code Playgroud) 卡住了一下CoFlatMapFunction
.如果我把它放在DataStream
之前的窗口上似乎工作正常但是如果放在窗口的"应用"功能之后就失败了.
我正在测试两个流,主要的"功能"在flatMap1
不断摄取数据和控制流"模型"时flatMap2
根据请求更改模型.
我能够设置并看到b0/b1正确设置flatMap2
,但flatMap1
始终看到b0和b1在初始化时设置为0.
我错过了一些明显的东西吗?
public static class applyModel implements CoFlatMapFunction<Features, Model, EnrichedFeatures> {
private static final long serialVersionUID = 1L;
Double b0;
Double b1;
public applyModel(){
b0=0.0;
b1=0.0;
}
@Override
public void flatMap1(Features value, Collector<EnrichedFeatures> out) {
System.out.print("Main: " + this + "\n");
}
@Override
public void flatMap2(Model value, Collector<EnrichedFeatures> out) {
System.out.print("Old Model: " + this + "\n");
b0 = value.getB0();
b1 = value.getB1();
System.out.print("New Model: " …
Run Code Online (Sandbox Code Playgroud) 我使用Flink(最新通过git)从kafka流到cassandra.为了简化单元测试我通过Dagger添加依赖注入.
ObjectGraph似乎正在正确设置自己,但是'内部对象'被Flink标记为"不可序列化".如果我直接包含这些对象,它们可以工作 - 那么有什么区别?
有问题的类实现了MapFunction和@Inject一个用于cassandra的模块和一个用于读取配置文件的模块.
有没有办法建立这个,所以我可以使用后期绑定或Flink使这不可能?
fwiw - 依赖注入(通过匕首)和RichMapFunction不能共存.Dagger不允许您包含任何在其定义中扩展的对象.
通过Dagger Lazy <t>实例化的对象也不会序列化.
线程"main"中的异常org.apache.flink.api.common.InvalidProgramException:对象com.someapp.SaveMap@2e029d61不可序列化
...
引起:java.io.NotSerializableException:dagger.internal.LazyBinding $ 1
apache-flink ×10
java ×2
akka ×1
amazon-s3 ×1
apache-beam ×1
apache-kafka ×1
apache-spark ×1
apache-storm ×1
aws-lambda ×1
cassandra ×1
dagger ×1
hadoop ×1
maven ×1
scala ×1
scala-xml ×1