我正在尝试过滤掉从当前日期到过去 3 年的数据,并尝试将其用于 Spark sql 查询:(例如:d_date列格式2009-09-18
)
WHERE d_date >= DATEADD(MONTH, -3, GETDATE())
Run Code Online (Sandbox Code Playgroud)
但出现以下错误。
未定义的函数:“DATEADD”。该函数既不是注册的临时函数,也不是数据库“default”中注册的永久函数。
Spark SQL 有等效的 DATEADD 吗?
我已经成功创建了 Great_Expectation 结果,我想将期望结果输出到 html 文件。
很少有链接强调如何使用所谓的“数据文档”以人类可读的方式显示结果https://docs.greatexpectations.io/en/latest/guides/tutorials/getting_started/set_up_data_docs.html#tutorials-getting-started -设置数据文档
但说实话,文档非常难以理解。
我的期望只是验证数据集中的乘客数量在 1 到 6 之间。我希望帮助使用“数据文档”将结果输出到文件夹,或者可以将数据输出到文件夹:
import great_expectations as ge
import great_expectations.dataset.sparkdf_dataset
from great_expectations.dataset.sparkdf_dataset import SparkDFDataset
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType
from great_expectations.data_asset import DataAsset
from great_expectations.data_context.types.base import DataContextConfig, DatasourceConfig, FilesystemStoreBackendDefaults
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.resource_identifiers import ValidationResultIdentifier
from datetime import datetime
from great_expectations.data_context import BaseDataContext
df_taxi = spark.read.csv('abfss://root@adlspretbiukadlsdev.dfs.core.windows.net/RAW/LANDING/yellow_trip_data_sample_2019-01.csv', inferSchema=True, header=True)
taxi_rides = SparkDFDataset(df_taxi)
taxi_rides.expect_column_value_lengths_to_be_between(column='passenger_count', min_value=1, max_value=6)
taxi_rides.save_expectation_suite()
Run Code Online (Sandbox Code Playgroud)
该代码是从 Apache Spark 运行的。
如果有人能给我指出正确的方向,我就能找到答案。
apache-spark pyspark databricks azure-databricks great-expectations
通常在 Azure/AWS 上的 Databricks 上,要读取存储在 Azure Blob/S3 上的文件,我会挂载存储桶或 Blob 存储,然后执行以下操作:
如果使用 Spark
df = spark.read.format('csv').load('/mnt/my_bucket/my_file.csv', header="true")
如果直接使用 pandas,则将 /dbfs 添加到路径中:
df = pd.read_csv('/dbfs/mnt/my_bucket/my_file.csv')
我正在尝试使用 GCP 在 Databricks 的托管版本上执行完全相同的操作,尽管我成功地安装了我的存储桶并使用 Spark 读取它,但我无法直接使用 Pandas 执行此操作,添加 /dbfs 不起作用我收到No such file or directory: ...错误
你们中有人遇到过类似的问题吗?我错过了什么吗?
还有当我这样做的时候
%sh
ls /dbfs
Run Code Online (Sandbox Code Playgroud)
尽管我可以在 UI 中看到 dbfs 浏览器以及已安装的存储桶和文件,但它没有返回任何内容
谢谢您的帮助
python pandas google-cloud-platform databricks gcp-databricks
我目前正在优化我们的 ETL 流程,并且希望能够看到处理数据时使用的现有集群配置。这样,我可以随着时间的推移跟踪我应该使用哪些工作节点大小。
是否有一个命令可以在 python 中返回集群工作线程 # 和大小,以便我可以将其写入数据帧?
与这个问题类似,我想在我的 pyspark DataFrame 中添加一列,其中只包含一个空地图。但是,如果我使用该问题的建议答案,则地图的类型为<null,null>,与那里发布的答案不同。
from pyspark.sql.functions import create_map
spark.range(1).withColumn("test", create_map()).printSchema()
root
|-- test: map(nullable = false)
| |-- key: null
| |-- value: null (valueContainsNull = false)
Run Code Online (Sandbox Code Playgroud)
我需要一张空<string,string>地图。我可以在 Scala 中这样做:
import org.apache.spark.sql.functions.typedLit
spark.range(1).withColumn("test", typedLit(Map[String, String]())).printSchema()
root
|-- test: map(nullable = false)
| |-- key: string
| |-- value: string (valueContainsNull = true)
Run Code Online (Sandbox Code Playgroud)
我怎样才能在pyspark中做到这一点?我在 Databricks Runtime 7.3 LTS 上使用 Spark 3.01 和底层 Scala 2.12。我需要<string,string>地图,因为否则我无法将数据框保存到镶木地板:
AnalysisException: Parquet data source does not support map<null,null> data …Run Code Online (Sandbox Code Playgroud) 目前,我在一个单元格中使用 Spark 数据帧(自动加载器)时遇到一些问题,可能需要一些时间才能写入数据。然后,在下面的单元格中,代码引用第一个表完成的工作。但是,如果由于 Spark 的分布式特性而运行整个笔记本(特别是作为作业),则第二个单元会在第一个单元完全完成之前运行。如何让第二个单元等待 writeStream 完成,而不将它们放在单独的笔记本中。
例子:
小区1
autoload = pysparkDF.writeStream.format('delta')....table('TABLE1')
Run Code Online (Sandbox Code Playgroud)
细胞2
df = spark.sql('select count(*) from TABLE1')
Run Code Online (Sandbox Code Playgroud) 我正在 Dataproc 集群上执行一些 pyspark 作业。直到昨天一切都很顺利。然而,今天我在使用命令 DeltaTable.forPath(sparkSession, path) 读取增量表并更新它时开始出现此错误。
Traceback (most recent call last):
File "/tmp/job-0eb2543e/cohort_ka.py", line 146, in <module>
main()
File "/tmp/job-0eb2543e/cohort_ka.py", line 128, in main
persisted = DeltaTable.forPath(spark, destination)
File "/opt/conda/default/lib/python3.8/site-packages/delta/tables.py", line 387, in forPath
jdt = jvm.io.delta.tables.DeltaTable.forPath(jsparkSession, path, hadoopConf)
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 330, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling z:io.delta.tables.DeltaTable.forPath. Trace:
py4j.Py4JException: Method forPath([class org.apache.spark.sql.SparkSession, class java.lang.String, class java.util.HashMap]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) …Run Code Online (Sandbox Code Playgroud) 我正在使用 databricks-cli 在 databricks 中创建一个新作业:
databricks jobs create --json-file ./deploy/databricks/config/job.config.json
Run Code Online (Sandbox Code Playgroud)
使用以下 json:
{
"name": "Job Name",
"new_cluster": {
"spark_version": "4.1.x-scala2.11",
"node_type_id": "Standard_D3_v2",
"num_workers": 3,
"spark_env_vars": {
"PYSPARK_PYTHON": "/databricks/python3/bin/python3"
}
},
"libraries": [
{
"maven": {
"coordinates": "com.microsoft.sqlserver:mssql-jdbc:6.5.3.jre8-preview"
}
}
],
"timeout_seconds": 3600,
"max_retries": 3,
"schedule": {
"quartz_cron_expression": "0 0 22 ? * *",
"timezone_id": "Israel"
},
"notebook_task": {
"notebook_path": "/notebooks/python_notebook"
}
}
Run Code Online (Sandbox Code Playgroud)
我想添加可通过以下方式在笔记本中访问的参数:
dbutils.widgets.text("argument1", "<default value>")
dbutils.widgets.get("argument1")
Run Code Online (Sandbox Code Playgroud) 我正在实现一项功能,需要通过主键列表查找 Cassandra。
下面是一个示例数据,其中 id 是主键
mytable
id column1
1 423
2 542
3 678
4 45534
5 435634
6 2435
7 678
8 4564
9 546
Run Code Online (Sandbox Code Playgroud)
我的大多数查询都是通过 id 进行查找,但对于某些特殊情况,我想获取 id 列表的数据。我目前正在做的方式如下:
public Object fetchFromCassandraForId(int id);
int ids[] = {1, 3, 5, 7, 9};
List<Object> results;
for(int id: ids) {
results.add(fetchFromCassandraForId(id));
}
Run Code Online (Sandbox Code Playgroud)
这会导致向 cassandra 发出多个网络调用,是否可以以某种方式进行批处理,因此我想知道 cassandra 是否支持通过 ids 列表进行快速查找
select coulmn1 from mytable where id in (1, 3, 5, 7, 9);
Run Code Online (Sandbox Code Playgroud)
?任何帮助或指示将不胜感激?
这个命令运行良好有什么原因吗:
%sql SELECT * FROM Azure.Reservations WHERE timestamp > '2021-04-02'
Run Code Online (Sandbox Code Playgroud)
返回 2 行,如下:
%sql DELETE FROM Azure.Reservations WHERE timestamp > '2021-04-02'
Run Code Online (Sandbox Code Playgroud)
失败并显示:
SQL语句中的错误:AssertionError:断言失败:没有DeleteFromTable的计划(timestamp#394 > 1617321600000000)
?
我是 Databricks 新手,但我确信我在另一个表上运行了类似的命令(没有 WHERE 子句)。该表是基于 Parquet 文件创建的。
databricks ×6
apache-spark ×5
pyspark ×4
python ×3
delta-lake ×2
azure ×1
cassandra ×1
cql ×1
java ×1
pandas ×1