标签: databricks

AssertionError:断言失败:没有在 Databricks 中进行 DeleteFromTable 的计划

这个命令运行良好有什么原因吗:

%sql SELECT * FROM Azure.Reservations WHERE timestamp > '2021-04-02'
Run Code Online (Sandbox Code Playgroud)

返回 2 行,如下:

%sql DELETE FROM Azure.Reservations WHERE timestamp > '2021-04-02'
Run Code Online (Sandbox Code Playgroud)

失败并显示:

SQL语句中的错误:AssertionError:断言失败:没有DeleteFromTable的计划(timestamp#394 > 1617321600000000)

我是 Databricks 新手,但我确信我在另一个表上运行了类似的命令(没有 WHERE 子句)。该表是基于 Parquet 文件创建的。

apache-spark databricks azure-databricks delta-lake

2
推荐指数
1
解决办法
8767
查看次数

'replaceWhere' 会导致删除吗?

在这个命令(取自)中会replaceWhere导致记录删除吗?例如:命令中提到的日期范围有 1000 行。new df 只有100条,这样会导致删除900条记录吗?

df.write \
  .format("delta") \
  .mode("overwrite") \
  .option("replaceWhere", "date >= '2017-01-01' AND date <= '2017-01-31'") \
  .save("/mnt/delta/events")
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark databricks

2
推荐指数
1
解决办法
2663
查看次数

我们可以从 Databricks Autoloader 中排除或仅包含特定文件扩展名吗?

现在,databricks 自动加载器需要一个从中加载所有文件的目录路径。但是,如果某些其他类型的日志文件也开始进入该目录 - 有没有办法要求 Autoloader 在准备数据帧时排除这些文件?

df = spark.readStream.format("cloudFiles") \
  .option(<cloudFiles-option>, <option-value>) \
  .schema(<schema>) \
  .load(<input-path>)
Run Code Online (Sandbox Code Playgroud)

databricks databricks-autoloader

2
推荐指数
1
解决办法
2230
查看次数

IllegalArgumentException:读取“delta”文件时未知消息类型:9

我在项目中使用 <spark.version>3.1.2</spark.version> 和“delta”湖 io.delta:delta-core_2.12:1.0.0 。

在阅读“delta”文件时,我遇到以下错误:IllegalArgumentException: Unknown message type: 9 error

java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 4 ($anonfun$apply$2 at DatabricksLogging.scala:77) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: java.lang.IllegalArgumentException: Unknown message type: 9  at org.apache.spark.network.shuffle.protocol.BlockTransferMessage$Decoder.fromByteBuffer(BlockTransferMessage.java:71)  at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:80)    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)     at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)     at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)    ... 1 more 
    at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
    at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
    at com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
    at org.apache.spark.sql.delta.DeltaLog$.apply(DeltaLog.scala:464)
    at org.apache.spark.sql.delta.DeltaLog$.forTable(DeltaLog.scala:401)
    at org.apache.spark.sql.delta.catalog.DeltaTableV2.deltaLog$lzycompute(DeltaTableV2.scala:73)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:177)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:305) …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql databricks delta-lake

2
推荐指数
1
解决办法
2519
查看次数

如何在 Spark SQL 中访问 python 变量?

我在 Azure Databricks 中的 jupyter 笔记本文件中的 %python 下创建了 python 变量。如何访问相同的变量以在 %sql 下进行比较。下面是示例:

%python

RunID_Goal = sqlContext.sql("SELECT CONCAT(SUBSTRING(RunID,1,6),SUBSTRING(RunID,1,6),'01_') 
FROM RunID_Pace").first()[0] 
AS RunID_Goal
Run Code Online (Sandbox Code Playgroud)
%sql
SELECT Type , KPIDate, Value
FROM table
WHERE
RunID = RunID_Goal (This is the variable created under %python and want to compare over here)
Run Code Online (Sandbox Code Playgroud)

当我运行此命令时,它会抛出错误: SQL 语句中的错误: AnalysisException:无法解析RunID_Goal给定输入列的“ ”:我是新的 azure databricks 和 Spark sql 任何形式的帮助将不胜感激。

apache-spark apache-spark-sql pyspark databricks azure-databricks

2
推荐指数
1
解决办法
8743
查看次数

Spark 错误:-“foreach 的值不是对象的成员”

数据框由两列(s3ObjectName,batchName)组成,其中包含数万行,例如:-

s3对象名称 批次名称
a1.json 45
b2.json 45
c3.json 45
d4.json 46
e5.json 46

目标是使用 foreachPartition() 和 foreach() 函数从 S3 存储桶中检索对象并使用数据帧中每行的详细信息并行写入数据湖

  // s3 connector details defined as an object so it can be serialized and available on all executors in the cluster

object container {
  
  def getDataSource() = {
    val AccessKey = dbutils.secrets.get(scope = "ADBTEL_Scope", key = "Telematics-TrueMotion-AccessKey-ID")
    val SecretKey = dbutils.secrets.get(scope = "ADBTEL_Scope", key = "Telematics-TrueMotion-AccessKey-Secret")
    val creds = new BasicAWSCredentials(AccessKey, SecretKey)
    val clientRegion: Regions = Regions.US_EAST_1
    AmazonS3ClientBuilder.standard()
    .withRegion(clientRegion)
    .withCredentials(new …
Run Code Online (Sandbox Code Playgroud)

foreach scala apache-spark databricks

2
推荐指数
1
解决办法
3268
查看次数

AttributeError:“NoneType”对象没有属性“select”| pySpark

这可能是一个非常基本的问题,因为我是 pyspark 的初学者。我已经阅读了一个 csv 文件并尝试在其上应用一些 pyspark 功能,例如过滤、拆分或替换。但我面临一个错误这是我的代码...

emp_data = spark\
            .read\
            .format('csv')\
            .option("inferSchema","true")\
            .option("header","true")\
            .load("/FileStore/tables/employee_earnings_report_2016-1.csv")
Run Code Online (Sandbox Code Playgroud)

阅读文件后,我应用了过滤器..运行良好

import pyspark.sql.functions as f
df = emp_data.filter((f.col("POSTAL") == 2148) | (f.col("POSTAL") == 2125)).show(5)

+-----------------+-----------+-----+-------+----------+-------+------+-------------------------+--------------+------+------+
|             NAME|    REGULAR|RETRO|  OTHER|  OVERTIME|INJURED|DETAIL|QUINN/EDUCATION INCENTIVE|TOTAL EARNINGS|POSTAL|Gender|
+-----------------+-----------+-----+-------+----------+-------+------+-------------------------+--------------+------+------+
|    Abbasi,Sophia| $18,249.83|   NA|     NA|        NA|     NA|    NA|                       NA|    $18,249.83|  2148|     M|
|Abbruzzese,Angela|  $5,000.90|   NA|     NA|        NA|     NA|    NA|                       NA|     $5,000.90|  2125|     M|
| Abbruzzese,Donna|    $621.90|   NA|     NA|        NA|     NA|    NA|                       NA|       $621.90|  2125|     M|
|  Abdelrahim,Maha|  $1,181.60|   NA|     NA|        NA|     NA| …
Run Code Online (Sandbox Code Playgroud)

python dataframe apache-spark-sql pyspark databricks

2
推荐指数
1
解决办法
2万
查看次数

将邮政编码 API 调用应用于数据帧中的每一行

在下面的代码块中,我有一个数据帧geo,我想对其进行迭代以获取 中每个英国邮政编码的东距、北距、经度和纬度geo。我编写了一个函数来调用 API,另一个函数则返回四个变量。

我已经使用get_data邮政编码测试了该调用,以证明它有效(这是任何人都可以使用的公共 API):

import requests 
import pandas as pd

geo = spark.table('property_address').toPandas()


def call_api(url: str) -> dict:
  postcode_response =requests.get(url)
  return postcode_response.json()

def get_data(postcode):

  url = f"http://api.getthedata.com/postcode/{postcode}"
  
  req = r.get(url)
  

  results = req.json()['data']
  easting = results['easting']
  northing = results['northing']
  latitude = results['latitude']
  longitude = results ['longitude']
  
  return easting ,northing,latitude, longitude

get_data('SW1A 1AA')
Run Code Online (Sandbox Code Playgroud)

返回:

Out[108]: (529090, 179645, '51.501009', '-0.141588')
Run Code Online (Sandbox Code Playgroud)

我想要做的是为每一行运行它geo并将其作为数据集返回。我的研究引导我apply,并且我的尝试基于本指南

我试图传递一个调用的列property_postcodegeo迭代每一行以返回值,这是我的尝试:

def get_columns(row):
  column_name …
Run Code Online (Sandbox Code Playgroud)

python pyspark databricks

2
推荐指数
1
解决办法
295
查看次数

How to merge a spark dataframe with hive table on Databricks Deltalake?

I have a dataframe as below:

val data = Seq(("James", "Sales", 34, "Developer"), ("Michael", "Sales", 56, "Architect"), ("Robert", "Sales", 30, "Manager"), ("Maria", "Finance", 24, "Consultant"))
val df1 = data.toDF("name","dept","id", "role")
df1.printSchema()
root
|-- name: string (nullable = true)
|-- dept: string (nullable = true)
|-- id: integer (nullable = true)
|-- role: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

I have a hive table with same columns and exact schema:

val df2 = spark.sql("select * from db.table")
Run Code Online (Sandbox Code Playgroud)

From the incoming dataframe df1 I …

apache-spark databricks delta-lake

2
推荐指数
1
解决办法
1780
查看次数

从数据湖重命名 Azure Databricks 中的文件时出现问题

我正在尝试使用“rename ()”函数通过“import os”库在Azure Databricks中使用Python重命名文件,这确实非常简单,但是在Databricks中执行此操作时我无法到达其中的路径我的文件是。在数据湖中,但是执行命令“% fs ls path_file”是的,我看到了它,我什至可以毫无问题地读取它并使用 pyspark 处理它。

我留下我的代码示例:

import os
old_name = r"/mnt/datalake/path/part-00000-tid-1761178-3f1b0942-223-1-c000.csv"
new_name = r"/mnt/datalake/path/example.csv"

os.rename(old_name, new_name)
Run Code Online (Sandbox Code Playgroud)

上面返回一个错误,指出找不到路径或文件,但“ls”命令可以毫无问题地执行相同的路径。

另一方面,我尝试用 pySpark 重命名该文件,但它使用了我没有安装的 hadoop 库(org.apache.hadoop.conf.Configuration),并且无法在生产环境中安装它......

我会缺少什么?

python azure azure-data-lake databricks azure-databricks

2
推荐指数
1
解决办法
3532
查看次数