I'm trying to read an Excel file that resides in S3 using PySpark on AWS EMR. To do this, I downloaded the spark-excel jars spark-excel_2.11-0.12.4.jar and spark-excel_2.12-0.13.5.jar and put them in an S3 bucket.
Scenario 1:
===========
df = spark.read.format("com.crealytics.spark.excel").option("useHeader", "true").option("inferschema", "true").load("s3://bucket/abc.xlsx")
spark-submit --jars s3://Bucket/spark-excel_2.11-0.12.4.jar test.py
Error:
Caused by: java.lang.NoClassDefFoundError: org/apache/commons/collections4/IteratorUtils
Scenario 2:
===========
df = spark.read.format("com.crealytics.spark.excel").option("header", "true").option("inferschema", "true").load("s3://bucket/abc.xlsx")
spark-submit --jars s3://Bucket/spark-excel_2.12-0.13.5.jar test.py
Error:
py4j.protocol.Py4JJavaError: An error occurred while calling o79.load.
: java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)
Can someone help me solve this problem? I appreciate your help!
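For context on what may be going wrong: the NoClassDefFoundError in scenario 1 usually means a transitive dependency of spark-excel (commons-collections4) was not on the classpath because only the single jar was passed with --jars, and the scala.Product.$init$ error in scenario 2 is the typical symptom of a Scala version mismatch (a _2.12 jar against a Spark build compiled for Scala 2.11, or vice versa). A minimal sketch of one way around both, assuming the cluster's Scala version is 2.11 (check with spark-submit --version) and letting Spark resolve the package and its dependencies from Maven:

# Submit with --packages so transitive dependencies are pulled in
# (the _2.11 suffix and version are assumptions; match them to your EMR Spark build):
spark-submit --packages com.crealytics:spark-excel_2.11:0.12.4 test.py

# test.py -- a sketch; the bucket path follows the question
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("read-excel").getOrCreate()

df = (spark.read.format("com.crealytics.spark.excel")
      .option("useHeader", "true")   # 0.12.x expects "useHeader"; 0.13.x renamed it to "header"
      .option("inferSchema", "true")
      .load("s3://bucket/abc.xlsx"))
df.show()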
After reading a CSV I get unexpected output:
MessageName;ContactKey;DateTimeSend;MessageContent;MessageOpened;OpenDate;TimeInApp;Platform;PlatformVersion;Status
20200903 - NL SPAARUPDATE Augustus;0031t00000A4w0xAAB;09/03/2020 8:09;Vorige maand heb je dankzij de Lidl-Plus app %%savings%% euro gespaard. Goed bezig! ??????;no;;;iPhone OS;12.4.5;Success
As you can imagine, this information needs to be split into columns and cells to create a normal dataframe.
I tried the following code:
df = spark.read.csv('/FileStore/tables/BE_August_monthlysaving.csv', header='true')
display(df)
Alternatively, I tried passing delimiter=';' before and after the header argument, but when I do that I get the following error:
csv() got an unexpected keyword argument 'delimiter'
Any idea how to fix this output?
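A minimal sketch of how the semicolon separator is usually passed (the file path comes from the question; sep is the keyword that spark.read.csv accepts, while delimiter only works as a reader option):

df = spark.read.csv('/FileStore/tables/BE_August_monthlysaving.csv', header=True, sep=';')

# or, equivalently, via the reader options
df = (spark.read
      .option("header", "true")
      .option("delimiter", ";")
      .csv('/FileStore/tables/BE_August_monthlysaving.csv'))

display(df)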
I have updated my Azure Databricks cluster from runtime 5.5 LTS to 7.3 LTS. Now I get an error while debugging in VS Code. I updated my Anaconda environment as follows:
> conda create --name dbconnect python=3.7
> conda activate dbconnect
> pip uninstall pyspark
> pip install -U databricks-connect==7.3.*
> databricks-connect configure
> databricks-connect test
So far so good, but now I'm trying to debug the following:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
setting = spark.conf.get("spark.master")
if "local" in setting:
    from pyspark.dbutils import DBUtils
    dbutils = DBUtils(spark.sparkContext)
On the line dbutils = DBUtils(spark.sparkContext), it throws an exception:
Exception has occurred: AttributeError: 'SparkContext' object has no attribute 'conf'
I have tried creating a conf:
from pyspark.dbutils import DBUtils
import pyspark
conf = pyspark.SparkConf()
pyspark.SparkContext.getOrCreate(conf=conf)
dbutils = DBUtils(spark.sparkContext) …
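For what it's worth, on Databricks Connect 7.x the documented pattern passes the SparkSession itself to DBUtils rather than a SparkContext, which is consistent with the AttributeError above. A minimal sketch (an assumption based on the databricks-connect docs, not verified against this exact cluster):

from pyspark.sql import SparkSession
from pyspark.dbutils import DBUtils

spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)  # pass the session, not spark.sparkContext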
I'm trying to convert an INT column in Databricks to a date column using PySpark. The column looks like this:

Report_Date
20210102
20210102
20210106
20210103
20210104
I'm trying to use the CAST function:
df = df.withColumn("Report_Date", col("Report_Date").cast(DateType()))
But I'm getting the following error:
cannot resolve 'CAST(`Report_Date` AS DATE)' due to data type mismatch: cannot cast int to date;
Do you know how I can get the expected output?
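A minimal sketch of the usual workaround: cast the integer to a string first and parse it with an explicit pattern via to_date (the column name comes from the question; the yyyyMMdd pattern is an assumption based on the sample values shown):

from pyspark.sql import functions as F

df = df.withColumn(
    "Report_Date",
    F.to_date(F.col("Report_Date").cast("string"), "yyyyMMdd")
)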
I have a json file that looks like this:
test= {'kpiData': [{'date': '2020-06-03 10:05',
'a': 'MINIMUMINTERVAL',
'b': 0.0,
'c': True},
{'date': '2020-06-03 10:10',
'a': 'MINIMUMINTERVAL',
'b': 0.0,
'c': True},
{'date': '2020-06-03 10:15',
'a': 'MINIMUMINTERVAL',
'b': 0.0,
'c': True},
{'date': '2020-06-03 10:20',
'a': 'MINIMUMINTERVAL',
'b': 0.0,}
]}
I want to transfer it to a dataframe object, like this:
rdd = sc.parallelize([test])
jsonDF = spark.read.json(rdd)
This results in a corrupt record. As far as I understand, the reason is that True and False are not accepted entries, so I would need to convert these entries (to TRUE, true, or 'True') before spark.read.json(). test is a dictionary and rdd is a pyspark.rdd.RDD object. For dataframe objects the conversion is quite simple, but I haven't found a solution for these objects.
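A minimal sketch of one common approach, assuming the goal is one row per kpiData entry: serializing with json.dumps turns Python's True/False into JSON's true/false, so no manual string replacement is needed; alternatively, createDataFrame can build the frame straight from the list of dicts:

import json

# Option 1: dump each record to a JSON string, then let spark.read.json parse it
rdd = sc.parallelize([json.dumps(entry) for entry in test['kpiData']])
jsonDF = spark.read.json(rdd)

# Option 2: infer the schema directly from the list of dicts
df = spark.createDataFrame(test['kpiData'])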
I'm trying to compute the element-wise product of two ArrayType columns in a PySpark dataframe. I tried the approach below to achieve this, but can't seem to get the correct result...
from pyspark.sql import functions as F
data.withColumn("array_product", F.expr("transform(CASUAL_TOPS_SIMILARITY_SCORE, (x, PER_UNA_SIMILARITY_SCORE) -> x * PER_UNA_SIMILARITY_SCORE)"))
Does anyone have tips on how I can get the correct result here? I've attached a test row for the DataFrame below... I need to multiply the column CASUAL_TOPS_SIMILARITY_SCORE with PER_UNA_SIMILARITY_SCORE.
import json
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test").getOrCreate()
js = '{"PER_UNA_SIMILARITY_SCORE":{"category_list":[0.9736891648,0.9242207186,0.9717901106,0.9763716155,0.9440944231,0.9708032326,0.9599383329,0.9705343027,0.804267581,0.9597317177,0.9316773281,0.8076725314,0.9555369889,0.9753550725,0.9811865431,1.0,0.8231541809,0.9738989392,0.9780283991,0.9644088011,0.9798529418,0.9347357116,0.9727502648,0.9778486916,0.8621780792,0.9735844196,0.9582644436,0.9579092722,0.8890027888,0.9394986243,0.9563411605,0.9811867597,0.9738380108,0.9577698381,0.7912932623,0.9778158279]},"CASUAL_TOPS_SIMILARITY_SCORE":{"category_list":[0.7924168764,0.7511316884,0.7925161719,0.8007234107,0.7953468064,0.7882556409,0.7778519374,0.7881058994,1.0,0.7785517364,0.7733458123,0.7426205538,0.7905195275,0.7925983778,0.7983386701,0.804267581,0.6749185095,0.7924821952,0.8016348085,0.7895650508,0.7985721918,0.772656847,0.7897495222,0.7948759958,0.6996340275,0.8024327668,0.7784598142,0.7942396044,0.7159431296,0.7850145414,0.7768001023,0.7983372946,0.7971616495,0.7927845035,0.6462844274,0.799555357]}}'
a_json = json.loads(js)
data = spark.createDataFrame(pd.DataFrame.from_dict(a_json))
Run Code Online (Sandbox Code Playgroud) 只是一个一般性问题。有谁知道如何删除 pyspark 数据帧的整个第一行。我尝试使用以下代码,但这使我的数据框镶木地板输出为空:
Just a general question: does anyone know how to drop the entire first row of a PySpark dataframe? I tried the code below, but it makes my dataframe's parquet output empty:

updated_bulk = bulk_spark_df.filter(merged_mas_bulk_spark_df.Number != 'part=')
Number is a column and part is the data that appears in the first row.
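A minimal sketch of one way to do this by filtering on that value (the Number column name comes from the question; whether the unwanted first-row value is exactly 'part=' or merely starts with it is an assumption):

from pyspark.sql import functions as F

# keep every row whose Number column does not start with 'part='
updated_bulk = bulk_spark_df.filter(~F.col("Number").startswith("part="))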
I tried to retrieve tweets through the tweepy API using the code below, but the retrieved json dictionary has an error.
Code:
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
import socket
import json
consumer_key="****"
consumer_secret="****"
access_token="****"
access_secret="****"
class TweetListener(Stream):
    def __init__(self, *args, csocket):
        super().__init__(*args)
        self.client_socket = csocket

    def on_data(self, data):
        try:
            msg = json.loads(data)
            print(msg('text').encode('utf=8'))
            self.client_socket.send(msg('text').encode('utf=8'))
            return True
        except BaseException as e:
            print('Error %s' % str(e))
            return True

    def on_error(self, status):
        print(status)
        return True

def send_data(c_socket):
    twtr_stream = TweetListener(
        consumer_key, consumer_secret,
        access_token, access_secret,
        csocket=c_socket
    )
    twtr_stream.filter(track=['ETH'])

s = socket.socket()
host = "127.0.0.1"
port = …
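As an aside, two details in on_data look like likely sources of the error, though this is an observation rather than a verified fix: json.loads returns a dict, which is indexed with square brackets rather than called, and the encoding name is normally 'utf-8'. A sketch of that handler with those two changes:

def on_data(self, data):
    try:
        msg = json.loads(data)
        print(msg['text'].encode('utf-8'))              # index the dict with brackets
        self.client_socket.send(msg['text'].encode('utf-8'))
        return True
    except BaseException as e:
        print('Error %s' % str(e))
        return True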
I'm trying to define a nested .json schema in PySpark but can't get the ddl_schema string to work.

Normally in SQL this would be ROW; I tried STRUCT below but couldn't get the data type right. Here is the error...
ParseException:
mismatched input '(' expecting {<EOF>, ',', 'COMMENT', NOT}(line 6, pos 15)
== SQL ==
driverId INT,
driverRef STRING,
number STRING,
code STRING,
name STRUCT(forename STRING, surname STRING),
---------------^^^
dob DATE,
nationality STRING,
url STRING
+--------+----------+------+----+--------------------+----------+-----------+--------------------+
|driverId| driverRef|number|code| name| dob|nationality| url|
+--------+----------+------+----+--------------------+----------+-----------+--------------------+
| 1| hamilton| 44| HAM| {Lewis, Hamilton}|1985-01-07| British|http://en.wikiped...|
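For reference, Spark's DDL syntax declares a nested field with angle brackets (STRUCT<...>) rather than parentheses. A minimal sketch of the schema string built from the fields shown in the error output (the file path is a placeholder):

ddl_schema = """
    driverId INT,
    driverRef STRING,
    number STRING,
    code STRING,
    name STRUCT<forename: STRING, surname: STRING>,
    dob DATE,
    nationality STRING,
    url STRING
"""

df = spark.read.schema(ddl_schema).json("/path/to/drivers.json")
df.show()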