小编Vla*_*cak的帖子

为不同的事件构建有状态链并在spark中分配全局ID

我们正在努力,spark 1.6我们正在努力保持类似事件的全球身份.可能很少有具有相同ID的事件"组"(在示例中为数字.仅为唯一性添加字母).我们知道其中一些事件是相似的,所以我们可以连接它们.我们希望保留以下内容:

Z -> 1, 2, 3
X -> 4
Run Code Online (Sandbox Code Playgroud)

因此,如果将来有一些id为4的事件,我们可以指定X为全局身份.

请查看示例以获得更好的说明:

假设我们有一些流数据进入火花工作.

1a
1b
2c
2d
2e
3f
3g
3h
4i
Run Code Online (Sandbox Code Playgroud)

事件1是我们想要分配的第一次出场1 to Z.接下来我们知道1b和2c是相似的.所以我们想保留某个2->1地图.同样的事情是2e和3f所以我们需要映射3-2.所以,现在我们有3对1->Z,2->1,3->2.

我们想要创造"历史"的道路:Z <- 1 <- 2 <- 3 最后,我们将举办所有活动ID = Z.

1a -> Z
1b -> Z
2c -> Z
2d -> Z
2e -> Z
3f -> Z
3g -> Z
3h -> Z
4i -> …
Run Code Online (Sandbox Code Playgroud)

java algorithm scala apache-spark spark-streaming

11
推荐指数
1
解决办法
403
查看次数

避免以编程方式使用创建的上下文启动HiveThriftServer2

我们正在尝试使用ThriftServer从spark 2.0.0中的spark temp表中查询数据.

首先,我们创建了启用Hive支持的sparkSession.目前,我们使用sqlContext启动ThriftServer,如下所示:

HiveThriftServer2.startWithContext(spark.sqlContext());
Run Code Online (Sandbox Code Playgroud)

我们有注册临时表"spark_temp_table"的火花流:

StreamingQuery streamingQuery = streamedData.writeStream()
                                             .format("memory")
                                             .queryName("spark_temp_table")
                                             .start();
Run Code Online (Sandbox Code Playgroud)

通过直线,我们可以看到临时表(运行SHOW TABLES);

当我们想用这种方法运行第二个作业(第二个sparkSession)时,我们必须用不同的端口启动第二个ThriftServer.

我这里有两个问题:

  1. 有没有办法让一个端口上有一个ThriftServer可以访问不同sparkSessions中的所有临时表?

  2. HiveThriftServer2.startWithContext(spark.sqlContext());@DeveloperApi.注释.有没有办法以编程方式在代码中启动带有上下文的thrift服务器?
    我看到--conf spark.sql.hive.thriftServer.singleSession=true在启动时传递给ThriftServer的配置(sbin/start-thriftserver.sh)但是我不明白如何为作业定义它.我试图在sparkSession构建器中设置此配置属性,但beeline没有显示临时表.

hadoop hive apache-spark apache-spark-sql apache-spark-2.0

10
推荐指数
1
解决办法
953
查看次数

提交spring boot应用程序jar以spark-submit

我很新兴火花,我试图尝试火花提交.我在spring boot中创建了一个应用程序,用于mvn package创建jar.但是当我尝试提交jar时spark-submit,它无法找到Main类.但主要类存在于jar中.

 spark-submit --class com.dip.sparkapp.SparkappApplication --master local target/sparkapp-0.0.1-SNAPSHOT.jar
Run Code Online (Sandbox Code Playgroud)

spring spring-boot apache-spark

8
推荐指数
1
解决办法
3344
查看次数

spark结构化流动态字符串过滤器

我们正在尝试对结构化流应用程序使用动态过滤器。

假设我们有以下 Spark 结构化流应用程序的伪实现:

spark.readStream()
     .format("kafka")
     .option(...)
     ...
     .load()
     .filter(getFilter()) <-- dynamic staff - def filter(conditionExpr: String):
     .writeStream()
     .format("kafka")
     .option(.....)
     .start();
Run Code Online (Sandbox Code Playgroud)

和 getFilter 返回字符串

String getFilter() {
   // dynamic staff to create expression
   return expression; // eg. "column = true";
}
Run Code Online (Sandbox Code Playgroud)

在当前版本的 Spark 中是否有可能具有动态过滤条件?我的意思是该getFilter()方法应该动态返回一个过滤条件(假设它每 10 分钟刷新一次)。我们试图研究广播变量,但不确定结构化流媒体是否支持这样的事情。

提交作业后似乎无法更新作业的配置。作为部署,我们使用yarn.

每个建议/选项都受到高度赞赏。


编辑: 假设getFilter()回报:

(columnA = 1 AND columnB = true) OR customHiveUDF(columnC, 'input') != 'required' OR columnD > 8
Run Code Online (Sandbox Code Playgroud)

10 分钟后,我们可以有小的变化(在第一个 OR 之前没有第一个表达式),并且可能我们可以有一个新的表达式 ( columnA = 2 …

apache-spark spark-structured-streaming

5
推荐指数
1
解决办法
1868
查看次数