小编Ale*_*Ott的帖子

无法推断 pyspark 中 CSV 的架构

我正在使用 databricks 并尝试读取这样的 csv 文件:

df = (spark.read      
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(path_to_my_file)
)
Run Code Online (Sandbox Code Playgroud)

我收到错误:

AnalysisException: 'Unable to infer schema for CSV. It must be specified manually.;'
Run Code Online (Sandbox Code Playgroud)

我检查过我的文件不为空,并且我还尝试自己指定架构,如下所示:

schema = "datetime timestamp, id STRING, zone_id STRING, name INT, time INT, a INT"
df = (spark.read      
  .option("header", "true")
  .schema(schema)
  .csv(path_to_my_file)
)
Run Code Online (Sandbox Code Playgroud)

但是当尝试使用 display(df) 查看它时,它只是在下面给了我这个,我完全迷失了,不知道该怎么办。

df.show() and df.printSchema()给出以下结果: 在此输入图像描述

在此输入图像描述

看起来数据没有被读入数据帧。

错误快照: 在此输入图像描述

apache-spark pyspark

4
推荐指数
1
解决办法
4万
查看次数

AttributeError: 'DataFrame' 对象没有属性 '_data'

在 Pandas 数据帧上并行化时 Azure Databricks 执行错误。代码能够创建RDD但在执行时中断.collect()

设置:

import pandas as pd
# initialize list of lists 
data = [['tom', 10], ['nick', 15], ['juli', 14]] 
  
# Create the pandas DataFrame 
my_df = pd.DataFrame(data, columns = ['Name', 'Age']) 

def testfn(i):
  return my_df.iloc[i]
test_var=sc.parallelize([0,1,2],50).map(testfn).collect()
print (test_var)
Run Code Online (Sandbox Code Playgroud)

错误:

Py4JJavaError                             Traceback (most recent call last)
<command-2941072546245585> in <module>
      1 def testfn(i):
      2   return my_df.iloc[i]
----> 3 test_var=sc.parallelize([0,1,2],50).map(testfn).collect()
      4 print (test_var)

/databricks/spark/python/pyspark/rdd.py in collect(self)
    901         # Default path used in OSS Spark / for non-credential …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark databricks azure-databricks

4
推荐指数
1
解决办法
3432
查看次数

delta Lake - 在 pyspark 中插入 sql 失败,出现 java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.Alias

Dataproc 集群是使用2.0.x带有 delta io 包的映像创建的io.delta:delta-core_2.12:0.7.0

Spark 版本是 3.1.1

Spark shell 启动:

pyspark --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
Run Code Online (Sandbox Code Playgroud)

执行命令以创建增量表并插入到增量 sql 中:

spark.sql("""CREATE TABLE IF NOT EXISTS customer(
             c_id Long, c_name String, c_city String
             )
           USING DELTA LOCATION 'gs://edw-bi-dev-dataexports/delta-table-poc/dt_poc/customer'
         """)

spark.sql("INSERT INTO customer VALUES(1, 'Shawn', 'Tx')")
Run Code Online (Sandbox Code Playgroud)

错误:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/session.py", line 719, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark google-cloud-dataproc databricks delta-lake

4
推荐指数
1
解决办法
901
查看次数

Databricks 连接到 IntelliJ + python 线程“main”中出现错误异常 java.lang.NoSuchMethodError:

我尝试将我的 databricks 与我的 IDE 连接

我的机器上没有下载 Spark ad/或 scala,但我下载了 pyspark (pip install pyspark)。我构建了必要的环境变量并创建了一个文件夹 Hadoop,在其中放置了一个文件夹 bin,在其中放置了一个 winutils.exe 文件。

这是一个循序渐进的过程,缓慢而稳定地解决了我的所有错误,除了最后一个:

import logging
from pyspark.sql import SparkSession
from pyspark import SparkConf

if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    spark.sparkContext.setLogLevel("OFF")
Run Code Online (Sandbox Code Playgroud)

给予

1/03/30 15:14:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Exception in thread "main" …
Run Code Online (Sandbox Code Playgroud)

python databricks databricks-connect

4
推荐指数
1
解决办法
1260
查看次数

无法使用 databricks-connect“V2”V.13.2 访问 databricks 集群

当尝试使用 databricks-connect 13.2.0 执行本地 Spark 代码时,它不起作用。

我有以下问题:

错误:

  • 详情="INVALID_STATE: cluster xxxxx is not Shared or Single User Cluster. (requestId=05bc3105-4828-46d4-a381-7580f3b55416)"
  • 调试错误字符串="UNKNOWN:Error received from peer {grpc_message:"INVALID_STATE: cluster 0711-122239-bb999j6u is not Shared or Single User Cluster. (requestId=05bc3105-4828-46d4-a381-7580f3b55416)", grpc_status:9, created_time:"2023-07-11T15:26:08.9729+02:00"}"

该集群是共享的,我尝试了几种集群配置,但它不起作用!集群运行时版本为13.2。

另外,我使用:

  • Python 3.10
  • openjdk版本“1.8.0_292”
  • Azure 数据块

有人对新的 databricks connect 遇到过类似的问题吗?

感谢帮助!

我尝试了以下代码:

from databricks.connect import DatabricksSession
from pyspark.sql.types import *

from delta.tables import DeltaTable
from datetime import date


if __name__ == "__main__":
    spark = DatabricksSession.builder.getOrCreate()

    # Create a Spark DataFrame consisting …
Run Code Online (Sandbox Code Playgroud)

python azure databricks azure-databricks databricks-connect

4
推荐指数
1
解决办法
2023
查看次数

Cassandra Astra 安全部署到 heroku

我正在使用 python 和 Cassandra(Astra provider)开发一个应用程序,并试图将它部署在 Heroku 上。

问题是连接到数据库需要本地存在凭据 zip 文件- https://docs.datastax.com/en/astra/aws/doc/dscloud/astra/dscloudConnectPythonDriver.html '/path/to/secure- connect-database_name.zip' 并且 Heroku 不支持上传凭证文件。

我可以将用户名和密码配置为环境变量,但凭证 zip 文件不能配置为环境变量。

heroku config:set CASSANDRA_USERNAME=cassandra
heroku config:set CASSANDRA_PASSWORD=cassandra
heroku config:set CASSANDRA_KEYSPACE=mykeyspace
Run Code Online (Sandbox Code Playgroud)

有什么方法可以使用 zip 文件作为环境变量,我想提取所有文件并在 Heroku 中为每个文件配置一个环境变量。

但是如果我开始使用从环境变量中提取的文件,我不确定要指定什么而不是Cluster(cloud=cloud_config, auth_provider=auth_provider)

我知道我可以在我的私人 git repo 中检入凭证 zip,这样它就可以工作,但检查凭证似乎并不安全。

我想到的另一个想法是将其存储在 S3 中并在部署期间获取文件并将其解压缩到临时目录中以供使用。

任何指示或帮助都非常感谢。

heroku cassandra datastax datastax-python-driver datastax-astra

3
推荐指数
1
解决办法
120
查看次数

Spark sql DATEADD

我正在尝试过滤掉从当前日期到过去 3 年的数据,并尝试将其用于 Spark sql 查询:(例如:d_date列格式2009-09-18

WHERE d_date >= DATEADD(MONTH, -3, GETDATE())
Run Code Online (Sandbox Code Playgroud)

但出现以下错误。

未定义的函数:“DATEADD”。该函数既不是注册的临时函数,也不是数据库“default”中注册的永久函数。

Spark SQL 有等效的 DATEADD 吗?

apache-spark apache-spark-sql

3
推荐指数
1
解决办法
3万
查看次数

无法直接从 GCP Databricks 上的 pandas 读取

通常在 Azure/AWS 上的 Databricks 上,要读取存储在 Azure Blob/S3 上的文件,我会挂载存储桶或 Blob 存储,然后执行以下操作:

如果使用 Spark

df = spark.read.format('csv').load('/mnt/my_bucket/my_file.csv', header="true")

如果直接使用 pandas,则将 /dbfs 添加到路径中:

df = pd.read_csv('/dbfs/mnt/my_bucket/my_file.csv')

我正在尝试使用 GCP 在 Databricks 的托管版本上执行完全相同的操作,尽管我成功地安装了我的存储桶并使用 Spark 读取它,但我无法直接使用 Pandas 执行此操作,添加 /dbfs 不起作用我收到No such file or directory: ...错误

你们中有人遇到过类似的问题吗?我错过了什么吗?

还有当我这样做的时候

%sh 
ls /dbfs
Run Code Online (Sandbox Code Playgroud)

尽管我可以在 UI 中看到 dbfs 浏览器以及已安装的存储桶和文件,但它没有返回任何内容

谢谢您的帮助

python pandas google-cloud-platform databricks gcp-databricks

3
推荐指数
1
解决办法
1033
查看次数

Pyspark 添加字符串类型的空文字映射

这个问题类似,我想在我的 pyspark DataFrame 中添加一列,其中只包含一个空地图。但是,如果我使用该问题的建议答案,则地图的类型为<null,null>,与那里发布的答案不同。

from pyspark.sql.functions import create_map
spark.range(1).withColumn("test", create_map()).printSchema()

root
 |-- test: map(nullable = false)
 |    |-- key: null
 |    |-- value: null (valueContainsNull = false)
Run Code Online (Sandbox Code Playgroud)

我需要一张空<string,string>地图。我可以在 Scala 中这样做:

import org.apache.spark.sql.functions.typedLit
spark.range(1).withColumn("test", typedLit(Map[String, String]())).printSchema()

root
 |-- test: map(nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
Run Code Online (Sandbox Code Playgroud)

我怎样才能在pyspark中做到这一点?我在 Databricks Runtime 7.3 LTS 上使用 Spark 3.01 和底层 Scala 2.12。我需要<string,string>地图,因为否则我无法将数据框保存到镶木地板:

AnalysisException: Parquet data source does not support map<null,null> data …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark

3
推荐指数
1
解决办法
2274
查看次数

PySpark 等待笔记本中完成 (Databricks)

目前,我在一个单元格中使用 Spark 数据帧(自动加载器)时遇到一些问题,可能需要一些时间才能写入数据。然后,在下面的单元格中,代码引用第一个表完成的工作。但是,如果由于 Spark 的分布式特性而运行整个笔记本(特别是作为作业),则第二个单元会在第一个单元完全完成之前运行。如何让第二个单元等待 writeStream 完成,而不将它们放在单独的笔记本中。

例子:

小区1

autoload = pysparkDF.writeStream.format('delta')....table('TABLE1')
Run Code Online (Sandbox Code Playgroud)

细胞2

df = spark.sql('select count(*) from TABLE1')
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark databricks spark-structured-streaming

3
推荐指数
1
解决办法
3939
查看次数