nim*_*sam 4 python elasticsearch pyspark pyspark-sql pyspark-dataframes
我使用Apache spark作为ETL工具将表从Oracle提取到Elasticsearch中。
我遇到的问题是,数值列引发识别它们,decimal而Elasticsearch不接受decimal类型。所以我将每个decimal列转换double为Elasticsearch接受的列。
dataFrame = dataFrame.select(
[col(name) if 'decimal' not in colType else col(name).cast('double') for name, colType in dataFrame.dtypes]
)
Run Code Online (Sandbox Code Playgroud)
当前的问题是每个数字列将是一倍 ; 它是否具有十进制值。
我的问题是,有什么方法可以检测到列类型应该转换为整数类型还是双精度类型?
您可以从数据框的架构中检索数据类型为== DecimalType()的所有列名称,请参见以下示例(在Spark 2.4.0上测试):
更新:仅使用df.dtypes就足以检索信息。
from pyspark.sql.functions import col
df = spark.createDataFrame([ (1, 12.3, 1.5, 'test', 13.23) ], ['i1', 'd2', 'f3', 's4', 'd5'])
df = df.withColumn('d2', col('d2').astype('decimal(10,1)')) \
.withColumn('d5', col('d5').astype('decimal(10,2)'))
#DataFrame[i1: bigint, d2: decimal(10,1), f3: double, s4: string, d5: decimal(10,2)]
decimal_cols = [ f[0] for f in df.dtypes if f[1].startswith('decimal') ]
print(decimal_cols)
['d2', 'd5']
Run Code Online (Sandbox Code Playgroud)
只是后续操作:上述方法不适用于array,struct和嵌套数据结构。如果struct中的字段名称不包含空格,点等字符,则可以直接使用df.dtypes中的类型。
import re
from pyspark.sql.functions import array, struct, col
decimal_to_double = lambda x: re.sub(r'decimal\(\d+,\d+\)', 'double', x)
df1 = df.withColumn('a6', array('d2','d5')).withColumn('s7', struct('i1','d2'))
# DataFrame[i1: bigint, d2: decimal(10,1), l3: double, s4: string, d5: decimal(10,2), a6: array<decimal(11,2)>, s7: struct<i1:bigint,d2:decimal(10,1)>]
df1.select(*[ col(d[0]).astype(decimal_to_double(d[1])) if 'decimal' in d[1] else col(d[0]) for d in df1.dtypes ])
# DataFrame[i1: bigint, d2: double, l3: double, s4: string, d5: double, a6: array<double>, s7: struct<i1:bigint,d2:double>]
Run Code Online (Sandbox Code Playgroud)
但是,如果任何StructType()包含空格,点等的字段名,上述方法可能无法正常工作。在这种情况下,建议您检查一下:df.schema.jsonValue()['fields']检索和操作JSON数据以进行dtype转换。
解决方案是在确定适当的类型之前检查小数位数。
我添加了一个函数来检查并返回数据类型:
def check(self, colType):
# you should import re before
# colType will be like decimal(15,0); so get these numbers
[digits, decimals] = re.findall(r'\d+', colType)
# if there's no decimal points, convert it to int
return 'int' if decimals == '0' else 'double'
Run Code Online (Sandbox Code Playgroud)
然后我为每一列调用它:
dataFrame = dataFrame.select(
[col(name) if 'decimal' not in colType else col(name).cast(self.check(colType)) for name, colType in dataFrame.dtypes]
)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
80 次 |
| 最近记录: |