我正在使用 Scala 并尝试将我的日历信息从 Spark 保存到 Cassandra。
我开始使用 Cassandra 创建相同的架构:
session.execute("CREATE TABLE calendar (DateNum int, Date text, YearMonthNum int, ..., PRIMARY KEY (datenum,date))")
然后将我的数据从 Spark 导入到 Cassandra:
.write
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "calendar", "keyspace" -> "ks"))
.mode(SaveMode.Append)
.save()
Run Code Online (Sandbox Code Playgroud)
但是,当我尝试读取从 Cassandra 上的 Spark 检索的数据时,行看起来非常混乱,而我想保持日历的顺序相同。
我有一个行的例子:
20090111 | 2009 年 1 月 11 日 | 200901 |...
选择/订购似乎也不能解决问题。
我最近在 Azure Databricks 上启用了预览功能“存储库中的文件”,这样我就可以将许多常规功能从笔记本转移到模块,并消除为单个作业运行大量笔记本带来的开销。
但是,我的一些函数直接依赖于 dbutils 或 Spark/pyspark 函数(例如dbutils.secrets.get()和spark.conf.set())。由于这些模块是在笔记本的后台导入的,并且直接与底层会话相关联,因此我完全不知道如何在自定义模块中引用这些模块。
对于我的小示例模块,我通过将 dbutils 设置为参数来修复它,如下例所示:
class Connection:
def __init__(self, dbutils):
token = dbutils.secrets.get(scope="my-scope", key="mykey")
...
Run Code Online (Sandbox Code Playgroud)
然而,对所有现有函数执行此操作将需要大量重写函数和调用它们的行。我怎样才能避免这个过程并以更干净的方式进行?
我有一些文件大小只有几 MB 的表,我想将它们捕获为增量表。将新数据插入其中需要非常长的时间,超过 15 分钟,这让我感到惊讶。
我猜罪魁祸首是,虽然桌子很小;这些表中有 300 多列。
我尝试了以下方法,前者比后者更快(毫不奇怪(?)): (1) INSERT INTO, (2) MERGE INTO。
在将数据插入增量表之前,我应用了一些 Spark 函数来清理数据,然后最后将其注册为临时表(例如,INSERT INTO DELTA_TBL_OF_INTEREST (cols) SELECT * FROM tempTable
对于加快琐碎数据的这一过程有什么建议吗?
尝试在 python 中编译 sql 查询。我以前在 python 中经常这样做。但我过去从未遇到过这个错误。帮我解决同样的问题。
询问:
from pandasql import sqldf
import pandas as pd
from sklearn import datasets
Q10="select bucket,count(*) as COUNT,min(probability) as MINSCORE,max(probability) as MAXSCORE,(avg(probability)*100) as PREDDEFRATE,sum(response) as RESPONSE,count(*)-sum(response) as NONRESPONSE from score group by 1;"
Bucket_Details = sqldf(Q10,globals())
display(Bucket_Details)
Run Code Online (Sandbox Code Playgroud)
类型错误:init () 获得了参数“schema”的多个值
提前致谢。
我有一个关于 Pyspark 写作的问题,我目前正在运行以下行:
sat_prospect_credentials.write.format("delta").mode("append").save(f"{TABLE_MAP[table]}")
Run Code Online (Sandbox Code Playgroud)
它将 Spark DataFrame 以增量格式写入 Azure ADLS。我遇到以下问题,我有一个名为 end_date 的列,其中填充了 Null 值(这是故意的,因为这是用于 SCD 管理),但是,当我写入 DataLake 时,该列将被删除。有谁知道如何防止这种情况发生?

谢谢!
我尝试过 schemaOverwrite、mergeSchema 和其他选项。我不知道从这里开始如何跟进
我已经在 Ubuntu 16.04 和 Mac 上成功安装了 databricks cli。当我尝试在 Ubuntu 18.04(Azure VM)上安装它时,它看起来运行得很好,然后当我尝试调用 cli 工具时它没有安装。我在 Ubuntu 16.04 和 OSX 上运行良好。关于让这个工作的任何想法?输出复制如下
vstsTestLogin@PensDataScienceVSTS:~$ pip install databricks-cli
Collecting databricks-cli
Using cached
https://files.pythonhosted.org/packages/de/8f/b0b5222c910eafb4dd6cc6de04d7821e6caefb5a9d927bc68c39206e422f/databricks_cli-0.8.2-py2-none-any.whl
Collecting tabulate>=0.7.7 (from databricks-cli)
Collecting configparser>=0.3.5 (from databricks-cli)
Collecting click>=6.7 (from databricks-cli)
Using cached https://files.pythonhosted.org/packages/34/c1/8806f99713ddb993c5366c362b2f908f18269f8d792aff1abfd700775a77/click-6.7-py2.py3-none-any.whl
Collecting six>=1.10.0 (from databricks-cli)
Using cached https://files.pythonhosted.org/packages/67/4b/141a581104b1f6397bfa78ac9d43d8ad29a7ca43ea90a2d863fe3056e86a/six-1.11.0-py2.py3-none-any.whl
Collecting requests>=2.17.3 (from databricks-cli)
Using cached https://files.pythonhosted.org/packages/65/47/7e02164a2a3db50ed6d8a6ab1d6d60b69c4c3fdf57a284257925dfc12bda/requests-2.19.1-py2.py3-none-any.whl
Collecting idna<2.8,>=2.5 (from requests>=2.17.3->databricks-cli)
Using cached https://files.pythonhosted.org/packages/4b/2a/0276479a4b3caeb8a8c1af2f8e4355746a97fab05a372e4a2c6a6b876165/idna-2.7-py2.py3-none-any.whl
Collecting certifi>=2017.4.17 (from requests>=2.17.3->databricks-cli)
Using cached https://files.pythonhosted.org/packages/df/f7/04fee6ac349e915b82171f8e23cee63644d83663b34c539f7a09aed18f9e/certifi-2018.8.24-py2.py3-none-any.whl
Collecting chardet<3.1.0,>=3.0.2 (from requests>=2.17.3->databricks-cli)
Using cached https://files.pythonhosted.org/packages/bc/a9/01ffebfb562e4274b6487b4bb1ddec7ca55ec7510b22e4c51f14098443b8/chardet-3.0.4-py2.py3-none-any.whl
Collecting urllib3<1.24,>=1.21.1 (from …Run Code Online (Sandbox Code Playgroud) 我是使用 Cassandra 的初学者。我创建了一个包含以下详细信息的表格,当我尝试使用令牌执行范围搜索时,我没有得到任何结果。我做错了什么还是我对数据模型的理解?
询问 select * from test where token(header)>=2 and token(header)<=4;
我正在使用 Azure Databricks 和 Scala。我想 show() 一个数据框,但我收到了一个我无法理解的错误,我想解决它。我的代码行是:
println("----------------------------------------------------------------Printing schema")
df.printSchema()
println("----------------------------------------------------------------Printing dataframe")
df.show()
println("----------------------------------------------------------------Error before")
Run Code Online (Sandbox Code Playgroud)
标准输出如下,消息“---------------------------------------------------- -------------------------错误之前”它不会出现。
> ----------------------------------------------------------------Printing schema
> root
> |-- processed: integer (nullable = false)
> |-- processDatetime: string (nullable = false)
> |-- executionDatetime: string (nullable = false)
> |-- executionSource: string (nullable = false)
> |-- executionAppName: string (nullable = false)
>
> ----------------------------------------------------------------Printing dataframe
> 2020-02-18T14:19:00.069+0000: [GC (Allocation Failure) [PSYoungGen: 1497248K->191833K(1789440K)] 2023293K->717886K(6063104K),
> 0.0823288 secs] [Times: user=0.18 sys=0.02, real=0.09 secs]
> 2020-02-18T14:19:40.823+0000: [GC …Run Code Online (Sandbox Code Playgroud) 所以我试图使用 delta Lake 写 df_concat.write.format("delta").mode("overwrite").save("file") it gives me this error:
java.lang.NoClassDefFoundError: org/apache/spark/sql/connector/catalog/TableProvider 和 delta Lake doc 说更新到 spark3 所以只想确认我们是否可以在 spark2.xx 上运行 deltalake
我正在使用 REGEXP 过滤具有 10 行的数据集,如下所示:
ID Product
1 "VENLAFAXINE HCL CAP ER 24HR 37.5 MG (BASE EQUIVALENT)"
2 "MINOXIDIL POWDER"
3 "MENTHOL LOZENGE 10 MG"
4 "ZINC CHLORIDE GRANULES"
5 "CLOPIDOGREL BISULFATE TAB 75 MG (BASE EQUIV)"
6 "METHYLPREDNISOLONE TAB THERAPY PACK 4 MG (21)"
7 "DEXAMETHASONE TAB THERAPY PACK 1.5 MG (7)"
8 "METHYLPREDNISOLONE DOSE P (16)"
9 "MILLIPRED DP (13)"
10 "ZONACORT 7 DAY"
Run Code Online (Sandbox Code Playgroud)
并且会让它看起来像
ID Product
6 "METHYLPREDNISOLONE TAB THERAPY PACK 4 MG (21)"
7 "DEXAMETHASONE TAB …Run Code Online (Sandbox Code Playgroud) 我正在努力在 pyspark 上的 pandas 上使用 pandas UDF。您能帮我理解如何实现这一目标吗?以下是我的尝试:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark import pandas as ps
spark = SparkSession.builder.getOrCreate()
df = ps.DataFrame({'A': 'a a b'.split(),
'B': [1, 2, 3],
'C': [4, 6, 5]}, columns=['A', 'B', 'C'])
@pandas_udf('float')
def agg_a(x):
return (x**2).mean()
@pandas_udf('float')
def agg_b(x):
return x.mean()
spark.udf.register('agg_a_',agg_a)
spark.udf.register('agg_b_',agg_b)
df_means = df.groupby('A')
dfout=df_means.agg({'B':'agg_a_','C':'agg_b_'})
Run Code Online (Sandbox Code Playgroud)
这导致了我很难理解的异常:
AnalysisException: expression 'B' is neither present in the group by, nor is it an aggregate function. Add to group by …Run Code Online (Sandbox Code Playgroud) 我想使用该命令将 Databricks 上的 Delta 表中的数据加载到 Snowflake 上的表中MERGE INTO。
目标是 Databricks 上的 Delta 表中的记录数量与 Snowflake 上的表中的记录数量看起来相同。
发生的问题是,由于 Delta Lake(S3 路径)有多个版本,Snowflake 会查询重复记录。
如何才能只读取最新版本的 Delta Lake?
MERGE INTO myTable as target USING (
SELECT
$1:DAY::TEXT AS DAY,
$1:CHANNEL_CATEGORY::TEXT AS CHANNEL_CATEGORY,
$1:SOURCE::TEXT AS SOURCE,
$1:PLATFORM::TEXT AS PLATFROM,
$1:LOB::TEXT AS LOB
FROM @StageFilePathDeltaLake
(FILE_FORMAT => 'sf_parquet_format')
) as src
ON target.CHANNEL_CATEGORY = src.CHANNEL_CATEGORY
AND target.SOURCE = src.SOURCE
WHEN MATCHED THEN
UPDATE SET
DAY= src.DAY
,PLATFORM= src.PLATFORM
,LOB= src.LOB
WHEN NOT MATCHED THEN
INSERT …Run Code Online (Sandbox Code Playgroud) 我在 Scala 中有这段代码,但对 Python 不太熟悉,无法转换它:
val formatterComma = java.text.NumberFormat.getIntegerInstance
def createTD(value: String) : String = {
return s"""<td align="center" style="border:1px solid">${value}</td>"""
}
def createTD(value: BigInt) : String = {
return createTD(value.toString)
}
def createTDDouble(value: Double) : String = {
return createTD("$" + formatterComma.format(value))
}
def createTheLink(productId: String) : String = {
return s"""<td align="center" style="border:1px solid"><a href="https://productLink/$product>Link Here</a></td>"""
}
def createTH(value: String) : String = {
return s"""<th class="gmail-highlight-red gmail-confluenceTh gmail-tablesorter-header gmail-sortableHeader gmail-tablesorter-headerUnSorted" tabindex="0" scope="col" style="width:1px;white-space:nowrap;border:1px solid #000000;padding:7px 15px 7px …Run Code Online (Sandbox Code Playgroud) apache-spark ×7
databricks ×5
pyspark ×5
python ×5
delta-lake ×4
scala ×3
cassandra ×2
datastax ×1
pandas ×1
python-3.x ×1
regex ×1
snowflake-cloud-data-platform ×1
spark-koalas ×1
sql ×1
sqldf ×1
ubuntu ×1