我在 Spark 中有一个巨大的数据列表,我只获取了它的标题并保存在 pandas 数据框中。
现在我想从中创建不同的列表来区分分类和数字
df2 = df.dtypes
df3 = pd.DataFrame(df2)
print(df3)
Run Code Online (Sandbox Code Playgroud)
df4= df3.filter(df3[1] = 'String')
这个statemnet给出错误:
语法错误:关键字不能是表达式
我是数据工程/机器学习和自学的新学生。在处理示例问题时,我遇到了以下数据清理任务
1. Remove extra whitespaces (keep one whitespace in between word but remove more
than one whitespaces) and punctuations
2. Turn all the words to lower case and remove stop words (list from NLTK)
3. Remove duplicate words in ASSEMBLY_NAME column
Run Code Online (Sandbox Code Playgroud)
尽管我在大学作业期间一直在编写代码来执行这些任务,但我从未在任何项目中使用过一段代码来完成这些任务,并且我正在寻求专家的指导,他们可以通过指出来帮助我寻求完成任务的最佳方法(in python or scala)
目前已完成的工作:
1.从parquet文件中读取数据
partFitmentDF = sqlContext.read.parquet("/mnt/blob/devdatasciencesto/pga-parts-forecast/raw/parts-fits/")
display(partFitmentDF)
Run Code Online (Sandbox Code Playgroud)
partFitmentDF.createOrReplaceTempView("partsFits")
partFitmentDF.write.mode("overwrite").format("delta").saveAsTable("partsFitsTable")
Run Code Online (Sandbox Code Playgroud)
3. 重新排列表中的 Fits_Assembly_name 数据,以便每个不同的 Itemno 的所有 Fits_Assembly_Name 和 Fits_Assembly_ID 都滚动到单行
%sql
select itemno, concat_ws(' | ' , collect_set(cast(fits_assembly_id as int))) as fits_assembly_id, concat_ws(' | ' …Run Code Online (Sandbox Code Playgroud) 我有一个 pyspark 数据框,其中包含一些带有后缀的列_24。
df.columns = [timestamp',
'air_temperature_median_24',
'air_temperature_median_6',
'wind_direction_mean_24',
'wind_speed',
'building_id']
Run Code Online (Sandbox Code Playgroud)
我尝试使用 colRegex 方法选择它们,但下面的代码导致异常:
df.select(ashrae.colRegex(".+'_24'")).show()
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-103-a8189f0298e6> in <module>
----> 1 ashrae.select(ashrae.colRegex(".+'_24'")).show()
C:\spark\spark-3.0.0-preview-bin-hadoop2.7\python\pyspark\sql\dataframe.py in colRegex(self, colName)
957 if not isinstance(colName, basestring):
958 raise ValueError("colName should be provided as string")
--> 959 jc = self._jdf.colRegex(colName)
960 return Column(jc)
961
C:\spark\spark-3.0.0-preview-bin-hadoop2.7\python\lib\py4j-0.10.8.1-src.zip\py4j\java_gateway.py in __call__(self, *args)
1284 answer = self.gateway_client.send_command(command)
1285 return_value = get_return_value(
-> 1286 answer, self.gateway_client, self.target_id, self.name)
1287
1288 for temp_arg in temp_args: …Run Code Online (Sandbox Code Playgroud) 我正在尝试继承 DataFrame 类并添加其他自定义方法,如下所示,以便我可以流畅地链接并确保所有方法引用相同的数据帧。我收到异常,因为列不可迭代
from pyspark.sql.dataframe import DataFrame
class Myclass(DataFrame):
def __init__(self,df):
super().__init__(df._jdf, df.sql_ctx)
def add_column3(self):
// Add column1 to dataframe received
self._jdf.withColumn("col3",lit(3))
return self
def add_column4(self):
// Add column to dataframe received
self._jdf.withColumn("col4",lit(4))
return self
if __name__ == "__main__":
'''
Spark Context initialization code
col1 col2
a 1
b 2
'''
df = spark.createDataFrame([("a",1), ("b",2)], ["col1","col2"])
myobj = MyClass(df)
## Trying to accomplish below where i can chain MyClass methods & Dataframe methods
myobj.add_column3().add_column4().drop_columns(["col1"])
'''
Expected Output
col2, col3,col4
1,3,4 …Run Code Online (Sandbox Code Playgroud) 有人在 Pyspark 环境中使用过 Apache Hudi 吗?如果可能的话,有可用的代码示例吗?
我一直在尝试使用来自 Google colab 的 pyspark 在 Windows 10 上的本地主机上写入/读取 MySQL Server 8.0.19 的表,但失败。还有很多类似的问题和一些建议的答案,但似乎没有一个解决方案在这里有效。这是我的代码:
<...installations ...>
from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName("Word Count")\
.config("spark.driver.extraClassPath", "/content/spark-2.4.5-bin-hadoop2.7/jars/mysql-connector-java-8.0.19.jar")\
.getOrCreate()
Run Code Online (Sandbox Code Playgroud)
这是连接字符串:
MyjdbcDF = spark.read.format("jdbc")\
.option("url", "jdbc:mysql://127.0.0.1:3306/mydb?user=testuser&password=pwtest")\
.option("dbtable", "collisions")\
.option("driver","com.mysql.cj.jdbc.Driver")\
.load()
Run Code Online (Sandbox Code Playgroud)
我也使用了.option("driver","com.mysql.jdbc.Driver")但仍然不断收到此错误:
Py4JJavaError: An error occurred while calling o154.load.
com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
...
...
...
Caused by: java.net.ConnectException: …Run Code Online (Sandbox Code Playgroud) 在 ForeachBatch 函数结构化 Straming 中,我想创建微批次中接收的数据帧的临时视图
func(tabdf, epoch_id):
tabaDf.createOrReplaceView("taba")
Run Code Online (Sandbox Code Playgroud)
但我收到以下错误:
org.apache.spark.sql.streaming.StreamingQueryException: Table or view not found: taba
Caused by: org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'taba' not found
Run Code Online (Sandbox Code Playgroud)
请任何人帮助我解决这个问题。
spark-streaming apache-spark-sql pyspark spark-structured-streaming
search = search.filter(!F.col("Name").contains("ABC"))
search = search.filter(F.not(F.col("Name").contains("ABC"))
Run Code Online (Sandbox Code Playgroud)
这两种方法都由于语法错误而失败,您可以帮我过滤 pyspark 中不包含特定字符串的行吗?
^ 语法错误:语法无效
我有 PySpark 数据框 df
data = {'Passenger-Id': {0: 1, 1: 2, 2: 3, 3: 4, 4: 5},'Age': {0: 22, 1: 38, 2: 26, 3: 35, 4: 35}}
df_pd = pd.DataFrame(data, columns=data.keys())
df = spark.createDataFrame(df_pd)
Run Code Online (Sandbox Code Playgroud)
+------------+---+
|Passenger-Id|Age|
+------------+---+
| 1| 22|
| 2| 38|
| 3| 26|
| 4| 35|
| 5| 35|
+------------+---+
Run Code Online (Sandbox Code Playgroud)
这有效
df.filter(df.Age == 22).show()
Run Code Online (Sandbox Code Playgroud)
但下面不起作用,因为 - 在列名称中
df.filter(df.Passenger-Id == 2).show()
Run Code Online (Sandbox Code Playgroud)
AttributeError:“DataFrame”对象没有属性“Passenger”
我在 Spark sql 中也遇到了同样的问题,
spark.sql("SELECT Passenger-Id FROM AutoMobile").show()
spark.sql("SELECT automobile.Passenger-Id FROM AutoMobile").show()
Run Code Online (Sandbox Code Playgroud)
出现以下错误
AnalysisException:无法解析Passenger给定输入列的“”:[automobile.Age,automobile.Passenger-Id] …
我想对我的 DF 进行一些检查,为了尝试它,我使用以下代码:
start = '2020-12-10'
end = datetime.date.today()
country='gb'
df_ua = (spark.table(f'nn_squad7_{country}.fact_table')
.filter(f.col('date_key').between(start,end))
#.filter(f.col('is_client')==1)
.filter(f.col('source')=='tickets')
.filter(f.col('subtype')=='trx')
.filter(f.col('is_trx_ok') == 1)
.select('ticket_id').distinct()
)
output = df_ua.count('ticket_id').distinct()
Run Code Online (Sandbox Code Playgroud)
我收到以下错误:
类型错误:count() 采用 1 个位置参数,但给出了 2 个
我不明白为什么我会得到它,有什么线索吗?
pyspark ×10
apache-spark ×6
python ×5
python-3.x ×2
apache-hudi ×1
dataframe ×1
mysql ×1
nltk ×1
pandas ×1
scala ×1