我们正在努力,spark 1.6我们正在努力保持类似事件的全球身份.可能很少有具有相同ID的事件"组"(在示例中为数字.仅为唯一性添加字母).我们知道其中一些事件是相似的,所以我们可以连接它们.我们希望保留以下内容:
Z -> 1, 2, 3
X -> 4
Run Code Online (Sandbox Code Playgroud)
因此,如果将来有一些id为4的事件,我们可以指定X为全局身份.
请查看示例以获得更好的说明:
假设我们有一些流数据进入火花工作.
1a
1b
2c
2d
2e
3f
3g
3h
4i
Run Code Online (Sandbox Code Playgroud)
事件1是我们想要分配的第一次出场1 to Z.接下来我们知道1b和2c是相似的.所以我们想保留某个2->1地图.同样的事情是2e和3f所以我们需要映射3-2.所以,现在我们有3对1->Z,2->1,3->2.
我们想要创造"历史"的道路:Z <- 1 <- 2 <- 3
最后,我们将举办所有活动ID = Z.
1a -> Z
1b -> Z
2c -> Z
2d -> Z
2e -> Z
3f -> Z
3g -> Z
3h -> Z
4i -> …Run Code Online (Sandbox Code Playgroud) 我们正在尝试使用ThriftServer从spark 2.0.0中的spark temp表中查询数据.
首先,我们创建了启用Hive支持的sparkSession.目前,我们使用sqlContext启动ThriftServer,如下所示:
HiveThriftServer2.startWithContext(spark.sqlContext());
Run Code Online (Sandbox Code Playgroud)
我们有注册临时表"spark_temp_table"的火花流:
StreamingQuery streamingQuery = streamedData.writeStream()
.format("memory")
.queryName("spark_temp_table")
.start();
Run Code Online (Sandbox Code Playgroud)
通过直线,我们可以看到临时表(运行SHOW TABLES);
当我们想用这种方法运行第二个作业(第二个sparkSession)时,我们必须用不同的端口启动第二个ThriftServer.
我这里有两个问题:
有没有办法让一个端口上有一个ThriftServer可以访问不同sparkSessions中的所有临时表?
HiveThriftServer2.startWithContext(spark.sqlContext());用@DeveloperApi.注释.有没有办法以编程方式在代码中启动带有上下文的thrift服务器?
我看到--conf spark.sql.hive.thriftServer.singleSession=true在启动时传递给ThriftServer的配置(sbin/start-thriftserver.sh)但是我不明白如何为作业定义它.我试图在sparkSession构建器中设置此配置属性,但beeline没有显示临时表.
我很新兴火花,我试图尝试火花提交.我在spring boot中创建了一个应用程序,用于mvn package创建jar.但是当我尝试提交jar时spark-submit,它无法找到Main类.但主要类存在于jar中.
spark-submit --class com.dip.sparkapp.SparkappApplication --master local target/sparkapp-0.0.1-SNAPSHOT.jar
Run Code Online (Sandbox Code Playgroud) 我们正在尝试对结构化流应用程序使用动态过滤器。
假设我们有以下 Spark 结构化流应用程序的伪实现:
spark.readStream()
.format("kafka")
.option(...)
...
.load()
.filter(getFilter()) <-- dynamic staff - def filter(conditionExpr: String):
.writeStream()
.format("kafka")
.option(.....)
.start();
Run Code Online (Sandbox Code Playgroud)
和 getFilter 返回字符串
String getFilter() {
// dynamic staff to create expression
return expression; // eg. "column = true";
}
Run Code Online (Sandbox Code Playgroud)
在当前版本的 Spark 中是否有可能具有动态过滤条件?我的意思是该getFilter()方法应该动态返回一个过滤条件(假设它每 10 分钟刷新一次)。我们试图研究广播变量,但不确定结构化流媒体是否支持这样的事情。
提交作业后似乎无法更新作业的配置。作为部署,我们使用yarn.
每个建议/选项都受到高度赞赏。
编辑:
假设getFilter()回报:
(columnA = 1 AND columnB = true) OR customHiveUDF(columnC, 'input') != 'required' OR columnD > 8
Run Code Online (Sandbox Code Playgroud)
10 分钟后,我们可以有小的变化(在第一个 OR 之前没有第一个表达式),并且可能我们可以有一个新的表达式 ( columnA = 2 …