标签: databricks

如何将pyspark.sql.dataframe.DataFrame转换回databricks笔记本中的sql表

pyspark.sql.dataframe.DataFrame通过执行以下行创建了一个类型的数据框: dataframe = sqlContext.sql("select * from my_data_table")

我怎样才能将它转换回我可以运行sql查询的sparksql表?

python sql apache-spark pyspark databricks

8
推荐指数
1
解决办法
6426
查看次数

Spark - 使用Firehose从分区文件夹中读取JSON

Kinesis firehose将文件的持久性(在本例中为时间序列JSON)管理到由YYYY/MM/DD/HH划分的文件夹层次结构(直到24编号中的小时)......很棒.

如何使用Spark 2.0然后我可以读取这些嵌套的子文件夹并从所有叶子json文件创建一个静态Dataframe?数据框阅读器是否有"选项"?

我的下一个目标是将其作为流式DF,其中Firehose持久保存到s3中的新文件自然会成为使用Spark 2.0中新结构化流媒体的流数据帧的一部分.我知道这都是实验性的 - 希望有人之前使用S3作为流媒体文件源,其中数据被分成如上所述的文件夹.当然更喜欢直接使用Kinesis流,但是这个连接器上没有2.0的日期,所以Firehose-> S3是临时的.

ND:我正在使用databricks,它将S3安装到DBFS中,但当然可以很容易地成为EMR或其他Spark提供商.很高兴看到一个笔记本电脑,如果一个人可以分享给出一个例子.

干杯!

apache-spark apache-spark-sql databricks spark-structured-streaming

8
推荐指数
1
解决办法
2710
查看次数

Apache Spark SQL 是否支持 MERGE 子句?

Apache Spark SQL 是否支持类似于 Oracle 的 MERGE SQL 子句的 MERGE 子句?

MERGE into <table> using (
  select * from <table1>
    when matched then update...
       DELETE WHERE...
    when not matched then insert...
)
Run Code Online (Sandbox Code Playgroud)

sql hadoop apache-spark apache-spark-sql databricks

8
推荐指数
2
解决办法
8290
查看次数

如何以两行划分pyspark数据帧

我在Databricks工作.

我有一个包含500行的数据帧,我想创建包含100行的两个数据帧,另一个包含剩余的400行.

+--------------------+----------+
|              userid| eventdate|
+--------------------+----------+
|00518b128fc9459d9...|2017-10-09|
|00976c0b7f2c4c2ca...|2017-12-16|
|00a60fb81aa74f35a...|2017-12-04|
|00f9f7234e2c4bf78...|2017-05-09|
|0146fe6ad7a243c3b...|2017-11-21|
|016567f169c145ddb...|2017-10-16|
|01ccd278777946cb8...|2017-07-05|
Run Code Online (Sandbox Code Playgroud)

我试过以下但是收到错误

df1 = df[:99]
df2 = df[100:499]


TypeError: unexpected item type: <type 'slice'>
Run Code Online (Sandbox Code Playgroud)

python pyspark spark-dataframe databricks

8
推荐指数
3
解决办法
1万
查看次数

NameError: 名称“dbutils”未在 pyspark 中定义

我正在 databricks 云中运行 pyspark 作业。作为这项工作的一部分,我需要将一些 csv 文件写入数据块文件系统(dbfs),并且我还需要使用一些 dbutils 本机命令,例如,

#mount azure blob to dbfs location
dbutils.fs.mount (source="...",mount_point="/mnt/...",extra_configs="{key:value}")
Run Code Online (Sandbox Code Playgroud)

一旦文件被写入挂载目录,我也试图卸载。但是,当我直接在 pyspark 作业中使用 dbutils 时,它失败了

NameError: name 'dbutils' is not defined
Run Code Online (Sandbox Code Playgroud)

我应该导入任何包以在 pyspark 代码中使用 dbutils 吗?提前致谢。

pyspark-sql azure-blob-storage databricks

8
推荐指数
2
解决办法
6836
查看次数

如何终止 Databricks 中的工作

我有一项长期运行的工作,如果满足某些条件,我想终止该工作。传统上这是用 python 完成的,如下所示:

if some_condition:
   exit('job failed!)
Run Code Online (Sandbox Code Playgroud)

这适用于 Python 脚本的传统命令行运行。但是,由于我无法理解的原因,Databricks 不会通过退出来终止脚本。

如何编写 Databricks python 脚本以在满足特定条件时自动停止执行?

python apache-spark databricks

8
推荐指数
1
解决办法
8498
查看次数

Databricks-GitHub 集成,自动将所有笔记本添加到存储库

我正在尝试为 Databricks设置GitHub 集成
我们那里有数百个笔记本,手动将每个笔记本添加到存储库中会很累。

有没有办法自动提交所有笔记本并将其从数据块送到存储库?

git automation github databricks

8
推荐指数
1
解决办法
4163
查看次数

是否可以在 databricks 中创建日历小部件?

我只是想使用日期作为笔记本的输入,并想使用小部件管理参数处理。databricks 小部件的标准文档 ( https://docs.databricks.com/user-guide/notebooks/widgets.html ) 不提供有关日历小部件的信息或创建新小部件类型的可能性。

他们只提供

  • 文本
  • 多选
  • 组合框
  • 落下

现在我为日、月和年创建不同的下拉小部件,但在我看来这不是一个好的解决方案。

有人在数据块中创建/修改小部件方面有过任何经验吗?

widget databricks

8
推荐指数
1
解决办法
2769
查看次数

使用Databricks Job api调用databricks笔记本运行-提交端点

我正在尝试建立一个调用 databricks 笔记本的 AWS lambda 函数(在 s3 触发器的情况下)。我知道我必须在 lambda 函数(python)代码中使用 databricks 的 Jobs API 来使用以下命令发出 POST 请求:运行提交函数的 JSON 负载。

虽然文档不是很清楚,但我能够调用测试脚本,并在检查响应文本时看到 databricks 登录页面 html 代码,这意味着它没有经过身份验证。

我确实阅读了用户令牌,但我不确定如何将它们纳入身份验证。

任何以其他方式完成此工作或帮助我使用 user_tokens 进行身份验证以便流程到达笔记本的执行而不是在身份验证页面停止的帮助都会有所帮助。

提前致谢。

代码示例:

import requests
import json

job_payload = {
  "run_name": 'just_a_run',
  "existing_cluster_id": '****',
  "notebook_task": 
    {
      "notebook_path": 'https://databricks.cloud.company.com/****'
    }
}

resp = requests.post('https://databricks.cloud.company.com/2.0/jobs/runs/submit', json=job_payload)
print(resp.status_code)
print(resp.text)

200


<!DOCTYPE html>

<html>
<head>
    <meta charset="utf-8"/>
    <meta http-equiv="Content-Language" content="en"/>
    <title>Databricks - Sign In</title>
    <meta name="viewport" content="width=960">
    <link rel="stylesheet" href="/login/bootstrap.min.css">
    <link rel="icon" type="image/png" href="login/favicon.ico" />

    <meta http-equiv="content-type" …
Run Code Online (Sandbox Code Playgroud)

databricks

8
推荐指数
1
解决办法
1万
查看次数

PySpark 和 Protobuf 反序列化 UDF 问题

我收到这个错误

Can't pickle <class 'google.protobuf.pyext._message.CMessage'>: it's not found as google.protobuf.pyext._message.CMessage

当我尝试在 PySpark 中创建 UDF 时。显然,它使用 CloudPickle 来序列化命令,但是,我知道 protobuf 消息包含C++实现,这意味着它不能被腌制。

我试图找到一种方法来覆盖CloudPickleSerializer,但是,我找不到方法。

这是我的示例代码:

from MyProject.Proto import MyProtoMessage
from google.protobuf.json_format import MessageToJson
import pyspark.sql.functions as F

def proto_deserialize(body):
  msg = MyProtoMessage()
  msg.ParseFromString(body)
  return MessageToJson(msg)

from_proto = F.udf(lambda s: proto_deserialize(s))

base.withColumn("content", from_proto(F.col("body")))
Run Code Online (Sandbox Code Playgroud)

提前致谢。

python pyspark databricks azure-databricks protobuf-python

8
推荐指数
1
解决办法
354
查看次数