标签: apache-spark-sql

Spark SQL 表最大列数

我试图通过在 Scala 程序中创建列数超过 200 的 RDD 来创建 Spark SQL 表。当我将架构创建为以下内容时,编译(sbt 编译)失败并出现 java.lang.StackOverflowError 异常:

StructField("RT", StringType,nullable = true) ::
StructField("SERIALNO", StringType,nullable = true) ::
StructField("SPORDER", StringType,nullable = true) ::
// ... remaining 200+ columns
Run Code Online (Sandbox Code Playgroud)

无法粘贴堆栈跟踪,因为它超过 1.5k 行将

列数减少到大约 100-120 编译成功。另外,当我使用架构字符串创建架构(拆分架构字符串,然后创建它的映射)时,编译成功(https://spark.apache.org/docs/1.3中标题“以编程方式指定架构”下的第一个示例。 0/sql-programming-guide.html)。

手动指定导致异常的模式似乎有什么问题?

scala apache-spark-sql

1
推荐指数
1
解决办法
979
查看次数

Windows:Apache Spark 历史服务器配置

我想使用 Spark 的 History Server 来利用 Web UI 的日志记录机制,但我发现在 Windows 计算机上运行此代码有些困难。

我做了以下事情:

设置我的 Spark-defaults.conf 文件以反映

spark.eventLog.enabled=true
spark.eventLog.dir=file://C:/spark-1.6.2-bin-hadoop2.6/logs
spark.history.fs.logDirectory=file://C:/spark-1.6.2-bin-hadoop2.6/logs
Run Code Online (Sandbox Code Playgroud)

我的spark-env.sh反映:

SPARK_LOG_DIR    "file://C:/spark-1.6.2-bin-hadoop2.6/logs"
SPARK_HISTORY_OPTS   "-Dspark.history.fs.logDirectory=file://C:/spark-1.6.2-bin-hadoop2.6/logs"
Run Code Online (Sandbox Code Playgroud)

我正在使用 Git-BASH 运行 start-history-server.sh 文件,如下所示:

USERA@SYUHUH MINGW64 /c/spark-1.6.2-bin-hadoop2.6/sbin
$ sh start-history-server.sh
Run Code Online (Sandbox Code Playgroud)

而且,我收到此错误:

USERA@SYUHUH MINGW64 /c/spark-1.6.2-bin-hadoop2.6/sbin
$ sh start-history-server.sh
C:\spark-1.6.2-bin-hadoop2.6/conf/spark-env.sh: line 69: SPARK_LOG_DIR: command not found
C:\spark-1.6.2-bin-hadoop2.6/conf/spark-env.sh: line 70: SPARK_HISTORY_OPTS: command not found
ps: unknown option -- o
Try `ps --help' for more information.
starting org.apache.spark.deploy.history.HistoryServer, logging to C:\spark-1.6.2-bin-hadoop2.6/logs/spark--org.apache.spark.deploy.history.HistoryServer-1-SGPF02M9ZB.out
ps: unknown option -- o
Try `ps --help' for …
Run Code Online (Sandbox Code Playgroud)

windows git bash apache-spark apache-spark-sql

1
推荐指数
1
解决办法
3249
查看次数

Spark-dataframe:使用 2 个布尔条件创建新列

我想根据 2 个布尔条件通过按位 AND 运算来改变我的数据帧

df %>% mutate(newVariable = ifelse(variable1 == "value1" & variable2 == "value2, variable3, NULL)
Run Code Online (Sandbox Code Playgroud)

所以在 PySpark 中测试了这个:

import pyspark.sql.functions as func

df.withColumn("newVariable", func.when( \
     func.col("variable1") == "value1" & func.col("variable2") == "value2", \
     func.col("variable3")))
Run Code Online (Sandbox Code Playgroud)

但我有一个错误

使用 Spark DataFrame 创建这种新变量的正确方法是什么?

apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
6262
查看次数

Spark数据帧保存到具有自动增量列的SQL表

我在数据库中有下表

+----------------+------------+------+-----+---------+----------------+
| Field          | Type       | Null | Key | Default | Extra          |
+----------------+------------+------+-----+---------+----------------+
| id             | bigint(20) | NO   | PRI | NULL    | auto_increment |
| VERSION        | bigint(20) | NO   |     | NULL    |                |
| user_id        | bigint(20) | NO   | MUL | NULL    |                |
| measurement_id | bigint(20) | NO   | MUL | NULL    |                |
| day            | timestamp  | NO   |     | NULL    |                |
| hour           | tinyint(4) | …
Run Code Online (Sandbox Code Playgroud)

jdbc apache-spark apache-spark-sql

1
推荐指数
1
解决办法
2897
查看次数

新手(spark 数据帧)- df.count().show() 返回 AttributeError

对于新手问题表示歉意。我刚刚学习。

我只是尝试从 Cloudant 数据库创建 Spark 数据帧并计算条目数。调用函数进行计数后,出现错误:

AttributeErrorTraceback (most recent call last)
<ipython-input-5-56a7e10a510b> in <module>()
----> 1 count(cloudantdata,spark)

<ipython-input-2-f2dcd9d73d7e> in count(df, spark)
      1 def count(df,spark):
      2     #TODO Please enter your code here
----> 3     df.count().show()
      4     return

AttributeError: 'int' object has no attribute 'show'
Run Code Online (Sandbox Code Playgroud)

请参阅笔记本: https://apsportal.ibm.com/analytics/notebooks/c83c959b-2994-4ac7-9af7-f9d33d4dc461/view ?access_token=6a057cadfdd07252e5977a5eb65936185673dd1d1213ab8a003874edbfde6808

python apache-spark-sql

1
推荐指数
1
解决办法
1万
查看次数

数据框中列的几何平均值

我使用此代码来计算数据框中所有行的几何平均值:

from pyspark.sql.functions import rand, randn, sqrt
df = sqlContext.range(0, 10)    
df = df.select(rand(seed=10).alias("c1"), randn(seed=27).alias("c2"))

df.show()

newdf = df.withColumn('total', sqrt(sum(df[col] for col in df.columns)))
newdf.show()
Run Code Online (Sandbox Code Playgroud)

这显示:

在此输入图像描述

要计算列而不是行的几何平均值,我认为这段代码应该足够了:

newdf = df.withColumn('total', sqrt(sum(df[row] for row in df.rows)))
Run Code Online (Sandbox Code Playgroud)

但这会引发错误:NameError: global name 'row' is not defined

因此,访问列的 api 与访问行的 api 不同。

我应该格式化数据以将行转换为列,然后重新使用工作算法:newdf = df.withColumn('total', sqrt(sum(df[col] for col in df.columns)))或者是否有按原样处理行和列的解决方案?

apache-spark-sql pyspark

1
推荐指数
1
解决办法
2491
查看次数

集群中节点不健康

集群上的节点处于不健康状态的原因有哪些?

根据我有限的理解,当给定节点上的 HDFS 利用率超过阈值时,通常会发生这种情况。此阈值是使用 max-disk-utilization-per-disk-percentage 属性定义的。

我有时观察到在 Spark-sql 上触发内存密集型 Spark 作业或使用 pyspark 节点进入不健康状态。经过进一步查看,我在处于不健康状态的节点上进行了 ssh,发现实际上 dfs 利用率低于 75%,并且在我的集群上为上述属性设置的值是 99。

所以我认为我遗漏了一些其他事实,这基本上导致了这种行为。

在此先感谢您的帮助。

曼尼什·梅赫拉

hdfs amazon-emr emr hadoop-yarn apache-spark-sql

1
推荐指数
1
解决办法
6744
查看次数

基于字符串结尾的正则表达式过滤数据集

我正在使用 REGEXP 过滤具有 10 行的数据集,如下所示:

ID     Product
1      "VENLAFAXINE HCL CAP ER 24HR 37.5 MG (BASE EQUIVALENT)"
2      "MINOXIDIL POWDER"
3      "MENTHOL LOZENGE 10 MG"
4      "ZINC CHLORIDE GRANULES"
5      "CLOPIDOGREL BISULFATE TAB 75 MG (BASE EQUIV)"
6      "METHYLPREDNISOLONE TAB THERAPY PACK 4 MG (21)"
7      "DEXAMETHASONE TAB THERAPY PACK 1.5 MG (7)"
8      "METHYLPREDNISOLONE DOSE P (16)"
9      "MILLIPRED DP (13)"
10     "ZONACORT 7 DAY"
Run Code Online (Sandbox Code Playgroud)

并且会让它看起来像

ID     Product
6      "METHYLPREDNISOLONE TAB THERAPY PACK 4 MG (21)"
7      "DEXAMETHASONE TAB …
Run Code Online (Sandbox Code Playgroud)

regex sql apache-spark apache-spark-sql

1
推荐指数
1
解决办法
73
查看次数

PySpark DataFrame - 过滤嵌套列

我知道那里有很多类似的问题,但我还没有找到任何与我的场景完全匹配的问题,所以请不要对重复标志满意。我正在使用 Spark 3.0.1 在 Azure Databricks 中使用 Python 3 笔记本。

我有以下数据帧

+---+---------+--------+
|ID |FirstName|LastName|
+---+---------+--------+
|1  |John     |Doe     |
|2  |Michael  |        |
|3  |Angela   |Merkel  |
+---+---------+--------+
Run Code Online (Sandbox Code Playgroud)

可以使用此代码创建

from pyspark.sql.types import StructType,StructField, StringType, IntegerType
import pyspark.sql.functions as F

data2 = [(1,"John","Doe"),
    (2,"Michael",""),
    (3,"Angela","Merkel")
  ]

schema = StructType([ \
    StructField("ID",IntegerType(),True), \
    StructField("FirstName",StringType(),True), \
    StructField("LastName",StringType(),True), \
  ])
 
df1 = spark.createDataFrame(data=data2,schema=schema)
df1.printSchema()
df1.show(truncate=False)
Run Code Online (Sandbox Code Playgroud)

我把它转换成这个 DataFrame

+---+-----------------------------------------+
|ID |Names                                    |
+---+-----------------------------------------+
|1  |[[FirstName, John], [LastName, Doe]]     |
|2  |[[FirstName, Michael], [LastName, …
Run Code Online (Sandbox Code Playgroud)

python apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
78
查看次数

如何在pyspark中压缩/连接值和列表

我正在处理一个使用 4 个输入的函数。

为此,我想得到一个总结这 4 个元素的列表。但是我有两个变量,其中数据是唯一的,两个变量由列表组成。我可以用 压缩这两个列表arrays_zip,但无法获得包含 4 个元素的数组列表:

+----+----+---------+---------+
| l1 | l2 |   l3    |   l4    |
+----+----+---------+---------+
| 1  | 5  | [1,2,3] | [2,2,2] |
| 2  | 9  | [8,2,7] | [1,7,7] |
| 3  | 3  | [8,4,9] | [5,1,3] |
| 4  | 1  | [5,5,3] | [8,4,3] |

Run Code Online (Sandbox Code Playgroud)

我想得到什么:


+----+----+---------+---------+------------------------------------------+
| l1 | l2 |   l3    |   l4    |                       l5                 |
+----+----+---------+---------+------------------------------------------+
| 1  | 5  | [1,2,3] | [2,2,2] …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
54
查看次数