Is there a reason why this command works fine:
%sql SELECT * FROM Azure.Reservations WHERE timestamp > '2021-04-02'
and returns 2 rows, while the following:
%sql DELETE FROM Azure.Reservations WHERE timestamp > '2021-04-02'
fails with:
Error in SQL statement: AssertionError: assertion failed: No plan for DeleteFromTable (timestamp#394 > 1617321600000000)
?
I am new to Databricks, but I am sure I ran a similar command (without a WHERE clause) on another table. The table was created from Parquet files.
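One possible explanation, offered as an assumption rather than a diagnosis: DELETE FROM is supported on Delta tables, whereas this table was created from Parquet files, so Spark finds no physical plan for the delete. A minimal sketch of converting the table in place and then retrying the delete could look like the following. The helper names are hypothetical, and `spark` is assumed to be the session a Databricks notebook provides.

```python
def delete_statements(table: str, predicate: str) -> list:
    """Build two SQL statements: convert the Parquet table to Delta, then delete."""
    return [
        f"CONVERT TO DELTA {table}",
        f"DELETE FROM {table} WHERE {predicate}",
    ]


def convert_and_delete(spark, table: str, predicate: str) -> None:
    """Run the statements in order on an existing SparkSession (hypothetical helper)."""
    for statement in delete_statements(table, predicate):
        spark.sql(statement)


# Building the statements needs no cluster, so they can be inspected first.
statements = delete_statements("Azure.Reservations", "timestamp > '2021-04-02'")
for statement in statements:
    print(statement)
```

In a notebook this would be called as `convert_and_delete(spark, "Azure.Reservations", "timestamp > '2021-04-02'")`. A partitioned Parquet directory referenced by path would also need a PARTITIONED BY clause in the CONVERT statement.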
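In this command (taken from an external source), will replaceWhere cause records to be deleted? For example, suppose the date range mentioned in the command contains 1000 rows and the new df has only 100. Will this cause 900 records to be deleted?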
df.write \
.format("delta") \
.mode("overwrite") \
.option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'") \
.save("/mnt/delta/events")
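As far as I understand the semantics, an overwrite with replaceWhere removes every existing row that matches the predicate and writes the new rows in its place, so the range would end up holding only the 100 new rows. The sketch below is a plain-Python simulation of that behaviour, not Delta Lake code: `replace_where` and `in_january_2017` are hypothetical names, and the check that new rows satisfy the predicate is an assumption about Delta's default behaviour.

```python
from datetime import date


def replace_where(table_rows, new_rows, predicate):
    """Simulate an overwrite with replaceWhere on in-memory rows."""
    # Assumed to mirror Delta's default check: new rows must match the predicate.
    if any(not predicate(row) for row in new_rows):
        raise ValueError("new data contains rows outside the replaceWhere range")
    # Rows outside the predicate are kept untouched.
    kept = [row for row in table_rows if not predicate(row)]
    # Rows inside the predicate are dropped and replaced by the new rows.
    return kept + list(new_rows)


def in_january_2017(row):
    return date(2017, 1, 1) <= row["date"] <= date(2017, 1, 31)


# 1000 existing January rows plus 50 February rows that must survive.
existing = [{"id": i, "date": date(2017, 1, 1 + i % 31)} for i in range(1000)]
existing += [{"id": 1000 + i, "date": date(2017, 2, 1)} for i in range(50)]
# 100 new January rows.
new = [{"id": 5000 + i, "date": date(2017, 1, 15)} for i in range(100)]

result = replace_where(existing, new, in_january_2017)
january_after = sum(1 for row in result if in_january_2017(row))
print(january_after, len(result))
```

In this simulation the January range shrinks from 1000 rows to 100 while the February rows are unaffected, which matches the "900 fewer records" reading of the question.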
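Currently, Databricks Auto Loader requires a directory path from which all files are loaded. However, if some other type of log file also starts arriving in that directory, is there a way to ask Auto Loader to exclude those files when preparing the DataFrame?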
df = spark.readStream.format("cloudFiles") \
.option(<cloudFiles-option>, <option-value>) \
.schema(<schema>) \
.load(<input-path>)
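One option that may fit here is the `pathGlobFilter` reader option, which restricts the files picked up to those matching a glob pattern. The sketch below assumes JSON input and a `*.json` pattern; `autoloader_options`, `build_stream`, and `preview_matches` are hypothetical helpers, and `preview_matches` uses Python's `fnmatch` only as a local approximation of the matching Spark performs.

```python
from fnmatch import fnmatch


def autoloader_options(file_format: str, glob: str) -> dict:
    """Collect the reader options: the source format plus a file-name filter."""
    return {"cloudFiles.format": file_format, "pathGlobFilter": glob}


def build_stream(spark, input_path: str, schema, options: dict):
    """Create the streaming DataFrame on an existing SparkSession (not run here)."""
    reader = spark.readStream.format("cloudFiles")
    for key, value in options.items():
        reader = reader.option(key, value)
    return reader.schema(schema).load(input_path)


def preview_matches(file_names, glob: str) -> list:
    """Approximate locally which file names the glob would let through."""
    return [name for name in file_names if fnmatch(name, glob)]


options = autoloader_options("json", "*.json")
print(preview_matches(["a.json", "app.log", "b.json"], options["pathGlobFilter"]))
```

A glob in the load path itself is another possibility, but keeping the filter in an option leaves the input directory unchanged.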
I am using <spark.version>3.1.2</spark.version> and Delta Lake io.delta:delta-core_2.12:1.0.0 in my project.
When reading Delta files, I get the following error: IllegalArgumentException: Unknown message type: 9
java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 4 ($anonfun$apply$2 at DatabricksLogging.scala:77) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: java.lang.IllegalArgumentException: Unknown message type: 9 at org.apache.spark.network.shuffle.protocol.BlockTransferMessage$Decoder.fromByteBuffer(BlockTransferMessage.java:71) at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:80) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ... 1 more
at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
at com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:464)
at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:401)
at org.apache.spark.sql.delta.catalog.DeltaTableV2.deltaLog$lzycompute(DeltaTableV2.scala:73)
at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:177)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:305) …
I created a Python variable under %python in a Jupyter notebook file in Azure Databricks. How can I access the same variable for a comparison under %sql? Below is an example:
%python
RunID_Goal = sqlContext.sql("""
    SELECT CONCAT(SUBSTRING(RunID,1,6),SUBSTRING(RunID,1,6),'01_') AS RunID_Goal
    FROM RunID_Pace
""").first()[0]
%sql
SELECT Type , KPIDate, Value
FROM table
WHERE
RunID = RunID_Goal -- the variable created under %python, which I want to compare against here
When I run this, it throws an error: Error in SQL statement: AnalysisException: cannot resolve '`RunID_Goal`' given input columns: I am new to Azure Databricks and Spark SQL; any help would be appreciated.
apache-spark apache-spark-sql pyspark databricks azure-databricks
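A %sql cell does not see Python variables, which would explain why RunID_Goal is treated as a column name. One workaround is to build the statement in Python and run it with `spark.sql`. The sketch below is only an illustration: `sql_literal` and `build_query` are hypothetical helpers, the sample value is invented, and the backslash escaping assumes Spark's default string-literal parsing.

```python
def sql_literal(value: str) -> str:
    """Quote a Python string as a Spark SQL string literal."""
    # Escape backslashes first, then single quotes (assumes default parser settings).
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    return "'" + escaped + "'"


def build_query(run_id_goal: str) -> str:
    """Embed the Python value in the SQL text as a literal, not as a column name."""
    return (
        "SELECT Type, KPIDate, Value "
        "FROM table "
        f"WHERE RunID = {sql_literal(run_id_goal)}"
    )


# Invented sample value standing in for the result of the %python cell.
query = build_query("20210420210401_")
print(query)
```

In the notebook the query would then run from a %python cell, for example with `display(spark.sql(query))`.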
The DataFrame consists of two columns (s3ObjectName, batchName) with tens of thousands of rows, for example:
| s3ObjectName | batchName |
|---|---|
| a1.json | 45 |
| b2.json | 45 |
| c3.json | 45 |
| d4.json | 46 |
| e5.json | 46 |
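The goal is to use the foreachPartition() and foreach() functions to retrieve objects from the S3 bucket and write them to the data lake in parallel, using the details of each row in the DataFrame.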
// s3 connector details defined as an object so it can be serialized and available on all executors in the cluster
object container {
def getDataSource() = {
val AccessKey = dbutils.secrets.get(scope = "ADBTEL_Scope", key = "Telematics-TrueMotion-AccessKey-ID")
val SecretKey = dbutils.secrets.get(scope = "ADBTEL_Scope", key = "Telematics-TrueMotion-AccessKey-Secret")
val creds = new BasicAWSCredentials(AccessKey, SecretKey)
val clientRegion: Regions = Regions.US_EAST_1
AmazonS3ClientBuilder.standard()
.withRegion(clientRegion)
.withCredentials(new …
This may be a very basic question, as I am a beginner with PySpark. I have read a CSV file and am trying to apply some PySpark functions to it, such as filter, split, or replace, but I am running into an error. Here is my code:
emp_data = spark\
.read\
.format('csv')\
.option("inferSchema","true")\
.option("header","true")\
.load("/FileStore/tables/employee_earnings_report_2016-1.csv")
After reading the file, I applied a filter, which ran fine:
import pyspark.sql.functions as f
df = emp_data.filter((f.col("POSTAL") == 2148) | (f.col("POSTAL") == 2125)).show(5)
+-----------------+-----------+-----+-------+----------+-------+------+-------------------------+--------------+------+------+
| NAME| REGULAR|RETRO| OTHER| OVERTIME|INJURED|DETAIL|QUINN/EDUCATION INCENTIVE|TOTAL EARNINGS|POSTAL|Gender|
+-----------------+-----------+-----+-------+----------+-------+------+-------------------------+--------------+------+------+
| Abbasi,Sophia| $18,249.83| NA| NA| NA| NA| NA| NA| $18,249.83| 2148| M|
|Abbruzzese,Angela| $5,000.90| NA| NA| NA| NA| NA| NA| $5,000.90| 2125| M|
| Abbruzzese,Donna| $621.90| NA| NA| NA| NA| NA| NA| $621.90| 2125| M|
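| Abdelrahim,Maha| $1,181.60| NA| NA| NA| NA| …
In the code block below, I have a DataFrame geo that I want to iterate over to get the easting, northing, longitude, and latitude for each UK postcode in geo. I wrote one function to call the API and another that returns the four variables.
I have tested the get_data call with a postcode to show that it works (this is a public API that anyone can use):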
import requests
import pandas as pd

geo = spark.table('property_address').toPandas()

def call_api(url: str) -> dict:
    # Call the API and return the decoded JSON body
    postcode_response = requests.get(url)
    return postcode_response.json()

def get_data(postcode):
    # Look up one UK postcode and return its four coordinates
    url = f"http://api.getthedata.com/postcode/{postcode}"
    req = requests.get(url)
    results = req.json()['data']
    easting = results['easting']
    northing = results['northing']
    latitude = results['latitude']
    longitude = results['longitude']
    return easting, northing, latitude, longitude

get_data('SW1A 1AA')
Returns:
Out[108]: (529090, 179645, '51.501009', '-0.141588')
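What I want to do is run it for every row in geo and return the result as a dataset. My research led me to apply, and my attempt is based on this guide.
I am trying to pass a column called property_postcode and iterate over each row in geo to return the values. This is my attempt: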
def get_columns(row):
    column_name …
I have a dataframe as below:
val data = Seq(("James", "Sales", 34, "Developer"), ("Michael", "Sales", 56, "Architect"), ("Robert", "Sales", 30, "Manager"), ("Maria", "Finance", 24, "Consultant"))
val df1 = data.toDF("name","dept","id", "role")
df1.printSchema()
root
|-- name: string (nullable = true)
|-- dept: string (nullable = true)
|-- id: integer (nullable = true)
|-- role: string (nullable = true)
I have a Hive table with the same columns and exactly the same schema:
val df2 = spark.sql("select * from db.table")
From the incoming dataframe df1 I …
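I am trying to rename a file with Python in Azure Databricks using the rename() function from the os library (import os). This should be really simple, but when doing it in Databricks I cannot reach the path where my file is located. The file is in the data lake, and when I run the command "%fs ls path_file" I do see it. I can even read it and process it with PySpark without any problem.
Here is a sample of my code: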
import os
old_name = r"/mnt/datalake/path/part-00000-tid-1761178-3f1b0942-223-1-c000.csv"
new_name = r"/mnt/datalake/path/example.csv"
os.rename(old_name, new_name)
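The code above returns an error saying that the path or file cannot be found, yet the "ls" command works on the same path without any problem.
I also tried renaming the file with PySpark, but that uses a Hadoop library (org.apache.hadoop.conf.Configuration) that I do not have installed and cannot install in the production environment.
What am I missing?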
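A likely cause, stated as an assumption: `%fs ls` and PySpark resolve `/mnt/...` against DBFS, while `os.rename` looks at the driver's local file system, where DBFS is usually exposed under the `/dbfs` prefix. The sketch below maps a DBFS path to that local form before renaming. `to_local_path` and `rename_on_dbfs` are hypothetical helpers, the shortened file name is invented, and a temporary directory stands in for the `/dbfs` mount so the example runs anywhere.

```python
import os
import tempfile


def to_local_path(dbfs_path: str, fuse_root: str = "/dbfs") -> str:
    """Map a DBFS path such as /mnt/... to its driver-local form /dbfs/mnt/..."""
    if dbfs_path.startswith("dbfs:"):
        dbfs_path = dbfs_path[len("dbfs:"):]
    return fuse_root.rstrip("/") + "/" + dbfs_path.lstrip("/")


def rename_on_dbfs(old_name: str, new_name: str, fuse_root: str = "/dbfs") -> None:
    """Rename a file through the local mount using the standard library."""
    os.rename(to_local_path(old_name, fuse_root), to_local_path(new_name, fuse_root))


# Local demonstration: a temporary directory plays the role of the /dbfs mount.
with tempfile.TemporaryDirectory() as root:
    folder = os.path.join(root, "mnt", "datalake", "path")
    os.makedirs(folder)
    open(os.path.join(folder, "part-00000.csv"), "w").close()
    rename_on_dbfs(
        "/mnt/datalake/path/part-00000.csv",
        "/mnt/datalake/path/example.csv",
        fuse_root=root,
    )
    renamed = os.listdir(folder)

print(renamed)
```

On a cluster the call would use the default `fuse_root`. If the local mount is not available there, `dbutils.fs.mv(old_name, new_name)` accepts the DBFS paths directly and needs no extra library.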