我正在使用 databricks 并尝试读取这样的 csv 文件:
df = (spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv(path_to_my_file)
)
Run Code Online (Sandbox Code Playgroud)
我收到错误:
AnalysisException: 'Unable to infer schema for CSV. It must be specified manually.;'
Run Code Online (Sandbox Code Playgroud)
我检查过我的文件不为空,并且我还尝试自己指定架构,如下所示:
schema = "datetime timestamp, id STRING, zone_id STRING, name INT, time INT, a INT"
df = (spark.read
.option("header", "true")
.schema(schema)
.csv(path_to_my_file)
)
Run Code Online (Sandbox Code Playgroud)
但是当尝试使用 display(df) 查看它时,它只是在下面给了我这个,我完全迷失了,不知道该怎么办。
df.show() and df.printSchema()给出以下结果:

看起来数据没有被读入数据帧。
在 Pandas 数据帧上并行化时 Azure Databricks 执行错误。代码能够创建RDD但在执行时中断.collect()
设置:
import pandas as pd
# initialize list of lists
data = [['tom', 10], ['nick', 15], ['juli', 14]]
# Create the pandas DataFrame
my_df = pd.DataFrame(data, columns = ['Name', 'Age'])
def testfn(i):
return my_df.iloc[i]
test_var=sc.parallelize([0,1,2],50).map(testfn).collect()
print (test_var)
Run Code Online (Sandbox Code Playgroud)
错误:
Py4JJavaError Traceback (most recent call last)
<command-2941072546245585> in <module>
1 def testfn(i):
2 return my_df.iloc[i]
----> 3 test_var=sc.parallelize([0,1,2],50).map(testfn).collect()
4 print (test_var)
/databricks/spark/python/pyspark/rdd.py in collect(self)
901 # Default path used in OSS Spark / for non-credential …Run Code Online (Sandbox Code Playgroud) Dataproc 集群是使用2.0.x带有 delta io 包的映像创建的io.delta:delta-core_2.12:0.7.0
Spark 版本是 3.1.1
Spark shell 启动:
pyspark --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
Run Code Online (Sandbox Code Playgroud)
执行命令以创建增量表并插入到增量 sql 中:
spark.sql("""CREATE TABLE IF NOT EXISTS customer(
c_id Long, c_name String, c_city String
)
USING DELTA LOCATION 'gs://edw-bi-dev-dataexports/delta-table-poc/dt_poc/customer'
""")
spark.sql("INSERT INTO customer VALUES(1, 'Shawn', 'Tx')")
Run Code Online (Sandbox Code Playgroud)
错误:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/sql/session.py", line 719, in sql
return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 111, in deco …Run Code Online (Sandbox Code Playgroud) apache-spark pyspark google-cloud-dataproc databricks delta-lake
我尝试将我的 databricks 与我的 IDE 连接
我的机器上没有下载 Spark ad/或 scala,但我下载了 pyspark (pip install pyspark)。我构建了必要的环境变量并创建了一个文件夹 Hadoop,在其中放置了一个文件夹 bin,在其中放置了一个 winutils.exe 文件。
这是一个循序渐进的过程,缓慢而稳定地解决了我的所有错误,除了最后一个:
import logging
from pyspark.sql import SparkSession
from pyspark import SparkConf
if __name__ == "__main__":
spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("OFF")
Run Code Online (Sandbox Code Playgroud)
给予
1/03/30 15:14:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Exception in thread "main" …Run Code Online (Sandbox Code Playgroud) 当尝试使用 databricks-connect 13.2.0 执行本地 Spark 代码时,它不起作用。
我有以下问题:
错误:
"INVALID_STATE: cluster xxxxx is not Shared or Single User Cluster. (requestId=05bc3105-4828-46d4-a381-7580f3b55416)""UNKNOWN:Error received from peer {grpc_message:"INVALID_STATE: cluster 0711-122239-bb999j6u is not Shared or Single User Cluster. (requestId=05bc3105-4828-46d4-a381-7580f3b55416)", grpc_status:9, created_time:"2023-07-11T15:26:08.9729+02:00"}"该集群是共享的,我尝试了几种集群配置,但它不起作用!集群运行时版本为13.2。
另外,我使用:
有人对新的 databricks connect 遇到过类似的问题吗?
感谢帮助!
我尝试了以下代码:
from databricks.connect import DatabricksSession
from pyspark.sql.types import *
from delta.tables import DeltaTable
from datetime import date
if __name__ == "__main__":
spark = DatabricksSession.builder.getOrCreate()
# Create a Spark DataFrame consisting …Run Code Online (Sandbox Code Playgroud) 我正在使用 python 和 Cassandra(Astra provider)开发一个应用程序,并试图将它部署在 Heroku 上。
问题是连接到数据库需要本地存在凭据 zip 文件- https://docs.datastax.com/en/astra/aws/doc/dscloud/astra/dscloudConnectPythonDriver.html '/path/to/secure- connect-database_name.zip' 并且 Heroku 不支持上传凭证文件。
我可以将用户名和密码配置为环境变量,但凭证 zip 文件不能配置为环境变量。
heroku config:set CASSANDRA_USERNAME=cassandra
heroku config:set CASSANDRA_PASSWORD=cassandra
heroku config:set CASSANDRA_KEYSPACE=mykeyspace
Run Code Online (Sandbox Code Playgroud)
有什么方法可以使用 zip 文件作为环境变量,我想提取所有文件并在 Heroku 中为每个文件配置一个环境变量。
但是如果我开始使用从环境变量中提取的文件,我不确定要指定什么而不是Cluster(cloud=cloud_config, auth_provider=auth_provider)?
我知道我可以在我的私人 git repo 中检入凭证 zip,这样它就可以工作,但检查凭证似乎并不安全。
我想到的另一个想法是将其存储在 S3 中并在部署期间获取文件并将其解压缩到临时目录中以供使用。
任何指示或帮助都非常感谢。
heroku cassandra datastax datastax-python-driver datastax-astra
我正在尝试过滤掉从当前日期到过去 3 年的数据,并尝试将其用于 Spark sql 查询:(例如:d_date列格式2009-09-18
)
WHERE d_date >= DATEADD(MONTH, -3, GETDATE())
Run Code Online (Sandbox Code Playgroud)
但出现以下错误。
未定义的函数:“DATEADD”。该函数既不是注册的临时函数,也不是数据库“default”中注册的永久函数。
Spark SQL 有等效的 DATEADD 吗?
通常在 Azure/AWS 上的 Databricks 上,要读取存储在 Azure Blob/S3 上的文件,我会挂载存储桶或 Blob 存储,然后执行以下操作:
如果使用 Spark
df = spark.read.format('csv').load('/mnt/my_bucket/my_file.csv', header="true")
如果直接使用 pandas,则将 /dbfs 添加到路径中:
df = pd.read_csv('/dbfs/mnt/my_bucket/my_file.csv')
我正在尝试使用 GCP 在 Databricks 的托管版本上执行完全相同的操作,尽管我成功地安装了我的存储桶并使用 Spark 读取它,但我无法直接使用 Pandas 执行此操作,添加 /dbfs 不起作用我收到No such file or directory: ...错误
你们中有人遇到过类似的问题吗?我错过了什么吗?
还有当我这样做的时候
%sh
ls /dbfs
Run Code Online (Sandbox Code Playgroud)
尽管我可以在 UI 中看到 dbfs 浏览器以及已安装的存储桶和文件,但它没有返回任何内容
谢谢您的帮助
python pandas google-cloud-platform databricks gcp-databricks
与这个问题类似,我想在我的 pyspark DataFrame 中添加一列,其中只包含一个空地图。但是,如果我使用该问题的建议答案,则地图的类型为<null,null>,与那里发布的答案不同。
from pyspark.sql.functions import create_map
spark.range(1).withColumn("test", create_map()).printSchema()
root
|-- test: map(nullable = false)
| |-- key: null
| |-- value: null (valueContainsNull = false)
Run Code Online (Sandbox Code Playgroud)
我需要一张空<string,string>地图。我可以在 Scala 中这样做:
import org.apache.spark.sql.functions.typedLit
spark.range(1).withColumn("test", typedLit(Map[String, String]())).printSchema()
root
|-- test: map(nullable = false)
| |-- key: string
| |-- value: string (valueContainsNull = true)
Run Code Online (Sandbox Code Playgroud)
我怎样才能在pyspark中做到这一点?我在 Databricks Runtime 7.3 LTS 上使用 Spark 3.01 和底层 Scala 2.12。我需要<string,string>地图,因为否则我无法将数据框保存到镶木地板:
AnalysisException: Parquet data source does not support map<null,null> data …Run Code Online (Sandbox Code Playgroud) 目前,我在一个单元格中使用 Spark 数据帧(自动加载器)时遇到一些问题,可能需要一些时间才能写入数据。然后,在下面的单元格中,代码引用第一个表完成的工作。但是,如果由于 Spark 的分布式特性而运行整个笔记本(特别是作为作业),则第二个单元会在第一个单元完全完成之前运行。如何让第二个单元等待 writeStream 完成,而不将它们放在单独的笔记本中。
例子:
小区1
autoload = pysparkDF.writeStream.format('delta')....table('TABLE1')
Run Code Online (Sandbox Code Playgroud)
细胞2
df = spark.sql('select count(*) from TABLE1')
Run Code Online (Sandbox Code Playgroud) apache-spark ×6
databricks ×6
pyspark ×5
python ×4
azure ×1
cassandra ×1
datastax ×1
delta-lake ×1
heroku ×1
pandas ×1