我正在实现一项功能,需要通过主键列表查找 Cassandra。
下面是一个示例数据,其中 id 是主键
mytable
id column1
1 423
2 542
3 678
4 45534
5 435634
6 2435
7 678
8 4564
9 546
Run Code Online (Sandbox Code Playgroud)
我的大多数查询都是通过 id 进行查找,但对于某些特殊情况,我想获取 id 列表的数据。我目前正在做的方式如下:
public Object fetchFromCassandraForId(int id);
int ids[] = {1, 3, 5, 7, 9};
List<Object> results;
for(int id: ids) {
results.add(fetchFromCassandraForId(id));
}
Run Code Online (Sandbox Code Playgroud)
这会导致向 cassandra 发出多个网络调用,是否可以以某种方式进行批处理,因此我想知道 cassandra 是否支持通过 ids 列表进行快速查找
select coulmn1 from mytable where id in (1, 3, 5, 7, 9);
Run Code Online (Sandbox Code Playgroud)
?任何帮助或指示将不胜感激?
如何选择一个数据框中的前N列并将其放入另一个数据框中?
有一个包含 180 列的 DF,我想创建另一个包含前 105 列的 DF,而不在脚本中隐式提及列名称。
有一种求DF中行总和的场景如下
ID DEPT [..] SUB1 SUB2 SUB3 SUB4 **SUM1**
1 PHY 50 20 30 30 130
2 COY 52 62 63 34 211
3 DOY 53 52 53 84
4 ROY 56 52 53 74
5 SZY 57 62 73 54
Run Code Online (Sandbox Code Playgroud)
需要找到每一行的 SUB1 SUB2 SUB3 SUB4 的行总和,并将其作为新列 SUM1。数据帧中 SUB1 列的序号位置为 16。
我正在尝试读取镶木地板文件来保存架构,然后在读取 csv 文件时使用此架构将其分配给数据帧。
fee.parquet该文件loan__fee.csv具有相同的内容,但文件格式不同。
下面是我的代码 - 我收到一个错误,架构应该是“StructType”。如何将从镶木地板文件读取的模式转换为 StructType
from pyarrow.parquet import ParquetFile
import pyarrow.parquet
fee_schema = pyarrow.parquet.read_schema("/dbfs/FileStore/fee.parquet", memory_map=True)
df_mod = spark.read.csv('/FileStore/loan__fee.csv', header="true", schema=fee_schema)
Run Code Online (Sandbox Code Playgroud)
它给出错误:
类型错误:架构应该是 StructType 或字符串
我尝试了几个选项,例如fee_schema.to_string(show_schema_metadata = True)但它不起作用并给出 ParseError。
谢谢你的时间!
这个命令运行良好有什么原因吗:
%sql SELECT * FROM Azure.Reservations WHERE timestamp > '2021-04-02'
Run Code Online (Sandbox Code Playgroud)
返回 2 行,如下:
%sql DELETE FROM Azure.Reservations WHERE timestamp > '2021-04-02'
Run Code Online (Sandbox Code Playgroud)
失败并显示:
SQL语句中的错误:AssertionError:断言失败:没有DeleteFromTable的计划(timestamp#394 > 1617321600000000)
?
我是 Databricks 新手,但我确信我在另一个表上运行了类似的命令(没有 WHERE 子句)。该表是基于 Parquet 文件创建的。
我有一个包含 Databricks 活动的 ADF 管道。
该活动每次都会创建一个新的作业集群,并且我已将所有必需的 Spark 配置添加到相应的链接服务中。
现在,随着 Databricks 提供 Spot 实例,我想在 Databricks 中使用 Spot 配置创建新集群。
我尝试从LinkedService文档中找到帮助,但没有成功!
如何使用 ADF 执行此操作?
干杯!!!
azure cost-management azure-data-factory azure-databricks databricks-workflows
I have a dataframe as below:
val data = Seq(("James", "Sales", 34, "Developer"), ("Michael", "Sales", 56, "Architect"), ("Robert", "Sales", 30, "Manager"), ("Maria", "Finance", 24, "Consultant"))
val df1 = data.toDF("name","dept","id", "role")
df1.printSchema()
root
|-- name: string (nullable = true)
|-- dept: string (nullable = true)
|-- id: integer (nullable = true)
|-- role: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
I have a hive table with same columns and exact schema:
val df2 = spark.sql("select * from db.table")
Run Code Online (Sandbox Code Playgroud)
From the incoming dataframe df1 I …
我正在尝试使用“rename ()”函数通过“import os”库在Azure Databricks中使用Python重命名文件,这确实非常简单,但是在Databricks中执行此操作时我无法到达其中的路径我的文件是。在数据湖中,但是执行命令“% fs ls path_file”是的,我看到了它,我什至可以毫无问题地读取它并使用 pyspark 处理它。
我留下我的代码示例:
import os
old_name = r"/mnt/datalake/path/part-00000-tid-1761178-3f1b0942-223-1-c000.csv"
new_name = r"/mnt/datalake/path/example.csv"
os.rename(old_name, new_name)
Run Code Online (Sandbox Code Playgroud)
上面返回一个错误,指出找不到路径或文件,但“ls”命令可以毫无问题地执行相同的路径。
另一方面,我尝试用 pySpark 重命名该文件,但它使用了我没有安装的 hadoop 库(org.apache.hadoop.conf.Configuration),并且无法在生产环境中安装它......
我会缺少什么?
我正在使用 Scala 并尝试将我的日历信息从 Spark 保存到 Cassandra。
我开始使用 Cassandra 创建相同的架构:
session.execute("CREATE TABLE calendar (DateNum int, Date text, YearMonthNum int, ..., PRIMARY KEY (datenum,date))")
然后将我的数据从 Spark 导入到 Cassandra:
.write
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "calendar", "keyspace" -> "ks"))
.mode(SaveMode.Append)
.save()
Run Code Online (Sandbox Code Playgroud)
但是,当我尝试读取从 Cassandra 上的 Spark 检索的数据时,行看起来非常混乱,而我想保持日历的顺序相同。
我有一个行的例子:
20090111 | 2009 年 1 月 11 日 | 200901 |...
选择/订购似乎也不能解决问题。
我最近在 Azure Databricks 上启用了预览功能“存储库中的文件”,这样我就可以将许多常规功能从笔记本转移到模块,并消除为单个作业运行大量笔记本带来的开销。
但是,我的一些函数直接依赖于 dbutils 或 Spark/pyspark 函数(例如dbutils.secrets.get()和spark.conf.set())。由于这些模块是在笔记本的后台导入的,并且直接与底层会话相关联,因此我完全不知道如何在自定义模块中引用这些模块。
对于我的小示例模块,我通过将 dbutils 设置为参数来修复它,如下例所示:
class Connection:
def __init__(self, dbutils):
token = dbutils.secrets.get(scope="my-scope", key="mykey")
...
Run Code Online (Sandbox Code Playgroud)
然而,对所有现有函数执行此操作将需要大量重写函数和调用它们的行。我怎样才能避免这个过程并以更干净的方式进行?
apache-spark ×6
databricks ×5
pyspark ×4
python ×4
azure ×2
cassandra ×2
delta-lake ×2
scala ×2
cql ×1
java ×1
pyarrow ×1
python-3.x ×1