标签: spark-dataframe

哪个是高效的,Dataframe或RDD还是hiveql?

我是Apache Spark的新手.

我的工作是读取两个CSV文件,从中选择一些特定列,合并,聚合并将结果写入单个CSV文件.

例如,

CSV1

name,age,deparment_id
Run Code Online (Sandbox Code Playgroud)

CSV2

department_id,deparment_name,location
Run Code Online (Sandbox Code Playgroud)

我想获得第三个CSV文件

name,age,deparment_name
Run Code Online (Sandbox Code Playgroud)

我正在将CSV加载到数据帧中.然后能够使用join,select,filter,drop数据帧中存在的几种方法获得第三个数据帧

我也可以用几个来做同样的事情 RDD.map()

我也可以通过执行hiveql使用来做同样的事情HiveContext

我想知道如果我的CSV文件很大,哪个是有效的方法?为什么?

apache-spark apache-spark-sql spark-dataframe

9
推荐指数
2
解决办法
5631
查看次数

如何将Spark中`Dataframe`的两列合并为一个2-Tuple?

我有一个DataFrame df有五列的Spark .我想添加另一列,其值为第一列和第二列的元组.当使用withColumn()方法时,我得到不匹配错误,因为输入不是列类型,而是(列,列).我想知道在这种情况下是否有一个解决方案旁边的行循环运行?

var dfCol=(col1:Column,col2:Column)=>(col1,col2)
val vv = df.withColumn( "NewColumn", dfCol( df(df.schema.fieldNames(1)) , df(df.schema.fieldNames(2)) ) )
Run Code Online (Sandbox Code Playgroud)

scala apache-spark-sql spark-dataframe

9
推荐指数
3
解决办法
2万
查看次数

将RDD [org.apache.spark.sql.Row]转换为RDD [org.apache.spark.mllib.linalg.Vector]

我对Spark和Scala相对较新.

我从以下数据帧开始(单个列由密集的双打矢量组成):

scala> val scaledDataOnly_pruned = scaledDataOnly.select("features")
scaledDataOnly_pruned: org.apache.spark.sql.DataFrame = [features: vector]

scala> scaledDataOnly_pruned.show(5)
+--------------------+
|            features|
+--------------------+
|[-0.0948337274182...|
|[-0.0948337274182...|
|[-0.0948337274182...|
|[-0.0948337274182...|
|[-0.0948337274182...|
+--------------------+
Run Code Online (Sandbox Code Playgroud)

直接转换为RDD会生成org.apache.spark.rdd.RDD [org.apache.spark.sql.Row]的实例:

scala> val scaledDataOnly_rdd = scaledDataOnly_pruned.rdd
scaledDataOnly_rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[32] at rdd at <console>:66
Run Code Online (Sandbox Code Playgroud)

有谁知道如何将此DF转换为org.apache.spark.rdd.RDD [org.apache.spark.mllib.linalg.Vector]的实例?到目前为止,我的各种尝试都没有成功.

提前感谢您的任何指示!

scala apache-spark rdd spark-dataframe apache-spark-mllib

9
推荐指数
2
解决办法
1万
查看次数

将 sparkdataframe 写入 S3 中的 .csv 文件并在 pyspark 中选择一个名称

我有一个数据框,ai 将在 S3 中将其写入一个 .csv 文件,我使用以下代码:

df.coalesce(1).write.csv("dbfs:/mnt/mount1/2016//product_profit_weekly",mode='overwrite',header=True)
Run Code Online (Sandbox Code Playgroud)

它在 product_profit_weekly 文件夹中放置了一个 .csv 文件,目前 .csv 文件在 S3 中有一个奇怪的名称,是否可以在我要写的时候选择一个文件名?

amazon-s3 apache-spark apache-spark-sql spark-dataframe pyspark-sql

9
推荐指数
1
解决办法
2万
查看次数

如何交叉加入2个数据帧?

我正在努力获得2个数据帧的CROSS JOIN.我正在使用spark 2.0.如何用2个数据帧实现CROSSS JOIN.

编辑:

val df=df.join(df_t1, df("Col1")===df_t1("col")).join(df2,joinType=="cross join").where(df("col2")===df2("col2"))
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql spark-dataframe

9
推荐指数
3
解决办法
2万
查看次数

ID匹配时,在其他Pyspark Dataframe中逐列分割Pyspark Dataframe

我有一个PySpark DataFrame,df1,看起来像:

CustomerID  CustomerValue
12          .17
14          .15
14          .25
17          .50
17          .01
17          .35
Run Code Online (Sandbox Code Playgroud)

我有第二个PySpark DataFrame,df2,它是由CustomerID分组并由sum函数聚合的df1.它看起来像这样:

 CustomerID  CustomerValueSum
 12          .17
 14          .40
 17          .86
Run Code Online (Sandbox Code Playgroud)

我想为df1添加第三列,即df1 ['CustomerValue']除以df2 ['CustomerValueSum'],用于相同的CustomerID.这看起来像:

CustomerID  CustomerValue  NormalizedCustomerValue
12          .17            1.00
14          .15            .38
14          .25            .62
17          .50            .58
17          .01            .01
17          .35            .41
Run Code Online (Sandbox Code Playgroud)

换句话说,我正在尝试将此Python/Pandas代码转换为PySpark:

normalized_list = []
for idx, row in df1.iterrows():
    (
        normalized_list
        .append(
            row.CustomerValue / df2[df2.CustomerID == row.CustomerID].CustomerValueSum
        )
    )
df1['NormalizedCustomerValue'] = [val.values[0] for val in normalized_list]
Run Code Online (Sandbox Code Playgroud)

我怎样才能做到这一点?

python pyspark spark-dataframe

9
推荐指数
2
解决办法
2万
查看次数

PySpark: org.apache.spark.sql.AnalysisException: 属性名称 ... 在 " ,;{}()\n\t=" 中包含无效字符。请使用别名重命名

我正在尝试将 Parquet 数据加载到 中PySpark,其中列的名称中有一个空格:

df = spark.read.parquet('my_parquet_dump')
df.select(df['Foo Bar'].alias('foobar'))
Run Code Online (Sandbox Code Playgroud)

尽管我已经别名列,我还是从收到此错误和错误传播JVM的一侧PySpark。我在下面附上了堆栈跟踪。

有没有办法可以将这个镶木地板文件加载到PySpark.

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/usr/local/python/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

/usr/local/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:

Py4JJavaError: An error occurred while calling o864.collectToPython.
: org.apache.spark.sql.AnalysisException: Attribute name "Foo Bar" contains invalid character(s) among …
Run Code Online (Sandbox Code Playgroud)

python apache-spark parquet pyspark spark-dataframe

9
推荐指数
1
解决办法
3万
查看次数

spark sql substring函数有什么问题?

这应该不需要解释.但有人可能会描述子串的pos参数背后的逻辑,因为我无法理解这一点(使用Spark 2.1):

scala> val df = Seq("abcdef").toDS()
df: org.apache.spark.sql.Dataset[String] = [value: string]

scala> df.show
+------+
| value|
+------+
|abcdef|
+------+

scala> df.selectExpr("substring(value, 0, 2)", "substring(value, 1, 2)", "substring(value, 2,2)", "substring(value, 3,2)").show
+----------------------+----------------------+----------------------+----------------------+
|substring(value, 0, 2)|substring(value, 1, 2)|substring(value, 2, 2)|substring(value, 3, 2)|
+----------------------+----------------------+----------------------+----------------------+
|                    ab|                    ab|                    bc|                    cd|
+----------------------+----------------------+----------------------+----------------------+
Run Code Online (Sandbox Code Playgroud)

apache-spark-sql spark-dataframe

9
推荐指数
1
解决办法
8904
查看次数

pyspark -- 对 Array(Integer()) 类型的列中的值求和的最佳方法

可以说这是我的数据框...

name | scores
Dan  |  [10,5,2,12]
Ann  |  [ 12,3,5]
Jon  |  [ ] 
Run Code Online (Sandbox Code Playgroud)

所需的输出类似于

name | scores         | Total
Dan  |  [10,5,2,12]   | 29
Ann  |   [ 12,3,5]    | 20
Jon  |  [ ]           | 0
Run Code Online (Sandbox Code Playgroud)

我按照......制作了一个UDF

sum_cols = udf(lambda arr: if arr == [] then 0 else __builtins__.sum(arr),IntegerType())

df.withColumn('Total', sum_cols(col('scores'))).show()
Run Code Online (Sandbox Code Playgroud)

但是,我了解到 UDF 相对于纯 pySpark 函数来说相对较慢。

有没有办法在没有 UDF 的情况下在 pySpark 中执行上面的代码?

apache-spark apache-spark-sql pyspark spark-dataframe

9
推荐指数
2
解决办法
8155
查看次数

TypeError:列不可迭代-如何遍历ArrayType()?

考虑以下DataFrame:

+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[john, sam, jane]      |
|pet   |[whiskers, rover, fido]|
+------+-----------------------+
Run Code Online (Sandbox Code Playgroud)

可以使用以下代码创建:

import pyspark.sql.functions as f
data = [
    ('person', ['john', 'sam', 'jane']),
    ('pet', ['whiskers', 'rover', 'fido'])
]

df = sqlCtx.createDataFrame(data, ["type", "names"])
df.show(truncate=False)
Run Code Online (Sandbox Code Playgroud)

有没有一种方法可以通过对每个元素应用函数而不使用?来直接修改ArrayType()列?"names"udf

例如,假设我想将该函数foo应用于"names"列。(我将使用其中的例子foostr.upper只用于说明目的,但我的问题是关于可以应用到一个可迭代的元素任何有效的功能。)

foo = lambda x: x.upper()  # defining it as str.upper as an example
df.withColumn('X', [foo(x) for x in f.col("names")]).show()
Run Code Online (Sandbox Code Playgroud)

TypeError:列不可迭代

我可以使用udf

foo_udf = f.udf(lambda row: [foo(x) …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark spark-dataframe pyspark-sql

9
推荐指数
1
解决办法
4438
查看次数