标签: spark-dataframe

如何使用DataFrame和JDBC连接提高慢速Spark作业的性能?

我试图在单个节点(local [*])上以独立模式通过JDBC访问中型Teradata表(约1亿行).

我正在使用Spark 1.4.1.并且设置在一个非常强大的机器上(2个CPU,24个内核,126G RAM).

我已经尝试了几种内存设置和调整选项,以使其更快地工作,但它们都没有产生巨大的影响.

我确信有一些我缺少的东西,下面是我的最后一次尝试,花了大约11分钟来获得这个简单的计数与使用JDBC连接通过R只需要40秒来获得计数.

bin/pyspark --driver-memory 40g --executor-memory 40g

df = sqlContext.read.jdbc("jdbc:teradata://......)
df.count()
Run Code Online (Sandbox Code Playgroud)

当我尝试使用BIG表(5B记录)时,在完成查询后没有返回任何结果.

teradata apache-spark pyspark spark-dataframe

8
推荐指数
2
解决办法
1万
查看次数

spark:如何在保持最高时间戳行的同时对数据帧执行dropDuplicates

我有一个用例,我需要删除数据帧的重复行(在这种情况下,重复意味着它们具有相同的'id'字段),同时保持具有最高'timestamp'(unix timestamp)字段的行.

我找到了drop_duplicate方法(我正在使用pyspark),但是没有人控制将保留哪个项目.

有人可以帮忙吗?Thx提前

dataframe apache-spark pyspark spark-dataframe

8
推荐指数
2
解决办法
8951
查看次数

在Spark RDD或dataframe中随机随机播放列

无论如何,我可以随机播放RDD或数据帧的列,以使该列中的条目以随机顺序出现?我不确定我可以使用哪些API来完成这样的任务.

apache-spark spark-dataframe

8
推荐指数
2
解决办法
5891
查看次数

如何从Apache Spark中的数据框中选择相同大小的分层样本?

我在Spark 2中有一个数据框,如下所示,用户有50到数千个帖子.我想创建一个新的数据框,其中包含原始数据框中的所有用户,但每个用户只有5个随机抽样的帖子.

+--------+--------------+--------------------+
| user_id|       post_id|                text|
+--------+--------------+--------------------+
|67778705|44783131591473|some text...........|
|67778705|44783134580755|some text...........|
|67778705|44783136367108|some text...........|
|67778705|44783136970669|some text...........|
|67778705|44783138143396|some text...........|
|67778705|44783155162624|some text...........|
|67778705|44783688650554|some text...........|
|68950272|88655645825660|some text...........|
|68950272|88651393135293|some text...........|
|68950272|88652615409812|some text...........|
|68950272|88655744880460|some text...........|
|68950272|88658059871568|some text...........|
|68950272|88656994832475|some text...........|
+--------+--------------+--------------------+
Run Code Online (Sandbox Code Playgroud)

有些东西,posts.groupby('user_id').agg(sample('post_id'))但在pyspark中没有这样的功能.

有什么建议?

更新:

这个问题不同于另一个密切相关的问题,分层采样 - 火花采样有两种方式:

  1. 它询问了不成比例的分层抽样,而不是上面另一个问题中的普通比例法.
  2. 它要求在Spark的Dataframe API而不是RDD中执行此操作.

我还更新了问题的标题以澄清这一点.

apache-spark pyspark spark-dataframe

8
推荐指数
2
解决办法
7420
查看次数

Spark:分解结构的数据帧数组并附加id

我目前有一个带有id和列的数据帧,这是一个结构数组:

 root
 |-- id: integer (nullable = true)
 |-- lists: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- text: string (nullable = true)
 |    |    |-- amount: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)

这是一个包含数据的示例表:

 id | lists
 -----------
 1  | [[a, 1.0], [b, 2.0]]
 2  | [[c, 3.0]]
Run Code Online (Sandbox Code Playgroud)

如何将上述数据帧转换为下面的数据帧?我需要"爆炸"数组并同时附加id.

 id | col1  | col2
 -----------------
 1  | a     | 1.0
 1  | b     | 2.0
 2  | c     | 3.0
Run Code Online (Sandbox Code Playgroud)

编辑说明:

请注意,以下两个示例之间存在差异.第一个包含"一系列元素结构".而后者只包含 …

scala apache-spark spark-dataframe

8
推荐指数
1
解决办法
1万
查看次数

将数据帧转换为libsvm格式

我有一个由SQL查询产生的数据帧

df1 = sqlContext.sql("select * from table_test")
Run Code Online (Sandbox Code Playgroud)

我需要将此数据帧转换为libsvm格式,以便可以将其作为输入提供

pyspark.ml.classification.LogisticRegression
Run Code Online (Sandbox Code Playgroud)

我试着做以下事情.但是,这导致了以下错误,因为我正在使用spark 1.5.2

df1.write.format("libsvm").save("data/foo")
Failed to load class for data source: libsvm
Run Code Online (Sandbox Code Playgroud)

我想改用MLUtils.loadLibSVMFile.我在防火墙后面,不能直接pip安装它.所以我下载了文件,scp-ed然后手动安装它.一切似乎工作正常,但我仍然得到以下错误

import org.apache.spark.mllib.util.MLUtils
No module named org.apache.spark.mllib.util.MLUtils
Run Code Online (Sandbox Code Playgroud)

问题1:我的上述方法是将数据帧转换为正确方向的libsvm格式.问题2:如果问题1为"是",如何让MLUtils正常工作.如果"否",将数据帧转换为libsvm格式的最佳方法是什么

apache-spark apache-spark-sql pyspark spark-dataframe apache-spark-mllib

8
推荐指数
1
解决办法
8213
查看次数

Pyspark圆形功能的麻烦

在pyspark中使用round函数时遇到一些麻烦 - 我有下面的代码块,我试图将new_bid列舍入到2位小数,然后重命名列bid- 我正在导入pyspark.sql.functions AS func以供参考,并使用其中round包含的功能:

output = output.select(col("ad").alias("ad_id"),
                       col("part").alias("part_id"),
                       func.round(col("new_bid"), 2).alias("bid"))
Run Code Online (Sandbox Code Playgroud)

new_bid这里的列是float类型 - 结果数据框没有新命名的bid列舍入到我想要的2位小数,而是仍然是8或9位小数.

我尝试了各种各样的东西,但似乎无法让结果数据框具有舍入值 - 任何指针都将非常感激!谢谢!

apache-spark apache-spark-sql pyspark spark-dataframe pyspark-sql

8
推荐指数
1
解决办法
2万
查看次数

优化火花低挂水果,特别是催化剂优化和火花配置

我正在使用Spark 2.1.1,我正在使用Scala API,尽管语言不太重要.我有兴趣以有效的方式优化火花查询/管道.我已经阅读了很多材料(包括伟大的"学习星火"书,我对Spark网站,Jacek Laskowski的博客以及其他人非常熟悉,而且我已经和Spark一起工作了将近两年.

但是,有太多的信息和概念需要注意,而且我没有做足够的优化来了解它们.不幸的是,一旦一切工作100%,可能只需要几天甚至几小时才能交付代码.我需要优先考虑我可以应用的修复程序.我之前已经优化了工作火花代码,但我正在寻找最好的整体策略以及尝试熟悉最好的低挂水果.总有一天,我会记住所有要调整的旋钮,但至少现在有十个非常好的旋钮.我目前认为重要的一些事情是(不是按顺序排列,但前4个恰好是我认为最重要的)...

  1. 开发 - 通过重新分区数据集或从一个分区的配置单元表中检索来减少随机(交换).
  2. 策略 - 查看Spark UI以查看哪个作业和阶段占用时间最长,并且仔细观察该作业和阶段.
  3. 开发 - 尽可能在连接之前过滤数据集,以避免创建高基数"多对多"连接,并避免在连接期间发送更多数据.
  4. 配置 - 正确执行器和内存
  5. 开发 - 尽可能远离笛卡尔积和theta-join.
  6. 开发 - 如果可能,在创建UDF之前使用spark库函数.
  7. 开发 - 如果表足够小,请尝试强制进行广播散列连接.
  8. 策略 - 除非有特定原因(这意味着我从不使用RDD API),否则切勿使用RDD API代替数据集/数据帧.
  9. 开发 - 构建数据集过滤器,以便下推谓词可以与它们一起使用(制作更多,更简单的过滤器而不是多条件过滤器).
  10. 策略与开发 - 始终保持Spark源代码打开,以便更容易找到类型声明和其他代码实现.
  11. 我想念的东西......

对我来说最有趣的增强功能是那些通过查看查询计划或DAG可视化而显而易见的增强功能.此外,使火花用户/开发人员走向"啊哈!"的老生常谈 您可能愿意分享.免责声明:以上十件事对我来说并不完全是"前十名",比如使用火花库函数代替UDF并不是非常重要(当然不是至少十大),但我想帮助给出一个好的例子.提示可能看起来像某人.

scala apache-spark apache-spark-sql spark-dataframe apache-spark-2.0

8
推荐指数
0
解决办法
235
查看次数

何时使用Spark DataFrame/Dataset API以及何时使用普通RDD?

Spark SQL DataFrame/Dataset执行引擎具有几个非常有效的时间和空间优化(例如InternalRow和表达式代码).根据许多文档,对于大多数分布式算法来说,它似乎比RDD更好.

但是,我做了一些源代码研究,但仍然不相信.我毫不怀疑InternalRow更紧凑,可以节省大量内存.但是执行算法可能不会更快地保存预定义表达式.也就是说,在源代码中表明 org.apache.spark.sql.catalyst.expressions.ScalaUDF,每个用户定义的函数都做3件事:

  1. 将催化剂类型(在InternalRow中使用)转换为scala类型(在GenericRow中使用).
  2. 应用该功能
  3. 将结果从scala类型转换回催化剂类型

显然,这比直接在RDD上应用函数而不进行任何转换要慢.任何人都可以通过一些实例分析和代码分析来确认或否认我的推测吗?

非常感谢您的任何建议或见解.

apache-spark apache-spark-sql spark-dataframe apache-spark-dataset

7
推荐指数
1
解决办法
1305
查看次数

Pyspark Dataframe将函数应用于两列

假设我有两个PySpark DataFrames df1df2.

df1=   'a' 
        1    
        2    
        5    

df2=   'b'
        3
        6
Run Code Online (Sandbox Code Playgroud)

我想df2['b']为每个值找到最接近的值df1['a'],并将最接近的值添加为新列df1.

换句话说,每个值xdf1['a'],我想找到一个y即实现min(abx(x-y))对所有y in df2['b'](注:可以假设,仅仅是有一个y能够实现的最小距离),其结果将是

'a'    'b'
 1      3
 2      3
 5      6
Run Code Online (Sandbox Code Playgroud)

我尝试使用以下代码首先创建距离矩阵(在找到达到最小距离的值之前):

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def dict(x,y):
    return abs(x-y)
udf_dict = udf(dict, IntegerType())

sql_sc = SQLContext(sc)
udf_dict(df1.a, df2.b)
Run Code Online (Sandbox Code Playgroud)

这使

Column<PythonUDF#dist(a,b)>
Run Code Online (Sandbox Code Playgroud)

然后我试了一下

sql_sc.CreateDataFrame(udf_dict(df1.a, df2.b))
Run Code Online (Sandbox Code Playgroud)

它会永远运行而不会产生错误/输出.

我的问题是:

  1. 由于我是Spark的新手,我构建输出DataFrame的方法是否有效?(我的方法是首先为所有ab值创建一个距离矩阵,然后找到min …

pyspark spark-dataframe pyspark-sql

7
推荐指数
1
解决办法
2万
查看次数