我有一个简单的路线:
line = "Hello, world"
Run Code Online (Sandbox Code Playgroud)
我想将它转换为只有一个元素的RDD.我试过了
sc.parallelize(line)
Run Code Online (Sandbox Code Playgroud)
但它得到:
sc.parallelize(line).collect()
['H', 'e', 'l', 'l', 'o', ',', ' ', 'w', 'o', 'r', 'l', 'd']
Run Code Online (Sandbox Code Playgroud)
有任何想法吗?
我想通过Spark(pyspark,真的)从我的(本地)机器读取一个S3文件.现在,我不断收到身份验证错误
java.lang.IllegalArgumentException:必须将AWS Access Key ID和Secret Access Key指定为s3n URL的用户名或密码,或者分别设置fs.s3n.awsAccessKeyId或fs.s3n.awsSecretAccessKey属性.
我在这里和网上到处寻找,尝试了很多东西,但显然S3在过去一年或几个月里一直在变化,所有方法都失败了但是一个:
pyspark.SparkContext().textFile("s3n://user:password@bucket/key")
Run Code Online (Sandbox Code Playgroud)
(注意s3n[ s3不起作用]).现在,我不想使用带有用户和密码的URL,因为它们可以出现在日志中,我也不知道如何从~/.aws/credentials文件中获取它们.
那么,我如何使用来自现在标准 ~/.aws/credentials文件的AWS凭证(或者更好地,pyspark)从S3本地读取(理想情况下,不将凭证复制到另一个配置文件)?
PS:我想os.environ["AWS_ACCESS_KEY_ID"] = …和os.environ["AWS_SECRET_ACCESS_KEY"] = …,也没有工作.
PPS:我不知道在哪里"设置fs.s3n.awsAccessKeyId或fs.s3n.awsSecretAccessKey属性"(Google没有提出任何建议).不过,我也尝试设置这些方法很多:SparkContext.setSystemProperty(),sc.setLocalProperty(),和conf = SparkConf(); conf.set(…); conf.set(…); sc = SparkContext(conf=conf).没有任何效果.
我正在尝试按星期在Spark数据框中进行分组,并为每个组计算一列的唯一值:
test.json
{"name":"Yin", "address":1111111, "date":20151122045510}
{"name":"Yin", "address":1111111, "date":20151122045501}
{"name":"Yln", "address":1111111, "date":20151122045500}
{"name":"Yun", "address":1111112, "date":20151122065832}
{"name":"Yan", "address":1111113, "date":20160101003221}
{"name":"Yin", "address":1111111, "date":20160703045231}
{"name":"Yin", "address":1111114, "date":20150419134543}
{"name":"Yen", "address":1111115, "date":20151123174302}
Run Code Online (Sandbox Code Playgroud)
和代码:
import pyspark.sql.funcions as func
from pyspark.sql.types import TimestampType
from datetime import datetime
df_y = sqlContext.read.json("/user/test.json")
udf_dt = func.udf(lambda x: datetime.strptime(x, '%Y%m%d%H%M%S'), TimestampType())
df = df_y.withColumn('datetime', udf_dt(df_y.date))
df_g = df_y.groupby(func.hour(df_y.date))
df_g.count().distinct().show()
Run Code Online (Sandbox Code Playgroud)
pyspark的结果是
df_y.groupby(df_y.name).count().distinct().show()
+----+-----+
|name|count|
+----+-----+
| Yan| 1|
| Yun| 1|
| Yin| 4|
| Yen| 1|
| Yln| 1|
+----+-----+
Run Code Online (Sandbox Code Playgroud)
而我对大熊猫的期待是这样的:
df …Run Code Online (Sandbox Code Playgroud) 我是spark和pyspark的新手.
我正在将一个小的csv文件(~40k)读入数据帧.
from pyspark.sql import functions as F
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('/tmp/sm.csv')
df = df.withColumn('verified', F.when(df['verified'] == 'Y', 1).otherwise(0))
df2 = df.map(lambda x: Row(label=float(x[0]), features=Vectors.dense(x[1:]))).toDF()
Run Code Online (Sandbox Code Playgroud)
我得到一些奇怪的错误,每次都不会发生,但确实经常发生
>>> df2.show(1)
+--------------------+---------+
| features| label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row
>>> df2.count()
41999
>>> df2.show(1)
+--------------------+---------+
| features| label|
+--------------------+---------+
|[0.0,0.0,0.0,0.0,...|4700734.0|
+--------------------+---------+
only showing top 1 row
>>> df2.count()
41999
>>> df2.show(1)
Traceback (most recent call last):
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 157, in manager
File "spark-1.6.1/python/lib/pyspark.zip/pyspark/daemon.py", line 61, in worker
File …Run Code Online (Sandbox Code Playgroud) 我正在尝试在pyspark环境中运行脚本,但到目前为止我还没有能够.我如何在pyspark中运行像python script.py这样的脚本?谢谢
我不知道为什么我会遇到困难,看起来很简单,因为在R或熊猫中相当容易.我想避免使用pandas,因为我正在处理大量数据,我相信toPandas()所有数据都会加载到pyspark中的驱动程序内存中.
我有2个数据帧:df1和df2.我想过滤df1(删除所有行)df1.userid = df2.useridAND df1.group = df2.group.我不知道我是否应该使用filter(),join()或sql 例如:
df1:
+------+----------+--------------------+
|userid| group | all_picks |
+------+----------+--------------------+
| 348| 2|[225, 2235, 2225] |
| 567| 1|[1110, 1150] |
| 595| 1|[1150, 1150, 1150] |
| 580| 2|[2240, 2225] |
| 448| 1|[1130] |
+------+----------+--------------------+
df2:
+------+----------+---------+
|userid| group | pick |
+------+----------+---------+
| 348| 2| 2270|
| 595| 1| 2125|
+------+----------+---------+
Result I want:
+------+----------+--------------------+ …Run Code Online (Sandbox Code Playgroud) 我在python/pyspark中有一个带有列的数据框id time city zip等等......
现在我name在这个数据框中添加了一个新列.
现在,我必须以这样的方式排列列,以便name列出来id
我在下面做了
change_cols = ['id', 'name']
cols = ([col for col in change_cols if col in df]
+ [col for col in df if col not in change_cols])
df = df[cols]
Run Code Online (Sandbox Code Playgroud)
我收到了这个错误
pyspark.sql.utils.AnalysisException: u"Reference 'id' is ambiguous, could be: id#609, id#1224.;"
Run Code Online (Sandbox Code Playgroud)
为什么会出现此错误.我怎样才能纠正这个问题.
我正在使用Spark 1.3.0和Python.我有一个数据框,我希望添加一个从其他列派生的附加列.像这样,
>>old_df.columns
[col_1, col_2, ..., col_m]
>>new_df.columns
[col_1, col_2, ..., col_m, col_n]
Run Code Online (Sandbox Code Playgroud)
哪里
col_n = col_3 - col_4
Run Code Online (Sandbox Code Playgroud)
我如何在PySpark中执行此操作?
我正在尝试使用我的HiveContext运行一个insert语句,如下所示:
hiveContext.sql('insert into my_table (id, score) values (1, 10)')
Run Code Online (Sandbox Code Playgroud)
在1.5.2星火SQL文件没有明确说明这是否是支持或不,虽然它不支持"动态分区插入".
这导致堆栈跟踪像
AnalysisException:
Unsupported language features in query: insert into my_table (id, score) values (1, 10)
TOK_QUERY 0, 0,20, 0
TOK_FROM 0, -1,20, 0
TOK_VIRTUAL_TABLE 0, -1,20, 0
TOK_VIRTUAL_TABREF 0, -1,-1, 0
TOK_ANONYMOUS 0, -1,-1, 0
TOK_VALUES_TABLE 1, 13,20, 41
TOK_VALUE_ROW 1, 15,20, 41
1 1, 16,16, 41
10 1, 19,19, 44
TOK_INSERT 1, 0,-1, 12
TOK_INSERT_INTO 1, 0,11, 12
TOK_TAB 1, 4,4, 12
TOK_TABNAME 1, 4,4, 12
my_table …Run Code Online (Sandbox Code Playgroud) apache-spark apache-spark-sql pyspark apache-spark-1.5 hivecontext
这可能是最容易通过示例解释的.假设我有一个用户登录网站的DataFrame,例如:
scala> df.show(5)
+----------------+----------+
| user_name|login_date|
+----------------+----------+
|SirChillingtonIV|2012-01-04|
|Booooooo99900098|2012-01-04|
|Booooooo99900098|2012-01-06|
| OprahWinfreyJr|2012-01-10|
|SirChillingtonIV|2012-01-11|
+----------------+----------+
only showing top 5 rows
Run Code Online (Sandbox Code Playgroud)
我想在此列添加一个列,指示他们何时成为网站上的活跃用户.但有一点需要注意:有一段时间用户被认为是活动的,在此期间之后,如果他们再次登录,他们的became_active日期会重置.假设这段时间是5天.然后从上表派生的所需表将是这样的:
+----------------+----------+-------------+
| user_name|login_date|became_active|
+----------------+----------+-------------+
|SirChillingtonIV|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-04| 2012-01-04|
|Booooooo99900098|2012-01-06| 2012-01-04|
| OprahWinfreyJr|2012-01-10| 2012-01-10|
|SirChillingtonIV|2012-01-11| 2012-01-11|
+----------------+----------+-------------+
Run Code Online (Sandbox Code Playgroud)
因此,特别是,SirChillingtonIV的became_active日期被重置,因为他们的第二次登录是在活动期过期之后,但是Booooooo99900098的became_active日期没有在他/她登录的第二次重置,因为它落在活动期间.
我最初的想法是使用窗口函数lag,然后使用lagged值填充became_active列; 例如,大致类似于:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val window = Window.partitionBy("user_name").orderBy("login_date")
val df2 = df.withColumn("tmp", lag("login_date", 1).over(window))
Run Code Online (Sandbox Code Playgroud)
然后,规则填写became_active日期会是这样,如果tmp是null(即,如果它是第一次登录),或者如果login_date - tmp >= 5再 …
pyspark ×10
apache-spark ×9
python ×6
amazon-s3 ×1
credentials ×1
dataframe ×1
hivecontext ×1
python-2.7 ×1
rdd ×1
sql ×1