标签: pyspark

在Spark中将简单的一行字符串转换为RDD

我有一个简单的路线:

line = "Hello, world"
Run Code Online (Sandbox Code Playgroud)

我想将它转换为只有一个元素的RDD.我试过了

sc.parallelize(line)
Run Code Online (Sandbox Code Playgroud)

但它得到:

sc.parallelize(line).collect()
['H', 'e', 'l', 'l', 'o', ',', ' ', 'w', 'o', 'r', 'l', 'd']
Run Code Online (Sandbox Code Playgroud)

有任何想法吗?

python distributed-computing apache-spark rdd pyspark

23
推荐指数
1
解决办法
4万
查看次数

通过Spark本地读取S3文件(或更好:pyspark)

我想通过Spark(pyspark,真的)从我的(本地)机器读取一个S3文件.现在,我不断收到身份验证错误

java.lang.IllegalArgumentException:必须将AWS Access Key ID和Secret Access Key指定为s3n URL的用户名或密码,或者分别设置fs.s3n.awsAccessKeyId或fs.s3n.awsSecretAccessKey属性.

我在这里和网上到处寻找,尝试了很多东西,但显然S3在过去一年或几个月里一直在变化,所有方法都失败了但是一个:

pyspark.SparkContext().textFile("s3n://user:password@bucket/key")
Run Code Online (Sandbox Code Playgroud)

(注意s3n[ s3不起作用]).现在,我不想使用带有用户和密码的URL,因为它们可以出现在日志中,我也不知道如何从~/.aws/credentials文件中获取它们.

那么,我如何使用来自现在标准 ~/.aws/credentials文件的AWS凭证(或者更好地,pyspark)从S3本地读取(理想情况下,不将凭证复制到另一个配置文件)?

PS:我想os.environ["AWS_ACCESS_KEY_ID"] = …os.environ["AWS_SECRET_ACCESS_KEY"] = …,也没有工作.

PPS:我不知道在哪里"设置fs.s3n.awsAccessKeyId或fs.s3n.awsSecretAccessKey属性"(Google没有提出任何建议).不过,我也尝试设置这些方法很多:SparkContext.setSystemProperty(),sc.setLocalProperty(),和conf = SparkConf(); conf.set(…); conf.set(…); sc = SparkContext(conf=conf).没有任何效果.

authentication credentials amazon-s3 apache-spark pyspark

23
推荐指数
2
解决办法
3万
查看次数

在Spark 1.6 Dataframe上的其他字段中获取每个组的不同元素

我正在尝试按星期在Spark数据框中进行分组,并为每个组计算一列的唯一值:

test.json
{"name":"Yin", "address":1111111, "date":20151122045510}
{"name":"Yin", "address":1111111, "date":20151122045501}
{"name":"Yln", "address":1111111, "date":20151122045500}
{"name":"Yun", "address":1111112, "date":20151122065832}
{"name":"Yan", "address":1111113, "date":20160101003221}
{"name":"Yin", "address":1111111, "date":20160703045231}
{"name":"Yin", "address":1111114, "date":20150419134543}
{"name":"Yen", "address":1111115, "date":20151123174302}
Run Code Online (Sandbox Code Playgroud)

和代码:

import pyspark.sql.funcions as func
from pyspark.sql.types import TimestampType
from datetime import datetime

df_y = sqlContext.read.json("/user/test.json")
udf_dt = func.udf(lambda x: datetime.strptime(x, '%Y%m%d%H%M%S'), TimestampType())
df = df_y.withColumn('datetime', udf_dt(df_y.date))
df_g = df_y.groupby(func.hour(df_y.date))    
df_g.count().distinct().show()
Run Code Online (Sandbox Code Playgroud)

pyspark的结果是

df_y.groupby(df_y.name).count().distinct().show()
+----+-----+
|name|count|
+----+-----+
| Yan|    1|
| Yun|    1|
| Yin|    4|
| Yen|    1|
| Yln|    1|
+----+-----+
Run Code Online (Sandbox Code Playgroud)

而我对大熊猫的期待是这样的:

df …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

23
推荐指数
2
解决办法
3万
查看次数

调用map后调用POSpark EOFError

我是spark和pyspark的新手.

我正在将一个小的csv文件(~40k)读入数据帧.

from pyspark.sql import functions as F
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('/tmp/sm.csv')
df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0))
df2 = df.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))).toDF()
Run Code Online (Sandbox Code Playgroud)

我得到一些奇怪的错误,每次都不会发生,但确实经常发生

>>> df2.show(1)
+--------------------+---------+
|            features|    label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row

>>> df2.count()
41999                                                                           
>>> df2.show(1)
+--------------------+---------+
|            features|    label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row

>>> df2.count()
41999                                                                           
>>> df2.show(1)
Traceback (most recent call last):
  File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager
  File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in worker    
  File …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

23
推荐指数
1
解决办法
2743
查看次数

如何在PySpark中运行脚本

我正在尝试在pyspark环境中运行脚本,但到目前为止我还没有能够.我如何在pyspark中运行像python script.py这样的脚本?谢谢

python apache-spark pyspark

23
推荐指数
4
解决办法
4万
查看次数

Pyspark按另一个数据帧的列过滤数据帧

我不知道为什么我会遇到困难,看起来很简单,因为在R或熊猫中相当容易.我想避免使用pandas,因为我正在处理大量数据,我相信toPandas()所有数据都会加载到pyspark中的驱动程序内存中.

我有2个数据帧:df1df2.我想过滤df1(删除所有行)df1.userid = df2.useridAND df1.group = df2.group.我不知道我是否应该使用filter(),join()sql 例如:

df1:
+------+----------+--------------------+
|userid|   group  |      all_picks     |
+------+----------+--------------------+
|   348|         2|[225, 2235, 2225]   |
|   567|         1|[1110, 1150]        |
|   595|         1|[1150, 1150, 1150]  |
|   580|         2|[2240, 2225]        |
|   448|         1|[1130]              |
+------+----------+--------------------+

df2:
+------+----------+---------+
|userid|   group  |   pick  |
+------+----------+---------+
|   348|         2|     2270|
|   595|         1|     2125|
+------+----------+---------+

Result I want:
+------+----------+--------------------+ …
Run Code Online (Sandbox Code Playgroud)

dataframe python-2.7 apache-spark apache-spark-sql pyspark

23
推荐指数
1
解决办法
1万
查看次数

Python/pyspark数据框重新排列列

我在python/pyspark中有一个带有列的数据框id time city zip等等......

现在我name在这个数据框中添加了一个新列.

现在,我必须以这样的方式排列列,以便name列出来id

我在下面做了

change_cols = ['id', 'name']

cols = ([col for col in change_cols if col in df] 
        + [col for col in df if col not in change_cols])

df = df[cols]
Run Code Online (Sandbox Code Playgroud)

我收到了这个错误

pyspark.sql.utils.AnalysisException: u"Reference 'id' is ambiguous, could be: id#609, id#1224.;"
Run Code Online (Sandbox Code Playgroud)

为什么会出现此错误.我怎样才能纠正这个问题.

python pyspark spark-dataframe

23
推荐指数
2
解决办法
2万
查看次数

在从其他列派生的数据框中添加新列(Spark)

我正在使用Spark 1.3.0和Python.我有一个数据框,我希望添加一个从其他列派生的附加列.像这样,

>>old_df.columns
[col_1, col_2, ..., col_m]

>>new_df.columns
[col_1, col_2, ..., col_m, col_n]
Run Code Online (Sandbox Code Playgroud)

哪里

col_n = col_3 - col_4
Run Code Online (Sandbox Code Playgroud)

我如何在PySpark中执行此操作?

python apache-spark apache-spark-sql pyspark

22
推荐指数
3
解决办法
3万
查看次数

使用SparkSQL HiveContext"INSERT INTO ..."

我正在尝试使用我的HiveContext运行一个insert语句,如下所示:

hiveContext.sql('insert into my_table (id, score) values (1, 10)')
Run Code Online (Sandbox Code Playgroud)

1.5.2星火SQL文件没有明确说明这是否是支持或不,虽然它不支持"动态分区插入".

这导致堆栈跟踪像

AnalysisException: 
Unsupported language features in query: insert into my_table (id, score) values (1, 10)
TOK_QUERY 0, 0,20, 0
  TOK_FROM 0, -1,20, 0
    TOK_VIRTUAL_TABLE 0, -1,20, 0
      TOK_VIRTUAL_TABREF 0, -1,-1, 0
        TOK_ANONYMOUS 0, -1,-1, 0
      TOK_VALUES_TABLE 1, 13,20, 41
        TOK_VALUE_ROW 1, 15,20, 41
          1 1, 16,16, 41
          10 1, 19,19, 44
  TOK_INSERT 1, 0,-1, 12
    TOK_INSERT_INTO 1, 0,11, 12
      TOK_TAB 1, 4,4, 12
        TOK_TABNAME 1, 4,4, 12
          my_table …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark apache-spark-1.5 hivecontext

22
推荐指数
3
解决办法
5万
查看次数

具有复杂条件的Spark SQL窗口函数

这可能是最容易通过示例解释的.假设我有一个用户登录网站的DataFrame,例如:

scala> df.show(5)
+----------------+----------+
|       user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
|  OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows
Run Code Online (Sandbox Code Playgroud)

我想在此列添加一个列,指示他们何时成为网站上的活跃用户.但有一点需要注意:有一段时间用户被认为是活动的,在此期间之后,如果他们再次登录,他们的became_active日期会重置.假设这段时间是5天.然后从上表派生的所需表将是这样的:

+----------------+----------+-------------+
|       user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-04|   2012-01-04|
|Booooooo99900098|2012-01-06|   2012-01-04|
|  OprahWinfreyJr|2012-01-10|   2012-01-10|
|SirChillingtonIV|2012-01-11|   2012-01-11|
+----------------+----------+-------------+
Run Code Online (Sandbox Code Playgroud)

因此,特别是,SirChillingtonIV的became_active日期被重置,因为他们的第二次登录是在活动期过期之后,但是Booooooo99900098的became_active日期没有在他/她登录的第二次重置,因为它落在活动期间.

我最初的想法是使用窗口函数lag,然后使用lagged值填充became_active列; 例如,大致类似于:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))
Run Code Online (Sandbox Code Playgroud)

然后,规则填写became_active日期会是这样,如果tmpnull(即,如果它是第一次登录),或者如果login_date - tmp >= 5再 …

sql window-functions apache-spark apache-spark-sql pyspark

22
推荐指数
2
解决办法
2万
查看次数