使用 pyspark 从给定路径读取(Spark 2.4/Java 11)
df = spark.read.format('parquet').load(path_to_read)
Run Code Online (Sandbox Code Playgroud)
失败,由于
IllegalArgumentException: '不支持的类文件主要版本 55'
我在 Java 11 (jdk-11.0.2) 上运行 Spark,并检查Spark 2.4 概览,我想知道 Spark 2.4 是否支持 Java 11?
谢谢
我有两个数据框,我正试图找出它们之间的区别。2个数据帧包含struct数组。我不需要该结构中的1个键。因此,我首先将其删除,然后转换为JSON字符串。比较时,我需要知道该数组(Json)中更改了多少个元素。有办法做到这一点吗?
双方base_data_set并target_data_set包含ID和KEY。KEY是一个array<Struct>:
root
|-- id: string (nullable = true)
|-- result: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key1: integer (nullable = true)
| | |-- key3: string (nullable = false)
| | |-- key2: string (nullable = true)
| | |-- key4: string (nullable = true)
val temp_base = base_data_set
.withColumn("base_result", explode(base_data_set(RESULT)))
.withColumn("base",
struct($"base_result.key1", $"base_result.key2", $"base_result.key3"))
.groupBy(ID)
.agg(to_json(collect_list("base")).as("base_picks"))
val temp_target …Run Code Online (Sandbox Code Playgroud) 在android studio的最新版本中,我们可以轻松地将库添加https://jitpack.io到gradle文件(模块项目)中,但在最新的更新(android-studio-bumble-bee)中,该文件的结构已更改,我不知道如何添加该库。
我正在尝试在Azure HDInsight按需群集上运行基于Spark的应用程序,并且看到许多SparkExceptions(由ConcurrentModificationExceptions引起)被记录.启动本地Spark实例时,应用程序运行时没有这些错误.
我在使用累加器时看到了类似错误的报告,而我的代码确实使用了CollectionAccumulator,但是我已经在我使用它的地方放置了同步块,并且没有任何区别.与累加器相关的代码如下所示:
class MySparkClass(sc : SparkContext) {
val myAccumulator = sc.collectionAccumulator[MyRecord]
override def add(record: MyRecord) = {
synchronized {
myAccumulator.add(record)
}
}
override def endOfBatch() = {
synchronized {
myAccumulator.value.asScala.foreach((record: MyRecord) => {
processIt(record)
})
}
}
}
Run Code Online (Sandbox Code Playgroud)
异常不会导致应用程序失败,但是当endOfBatch调用并且代码尝试从累加器中读取值时,它是空的并且processIt永远不会被调用.
我们使用HDInsight版本3.6和Spark版本2.3.0
18/11/26 11:04:37 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:785)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:814)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814) …Run Code Online (Sandbox Code Playgroud) 我正在使用 Kmeans 作为聚类算法,我的代码想要执行并向我显示此错误:
org.apache.spark.SparkException: Failed to execute user defined function(VectorAssembler$$Lambda$1525/671078904: (struct<latitude:double,longitude:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
Run Code Online (Sandbox Code Playgroud)
这是数据框代码:
org.apache.spark.SparkException: Failed to execute user defined function(VectorAssembler$$Lambda$1525/671078904: (struct<latitude:double,longitude:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
Run Code Online (Sandbox Code Playgroud)
对于架构它是有效的,但如果我把节目放出来,我就会遇到问题。
我有一个包含多个分类列的数据框。我正在尝试使用两列之间的内置函数查找卡方统计信息:
from pyspark.ml.stat import ChiSquareTest
r = ChiSquareTest.test(df, 'feature1', 'feature2')
Run Code Online (Sandbox Code Playgroud)
但是,它给了我错误:
IllegalArgumentException: 'requirement failed: Column feature1 must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually double.'
Run Code Online (Sandbox Code Playgroud)
的数据类型feature1是:
feature1: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)
你能帮我解决这个问题吗?
我在包含对称性的算法中看到的一个常见结构是
for (int i = 0; i < n ; i++) {
for (int j = i+1; j < n ; j++) {
[compute x]
objects[i][j] += x;
objects[j][i] -= x;
}
}
Run Code Online (Sandbox Code Playgroud)
这(虽然仍然具有 O(n^2) 复杂性)减少了利用对称性所需的计算量。您能告诉我在 pyspark 代码中引入这种优化的方法是什么吗?
例如,我编写了代码,根据公式(其中r是位置)计算作用在系统中每个粒子上的每单位质量的力:
N m_j*(r_i - r_j)
F = -G * ? -----------------
i!=j |r_i - r_j|^3
Run Code Online (Sandbox Code Playgroud)
在其中,我首先对我的数据帧与自身进行叉积以获得每个成对的相互作用,然后通过 id 将它们全部聚合以获得作用在每个粒子上的总力:
def calc_F(df_clust, G=1):
# cartesian product of the dataframe with itself
renameCols = [f"`{col}` as `{col}_other`" for col in df_clust.columns]
df_cart = …Run Code Online (Sandbox Code Playgroud) 我是第一次使用 Apache Spark。我运行我的应用程序,当我访问时,localhost:4040我得到了图片中显示的内容。我发现也许设置
spark.ui.enabled true
Run Code Online (Sandbox Code Playgroud)
可以提供帮助,但我不知道该怎么做。
提前致谢。
css user-interface localhost apache-spark google-cloud-dataproc
我正在尝试在具有约15.000.000唯一字符串值的列上使用Spark的StringIndexer功能转换器。无论我投入多少资源,Spark都会因内存不足异常而死在我身上。
from pyspark.ml.feature import StringIndexer
data = spark.read.parquet("s3://example/data-raw").select("user", "count")
user_indexer = StringIndexer(inputCol="user", outputCol="user_idx")
indexer_model = user_indexer.fit(data) # This never finishes
indexer_model \
.transform(data) \
.write.parquet("s3://example/data-indexed")
Run Code Online (Sandbox Code Playgroud)
驱动程序上会生成一个错误文件,其开头如下所示:
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 268435456 bytes for committing reserved memory.
# Possible reasons:
# The system is out of physical RAM or swap space
# In 32 bit mode, the process size limit was hit …Run Code Online (Sandbox Code Playgroud) 我正在使用此处给出的方法在Spark SQL中展平一个DataFrame。这是我的代码:
package com.acme.etl.xml
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, SparkSession}
object RuntimeError { def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("FlattenSchema").getOrCreate()
val rowTag = "idocData"
val dataFrameReader =
spark.read
.option("rowTag", rowTag)
val xmlUri = "bad_011_1.xml"
val df =
dataFrameReader
.format("xml")
.load(xmlUri)
val schema: StructType = df.schema
val columns: Array[Column] = flattenSchema(schema)
val df2 = df.select(columns: _*)
}
def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
schema.fields.flatMap(f => {
val colName: String = if (prefix …Run Code Online (Sandbox Code Playgroud)