如何将 PySpark 数据帧写入 DynamoDB 表?没有找到太多这方面的信息。根据我的要求,我必须将 PySpark 数据帧写入 Dynamo 数据库表。总的来说,我需要从 PySpark 代码读取/写入发电机。
提前致谢。
我想比较 2 个数据框,我想根据以下 3 个条件提取记录。
df1:
No,Name,Sal,Address,Dept,Join_Date
11,Sam,1000,ind,IT,2/11/2019
22,Tom,2000,usa,HR,2/11/2019
33,Kom,3500,uk,IT,2/11/2019
44,Nom,4000,can,HR,2/11/2019
55,Vom,5000,mex,IT,2/11/2019
66,XYZ,5000,mex,IT,2/11/2019
Run Code Online (Sandbox Code Playgroud)
df2:
No,Name,Sal,Address,Dept,Join_Date
11,Sam,1000,ind,IT,2/11/2019
22,Tom,2000,usa,HR,2/11/2019
33,Kom,3000,uk,IT,2/11/2019
44,Nom,4000,can,HR,2/11/2019
55,Xom,5000,mex,IT,2/11/2019
77,XYZ,5000,mex,IT,2/11/2019
Run Code Online (Sandbox Code Playgroud)
预期输出:
No,Name,Sal,Address,Dept,Join_Date,FLAG
11,Sam,1000,ind,IT,2/11/2019,SAME
22,Tom,2000,usa,HR,2/11/2019,SAME
33,Kom,3500,uk,IT,2/11/2019,DF1
33,Kom,3000,uk,IT,2/11/2019,DF2
44,Nom,4000,can,HR,2/11/2019,SAME
55,Vom,5000,mex,IT,2/11/2019,DF1
55,Xom,5000,mex,IT,2/11/2019,DF2
66,XYZ,5000,mex,IT,2/11/2019,DF1
77,XYZ,5000,mex,IT,2/11/2019,DF2
Run Code Online (Sandbox Code Playgroud)
我加载了如下所示的输入数据,但不知道如何继续。
df1 = pd.read_csv("D:\\inputs\\file1.csv")
df2 = pd.read_csv("D:\\inputs\\file2.csv")
Run Code Online (Sandbox Code Playgroud)
任何帮助表示赞赏。谢谢。
我在执行上述“join”语句时遇到以下错误。我正在使用 pyspark 设置。join 语句或代码中所需的任何更改。
类型错误:“DataFrame”对象不可调用
df11 = spark.read.option("header","true").option("delimiter", ",").csv("s3://mybucket/file1.csv")
df22 = spark.read.option("header","true").option("delimiter", ",").csv("s3://mybucket/file2.csv")
df11.createOrReplaceTempView("table1")
df22.createOrReplaceTempView("table2")
df1 = spark.sql( "select * from table1" )
df2 = spark.sql( "select * from table2" )
df_d = df1.join(df2, df1.NO == df2.NO, 'left').filter(F.isnull(df2.NO)).select(df1.NO,df1.NAME,df1.LAT,df1.LONG, F.lit('DELETE').alias('FLAG'))
Run Code Online (Sandbox Code Playgroud)
谢谢
我对 AWS Glue 有以下 2 个说明,请您澄清一下。因为我需要在我的项目中使用胶水。
我想将 csv/txt 文件加载到 Glue 作业中进行处理。(就像我们在 Spark 中使用数据帧所做的那样)。这在胶水中可能吗?或者我们是否必须只使用 Crawler 将数据抓取到 Glue 表中并像下面一样使用它们进行进一步处理?
empdf = glueContext.create_dynamic_frame.from_catalog(
database="emp",
table_name="emp_json")
Run Code Online (Sandbox Code Playgroud)下面我使用 Spark 代码将文件加载到 Glue 中,但我收到了冗长的错误日志。我们可以直接运行 Spark 或 PySpark 代码而无需对 Glue 进行任何更改吗?
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
dfnew = spark.read.option("header","true").option("delimiter", ",").csv("C:\inputs\TEST.txt")
dfnew.show(2)
Run Code Online (Sandbox Code Playgroud)通过使用 Boto3 的批量插入,我们最多可以插入多少条记录到 Dynamodb 的表中。假设我正在从大小为 6GB 的 S3 存储桶中读取我的输入 json。
并且在批量插入时会导致任何性能问题。任何示例都有帮助。我刚刚开始研究这个,根据我的发现,我会在这里更新。
提前致谢。
pyspark ×4
python-3.x ×2
apache-spark ×1
aws-glue ×1
boto3 ×1
dataframe ×1
python ×1
typeerror ×1