小编Aje*_*eet的帖子

如何在Kubernetes容器/吊舱上安装S3存储桶?

我正在尝试在Amazon EKS集群上运行我的Spark作业。我的Spark作业在每个数据节点/工作人员/执行者处需要一些静态数据(参考数据),并且该参考数据可在S3上获得。

有人可以帮助我找到一个干净且性能良好的解决方案,以将S3桶安装在吊舱上吗?

S3 API是一个选项,我正在将其用于输入记录和输出结果。但是“参考数据”是静态数据,因此我不想在我的spark作业的每次运行/执行中下载它。在首次运行时,作业将下载数据,而即将进行的作业将检查数据是否在本地可用,而无需再次下载。

fuse amazon-s3 s3fs apache-spark kubernetes

8
推荐指数
2
解决办法
5345
查看次数

Cassandra + Spark用于实时分析

我正在为“大数据(空间数据)的实时渲染”应用程序。借助Spark Streaming + Spark SQL + WebSocket,我能够在仪表板上呈现预定义的查询。但是我想通过交互式查询和临时查询来获取数据。

为此,我尝试使用“ Spark Streaming + Cassandra”实现它。这些查询需要聚合并过滤大量数据。

我是Cassandra和Spark的新手,所以我对以下方法感到困惑,这种方法会更好\更快:

  1. Spark Streaming->过滤(Spark)->保存到Cassandra->交互式查询-> UI(仪表板)
  2. Spark流->过滤(Spark)->保存到Cassandra-> Spark SQL->交互式查询-> UI(仪表板)

卡桑德拉(Cassandra)会足够快地给出实时结果吗?或者我应该从Cassandra创建一个RDD来对其执行交互式查询。

查询之一是:

"SELECT *  FROM PERFORMANCE.GEONAMES A  INNER JOIN  
(SELECT max(GEONAMEID) AS MAPINFO_ID FROM  PERFORMANCE.GEONAMES
where longitude between %LL_LONG% and %UR_LONG% 
and latitude between %LL_LAT% and %UR_LAT%  
and %WHERE_CLAUSE% GROUP BY LEFT(QUADKEY, %QUAD_TREE_LEVEL%)  )
AS B ON A.GEONAMEID = B.MAPINFO_ID"
Run Code Online (Sandbox Code Playgroud)

任何意见或建议,将不胜感激。谢谢,

感谢@doanduyhai建议使用SASI二级索引,它确实产生了很大的变化。

cassandra apache-spark spark-streaming spark-dataframe

5
推荐指数
1
解决办法
1428
查看次数

Spark SQL - 如何为自定义对象编写自定义编码器,并将 Map[String, Any] 作为其字段?

我正在尝试将 CSV 加载为自定义对象类型的 DataFrame:

case class Geom(attributes: Map[String, Any])
Run Code Online (Sandbox Code Playgroud)

我试过这个:

import session.implicits._

implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
implicit val geomEncoder = org.apache.spark.sql.Encoders.product[Geom]

val sparkSQLGeometryRDD = session.read
  .option("delimiter", "\t")
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("src\\main\\resources\\TexasPostCodes.txt")
  //.as[MyObjEncoded]//(encoder)
  .persist()

val columns = sparkSQLGeometryRDD.schema.fieldNames
//sparkSQLGeometryRDD.show()

val mappedDF = sparkSQLGeometryRDD
  .map(x => x.getValuesMap[Any](columns.toList))
  .map(x => Geom(x))
  .show
Run Code Online (Sandbox Code Playgroud)

但它抛出了这个异常:

Exception in thread "main" java.lang.ClassNotFoundException: scala.Any
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
Run Code Online (Sandbox Code Playgroud)

有人可以帮我找出我的代码有什么问题吗?

将案例类和编码器从方法中移出后,它工作正常。

object SpatialEncoders {
   implicit def MapEncoder: Encoder[Map[String, Any]]=     Encoders.kryo[Map[String, Any]]
   implicit def GeomEncoder: …
Run Code Online (Sandbox Code Playgroud)

hadoop scala apache-spark apache-spark-sql

5
推荐指数
0
解决办法
1181
查看次数