我试图在单个节点(local [*])上以独立模式通过JDBC访问中型Teradata表(约1亿行).
我正在使用Spark 1.4.1.并且设置在一个非常强大的机器上(2个CPU,24个内核,126G RAM).
我已经尝试了几种内存设置和调整选项,以使其更快地工作,但它们都没有产生巨大的影响.
我确信有一些我缺少的东西,下面是我的最后一次尝试,花了大约11分钟来获得这个简单的计数与使用JDBC连接通过R只需要40秒来获得计数.
bin/pyspark --driver-memory 40g --executor-memory 40g
df = sqlContext.read.jdbc("jdbc:teradata://......)
df.count()
Run Code Online (Sandbox Code Playgroud)
当我尝试使用BIG表(5B记录)时,在完成查询后没有返回任何结果.
我有一个用例,我需要删除数据帧的重复行(在这种情况下,重复意味着它们具有相同的'id'字段),同时保持具有最高'timestamp'(unix timestamp)字段的行.
我找到了drop_duplicate方法(我正在使用pyspark),但是没有人控制将保留哪个项目.
有人可以帮忙吗?Thx提前
无论如何,我可以随机播放RDD或数据帧的列,以使该列中的条目以随机顺序出现?我不确定我可以使用哪些API来完成这样的任务.
我在Spark 2中有一个数据框,如下所示,用户有50到数千个帖子.我想创建一个新的数据框,其中包含原始数据框中的所有用户,但每个用户只有5个随机抽样的帖子.
+--------+--------------+--------------------+
| user_id| post_id| text|
+--------+--------------+--------------------+
|67778705|44783131591473|some text...........|
|67778705|44783134580755|some text...........|
|67778705|44783136367108|some text...........|
|67778705|44783136970669|some text...........|
|67778705|44783138143396|some text...........|
|67778705|44783155162624|some text...........|
|67778705|44783688650554|some text...........|
|68950272|88655645825660|some text...........|
|68950272|88651393135293|some text...........|
|68950272|88652615409812|some text...........|
|68950272|88655744880460|some text...........|
|68950272|88658059871568|some text...........|
|68950272|88656994832475|some text...........|
+--------+--------------+--------------------+
Run Code Online (Sandbox Code Playgroud)
有些东西,posts.groupby('user_id').agg(sample('post_id'))但在pyspark中没有这样的功能.
有什么建议?
更新:
这个问题不同于另一个密切相关的问题,分层采样 - 火花采样有两种方式:
我还更新了问题的标题以澄清这一点.
我目前有一个带有id和列的数据帧,这是一个结构数组:
root
|-- id: integer (nullable = true)
|-- lists: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- text: string (nullable = true)
| | |-- amount: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)
这是一个包含数据的示例表:
id | lists
-----------
1 | [[a, 1.0], [b, 2.0]]
2 | [[c, 3.0]]
Run Code Online (Sandbox Code Playgroud)
如何将上述数据帧转换为下面的数据帧?我需要"爆炸"数组并同时附加id.
id | col1 | col2
-----------------
1 | a | 1.0
1 | b | 2.0
2 | c | 3.0
Run Code Online (Sandbox Code Playgroud)
编辑说明:
请注意,以下两个示例之间存在差异.第一个包含"一系列元素结构".而后者只包含 …
我有一个由SQL查询产生的数据帧
df1 = sqlContext.sql("select * from table_test")
Run Code Online (Sandbox Code Playgroud)
我需要将此数据帧转换为libsvm格式,以便可以将其作为输入提供
pyspark.ml.classification.LogisticRegression
Run Code Online (Sandbox Code Playgroud)
我试着做以下事情.但是,这导致了以下错误,因为我正在使用spark 1.5.2
df1.write.format("libsvm").save("data/foo")
Failed to load class for data source: libsvm
Run Code Online (Sandbox Code Playgroud)
我想改用MLUtils.loadLibSVMFile.我在防火墙后面,不能直接pip安装它.所以我下载了文件,scp-ed然后手动安装它.一切似乎工作正常,但我仍然得到以下错误
import org.apache.spark.mllib.util.MLUtils
No module named org.apache.spark.mllib.util.MLUtils
Run Code Online (Sandbox Code Playgroud)
问题1:我的上述方法是将数据帧转换为正确方向的libsvm格式.问题2:如果问题1为"是",如何让MLUtils正常工作.如果"否",将数据帧转换为libsvm格式的最佳方法是什么
apache-spark apache-spark-sql pyspark spark-dataframe apache-spark-mllib
在pyspark中使用round函数时遇到一些麻烦 - 我有下面的代码块,我试图将new_bid列舍入到2位小数,然后重命名列bid- 我正在导入pyspark.sql.functions AS func以供参考,并使用其中round包含的功能:
output = output.select(col("ad").alias("ad_id"),
col("part").alias("part_id"),
func.round(col("new_bid"), 2).alias("bid"))
Run Code Online (Sandbox Code Playgroud)
new_bid这里的列是float类型 - 结果数据框没有新命名的bid列舍入到我想要的2位小数,而是仍然是8或9位小数.
我尝试了各种各样的东西,但似乎无法让结果数据框具有舍入值 - 任何指针都将非常感激!谢谢!
apache-spark apache-spark-sql pyspark spark-dataframe pyspark-sql
我正在使用Spark 2.1.1,我正在使用Scala API,尽管语言不太重要.我有兴趣以有效的方式优化火花查询/管道.我已经阅读了很多材料(包括伟大的"学习星火"书,我对Spark网站,Jacek Laskowski的博客以及其他人非常熟悉,而且我已经和Spark一起工作了将近两年.
但是,有太多的信息和概念需要注意,而且我没有做足够的优化来了解它们.不幸的是,一旦一切工作100%,可能只需要几天甚至几小时才能交付代码.我需要优先考虑我可以应用的修复程序.我之前已经优化了工作火花代码,但我正在寻找最好的整体策略以及尝试熟悉最好的低挂水果.总有一天,我会记住所有要调整的旋钮,但至少现在有十个非常好的旋钮.我目前认为重要的一些事情是(不是按顺序排列,但前4个恰好是我认为最重要的)...
对我来说最有趣的增强功能是那些通过查看查询计划或DAG可视化而显而易见的增强功能.此外,使火花用户/开发人员走向"啊哈!"的老生常谈 您可能愿意分享.免责声明:以上十件事对我来说并不完全是"前十名",比如使用火花库函数代替UDF并不是非常重要(当然不是至少十大),但我想帮助给出一个好的例子.提示可能看起来像某人.
scala apache-spark apache-spark-sql spark-dataframe apache-spark-2.0
Spark SQL DataFrame/Dataset执行引擎具有几个非常有效的时间和空间优化(例如InternalRow和表达式代码).根据许多文档,对于大多数分布式算法来说,它似乎比RDD更好.
但是,我做了一些源代码研究,但仍然不相信.我毫不怀疑InternalRow更紧凑,可以节省大量内存.但是执行算法可能不会更快地保存预定义表达式.也就是说,在源代码中表明 org.apache.spark.sql.catalyst.expressions.ScalaUDF,每个用户定义的函数都做3件事:
显然,这比直接在RDD上应用函数而不进行任何转换要慢.任何人都可以通过一些实例分析和代码分析来确认或否认我的推测吗?
非常感谢您的任何建议或见解.
apache-spark apache-spark-sql spark-dataframe apache-spark-dataset
假设我有两个PySpark DataFrames df1和df2.
df1= 'a'
1
2
5
df2= 'b'
3
6
Run Code Online (Sandbox Code Playgroud)
我想df2['b']为每个值找到最接近的值df1['a'],并将最接近的值添加为新列df1.
换句话说,每个值x的df1['a'],我想找到一个y即实现min(abx(x-y))对所有y in df2['b'](注:可以假设,仅仅是有一个y能够实现的最小距离),其结果将是
'a' 'b'
1 3
2 3
5 6
Run Code Online (Sandbox Code Playgroud)
我尝试使用以下代码首先创建距离矩阵(在找到达到最小距离的值之前):
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
def dict(x,y):
return abs(x-y)
udf_dict = udf(dict, IntegerType())
sql_sc = SQLContext(sc)
udf_dict(df1.a, df2.b)
Run Code Online (Sandbox Code Playgroud)
这使
Column<PythonUDF#dist(a,b)>
Run Code Online (Sandbox Code Playgroud)
然后我试了一下
sql_sc.CreateDataFrame(udf_dict(df1.a, df2.b))
Run Code Online (Sandbox Code Playgroud)
它会永远运行而不会产生错误/输出.
我的问题是:
a和b值创建一个距离矩阵,然后找到min …spark-dataframe ×10
apache-spark ×9
pyspark ×6
pyspark-sql ×2
scala ×2
dataframe ×1
teradata ×1