我是Apache Spark的新手.
我的工作是读取两个CSV文件,从中选择一些特定列,合并,聚合并将结果写入单个CSV文件.
例如,
name,age,deparment_id
Run Code Online (Sandbox Code Playgroud)
department_id,deparment_name,location
Run Code Online (Sandbox Code Playgroud)
name,age,deparment_name
Run Code Online (Sandbox Code Playgroud)
我正在将CSV加载到数据帧中.然后能够使用join,select,filter,drop数据帧中存在的几种方法获得第三个数据帧
我也可以用几个来做同样的事情 RDD.map()
我也可以通过执行hiveql使用来做同样的事情HiveContext
我想知道如果我的CSV文件很大,哪个是有效的方法?为什么?
我有一个DataFrame df有五列的Spark .我想添加另一列,其值为第一列和第二列的元组.当使用withColumn()方法时,我得到不匹配错误,因为输入不是列类型,而是(列,列).我想知道在这种情况下是否有一个解决方案旁边的行循环运行?
var dfCol=(col1:Column,col2:Column)=>(col1,col2)
val vv = df.withColumn( "NewColumn", dfCol( df(df.schema.fieldNames(1)) , df(df.schema.fieldNames(2)) ) )
Run Code Online (Sandbox Code Playgroud) 我对Spark和Scala相对较新.
我从以下数据帧开始(单个列由密集的双打矢量组成):
scala> val scaledDataOnly_pruned = scaledDataOnly.select("features")
scaledDataOnly_pruned: org.apache.spark.sql.DataFrame = [features: vector]
scala> scaledDataOnly_pruned.show(5)
+--------------------+
| features|
+--------------------+
|[-0.0948337274182...|
|[-0.0948337274182...|
|[-0.0948337274182...|
|[-0.0948337274182...|
|[-0.0948337274182...|
+--------------------+
Run Code Online (Sandbox Code Playgroud)
直接转换为RDD会生成org.apache.spark.rdd.RDD [org.apache.spark.sql.Row]的实例:
scala> val scaledDataOnly_rdd = scaledDataOnly_pruned.rdd
scaledDataOnly_rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[32] at rdd at <console>:66
Run Code Online (Sandbox Code Playgroud)
有谁知道如何将此DF转换为org.apache.spark.rdd.RDD [org.apache.spark.mllib.linalg.Vector]的实例?到目前为止,我的各种尝试都没有成功.
提前感谢您的任何指示!
我有一个数据框,ai 将在 S3 中将其写入一个 .csv 文件,我使用以下代码:
df.coalesce(1).write.csv("dbfs:/mnt/mount1/2016//product_profit_weekly",mode='overwrite',header=True)
Run Code Online (Sandbox Code Playgroud)
它在 product_profit_weekly 文件夹中放置了一个 .csv 文件,目前 .csv 文件在 S3 中有一个奇怪的名称,是否可以在我要写的时候选择一个文件名?
amazon-s3 apache-spark apache-spark-sql spark-dataframe pyspark-sql
我正在努力获得2个数据帧的CROSS JOIN.我正在使用spark 2.0.如何用2个数据帧实现CROSSS JOIN.
编辑:
val df=df.join(df_t1, df("Col1")===df_t1("col")).join(df2,joinType=="cross join").where(df("col2")===df2("col2"))
Run Code Online (Sandbox Code Playgroud) 我有一个PySpark DataFrame,df1,看起来像:
CustomerID CustomerValue
12 .17
14 .15
14 .25
17 .50
17 .01
17 .35
Run Code Online (Sandbox Code Playgroud)
我有第二个PySpark DataFrame,df2,它是由CustomerID分组并由sum函数聚合的df1.它看起来像这样:
CustomerID CustomerValueSum
12 .17
14 .40
17 .86
Run Code Online (Sandbox Code Playgroud)
我想为df1添加第三列,即df1 ['CustomerValue']除以df2 ['CustomerValueSum'],用于相同的CustomerID.这看起来像:
CustomerID CustomerValue NormalizedCustomerValue
12 .17 1.00
14 .15 .38
14 .25 .62
17 .50 .58
17 .01 .01
17 .35 .41
Run Code Online (Sandbox Code Playgroud)
换句话说,我正在尝试将此Python/Pandas代码转换为PySpark:
normalized_list = []
for idx, row in df1.iterrows():
(
normalized_list
.append(
row.CustomerValue / df2[df2.CustomerID == row.CustomerID].CustomerValueSum
)
)
df1['NormalizedCustomerValue'] = [val.values[0] for val in normalized_list]
Run Code Online (Sandbox Code Playgroud)
我怎样才能做到这一点?
我正在尝试将 Parquet 数据加载到 中PySpark,其中列的名称中有一个空格:
df = spark.read.parquet('my_parquet_dump')
df.select(df['Foo Bar'].alias('foobar'))
Run Code Online (Sandbox Code Playgroud)
尽管我已经别名列,我还是从收到此错误和错误传播JVM的一侧PySpark。我在下面附上了堆栈跟踪。
有没有办法可以将这个镶木地板文件加载到PySpark.
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
/usr/local/python/pyspark/sql/utils.py in deco(*a, **kw)
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
/usr/local/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
Py4JJavaError: An error occurred while calling o864.collectToPython.
: org.apache.spark.sql.AnalysisException: Attribute name "Foo Bar" contains invalid character(s) among …Run Code Online (Sandbox Code Playgroud) 这应该不需要解释.但有人可能会描述子串的pos参数背后的逻辑,因为我无法理解这一点(使用Spark 2.1):
scala> val df = Seq("abcdef").toDS()
df: org.apache.spark.sql.Dataset[String] = [value: string]
scala> df.show
+------+
| value|
+------+
|abcdef|
+------+
scala> df.selectExpr("substring(value, 0, 2)", "substring(value, 1, 2)", "substring(value, 2,2)", "substring(value, 3,2)").show
+----------------------+----------------------+----------------------+----------------------+
|substring(value, 0, 2)|substring(value, 1, 2)|substring(value, 2, 2)|substring(value, 3, 2)|
+----------------------+----------------------+----------------------+----------------------+
| ab| ab| bc| cd|
+----------------------+----------------------+----------------------+----------------------+
Run Code Online (Sandbox Code Playgroud) 可以说这是我的数据框...
name | scores
Dan | [10,5,2,12]
Ann | [ 12,3,5]
Jon | [ ]
Run Code Online (Sandbox Code Playgroud)
所需的输出类似于
name | scores | Total
Dan | [10,5,2,12] | 29
Ann | [ 12,3,5] | 20
Jon | [ ] | 0
Run Code Online (Sandbox Code Playgroud)
我按照......制作了一个UDF
sum_cols = udf(lambda arr: if arr == [] then 0 else __builtins__.sum(arr),IntegerType())
df.withColumn('Total', sum_cols(col('scores'))).show()
Run Code Online (Sandbox Code Playgroud)
但是,我了解到 UDF 相对于纯 pySpark 函数来说相对较慢。
有没有办法在没有 UDF 的情况下在 pySpark 中执行上面的代码?
考虑以下DataFrame:
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[john, sam, jane] |
|pet |[whiskers, rover, fido]|
+------+-----------------------+
Run Code Online (Sandbox Code Playgroud)
可以使用以下代码创建:
import pyspark.sql.functions as f
data = [
('person', ['john', 'sam', 'jane']),
('pet', ['whiskers', 'rover', 'fido'])
]
df = sqlCtx.createDataFrame(data, ["type", "names"])
df.show(truncate=False)
Run Code Online (Sandbox Code Playgroud)
有没有一种方法可以通过对每个元素应用函数而不使用?来直接修改ArrayType()列?"names"udf
例如,假设我想将该函数foo应用于"names"列。(我将使用其中的例子foo是str.upper只用于说明目的,但我的问题是关于可以应用到一个可迭代的元素任何有效的功能。)
foo = lambda x: x.upper() # defining it as str.upper as an example
df.withColumn('X', [foo(x) for x in f.col("names")]).show()
Run Code Online (Sandbox Code Playgroud)
TypeError:列不可迭代
我可以使用udf:
foo_udf = f.udf(lambda row: [foo(x) …Run Code Online (Sandbox Code Playgroud) spark-dataframe ×10
apache-spark ×7
pyspark ×4
pyspark-sql ×2
python ×2
scala ×2
amazon-s3 ×1
parquet ×1
rdd ×1