我正在使用sbt 1.2.8和sbt-assembly 0.14.9。我正在尝试为使用Spark + Akka + gRPC的项目构建胖JAR。一开始我有很多重复数据删除错误;我设法解决了除1之外的所有问题,而且我几个小时都找不到解决该问题的方法。
这是我收到的错误消息sbt assembly:
[error] (assembly) deduplicate: different file contents found in the following:
[error] /Users/samedduzcay/.ivy2/cache/org.apache.arrow/arrow-vector/jars/arrow-vector-0.10.0.jar:git.properties
[error] /Users/samedduzcay/.ivy2/cache/org.apache.arrow/arrow-format/jars/arrow-format-0.10.0.jar:git.properties
[error] /Users/samedduzcay/.ivy2/cache/org.apache.arrow/arrow-memory/jars/arrow-memory-0.10.0.jar:git.properties
Run Code Online (Sandbox Code Playgroud)
这是我的build.sbt:
import sbt.Keys._
import sbtassembly.AssemblyPlugin.autoImport.PathList
name := "xxx"
version := "1.0"
lazy val sv = "2.11.12"
scalaVersion := sv
lazy val akkaVersion = "2.5.19"
lazy val sparkVersion = "2.4.0"
enablePlugins(AkkaGrpcPlugin)
enablePlugins(JavaAgent)
javaAgents += "org.mortbay.jetty.alpn" % "jetty-alpn-agent" % "2.0.9" % "runtime;test"
test in assembly := {}
logLevel in assembly := Level.Debug
lazy …Run Code Online (Sandbox Code Playgroud) 我有一个PySpark数据框,其中包含一ArrayType(StringType())列。该列包含数组中需要删除的重复字符串。例如,一行条目可能看起来像[milk, bread, milk, toast]。假设我的数据框已命名df,我的列已命名arraycol。我需要类似的东西:
df = df.withColumn("arraycol_without_dupes", F.remove_dupes_from_array("arraycol"))
Run Code Online (Sandbox Code Playgroud)
我的直觉是对此有一个简单的解决方案,但是在浏览stackoverflow 15分钟后,我发现没有比分解该列,删除整个数据帧上的重复项然后再进行分组更好的了。目前已经得到了成为一个更简单的方法,我只是没想到吧?
我正在使用Spark版本'2.3.1'。
我在删除元组数据集中的重复行时遇到麻烦 Dataset[(LeftDs, RightDs)]
试图像这样连接两个数据集:
val comparableDs = leftDs.joinWith(
rightDs,
fuzzyMatch(leftDs.col("name"), rightDs.col("officialName"))
)
Run Code Online (Sandbox Code Playgroud)
我想删除两个字段的重复项:
val resultDs = comparableDs.dropDuplicates("_1.name", "_2.officialName")
Run Code Online (Sandbox Code Playgroud)
但是得到这个错误:
Cannot resolve column name "_1.name" among (_1, _2);
这是以下内容的架构comparableDs:
root
|-- _1: struct (nullable = false)
| |-- id: string (nullable = true)
| |-- name: string (nullable = true)
|-- _2: struct (nullable = false)
| |-- id: string (nullable = true)
| |-- category: string (nullable = true)
| |-- officialName: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
如何dropDuplicates …
在下面的代码中,我希望编译器识别output在if部分或else部分中定义的get.
val df1 = spark.createDataFrame(Seq(
(1, 10),
(2, 20)
)).toDF("A", "B")
val df2 = spark.emptyDataFrame
if(df2.isEmpty){
val output = df1
}
else{
val output = df2
}
println(output.show)
Run Code Online (Sandbox Code Playgroud)
但是,它给我一个错误说 error: not found: value output.如果我在python中执行相同的实现,它工作正常,我得到预期的输出.为了使用scala使这个工作在spark中我已经定义output为一个可变变量并在其中更新它if-else.
var output = spark.emptyDataFrame
if(df2.isEmpty){
output = df1
}
else{
output = df2
}
println(output.show)
Run Code Online (Sandbox Code Playgroud)
为什么第一个实现不起作用,是否有办法在不使用可变变量的情况下获得预期结果?
到目前为止,我只在YARN作为资源管理器的Hadoop集群上使用了Spark。在这种类型的集群中,我确切地知道要运行多少个执行程序以及资源管理的工作方式。但是,知道我正在尝试使用独立Spark集群,我有些困惑。纠正我在哪里我错了。
在本文中,默认情况下,辅助节点使用该节点的所有内存减去1 GB。但我知道通过使用SPARK_WORKER_MEMORY,我们可以使用更少的内存。例如,如果节点的总内存为32 GB,但我指定了16 GB,那么Spark worker是否在该节点上使用的内存不会超过16 GB?
但是执行者呢?让我们说如果我要在每个节点上运行2个执行程序,是否可以通过将期间的执行程序内存指定spark-submit为的一半来执行此操作SPARK_WORKER_MEMORY,是否要在每个节点上运行4个执行程序,通过将执行程序内存指定为四分之一的值来执行此操作SPARK_WORKER_MEMORY?
如果是这样的话,我认为,除了执行程序内存外,我还必须正确指定执行程序核心。例如,如果我要在一个工人上运行4个执行程序,则必须将执行程序核心指定为SPARK_WORKER_CORES?的四分之一。如果我指定一个更大的数字会怎样?我的意思是,如果我将执行程序的内存指定为内存的四分之一SPARK_WORKER_MEMORY,但是执行程序的核心仅是内存的一半SPARK_WORKER_CORES?在这种情况下,我将让2或4个执行程序在该节点上运行吗?
hadoop scala cluster-computing apache-spark apache-spark-standalone
我正在尝试将一些熊猫代码转换为Spark以进行缩放。myfunc是复杂API的包装,该API接受一个字符串并返回一个新字符串(这意味着我不能使用向量化函数)。
def myfunc(ds):
for attribute, value in ds.items():
value = api_function(attribute, value)
ds[attribute] = value
return ds
df = df.apply(myfunc, axis='columns')
Run Code Online (Sandbox Code Playgroud)
myfunc接收一个DataSeries,将其分解为单个单元格,为每个单元格调用API,并使用相同的列名构建一个新的DataSeries。这有效地修改了DataFrame中的所有单元格。
我是Spark的新手,我想使用来翻译此逻辑pyspark。我已经将熊猫DataFrame转换为Spark:
spark = SparkSession.builder.appName('My app').getOrCreate()
spark_schema = StructType([StructField(c, StringType(), True) for c in df.columns])
spark_df = spark.createDataFrame(df, schema=spark_schema)
Run Code Online (Sandbox Code Playgroud)
这是我迷路的地方。我需要UDF一个pandas_udf吗?如何遍历所有单元格,并为每个单元格返回一个新字符串myfunc?spark_df.foreach()不返回任何东西,也没有map()函数。
我可以修改myfunc从DataSeries- > DataSeries到string> - string如果需要的话。
详细说明:
通常,在编写spark作业时,需要为不同的spark配置指定特定值,以便以最佳方式使用群集资源.我们可以在初始化SparkSession时以编程方式执行此操作:
SparkSession.builder .appName(SPARK_APP_NAME).config("spark.executor.memory","1G")
我想知道的是:使用Cloud Dataproc时我们还需要这样做吗?实际上,在创建Dataproc集群时,会初始化一个名为的属性文件cluster.properies并包含类似的值spark\:spark.executor.memory=2688m.所以,我想知道Dataproc是否会根据群集资源自动填充这些值,在这种情况下,我们不必手动/编程调整这些火花配置?
我有一个DataFrame,其中两列为"key":id1和id2:
val df1 = Seq(
(1, 11, "n1", "d1"),
(1, 22, "n2", "d2"),
(2, 11, "n3", "d3"),
(2, 11, "n4", "d4")
).toDF("id1", "id2", "number", "data")
scala> df1.show
+---+---+------+----+
|id1|id2|number|data|
+---+---+------+----+
| 1| 11| n1| d1|
| 1| 22| n2| d2|
| 2| 11| n3| d3|
| 2| 11| n4| d4|
+---+---+------+----+
Run Code Online (Sandbox Code Playgroud)
我想得到Json,按数据框的键分组,如下所示:
+---+---+-------+----------------------------------------------------------+
|id1|id2| json |
+---+---+-------+----------------------------------------------------------+
| 1| 11|[{"number" : "n1", "data": "d1"}] |
| 1| 22|[{"number" : "n2", "data": "d2"}] |
| 2| 11|[{"number" …Run Code Online (Sandbox Code Playgroud) 下面的代码,因此有关性能的问题 - 当然是大规模的想象:
import org.apache.spark.sql.types.StructType
val df = sc.parallelize(Seq(
("r1", 1, 1),
("r2", 6, 4),
("r3", 4, 1),
("r4", 1, 2)
)).toDF("ID", "a", "b")
val ones = df.schema.map(c => c.name).drop(1).map(x => when(col(x) === 1, 1).otherwise(0)).reduce(_ + _)
// or
def ones = df.schema.map(c => c.name).drop(1).map(x => when(col(x) === 1, 1).otherwise(0)).reduce(_ + _)
df.withColumn("ones", ones).explain
Run Code Online (Sandbox Code Playgroud)
这里有两个使用def和val的物理计划 - 它们是相同的:
== Physical Plan == **def**
*(1) Project [_1#760 AS ID#764, _2#761 AS a#765, _3#762 AS b#766, (CASE WHEN (_2#761 = 1) THEN 1 …Run Code Online (Sandbox Code Playgroud) 假设我有一个列列表,例如:
col_list = ['col1','col2']
df = spark.read.json(path_to_file)
print(df.columns)
# ['col1','col2','col3']
Run Code Online (Sandbox Code Playgroud)
我需要通过串联col1和来创建一个新列col2。我不想在连接时对列名进行硬编码,但需要从列表中选择。
我怎样才能做到这一点?