标签: apache-spark-2.0

Spark中的各种连接类型有哪些?

我查看了文档,并说它支持以下连接类型:

要执行的联接类型.默认内心.必须是以下之一:inner,cross,outer,full,full_outer,left,left_outer,right,right_outer,left_semi,left_anti.

我查看了关于SQL连接的StackOverflow答案,并且顶部几个答案没有提到上面的一些连接,例如left_semileft_anti.他们在Spark中意味着什么?

scala apache-spark apache-spark-sql spark-dataframe apache-spark-2.0

36
推荐指数
2
解决办法
5万
查看次数

Spark镶木地板分区:大量文件

我正在尝试利用spark分区.我试图做类似的事情

data.write.partitionBy("key").parquet("/location")
Run Code Online (Sandbox Code Playgroud)

这里的问题每个分区都会产生大量的镶木地板文件,如果我尝试从根目录中读取,会导致读取速度慢.

为了避免我试过

data.coalese(numPart).write.partitionBy("key").parquet("/location")
Run Code Online (Sandbox Code Playgroud)

但是,这会在每个分区中创建numPart数量的镶木地板文件.现在我的分区大小不同了.所以我理想的是希望每个分区有单独的合并.然而,这看起来并不容易.我需要访问所有分区合并到一定数量并存储在一个单独的位置.

写入后我应该如何使用分区来避免许多文件?

bigdata apache-spark rdd spark-dataframe apache-spark-2.0

29
推荐指数
4
解决办法
3万
查看次数

Spark 2.0 Dataset vs DataFrame

从spark 2.0.1开始我有一些问题.我阅读了很多文档,但到目前为止找不到足够的答案:

  • 有什么区别
    • df.select("foo")
    • df.select($"foo")
  • 我能正确理解吗
    • myDataSet.map(foo.someVal)是类型安全的,不会转换为RDD但保留在DataSet表示/没有额外的开销(2.0.0的性能明智)
  • 所有其他命令,例如select,..只是语法糖.它们不是类型安全的,可以使用地图代替.如果df.select("foo")没有地图声明,我怎么能输入?
    • 为什么我应该使用UDF/UADF而不是地图(假设地图保留在数据集表示中)?

scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0

23
推荐指数
1
解决办法
4687
查看次数

如何从现有的SparkContext创建SparkSession

我有一个使用Spark 2.0新API的Spark应用程序SparkSession.我正在使用另一个应用程序之上构建此应用程序SparkContext.我想传递SparkContext给我的应用程序并SparkSession使用现有的初始化SparkContext.

但是我找不到怎么做的方法.我发现SparkSession构造函数SparkContext是私有的,所以我不能以这种方式初始化它,构建器不提供任何setSparkContext方法.你认为有一些解决方法吗?

scala apache-spark apache-spark-2.0

22
推荐指数
5
解决办法
3万
查看次数

使用包含嵌入逗号的引用字段读取csv文件

我正在Pyspark中读取一个csv文件,如下所示:

df_raw=spark.read.option("header","true").csv(csv_path)
Run Code Online (Sandbox Code Playgroud)

但是,数据文件引用了带有嵌入式逗号的字段,不应将其视为逗号.我如何在Pyspark处理这个问题?我知道熊猫可以解决这个问题,但是Spark可以吗?我使用的版本是Spark 2.0.0.

这是一个在Pandas中工作的示例但是使用Spark失败:

In [1]: import pandas as pd

In [2]: pdf = pd.read_csv('malformed_data.csv')

In [3]: sdf=spark.read.format("org.apache.spark.csv").csv('malformed_data.csv',header=True)

In [4]: pdf[['col12','col13','col14']]
Out[4]:
                    col12                                             col13  \
0  32 XIY "W"   JK, RE LK  SOMETHINGLIKEAPHENOMENON#YOUGOTSOUL~BRINGDANOISE
1                     NaN                     OUTKAST#THROOTS~WUTANG#RUNDMC

   col14
0   23.0
1    0.0

In [5]: sdf.select("col12","col13",'col14').show()
+------------------+--------------------+--------------------+
|             col12|               col13|               col14|
+------------------+--------------------+--------------------+
|"32 XIY ""W""   JK|              RE LK"|SOMETHINGLIKEAPHE...|
|              null|OUTKAST#THROOTS~W...|                 0.0|
+------------------+--------------------+--------------------+
Run Code Online (Sandbox Code Playgroud)

文件内容:

    col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19
80015360210876000,11.22,X,4076710258,,,sxsw,,"32 YIU ""A""",S5,,"32 XIY ""W""   JK, RE LK",SOMETHINGLIKEAPHENOMENON#YOUGOTSOUL~BRINGDANOISE,23.0,cyclingstats,2012-25-19,432,2023-05-17,CODERED
61670000229561918,137.12,U,8234971771,,,woodstock,,,T4,,,OUTKAST#THROOTS~WUTANG#RUNDMC,0.0,runstats,2013-21-22,1333,2019-11-23,CODEBLUE
Run Code Online (Sandbox Code Playgroud)

csv apache-spark apache-spark-sql pyspark apache-spark-2.0

21
推荐指数
4
解决办法
3万
查看次数

动态绑定Spark SQL中的变量/参数?

如何在Apache Spark SQL中绑定变量?例如:

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("SELECT * FROM src WHERE col1 = ${VAL1}").collect().foreach(println)
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql apache-spark-2.0

13
推荐指数
2
解决办法
3万
查看次数

Apache Spark与Apache Spark 2

与Apache Spark相比,Apache Spark2带来了哪些改进?

  1. 从架构角度来看
  2. 从应用的角度来看
  3. 或者更多

apache-spark apache-spark-2.0

12
推荐指数
2
解决办法
2万
查看次数

火花堆内存配置和钨

我认为通过项目Tungesten的集成,spark会自动使用堆内存.

什么是spark.memory.offheap.size和spark.memory.offheap.enabled?我是否需要手动指定Tungsten的关闭堆内存量?

apache-spark apache-spark-sql spark-dataframe apache-spark-2.0 off-heap

12
推荐指数
1
解决办法
6787
查看次数

由于内存泄漏导致Spark执行器崩溃

当运行使用来自kafka主题100个分区的数据的spark流媒体应用程序,并且每个执行程序运行10个执行程序,5个核心和20GB RAM时,执行程序将崩溃并显示以下日志:

ERROR ResourceLeakDetector:泄漏:ByteBuf.release()是垃圾收集之前,不叫.启用高级泄漏报告以找出泄漏发生的位置.

ERROR YarnClusterScheduler:在worker23.oct.com上丢失执行者18:奴隶丢失了

ERROR ApplicationMaster:收到的信号期限

此异常出现在spark JIRA中:

https://issues.apache.org/jira/browse/SPARK-17380

有人在升级到spark 2.0.2后写道,问题解决了.但是我们使用spark 2.1作为HDP 2.6的一部分.所以我猜这个bug在火花2.1中没有解决.

还有人遇到过这个bug,并在spark用户列表中写过但没有得到答案:

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Receiver-Resource-Leak-td27857.html

顺便说一句 - 流媒体应用程序没有调用cache()persist(),因此不涉及任何缓存.

有没有人遇到过崩溃的流媒体应用?

out-of-memory netty spark-streaming apache-spark-2.0

12
推荐指数
0
解决办法
731
查看次数

为什么在流数据集上使用缓存失败并显示"AnalysisException:必须使用writeStream.start()执行带有流源的查询"?

SparkSession
  .builder
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "C:/tmp/spark")
  .config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint")
  .appName("my-test")
  .getOrCreate
  .readStream
  .schema(schema)
  .json("src/test/data")
  .cache
  .writeStream
  .start
  .awaitTermination
Run Code Online (Sandbox Code Playgroud)

在Spark 2.1.0中执行此示例时出现错误.没有.cache选项,它按预期工作,但.cache我有选择:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/test/data]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:196)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:58)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:69)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:67)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:102)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:65)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:89)
at org.apache.spark.sql.Dataset.persist(Dataset.scala:2479)
at org.apache.spark.sql.Dataset.cache(Dataset.scala:2489)
at org.me.App$.main(App.scala:23)
at org.me.App.main(App.scala)
Run Code Online (Sandbox Code Playgroud)

任何的想法?

scala apache-spark apache-spark-sql apache-spark-2.0 spark-structured-streaming

11
推荐指数
1
解决办法
3668
查看次数