小编Ste*_*han的帖子

将 Spark 数据帧中的列乘以常数值

我在 Spark 数据框中创建新列时遇到问题。我正在尝试使用 withColumn() 创建一个新列,如下所示:

.withColumn('%_diff_from_avg', 
     ((col('aggregate_sales') - col('avg_sales')) / col('avg_sales') * 100))
Run Code Online (Sandbox Code Playgroud)

这会导致正确计算某些值,但结果表中的大多数值都为空。我不明白为什么。

有趣的是,当我从计算中删除 '* 100' 时,我的所有值都被正确填充 - 即没有空值。例如:

.withColumn('%_diff_from_avg', 
    ((col('aggregate_sales') - col('avg_sales')) / col('avg_sales')))
Run Code Online (Sandbox Code Playgroud)

似乎工作。

因此,似乎是乘以 100 导致了这个问题。

谁能解释为什么?

python pyspark-sql

7
推荐指数
1
解决办法
2万
查看次数

将具有不同列顺序和字段名的多个CSV文件读入Spark

我有一个要读取到Spark数据框中的CSV文件目录。我知道当文件具有相同的字段名和列顺序时,这很简单:

raw_transactions_df = spark.read.csv("file_*.csv", inferSchema=True, header=True)
Run Code Online (Sandbox Code Playgroud)

但是,由于我的文件来自不同的系统:

  • 它们没有相同的列顺序。
  • 在某些文件中,字段名之一的拼写不同

在这种情况下,是否有一种干净的方法可以将目录中的所有csv文件以可重复的方式加载到公共的spark数据帧中?

我第一次尝试这样做如下:

import csv

final_headers = ['col1', 'col2', 'col3', 'col4', 'col5', 'col6', 'col7']

merged_rows = []
for f in trans_files:
    with open(f, 'r') as csv_in:
        csvreader = csv.reader(csv_in, delimiter=',')
        headers = dict((h, i) for i, h in enumerate(next(csvreader)))
        headers = { x.replace('col7_id', 'col7'): headers[x] for x in headers.keys() }

        for row in csvreader:
            merged_rows.append(tuple(row[headers[x]] for x in final_headers))

merged_df = spark.createDataFrame(merged_rows, final_headers)
Run Code Online (Sandbox Code Playgroud)

这在一定程度上有效-但会导致所有列均为StringType的DF。如果尝试将已定义的架构传递给spark.createDataFrame,则会导致异常:

TypeError: DecimalType(16,0) can not accept object '83215400105' in …
Run Code Online (Sandbox Code Playgroud)

python pyspark-sql

5
推荐指数
0
解决办法
785
查看次数

将包含多个字符串日期格式的列转换为Spark中的DateTime

我的Spark DataDrame中有一个包含多种字符串格式的日期列.我想将这些转换为DateTime.

我的专栏中的两种格式是:

  • mm/dd/yyyy; 和
  • yyyy-mm-dd

到目前为止,我的解决方案是使用UDF更改第一个日期格式以匹配第二个日期格式,如下所示:

import re

def parseDate(dateString):
    if re.match('\d{1,2}\/\d{1,2}\/\d{4}', dateString) is not None:
        return datetime.strptime(dateString, '%M/%d/%Y').strftime('%Y-%M-%d')
    else:
        return dateString

# Create Spark UDF based on above function
dateUdf = udf(parseDate)

df = (df.select(to_date(dateUdf(raw_transactions_df['trans_dt']))))
Run Code Online (Sandbox Code Playgroud)

这可行,但不是所有容错的.我特别关注:

  • 我还没有遇到的日期格式.
  • 区分mm/dd/yyyydd/mm/yyyy(我正在使用的正则表达式目前没有这样做).

有一个更好的方法吗?

python apache-spark apache-spark-sql pyspark

4
推荐指数
1
解决办法
1744
查看次数