PyR*_*der 7 python apache-spark-sql pyspark
因此,我正在使用AWS Glue自动生成的代码从S3读取csv文件,并将其通过JDBC连接写入表中。看起来很简单,Job成功运行,没有任何错误,但是它什么也没写。当我检查Glue Spark动态框架时,它会显示所有行(使用.count())。但是,当对其执行.show()时,不会产生任何结果。
.printSchema()工作正常。尝试在使用.show()时记录错误,但没有错误或未打印任何内容。使用.toDF及其有效的show方法将DynamicFrame转换为数据帧。我以为文件有问题,试图缩小到某些列。但是即使文件中只有2列,也是一样。用双引号清楚标记字符串,仍然没有成功。
我们有需要从Glue配置中选择的诸如JDBC连接之类的东西。我猜常规的Spark数据框架无法做到。因此需要动态框架。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
datasource0 = glueContext.create_dynamic_frame.from_options('s3', {'paths': ['s3://bucket/file.csv']}, 'csv', format_options={'withHeader': True,'skipFirst': True,'quoteChar':'"','escaper':'\\'})
datasource0.printSchema()
datasource0.show(5)
Run Code Online (Sandbox Code Playgroud)
输出量
root
|-- ORDERID: string
|-- EVENTTIMEUTC: string
Run Code Online (Sandbox Code Playgroud)
这是转换为常规数据帧所产生的结果。
datasource0.toDF().show()
Run Code Online (Sandbox Code Playgroud)
输出量
+-------+-----------------+
|ORDERID| EVENTTIMEUTC|
+-------+-----------------+
| 2| "1/13/2018 7:50"|
| 3| "1/13/2018 7:50"|
| 4| "1/13/2018 7:50"|
| 5| "1/13/2018 7:50"|
| 6| "1/13/2018 8:52"|
| 7| "1/13/2018 8:52"|
| 8| "1/13/2018 8:53"|
| 9| "1/13/2018 8:53"|
| 10| "1/16/2018 1:33"|
| 11| "1/16/2018 2:28"|
| 12| "1/16/2018 2:37"|
| 13| "1/17/2018 1:17"|
| 14| "1/17/2018 2:23"|
| 15| "1/17/2018 4:33"|
| 16| "1/17/2018 6:28"|
| 17| "1/17/2018 6:28"|
| 18| "1/17/2018 6:36"|
| 19| "1/17/2018 6:38"|
| 20| "1/17/2018 7:26"|
| 21| "1/17/2018 7:28"|
+-------+-----------------+
only showing top 20 rows
Run Code Online (Sandbox Code Playgroud)
这是一些数据。
ORDERID, EVENTTIMEUTC
1, "1/13/2018 7:10"
2, "1/13/2018 7:50"
3, "1/13/2018 7:50"
4, "1/13/2018 7:50"
5, "1/13/2018 7:50"
6, "1/13/2018 8:52"
7, "1/13/2018 8:52"
8, "1/13/2018 8:53"
9, "1/13/2018 8:53"
10, "1/16/2018 1:33"
11, "1/16/2018 2:28"
12, "1/16/2018 2:37"
13, "1/17/2018 1:17"
14, "1/17/2018 2:23"
15, "1/17/2018 4:33"
16, "1/17/2018 6:28"
17, "1/17/2018 6:28"
18, "1/17/2018 6:36"
19, "1/17/2018 6:38"
20, "1/17/2018 7:26"
21, "1/17/2018 7:28"
22, "1/17/2018 7:29"
23, "1/17/2018 7:46"
24, "1/17/2018 7:51"
25, "1/18/2018 2:22"
26, "1/18/2018 5:48"
27, "1/18/2018 5:50"
28, "1/18/2018 5:50"
29, "1/18/2018 5:51"
30, "1/18/2018 5:53"
100, "1/18/2018 10:32"
101, "1/18/2018 10:33"
102, "1/18/2018 10:33"
103, "1/18/2018 10:42"
104, "1/18/2018 10:59"
105, "1/18/2018 11:16"
Run Code Online (Sandbox Code Playgroud)
阅读的文档DynamicFrame,他们对此并不是很明确,但在某些情况下,在DataFrame您调用之前不会处理底层信息toDF(),因此您实际上是在调用可能为空的.show()内容:
为了解决这些限制,AWS Glue 引入了 DynamicFrame。DynamicFrame 与 DataFrame 类似,不同之处在于每个记录都是自描述的,因此最初不需要模式。相反,AWS Glue 在需要时实时计算架构,并使用选择(或联合)类型显式编码架构不一致。您可以解决这些不一致问题,以使数据集与需要固定架构的数据存储兼容。
.toDF() :
通过将 DynamicRecords 转换为 DataFrame 字段,将 DynamicFrame 转换为 Apache Spark DataFrame。返回新的数据帧。
检查此处的代码,当您尝试打印时,似乎底层 Java 数据框可能为空:https://github.com/awslabs/aws-glue-libs/blob/f973095b9f2aa784cbcc87681a00da3127125337/awsglue/dynamicframe .py#L78
def show(self, num_rows=20):
print(self._jdf.showString(num_rows))
Run Code Online (Sandbox Code Playgroud)
其中__init__依赖于传入的参数 ( jdf) ,该参数可能尚未收集:
def __init__(self, jdf, glue_ctx, name=""):
self._jdf = jdf
self.glue_ctx = glue_ctx
self._ssql_ctx = glue_ctx._ssql_ctx
self._sc = glue_ctx and glue_ctx._sc
self._schema = None
self._lazy_rdd = None
self.name = name
Run Code Online (Sandbox Code Playgroud)
当调用toDF()底层数据帧进行处理时:
... SNIP ...
scala_options.append(self.glue_ctx.convert_resolve_option(option.path, option.action, option.target))
return DataFrame(self._jdf.toDF(self.glue_ctx._jvm.PythonUtils.toSeq(scala_options)), self.glue_ctx)
Run Code Online (Sandbox Code Playgroud)
Java 文档提到此.toDF()方法从 RDD 转换(即它从工作人员收集结果):
这可以非常方便地将元组 RDD 转换为
DataFrame具有有意义名称的 RDD。