I looked at the documentation and it says the following join types are supported:

Type of join to perform. Default inner. Must be one of: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, left_anti.

I looked at the StackOverflow answers on SQL joins, and the top answers do not mention some of the joins above, e.g. left_semi and left_anti. What do they mean in Spark?
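For concreteness, a minimal sketch of how these join types are invoked (df1, df2 and the join column "id" are hypothetical; only the join-type string differs from the familiar calls):

// Hypothetical DataFrames df1 and df2 sharing an "id" column; the call shape
// is the same as for the usual join types, only the type string changes.
val semiJoined = df1.join(df2, Seq("id"), "left_semi")
val antiJoined = df1.join(df2, Seq("id"), "left_anti")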
scala apache-spark apache-spark-sql spark-dataframe apache-spark-2.0
I am trying to leverage Spark partitioning. I was trying to do something like:
data.write.partitionBy("key").parquet("/location")
The problem here is that each partition ends up with a huge number of Parquet files, which makes reads slow if I try to read from the root directory.

To avoid that I tried
data.coalesce(numPart).write.partitionBy("key").parquet("/location")
This, however, creates numPart Parquet files in every partition. Now my partitions are of different sizes, so ideally I would like a separate coalesce per partition. That does not look easy, though: I would need to access every partition, coalesce it to a certain number of files, and store it at a separate location; a rough sketch of what I mean follows below.
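Something along these lines (the key values, the made-up filesForKey helper for the per-key file count, and the output paths are all hypothetical; it also assumes key is a string column):

import org.apache.spark.sql.functions.col

// Hypothetical sketch: handle each key separately, coalescing its rows to a
// per-key file count and writing them under that key's partition directory.
val keys = data.select("key").distinct().collect().map(_.getString(0))
keys.foreach { k =>
  data.filter(col("key") === k)
    .coalesce(filesForKey(k))          // made-up helper returning the desired file count
    .write
    .parquet(s"/location/key=$k")
}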
How should I use partitioning so that I do not end up with many files after writing?
With Spark 2.0.1 I have some questions. I read a lot of documentation but so far could not find sufficient answers:

What is the difference between
df.select("foo")
and
df.select($"foo")
?

Is it correct that
myDataSet.map(foo.someVal)
is type safe and will not convert into an RDD but stays in the Dataset representation, with no additional overhead (performance wise, for 2.0.0)?

How could I make df.select("foo") type safe without a map statement?
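To make the contrast concrete, a minimal sketch (the Foo case class, the sample data, and the local SparkSession are hypothetical):

import org.apache.spark.sql.{Dataset, SparkSession}

case class Foo(foo: String, bar: Int)

// Hypothetical setup: a local session and a small typed Dataset.
val spark = SparkSession.builder.master("local[*]").appName("sketch").getOrCreate()
import spark.implicits._
val ds: Dataset[Foo] = Seq(Foo("a", 1), Foo("b", 2)).toDS()

// Untyped: the column is named by a string, so a typo only fails at runtime.
val untyped = ds.select("foo")

// Typed, but requires a map: the compiler checks that Foo has a field `foo`.
val typed: Dataset[String] = ds.map(_.foo)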
scala apache-spark apache-spark-sql apache-spark-dataset apache-spark-2.0
I have a Spark application which uses the new Spark 2.0 API with SparkSession. I am building this application on top of another application which uses SparkContext. I would like to pass the SparkContext to my application and initialize the SparkSession using the existing SparkContext.

However I could not find a way how to do it. I found that the SparkSession constructor that takes a SparkContext is private, so I cannot initialize it that way, and the builder does not offer any setSparkContext method. Do you think there is some workaround?
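For illustration, a minimal sketch of the kind of workaround that can be expressed with the public builder (existingSc stands for the SparkContext handed in from the other application):

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

// Sketch: feed the existing context's configuration into the builder;
// getOrCreate attaches to an already-running SparkContext if one is active.
def buildSession(existingSc: SparkContext): SparkSession =
  SparkSession.builder
    .config(existingSc.getConf)
    .getOrCreate()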
I am reading a csv file in Pyspark as follows:
df_raw=spark.read.option("header","true").csv(csv_path)
However, the data file has quoted fields with embedded commas, which should not be treated as delimiters. How can I handle this in Pyspark? I know pandas can handle it, but can Spark? The version I am using is Spark 2.0.0.

Here is an example that works in Pandas but fails using Spark:
In [1]: import pandas as pd
In [2]: pdf = pd.read_csv('malformed_data.csv')
In [3]: sdf=spark.read.format("org.apache.spark.csv").csv('malformed_data.csv',header=True)
In [4]: pdf[['col12','col13','col14']]
Out[4]:
col12 col13 \
0 32 XIY "W" JK, RE LK SOMETHINGLIKEAPHENOMENON#YOUGOTSOUL~BRINGDANOISE
1 NaN OUTKAST#THROOTS~WUTANG#RUNDMC
col14
0 23.0
1 0.0
In [5]: sdf.select("col12","col13",'col14').show()
+------------------+--------------------+--------------------+
| col12| col13| col14|
+------------------+--------------------+--------------------+
|"32 XIY ""W"" JK| RE LK"|SOMETHINGLIKEAPHE...|
| null|OUTKAST#THROOTS~W...| 0.0|
+------------------+--------------------+--------------------+
Run Code Online (Sandbox Code Playgroud)
Contents of the file:
col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19
80015360210876000,11.22,X,4076710258,,,sxsw,,"32 YIU ""A""",S5,,"32 XIY ""W"" JK, RE LK",SOMETHINGLIKEAPHENOMENON#YOUGOTSOUL~BRINGDANOISE,23.0,cyclingstats,2012-25-19,432,2023-05-17,CODERED
61670000229561918,137.12,U,8234971771,,,woodstock,,,T4,,,OUTKAST#THROOTS~WUTANG#RUNDMC,0.0,runstats,2013-21-22,1333,2019-11-23,CODEBLUE
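A minimal sketch of the quote/escape options that appear relevant here (written in Scala; the option keys are the same from Pyspark, and `spark` is an existing SparkSession):

// Sketch: declare that quotes inside a quoted field are escaped by doubling
// the quote character, instead of the default backslash escape.
val df = spark.read
  .option("header", "true")
  .option("quote", "\"")
  .option("escape", "\"")
  .csv("malformed_data.csv")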
How do you bind variables in Apache Spark SQL? For example:
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("SELECT * FROM src WHERE col1 = ${VAL1}").collect().foreach(println)
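A minimal sketch of the usual workaround, splicing the value into the SQL text with Scala string interpolation rather than a real bind variable (someValue is a placeholder):

// Sketch: substitute the value before the query string is parsed.
val val1 = "someValue"
sqlContext.sql(s"SELECT * FROM src WHERE col1 = '$val1'").collect().foreach(println)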
What improvements does Apache Spark 2 bring compared to Apache Spark?
I thought that with the integration of project Tungsten, Spark would automatically use off-heap memory.

What are spark.memory.offHeap.size and spark.memory.offHeap.enabled for? Do I need to manually specify the amount of off-heap memory for Tungsten here?
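A minimal sketch of setting the two properties explicitly (the 2 GB figure is an arbitrary example):

import org.apache.spark.SparkConf

// Sketch: off-heap storage is only used when it is explicitly enabled and sized.
val conf = new SparkConf()
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "2147483648")   // 2 GB, expressed in bytes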
apache-spark apache-spark-sql spark-dataframe apache-spark-2.0 off-heap
When running a Spark Streaming application that consumes data from a Kafka topic with 100 partitions, using 10 executors with 5 cores and 20 GB RAM each, the executors crash with the following logs:
ERROR ResourceLeakDetector: LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable advanced leak reporting to find out where the leak occurred.
ERROR YarnClusterScheduler: Lost executor 18 on worker23.oct.com: Slave lost
ERROR ApplicationMaster: RECEIVED SIGNAL TERM
This exception appears in the Spark JIRA:

https://issues.apache.org/jira/browse/SPARK-17380

Someone wrote that after upgrading to Spark 2.0.2 the problem was solved for them. But we are using Spark 2.1 as part of HDP 2.6, so I guess this bug was not fixed in Spark 2.1.

Someone else also ran into this bug and wrote to the Spark user list, but got no answer.

By the way - the streaming application does not call cache() or persist(), so no caching is involved.

Has anyone else encountered streaming applications crashing like this?
SparkSession
.builder
.master("local[*]")
.config("spark.sql.warehouse.dir", "C:/tmp/spark")
.config("spark.sql.streaming.checkpointLocation", "C:/tmp/spark/spark-checkpoint")
.appName("my-test")
.getOrCreate
.readStream
.schema(schema)
.json("src/test/data")
.cache
.writeStream
.start
.awaitTermination
Running this sample in Spark 2.1.0 I get an error. Without the .cache option it works as expected, but with the .cache option I get:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/test/data]
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:196)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:35)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:33)
at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:58)
at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:69)
at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:67)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:73)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:102)
at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:65)
at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:89)
at org.apache.spark.sql.Dataset.persist(Dataset.scala:2479)
at org.apache.spark.sql.Dataset.cache(Dataset.scala:2489)
at org.me.App$.main(App.scala:23)
at org.me.App.main(App.scala)
Any ideas?
scala apache-spark apache-spark-sql apache-spark-2.0 spark-structured-streaming