标签: apache-spark

如何停止在火花控制台上显示INFO消息?

我想停止火花壳上的各种消息.

我试图编辑该log4j.properties文件以阻止这些消息.

这是内容 log4j.properties

# Define the root logger with appender file
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
Run Code Online (Sandbox Code Playgroud)

但是消息仍在控制台上显示.

以下是一些示例消息

15/01/05 15:11:45 INFO SparkEnv: Registering BlockManagerMaster
15/01/05 15:11:45 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150105151145-b1ba
15/01/05 15:11:45 INFO MemoryStore: MemoryStore started with capacity 0.0 B.
15/01/05 15:11:45 INFO ConnectionManager: Bound socket to port 44728 with id = ConnectionManagerId(192.168.100.85,44728)
15/01/05 …
Run Code Online (Sandbox Code Playgroud)

log4j apache-spark spark-submit

168
推荐指数
12
解决办法
16万
查看次数

Scala与Python的Spark性能

我更喜欢Python而不是Scala.但是,由于Spark本身是用Scala编写的,因此我希望我的代码在Scala中的运行速度比Python版本快,原因很明显.

有了这个假设,我想学习和编写一些非常常见的预处理代码的Scala版本,用于1 GB的数据.数据来自Kaggle的SpringLeaf比赛.只是为了概述数据(它包含1936个维度和145232行).数据由各种类型组成,例如int,float,string,boolean.我使用8个核心中的6个用于Spark处理; 这就是我使用的原因minPartitions=6,每个核心都有一些东西需要处理.

Scala代码

val input = sc.textFile("train.csv", minPartitions=6)

val input2 = input.mapPartitionsWithIndex { (idx, iter) => 
  if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"

def separateCols(line: String): Array[String] = {
  val line2 = line.replaceAll("true", "1")
  val line3 = line2.replaceAll("false", "0")
  val vals: Array[String] = line3.split(",")

  for((x,i) <- vals.view.zipWithIndex) {
    vals(i) = "VAR_%04d".format(i) + delim1 + x
  }
  vals
}

val input3 = input2.flatMap(separateCols)

def toKeyVal(line: String): (String, String) = { …
Run Code Online (Sandbox Code Playgroud)

performance scala apache-spark rdd pyspark

166
推荐指数
1
解决办法
4万
查看次数

(为什么)我们需要在RDD上调用缓存或持久化

当从文本文件或集合(或从另一个RDD)创建弹性分布式数据集(RDD)时,我们是否需要显式调用"cache"或"persist"来将RDD数据存储到内存中?或者默认情况下RDD数据是以分布式方式存储在内存中的吗?

val textFile = sc.textFile("/user/emp.txt")
Run Code Online (Sandbox Code Playgroud)

根据我的理解,在上面的步骤之后,textFile是一个RDD,并且可以在节点的所有/部分内存中使用.

如果是这样,为什么我们需要在textFile RDD上调用"cache"或"persist"呢?

scala apache-spark rdd

161
推荐指数
5
解决办法
7万
查看次数

如何在pyspark中更改数据框列名?

我来自pandas背景,习惯于将CSV文件中的数据读入数据帧,然后使用简单命令将列名更改为有用的东西:

df.columns = new_column_name_list
Run Code Online (Sandbox Code Playgroud)

但是,在使用sqlContext创建的pyspark数据帧中,这同样不起作用.我可以轻松解决的唯一解决方案如下:

df = sqlContext.read.format("com.databricks.spark.csv").options(header='false', inferschema='true', delimiter='\t').load("data.txt")
oldSchema = df.schema
for i,k in enumerate(oldSchema.fields):
  k.name = new_column_name_list[i]
df = sqlContext.read.format("com.databricks.spark.csv").options(header='false', delimiter='\t').load("data.txt", schema=oldSchema)
Run Code Online (Sandbox Code Playgroud)

这基本上是定义变量两次并首先推断模式然后重命名列名,然后再次使用更新的模式加载数据帧.

像熊猫一样,有更好更有效的方法吗?

我的火花版是1.5.0

python apache-spark pyspark pyspark-sql

159
推荐指数
12
解决办法
22万
查看次数

如何在Spark SQL的DataFrame中更改列类型?

假设我做的事情如下:

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()

root
 |-- year: string (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- blank: string (nullable = true)

df.show()
year make  model comment              blank
2012 Tesla S     No comment                
1997 Ford  E350  Go get one now th...  
Run Code Online (Sandbox Code Playgroud)

但我真的想要yearas Int(并且可能会转换其他一些列).

我能想到的最好的是

df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

143
推荐指数
12
解决办法
32万
查看次数

将jar添加到Spark作业 - spark-submit

真的...已经讨论了很多.

然而,有很多歧义和一些答案提供...包括在jar/executor /驱动程序配置或选项中复制jar引用.

模糊和/或省略的细节

对于每个选项,应澄清含糊不清,不清楚和/或省略的细节:

  • ClassPath如何受到影响
    • 司机
    • 执行程序(用于运行的任务)
    • 一点也不
  • 分隔字符:逗号,冒号,分号
  • 如果提供的文件是自动分发的
    • 任务(每个执行者)
    • 用于远程驱动程序(如果以群集模式运行)
  • 接受的URI类型:本地文件,hdfs,http等
  • 如果复制公共位置,该位置是(hdfs,local?)

它影响的选项:

  1. --jars
  2. SparkContext.addJar(...) 方法
  3. SparkContext.addFile(...) 方法
  4. --conf spark.driver.extraClassPath=... 要么 --driver-class-path ...
  5. --conf spark.driver.extraLibraryPath=..., 要么 --driver-library-path ...
  6. --conf spark.executor.extraClassPath=...
  7. --conf spark.executor.extraLibraryPath=...
  8. 不要忘记,spark-submit的最后一个参数也是一个.jar文件.

我知道在哪里可以找到主要的spark文档,特别是关于如何提交,可用的选项以及JavaDoc.然而,这对我来说仍然有一些漏洞,尽管它也有部分回答.

我希望它不是那么复杂,有人可以给我一个清晰简洁的答案.

如果我从文档中猜测,似乎--jarsSparkContext addJar,addFile方法是自动分发文件的方法,而其他选项只是修改ClassPath.

假设为简单起见,我可以安全地使用3个主要选项同时添加其他应用程序jar文件:

spark-submit --jar additional1.jar,additional2.jar \
  --driver-library-path additional1.jar:additional2.jar \
  --conf spark.executor.extraLibraryPath=additional1.jar:additional2.jar \
  --class MyClass main-application.jar
Run Code Online (Sandbox Code Playgroud)

找到一篇关于一篇文章答案的好文章.然而没有什么新学到的 海报确实很好地评论了本地驱动程序(纱线客户端)和远程驱动程序(纱线群集)之间的区别.记住这一点非常重要.

java scala jar apache-spark spark-submit

139
推荐指数
2
解决办法
9万
查看次数

如何在Dataset中存储自定义对象?

根据Spark数据集介绍:

正如我们期待着星火2.0,我们计划将数据集的一些令人兴奋的改进,具体包括:...自定义编码器 - 而我们对各种各样的类型目前自动生成编码器,我们想开了一个API,用于自定义对象.

并尝试将自定义类型存储为Dataset导致以下错误:

无法找到存储在数据集中的类型的编码器.导入sqlContext.implicits._支持原始类型(Int,String等)和产品类型(case类).将来版本中将添加对序列化其他类型的支持

要么:

Java.lang.UnsupportedOperationException:找不到针对....的编码器

有没有现成的解决方法?


请注意,此问题仅作为社区Wiki答案的入口点存在.随意更新/改进问题和答案.

scala apache-spark apache-spark-dataset apache-spark-encoders

133
推荐指数
4
解决办法
6万
查看次数

如何在Spark中关闭INFO日志记录?

我使用AWS EC2指南安装了Spark,我可以使用bin/pyspark脚本启动程序以获得spark提示,也可以成功执行Quick Start quide.

但是,我不能为我的生活弄清楚如何INFO在每个命令后停止所有详细的日志记录.

我已经在我的log4j.properties文件中的几乎所有可能的场景中尝试了我的conf文件,在我从中启动应用程序的文件夹以及每个节点上,没有做任何事情.INFO执行每个语句后,我仍然会打印日志语句.

我对这应该如何工作非常困惑.

#Set everything to be logged to the console log4j.rootCategory=INFO, console                                                                        
log4j.appender.console=org.apache.log4j.ConsoleAppender 
log4j.appender.console.target=System.err     
log4j.appender.console.layout=org.apache.log4j.PatternLayout 
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
Run Code Online (Sandbox Code Playgroud)

这是我使用时的完整类路径SPARK_PRINT_LAUNCH_COMMAND:

Spark命令:/Library/Java/JavaVirtualMachines/jdk1.8.0_05.jdk/Contents/Home/bin/java -cp:/root/spark-1.0.1-bin-hadoop2/conf:/root/spark-1.0.1 -bin-hadoop2/CONF:/root/spark-1.0.1-bin-hadoop2/lib/spark-assembly-1.0.1-hadoop2.2.0.jar:/root/spark-1.0.1-bin-hadoop2/lib /datanucleus-api-jdo-3.2.1.jar:/root/spark-1.0.1-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/root/spark-1.0.1-bin-hadoop2 /lib/datanucleus-rdbms-3.2.1.jar -XX:MaxPermSize = 128m -Djava.library.path = -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit spark-shell --class org.apache.spark. repl.Main

内容spark-env.sh:

#!/usr/bin/env bash

# This file is sourced when …
Run Code Online (Sandbox Code Playgroud)

python hadoop scala apache-spark pyspark

128
推荐指数
12
解决办法
9万
查看次数

如何将rdd对象转换为spark中的dataframe

如何将RDD(org.apache.spark.rdd.RDD[org.apache.spark.sql.Row])转换为Dataframe org.apache.spark.sql.DataFrame.我使用了将数据帧转换为rdd .rdd.处理完之后我想把它放回到数据帧中.我怎样才能做到这一点 ?

scala apache-spark rdd apache-spark-sql

128
推荐指数
7
解决办法
24万
查看次数

如何将阶段拆分为Spark中的任务?

让我们假设以下每个时间点只运行一个Spark作业.

到目前为止我得到了什么

以下是我了解Spark中发生的事情:

  1. SparkContext被创建,每个工作节点开始执行人.执行程序是单独的进程(JVM),它连接回驱动程序.每个执行程序都有驱动程序的jar.退出驱动程序,关闭执行程序.每个执行程序都可以保存一些分区.
  2. 执行作业时,将根据沿袭图创建执行计划.
  3. 执行作业分为几个阶段,其中阶段包含尽可能多的邻近(在谱系图中)转换和动作,但没有随机播放.因此阶段通过混洗分开.

图片1

我明白那个

  • 任务是通过序列化Function对象从驱动程序发送到执行程序的命令.
  • 执行程序反序列化(使用驱动程序jar)命令(任务)并在分区上执行它.

问题(S)

如何将舞台分成这些任务?

特别:

  1. 是由变换和操作决定的任务还是可以在任务中进行多个转换/操作?
  2. 任务是由分区确定的(例如,每个分区每个阶段一个任务).
  3. 任务是由节点确定的(例如,每个节点每个阶段一个任务)?

我的想法(只有部分答案,即使是正确的)

https://0x0fff.com/spark-architecture-shuffle中,随着图像解释了随机播放

在此输入图像描述

我得到了规则的印象

每个阶段被分成#count-of-partitions任务,不考虑节点数量

对于我的第一张图片,我会说我有3个地图任务和3个减少任务.

对于来自0x0fff的图像,我会说有8个地图任务和3个减少任务(假设只有三个橙色和三个深绿色文件).

无论如何都要打开问题

那是对的吗?但即使这是正确的,我上面的问题也没有全部回答,因为它仍然是开放的,无论多个操作(例如多个地图)是在一个任务内还是每个操作分成一个任务.

别人怎么说

Spark的任务是什么?Spark工作人员如何执行jar文件?以及Apache Spark调度程序如何将文件拆分为任务?是相似的,但我觉得我的问题在那里得不到清楚.

apache-spark

125
推荐指数
3
解决办法
4万
查看次数