我们发现,自Spark 1.3到现在的Spark 2.0.1,来自Oracle数据库的Spark API的加载数据一直很慢.典型的代码在Java中是这样的:
Map<String, String> options = new HashMap<String, String>();
options.put("url", ORACLE_CONNECTION_URL);
options.put("dbtable", dbTable);
options.put("batchsize", "100000");
options.put("driver", "oracle.jdbc.OracleDriver");
Dataset<Row> jdbcDF = sparkSession.read().options(options)
.format("jdbc")
.load().cache();
jdbcDF.createTempView("my");
//= sparkSession.sql(dbTable);
jdbcDF.printSchema();
jdbcDF.show();
System.out.println(jdbcDF.count());
Run Code Online (Sandbox Code Playgroud)
我们的一位成员试图自定义这部分,他当时改进了很多(Spark 1.3.0).但Spark核心代码的某些部分成为Spark的内部代码,因此在版本之后无法使用.此外,我们看到HADOOP的SQOOP比Spark快得多(但它写入HDFS,需要大量的工作才能转换为数据集以供Spark使用).使用Spark的数据集写入方法写入Oracle似乎对我们有好处.令人费解的是为什么会这样!
我有一个DataFrame我需要根据特定的分区写入S3.代码如下所示:
dataframe
.write
.mode(SaveMode.Append)
.partitionBy("year", "month", "date", "country", "predicate")
.parquet(outputPath)
Run Code Online (Sandbox Code Playgroud)
将partitionBy数据拆分成相当多的文件夹(~400),每个文件夹只有一点点数据(~1GB).这就出现了问题 - 因为默认值为spark.sql.shuffle.partitions200,每个文件夹中的1GB数据被分成200个小的镶木地板文件,导致总共写入大约80000个镶木地板文件.由于多种原因,这不是最佳的,我想避免这种情况.
我当然可以设置spark.sql.shuffle.partitions一个更小的数字,比如说10,但据我所知,这个设置也控制了连接和聚合中shuffle的分区数,所以我真的不想改变它.
有谁知道是否有另一种方法来控制写入多少文件?
我已陷入这种困境,我无法选择哪种解决方案对我更好.我有一个非常大的表(几个100GB)和几个较小的(几个GB).为了在Spark中创建我的数据管道并使用spark ML,我需要加入这些表并执行几个GroupBy(聚合)操作.那些操作对我来说真的很慢,所以我选择了这两个中的一个:
我可以说Parquet分区工作速度更快,可扩展性更高,而且Cassandra使用的内存开销更少.所以问题是:
如果开发人员推断并了解数据布局及其使用方式,那么使用Parquet会不会更好,因为您可以更好地控制它?我为什么要为Cassandra带来的开销付出代价?
我正在开发 5 节点集群,每个集群 7 核,每个节点 25GB。我当前的执行使用 1-2GB 输入数据,我能知道为什么我会遇到以下错误吗?我使用 pyspark 数据框(火花 1.6.2)
[Stage 9487:===================================================>(198 + 2) / 200]16/08/13 16:43:18 ERROR TaskSchedulerImpl: Lost executor 3 on server05: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[Stage 9487:=================================================>(198 + -49) / 200]16/08/13 16:43:19 ERROR TaskSchedulerImpl: Lost executor 1 on server04: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[Stage 9487:=========> (24 …Run Code Online (Sandbox Code Playgroud) 我有一个带有可配置列名的数据框,例如
Journey channelA channelB channelC
j1 1 0 0
j1 0 1 0
j1 1 0 0
j2 0 0 1
j2 0 1 0
Run Code Online (Sandbox Code Playgroud)
通过可配置,我的意思是数据帧中可能有'n'个通道.
现在我需要进行转换,我需要找到所有通道的总和
df.groupBy("Journey").agg(sum("channelA"), sum("channelB"), sum("channelC"))
Run Code Online (Sandbox Code Playgroud)
其输出将是:
Journey sum(channelA) sum(channelB) sum(channelC)
j1 2 1 0
j2 0 1 1
Run Code Online (Sandbox Code Playgroud)
现在我想将列名重命名为原始名称,我可以使用
.withColumnRenamed("sum(channelA)", channelA)
Run Code Online (Sandbox Code Playgroud)
但正如我所提到的那样,通道列表是可配置的,我希望通用列重命名语句将所有求和列重命名为原始列名,以获得预期的数据帧:
Journey channelA channelB channelC
j1 2 1 0
j2 0 1 1
Run Code Online (Sandbox Code Playgroud)
任何建议如何处理这个
我正在尝试编写一个Scala函数,它可以根据提供的输入字符串推断Spark DataTypes:
/**
* Example:
* ========
* toSparkType("string") => StringType
* toSparkType("boolean") => BooleanType
* toSparkType("date") => DateType
* etc.
*/
def toSparkType(inputType : String) : DataType = {
var dt : DataType = null
if(matchesStringRegex(inputType)) {
dt = StringType
} else if(matchesBooleanRegex(inputType)) {
dt = BooleanType
} else if(matchesDateRegex(inputType)) {
dt = DateType
} else if(...) {
...
}
dt
}
Run Code Online (Sandbox Code Playgroud)
我的目标是支持可用的大部分(如果不是全部的话)DataTypes.当我开始实现这个功能,我开始思考:" 星火/斯卡拉可能已经有一个助手/ util的方法,会为我做这件事. "毕竟,我知道我可以这样做:
var structType = new StructType()
structType.add("some_new_string_col", "string", true, Metadata.empty)
structType.add("some_new_boolean_col", …Run Code Online (Sandbox Code Playgroud) 我有一个生成的DataFrame,如下所示:
df.groupBy($"Hour", $"Category")
.agg(sum($"value").alias("TotalValue"))
.sort($"Hour".asc,$"TotalValue".desc))
Run Code Online (Sandbox Code Playgroud)
结果如下:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
| 3| cat8| 35.6|
| ...| ....| ....|
+----+--------+----------+
Run Code Online (Sandbox Code Playgroud)
我想根据每个独特的价值作出新的dataframes col("Hour"),即
因此,所需的输出将是:
df0 as:
+----+--------+----------+
|Hour|Category|TotalValue| …Run Code Online (Sandbox Code Playgroud) 我知道UDFs是Spark的完整黑盒子,不会尝试优化它.但是Column在(https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Column)中列出的类型及其功能的使用是否会
成为函数"符合条件" Catalyst Optimizer?
例如,UDF通过添加1到现有列来创建新列
val addOne = udf( (num: Int) => num + 1 )
df.withColumn("col2", addOne($"col1"))
Run Code Online (Sandbox Code Playgroud)
相同的功能,使用Column类型:
def addOne(col1: Column) = col1.plus(1)
df.withColumn("col2", addOne($"col1"))
Run Code Online (Sandbox Code Playgroud)
要么
spark.sql("select *, col1 + 1 from df")
Run Code Online (Sandbox Code Playgroud)
他们之间的表现会有什么不同吗?
用例如下:
我有一个大型数据框,其中包含一个"user_id"列(每个user_id可以出现在很多行中).我有一个用户my_users列表,我需要分析.
Groupby,filter和aggregate可能是一个好主意,但pyspark中包含的可用聚合函数不符合我的需要.在pyspark ver中,用户定义的聚合函数仍然不完全支持,我决定暂时保留它.
相反,我只是迭代my_users列表,过滤数据框中的每个用户,然后进行分析.为了优化这个过程,我决定为my_users中的每个用户使用python多处理池
执行分析(并传递给池)的函数有两个参数:user_id和主数据帧的路径,我在其上执行所有计算(PARQUET格式).在方法中,我加载数据帧,并对其进行处理(DataFrame不能作为参数本身传递)
我得到各种奇怪的错误,在一些进程(每次运行中不同),看起来像:
当我在没有任何多处理的情况下运行它时,一切都运行顺畅,但速度很慢..
这些错误来自哪里?
我会提供一些代码示例,以使事情更清晰:
PYSPRAK_SUBMIT_ARGS = '--driver-memory 4g --conf spark.driver.maxResultSize=3g --master local[*] pyspark-shell' #if it's relevant
# ....
def users_worker(df_path, user_id):
df = spark.read.parquet(df_path) # The problem is here!
## the analysis of user_id in df is here
def user_worker_wrapper(args):
users_worker(*args)
def analyse():
# ...
users_worker_args …Run Code Online (Sandbox Code Playgroud) python apache-spark python-multiprocessing pyspark spark-dataframe
我在Databricks工作.
我有一个包含500行的数据帧,我想创建包含100行的两个数据帧,另一个包含剩余的400行.
+--------------------+----------+
| userid| eventdate|
+--------------------+----------+
|00518b128fc9459d9...|2017-10-09|
|00976c0b7f2c4c2ca...|2017-12-16|
|00a60fb81aa74f35a...|2017-12-04|
|00f9f7234e2c4bf78...|2017-05-09|
|0146fe6ad7a243c3b...|2017-11-21|
|016567f169c145ddb...|2017-10-16|
|01ccd278777946cb8...|2017-07-05|
Run Code Online (Sandbox Code Playgroud)
我试过以下但是收到错误
df1 = df[:99]
df2 = df[100:499]
TypeError: unexpected item type: <type 'slice'>
Run Code Online (Sandbox Code Playgroud) spark-dataframe ×10
apache-spark ×7
pyspark ×3
scala ×3
parquet ×2
python ×2
cassandra ×1
databricks ×1
dataframe ×1
oracle ×1
types ×1