小编Ale*_*Ott的帖子

Spark sql DATEADD

我正在尝试过滤掉从当前日期到过去 3 年的数据,并尝试将其用于 Spark sql 查询:(例如:d_date列格式2009-09-18

WHERE d_date >= DATEADD(MONTH, -3, GETDATE())
Run Code Online (Sandbox Code Playgroud)

但出现以下错误。

未定义的函数:“DATEADD”。该函数既不是注册的临时函数,也不是数据库“default”中注册的永久函数。

Spark SQL 有等效的 DATEADD 吗?

apache-spark apache-spark-sql

3
推荐指数
1
解决办法
3万
查看次数

如何将 Apache Spark 中的远大期望结果保存到文件中 - 使用数据文档

我已经成功创建了 Great_Expectation 结果,我想将期望结果输出到 html 文件。

很少有链接强调如何使用所谓的“数据文档”以人类可读的方式显示结果https://docs.greatexpectations.io/en/latest/guides/tutorials/getting_started/set_up_data_docs.html#tutorials-getting-started -设置数据文档

但说实话,文档非常难以理解。

我的期望只是验证数据集中的乘客数量在 1 到 6 之间。我希望帮助使用“数据文档”将结果输出到文件夹,或者可以将数据输出到文件夹:

import great_expectations as ge
import great_expectations.dataset.sparkdf_dataset
from great_expectations.dataset.sparkdf_dataset import SparkDFDataset
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType
from great_expectations.data_asset import DataAsset

from great_expectations.data_context.types.base import DataContextConfig, DatasourceConfig, FilesystemStoreBackendDefaults
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.resource_identifiers import ValidationResultIdentifier
from datetime import datetime
from great_expectations.data_context import BaseDataContext


df_taxi = spark.read.csv('abfss://root@adlspretbiukadlsdev.dfs.core.windows.net/RAW/LANDING/yellow_trip_data_sample_2019-01.csv', inferSchema=True, header=True)

taxi_rides = SparkDFDataset(df_taxi)

taxi_rides.expect_column_value_lengths_to_be_between(column='passenger_count', min_value=1, max_value=6)

taxi_rides.save_expectation_suite()
Run Code Online (Sandbox Code Playgroud)

该代码是从 Apache Spark 运行的。

如果有人能给我指出正确的方向,我就能找到答案。

apache-spark pyspark databricks azure-databricks great-expectations

3
推荐指数
1
解决办法
3928
查看次数

无法直接从 GCP Databricks 上的 pandas 读取

通常在 Azure/AWS 上的 Databricks 上,要读取存储在 Azure Blob/S3 上的文件,我会挂载存储桶或 Blob 存储,然后执行以下操作:

如果使用 Spark

df = spark.read.format('csv').load('/mnt/my_bucket/my_file.csv', header="true")

如果直接使用 pandas,则将 /dbfs 添加到路径中:

df = pd.read_csv('/dbfs/mnt/my_bucket/my_file.csv')

我正在尝试使用 GCP 在 Databricks 的托管版本上执行完全相同的操作,尽管我成功地安装了我的存储桶并使用 Spark 读取它,但我无法直接使用 Pandas 执行此操作,添加 /dbfs 不起作用我收到No such file or directory: ...错误

你们中有人遇到过类似的问题吗?我错过了什么吗?

还有当我这样做的时候

%sh 
ls /dbfs
Run Code Online (Sandbox Code Playgroud)

尽管我可以在 UI 中看到 dbfs 浏览器以及已安装的存储桶和文件,但它没有返回任何内容

谢谢您的帮助

python pandas google-cloud-platform databricks gcp-databricks

3
推荐指数
1
解决办法
1033
查看次数

Azure Databricks python 命令显示当前集群配置

我目前正在优化我们的 ETL 流程,并且希望能够看到处理数据时使用的现有集群配置。这样,我可以随着时间的推移跟踪我应该使用哪些工作节点大小。

是否有一个命令可以在 python 中返回集群工作线程 # 和大小,以便我可以将其写入数据帧?

python azure databricks azure-databricks

3
推荐指数
1
解决办法
1731
查看次数

Pyspark 添加字符串类型的空文字映射

这个问题类似,我想在我的 pyspark DataFrame 中添加一列,其中只包含一个空地图。但是,如果我使用该问题的建议答案,则地图的类型为<null,null>,与那里发布的答案不同。

from pyspark.sql.functions import create_map
spark.range(1).withColumn("test", create_map()).printSchema()

root
 |-- test: map(nullable = false)
 |    |-- key: null
 |    |-- value: null (valueContainsNull = false)
Run Code Online (Sandbox Code Playgroud)

我需要一张空<string,string>地图。我可以在 Scala 中这样做:

import org.apache.spark.sql.functions.typedLit
spark.range(1).withColumn("test", typedLit(Map[String, String]())).printSchema()

root
 |-- test: map(nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
Run Code Online (Sandbox Code Playgroud)

我怎样才能在pyspark中做到这一点?我在 Databricks Runtime 7.3 LTS 上使用 Spark 3.01 和底层 Scala 2.12。我需要<string,string>地图,因为否则我无法将数据框保存到镶木地板:

AnalysisException: Parquet data source does not support map<null,null> data …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark

3
推荐指数
1
解决办法
2274
查看次数

PySpark 等待笔记本中完成 (Databricks)

目前,我在一个单元格中使用 Spark 数据帧(自动加载器)时遇到一些问题,可能需要一些时间才能写入数据。然后,在下面的单元格中,代码引用第一个表完成的工作。但是,如果由于 Spark 的分布式特性而运行整个笔记本(特别是作为作业),则第二个单元会在第一个单元完全完成之前运行。如何让第二个单元等待 writeStream 完成,而不将它们放在单独的笔记本中。

例子:

小区1

autoload = pysparkDF.writeStream.format('delta')....table('TABLE1')
Run Code Online (Sandbox Code Playgroud)

细胞2

df = spark.sql('select count(*) from TABLE1')
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark databricks spark-structured-streaming

3
推荐指数
1
解决办法
3939
查看次数

在云 Dataproc 中的 Pyspark 作业上使用 DeltaTable.forPath 时出错

我正在 Dataproc 集群上执行一些 pyspark 作业。直到昨天一切都很顺利。然而,今天我在使用命令 DeltaTable.forPath(sparkSession, path) 读取增量表并更新它时开始出现此错误。

Traceback (most recent call last):
  File "/tmp/job-0eb2543e/cohort_ka.py", line 146, in <module>
    main()
  File "/tmp/job-0eb2543e/cohort_ka.py", line 128, in main
    persisted = DeltaTable.forPath(spark, destination)
  File "/opt/conda/default/lib/python3.8/site-packages/delta/tables.py", line 387, in forPath
    jdt = jvm.io.delta.tables.DeltaTable.forPath(jsparkSession, path, hadoopConf)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 330, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling z:io.delta.tables.DeltaTable.forPath. Trace:
py4j.Py4JException: Method forPath([class org.apache.spark.sql.SparkSession, class java.lang.String, class java.util.HashMap]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) …
Run Code Online (Sandbox Code Playgroud)

google-cloud-dataproc delta-lake

3
推荐指数
1
解决办法
1359
查看次数

如何使用参数创建数据块作业

我正在使用 databricks-cli 在 databricks 中创建一个新作业:

databricks jobs create --json-file ./deploy/databricks/config/job.config.json
Run Code Online (Sandbox Code Playgroud)

使用以下 json:

{
    "name": "Job Name",
    "new_cluster": {
        "spark_version": "4.1.x-scala2.11",
        "node_type_id": "Standard_D3_v2",
        "num_workers": 3,
        "spark_env_vars": {
            "PYSPARK_PYTHON": "/databricks/python3/bin/python3"
        }
    },
    "libraries": [
        {
            "maven": {
                "coordinates": "com.microsoft.sqlserver:mssql-jdbc:6.5.3.jre8-preview"
            }
        }
    ],
    "timeout_seconds": 3600,
    "max_retries": 3,
    "schedule": {
        "quartz_cron_expression": "0 0 22 ? * *",
        "timezone_id": "Israel"
    },
    "notebook_task": {
        "notebook_path": "/notebooks/python_notebook"
    }
}
Run Code Online (Sandbox Code Playgroud)

我想添加可通过以下方式在笔记本中访问的参数:

dbutils.widgets.text("argument1", "<default value>")
dbutils.widgets.get("argument1")
Run Code Online (Sandbox Code Playgroud)

python pyspark databricks azure-databricks databricks-cli

2
推荐指数
1
解决办法
3722
查看次数

cassandra通过java中的主键列表查找

我正在实现一项功能,需要通过主键列表查找 Cassandra。

下面是一个示例数据,其中 id 是主键

mytable
id          column1
1           423
2           542
3           678
4           45534
5           435634
6           2435
7           678
8           4564
9           546
Run Code Online (Sandbox Code Playgroud)

我的大多数查询都是通过 id 进行查找,但对于某些特殊情况,我想获取 id 列表的数据。我目前正在做的方式如下:


public Object fetchFromCassandraForId(int id);

int ids[] = {1, 3, 5, 7, 9};
List<Object> results;
for(int id: ids) {
  results.add(fetchFromCassandraForId(id));
}
Run Code Online (Sandbox Code Playgroud)

这会导致向 cassandra 发出多个网络调用,是否可以以某种方式进行批处理,因此我想知道 cassandra 是否支持通过 ids 列表进行快速查找

select coulmn1 from mytable where id in (1, 3, 5, 7, 9);
Run Code Online (Sandbox Code Playgroud)

?任何帮助或指示将不胜感激?

java cql cassandra datastax-java-driver cassandra-3.0

2
推荐指数
1
解决办法
1021
查看次数

AssertionError:断言失败:没有在 Databricks 中进行 DeleteFromTable 的计划

这个命令运行良好有什么原因吗:

%sql SELECT * FROM Azure.Reservations WHERE timestamp > '2021-04-02'
Run Code Online (Sandbox Code Playgroud)

返回 2 行,如下:

%sql DELETE FROM Azure.Reservations WHERE timestamp > '2021-04-02'
Run Code Online (Sandbox Code Playgroud)

失败并显示:

SQL语句中的错误:AssertionError:断言失败:没有DeleteFromTable的计划(timestamp#394 > 1617321600000000)

我是 Databricks 新手,但我确信我在另一个表上运行了类似的命令(没有 WHERE 子句)。该表是基于 Parquet 文件创建的。

apache-spark databricks azure-databricks delta-lake

2
推荐指数
1
解决办法
8767
查看次数