Q: In PySpark, is there a way to merge two DataFrames, or to copy a column from one DataFrame to another?
For example, I have two DataFrames:
DF1
C1 C2
23397414 20875.7353
5213970 20497.5582
41323308 20935.7956
123276113 18884.0477
76456078 18389.9269
Second DataFrame:
DF2
C3 C4
2008-02-04 262.00
2008-02-05 257.25
2008-02-06 262.75
2008-02-07 237.00
2008-02-08 231.00
Then I want to add C3 from DF2 to DF1, like this:
New DF
C1 C2 C3
23397414 20875.7353 2008-02-04
5213970 20497.5582 2008-02-05
41323308 20935.7956 2008-02-06
123276113 18884.0477 2008-02-07
76456078 18389.9269 2008-02-08
I hope this example is clear.
I have a DataFrame like this:
+-----+--------------------+
|index| merged|
+-----+--------------------+
| 0|[[2.5, 2.4], [3.5...|
| 1|[[-1.0, -1.0], [-...|
| 2|[[-1.0, -1.0], [-...|
| 3|[[0.0, 0.0], [0.5...|
| 4|[[0.5, 0.5], [1.0...|
| 5|[[0.5, 0.5], [1.0...|
| 6|[[-1.0, -1.0], [0...|
| 7|[[0.0, 0.0], [0.5...|
| 8|[[0.5, 0.5], [1.0...|
+-----+--------------------+
I want to explode the merged column into
+-----+-------+-------+
|index|Column1|Column2|
+-----+-------+-------+
| 0| 2.5| 2.4 |
| 1| 3.5| 0.5|
| 2| -1.0| -1.0|
| 3| -1.0| -1.0|
| 4| 0.0 | 0.0 |
| 5| 0.5| 0.74|
+-----+-------+-------+
Each tuple such as [[2.5, 2.4], [3.5, 0.5]] should be spread across the two columns: 2.5 and 3.5 go into Column1, while 2.4 and 0.5 go into Column2.
So I tried this:
df= df.withColumn("merged", df["merged"].cast("array<array<float>>"))
df= …
I am using DataFrames in Spark 1.6.2.
I want to convert this DataFrame
+---------+-------------+-----+-------+-------+-------+-------+--------+
|ID | P |index|xinf |xup |yinf |ysup | M |
+---------+-------------+-----+-------+-------+-------+-------+--------+
| 0|10279.9003906| 13| 0.3| 0.5| 2.5| 3.0|540928.0|
| 2|12024.2998047| 13| 0.3| 0.5| 2.5| 3.0|541278.0|
| 0|10748.7001953| 13| 0.3| 0.5| 2.5| 3.0|541243.0|
| 1| 10988.5| 13| 0.3| 0.5| 2.5| 3.0|540917.0|
+---------+-------------+-----+-------+-------+-------+-------+--------+
to
+---------+-------------+-----+-------+-------+-------+-------+--------+
|Id | P |index|xinf |xup |yinf |ysup | M |
+---------+-------------+-----+-------+-------+-------+-------+--------+
| 0|10514.3002929| 13| 0.3| 0.5| 2.5| 3.0|540928.0,541243.0|
| 2|12024.2998047| 13| 0.3| 0.5| 2.5| 3.0|541278.0|
| 1| 10988.5| 13| 0.3| 0.5| 2.5| …
Is it possible to make an API Gateway or a Lambda function accessible only from a specific VPC?
I searched the Amazon documentation but did not find anything on this topic.
Thanks in advance.
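For the API Gateway half of the question, one commonly documented route is a resource policy on a private API that denies any call not arriving from a given VPC. The sketch below only builds that policy document as a Python dict. The function name `vpc_only_policy` and the VPC id are placeholders introduced here, and it assumes the API uses a private endpoint reached through an interface VPC endpoint, since the `aws:SourceVpc` condition key is only populated for that kind of traffic. It does not cover restricting a Lambda function directly.

```python
import json


def vpc_only_policy(vpc_id):
    """Build an API Gateway resource policy that rejects callers outside one VPC.

    Hypothetical helper: it only assembles the JSON document and does not
    call any AWS API.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Explicitly deny every invocation that does not come from the VPC.
                "Effect": "Deny",
                "Principal": "*",
                "Action": "execute-api:Invoke",
                "Resource": "execute-api:/*",
                "Condition": {"StringNotEquals": {"aws:SourceVpc": vpc_id}},
            },
            {
                # Allow everything that was not denied above.
                "Effect": "Allow",
                "Principal": "*",
                "Action": "execute-api:Invoke",
                "Resource": "execute-api:/*",
            },
        ],
    }


# "vpc-0123456789abcdef0" is a placeholder id, not a real VPC.
policy = vpc_only_policy("vpc-0123456789abcdef0")
print(json.dumps(policy, indent=2))
```

The printed JSON would be attached to the API as its resource policy, and the API redeployed for the change to take effect.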
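I have a case-sensitivity problem with Spark and Scala. I want to read from a Postgres table whose name contains some uppercase characters, but by default Spark converts the name to lowercase and I get this error:
org.postgresql.util.PSQLException: ERROR: relation "textlogs" does not exist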
val opts = Map(
"url" -> "jdbc:postgresql://localhost:5433/sparkdb",
"dbtable" -> "TextLogs",
"user" -> "admin",
"password" -> "mypassword"
)
val df = spark
.read
.format("jdbc")
.options(opts)
.load
Is there a way to force Spark to respect case sensitivity?
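A likely explanation, offered as a sketch rather than a confirmed diagnosis: PostgreSQL folds unquoted identifiers to lowercase, so the table name has to reach the database wrapped in double quotes. The example below is written in Python rather than Scala and only builds the option map. `quote_identifier` is a hypothetical helper introduced here, and the connection values are copied from the question.

```python
def quote_identifier(name):
    """Wrap a PostgreSQL identifier in double quotes, doubling any embedded quotes.

    Hypothetical helper introduced for illustration.
    """
    return '"' + name.replace('"', '""') + '"'


# Same options as in the question, but with the table name quoted so that
# PostgreSQL keeps its original case.
jdbc_options = {
    "url": "jdbc:postgresql://localhost:5433/sparkdb",
    "dbtable": quote_identifier("TextLogs"),
    "user": "admin",
    "password": "mypassword",
}

print(jdbc_options["dbtable"])

# With a live SparkSession and database (not run here):
# df = spark.read.format("jdbc").options(**jdbc_options).load()
```

In the Scala map from the question, the equivalent change would be writing the value as "\"TextLogs\"".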
I have a problem with getObject access from a Lambda function (deployed with the Serverless Framework) to the S3 service in AWS. Here is my code sample:
import boto3
import csv

def hello(event, context):
    s3 = boto3.resource('s3')
    bucket = s3.Bucket('myBucket')
    obj = bucket.Object(key='MOCK_DATA.csv')
    response = obj.get()
    lines = response['Body'].read().split()
    body = []
    for row in csv.DictReader(lines):
        body.append(row)
    return body
Then, in my serverless.yml, I granted the Lambda full access:
iamRoleStatements:
  - Effect: "Allow"
    Action:
      - "s3:*"
    Resource:
      - "arn:aws:s3:::myBucket"
But when I run the code, I get this error:
START RequestId: a6c006b7-21e5-11e8-8193-c3378825927 Version: $LATEST
An error occurred (AccessDenied) when calling the GetObject operation: Access Denied: ClientError
Traceback (most recent call last):
File "/var/task/handler.py", line 5, in hello
response = obj.get()
File "/var/runtime/boto3/resources/factory.py", line 520, …
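One probable cause, stated as an assumption since the full traceback is cut off: the IAM statement names only the bucket ARN, while object-level actions such as GetObject are checked against object ARNs of the form `arn:aws:s3:::myBucket/*`. The sketch below builds a statement that lists both. `s3_arns` is a hypothetical helper, and the dict mirrors the `iamRoleStatements` entry rather than calling any AWS API.

```python
def s3_arns(bucket):
    """Return the bucket ARN and the ARN pattern matching every object in it.

    Hypothetical helper introduced for illustration.
    """
    bucket_arn = "arn:aws:s3:::" + bucket
    return [bucket_arn, bucket_arn + "/*"]


# Bucket-level actions (e.g. ListBucket) match the first ARN;
# object-level actions (e.g. GetObject) match the second.
statement = {
    "Effect": "Allow",
    "Action": ["s3:*"],
    "Resource": s3_arns("myBucket"),
}

print(statement["Resource"])
```

In serverless.yml this corresponds to adding "arn:aws:s3:::myBucket/*" as a second entry under Resource and redeploying.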
I don't know why I am getting this message
WARN KMeans: The input data is not directly cached, which may hurt performance if its parent RDDs are also uncached.
when I try to use Spark KMeans:
df_Part = assembler.transform(df_Part)
df_Part.cache()
while (k<=max_cluster) and (wssse > seuilStop):
    kmeans = KMeans().setK(k)
    model = kmeans.fit(df_Part)
    wssse = model.computeCost(df_Part)
    k=k+1
It says my input (the DataFrame) is not cached!
I tried printing df_Part.is_cached and got True, which means my DataFrame is cached, so why does Spark still warn me about this?