标签: apache-flink

Apache Flink中的Join输出

在Apache Flink中,如果我在一个主键上连接两个数据集,我会得到一个元组2,其中包含每个数据集的相应数据集条目.

问题是,当将该map()方法应用于输出的元组2数据集时,它看起来并不好看,特别是如果两个数据集的条目具有大量特征.

在两个输入数据集中使用元组得到一些像这样的代码:

var in1: DataSet[(Int, Int, Int, Int, Int)] = /* */
var in2: DataSet[(Int, Int, Int, Int)] = /* */

val out = in1.join(in2).where(0, 1, 2).equalTo(0, 1, 2)
  .map(join => (join._1._1, join._1._2, join._1._3,
                    join._1._4, join._1._5, join._2._4))
Run Code Online (Sandbox Code Playgroud)

我不介意使用POJO或案例类,但我不知道这会如何使它变得更好.

问题1:有没有一种很好的方法来展现元组2?例如使用其他运营商.

问题2:如何在同一个键上处理3个数据集的连接?这将使示例源更加混乱.

谢谢你的帮助.

scala apache-flink

3
推荐指数
1
解决办法
883
查看次数

Hadoop中的分布式文件处理?

我有大量的压缩tar文件,其中每个tar本身包含几个文件.我想提取这些文件,我想使用hadoop或类似的技术来加速处理.有这种问题的工具吗?据我所知,hadoop和类似的框架如spark或flink不直接使用文件,也不会直接访问文件系统.我还想对提取的文件进行一些基本的重命名,并将它们移动到适当的目录中.

我可以想象一个解决方案,其中一个创建所有tar文件的列表.然后将此列表传递给映射器,并且单个映射器从列表中提取一个文件.这是一种合理的方法吗?

hadoop batch-processing apache-spark apache-flink

3
推荐指数
1
解决办法
423
查看次数

从FlinkML多元线性回归中提取权重

我正在为Flink运行示例多元线性回归(0.10-SNAPSHOT).我无法弄清楚如何提取权重(例如斜率和截距,beta0-beta1,你想要称之为什么).我不是斯卡拉的超级经验,这可能是我的问题的一半.

感谢任何人给予的任何帮助.

object Job {
 def main(args: Array[String]) {
    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    val survival = env.readCsvFile[(String, String, String, String)]("/home/danger/IdeaProjects/quickstart/docs/haberman.data")

    val survivalLV = survival
      .map{tuple =>
      val list = tuple.productIterator.toList
      val numList = list.map(_.asInstanceOf[String].toDouble)
      LabeledVector(numList(3), DenseVector(numList.take(3).toArray))
    }

    val mlr = MultipleLinearRegression()
      .setStepsize(1.0)
      .setIterations(100)
      .setConvergenceThreshold(0.001)

    mlr.fit(survivalLV) 
    println(mlr.toString())     // This doesn't do anything productive...
    println(mlr.weightsOption)  // Neither does this.

  }
}
Run Code Online (Sandbox Code Playgroud)

scala machine-learning linear-regression apache-flink flinkml

3
推荐指数
1
解决办法
244
查看次数

在flatMap函数中Apache Flink Streaming类型不匹配

尝试在scala 2.10.4中使用0.10.0 flink版本的流API.在尝试编译第一个版本时:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time._

object Main {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val words : DataStream[String] = text.flatMap[String](
      new Function[String,TraversableOnce[String]] { 
        def apply(line:String):TraversableOnce[String] = line.split(" ")
      })

    env.execute("Window Stream wordcount")
  }
}
Run Code Online (Sandbox Code Playgroud)

我收到编译时错误:

[error]  found   : String => TraversableOnce[String]
[error]  required: org.apache.flink.api.common.functions.FlatMapFunction[String,String]
[error]       new Function[String,TraversableOnce[String]] { def apply(line:String):TraversableOnce[String] = line.split(" ")})
[error]       ^
Run Code Online (Sandbox Code Playgroud)

在我已经包含在项目中的反编译版本的DataStream.class中,有接受这种类型的函数(最后一个):

public <R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> evidence$12, ClassTag<R> evidence$13) {
        if (flatMapper …
Run Code Online (Sandbox Code Playgroud)

scala apache-flink flink-streaming

3
推荐指数
1
解决办法
2011
查看次数

当我重新运行Flink消费者时,Kafka再次消费最新消息

我在用Scala编写的Apache Flink API中创建了一个Kafka使用者.每当我从某个主题传递一些消息时,它就会接收它们.但是,当我重新启动使用者时,它会消耗发送到该主题的最新消息,而不是接收新消息或未消息消息.

这就是我在做的事情:

  1. 运行生产者:

    $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic corr2
    
    Run Code Online (Sandbox Code Playgroud)
  2. 运行消费者:

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val st = env
        .addSource(new FlinkKafkaConsumer09[String]("corr2", new SimpleStringSchema(), properties))
    env.enableCheckpointing(5000)
    st.print()
    env.execute()
    
    Run Code Online (Sandbox Code Playgroud)
  3. 传递一些消息

  4. 阻止消费者
  5. 再次运行使用者打印我发送的最后一条消息.我希望它只打印新消息.

apache-kafka apache-flink

3
推荐指数
1
解决办法
1139
查看次数

eclipse导入的maven项目无法解析其他导入的maven项目中的符号

我从git clone导入了"flink"项目并配置了相关设置.我可以使用"mvn"在命令行中成功编译/打包它.

有几个项目报告无法解析符号的错误.例如:"MathUtils无法解析AbstractAlignedProcessingTimeWindowOperator.java /flink-streaming-java_2.10/src/main/java/org/apache/flink/streaming/runtime/operators/windowing line 109 Java Problem".

"MathUtils"位于项目"flink-runtime_2.10"中,该项目已经编译并打包到jar中,其输出类文件夹为空.

对于项目"flink-streaming-java_2.10",其"java构建路径" - >库 - >"Maven依赖项"包括"flink-runtime_2.10"的引用项.但它无法解析"flink-runtime_2.10"中的符号.

在项目组中,其构建器为"java builder"和"maven proejct builder"的所有java项目都存在此类问题,但构建器为"scala builder"和"maven project builder"或简单"maven项目构建器"的项目没有问题.

import dependencies m2eclipse maven apache-flink

3
推荐指数
1
解决办法
2707
查看次数

Apache Flink - org.apache.flink.client.program.ProgramInvocationException

我使用Scala 2.11.7使用Apache FLink 1.0.3创建了一个应用程序,我想在本地测试它(单个jvm).所以我按照网站上的说明做了以下事情:

./bin/start-local.sh
tail log/flink-*-jobmanager-*.log
Run Code Online (Sandbox Code Playgroud)

它开始很好,我可以在localhost:8081看到web界面.然后,我尝试提交我的应用程序,但我得到一个例外或一个奇怪的消息.例如,当我键入以下任一命令时:

./bin/flink run ./myApp.jar
./bin/flink run ./myApp.jar -c MyMain
./bin/flink run ./myApp.jar -c myMain.class
./bin/flink run ./myApp.jar -c myMain.scala
./bin/flink run ./myApp.jar -c my.package.myMain
./bin/flink run ./myApp.jar -c my.package.myMain.class
./bin/flink run ./myApp.jar -c my.package.myMain.scala
Run Code Online (Sandbox Code Playgroud)

我得到以下异常:

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Neither a 'Main-Class', nor a 'program-class' entry was found in the jar file.
    at org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:571)
    at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:188)
    at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:126)
    at org.apache.flink.client.CliFrontend.buildProgram(CliFrontend.java:922)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:301)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Run Code Online (Sandbox Code Playgroud)

当我输入以下任一命令时:

./bin/flink …
Run Code Online (Sandbox Code Playgroud)

scala apache-flink

3
推荐指数
2
解决办法
5225
查看次数

Flink批处理:HDFS上的数据本地规划?

我们一直在玩Flink.到目前为止,我们一直在Hadoop 2.x/YARN上使用Spark和标准M/R.

除了YARN上的Flink执行模型之外,AFAIK不像spark那样是动态的,执行者在YARN中动态地获取和释放虚拟核心,问题的主要内容如下.

Flink看起来真是太神奇了:对于流媒体API,我只会说它很棒并且超越顶级.

Batch API:处理图非常强大,并且以独特的方式进行优化和并行运行,比Spark和其他人更多地利用集群可扩展性,优化完全非常复杂的DAG,这些DAG共享共同的处理步骤.

我发现的唯一缺点是,我希望只是我的误解和缺乏知识,在规划使用HDFS输入的批处理作业时,它似乎不喜欢数据本地处理.

不幸的是,这不是一个小问题,因为在90%的使用案例中,你在HDFS上有一个大数据分区存储,通常你做的事情如下:

  • 读取和过滤(例如只接受失败或成功)
  • 聚合,减少,使用它

第一部分,在简单的M/R或spark中完成时,总是按照" 首选本地处理 " 的惯用法进行规划,以便数据由保持数据块的同一节点处理,更快,以避免数据 - 通过网络转移.

在我们使用3个节点的集群进行的测试中,设置为专门测试此功能和行为,Flink似乎完美地处理了HDFS块,因此,例如,如果文件由3个块组成,Flink完美地处理3个输入分裂并安排它们在平行下.但没有数据位置模式.

请分享您的意见,我希望我只是错过了一些东西,或者它已经在新版本中出现了.提前感谢任何花时间回答这个问题的人.

apache-flink

3
推荐指数
1
解决办法
539
查看次数

Apache Flink:ProcessWindowFunction不适用

我想ProcessWindowFunction在我的Apache Flink项目中使用。但是使用过程函数时出现一些错误,请参见下面的代码片段

错误是:

在类型WindowedStream,元组,TimeWindow>的方法处理(ProcessWindowFunction,R,元组,TimeWindow>)是不适用的参数(JDBCExample.MyProcessWindows)

我的程序:

DataStream<Tuple2<String, JSONObject>> inputStream;

inputStream = env.addSource(new JsonArraySource());

inputStream.keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.minutes(10)))
  .process(new MyProcessWindows());
Run Code Online (Sandbox Code Playgroud)

我的ProcessWindowFunction

private class MyProcessWindows 
  extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, String, Window>
{

  public void process(
      String key, 
      Context context, 
      Iterable<Tuple2<String, JSONObject>> input, 
      Collector<Tuple2<String, String>> out) throws Exception 
  {
    ...
  }

}
Run Code Online (Sandbox Code Playgroud)

stream-processing apache-flink flink-streaming

3
推荐指数
1
解决办法
478
查看次数

Flink窗口状态大小和状态管理

在阅读了flink的文档并四处搜寻之后,我无法完全理解flink的窗口中如何处理状态。可以说我有一个每小时运行的带有聚合函数的滚动窗口,该函数将msg累积到某些Java pojo或scala case类中。该窗口的大小将与在一小时内进入该窗口的事件的数量相关,或者仅将其与pojo / case类相关,因为会将事件累积到该对象中。(例如,如果将10000 msgs计数为整数,大小将接近10000 * msg大小还是int的大小?)此外,如果im使用pojos或case类,flink是否会为我处理状态(如果内存溢出到磁盘)在检查点用尽/保存状态等)还是我必须使用flink的状态对象?

谢谢你的帮助!

stream-processing apache-flink

3
推荐指数
1
解决办法
363
查看次数