小编Ale*_*Ott的帖子

cassandra通过java中的主键列表查找

我正在实现一项功能,需要通过主键列表查找 Cassandra。

下面是一个示例数据,其中 id 是主键

mytable
id          column1
1           423
2           542
3           678
4           45534
5           435634
6           2435
7           678
8           4564
9           546
Run Code Online (Sandbox Code Playgroud)

我的大多数查询都是通过 id 进行查找,但对于某些特殊情况,我想获取 id 列表的数据。我目前正在做的方式如下:


public Object fetchFromCassandraForId(int id);

int ids[] = {1, 3, 5, 7, 9};
List<Object> results;
for(int id: ids) {
  results.add(fetchFromCassandraForId(id));
}
Run Code Online (Sandbox Code Playgroud)

这会导致向 cassandra 发出多个网络调用,是否可以以某种方式进行批处理,因此我想知道 cassandra 是否支持通过 ids 列表进行快速查找

select coulmn1 from mytable where id in (1, 3, 5, 7, 9);
Run Code Online (Sandbox Code Playgroud)

?任何帮助或指示将不胜感激?

java cql cassandra datastax-java-driver cassandra-3.0

2
推荐指数
1
解决办法
1021
查看次数

使用 PySpark 的数据帧中的前 N ​​列

如何选择一个数据框中的前N列并将其放入另一个数据框中?

有一个包含 180 列的 DF,我想创建另一个包含前 105 列的 DF,而不在脚本中隐式提及列名称。

python scala apache-spark pyspark

2
推荐指数
1
解决办法
4248
查看次数

使用 Pyspark 计算 Dataframe 中每一行的行总和

有一种求DF中行总和的场景如下

ID DEPT [..] SUB1 SUB2 SUB3 SUB4  **SUM1**
1  PHY      50    20   30   30   130
2  COY      52    62   63   34   211
3  DOY      53    52   53   84
4  ROY      56    52   53   74
5  SZY      57    62   73   54
Run Code Online (Sandbox Code Playgroud)

需要找到每一行的 SUB1 SUB2 SUB3 SUB4 的行总和,并将其作为新列 SUM1。数据帧中 SUB1 列的序号位置为 16。

python apache-spark pyspark

2
推荐指数
1
解决办法
1万
查看次数

获取 StructType 格式的 Parquet 文件的架构

我正在尝试读取镶木地板文件来保存架构,然后在读取 csv 文件时使用此架构将其分配给数据帧。

fee.parquet该文件loan__fee.csv具有相同的内容,但文件格式不同。

下面是我的代码 - 我收到一个错误,架构应该是“StructType”。如何将从镶木地板文件读取的模式转换为 StructType

from pyarrow.parquet import ParquetFile
import pyarrow.parquet
fee_schema = pyarrow.parquet.read_schema("/dbfs/FileStore/fee.parquet", memory_map=True)

df_mod = spark.read.csv('/FileStore/loan__fee.csv', header="true", schema=fee_schema)
Run Code Online (Sandbox Code Playgroud)

它给出错误:

类型错误:架构应该是 StructType 或字符串

我尝试了几个选项,例如fee_schema.to_string(show_schema_metadata = True)但它不起作用并给出 ParseError。

谢谢你的时间!

apache-spark pyspark databricks pyarrow azure-databricks

2
推荐指数
1
解决办法
3657
查看次数

AssertionError:断言失败:没有在 Databricks 中进行 DeleteFromTable 的计划

这个命令运行良好有什么原因吗:

%sql SELECT * FROM Azure.Reservations WHERE timestamp > '2021-04-02'
Run Code Online (Sandbox Code Playgroud)

返回 2 行,如下:

%sql DELETE FROM Azure.Reservations WHERE timestamp > '2021-04-02'
Run Code Online (Sandbox Code Playgroud)

失败并显示:

SQL语句中的错误:AssertionError:断言失败:没有DeleteFromTable的计划(timestamp#394 > 1617321600000000)

我是 Databricks 新手,但我确信我在另一个表上运行了类似的命令(没有 WHERE 子句)。该表是基于 Parquet 文件创建的。

apache-spark databricks azure-databricks delta-lake

2
推荐指数
1
解决办法
8767
查看次数

如何使用 Azure 数据工厂 (ADF) 创建 Spot 实例 - 作业集群 - 链接服务

我有一个包含 Databricks 活动的 ADF 管道。

该活动每次都会创建一个新的作业集群,并且我已将所有必需的 Spark 配置添加到相应的链接服务中。

现在,随着 Databricks 提供 Spot 实例,我想在 Databricks 中使用 Spot 配置创建新集群。

我尝试从LinkedService文档中找到帮助,但没有成功!

如何使用 ADF 执行此操作?

干杯!!!

azure cost-management azure-data-factory azure-databricks databricks-workflows

2
推荐指数
1
解决办法
3347
查看次数

How to merge a spark dataframe with hive table on Databricks Deltalake?

I have a dataframe as below:

val data = Seq(("James", "Sales", 34, "Developer"), ("Michael", "Sales", 56, "Architect"), ("Robert", "Sales", 30, "Manager"), ("Maria", "Finance", 24, "Consultant"))
val df1 = data.toDF("name","dept","id", "role")
df1.printSchema()
root
|-- name: string (nullable = true)
|-- dept: string (nullable = true)
|-- id: integer (nullable = true)
|-- role: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

I have a hive table with same columns and exact schema:

val df2 = spark.sql("select * from db.table")
Run Code Online (Sandbox Code Playgroud)

From the incoming dataframe df1 I …

apache-spark databricks delta-lake

2
推荐指数
1
解决办法
1780
查看次数

从数据湖重命名 Azure Databricks 中的文件时出现问题

我正在尝试使用“rename ()”函数通过“import os”库在Azure Databricks中使用Python重命名文件,这确实非常简单,但是在Databricks中执行此操作时我无法到达其中的路径我的文件是。在数据湖中,但是执行命令“% fs ls path_file”是的,我看到了它,我什至可以毫无问题地读取它并使用 pyspark 处理它。

我留下我的代码示例:

import os
old_name = r"/mnt/datalake/path/part-00000-tid-1761178-3f1b0942-223-1-c000.csv"
new_name = r"/mnt/datalake/path/example.csv"

os.rename(old_name, new_name)
Run Code Online (Sandbox Code Playgroud)

上面返回一个错误,指出找不到路径或文件,但“ls”命令可以毫无问题地执行相同的路径。

另一方面,我尝试用 pySpark 重命名该文件,但它使用了我没有安装的 hadoop 库(org.apache.hadoop.conf.Configuration),并且无法在生产环境中安装它......

我会缺少什么?

python azure azure-data-lake databricks azure-databricks

2
推荐指数
1
解决办法
3532
查看次数

将信息保存到 Cassandra 不会保持顺序

我正在使用 Scala 并尝试将我的日历信息从 Spark 保存到 Cassandra。

我开始使用 Cassandra 创建相同的架构:

session.execute("CREATE TABLE calendar (DateNum int, Date text, YearMonthNum int, ..., PRIMARY KEY (datenum,date))")

然后将我的数据从 Spark 导入到 Cassandra:

        .write
        .format("org.apache.spark.sql.cassandra")
        .options(Map("table" -> "calendar", "keyspace" -> "ks"))
        .mode(SaveMode.Append)
        .save()
Run Code Online (Sandbox Code Playgroud)

但是,当我尝试读取从 Cassandra 上的 Spark 检索的数据时,行看起来非常混乱,而我想保持日历的顺序相同。

我有一个行的例子:

20090111 | 2009 年 1 月 11 日 | 200901 |...

选择/订购似乎也不能解决问题。

scala cassandra apache-spark spark-cassandra-connector

2
推荐指数
1
解决办法
73
查看次数

azure databricks 中具有 Spark/dbutils 依赖项的自定义 python 模块

我最近在 Azure Databricks 上启用了预览功能“存储库中的文件”,这样我就可以将许多常规功能从笔记本转移到模块,并消除为单个作业运行大量笔记本带来的开销。

但是,我的一些函数直接依赖于 dbutils 或 Spark/pyspark 函数(例如dbutils.secrets.get()spark.conf.set())。由于这些模块是在笔记本的后台导入的,并且直接与底层会话相关联,因此我完全不知道如何在自定义模块中引用这些模块。

对于我的小示例模块,我通过将 dbutils 设置为参数来修复它,如下例所示:

class Connection:
    def __init__(self, dbutils):
        token = dbutils.secrets.get(scope="my-scope", key="mykey")
        ...
Run Code Online (Sandbox Code Playgroud)

然而,对所有现有函数执行此操作将需要大量重写函数和调用它们的行。我怎样才能避免这个过程并以更干净的方式进行?

python python-3.x pyspark databricks azure-databricks

2
推荐指数
1
解决办法
2236
查看次数