我想停止火花壳上的各种消息.
我试图编辑该log4j.properties文件以阻止这些消息.
这是内容 log4j.properties
# Define the root logger with appender file
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
Run Code Online (Sandbox Code Playgroud)
但是消息仍在控制台上显示.
以下是一些示例消息
15/01/05 15:11:45 INFO SparkEnv: Registering BlockManagerMaster
15/01/05 15:11:45 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150105151145-b1ba
15/01/05 15:11:45 INFO MemoryStore: MemoryStore started with capacity 0.0 B.
15/01/05 15:11:45 INFO ConnectionManager: Bound socket to port 44728 with id = ConnectionManagerId(192.168.100.85,44728)
15/01/05 …Run Code Online (Sandbox Code Playgroud) 我更喜欢Python而不是Scala.但是,由于Spark本身是用Scala编写的,因此我希望我的代码在Scala中的运行速度比Python版本快,原因很明显.
有了这个假设,我想学习和编写一些非常常见的预处理代码的Scala版本,用于1 GB的数据.数据来自Kaggle的SpringLeaf比赛.只是为了概述数据(它包含1936个维度和145232行).数据由各种类型组成,例如int,float,string,boolean.我使用8个核心中的6个用于Spark处理; 这就是我使用的原因minPartitions=6,每个核心都有一些东西需要处理.
Scala代码
val input = sc.textFile("train.csv", minPartitions=6)
val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"
def separateCols(line: String): Array[String] = {
val line2 = line.replaceAll("true", "1")
val line3 = line2.replaceAll("false", "0")
val vals: Array[String] = line3.split(",")
for((x,i) <- vals.view.zipWithIndex) {
vals(i) = "VAR_%04d".format(i) + delim1 + x
}
vals
}
val input3 = input2.flatMap(separateCols)
def toKeyVal(line: String): (String, String) = { …Run Code Online (Sandbox Code Playgroud) 当从文本文件或集合(或从另一个RDD)创建弹性分布式数据集(RDD)时,我们是否需要显式调用"cache"或"persist"来将RDD数据存储到内存中?或者默认情况下RDD数据是以分布式方式存储在内存中的吗?
val textFile = sc.textFile("/user/emp.txt")
Run Code Online (Sandbox Code Playgroud)
根据我的理解,在上面的步骤之后,textFile是一个RDD,并且可以在节点的所有/部分内存中使用.
如果是这样,为什么我们需要在textFile RDD上调用"cache"或"persist"呢?
我来自pandas背景,习惯于将CSV文件中的数据读入数据帧,然后使用简单命令将列名更改为有用的东西:
df.columns = new_column_name_list
Run Code Online (Sandbox Code Playgroud)
但是,在使用sqlContext创建的pyspark数据帧中,这同样不起作用.我可以轻松解决的唯一解决方案如下:
df = sqlContext.read.format("com.databricks.spark.csv").options(header='false', inferschema='true', delimiter='\t').load("data.txt")
oldSchema = df.schema
for i,k in enumerate(oldSchema.fields):
k.name = new_column_name_list[i]
df = sqlContext.read.format("com.databricks.spark.csv").options(header='false', delimiter='\t').load("data.txt", schema=oldSchema)
Run Code Online (Sandbox Code Playgroud)
这基本上是定义变量两次并首先推断模式然后重命名列名,然后再次使用更新的模式加载数据帧.
像熊猫一样,有更好更有效的方法吗?
我的火花版是1.5.0
假设我做的事情如下:
val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()
root
|-- year: string (nullable = true)
|-- make: string (nullable = true)
|-- model: string (nullable = true)
|-- comment: string (nullable = true)
|-- blank: string (nullable = true)
df.show()
year make model comment blank
2012 Tesla S No comment
1997 Ford E350 Go get one now th...
Run Code Online (Sandbox Code Playgroud)
但我真的想要yearas Int(并且可能会转换其他一些列).
我能想到的最好的是
df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, …Run Code Online (Sandbox Code Playgroud) 真的...已经讨论了很多.
然而,有很多歧义和一些答案提供...包括在jar/executor /驱动程序配置或选项中复制jar引用.
对于每个选项,应澄清含糊不清,不清楚和/或省略的细节:
--jarsSparkContext.addJar(...) 方法SparkContext.addFile(...) 方法 --conf spark.driver.extraClassPath=... 要么 --driver-class-path ...--conf spark.driver.extraLibraryPath=..., 要么 --driver-library-path ...--conf spark.executor.extraClassPath=...--conf spark.executor.extraLibraryPath=...我知道在哪里可以找到主要的spark文档,特别是关于如何提交,可用的选项以及JavaDoc.然而,这对我来说仍然有一些漏洞,尽管它也有部分回答.
我希望它不是那么复杂,有人可以给我一个清晰简洁的答案.
如果我从文档中猜测,似乎--jars和SparkContext addJar,addFile方法是自动分发文件的方法,而其他选项只是修改ClassPath.
假设为简单起见,我可以安全地使用3个主要选项同时添加其他应用程序jar文件:
spark-submit --jar additional1.jar,additional2.jar \
--driver-library-path additional1.jar:additional2.jar \
--conf spark.executor.extraLibraryPath=additional1.jar:additional2.jar \
--class MyClass main-application.jar
Run Code Online (Sandbox Code Playgroud)
找到一篇关于另一篇文章答案的好文章.然而没有什么新学到的 海报确实很好地评论了本地驱动程序(纱线客户端)和远程驱动程序(纱线群集)之间的区别.记住这一点非常重要.
根据Spark数据集介绍:
正如我们期待着星火2.0,我们计划将数据集的一些令人兴奋的改进,具体包括:...自定义编码器 - 而我们对各种各样的类型目前自动生成编码器,我们想开了一个API,用于自定义对象.
并尝试将自定义类型存储为Dataset导致以下错误:
无法找到存储在数据集中的类型的编码器.导入sqlContext.implicits._支持原始类型(Int,String等)和产品类型(case类).将来版本中将添加对序列化其他类型的支持
要么:
Java.lang.UnsupportedOperationException:找不到针对....的编码器
有没有现成的解决方法?
请注意,此问题仅作为社区Wiki答案的入口点存在.随意更新/改进问题和答案.
scala apache-spark apache-spark-dataset apache-spark-encoders
我使用AWS EC2指南安装了Spark,我可以使用bin/pyspark脚本启动程序以获得spark提示,也可以成功执行Quick Start quide.
但是,我不能为我的生活弄清楚如何INFO在每个命令后停止所有详细的日志记录.
我已经在我的log4j.properties文件中的几乎所有可能的场景中尝试了我的conf文件,在我从中启动应用程序的文件夹以及每个节点上,没有做任何事情.INFO执行每个语句后,我仍然会打印日志语句.
我对这应该如何工作非常困惑.
#Set everything to be logged to the console log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
Run Code Online (Sandbox Code Playgroud)
这是我使用时的完整类路径SPARK_PRINT_LAUNCH_COMMAND:
Spark命令:/Library/Java/JavaVirtualMachines/jdk1.8.0_05.jdk/Contents/Home/bin/java -cp:/root/spark-1.0.1-bin-hadoop2/conf:/root/spark-1.0.1 -bin-hadoop2/CONF:/root/spark-1.0.1-bin-hadoop2/lib/spark-assembly-1.0.1-hadoop2.2.0.jar:/root/spark-1.0.1-bin-hadoop2/lib /datanucleus-api-jdo-3.2.1.jar:/root/spark-1.0.1-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/root/spark-1.0.1-bin-hadoop2 /lib/datanucleus-rdbms-3.2.1.jar -XX:MaxPermSize = 128m -Djava.library.path = -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit spark-shell --class org.apache.spark. repl.Main
内容spark-env.sh:
#!/usr/bin/env bash
# This file is sourced when …Run Code Online (Sandbox Code Playgroud) 如何将RDD(org.apache.spark.rdd.RDD[org.apache.spark.sql.Row])转换为Dataframe org.apache.spark.sql.DataFrame.我使用了将数据帧转换为rdd .rdd.处理完之后我想把它放回到数据帧中.我怎样才能做到这一点 ?
让我们假设以下每个时间点只运行一个Spark作业.
以下是我了解Spark中发生的事情:
SparkContext被创建,每个工作节点开始执行人.执行程序是单独的进程(JVM),它连接回驱动程序.每个执行程序都有驱动程序的jar.退出驱动程序,关闭执行程序.每个执行程序都可以保存一些分区.我明白那个
- 任务是通过序列化Function对象从驱动程序发送到执行程序的命令.
- 执行程序反序列化(使用驱动程序jar)命令(任务)并在分区上执行它.
但
如何将舞台分成这些任务?
特别:
在https://0x0fff.com/spark-architecture-shuffle中,随着图像解释了随机播放
我得到了规则的印象
每个阶段被分成#count-of-partitions任务,不考虑节点数量
对于我的第一张图片,我会说我有3个地图任务和3个减少任务.
对于来自0x0fff的图像,我会说有8个地图任务和3个减少任务(假设只有三个橙色和三个深绿色文件).
那是对的吗?但即使这是正确的,我上面的问题也没有全部回答,因为它仍然是开放的,无论多个操作(例如多个地图)是在一个任务内还是每个操作分成一个任务.
Spark的任务是什么?Spark工作人员如何执行jar文件?以及Apache Spark调度程序如何将文件拆分为任务?是相似的,但我觉得我的问题在那里得不到清楚.
apache-spark ×10
scala ×7
pyspark ×3
rdd ×3
python ×2
spark-submit ×2
hadoop ×1
jar ×1
java ×1
log4j ×1
performance ×1
pyspark-sql ×1