标签: spark-dataframe

为什么来自Oracle的Spark查询(加载)与SQOOP相比如此之慢?

我们发现,自Spark 1.3到现在的Spark 2.0.1,来自Oracle数据库的Spark API的加载数据一直很慢.典型的代码在Java中是这样的:

        Map<String, String> options = new HashMap<String, String>();
        options.put("url", ORACLE_CONNECTION_URL);
        options.put("dbtable", dbTable);
        options.put("batchsize", "100000");
        options.put("driver", "oracle.jdbc.OracleDriver");

        Dataset<Row> jdbcDF = sparkSession.read().options(options)
                .format("jdbc")
                .load().cache();
        jdbcDF.createTempView("my");

        //= sparkSession.sql(dbTable);
        jdbcDF.printSchema();
        jdbcDF.show();

        System.out.println(jdbcDF.count());
Run Code Online (Sandbox Code Playgroud)

我们的一位成员试图自定义这部分,他当时改进了很多(Spark 1.3.0).但Spark核心代码的某些部分成为Spark的内部代码,因此在版本之后无法使用.此外,我们看到HADOOP的SQOOP比Spark快得多(但它写入HDFS,需要大量的工作才能转换为数据集以供Spark使用).使用Spark的数据集写入方法写入Oracle似乎对我们有好处.令人费解的是为什么会这样!

oracle apache-spark apache-spark-sql spark-dataframe

9
推荐指数
1
解决办法
1811
查看次数

如何控制使用partitionBy时生成的镶木地板文件数量

我有一个DataFrame我需要根据特定的分区写入S3.代码如下所示:

dataframe
  .write
  .mode(SaveMode.Append)
  .partitionBy("year", "month", "date", "country", "predicate")
  .parquet(outputPath)
Run Code Online (Sandbox Code Playgroud)

partitionBy数据拆分成相当多的文件夹(~400),每个文件夹只有一点点数据(~1GB).这就出现了问题 - 因为默认值为spark.sql.shuffle.partitions200,每个文件夹中的1GB数据被分成200个小的镶木地板文件,导致总共写入大约80000个镶木地板文件.由于多种原因,这不是最佳的,我想避免这种情况.

我当然可以设置spark.sql.shuffle.partitions一个更小的数字,比如说10,但据我所知,这个设置也控制了连接和聚合中shuffle的分区数,所以我真的不想改变它.

有谁知道是否有另一种方法来控制写入多少文件?

apache-spark spark-dataframe

8
推荐指数
1
解决办法
4886
查看次数

Parquet vs Cassandra使用Spark和DataFrames

我已陷入这种困境,我无法选择哪种解决方案对我更好.我有一个非常大的表(几个100GB)和几个较小的(几个GB).为了在Spark中创建我的数据管道并使用spark ML,我需要加入这些表并执行几个GroupBy(聚合)操作.那些操作对我来说真的很慢,所以我选择了这两个中的一个:

  • 使用Cassandra并使用索引来加速GoupBy操作.
  • 根据数据布局使用Parquet和Partitioning.

我可以说Parquet分区工作速度更快,可扩展性更高,而且Cassandra使用的内存开销更少.所以问题是:

如果开发人员推断并了解数据布局及其使用方式,那么使用Parquet会不会更好,因为您可以更好地控制它?我为什么要为Cassandra带来的开销付出代价?

cassandra apache-spark parquet spark-dataframe

8
推荐指数
2
解决办法
6099
查看次数

远程 RPC 客户端解除关联。可能是由于容器超过阈值或网络问题。检查驱动程序日志以获取 WARN 消息

我正在开发 5 节点集群,每个集群 7 核,每个节点 25GB。我当前的执行使用 1-2GB 输入数据,我能知道为什么我会遇到以下错误吗?我使用 pyspark 数据框(火花 1.6.2)

[Stage 9487:===================================================>(198 + 2) / 200]16/08/13 16:43:18 ERROR TaskSchedulerImpl: Lost executor 3 on server05: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[Stage 9487:=================================================>(198 + -49) / 200]16/08/13 16:43:19 ERROR TaskSchedulerImpl: Lost executor 1 on server04: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
[Stage 9487:=========>                                          (24 …
Run Code Online (Sandbox Code Playgroud)

pyspark spark-dataframe

8
推荐指数
1
解决办法
1万
查看次数

如何在spark SQL中重命名列名

我有一个带有可配置列名的数据框,例如

Journey channelA channelB channelC
j1      1        0        0
j1      0        1        0
j1      1        0        0
j2      0        0        1 
j2      0        1        0
Run Code Online (Sandbox Code Playgroud)

通过可配置,我的意思是数据帧中可能有'n'个通道.

现在我需要进行转换,我需要找到所有通道的总和

df.groupBy("Journey").agg(sum("channelA"), sum("channelB"), sum("channelC"))
Run Code Online (Sandbox Code Playgroud)

其输出将是:

Journey sum(channelA) sum(channelB) sum(channelC)
j1      2             1             0
j2      0             1             1
Run Code Online (Sandbox Code Playgroud)

现在我想将列名重命名为原始名称,我可以使用

.withColumnRenamed("sum(channelA)", channelA)
Run Code Online (Sandbox Code Playgroud)

但正如我所提到的那样,通道列表是可配置的,我希望通用列重命名语句将所有求和列重命名为原始列名,以获得预期的数据帧:

Journey channelA channelB channelC
j1      2        1             0
j2      0        1             1
Run Code Online (Sandbox Code Playgroud)

任何建议如何处理这个

dataframe apache-spark-sql spark-dataframe

8
推荐指数
1
解决办法
2万
查看次数

从字符串文字中推断Spark DataType

我正在尝试编写一个Scala函数,它可以根据提供的输入字符串推断Spark DataTypes:

/**
 * Example:
 * ========
 * toSparkType("string")  =>    StringType
 * toSparkType("boolean") =>    BooleanType
 * toSparkType("date")    =>    DateType
 * etc.
 */
def toSparkType(inputType : String) : DataType = {
    var dt : DataType = null

    if(matchesStringRegex(inputType)) {
        dt = StringType
    } else if(matchesBooleanRegex(inputType)) {
        dt = BooleanType
    } else if(matchesDateRegex(inputType)) {
        dt = DateType
    } else if(...) {
        ...
    }

    dt
}
Run Code Online (Sandbox Code Playgroud)

我的目标是支持可用的大部分(如果不是全部的话)DataTypes.当我开始实现这个功能,我开始思考:" 星火/斯卡拉可能已经有一个助手/ util的方法,会为我做这件事. "毕竟,我知道我可以这样做:

var structType = new StructType()

structType.add("some_new_string_col", "string", true, Metadata.empty)
structType.add("some_new_boolean_col", …
Run Code Online (Sandbox Code Playgroud)

types scala introspection apache-spark spark-dataframe

8
推荐指数
2
解决办法
4332
查看次数

SPARK DataFrame:如何基于相同的列值为每个组有效地拆分数据框

我有一个生成的DataFrame,如下所示:

df.groupBy($"Hour", $"Category")
  .agg(sum($"value").alias("TotalValue"))
  .sort($"Hour".asc,$"TotalValue".desc))
Run Code Online (Sandbox Code Playgroud)

结果如下:

+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
|   0|   cat26|      30.9|
|   0|   cat13|      22.1|
|   0|   cat95|      19.6|
|   0|  cat105|       1.3|
|   1|   cat67|      28.5|
|   1|    cat4|      26.8|
|   1|   cat13|      12.6|
|   1|   cat23|       5.3|
|   2|   cat56|      39.6|
|   2|   cat40|      29.7|
|   2|  cat187|      27.9|
|   2|   cat68|       9.8|
|   3|    cat8|      35.6|
| ...|    ....|      ....|
+----+--------+----------+
Run Code Online (Sandbox Code Playgroud)

我想根据每个独特的价值作出新的dataframes col("Hour"),即

  • 对于小时== 0的组
  • 对于小时== 1的组
  • 对于小时== 2的组,依此类推...

因此,所需的输出将是:

df0 as:

+----+--------+----------+
|Hour|Category|TotalValue| …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark parquet apache-spark-sql spark-dataframe

8
推荐指数
1
解决办法
2万
查看次数

UDF的vs Spark sql vs列表达式性能优化

我知道UDFs是Spark的完整黑盒子,不会尝试优化它.但是Column在(https://spark.apache.org/docs/2.1.0/api/scala/index.html#org.apache.spark.sql.Column)中列出的类型及其功能的使用是否会
成为函数"符合条件" Catalyst Optimizer

例如,UDF通过添加1到现有列来创建新列

val addOne = udf( (num: Int) => num + 1 )
df.withColumn("col2", addOne($"col1"))
Run Code Online (Sandbox Code Playgroud)

相同的功能,使用Column类型:

def addOne(col1: Column) = col1.plus(1)
df.withColumn("col2", addOne($"col1"))
Run Code Online (Sandbox Code Playgroud)

要么

spark.sql("select *, col1 + 1 from df")
Run Code Online (Sandbox Code Playgroud)

他们之间的表现会有什么不同吗?

scala apache-spark apache-spark-sql spark-dataframe

8
推荐指数
1
解决办法
1650
查看次数

使用spark进行多处理(PySpark)

用例如下:

我有一个大型数据框,其中包含一个"user_id"列(每个user_id可以出现在很多行中).我有一个用户my_users列表,我需要分析.

Groupby,filteraggregate可能是一个好主意,但pyspark中包含的可用聚合函数不符合我的需要.在pyspark ver中,用户定义的聚合函数仍然不完全支持,我决定暂时保留它.

相反,我只是迭代my_users列表,过滤数据框中的每个用户,然后进行分析.为了优化这个过程,我决定为my_users中的每个用户使用python多处理池

执行分析(并传递给池)的函数有两个参数:user_id主数据帧路径,我在其上执行所有计算(PARQUET格式).在方法中,我加载数据帧,并对其进行处理(DataFrame不能作为参数本身传递)

我得到各种奇怪的错误,在一些进程(每次运行中不同),看起来像:

  • JVM中不存在PythonUtils(读取'镶木地板'数据时)

打印错误消息的屏幕

  • KeyError:找不到'c'(同样,当读取'镶木地板'数据框时.无论如何,'c'是什么?)

当我在没有任何多处理的情况下运行它时,一切都运行顺畅,但速度很慢..

这些错误来自哪里?

我会提供一些代码示例,以使事情更清晰:

PYSPRAK_SUBMIT_ARGS = '--driver-memory 4g --conf spark.driver.maxResultSize=3g --master local[*] pyspark-shell' #if it's relevant

# ....

def users_worker(df_path, user_id):
    df = spark.read.parquet(df_path) # The problem is here!
    ## the analysis of user_id in df is here

def user_worker_wrapper(args):
    users_worker(*args)

def analyse():
    # ...
    users_worker_args …
Run Code Online (Sandbox Code Playgroud)

python apache-spark python-multiprocessing pyspark spark-dataframe

8
推荐指数
1
解决办法
4152
查看次数

如何以两行划分pyspark数据帧

我在Databricks工作.

我有一个包含500行的数据帧,我想创建包含100行的两个数据帧,另一个包含剩余的400行.

+--------------------+----------+
|              userid| eventdate|
+--------------------+----------+
|00518b128fc9459d9...|2017-10-09|
|00976c0b7f2c4c2ca...|2017-12-16|
|00a60fb81aa74f35a...|2017-12-04|
|00f9f7234e2c4bf78...|2017-05-09|
|0146fe6ad7a243c3b...|2017-11-21|
|016567f169c145ddb...|2017-10-16|
|01ccd278777946cb8...|2017-07-05|
Run Code Online (Sandbox Code Playgroud)

我试过以下但是收到错误

df1 = df[:99]
df2 = df[100:499]


TypeError: unexpected item type: <type 'slice'>
Run Code Online (Sandbox Code Playgroud)

python pyspark spark-dataframe databricks

8
推荐指数
3
解决办法
1万
查看次数