在Apache Flink中,如果我在一个主键上连接两个数据集,我会得到一个元组2,其中包含每个数据集的相应数据集条目.
问题是,当将该map()方法应用于输出的元组2数据集时,它看起来并不好看,特别是如果两个数据集的条目具有大量特征.
在两个输入数据集中使用元组得到一些像这样的代码:
var in1: DataSet[(Int, Int, Int, Int, Int)] = /* */
var in2: DataSet[(Int, Int, Int, Int)] = /* */
val out = in1.join(in2).where(0, 1, 2).equalTo(0, 1, 2)
.map(join => (join._1._1, join._1._2, join._1._3,
join._1._4, join._1._5, join._2._4))
Run Code Online (Sandbox Code Playgroud)
我不介意使用POJO或案例类,但我不知道这会如何使它变得更好.
问题1:有没有一种很好的方法来展现元组2?例如使用其他运营商.
问题2:如何在同一个键上处理3个数据集的连接?这将使示例源更加混乱.
谢谢你的帮助.
我有大量的压缩tar文件,其中每个tar本身包含几个文件.我想提取这些文件,我想使用hadoop或类似的技术来加速处理.有这种问题的工具吗?据我所知,hadoop和类似的框架如spark或flink不直接使用文件,也不会直接访问文件系统.我还想对提取的文件进行一些基本的重命名,并将它们移动到适当的目录中.
我可以想象一个解决方案,其中一个创建所有tar文件的列表.然后将此列表传递给映射器,并且单个映射器从列表中提取一个文件.这是一种合理的方法吗?
我正在为Flink运行示例多元线性回归(0.10-SNAPSHOT).我无法弄清楚如何提取权重(例如斜率和截距,beta0-beta1,你想要称之为什么).我不是斯卡拉的超级经验,这可能是我的问题的一半.
感谢任何人给予的任何帮助.
object Job {
def main(args: Array[String]) {
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
val survival = env.readCsvFile[(String, String, String, String)]("/home/danger/IdeaProjects/quickstart/docs/haberman.data")
val survivalLV = survival
.map{tuple =>
val list = tuple.productIterator.toList
val numList = list.map(_.asInstanceOf[String].toDouble)
LabeledVector(numList(3), DenseVector(numList.take(3).toArray))
}
val mlr = MultipleLinearRegression()
.setStepsize(1.0)
.setIterations(100)
.setConvergenceThreshold(0.001)
mlr.fit(survivalLV)
println(mlr.toString()) // This doesn't do anything productive...
println(mlr.weightsOption) // Neither does this.
}
}
Run Code Online (Sandbox Code Playgroud) scala machine-learning linear-regression apache-flink flinkml
尝试在scala 2.10.4中使用0.10.0 flink版本的流API.在尝试编译第一个版本时:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time._
object Main {
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.socketTextStream("localhost", 9999)
val words : DataStream[String] = text.flatMap[String](
new Function[String,TraversableOnce[String]] {
def apply(line:String):TraversableOnce[String] = line.split(" ")
})
env.execute("Window Stream wordcount")
}
}
Run Code Online (Sandbox Code Playgroud)
我收到编译时错误:
[error] found : String => TraversableOnce[String]
[error] required: org.apache.flink.api.common.functions.FlatMapFunction[String,String]
[error] new Function[String,TraversableOnce[String]] { def apply(line:String):TraversableOnce[String] = line.split(" ")})
[error] ^
Run Code Online (Sandbox Code Playgroud)
在我已经包含在项目中的反编译版本的DataStream.class中,有接受这种类型的函数(最后一个):
public <R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> evidence$12, ClassTag<R> evidence$13) {
if (flatMapper …Run Code Online (Sandbox Code Playgroud) 我在用Scala编写的Apache Flink API中创建了一个Kafka使用者.每当我从某个主题传递一些消息时,它就会接收它们.但是,当我重新启动使用者时,它会消耗发送到该主题的最新消息,而不是接收新消息或未消息消息.
这就是我在做的事情:
运行生产者:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic corr2
Run Code Online (Sandbox Code Playgroud)运行消费者:
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")
val env = StreamExecutionEnvironment.getExecutionEnvironment
val st = env
.addSource(new FlinkKafkaConsumer09[String]("corr2", new SimpleStringSchema(), properties))
env.enableCheckpointing(5000)
st.print()
env.execute()
Run Code Online (Sandbox Code Playgroud)传递一些消息
我从git clone导入了"flink"项目并配置了相关设置.我可以使用"mvn"在命令行中成功编译/打包它.
有几个项目报告无法解析符号的错误.例如:"MathUtils无法解析AbstractAlignedProcessingTimeWindowOperator.java /flink-streaming-java_2.10/src/main/java/org/apache/flink/streaming/runtime/operators/windowing line 109 Java Problem".
"MathUtils"位于项目"flink-runtime_2.10"中,该项目已经编译并打包到jar中,其输出类文件夹为空.
对于项目"flink-streaming-java_2.10",其"java构建路径" - >库 - >"Maven依赖项"包括"flink-runtime_2.10"的引用项.但它无法解析"flink-runtime_2.10"中的符号.
在项目组中,其构建器为"java builder"和"maven proejct builder"的所有java项目都存在此类问题,但构建器为"scala builder"和"maven project builder"或简单"maven项目构建器"的项目没有问题.
我使用Scala 2.11.7使用Apache FLink 1.0.3创建了一个应用程序,我想在本地测试它(单个jvm).所以我按照网站上的说明做了以下事情:
./bin/start-local.sh
tail log/flink-*-jobmanager-*.log
Run Code Online (Sandbox Code Playgroud)
它开始很好,我可以在localhost:8081看到web界面.然后,我尝试提交我的应用程序,但我得到一个例外或一个奇怪的消息.例如,当我键入以下任一命令时:
./bin/flink run ./myApp.jar
./bin/flink run ./myApp.jar -c MyMain
./bin/flink run ./myApp.jar -c myMain.class
./bin/flink run ./myApp.jar -c myMain.scala
./bin/flink run ./myApp.jar -c my.package.myMain
./bin/flink run ./myApp.jar -c my.package.myMain.class
./bin/flink run ./myApp.jar -c my.package.myMain.scala
Run Code Online (Sandbox Code Playgroud)
我得到以下异常:
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: Neither a 'Main-Class', nor a 'program-class' entry was found in the jar file.
at org.apache.flink.client.program.PackagedProgram.getEntryPointClassNameFromJar(PackagedProgram.java:571)
at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:188)
at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:126)
at org.apache.flink.client.CliFrontend.buildProgram(CliFrontend.java:922)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:301)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Run Code Online (Sandbox Code Playgroud)
当我输入以下任一命令时:
./bin/flink …Run Code Online (Sandbox Code Playgroud) 我们一直在玩Flink.到目前为止,我们一直在Hadoop 2.x/YARN上使用Spark和标准M/R.
除了YARN上的Flink执行模型之外,AFAIK不像spark那样是动态的,执行者在YARN中动态地获取和释放虚拟核心,问题的主要内容如下.
Flink看起来真是太神奇了:对于流媒体API,我只会说它很棒并且超越顶级.
Batch API:处理图非常强大,并且以独特的方式进行优化和并行运行,比Spark和其他人更多地利用集群可扩展性,优化完全非常复杂的DAG,这些DAG共享共同的处理步骤.
我发现的唯一缺点是,我希望只是我的误解和缺乏知识,在规划使用HDFS输入的批处理作业时,它似乎不喜欢数据本地处理.
不幸的是,这不是一个小问题,因为在90%的使用案例中,你在HDFS上有一个大数据分区存储,通常你做的事情如下:
第一部分,在简单的M/R或spark中完成时,总是按照" 首选本地处理 " 的惯用法进行规划,以便数据由保持数据块的同一节点处理,更快,以避免数据 - 通过网络转移.
在我们使用3个节点的集群进行的测试中,设置为专门测试此功能和行为,Flink似乎完美地处理了HDFS块,因此,例如,如果文件由3个块组成,Flink完美地处理3个输入分裂并安排它们在平行下.但没有数据位置模式.
请分享您的意见,我希望我只是错过了一些东西,或者它已经在新版本中出现了.提前感谢任何花时间回答这个问题的人.
我想ProcessWindowFunction在我的Apache Flink项目中使用。但是使用过程函数时出现一些错误,请参见下面的代码片段
错误是:
在类型WindowedStream,元组,TimeWindow>的方法处理(ProcessWindowFunction,R,元组,TimeWindow>)是不适用的参数(JDBCExample.MyProcessWindows)
我的程序:
DataStream<Tuple2<String, JSONObject>> inputStream;
inputStream = env.addSource(new JsonArraySource());
inputStream.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.minutes(10)))
.process(new MyProcessWindows());
Run Code Online (Sandbox Code Playgroud)
我的ProcessWindowFunction:
private class MyProcessWindows
extends ProcessWindowFunction<Tuple2<String, JSONObject>, Tuple2<String, String>, String, Window>
{
public void process(
String key,
Context context,
Iterable<Tuple2<String, JSONObject>> input,
Collector<Tuple2<String, String>> out) throws Exception
{
...
}
}
Run Code Online (Sandbox Code Playgroud) 在阅读了flink的文档并四处搜寻之后,我无法完全理解flink的窗口中如何处理状态。可以说我有一个每小时运行的带有聚合函数的滚动窗口,该函数将msg累积到某些Java pojo或scala case类中。该窗口的大小将与在一小时内进入该窗口的事件的数量相关,或者仅将其与pojo / case类相关,因为会将事件累积到该对象中。(例如,如果将10000 msgs计数为整数,大小将接近10000 * msg大小还是int的大小?)此外,如果im使用pojos或case类,flink是否会为我处理状态(如果内存溢出到磁盘)在检查点用尽/保存状态等)还是我必须使用flink的状态对象?
谢谢你的帮助!
apache-flink ×10
scala ×4
apache-kafka ×1
apache-spark ×1
dependencies ×1
flinkml ×1
hadoop ×1
import ×1
m2eclipse ×1
maven ×1