标签: pyspark

PySpark:Python 字典中所有数据帧的联合

我有一个字典my_dict_of_df,每次我的程序运行时,它都包含可变数量的数据帧。我想创建一个新的数据帧,它是所有这些数据帧的联合。

我的数据框看起来像-

my_dict_of_df["df_1"], my_dict_of_df["df_2"] and so on...
Run Code Online (Sandbox Code Playgroud)

我如何联合所有这些数据框?

python dataframe pyspark

0
推荐指数
1
解决办法
3882
查看次数

在 Pyspark 中选择字符(“-”)之前/之后的特定字符串

我使用 substring 来获取第一个和最后一个值。但是如何在字符串中找到特定字符并在它之前/之后获取值

pyspark

0
推荐指数
1
解决办法
8745
查看次数

PySpark:如何更新嵌套列?

StackOverflow 有一些关于如何更新数据框中嵌套列的答案。但是,其中一些看起来有点复杂。

在搜索时,我从 DataBricks 中找到了处理相同场景的文档:https ://docs.databricks.com/user-guide/faq/update-nested-column.html

val updated = df.selectExpr("""
    named_struct(
        'metadata', metadata,
        'items', named_struct(
          'books', named_struct('fees', items.books.fees * 1.01),
          'paper', items.paper
        )
    ) as named_struct
""").select($"named_struct.metadata", $"named_struct.items")
Run Code Online (Sandbox Code Playgroud)

这看起来也很干净。不幸的是,我不知道 Scala。我将如何将其翻译成 Python?

scala apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
1535
查看次数

将多行合并为一行

我正在尝试通过 pyspark 构建 sql 来实现这一点。目标是将多行组合成单行示例:我想将其转换为

+-----+----+----+-----+
| col1|col2|col3| col4|
+-----+----+----+-----+
|x    |  y |  z |13::1|
|x    |  y |  z |10::2|
+-----+----+----+-----+
Run Code Online (Sandbox Code Playgroud)

+-----+----+----+-----------+
| col1|col2|col3|       col4|
+-----+----+----+-----------+
|x    |  y |  z |13::1;10::2|
+-----+----+----+-----------+
Run Code Online (Sandbox Code Playgroud)

sql apache-spark-sql pyspark pyspark-sql

0
推荐指数
1
解决办法
3472
查看次数

属性错误:管道对象没有属性转换

我已经使用 spark ml 管道构建了一个逻辑回归模型并保存了它。我正在尝试将管道应用于新记录集并收到错误。我的管道中有向量汇编器、标准缩放器和逻辑回归模型。

我尝试pipeline.transform并收到以下错误

AttributeError: 'Pipeline' 对象没有属性 'transform'

下面是代码

from pyspark.ml import Pipeline
pipelineModel = Pipeline.load("/user/userid/lr_pipe")
scored_temp = pipelineModel.transform(combined_data_imputed_final)
Run Code Online (Sandbox Code Playgroud)

这是我保存管道的方法

from pyspark.ml.classification import LogisticRegression

vector = VectorAssembler(inputCols=final_features, outputCol="final_features")
scaler = StandardScaler(inputCol="final_features", outputCol="final_scaled_features")
lr = LogisticRegression(labelCol="label", featuresCol="final_scaled_features", maxIter=30)

stages = [vector,scaler,lr]

pipe = Pipeline(stages=stages)

lrModel = pipe.fit(train_transformed_data_1).transform(train_transformed_data_1)
pipe.save("lr_pipe")
Run Code Online (Sandbox Code Playgroud)

我期待它完成所有管道步骤并为记录评分。

pipeline apache-spark pyspark apache-spark-ml

0
推荐指数
1
解决办法
1456
查看次数

如何在控制台中查看数据帧(相当于结构化流媒体的 .show())?

我想看看我的 DataFrame 会出现什么..

这是火花代码

from pyspark.sql import SparkSession
import pyspark.sql.functions as psf
import logging
import time

spark = SparkSession \
    .builder \
    .appName("Console Example") \
    .getOrCreate()

logging.info("started to listen to the host..")

lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "127.0.0.1") \
    .option("port", 9999) \
    .load()

data = lines.selectExpr("CAST(value AS STRING)")
query1 = data.writeStream.format("console").start()
time.sleep(10)
query1.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

我正在获取进度报告,但显然每个触发器的输入行都是 0。

2019-08-19 23:45:45 INFO  MicroBatchExecution:54 - Streaming query made progress: {
  "id" : "a4b26eaf-1032-4083-9e42-a9f2f0426eb7",
  "runId" : "35c2b82a-191d-4998-9c98-17b24f5e3e9d",
  "name" : null,
  "timestamp" : …
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-streaming pyspark pyspark-sql

0
推荐指数
1
解决办法
1468
查看次数

Spark Dataframe 在覆盖 Hive 表的分区数据时出现问题

下面是我的 Hive 表定义:

CREATE EXTERNAL TABLE IF NOT EXISTS default.test2(
id integer,
count integer
)
PARTITIONED BY (
fac STRING,
fiscaldate_str DATE )
STORED AS PARQUET
LOCATION 's3://<bucket name>/backup/test2';
Run Code Online (Sandbox Code Playgroud)

我有如下配置单元表中的数据,(我刚刚插入了示例数据)

select * from default.test2

+---+-----+----+--------------+
| id|count| fac|fiscaldate_str|
+---+-----+----+--------------+
|  2|    3| NRM|    2019-01-01|
|  1|    2| NRM|    2019-01-01|
|  2|    3| NRM|    2019-01-02|
|  1|    2| NRM|    2019-01-02|
|  2|    3| NRM|    2019-01-03|
|  1|    2| NRM|    2019-01-03|
|  2|    3|STST|    2019-01-01|
|  1|    2|STST|    2019-01-01|
|  2|    3|STST| …
Run Code Online (Sandbox Code Playgroud)

hive partition apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
3348
查看次数

使用 spark-submit 时,python 脚本在哪里执行?

Python : 3.7.3
OS: CentOS 7
Spark: 2.2.0
In Cloudera
YARN : 2.6.0-cdh5.10.2
Run Code Online (Sandbox Code Playgroud)

嗨,我尝试使用带有pyspark 的python脚本执行Apache Spark,但我不明白它是如何工作的。我尝试在客户端模式下使用yarn发送整个conda 环境,并在执行. 但问题是,主要的 python 脚本在哪里运行,因为我需要指定我的共享 conda 环境的位置才能执行而不会出错,因为在我尝试执行的主机中我没有安装依赖项,我不想要安装它。--archivesspark-submitspark-submit

我使用此功能打包环境https://conda.github.io/conda-pack/spark.html,我需要导入地图外的依赖项(因为在地图内,纱线运送依赖项和执行者很好地导入了这个依赖项)。

有没有一种方法可以在不打开和在主机上使用的情况下使用附带的环境执行主 python 脚本?

我的环境是:

PYSPARK_DRIVER_PYTHON=./enviroment/bin/python
PYSPARK_PYTHON=./enviroment/bin/python
Run Code Online (Sandbox Code Playgroud)

其中环境是yarn附带的依赖项的符号链接

--archives ~/dependencies.tar.gz#enviroment
Run Code Online (Sandbox Code Playgroud)

并配置执行器

--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python
Run Code Online (Sandbox Code Playgroud)

所以最后的命令是

PYSPARK_DRIVER_PYTHON=./enviroment/bin/python \
PYSPARK_PYTHON=./environment/bin/python \
spark-submit \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./environment/bin/python \
--master yarn --deploy-mode client \
--archives enviroment/dependencies.tar.gz#enviroment \
cluster-import-check.py
Run Code Online (Sandbox Code Playgroud)

我的代码是

# coding=utf8
from pyspark import SparkConf
from pyspark import …
Run Code Online (Sandbox Code Playgroud)

python hadoop-yarn anaconda apache-spark pyspark

0
推荐指数
1
解决办法
1301
查看次数

如何从 Databrick/PySpark 覆盖/更新 Azure Cosmos DB 中的集合

我在 Databricks Notebook 上编写了以下 PySpark 代码,它使用以下代码行成功地将结果从 sparkSQL 保存到 Azure Cosmos DB:

df.write.format("com.microsoft.azure.cosmosdb.spark").mode("overwrite").options(**writeConfig3).save()
Run Code Online (Sandbox Code Playgroud)

完整代码如下:

test = spark.sql("""SELECT
  Sales.CustomerID AS pattersonID1
 ,Sales.InvoiceNumber AS myinvoicenr1
FROM Sales
limit 4""")


## my personal cosmos DB
writeConfig3 = {
    "Endpoint": "https://<cosmosdb-account>.documents.azure.com:443/",
    "Masterkey": "<key>==",
    "Database": "mydatabase",
    "Collection": "mycontainer",
    "Upsert": "true"
}

df = test.coalesce(1)

df.write.format("com.microsoft.azure.cosmosdb.spark").mode("overwrite").options(**writeConfig3).save()
Run Code Online (Sandbox Code Playgroud)

使用上面的代码我已经成功地写入了我的 Cosmos DB 数据库(mydatabase)和集合(mycontainer) 在此处输入图片说明

当我尝试通过使用以下更改 SparkSQL 来覆盖容器时(只需将 pattersonID1 更改为 pattersonID2,并将 myinvoicenr1 更改为 myinvoicenr2

test = spark.sql("""SELECT
  Sales.CustomerID AS pattersonID2
 ,Sales.InvoiceNumber AS myinvoicenr2
FROM Sales
limit 4""")
Run Code Online (Sandbox Code Playgroud)

相反,使用新查询覆盖/更新集合 Cosmos DB 会按如下方式附加容器:

在此处输入图片说明

并且仍然在集合中保留原始查询: …

pyspark pyspark-sql azure-cosmosdb azure-databricks

0
推荐指数
1
解决办法
2402
查看次数

用pyspark中对应的数字替换数组中的元素

我有一个如下所示的数据框:

   +----------+--------------------------------+
   | Index    |           flagArray            |
   +----------+--------------------------------+
   |    1     | ['A','S','A','E','Z','S','S']  | 
   +----------+--------------------------------+
   |    2     | ['A','Z','Z','E','Z','S','S']  |
   +--------- +--------------------------------+
Run Code Online (Sandbox Code Playgroud)

我想用其相应的数值来表示数组元素。

     A - 0
     F - 1
     S - 2
     E - 3
     Z - 4
Run Code Online (Sandbox Code Playgroud)

所以我的输出数据帧应该看起来像

   +----------+--------------------------------+--------------------------------+
   | Index    |           flagArray            |           finalArray           |
   +----------+--------------------------------+--------------------------------+
   |    1     | ['A','S','A','E','Z','S','S']  | [0, 2, 0, 3, 4, 2, 2]          | 
   +----------+--------------------------------+--------------------------------+
   |    2     | ['A','Z','Z','E','Z','S','S']  | [0, 4, 4, 3, 4, 2, 2]          |
   +--------- +--------------------------------+--------------------------------+
Run Code Online (Sandbox Code Playgroud)

我在 pyspark 中编写了一个 udf,我通过编写一些 …

python-3.x apache-spark apache-spark-sql pyspark pyspark-dataframes

0
推荐指数
1
解决办法
1435
查看次数