我有一个数据帧,如:
ID Notes
2345 Checked by John
2398 Verified by Stacy
3983 Double Checked on 2/23/17 by Marsha
Run Code Online (Sandbox Code Playgroud)
比方说,例如,只有3名员工需要检查:John,Stacy或Marsha.我想像这样制作一个新专栏:
ID Notes Employee
2345 Checked by John John
2398 Verified by Stacy Stacy
3983 Double Checked on 2/23/17 by Marsha Marsha
Run Code Online (Sandbox Code Playgroud)
这里是正则表达式还是grep更好?我应该尝试什么样的功能?谢谢!
编辑:我一直在尝试一堆解决方案,但似乎没有任何工作.我应该放弃并为每个员工创建具有二进制值的列吗?IE:
ID Notes John Stacy Marsha
2345 Checked by John 1 0 0
2398 Verified by Stacy 0 1 0
3983 Double Checked on 2/23/17 by Marsha 0 0 1
Run Code Online (Sandbox Code Playgroud) 当我尝试在 DBFS 中移动带有 * 的文件时,我遇到了文件未找到异常。这里源目录和目标目录都在 DBFS 中。我在 dbfs 目录中有名为“test_sample.csv”的源文件,我正在使用笔记本单元中的如下命令,
dbutils.fs.mv("dbfs:/usr/krishna/sample/test*.csv", "dbfs:/user/abc/Test/Test.csv")
Run Code Online (Sandbox Code Playgroud)
错误:
java.io.FileNotFoundException: dbfs:/usr/krishna/sample/test*.csv
Run Code Online (Sandbox Code Playgroud)
我很感激任何帮助。谢谢。
我正在编写一个需要在本地和 Databricks 上运行的 spark 作业。
每个环境(文件路径)中的代码必须略有不同,因此我试图找到一种方法来检测作业是否在 Databricks 中运行。到目前为止,我发现的最好方法是在根目录中查找“dbfs”目录,如果存在,则假设它在 Databricks 上运行。感觉这不是正确的解决方案。有没有人有更好的主意?
我最近开始发现 Databricks 并面临需要删除增量表的某个列的情况。当我使用 PostgreSQL 时,它就像
ALTER TABLE main.metrics_table
DROP COLUMN metric_1;
Run Code Online (Sandbox Code Playgroud)
我正在查看有关 DELETE 的Databricks文档,但它仅涵盖DELETE the rows that match a predicate.
我还找到了关于 DROP 数据库、DROP 函数和 DROP 表的文档,但绝对没有关于如何从增量表中删除列的内容。我在这里缺少什么?是否有从增量表中删除列的标准方法?
我觉得我一定在这里遗漏了一些明显的东西,但我似乎无法在 Spark SQL 中动态设置变量值。
假设我有两个表tableSrc和,tableBuilder并且我正在创建tableDest。
我一直在尝试变体
SET myVar FLOAT = NULL
SELECT
myVar = avg(myCol)
FROM tableSrc;
CREATE TABLE tableDest(
refKey INT,
derivedValue FLOAT
);
INSERT INTO tableDest
SELECT
refKey,
neededValue * myVar AS `derivedValue`
FROM tableBuilder
Run Code Online (Sandbox Code Playgroud)
在 T-SQL 中执行此操作是微不足道的,这对 Microsoft 来说是一个令人惊讶的胜利(DECLARE... SELECT)。然而,Spark 抛出
Error in SQL statement: ParseException:
mismatched input 'SELECT' expecting <EOF>(line 53, pos 0)
但我似乎无法将派生值分配给变量以供重用。我尝试了几种变体,但最接近的是将变量分配给 select 语句的字符串。
请注意,这是根据 T-SQL 中的功能齐全的脚本改编的,因此我不会尽快拆分出十几个 SQL 变量来使用 Python Spark 查询来计算所有这些变量,只是为了在中插入{var1}、{var2} …
我正在 Azure DataFactory 中处理一个项目,并且有一个运行 Databricks python 脚本的管道。这个特定的脚本位于 Databricks 文件系统中并由 ADF 管道运行,它从位于同一文件夹中的另一个 python 脚本导入模块(两个脚本都位于 中dbfs:/FileStore/code)。
下面的代码可以将 python 模块导入到 Databricks 笔记本中,但在导入到 python 脚本中时不起作用。
sys.path.insert(0,'dbfs:/FileStore/code/')
import conn_config as Connect
Run Code Online (Sandbox Code Playgroud)
在集群日志中,我得到: Import Error: No module named conn_config
我猜这个问题与python文件无法识别Databricks环境有关。有什么帮助吗?
python azure-data-factory azure-pipelines databricks azure-databricks
我想使用另一个笔记本在数据块中运行笔记本%run。另外,我希望能够将我正在运行的笔记本的路径作为参数发送到主笔记本。
不使用的原因dbutils.notebook.run是我将嵌套字典存储在调用的笔记本中,并且我想在主笔记本中使用它们。
我正在寻找类似的东西:
path = "/References/parameterDefinition/schemaRepository"
Run Code Online (Sandbox Code Playgroud)
%run <path variable>
Run Code Online (Sandbox Code Playgroud) 我正在解析一个 XML 字符串,并使用 Jackson 库中的a将其转换为JsonNodeScala 中的a。XmlMapper我在 Databricks 笔记本上编码,因此编译是在云集群上完成的。编译我的代码时,我收到了这个错误java.lang.NoSuchMethodError: com.fasterxml.jackson.dataformat.xml.XmlMapper.coercionConfigDefaults()Lcom/fasterxml/jackson/databind/cfg/MutableCoercionConfig;,其中有一百行“at com.databricks. ...”
我可能忘记导入一些东西,但对我来说这没关系(如果我错了请告诉我):
import ch.qos.logback.classic._
import com.typesafe.scalalogging._
import com.fasterxml.jackson._
import com.fasterxml.jackson.core._
import com.fasterxml.jackson.databind.{ObjectMapper, JsonNode}
import com.fasterxml.jackson.dataformat.xml._
import com.fasterxml.jackson.module.scala._
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import java.io._
import java.time.Instant
import java.util.concurrent.TimeUnit
import javax.xml.parsers._
import okhttp3.{Headers, OkHttpClient, Request, Response, RequestBody, FormBody}
import okhttp3.OkHttpClient.Builder._
import org.apache.spark._
import org.xml.sax._
Run Code Online (Sandbox Code Playgroud)
由于我使用的是 Databricks,因此没有依赖项的 SBT 文件。相反,我直接在集群上安装了我需要的库。这是我正在使用的:
com.squareup.okhttp:okhttp:2.7.5
com.squareup.okhttp3:okhttp:4.9.0
com.squareup.okhttp3:okhttp:3.14.9
org.scala-lang.modules:scala-swing_3:3.0.0
ch.qos.logback:logback-classic:1.2.6
com.typesafe:scalalogging-slf4j_2.10:1.1.0
cc.spray.json:spray-json_2.9.1:1.0.1
com.fasterxml.jackson.module:jackson-module-scala_3:2.13.0
javax.xml.parsers:jaxp-api:1.4.5
org.xml.sax:2.0.1
Run Code Online (Sandbox Code Playgroud)
导致错误的代码很简单(来自这里: https: //www.baeldung.com/jackson-convert-xml-json第 5 章):
com.squareup.okhttp:okhttp:2.7.5
com.squareup.okhttp3:okhttp:4.9.0
com.squareup.okhttp3:okhttp:3.14.9
org.scala-lang.modules:scala-swing_3:3.0.0 …Run Code Online (Sandbox Code Playgroud) 使用表流,我尝试使用 foreachBatch 写入流
df.writestream
.format("delta")
.foreachBatch(WriteStreamToDelta)
...
Run Code Online (Sandbox Code Playgroud)
WriteStreamToDelta 看起来像
def WriteStreamToDelta(microDF, batch_id):
microDFWrangled = microDF."some_transformations"
print(microDFWrangled.count()) <-- How do I achieve the equivalence of this?
microDFWrangled.writeStream...
Run Code Online (Sandbox Code Playgroud)
我想查看其中的行数
databricks ×10
apache-spark ×5
python ×3
pyspark ×2
delta-lake ×1
jackson ×1
java ×1
regex ×1
scala ×1
sql ×1
xmlmapper ×1