我正在尝试在Amazon EKS集群上运行我的Spark作业。我的Spark作业在每个数据节点/工作人员/执行者处需要一些静态数据(参考数据),并且该参考数据可在S3上获得。
有人可以帮助我找到一个干净且性能良好的解决方案,以将S3桶安装在吊舱上吗?
S3 API是一个选项,我正在将其用于输入记录和输出结果。但是“参考数据”是静态数据,因此我不想在我的spark作业的每次运行/执行中下载它。在首次运行时,作业将下载数据,而即将进行的作业将检查数据是否在本地可用,而无需再次下载。
我正在为“大数据(空间数据)的实时渲染”应用程序。借助Spark Streaming + Spark SQL + WebSocket,我能够在仪表板上呈现预定义的查询。但是我想通过交互式查询和临时查询来获取数据。
为此,我尝试使用“ Spark Streaming + Cassandra”实现它。这些查询需要聚合并过滤大量数据。
我是Cassandra和Spark的新手,所以我对以下方法感到困惑,这种方法会更好\更快:
卡桑德拉(Cassandra)会足够快地给出实时结果吗?或者我应该从Cassandra创建一个RDD来对其执行交互式查询。
查询之一是:
"SELECT * FROM PERFORMANCE.GEONAMES A INNER JOIN
(SELECT max(GEONAMEID) AS MAPINFO_ID FROM PERFORMANCE.GEONAMES
where longitude between %LL_LONG% and %UR_LONG%
and latitude between %LL_LAT% and %UR_LAT%
and %WHERE_CLAUSE% GROUP BY LEFT(QUADKEY, %QUAD_TREE_LEVEL%) )
AS B ON A.GEONAMEID = B.MAPINFO_ID"
Run Code Online (Sandbox Code Playgroud)
任何意见或建议,将不胜感激。谢谢,
感谢@doanduyhai建议使用SASI二级索引,它确实产生了很大的变化。
我正在尝试将 CSV 加载为自定义对象类型的 DataFrame:
case class Geom(attributes: Map[String, Any])
Run Code Online (Sandbox Code Playgroud)
我试过这个:
import session.implicits._
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
implicit val geomEncoder = org.apache.spark.sql.Encoders.product[Geom]
val sparkSQLGeometryRDD = session.read
.option("delimiter", "\t")
.option("inferSchema", "true")
.option("header", "true")
.csv("src\\main\\resources\\TexasPostCodes.txt")
//.as[MyObjEncoded]//(encoder)
.persist()
val columns = sparkSQLGeometryRDD.schema.fieldNames
//sparkSQLGeometryRDD.show()
val mappedDF = sparkSQLGeometryRDD
.map(x => x.getValuesMap[Any](columns.toList))
.map(x => Geom(x))
.show
Run Code Online (Sandbox Code Playgroud)
但它抛出了这个异常:
Exception in thread "main" java.lang.ClassNotFoundException: scala.Any
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
Run Code Online (Sandbox Code Playgroud)
有人可以帮我找出我的代码有什么问题吗?
将案例类和编码器从方法中移出后,它工作正常。
object SpatialEncoders {
implicit def MapEncoder: Encoder[Map[String, Any]]= Encoders.kryo[Map[String, Any]]
implicit def GeomEncoder: …
Run Code Online (Sandbox Code Playgroud)