标签: databricks

PySpark - 用于创建新列的字符串匹配

我有一个数据帧,如:

ID             Notes
2345          Checked by John
2398          Verified by Stacy
3983          Double Checked on 2/23/17 by Marsha 
Run Code Online (Sandbox Code Playgroud)

比方说,例如,只有3名员工需要检查:John,Stacy或Marsha.我想像这样制作一个新专栏:

ID                Notes                              Employee
2345          Checked by John                          John
2398         Verified by Stacy                        Stacy
3983     Double Checked on 2/23/17 by Marsha          Marsha
Run Code Online (Sandbox Code Playgroud)

这里是正则表达式还是grep更好?我应该尝试什么样的功能?谢谢!

编辑:我一直在尝试一堆解决方案,但似乎没有任何工作.我应该放弃并为每个员工创建具有二进制值的列吗?IE:

ID                Notes                             John       Stacy    Marsha
2345          Checked by John                        1            0       0
2398         Verified by Stacy                       0            1       0
3983     Double Checked on 2/23/17 by Marsha         0            0       1
Run Code Online (Sandbox Code Playgroud)

python regex apache-spark pyspark databricks

10
推荐指数
1
解决办法
2万
查看次数

如何在databricks文件系统中移动相同扩展名的文件?

当我尝试在 DBFS 中移动带有 * 的文件时,我遇到了文件未找到异常。这里源目录和目标目录都在 DBFS 中。我在 dbfs 目录中有名为“test_sample.csv”的源文件,我正在使用笔记本单元中的如下命令,

dbutils.fs.mv("dbfs:/usr/krishna/sample/test*.csv", "dbfs:/user/abc/Test/Test.csv")
Run Code Online (Sandbox Code Playgroud)

错误:

java.io.FileNotFoundException: dbfs:/usr/krishna/sample/test*.csv
Run Code Online (Sandbox Code Playgroud)

我很感激任何帮助。谢谢。

databricks

10
推荐指数
2
解决办法
1万
查看次数

如何以编程方式检测 Databricks 环境

我正在编写一个需要在本地和 Databricks 上运行的 spark 作业。

每个环境(文件路径)中的代码必须略有不同,因此我试图找到一种方法来检测作业是否在 Databricks 中运行。到目前为止,我发现的最好方法是在根目录中查找“dbfs”目录,如果存在,则假设它在 Databricks 上运行。感觉这不是正确的解决方案。有没有人有更好的主意?

java apache-spark databricks

10
推荐指数
1
解决办法
1773
查看次数

如何从 Databricks Delta 表中删除一列?

我最近开始发现 Databricks 并面临需要删除增量表的某个列的情况。当我使用 PostgreSQL 时,它就像

ALTER TABLE main.metrics_table 
DROP COLUMN metric_1;
Run Code Online (Sandbox Code Playgroud)

我正在查看有关 DELETE 的Databricks文档,但它仅涵盖DELETE the rows that match a predicate.

我还找到了关于 DROP 数据库、DROP 函数和 DROP 表的文档,但绝对没有关于如何从增量表中删除列的内容。我在这里缺少什么?是否有从增量表中删除列的标准方法?

sql apache-spark apache-spark-sql databricks delta-lake

10
推荐指数
4
解决办法
1万
查看次数

在 Databricks / Spark 中的 SQL 中为变量分配动态值

我觉得我一定在这里遗漏了一些明显的东西,但我似乎无法在 Spark SQL 中动态设置变量值。

假设我有两个表tableSrc和,tableBuilder并且我正在创建tableDest

我一直在尝试变体

SET myVar FLOAT = NULL

SELECT
    myVar = avg(myCol)
FROM tableSrc;

CREATE TABLE tableDest(
    refKey INT,
    derivedValue FLOAT
);


INSERT INTO tableDest
    SELECT
        refKey,
        neededValue * myVar AS `derivedValue`
    FROM tableBuilder
Run Code Online (Sandbox Code Playgroud)

在 T-SQL 中执行此操作是微不足道的,这对 Microsoft 来说是一个令人惊讶的胜利(DECLARE... SELECT)。然而,Spark 抛出

Error in SQL statement: ParseException: mismatched input 'SELECT' expecting <EOF>(line 53, pos 0)

但我似乎无法将派生值分配给变量以供重用。我尝试了几种变体,但最接近的是将变量分配给 select 语句的字符串。

数据块截图

请注意,这是根据 T-SQL 中的功能齐全的脚本改编的,因此我不会尽快拆分出十几个 SQL 变量来使用 Python Spark 查询来计算所有这些变量,只是为了在中插入{var1}{var2} …

apache-spark apache-spark-sql databricks

10
推荐指数
2
解决办法
6万
查看次数

将 python 模块导入到 databricks 中的 python 脚本中

我正在 Azure DataFactory 中处理一个项目,并且有一个运行 Databricks python 脚本的管道。这个特定的脚本位于 Databricks 文件系统中并由 ADF 管道运行,它从位于同一文件夹中的另一个 python 脚本导入模块(两个脚本都位于 中dbfs:/FileStore/code)。

下面的代码可以将 python 模块导入到 Databricks 笔记本中,但在导入到 python 脚本中时不起作用。

sys.path.insert(0,'dbfs:/FileStore/code/')
import conn_config as Connect
Run Code Online (Sandbox Code Playgroud)

在集群日志中,我得到: Import Error: No module named conn_config

我猜这个问题与python文件无法识别Databricks环境有关。有什么帮助吗?

python azure-data-factory azure-pipelines databricks azure-databricks

10
推荐指数
1
解决办法
2万
查看次数

如何将脚本路径作为 databricks 笔记本中的变量传递给 %run magic 命令?

我想使用另一个笔记本在数据块中运行笔记本%run。另外,我希望能够将我正在运行的笔记本的路径作为参数发送到主笔记本。
不使用的原因dbutils.notebook.run是我将嵌套字典存储在调用的笔记本中,并且我想在主笔记本中使用它们。

我正在寻找类似的东西:

path = "/References/parameterDefinition/schemaRepository"
Run Code Online (Sandbox Code Playgroud)
%run <path variable>
Run Code Online (Sandbox Code Playgroud)

python pyspark jupyter-notebook databricks

10
推荐指数
2
解决办法
1万
查看次数

com.fasterxml.jackson.dataformat.xml.XmlMapper.coercionConfigDefaults() 上的 NoSuchMethodError

我正在解析一个 XML 字符串,并使用 Jackson 库中的a将其转换为JsonNodeScala 中的a。XmlMapper我在 Databricks 笔记本上编码,因此编译是在云集群上完成的。编译我的代码时,我收到了这个错误java.lang.NoSuchMethodError: com.fasterxml.jackson.dataformat.xml.XmlMapper.coercionConfigDefaults()Lcom/fasterxml/jackson/databind/cfg/MutableCoercionConfig;,其中有一百行“at com.databricks. ...”

我可能忘记导入一些东西,但对我来说这没关系(如果我错了请告诉我):

import ch.qos.logback.classic._
import com.typesafe.scalalogging._
import com.fasterxml.jackson._
import com.fasterxml.jackson.core._
import com.fasterxml.jackson.databind.{ObjectMapper, JsonNode}
import com.fasterxml.jackson.dataformat.xml._
import com.fasterxml.jackson.module.scala._
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import java.io._
import java.time.Instant
import java.util.concurrent.TimeUnit
import javax.xml.parsers._
import okhttp3.{Headers, OkHttpClient, Request, Response, RequestBody, FormBody}
import okhttp3.OkHttpClient.Builder._
import org.apache.spark._
import org.xml.sax._
Run Code Online (Sandbox Code Playgroud)

由于我使用的是 Databricks,因此没有依赖项的 SBT 文件。相反,我直接在集群上安装了我需要的库。这是我正在使用的:

com.squareup.okhttp:okhttp:2.7.5
com.squareup.okhttp3:okhttp:4.9.0
com.squareup.okhttp3:okhttp:3.14.9
org.scala-lang.modules:scala-swing_3:3.0.0
ch.qos.logback:logback-classic:1.2.6
com.typesafe:scalalogging-slf4j_2.10:1.1.0
cc.spray.json:spray-json_2.9.1:1.0.1
com.fasterxml.jackson.module:jackson-module-scala_3:2.13.0
javax.xml.parsers:jaxp-api:1.4.5
org.xml.sax:2.0.1
Run Code Online (Sandbox Code Playgroud)

导致错误的代码很简单(来自这里: https: //www.baeldung.com/jackson-convert-xml-json第 5 章):

com.squareup.okhttp:okhttp:2.7.5
com.squareup.okhttp3:okhttp:4.9.0
com.squareup.okhttp3:okhttp:3.14.9
org.scala-lang.modules:scala-swing_3:3.0.0 …
Run Code Online (Sandbox Code Playgroud)

scala jackson xmlmapper databricks azure-databricks

10
推荐指数
1
解决办法
2万
查看次数

Databricks Delta Live 表:流式和增量式之间的差异

CREATE STREAMING LIVE TABLE和之间有区别吗CREATE INCREMENTAL LIVE TABLE?文档是混合的:例如,在这里STREAMING使用,而这里使用。我已经测试了两者,到目前为止我还没有注意到任何差异。INCREMENTAL

databricks delta-live-tables

10
推荐指数
1
解决办法
6663
查看次数

如何在 foreachBatch 函数中打印/记录输出?

使用表流,我尝试使用 foreachBatch 写入流

df.writestream
.format("delta")
.foreachBatch(WriteStreamToDelta)
...
Run Code Online (Sandbox Code Playgroud)

WriteStreamToDelta 看起来像

def WriteStreamToDelta(microDF, batch_id):
   microDFWrangled = microDF."some_transformations"
   
   print(microDFWrangled.count()) <-- How do I achieve the equivalence of this?

   microDFWrangled.writeStream...
Run Code Online (Sandbox Code Playgroud)

我想查看其中的行数

  1. 笔记本,位于 writeStream 单元下方
  2. 驾驶员日志
  3. 创建一个列表以附加每个微批次的行数。

apache-spark databricks spark-structured-streaming

10
推荐指数
1
解决办法
878
查看次数