小编uh_*_*boi的帖子

为什么我的火花工作有这么多的任务?默认情况下获取200个任务

我有一个spark作业,它接收来自hdfs的8条记录的文件,做一个简单的聚合并将其保存回hdfs.当我这样做时,我注意到有数百个任务.

我也不确定为什么有这么多工作?我觉得工作更像是一个动作发生的时候.我可以推测为什么 - 但我的理解是,在这段代码中它应该是一个工作,它应该分解为阶段,而不是多个工作.为什么不把它分解成各个阶段,为什么它会闯入工作岗位?

至于200多个任务,由于数据量和节点数量微乎其微,当只有一个聚合和几个过滤器时,每行数据有25个任务是没有意义的.为什么每个原子操作每个分区只有一个任务?

这是相关的scala代码 -

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object TestProj {object TestProj {
  def main(args: Array[String]) {

    /* set the application name in the SparkConf object */
    val appConf = new SparkConf().setAppName("Test Proj")

    /* env settings that I don't need to set in REPL*/
    val sc = new SparkContext(appConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val rdd1 = sc.textFile("hdfs://node002:8020/flat_files/miscellaneous/ex.txt")

     /*the below rdd will have schema defined in Record class*/
     val rddCase = …
Run Code Online (Sandbox Code Playgroud)

hadoop scala task apache-spark apache-spark-sql

20
推荐指数
1
解决办法
9877
查看次数

无法导入类,IntelliJ 显示 BOOT-INF 前缀,似乎是相关的

这是使用 Java 和 Maven - 我试图从一个项目中导入一些类,我可以在我的机器上构建到本地 mvn 存储库,或者我可以从公司的外部 mvn 存储库下载它已经是一个打包的 jar。在查看“外部库”并展开有问题的库时,我在左侧“项目”窗格中查看 IntelliJ 时确实注意到,有问题的 jar 下的所有类都有一个“BOOT-INF.classes”前缀。如果有帮助,它也是一个 springboot 项目,尽管我能够从外部存储库导入所有 springboot 类和所有其他类就好了。

(在“外部库”下的左窗格中的 IntelliJ 项目视图内部)

Maven: org.springframework.boot:spring-boot-starter-test:2.0.0.RELEASE

Maven: com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.8.1

---jackson-core-2.8.1.jar

------com.faster.xml.jackson.core

------com.faster.xml.jackson.core.async

........(列出了更多包)

Maven: com.mycompany.my.project:component-two-1.0.0-SNAPSHOT

Maven: com.mycompany.my.project:component-three-1.0.0-SNAPSHOT

---com.mycompany.my.project:component-1.0.0-20181201.jar

------ BOOT-INF.classes

------ BOOT-INF.classes .com.mycompany.project.my.package.one

---------MyClassOne

---------MyClassTwo

------ BOOT-INF.classes .com.mycompany.project.my.package.one

------ BOOT-INF.classes .com.mycompany.project.my.package.one.alpha

------ BOOT-INF.classes .com.mycompany.project.my.package.one.bravo

java intellij-idea package maven spring-boot

12
推荐指数
1
解决办法
5534
查看次数

优化火花低挂水果,特别是催化剂优化和火花配置

我正在使用Spark 2.1.1,我正在使用Scala API,尽管语言不太重要.我有兴趣以有效的方式优化火花查询/管道.我已经阅读了很多材料(包括伟大的"学习星火"书,我对Spark网站,Jacek Laskowski的博客以及其他人非常熟悉,而且我已经和Spark一起工作了将近两年.

但是,有太多的信息和概念需要注意,而且我没有做足够的优化来了解它们.不幸的是,一旦一切工作100%,可能只需要几天甚至几小时才能交付代码.我需要优先考虑我可以应用的修复程序.我之前已经优化了工作火花代码,但我正在寻找最好的整体策略以及尝试熟悉最好的低挂水果.总有一天,我会记住所有要调整的旋钮,但至少现在有十个非常好的旋钮.我目前认为重要的一些事情是(不是按顺序排列,但前4个恰好是我认为最重要的)...

  1. 开发 - 通过重新分区数据集或从一个分区的配置单元表中检索来减少随机(交换).
  2. 策略 - 查看Spark UI以查看哪个作业和阶段占用时间最长,并且仔细观察该作业和阶段.
  3. 开发 - 尽可能在连接之前过滤数据集,以避免创建高基数"多对多"连接,并避免在连接期间发送更多数据.
  4. 配置 - 正确执行器和内存
  5. 开发 - 尽可能远离笛卡尔积和theta-join.
  6. 开发 - 如果可能,在创建UDF之前使用spark库函数.
  7. 开发 - 如果表足够小,请尝试强制进行广播散列连接.
  8. 策略 - 除非有特定原因(这意味着我从不使用RDD API),否则切勿使用RDD API代替数据集/数据帧.
  9. 开发 - 构建数据集过滤器,以便下推谓词可以与它们一起使用(制作更多,更简单的过滤器而不是多条件过滤器).
  10. 策略与开发 - 始终保持Spark源代码打开,以便更容易找到类型声明和其他代码实现.
  11. 我想念的东西......

对我来说最有趣的增强功能是那些通过查看查询计划或DAG可视化而显而易见的增强功能.此外,使火花用户/开发人员走向"啊哈!"的老生常谈 您可能愿意分享.免责声明:以上十件事对我来说并不完全是"前十名",比如使用火花库函数代替UDF并不是非常重要(当然不是至少十大),但我想帮助给出一个好的例子.提示可能看起来像某人.

scala apache-spark apache-spark-sql spark-dataframe apache-spark-2.0

8
推荐指数
0
解决办法
235
查看次数

Spark将数据拉入RDD或数据框或数据集

当火花通过驱动程序提取数据时,我试图用简单的术语,然后当spark不需要通过驱动程序提取数据时.

我有3个问题 -

  1. 让我们有一个存储在HDFS中的20 TB平面文件文件,并从驱动程序中将其拉入数据框或RDD,使用相应库中的一个开箱即用功能(sc.textfile(path)sc.textfile(path).toDF等).如果驱动程序只运行32 GB内存,它会导致驱动程序有OOM吗?或者至少对司机吉姆进行掉期交易?或者spark和hadoop是否足够聪明,可以将数据从HDFS分发到一个Spark执行器,以便在不通过驱动程序的情况下生成数据帧/ RDD?
  2. 除了外部RDBMS之外,完全相同的问题是1?
  3. 除了特定节点文件系统(只是Unix文件系统,20 TB文件但不是HDFS)之外,与1完全相同的问题?

hadoop apache-spark apache-spark-sql spark-dataframe data-ingestion

7
推荐指数
1
解决办法
1340
查看次数

Spark从配置单元中选择还是从文件中选择更好?

我只是想知道人们对于从Hive读取还是从.csv文件,.txt文件,.ORC文件或.parquet文件读取有什么想法。假设基础Hive表是具有相同文件格式的外部表,您想从Hive表中读取还是从基础文件本身中读取,为什么?

麦克风

hive flat-file apache-spark parquet spark-dataframe

7
推荐指数
1
解决办法
2020
查看次数

Winutils Spark Windows安装env_variable

我正在尝试在Windows 10上安装Spark 1.6.1,到目前为止,我已经完成了以下工作...

  1. 下载了spark 1.6.1,解压到某个目录,然后设置SPARK_HOME
  2. 下载scala 2.11.8,解压到某个目录,然后设置SCALA_HOME
  3. 设置_JAVA_OPTION环境变量
  4. 只需下载zip目录,然后设置HADOOP_HOME env变量,即可从https://github.com/steveloughran/winutils.git下载winutils 。(不确定这是否不正确,由于权限被拒绝,我无法克隆目录)。

当我火花回家并运行bin \ spark-shell时,我得到了

'C:\Program' is not recognized as an internal or external command, operable program or batch file.
Run Code Online (Sandbox Code Playgroud)

我必须缺少一些东西,无论如何我都看不到如何从Windows环境运行bash脚本。但是希望我不需要仅仅为了使这个工作而理解。我一直在关注这个人的教程-https: //hernandezpaul.wordpress.com/2016/01/24/apache-spark-installation-on-windows-10/。任何帮助,将不胜感激。

windows git scala apache-spark apache-spark-standalone

2
推荐指数
1
解决办法
1万
查看次数

从多线程驱动程序启动Apache Spark SQL作业

我想使用Spark从约1500个远程Oracle表中提取数据,并且我想要一个多线程应用程序,该应用程序每个线程选择一个表,或者每个线程选择10个表,并启动一个Spark作业以从各自的表中读取数据。

从官方Spark网站https://spark.apache.org/docs/latest/job-scheduling.html来看,很明显它可以工作...

...运行Spark的集群管理器为跨应用程序调度提供了便利。其次,在每个Spark应用程序中,如果多个“作业”(Spark操作)是由不同的线程提交的,则它们可能同时运行。如果您的应用程序通过网络处理请求,则这很常见。Spark包含一个公平的调度程序,用于调度每个SparkContext中的资源。

但是,您可能已经在Spark中的此类SO 并发作业执行中注意到,该相似问题没有被接受的答案,而最受支持的答案始于

这实际上不是Spark的精神

  1. 每个人都知道这不是Spark的“精神”
  2. 谁在乎Spark的精神是什么?这实际上没有任何意义

有人以前有这样的东西可以工作吗?你有什么特别的事吗?在我浪费大量工作时间进行原型设计之前,只想提供一些建议。我真的很感谢任何帮助!

java multithreading scala apache-spark apache-spark-2.0

2
推荐指数
2
解决办法
2307
查看次数

Scala中的递归DataType

嗨,我想知道是否有人可以解释我在Spark代码库中找到的这个签名.它看起来像一个递归数据类型,它用于构建查询计划,所以它有意义.有没有人对此有详细的了解?

abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product 
Run Code Online (Sandbox Code Playgroud)

types scala higher-kinded-types f-bounded-polymorphism

2
推荐指数
1
解决办法
96
查看次数

toDF()不处理RDD

我有一个名为RowRDD的行RDD.我只是想转换成DataFrame.从我在互联网上看到的各个地方的例子,我看到我正在尝试RowRDD.toDF()我得到错误:

value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]

scala row apache-spark rdd apache-spark-sql

1
推荐指数
1
解决办法
2510
查看次数

用于理解的Scala模式匹配

在 Scala 中,您是否可以有一个 for 理解,它遍历对象列表,然后根据元素的一个属性的类型创建一个值数组?所以假设我有一个元素列表,每个元素都有一个属性,并且属性可以是不同的类型......

for (element <- elementList) element.attribute match {
 case a: Type1 => "Type1"
 case a => "All Types"
}
Run Code Online (Sandbox Code Playgroud)

然后生成的 Array 将是一个具有类似值的数组

Array("Type1", "Type1", "All Types", "Type1", "All Types", "All Types", "All Types", "All Types") 
Run Code Online (Sandbox Code Playgroud)

scala pattern-matching for-comprehension

1
推荐指数
1
解决办法
4230
查看次数

Encoders.product中的TypeTag是什么?

我使用Spark 2.1.1.

我从以下开始:

import org.apache.spark.sql.types._
val mySchema = StructType(
  StructField("id", IntegerType, true),
  StructField("code", StringType, false),
  StructField("value", DecimalType, false))
val myDS = Seq((1,"000010", 1.0), (2, "000020", 2.0)).as[mySchema]
Run Code Online (Sandbox Code Playgroud)

在这里,我看到mySchema不是一个类型,看了之后Encoders.scala我可以看到我需要通过这里传递一个Product的子类型

def product[T <: Product : TypeTag]: Encoder[T] = ExpressionEncoder()
Run Code Online (Sandbox Code Playgroud)

因此,在看到冒号操作符只是来自什么是Scala上下文和视图边界的隐式参数的语法糖之后,我可以看到应该有一个隐含的TypeTag [T]可用,但我不明白TypeTag [T]是如何隐含的SQLImplicits.scala.

   /**
   * @since 1.6.1
   * @deprecated use [[newSequenceEncoder]]
   */
  def newProductSeqEncoder[A <: Product : TypeTag]: Encoder[Seq[A]] = ExpressionEncoder() 
Run Code Online (Sandbox Code Playgroud)

即使它被弃用了,当我看到它时

 /** @since 2.2.0 */
  implicit def newSequenceEncoder[T <: Seq[_] : …
Run Code Online (Sandbox Code Playgroud)

scala implicit apache-spark apache-spark-sql

1
推荐指数
1
解决办法
801
查看次数

Hadoop DNS解析

我正在尝试在VM上安装本地群集.当我打开Cloudera Manager时,有时主机运行状况检查显示为失败,错误消息基本上显示"DNS解析失败".有时HDFS服务显示为失败.我想在其他途径上有一些想法进行故障排除.我很确定使用hosts文件或DNS的某些东西很简单.我的操作系统是Ubuntu.

到目前为止,我已经编辑/ etc/hosts以获得运行ifconfig时获得的确切ipv4地址

10.2.0.15  michael-VirtualBox
Run Code Online (Sandbox Code Playgroud)

我删除了条目

127.0.0.1 localhost

我也重新启动了网络服务.我执行了命令

"python -c "import socket; print socket.getfqdn(); print socket.gethostbyname(socket.getfqdn())""

我回来了

127.0.0.1 localhost

dns hadoop hosts cloudera cloudera-cdh

0
推荐指数
1
解决办法
3172
查看次数

scala在lambda中有一个case语句

为什么你不能在lambda函数中有一个case语句?我的代码看起来像

def f(list:List[String]):List[Int] = list.map( _ match{ case _.length > 1 => _.length else 1})

input 
"mike" 
"tom"
"t"
" "

output
4  
3
1
1
Run Code Online (Sandbox Code Playgroud)

如你所见,我试图在lambda中做一个案例.我用语法尝试了很多方法.

collections scala list

0
推荐指数
1
解决办法
686
查看次数