标签: apache-spark-sql

使用 PySpark 从 Databricks 数据库 (hive_metastore ) 读取/提取数据

我正在尝试使用 PySpark 从 Databricks Hive_Metastore 读取数据。在下面的屏幕截图中,我尝试读取位于数据库中的名为“trips”的表nyctaxi

通常,如果该表位于 AzureSQL 服务器上,我将使用如下代码:

df = spark.read.format("jdbc")\
    .option("url", jdbcUrl)\
    .option("dbtable", tableName)\
    .load()
Run Code Online (Sandbox Code Playgroud)

或者,如果该表位于 ADLS 中,我将使用类似于以下内容的代码:

df = spark.read.csv("adl://mylake.azuredatalakestore.net/tableName.csv",header=True)
Run Code Online (Sandbox Code Playgroud)

有人可以告诉我如何使用 PySpark 从下面的 Databricks 数据库中读取表格:

在此输入图像描述

附加的屏幕截图我也有帮助

在此输入图像描述

好吧,我刚刚意识到我认为我应该问如何从“samples”meta_store 中读取表格。

无论如何,我希望帮助您从nyctaxi数据库中读取“trips”表。

python apache-spark-sql pyspark azure-databricks

1
推荐指数
1
解决办法
3536
查看次数

spark 日期格式 MMM dd, yyyy hh:mm:ss AM 到 df 中的时间戳

我需要将描述性日期格式从日志文件“MMM dd, yyyy hh:mm:ss AM/PM”转换为 spark 时间戳数据类型。我尝试了类似下面的方法,但它给出了空值。

val df = Seq(("Nov 05, 2018 02:46:47 AM"),("Nov 5, 2018 02:46:47 PM")).toDF("times")
df.withColumn("time2",date_format('times,"MMM dd, yyyy HH:mm:ss AM")).show(false)

+------------------------+-----+
|times                   |time2|
+------------------------+-----+
|Nov 05, 2018 02:46:47 AM|null |
|Nov 5, 2018 02:46:47 PM |null |
+------------------------+-----+
Run Code Online (Sandbox Code Playgroud)

预期输出

+------------------------+----------------------------+
|times                   |time2                       |
+------------------------+-----+----------------------+
|Nov 05, 2018 02:46:47 AM|2018-11-05 02:46:47.000000" |
|Nov 5, 2018 02:46:47 PM |2018-11-05 14:46:47.000000" |
+------------------------+-----+----------------------+
Run Code Online (Sandbox Code Playgroud)

转换这个的正确格式是什么?请注意,DD 可能有前导零。

apache-spark apache-spark-sql

0
推荐指数
1
解决办法
8360
查看次数

使用别名透视和聚合 PySpark 数据帧

我有一个与此类似的 PySpark DataFrame:

df = sc.parallelize([
    ("c1", "A", 3.4, 0.4, 3.5), 
    ("c1", "B", 9.6, 0.0, 0.0),
    ("c1", "A", 2.8, 0.4, 0.3),
    ("c1", "B", 5.4, 0.2, 0.11),
    ("c2", "A", 0.0, 9.7, 0.3), 
    ("c2", "B", 9.6, 8.6, 0.1),
    ("c2", "A", 7.3, 9.1, 7.0),
    ("c2", "B", 0.7, 6.4, 4.3)
]).toDF(["user_id", "type", "d1", 'd2', 'd3'])
df.show()
Run Code Online (Sandbox Code Playgroud)

这使:

+-------+----+---+---+----+
|user_id|type| d1| d2|  d3|
+-------+----+---+---+----+
|     c1|   A|3.4|0.4| 3.5|
|     c1|   B|9.6|0.0| 0.0|
|     c1|   A|2.8|0.4| 0.3|
|     c1|   B|5.4|0.2|0.11|
|     c2|   A|0.0|9.7| 0.3|
|     c2| …
Run Code Online (Sandbox Code Playgroud)

alias pivot aggregate-functions apache-spark-sql pyspark

0
推荐指数
1
解决办法
3422
查看次数

Scala:如何将任何通用序列作为此方法的输入

Scala 菜鸟在这里。仍在努力学习语法。

我正在尝试减少将测试数据转换为 DataFrame 时必须编写的代码。这是我现在所拥有的:

  def makeDf[T](seq: Seq[(Int, Int)], colNames: String*): Dataset[Row] = {
    val context = session.sqlContext
    import context.implicits._
    seq.toDF(colNames: _*)
  }
Run Code Online (Sandbox Code Playgroud)

问题是上述方法只需要一个形状序列Seq[(Int, Int)]作为输入。如何让它以任何序列作为输入?我可以将输入形状更改为Seq[AnyRef],但是代码无法将toDF调用识别为有效符号。

我无法弄清楚如何进行这项工作。有任何想法吗?谢谢!

scala dataframe apache-spark apache-spark-sql

0
推荐指数
1
解决办法
318
查看次数

加载 csv 文件时获取 com.univocity.parsers.common.TextParsingException

我正在尝试将数据中有很多新行的 tsv 数据集加入另一个数据帧并继续获取

com.univocity.parsers.common.TextParsingException

我已经清理了我的数据以用 NA 替换 \N,因为我认为这可能是原因但没有成功。

该错误将我指向错误数据中的以下记录

tt0100054 2 ?????????? ???SUHH RU NA NA 0

堆栈跟踪如下

    19/03/02 17:45:42 ERROR Executor: Exception in task 0.0 in stage 10.0 (TID 10)
com.univocity.parsers.common.TextParsingException: Length of parsed input (1000001) exceeds the maximum number of characters defined in your parser settings (1000000). 
Identified line separator characters in the parsed content. This may be the cause of the error. The line separator in your parser settings is set to '\n'. Parsed content:
    Sesso e …
Run Code Online (Sandbox Code Playgroud)

parsing apache-spark apache-spark-sql univocity

0
推荐指数
1
解决办法
2259
查看次数

PySpark:如何更新嵌套列?

StackOverflow 有一些关于如何更新数据框中嵌套列的答案。但是,其中一些看起来有点复杂。

在搜索时,我从 DataBricks 中找到了处理相同场景的文档:https ://docs.databricks.com/user-guide/faq/update-nested-column.html

val updated = df.selectExpr("""
    named_struct(
        'metadata', metadata,
        'items', named_struct(
          'books', named_struct('fees', items.books.fees * 1.01),
          'paper', items.paper
        )
    ) as named_struct
""").select($"named_struct.metadata", $"named_struct.items")
Run Code Online (Sandbox Code Playgroud)

这看起来也很干净。不幸的是,我不知道 Scala。我将如何将其翻译成 Python?

scala apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
1535
查看次数

将多行合并为一行

我正在尝试通过 pyspark 构建 sql 来实现这一点。目标是将多行组合成单行示例:我想将其转换为

+-----+----+----+-----+
| col1|col2|col3| col4|
+-----+----+----+-----+
|x    |  y |  z |13::1|
|x    |  y |  z |10::2|
+-----+----+----+-----+
Run Code Online (Sandbox Code Playgroud)

+-----+----+----+-----------+
| col1|col2|col3|       col4|
+-----+----+----+-----------+
|x    |  y |  z |13::1;10::2|
+-----+----+----+-----------+
Run Code Online (Sandbox Code Playgroud)

sql apache-spark-sql pyspark pyspark-sql

0
推荐指数
1
解决办法
3472
查看次数

将数据框转换为列名和值的结构数组

假设我有一个这样的数据框

val customer = Seq(
    ("C1", "Jackie Chan", 50, "Dayton", "M"),
    ("C2", "Harry Smith", 30, "Beavercreek", "M"),
    ("C3", "Ellen Smith", 28, "Beavercreek", "F"),
    ("C4", "John Chan", 26, "Dayton","M")
  ).toDF("cid","name","age","city","sex")
Run Code Online (Sandbox Code Playgroud)

我怎样才能在一列中获得 cid 值并array < struct < column_name, column_value > >在火花中获得其余的值

scala apache-spark apache-spark-sql

0
推荐指数
1
解决办法
375
查看次数

pyspark 在数据框中传递多个选项

我是 python 和 pyspark 的新手。我想知道如何在 pyspark 中编写以下 spark 数据帧函数:

val df = spark.read.format("jdbc").options(
       Map(
        "url" -> "jdbc:someDB", 
        "user" -> "root", 
        "password" -> "password", 
        "dbtable" -> "tableName", 
        "driver" -> "someDriver")).load()
Run Code Online (Sandbox Code Playgroud)

我试着在pyspark中写如下。但是,得到语法错误:

df = spark.read.format("jdbc").options(
      map(lambda : ("url","jdbc:someDB"), ("user","root"), ("password","password"), ("dbtable","tableName"), ("driver","someDriver"))).load()
Run Code Online (Sandbox Code Playgroud)

提前致谢

apache-spark-sql pyspark-sql

0
推荐指数
1
解决办法
1998
查看次数

Spark Dataframe 在覆盖 Hive 表的分区数据时出现问题

下面是我的 Hive 表定义:

CREATE EXTERNAL TABLE IF NOT EXISTS default.test2(
id integer,
count integer
)
PARTITIONED BY (
fac STRING,
fiscaldate_str DATE )
STORED AS PARQUET
LOCATION 's3://<bucket name>/backup/test2';
Run Code Online (Sandbox Code Playgroud)

我有如下配置单元表中的数据,(我刚刚插入了示例数据)

select * from default.test2

+---+-----+----+--------------+
| id|count| fac|fiscaldate_str|
+---+-----+----+--------------+
|  2|    3| NRM|    2019-01-01|
|  1|    2| NRM|    2019-01-01|
|  2|    3| NRM|    2019-01-02|
|  1|    2| NRM|    2019-01-02|
|  2|    3| NRM|    2019-01-03|
|  1|    2| NRM|    2019-01-03|
|  2|    3|STST|    2019-01-01|
|  1|    2|STST|    2019-01-01|
|  2|    3|STST| …
Run Code Online (Sandbox Code Playgroud)

hive partition apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
3348
查看次数