标签: pyspark

如何使用pyspark读取Excel文件?

我正在尝试使用 AWS EMR 中的 Pyspark 读取驻留在 s3 的 Excel 文件,为了执行此操作,我下载了 Spark-Excel jarspark-excel_2.11-0.12.4.jar 和 Spark-excel_2.12-0.13.5 .jar 并放入 s3 存储桶中

scenario 1:
===========
df = spark.read.format("com.crealytics.spark.excel").option("useHeader", "true").option("inferschema", "true").load("s3://bucket/abc.xlsx")

spark-submit --jars s3://Bucket/spark-excel_2.11-0.12.4.jar test.py

Error:
Caused by: java.lang.NoClassDefFoundError: org/apache/commons/collections4/IteratorUtils

scenario2:
=========
df = spark.read.format("com.crealytics.spark.excel").option("header", "true").option("inferschema", "true").load("s3://bucket/abc.xlsx")

spark-submit --jars s3://Bucket/spark-excel_2.12-0.13.5.jar test.py

Error:
py4j.protocol.Py4JJavaError: An error occurred while calling o79.load.
: java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)

Run Code Online (Sandbox Code Playgroud)

有人可以帮我解决这个问题吗?我感谢您的帮助 !

python pandas apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
1万
查看次数

Pyspark 读取 csv

读取 csv 后,我得到了意外的输出:

MessageName;ContactKey;DateTimeSend;MessageContent;MessageOpened;OpenDate;TimeInApp;Platform;PlatformVersion;Status
20200903 - NL SPAARUPDATE Augustus;0031t00000A4w0xAAB;09/03/2020 8:09;Vorige maand heb je dankzij de Lidl-Plus app %%savings%% euro gespaard. Goed bezig! ??????;no;;;iPhone OS;12.4.5;Success
Run Code Online (Sandbox Code Playgroud)

正如您可以想象的那样,输出需要将此信息拆分为列和单元格以创建正常的数据框。

我尝试了以下代码:

df = spark.read.csv('/FileStore/tables/BE_August_monthlysaving.csv', header='true')

display(df)

Run Code Online (Sandbox Code Playgroud)

或者,我尝试, delimiter=';'在标题之前和之后使用,但是当我这样做时,出现以下错误:

csv() 得到意外的关键字参数“分隔符”

知道如何解决这个输出吗?

csv dataframe apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
806
查看次数

升级集群的 Databricks 运行时后调试 PySpark 时出错

我已将 Azure Databricks 群集从运行时 5.5LTS 更新到 7.3LTS。现在我在 VSCode 中调试时遇到错误。我已经更新了我的 Anaconda 连接,如下所示:

> conda create --name dbconnect python=3.7
> conda activate dbconnect
> pip uninstall pyspark
> pip install -U databricks-connect==7.3.*
> databricks-connect configure
> databricks-connect test
Run Code Online (Sandbox Code Playgroud)

到目前为止一切顺利,但现在我正在尝试调试以下内容

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
setting = spark.conf.get("spark.master")

if "local" in setting:
    from pyspark.dbutils import DBUtils
    dbutils = DBUtils(spark.sparkContext)
Run Code Online (Sandbox Code Playgroud)

在 上dbutils = DBUtils(spark.sparkContext),它抛出异常

发生异常:AttributeError“SparkContext”对象没有属性“conf”

我尝试过创建conf

from pyspark.dbutils import DBUtils
import pyspark
conf = pyspark.SparkConf()
pyspark.SparkContext.getOrCreate(conf=conf)
dbutils = DBUtils(spark.sparkContext) …
Run Code Online (Sandbox Code Playgroud)

python pyspark azure-databricks databricks-connect

0
推荐指数
1
解决办法
313
查看次数

将 int YYYYMMDD 转换为日期 pyspark

我正在尝试使用 Pyspark 将 Databricks 中的 INT 列转换为日期列。该列如下所示:

Report_Date
20210102
20210102
20210106
20210103
20210104
Run Code Online (Sandbox Code Playgroud)

我正在尝试使用 CAST 函数

df = df.withColumn("Report_Date", col("Report_Date").cast(DateType()))
Run Code Online (Sandbox Code Playgroud)

但我收到以下错误:

由于数据类型不匹配,无法解析“CAST(`Report_Date` AS DATE)”:无法将 int 转换为 date;

你知道我怎样才能得到预期的输出吗?

date-formatting apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
1万
查看次数

由于 False 作为条目,pyspark 中 json 文件的记录已损坏

我有一个 json 文件,如下所示:

test= {'kpiData': [{'date': '2020-06-03 10:05',
   'a': 'MINIMUMINTERVAL',
   'b': 0.0,
   'c': True},
  {'date': '2020-06-03 10:10',
   'a': 'MINIMUMINTERVAL',
   'b': 0.0,
   'c': True},
  {'date': '2020-06-03 10:15',
   'a': 'MINIMUMINTERVAL',
   'b': 0.0,
   'c': True},
  {'date': '2020-06-03 10:20',
   'a': 'MINIMUMINTERVAL',
   'b': 0.0,}
]}
Run Code Online (Sandbox Code Playgroud)

我想将其传输到数据框对象,如下所示:

rdd = sc.parallelize([test])
jsonDF = spark.read.json(rdd)
Run Code Online (Sandbox Code Playgroud)

这会导致记录损坏。据我了解,其原因是,TrueFalse不能是 Python 中的条目。所以我需要在之前将这些条目转换spark.read.json()为 TRUE、true 或“True”)。test 是一个字典,rdd 是一个 pyspark.rdd.RDD 对象。对于数据帧对象,转换非常简单,但我没有找到这些对象的解决方案。

json apache-spark rdd apache-spark-sql pyspark

0
推荐指数
1
解决办法
2163
查看次数

如何使用 pyspark 计算两个 ArrayType 列之间的按元素乘法

我正在尝试计算 Pyspark 数据框中两个 ArrayType 列之间的按元素乘积。我尝试使用下面的方法来实现这一点,但似乎无法得到正确的结果......

from pyspark.sql import functions as F

data.withColumn("array_product", F.expr("transform(CASUAL_TOPS_SIMILARITY_SCORE, (x, PER_UNA_SIMILARITY_SCORE) -> x * PER_UNA_SIMILARITY_SCORE)"))
Run Code Online (Sandbox Code Playgroud)

有人对我如何在这里获得正确的结果有任何提示吗?我在下面的 DataFrame 中附加了一个测试行...我需要将列CASUAL_TOPS_SIMILARITY_SCOREPER_UNA_SIMILARITY_SCORE

import json 
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test").getOrCreate()

js = '{"PER_UNA_SIMILARITY_SCORE":{"category_list":[0.9736891648,0.9242207186,0.9717901106,0.9763716155,0.9440944231,0.9708032326,0.9599383329,0.9705343027,0.804267581,0.9597317177,0.9316773281,0.8076725314,0.9555369889,0.9753550725,0.9811865431,1.0,0.8231541809,0.9738989392,0.9780283991,0.9644088011,0.9798529418,0.9347357116,0.9727502648,0.9778486916,0.8621780792,0.9735844196,0.9582644436,0.9579092722,0.8890027888,0.9394986243,0.9563411605,0.9811867597,0.9738380108,0.9577698381,0.7912932623,0.9778158279]},"CASUAL_TOPS_SIMILARITY_SCORE":{"category_list":[0.7924168764,0.7511316884,0.7925161719,0.8007234107,0.7953468064,0.7882556409,0.7778519374,0.7881058994,1.0,0.7785517364,0.7733458123,0.7426205538,0.7905195275,0.7925983778,0.7983386701,0.804267581,0.6749185095,0.7924821952,0.8016348085,0.7895650508,0.7985721918,0.772656847,0.7897495222,0.7948759958,0.6996340275,0.8024327668,0.7784598142,0.7942396044,0.7159431296,0.7850145414,0.7768001023,0.7983372946,0.7971616495,0.7927845035,0.6462844274,0.799555357]}}'

a_json = json.loads(js)

data = spark.createDataFrame(pd.DataFrame.from_dict(a_json))
Run Code Online (Sandbox Code Playgroud)

python apache-spark-sql pyspark

0
推荐指数
1
解决办法
1255
查看次数

从 pyspark 数据框中删除第一行

只是一个一般性问题。有谁知道如何删除 pyspark 数据帧的整个第一行。我尝试使用以下代码,但这使我的数据框镶木地板输出为空:

updated_bulk=bulk_spark_df.filter
(merged_mas_bulk_spark_df.'Number!='part=')
Run Code Online (Sandbox Code Playgroud)

Number 是一列,part 是第一行中出现的数据

row filter parquet pyspark

0
推荐指数
1
解决办法
6239
查看次数

从 tweepy API 请求数据时出现错误“dict”对象不可调用

我尝试使用以下代码通过 tweepy API 检索推文,但检索到的 json 字典有错误。

代码:

import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
import socket
import json
consumer_key="****"
consumer_secret="****"
access_token="****"
access_secret="****"


class TweetListener(Stream):
    def __init__(self, *args, csocket):    
        super().__init__(*args)
        self.client_socket = csocket
 
    def on_data(self, data):
        try:
            msg = json.loads(data)
            print(msg('text').encode('utf=8'))
            self.client_socket.send(msg('text').encode('utf=8'))
            return True
        except BaseException as e:
            print('Error %s'%str(e))
        return True
    def on_error(self, status):
        print(status)
        return True
def send_data(c_socket):
    twtr_stream = TweetListener(
        consumer_key, consumer_secret,
        access_token, access_secret,
        csocket=c_socket
    )
    twtr_stream.filter(track=['ETH'])
s = socket.socket()
host = "127.0.0.1"
port = …
Run Code Online (Sandbox Code Playgroud)

python sockets json tweepy pyspark

0
推荐指数
1
解决办法
271
查看次数

Spark DDL 架构 JSON 结构

问题

我试图在 pyspark 中定义嵌套 .json 模式,但无法使 ddl_schema 字符串正常工作。

通常在 SQL 中这将是 ROW,我已经尝试过下面的 STRUCT 但无法获得正确的数据类型,这是错误......

ParseException: 
mismatched input '(' expecting {<EOF>, ',', 'COMMENT', NOT}(line 6, pos 15)

== SQL ==

    driverId INT,
    driverRef STRING,
    number STRING,
    code STRING,
    name STRUCT(forename STRING, surname STRING),
---------------^^^
    dob DATE,
    nationality STRING,
    url STRING
Run Code Online (Sandbox Code Playgroud)

数据样本

            +--------+----------+------+----+--------------------+----------+-----------+--------------------+
            |driverId| driverRef|number|code|                name|       dob|nationality|                 url|
            +--------+----------+------+----+--------------------+----------+-----------+--------------------+
            |       1|  hamilton|    44| HAM|   {Lewis, Hamilton}|1985-01-07|    British|http://en.wikiped...|
Run Code Online (Sandbox Code Playgroud)

代码示例

            +--------+----------+------+----+--------------------+----------+-----------+--------------------+
            |driverId| driverRef|number|code|                name|       dob|nationality|                 url|
            +--------+----------+------+----+--------------------+----------+-----------+--------------------+
            |       1|  hamilton|    44| HAM|   {Lewis, Hamilton}|1985-01-07| …
Run Code Online (Sandbox Code Playgroud)

ddl json apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
3350
查看次数

在 PySpark 中屏蔽电子邮件和电话号码

我想屏蔽电子邮件 - “@”之前的第一个和最后一个字符保持未屏蔽,其余字符应屏蔽。

对于电话号码,第一位和最后一位数字保持未屏蔽,其余数字将被屏蔽。

在此输入图像描述

python string bigdata apache-spark pyspark

0
推荐指数
1
解决办法
1407
查看次数