小编Oli*_*Oli的帖子

Spark 2.4 和 Java 11 兼容性

使用 pyspark 从给定路径读取(Spark 2.4/Java 11)

df = spark.read.format('parquet').load(path_to_read)
Run Code Online (Sandbox Code Playgroud)

失败,由于

IllegalArgumentException: '不支持的类文件主要版本 55'

我在 Java 11 (jdk-11.0.2) 上运行 Spark,并检查Spark 2.4 概览,我想知道 Spark 2.4 是否支持 Java 11?

谢谢

java apache-spark pyspark

11
推荐指数
0
解决办法
9859
查看次数

Spark如何获取Scala中两个JSONS中更改的键数?

我有两个数据框,我正试图找出它们之间的区别。2个数据帧包含struct数组。我不需要该结构中的1个键。因此,我首先将其删除,然后转换为JSON字符串。比较时,我需要知道该数组(Json)中更改了多少个元素。有办法做到这一点吗?

双方base_data_settarget_data_set包含IDKEYKEY是一个array<Struct>

root
 |-- id: string (nullable = true)
 |-- result: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key1: integer (nullable = true)
 |    |    |-- key3: string (nullable = false)
 |    |    |-- key2: string (nullable = true)
 |    |    |-- key4: string (nullable = true)

val temp_base = base_data_set
    .withColumn("base_result", explode(base_data_set(RESULT)))
    .withColumn("base",
        struct($"base_result.key1", $"base_result.key2", $"base_result.key3"))
    .groupBy(ID)
    .agg(to_json(collect_list("base")).as("base_picks"))

val temp_target …
Run Code Online (Sandbox Code Playgroud)

json scala apache-spark apache-spark-sql

10
推荐指数
1
解决办法
193
查看次数

如何将 https://jitpack.io 库添加到 android studio 版本大黄蜂?

在android studio的最新版本中,我们可以轻松地将库添加https://jitpack.iogradle文件(模块项目)中,但在最新的更新(android-studio-bumble-bee)中,该文件的结构已更改,我不知道如何添加该库。

gradle kotlin android-studio-bumblebee

10
推荐指数
2
解决办法
6149
查看次数

使用Spark collectionAccumulator时出现ConcurrentModificationException

我正在尝试在Azure HDInsight按需群集上运行基于Spark的应用程序,并且看到许多SparkExceptions(由ConcurrentModificationExceptions引起)被记录.启动本地Spark实例时,应用程序运行时没有这些错误.

在使用累加器时看到了类似错误的报告,而我的代码确实使用了CollectionAccumulator,但是我已经在我使用它的地方放置了同步块,并且没有任何区别.与累加器相关的代码如下所示:

class MySparkClass(sc : SparkContext) {
    val myAccumulator = sc.collectionAccumulator[MyRecord]

    override def add(record: MyRecord) = {
        synchronized {
            myAccumulator.add(record)
        }
    }

    override def endOfBatch() = {
        synchronized {
            myAccumulator.value.asScala.foreach((record: MyRecord) => {
                processIt(record)
            })
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

异常不会导致应用程序失败,但是当endOfBatch调用并且代码尝试从累加器中读取值时,它是空的并且processIt永远不会被调用.

我们使用HDInsight版本3.6和Spark版本2.3.0

18/11/26 11:04:37 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:785)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:814)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814) …
Run Code Online (Sandbox Code Playgroud)

scala azure hdinsight apache-spark

8
推荐指数
1
解决办法
806
查看次数

无法执行用户定义的函数(VectorAssembler

我正在使用 Kmeans 作为聚类算法,我的代码想要执行并向我显示此错误:

org.apache.spark.SparkException: Failed to execute user defined function(VectorAssembler$$Lambda$1525/671078904: (struct<latitude:double,longitude:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
Run Code Online (Sandbox Code Playgroud)

这是数据框代码:

org.apache.spark.SparkException: Failed to execute user defined function(VectorAssembler$$Lambda$1525/671078904: (struct<latitude:double,longitude:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
Run Code Online (Sandbox Code Playgroud)

对于架构它是有效的,但如果我把节目放出来,我就会遇到问题。

scala apache-spark apache-spark-ml

6
推荐指数
2
解决办法
5904
查看次数

IllegalArgumentException:列的类型必须是 struct&lt;type:tinyint,size:int,indices:array&lt;int&gt;,values:array&lt;double&gt;&gt; 但实际上是 double。

我有一个包含多个分类列的数据框。我正在尝试使用两列之间的内置函数查找卡方统计信息:

from pyspark.ml.stat import ChiSquareTest

r = ChiSquareTest.test(df, 'feature1', 'feature2')
Run Code Online (Sandbox Code Playgroud)

但是,它给了我错误:

IllegalArgumentException: 'requirement failed: Column feature1 must be of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>> but was actually double.'
Run Code Online (Sandbox Code Playgroud)

的数据类型feature1是:

feature1: double (nullable = true)
Run Code Online (Sandbox Code Playgroud)

你能帮我解决这个问题吗?

apache-spark pyspark apache-spark-ml

6
推荐指数
2
解决办法
8264
查看次数

spark中的高效对称计算

我在包含对称性的算法中看到的一个常见结构是

for (int i = 0; i < n ; i++) {
    for (int j = i+1; j < n ; j++) {
        [compute x]
        objects[i][j] += x;
        objects[j][i] -= x;
    }
}
Run Code Online (Sandbox Code Playgroud)

这(虽然仍然具有 O(n^2) 复杂性)减少了利用对称性所需的计算量。您能告诉我在 pyspark 代码中引入这种优化的方法是什么吗?

例如,我编写了代码,根据公式(其中r是位置)计算作用在系统中每个粒子上的每单位质量的力:

         N    m_j*(r_i - r_j)
F = -G * ?   -----------------
        i!=j   |r_i - r_j|^3
Run Code Online (Sandbox Code Playgroud)

在其中,我首先对我的数据帧与自身进行叉积以获得每个成对的相互作用,然后通过 id 将它们全部聚合以获得作用在每个粒子上的总力:

def calc_F(df_clust, G=1):

    # cartesian product of the dataframe with itself
    renameCols = [f"`{col}` as `{col}_other`" for col in df_clust.columns]
    df_cart = …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

6
推荐指数
1
解决办法
221
查看次数

Spark UI 显示格式错误(CSS 损坏)

我是第一次使用 Apache Spark。我运行我的应用程序,当我访问时,localhost:4040我得到了图片中显示的内容。我发现也许设置

spark.ui.enabled true 
Run Code Online (Sandbox Code Playgroud)

可以提供帮助,但我不知道该怎么做。

用户界面视图

提前致谢。

css user-interface localhost apache-spark google-cloud-dataproc

5
推荐指数
2
解决办法
2152
查看次数

在Spark ML中,为什么在具有百万个不同值的列上拟合StringIndexer会产生OOM错误?

我正在尝试在具有约15.000.000唯一字符串值的列上使用Spark的StringIndexer功能转换器。无论我投入多少资源,Spark都会因内存不足异常而死在我身上。

from pyspark.ml.feature import StringIndexer

data = spark.read.parquet("s3://example/data-raw").select("user", "count")

user_indexer = StringIndexer(inputCol="user", outputCol="user_idx")

indexer_model = user_indexer.fit(data) # This never finishes

indexer_model \
    .transform(data) \
    .write.parquet("s3://example/data-indexed")
Run Code Online (Sandbox Code Playgroud)

驱动程序上会生成一个错误文件,其开头如下所示:

#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 268435456 bytes for committing reserved memory.
# Possible reasons:
#   The system is out of physical RAM or swap space
#   In 32 bit mode, the process size limit was hit …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark apache-spark-ml

5
推荐指数
1
解决办法
345
查看次数

在Spark SQL中“展平” DataFrame时出现Spark AnalysisException

我正在使用此处给出的方法在Spark SQL中展平一个DataFrame。这是我的代码:

package com.acme.etl.xml

import org.apache.spark.sql.types._ 
import org.apache.spark.sql.{Column, SparkSession}

object RuntimeError {   def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("FlattenSchema").getOrCreate()
    val rowTag = "idocData"
    val dataFrameReader =
        spark.read
          .option("rowTag", rowTag)
    val xmlUri = "bad_011_1.xml"
    val df =
        dataFrameReader
          .format("xml")
          .load(xmlUri)
    val schema: StructType = df.schema
    val columns: Array[Column] = flattenSchema(schema)
    val df2 = df.select(columns: _*)

  }

  def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = {
    schema.fields.flatMap(f => {
      val colName: String = if (prefix …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql

5
推荐指数
1
解决办法
351
查看次数