小编RK.*_*RK.的帖子

如何将 PySpark 数据帧写入 DynamoDB 表?

如何将 PySpark 数据帧写入 DynamoDB 表?没有找到太多这方面的信息。根据我的要求,我必须将 PySpark 数据帧写入 Dynamo 数据库表。总的来说,我需要从 PySpark 代码读取/写入发电机。

提前致谢。

amazon-dynamodb pyspark

4
推荐指数
1
解决办法
2万
查看次数

如何比较来自 PySpark 数据帧的记录

我想比较 2 个数据框,我想根据以下 3 个条件提取记录。

  1. 如果记录匹配,则“SAME”应出现在新列 FLAG 中。
  2. 如果记录不匹配,如果它来自 df1(假设为 No.66),则 'DF1' 应出现在 FLAG 列中。
  3. 如果记录不匹配,如果它来自 df2(假设为 No.77),则 'DF2' 应出现在 FLAG 列中。这里需要考虑和验证整个RECORD。记录明智的比较。
    此外,我需要使用 PySpark 代码检查数百万条记录。

df1:

No,Name,Sal,Address,Dept,Join_Date
11,Sam,1000,ind,IT,2/11/2019
22,Tom,2000,usa,HR,2/11/2019
33,Kom,3500,uk,IT,2/11/2019
44,Nom,4000,can,HR,2/11/2019
55,Vom,5000,mex,IT,2/11/2019
66,XYZ,5000,mex,IT,2/11/2019
Run Code Online (Sandbox Code Playgroud)

df2:

No,Name,Sal,Address,Dept,Join_Date
11,Sam,1000,ind,IT,2/11/2019
22,Tom,2000,usa,HR,2/11/2019
33,Kom,3000,uk,IT,2/11/2019
44,Nom,4000,can,HR,2/11/2019
55,Xom,5000,mex,IT,2/11/2019
77,XYZ,5000,mex,IT,2/11/2019
Run Code Online (Sandbox Code Playgroud)

预期输出:

No,Name,Sal,Address,Dept,Join_Date,FLAG
11,Sam,1000,ind,IT,2/11/2019,SAME
22,Tom,2000,usa,HR,2/11/2019,SAME
33,Kom,3500,uk,IT,2/11/2019,DF1
33,Kom,3000,uk,IT,2/11/2019,DF2
44,Nom,4000,can,HR,2/11/2019,SAME
55,Vom,5000,mex,IT,2/11/2019,DF1
55,Xom,5000,mex,IT,2/11/2019,DF2
66,XYZ,5000,mex,IT,2/11/2019,DF1
77,XYZ,5000,mex,IT,2/11/2019,DF2
Run Code Online (Sandbox Code Playgroud)

我加载了如下所示的输入数据,但不知道如何继续。

df1 = pd.read_csv("D:\\inputs\\file1.csv")

df2 = pd.read_csv("D:\\inputs\\file2.csv")
Run Code Online (Sandbox Code Playgroud)

任何帮助表示赞赏。谢谢。

python-3.x apache-spark-sql pyspark

3
推荐指数
1
解决办法
4765
查看次数

类型错误:“DataFrame”对象不可调用 - Spark 数据框

我在执行上述“join”语句时遇到以下错误。我正在使用 pyspark 设置。join 语句或代码中所需的任何更改。

类型错误:“DataFrame”对象不可调用

df11 = spark.read.option("header","true").option("delimiter", ",").csv("s3://mybucket/file1.csv")
df22 = spark.read.option("header","true").option("delimiter", ",").csv("s3://mybucket/file2.csv")
df11.createOrReplaceTempView("table1")
df22.createOrReplaceTempView("table2")
df1 = spark.sql( "select * from table1" )
df2 = spark.sql( "select * from table2" )

df_d = df1.join(df2, df1.NO == df2.NO, 'left').filter(F.isnull(df2.NO)).select(df1.NO,df1.NAME,df1.LAT,df1.LONG, F.lit('DELETE').alias('FLAG'))
Run Code Online (Sandbox Code Playgroud)

谢谢

python typeerror dataframe apache-spark pyspark

3
推荐指数
1
解决办法
9490
查看次数

如何将 csv/txt 文件加载到 AWS Glue 作业中

我对 AWS Glue 有以下 2 个说明,请您澄清一下。因为我需要在我的项目中使用胶水。

  1. 我想将 csv/txt 文件加载到 Glue 作业中进行处理。(就像我们在 Spark 中使用数据帧所做的那样)。这在胶水中可能吗?或者我们是否必须只使用 Crawler 将数据抓取到 Glue 表中并像下面一样使用它们进行进一步处理?

    empdf = glueContext.create_dynamic_frame.from_catalog(
        database="emp",
        table_name="emp_json")
    
    Run Code Online (Sandbox Code Playgroud)
  2. 下面我使用 Spark 代码将文件加载到 Glue 中,但我收到了冗长的错误日志。我们可以直接运行 Spark 或 PySpark 代码而无需对 Glue 进行任何更改吗?

    import sys
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    dfnew = spark.read.option("header","true").option("delimiter", ",").csv("C:\inputs\TEST.txt")
    dfnew.show(2)
    
    Run Code Online (Sandbox Code Playgroud)

pyspark aws-glue

2
推荐指数
2
解决办法
1万
查看次数

我可以使用 Boto3 的 DynamoDb BatchWrite 插入多少条记录

通过使用 Boto3 的批量插入,我们最多可以插入多少条记录到 Dynamodb 的表中。假设我正在从大小为 6GB 的 S3 存储桶中读取我的输入 json。

并且在批量插入时会导致任何性能问题。任何示例都有帮助。我刚刚开始研究这个,根据我的发现,我会在这里更新。

提前致谢。

python-3.x amazon-dynamodb boto3

2
推荐指数
2
解决办法
4201
查看次数