小编MrG*_*rts的帖子

Spark:通过在两个数据帧上添加行索引/数字来合并2个数据帧

问:在PySpark中,有没有办法合并两个数据帧或将数据帧的一列复制到另一个?

例如,我有两个Dataframe:

DF1              
C1                    C2                                                        
23397414             20875.7353   
5213970              20497.5582   
41323308             20935.7956   
123276113            18884.0477   
76456078             18389.9269 
Run Code Online (Sandbox Code Playgroud)

借调数据框

DF2
C3                       C4
2008-02-04               262.00                 
2008-02-05               257.25                 
2008-02-06               262.75                 
2008-02-07               237.00                 
2008-02-08               231.00 
Run Code Online (Sandbox Code Playgroud)

然后我想将DF2的C3添加到DF1,如下所示:

New DF              
    C1                    C2          C3                                              
    23397414             20875.7353   2008-02-04
    5213970              20497.5582   2008-02-05
    41323308             20935.7956   2008-02-06
    123276113            18884.0477   2008-02-07
    76456078             18389.9269   2008-02-08
Run Code Online (Sandbox Code Playgroud)

我希望这个例子很清楚.

apache-spark apache-spark-sql pyspark

12
推荐指数
3
解决办法
2万
查看次数

爆炸数组 - (Dataframe)pySpark

我有这样的数据帧:

  +-----+--------------------+
|index|              merged|
+-----+--------------------+
|    0|[[2.5, 2.4], [3.5...|
|    1|[[-1.0, -1.0], [-...|
|    2|[[-1.0, -1.0], [-...|
|    3|[[0.0, 0.0], [0.5...|
|    4|[[0.5, 0.5], [1.0...|
|    5|[[0.5, 0.5], [1.0...|
|    6|[[-1.0, -1.0], [0...|
|    7|[[0.0, 0.0], [0.5...|
|    8|[[0.5, 0.5], [1.0...|
+-----+--------------------+
Run Code Online (Sandbox Code Playgroud)

我想将合并的列分解为

+-----+-------+-------+
|index|Column1|Column2|
+-----+-------+-------+
|    0|    2.5|   2.4 |
|    1|    3.5|    0.5|
|    2|   -1.0|   -1.0|
|    3|   -1.0|   -1.0|
|    4|   0.0 |   0.0 |
|    5|    0.5|   0.74|
+-----+-------+-------+
Run Code Online (Sandbox Code Playgroud)

每个元组[[2.5,2.4],[3.5,0,5]]重新填充两列,知道2,5和3,5将存储在第1列中,而(2.4,0,5)将存储在第二列中

所以我尝试了这个

df= df.withColumn("merged", df["merged"].cast("array<array<float>>"))
df= …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark spark-dataframe

5
推荐指数
1
解决办法
1913
查看次数

聚合数据帧pyspark

我在数据帧中使用 Spark 1.6.2

我想转换这个数据帧

+---------+-------------+-----+-------+-------+-------+-------+--------+
|ID       |           P |index|xinf   |xup    |yinf   |ysup   |     M  |
+---------+-------------+-----+-------+-------+-------+-------+--------+
|        0|10279.9003906|   13|    0.3|    0.5|    2.5|    3.0|540928.0|
|        2|12024.2998047|   13|    0.3|    0.5|    2.5|    3.0|541278.0|
|        0|10748.7001953|   13|    0.3|    0.5|    2.5|    3.0|541243.0|
|        1|      10988.5|   13|    0.3|    0.5|    2.5|    3.0|540917.0|
+---------+-------------+-----+-------+-------+-------+-------+--------+
Run Code Online (Sandbox Code Playgroud)

+---------+-------------+-----+-------+-------+-------+-------+--------+
|Id       |           P |index|xinf   |xup    |yinf   |ysup   |     M  |
+---------+-------------+-----+-------+-------+-------+-------+--------+
|        0|10514.3002929|   13|    0.3|    0.5|    2.5|    3.0|540928.0,541243.0|
|        2|12024.2998047|   13|    0.3|    0.5|    2.5|    3.0|541278.0|
|        1|      10988.5|   13|    0.3|    0.5|    2.5| …
Run Code Online (Sandbox Code Playgroud)

python group-by mapreduce apache-spark

4
推荐指数
1
解决办法
6236
查看次数

限制对特定vpc的lambda或api网关的访问

是否可以使api网关lambda函数只能由特定的vpc访问.

我在亚马逊文档中搜索过,但我没有找到关于这个主题的任何内容.

先感谢您

amazon-web-services amazon-vpc aws-lambda aws-api-gateway

4
推荐指数
1
解决办法
1110
查看次数

Spark不尊重表格的区分大小写

我有一个火花scala区分大小写的问题.我想从postgres表中读取,其中包含一些字符(大写)但默认情况下spark将名称转换为小写,我收到错误

org.postgresql.util.PSQLException:错误:关系"textlogs"不存在

val opts = Map(
  "url" -> "jdbc:postgresql://localhost:5433/sparkdb",
  "dbtable" -> "TextLogs",
  "user" -> "admin",
  "password" -> "mypassword"
)
val df = spark
           .read
           .format("jdbc")
           .options(opts)
           .load
Run Code Online (Sandbox Code Playgroud)

有没有办法强制火花来尊重区分大小写?

postgresql scala apache-spark apache-spark-sql

4
推荐指数
1
解决办法
630
查看次数

从S3中读取的无服务器AWS(Python):访问被拒绝

我从lambda函数(无框架工作服务器)到AWS中的S3服务的getObject访问存在问题。这是我的代码示例:

import boto3
import csv
def hello(event, context):
    s3 = boto3.resource('s3')
    bucket = s3.Bucket('myBucket')
    obj = bucket.Object(key='MOCK_DATA.csv')
    response = obj.get()
    lines = response['Body'].read().split()
    body = []
    for row in csv.DictReader(lines):
        body.append(row)
    return body
Run Code Online (Sandbox Code Playgroud)

然后在我的serverless.yml中,我将全部访问权限授予了lambda

  iamRoleStatements:
   - Effect: "Allow"
     Action:
       - "s3:*"
     Resource:
         - "arn:aws:s3:::myBucket"
Run Code Online (Sandbox Code Playgroud)

但是当我运行代码时,我收到错误:

START RequestId: a6c006b7-21e5-11e8-8193-c3378825927 Version: $LATEST
An error occurred (AccessDenied) when calling the GetObject operation: Access Denied: ClientError
Traceback (most recent call last):
  File "/var/task/handler.py", line 5, in hello
    response = obj.get()
  File "/var/runtime/boto3/resources/factory.py", line 520, …
Run Code Online (Sandbox Code Playgroud)

python amazon-s3 amazon-web-services boto3 serverless-framework

4
推荐指数
1
解决办法
1234
查看次数

PySpark 2:KMeans输入数据不直接缓存

我不知道为什么收到这条消息

WARN KMeans: The input data is not directly cached, which may hurt performance if its parent RDDs are also uncached.
Run Code Online (Sandbox Code Playgroud)

当我尝试使用Spark时 KMeans

df_Part = assembler.transform(df_Part)    
df_Part.cache()
while (k<=max_cluster) and (wssse > seuilStop):
                    kmeans = KMeans().setK(k)
                    model = kmeans.fit(df_Part)
                    wssse = model.computeCost(df_Part)
                    k=k+1
Run Code Online (Sandbox Code Playgroud)

它说我的输入(Dataframe)没有被缓存!

我试图打印df_Part.is_cached并收到True,这意味着我的数据帧被缓存了,那么为什么Spark仍然警告我这个呢?

python k-means apache-spark apache-spark-sql pyspark

3
推荐指数
1
解决办法
1128
查看次数