标签: apache-spark

使用org.apache.arrow进行sbt-assembly重复数据删除错误

我正在使用sbt 1.2.8和sbt-assembly 0.14.9。我正在尝试为使用Spark + Akka + gRPC的项目构建胖JAR。一开始我有很多重复数据删除错误;我设法解决了除1之外的所有问题,而且我几个小时都找不到解决该问题的方法。

这是我收到的错误消息sbt assembly

[error] (assembly) deduplicate: different file contents found in the following:
[error] /Users/samedduzcay/.ivy2/cache/org.apache.arrow/arrow-vector/jars/arrow-vector-0.10.0.jar:git.properties
[error] /Users/samedduzcay/.ivy2/cache/org.apache.arrow/arrow-format/jars/arrow-format-0.10.0.jar:git.properties
[error] /Users/samedduzcay/.ivy2/cache/org.apache.arrow/arrow-memory/jars/arrow-memory-0.10.0.jar:git.properties
Run Code Online (Sandbox Code Playgroud)

这是我的build.sbt

import sbt.Keys._
import sbtassembly.AssemblyPlugin.autoImport.PathList

name := "xxx"

version := "1.0"

lazy val sv = "2.11.12"
scalaVersion := sv

lazy val akkaVersion = "2.5.19"
lazy val sparkVersion = "2.4.0"

enablePlugins(AkkaGrpcPlugin)
enablePlugins(JavaAgent)
javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.9" % "runtime;test"

test in assembly := {}
logLevel in assembly := Level.Debug

lazy …
Run Code Online (Sandbox Code Playgroud)

scala sbt sbt-assembly apache-spark

1
推荐指数
1
解决办法
501
查看次数

从PySpark阵列列中删除重复项

我有一个PySpark数据框,其中包含一ArrayType(StringType())列。该列包含数组中需要删除的重复字符串。例如,一行条目可能看起来像[milk, bread, milk, toast]。假设我的数据框已命名df,我的列已命名arraycol。我需要类似的东西:

df = df.withColumn("arraycol_without_dupes", F.remove_dupes_from_array("arraycol"))
Run Code Online (Sandbox Code Playgroud)

我的直觉是对此有一个简单的解决方案,但是在浏览stackoverflow 15分钟后,我发现没有比分解该列,删除整个数据帧上的重复项然后再进行分组更好的了。目前已经得到了成为一个更简单的方法,我只是没想到吧?

我正在使用Spark版本'2.3.1'。

python apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
1370
查看次数

从Spark中的元组Dateset删除重复项

我在删除元组数据集中的重复行时遇到麻烦 Dataset[(LeftDs, RightDs)]

试图像这样连接两个数据集:

val comparableDs = leftDs.joinWith(
   rightDs,
   fuzzyMatch(leftDs.col("name"), rightDs.col("officialName"))
)
Run Code Online (Sandbox Code Playgroud)

我想删除两个字段的重复项:

val resultDs = comparableDs.dropDuplicates("_1.name", "_2.officialName")
Run Code Online (Sandbox Code Playgroud)

但是得到这个错误: Cannot resolve column name "_1.name" among (_1, _2);

这是以下内容的架构comparableDs

root
 |-- _1: struct (nullable = false)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |-- _2: struct (nullable = false)
 |    |-- id: string (nullable = true)
 |    |-- category: string (nullable = true)
 |    |-- officialName: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

如何dropDuplicates …

scala tuples dataset apache-spark

1
推荐指数
1
解决办法
121
查看次数

在"if-else结构"之外找不到"if-else结构"中定义的值

在下面的代码中,我希望编译器识别outputif部分或else部分中定义的get.

val df1 = spark.createDataFrame(Seq(
  (1, 10),
  (2, 20)
)).toDF("A", "B")

val df2 = spark.emptyDataFrame

if(df2.isEmpty){
    val output = df1
}
else{
    val output = df2
}

println(output.show)
Run Code Online (Sandbox Code Playgroud)

但是,它给我一个错误说 error: not found: value output.如果我在python中执行相同的实现,它工作正常,我得到预期的输出.为了使用scala使这个工作在spark中我已经定义output为一个可变变量并在其中更新它if-else.

var output = spark.emptyDataFrame

if(df2.isEmpty){
    output = df1
}
else{
    output = df2
}

println(output.show)
Run Code Online (Sandbox Code Playgroud)

为什么第一个实现不起作用,是否有办法在不使用可变变量的情况下获得预期结果?

scala apache-spark

1
推荐指数
1
解决办法
43
查看次数

使用Spark独立集群如何在工作节点上管理多个执行者?

到目前为止,我只在YARN作为资源管理器的Hadoop集群上使用了Spark。在这种类型的集群中,我确切地知道要运行多少个执行程序以及资源管理的工作方式。但是,知道我正在尝试使用独立Spark集群,我有些困惑。纠正我在哪里我错了。

本文中,默认情况下,辅助节点使用该节点的所有内存减去1 GB。但我知道通过使用SPARK_WORKER_MEMORY,我们可以使用更少的内存。例如,如果节点的总内存为32 GB,但我指定了16 GB,那么Spark worker是否在该节点上使用的内存不会超过16 GB?

但是执行者呢?让我们说如果我要在每个节点上运行2个执行程序,是否可以通过将期间的执行程序内存指定spark-submit为的一半来执行此操作SPARK_WORKER_MEMORY,是否要在每个节点上运行4个执行程序,通过将执行程序内存指定为四分之一的值来执行此操作SPARK_WORKER_MEMORY

如果是这样的话,我认为,除了执行程序内存外,我还必须正确指定执行程序核心。例如,如果我要在一个工人上运行4个执行程序,则必须将执行程序核心指定为SPARK_WORKER_CORES?的四分之一。如果我指定一个更大的数字会怎样?我的意思是,如果我将执行程序的内存指定为内存的四分之一SPARK_WORKER_MEMORY,但是执行程序的核心仅是内存的一半SPARK_WORKER_CORES?在这种情况下,我将让2或4个执行程序在该节点上运行吗?

hadoop scala cluster-computing apache-spark apache-spark-standalone

1
推荐指数
1
解决办法
547
查看次数

将功能应用于Spark DataFrame中的所有单元格

我正在尝试将一些熊猫代码转换为Spark以进行缩放。myfunc是复杂API的包装,该API接受一个字符串并返回一个新字符串(这意味着我不能使用向量化函数)。

def myfunc(ds):
    for attribute, value in ds.items():
        value = api_function(attribute, value)
        ds[attribute] = value
    return ds

df = df.apply(myfunc, axis='columns')
Run Code Online (Sandbox Code Playgroud)

myfunc接收一个DataSeries,将其分解为单个单元格,为每个单元格调用API,并使用相同的列名构建一个新的DataSeries。这有效地修改了DataFrame中的所有单元格。

我是Spark的新手,我想使用来翻译此逻辑pyspark。我已经将熊猫DataFrame转换为Spark:

spark = SparkSession.builder.appName('My app').getOrCreate()
spark_schema = StructType([StructField(c, StringType(), True) for c in df.columns])
spark_df = spark.createDataFrame(df, schema=spark_schema)
Run Code Online (Sandbox Code Playgroud)

这是我迷路的地方。我需要UDF一个pandas_udf吗?如何遍历所有单元格,并为每个单元格返回一个新字符串myfuncspark_df.foreach()不返回任何东西,也没有map()函数。

我可以修改myfuncDataSeries- > DataSeriesstring> - string如果需要的话。

python pandas apache-spark apache-spark-sql pyspark

1
推荐指数
2
解决办法
3251
查看次数

使用Google CloudDataproc时是否仍需要微调spark配置参数?

详细说明:

  • 通常,在编写spark作业时,需要为不同的spark配置指定特定值,以便以最佳方式使用群集资源.我们可以在初始化SparkSession时以编程方式执行此操作:

    SparkSession.builder .appName(SPARK_APP_NAME).config("spark.executor.memory","1G")

  • 我想知道的是:使用Cloud Dataproc时我们还需要这样做吗?实际上,在创建Dataproc集群时,会初始化一个名为的属性文件cluster.properies并包含类似的值spark\:spark.executor.memory=2688m.所以,我想知道Dataproc是否会根据群集资源自动填充这些值,在这种情况下,我们不必手动/编程调整这些火花配置?

apache-spark google-cloud-dataproc

1
推荐指数
1
解决办法
342
查看次数

在Json中收集DataFrame列的数据

我有一个DataFrame,其中两列为"key":id1id2:

val df1 = Seq(
  (1, 11, "n1", "d1"),
  (1, 22, "n2", "d2"),
  (2, 11, "n3", "d3"),
  (2, 11, "n4", "d4")
).toDF("id1", "id2", "number", "data")

scala> df1.show
+---+---+------+----+
|id1|id2|number|data|
+---+---+------+----+
|  1| 11|    n1|  d1|
|  1| 22|    n2|  d2|
|  2| 11|    n3|  d3|
|  2| 11|    n4|  d4|
+---+---+------+----+
Run Code Online (Sandbox Code Playgroud)

我想得到Json,按数据框的键分组,如下所示:

+---+---+-------+----------------------------------------------------------+
|id1|id2| json                                                             |
+---+---+-------+----------------------------------------------------------+
|  1| 11|[{"number" : "n1", "data": "d1"}]                                 |
|  1| 22|[{"number" : "n2", "data": "d2"}]                                 |
|  2| 11|[{"number" …
Run Code Online (Sandbox Code Playgroud)

json scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
50
查看次数

对Spark Dataframe的val vs def性能

下面的代码,因此有关性能的问题 - 当然是大规模的想象:

import org.apache.spark.sql.types.StructType

val df = sc.parallelize(Seq(
   ("r1", 1, 1),
   ("r2", 6, 4),
   ("r3", 4, 1),
   ("r4", 1, 2)
   )).toDF("ID", "a", "b")

val ones = df.schema.map(c => c.name).drop(1).map(x => when(col(x) === 1, 1).otherwise(0)).reduce(_ + _)

// or

def ones = df.schema.map(c => c.name).drop(1).map(x => when(col(x) === 1, 1).otherwise(0)).reduce(_ + _)

df.withColumn("ones", ones).explain
Run Code Online (Sandbox Code Playgroud)

这里有两个使用def和val的物理计划 - 它们是相同的:

 == Physical Plan == **def**
 *(1) Project [_1#760 AS ID#764, _2#761 AS a#765, _3#762 AS b#766, (CASE WHEN (_2#761 = 1) THEN 1 …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

1
推荐指数
1
解决办法
260
查看次数

使用pyspark连接数据框的多列

假设我有一个列列表,例如:

col_list = ['col1','col2']
df = spark.read.json(path_to_file)
print(df.columns)
# ['col1','col2','col3']
Run Code Online (Sandbox Code Playgroud)

我需要通过串联col1和来创建一个新列col2。我不想在连接时对列名进行硬编码,但需要从列表中选择。

我怎样才能做到这一点?

apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
1606
查看次数