标签: apache-spark-sql

如何在 pyspark 中按列名称映射值

我想要什么 - 是将列名映射到键中。例如:

#+-------+----------+
#|key1   |key2      |
#+-------+----------+
#|value1 |value2    |
#|value3 |value4    |
#+-------+----------+
Run Code Online (Sandbox Code Playgroud)

将转变为

#+-------+----------+
#|   keys|values    |
#+-------+----------+
#|key1   |value1    |
#|key1   |value2    |
#|key2   |value3    |
#|key2   |value4    |
#+-------+----------+
Run Code Online (Sandbox Code Playgroud)

在 HiveQL 中我可以写类似的东西

select distinct key, velue
    from xxx
    lateral view explode(map(
            'key1', key1,
            'key2', key2) tab as key, value
Run Code Online (Sandbox Code Playgroud)

但是在pyspark上怎么写呢?我可以使用 createtemptable 但我认为这不是最好的解决方案/

python dataframe apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
2436
查看次数

如何在 Pyspark 中按元素连接两个 ArrayType(StringType()) 列?

ArrayType(StringType())在 Spark 数据框中有两列,我想按元素连接这两列:

输入

+-------------+-------------+
|col1         |col2         |
+-------------+-------------+
|['a','b']    |['c','d']    |
|['a','b','c']|['e','f','g']|
+-------------+-------------+
Run Code Online (Sandbox Code Playgroud)

输出

+-------------+-------------+----------------+
|col1         |col2         |col3            |
+-------------+-------------+----------------+
|['a','b']    |['c','d']    |['ac', 'bd']    |
|['a','b','c']|['e','f','g']|['ae','bf','cg']|
+-------------+----------- -+----------------+
Run Code Online (Sandbox Code Playgroud)

谢谢。

apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
2591
查看次数

将多个 Spark 行合并为一行

我有一个数据框,如下所示。除了字段之外,对应的所有值id都是相同的mappingcol

+--------------------+----------------+--------------------+-------+
|misc                |fruit           |mappingcol          |id     |
+--------------------+----------------+--------------------+-------+
|ddd                 |apple           |Map("name"->"Sameer"|     1 |
|ref                 |banana          |Map("name"->"Riyazi"|     2 |
|ref                 |banana          |Map("lname"->"Nikki"|     2 |
|ddd                 |apple           |Map("lname"->"tenka"|     1 |
+--------------------+----------------+--------------------+-------+
Run Code Online (Sandbox Code Playgroud)

我想以这样的方式合并具有同一行的行,以便我精确地得到一行,并且需要合并id的值。mappingcol输出应如下所示:

+--------------------+----------------+--------------------+-------+
|misc                |fruit           |mappingcol          |id     |
+--------------------+----------------+--------------------+-------+
|ddd                 |apple           |Map("name"->"Sameer"|     1 |
|ref                 |banana          |Map("name"->"Riyazi"|     2 |
+--------------------+----------------+--------------------+-------+
Run Code Online (Sandbox Code Playgroud)

=mappingcol的值将是:id1

Map(
"name" -> "Sameer",
"lname" -> "tenka"
)
Run Code Online (Sandbox Code Playgroud)

我知道地图可以使用++运算符合并,所以这不是我担心的。我只是无法理解如何合并行,因为如果我使用 a groupBy,我就没有任何东西可以聚合行。

sql scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
7534
查看次数

PySpark 如何迭代 Dataframe 列并更改数据类型?

迭代 Spark Dataframe(使用 Pyspark)并找到数据类型Decimal(38,10)-> 将其更改为 Bigint(并将所有内容重新保存到同一数据帧)的最佳方法是什么?

我有一个用于更改数据类型的部分 - 例如:

df = df.withColumn("COLUMN_X", df["COLUMN_X"].cast(IntegerType()))
Run Code Online (Sandbox Code Playgroud)

但试图找到并与迭代集成..

谢谢。

python dataframe apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
8912
查看次数

如何使用 PySpark 从数据框中获取 1000 条记录并写入文件?

我的数据框中有 100,000 多条记录。我想动态创建一个文件并为每个文件推送 1000 条记录。谁能帮我解决这个问题,先谢谢了。

python-3.x apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
7953
查看次数

Spark数据集:数据转换

我有一个格式为的 Spark 数据集 -

+--------------+--------+-----+
|name          |type    |cost |
+--------------+--------+-----+
|AAAAAAAAAAAAAA|XXXXX   |0.24|
|AAAAAAAAAAAAAA|YYYYY   |1.14|
|BBBBBBBBBBBBBB|XXXXX   |0.78|
|BBBBBBBBBBBBBB|YYYYY   |2.67|
|BBBBBBBBBBBBBB|ZZZZZ   |0.15|
|CCCCCCCCCCCCCC|XXXXX   |1.86|
|CCCCCCCCCCCCCC|YYYYY   |1.50|
|CCCCCCCCCCCCCC|ZZZZZ   |1.00|
+--------------+--------+----+
Run Code Online (Sandbox Code Playgroud)

我想将其转换为类型的对象 -

public class CostPerName {
    private String name;
    private Map<String, Double> costTypeMap;
}

Run Code Online (Sandbox Code Playgroud)

我想要的是,

+--------------+-----------------------------------------------+
|name          |           typeCost.                           |
+--------------+-----------------------------------------------+
|AAAAAAAAAAAAAA|(XXXXX, 0.24), (YYYYY, 1.14)                   |            
|BBBBBBBBBBBBBB|(XXXXX, 0.78), (YYYYY, 2.67), (ZZZZZ, 0.15)    |
|CCCCCCCCCCCCCC|(XXXXX, 1.86), (YYYYY, 1.50), (ZZZZZ, 1.00)    |
+--------------+-----------------------------------------------+
Run Code Online (Sandbox Code Playgroud)

即,对于每个name,我想要一张 的地图(type, cost)

实现这一转变的有效途径是什么?我可以使用一些数据帧转换吗?我尝试了 groupBy 但只有在执行 sum、avg 等聚合查询时才有效。

apache-spark apache-spark-sql apache-spark-dataset

1
推荐指数
1
解决办法
191
查看次数

无法在 Pyspark 中执行用户定义的函数 RegexTokenizer

我正在尝试使用 Pyspark 使用数据中的文本特征执行文本分类。下面是我的文本预处理代码,该代码未能执行用户定义的函数 RegexTokenizer。

    tokenizer = RegexTokenizer(inputCol = "text", outputCol = "words", pattern = "\\W")
    add_stopwords = StopWordsRemover.loadDefaultStopWords("english")
    remover = StopWordsRemover(inputCol = "words", outputCol = "filtered").setStopWords(add_stopwords)
    label_stringIdx = StringIndexer(inputCol = "label", outputCol = "target")
    countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=1000, minDF=5)
    #pipleline for text pre-processing
    pipeline = Pipeline(stages=[tokenizer,remover, countVectors, label_stringIdx])

    #fit the dat for the pipeline
    pipelineFit = pipeline.fit(dataset)
    dataset = pipelineFit.transform(dataset)
    dataset.show()
Run Code Online (Sandbox Code Playgroud)

错误是:

/usr/local/lib/python3.6/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> …
Run Code Online (Sandbox Code Playgroud)

text apache-spark-sql pyspark apache-spark-ml apache-spark-mllib

1
推荐指数
1
解决办法
2522
查看次数

BROADCASTJOIN 提示在 PySpark SQL 中不起作用

我试图向尺寸较小的表提供广播提示,但物理计划仍然向我显示 SortMergeJoin。

spark.sql('select /*+ BROADCAST(pratik_test_temp.crosswalk2016) */ * from pratik_test_staging.crosswalk2016 t join pratik_test_temp.crosswalk2016 c on t.serial_id = c.serial_id').explain()

输出 : 在此输入图像描述

笔记 :

  1. 表的大小以 KB 为单位(测试数据)
  2. 连接列“serial_id”不是分区列
  3. 使用glue目录作为元存储(AWS)
  4. Spark版本-Spark 2.4.4
  5. 我也尝试过 BROADCASTJOIN 和 MAPJOIN 提示
  6. 当我尝试使用created_date[分区列] 而不是serial_id作为我的加入条件时,它向我显示广播加入 -

spark.sql('select /*+ BROADCAST(pratik_test_temp.crosswalk2016) */ * from pratik_test_staging.crosswalk2016 t join pratik_test_temp.crosswalk2016 c on t.created_date = c.created_date').explain()

输出 - 在此输入图像描述

为什么使用 AWS Glue Catalog 作为我的元存储时 Spark 行为很奇怪?

apache-spark apache-spark-sql pyspark aws-glue-data-catalog

1
推荐指数
1
解决办法
2149
查看次数

如何在pyspark中从与前一年相同的列中减去行值?

我有这样的输入数据,以年份名称作为一列,我想从今年减去过去一年的评级,并用一个新列作为评级差异。

year,movie_name,language,rating  
2019,akash,english,10   
2019,antony,kannada,9   
2020,akash,english,10   
2020,antony,kannada,8
Run Code Online (Sandbox Code Playgroud)

我的结果数据框:我想要的

year,movie_name,language,rating,rating_diff  
2019,akash,english,10,-  
2019,antony,kannada,9,-  
2020,akash,english,10,0        
2020,antony,kannada,8,-1
Run Code Online (Sandbox Code Playgroud)

任何帮助将不胜感激,谢谢。

apache-spark-sql pyspark

1
推荐指数
1
解决办法
3753
查看次数

SparkSQL 分解多列

Spark 中如何分解多个数组列?我有一个包含 5 个字符串化数组列的数据框,我想在所有 5 列上进行爆炸。为了简单起见,显示了 3 列的示例。

如果我有以下输入行:

col1                  col2                              col3
["b_val1","b_val2"]   ["at_val1","at_val2","at_val3"]   ["male","female"]
Run Code Online (Sandbox Code Playgroud)

我想在所有 3 个数组列上进行爆炸,因此输出应如下所示:

b_val1   at_val1   male
b_val1   at_val1   female
b_val2   at_val1   male
b_val2   at_val1   female
b_val1   at_val2   male
b_val1   at_val2   female
b_val2   at_val2   male
b_val2   at_val2   female
b_val1   at_val3   male
b_val1   at_val3   female
b_val2   at_val3   male
b_val2   at_val3   female
Run Code Online (Sandbox Code Playgroud)

我尝试了以下方法:

SELECT
timestamp,
explode(from_json(brandList, 'array<string>')) AS brand,
explode(from_json(articleTypeList, 'array<string>')) AS articleTypeList,
explode(from_json(gender, 'array<string>')) AS gender,
explode(from_json(masterCategoryList, 'array<string>')) AS masterCategoryList,
explode(from_json(subCategoryList, 'array<string>')) AS subCategoryList,
isLandingPage,
...
from …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql

1
推荐指数
1
解决办法
7678
查看次数