我在 Spark 数据框中创建新列时遇到问题。我正在尝试使用 withColumn() 创建一个新列,如下所示:
.withColumn('%_diff_from_avg',
((col('aggregate_sales') - col('avg_sales')) / col('avg_sales') * 100))
Run Code Online (Sandbox Code Playgroud)
这会导致正确计算某些值,但结果表中的大多数值都为空。我不明白为什么。
有趣的是,当我从计算中删除 '* 100' 时,我的所有值都被正确填充 - 即没有空值。例如:
.withColumn('%_diff_from_avg',
((col('aggregate_sales') - col('avg_sales')) / col('avg_sales')))
Run Code Online (Sandbox Code Playgroud)
似乎工作。
因此,似乎是乘以 100 导致了这个问题。
谁能解释为什么?
我有一个要读取到Spark数据框中的CSV文件目录。我知道当文件具有相同的字段名和列顺序时,这很简单:
raw_transactions_df = spark.read.csv("file_*.csv", inferSchema=True, header=True)
Run Code Online (Sandbox Code Playgroud)
但是,由于我的文件来自不同的系统:
在这种情况下,是否有一种干净的方法可以将目录中的所有csv文件以可重复的方式加载到公共的spark数据帧中?
我第一次尝试这样做如下:
import csv
final_headers = ['col1', 'col2', 'col3', 'col4', 'col5', 'col6', 'col7']
merged_rows = []
for f in trans_files:
with open(f, 'r') as csv_in:
csvreader = csv.reader(csv_in, delimiter=',')
headers = dict((h, i) for i, h in enumerate(next(csvreader)))
headers = { x.replace('col7_id', 'col7'): headers[x] for x in headers.keys() }
for row in csvreader:
merged_rows.append(tuple(row[headers[x]] for x in final_headers))
merged_df = spark.createDataFrame(merged_rows, final_headers)
Run Code Online (Sandbox Code Playgroud)
这在一定程度上有效-但会导致所有列均为StringType的DF。如果尝试将已定义的架构传递给spark.createDataFrame,则会导致异常:
TypeError: DecimalType(16,0) can not accept object '83215400105' in …Run Code Online (Sandbox Code Playgroud) 我的Spark DataDrame中有一个包含多种字符串格式的日期列.我想将这些转换为DateTime.
我的专栏中的两种格式是:
mm/dd/yyyy; 和yyyy-mm-dd到目前为止,我的解决方案是使用UDF更改第一个日期格式以匹配第二个日期格式,如下所示:
import re
def parseDate(dateString):
if re.match('\d{1,2}\/\d{1,2}\/\d{4}', dateString) is not None:
return datetime.strptime(dateString, '%M/%d/%Y').strftime('%Y-%M-%d')
else:
return dateString
# Create Spark UDF based on above function
dateUdf = udf(parseDate)
df = (df.select(to_date(dateUdf(raw_transactions_df['trans_dt']))))
Run Code Online (Sandbox Code Playgroud)
这可行,但不是所有容错的.我特别关注:
mm/dd/yyyy和dd/mm/yyyy(我正在使用的正则表达式目前没有这样做).有一个更好的方法吗?