小编him*_*ian的帖子

为什么dataset.count()比rdd.count()更快?

我创建了一个Spark Dataset[Long]:

scala> val ds = spark.range(100000000)
ds: org.apache.spark.sql.Dataset[Long] = [id: bigint]
Run Code Online (Sandbox Code Playgroud)

当我运行ds.count它给我结果0.2s(在4 Core 8GB机器上).此外,它创建的DAG如下:

在此输入图像描述

但是,当我跑的ds.rdd.count时候给了我结果4s(同一台机器).但它创建的DAG如下:

在此输入图像描述

所以,我的怀疑是:

  1. 为什么ds.rdd.count只创造一个阶段而ds.count创造两个阶段?
  2. 此外,当ds.rdd.count只有一个阶段时,为什么它比ds.count两个阶段慢?

performance scala apache-spark apache-spark-sql apache-spark-dataset

5
推荐指数
1
解决办法
4141
查看次数

在Spark中将数据帧转换为json时,如何打印空值

我有一个从csv读取的数据帧.

CSV:
name,age,pets
Alice,23,dog
Bob,30,dog
Charlie,35,

Reading this into a DataFrame called myData:
+-------+---+----+
|   name|age|pets|
+-------+---+----+
|  Alice| 23| dog|
|    Bob| 30| dog|
|Charlie| 35|null|
+-------+---+----+
Run Code Online (Sandbox Code Playgroud)

现在,我想将此数据帧的每一行转换为json myData.toJSON.我得到的是以下jsons.

{"name":"Alice","age":"23","pets":"dog"}
{"name":"Bob","age":"30","pets":"dog"}
{"name":"Charlie","age":"35"}
Run Code Online (Sandbox Code Playgroud)

我希望第三行的json包含null值.防爆.

{"name":"Charlie","age":"35", "pets":null}
Run Code Online (Sandbox Code Playgroud)

但是,这似乎不可能.我通过代码调试并看到Spark的org.apache.spark.sql.catalyst.json.JacksonGenerator类具有以下实现

  private def writeFields(
    row: InternalRow, schema: StructType, fieldWriters: 
    Seq[ValueWriter]): Unit = {
    var i = 0
    while (i < row.numFields) {
      val field = schema(i)
      if (!row.isNullAt(i)) {
        gen.writeFieldName(field.name)
        fieldWriters(i).apply(row, i)
      }
      i += 1
    }
  }
Run Code Online (Sandbox Code Playgroud)

如果列为null,这似乎正在跳过列.我不太清楚为什么这是默认行为,但有没有办法在使用Spark的json中打印空值toJSON? …

json scala apache-spark apache-spark-sql

5
推荐指数
1
解决办法
2770
查看次数

模拟 SparkSession 用于单元测试

我的 Spark 应用程序中有一个方法可以从 MySQL 数据库加载数据。该方法看起来像这样。

trait DataManager {

val session: SparkSession

def loadFromDatabase(input: Input): DataFrame = {
            session.read.jdbc(input.jdbcUrl, s"(${input.selectQuery}) T0",
              input.columnName, 0L, input.maxId, input.parallelism, input.connectionProperties)
    }
}
Run Code Online (Sandbox Code Playgroud)

该方法除了执行jdbc方法并从数据库加载数据之外不执行任何其他操作。我该如何测试这个方法?标准方法是创建对象的模拟,session该对象是 的实例SparkSession。但由于SparkSession有一个私有构造函数,我无法使用 ScalaMock 来模拟它。

这里的主要问题是我的函数是一个纯粹的副作用函数(副作用是从关系数据库中提取数据),并且鉴于我在模拟时遇到问题,我如何对该函数进行单元测试SparkSession

那么有什么方法可以模拟SparkSession或者比模拟更好的方法来测试这个方法呢?

unit-testing scala mocking apache-spark scalamock

5
推荐指数
1
解决办法
1万
查看次数

sbt-对象apache不是包org的成员

我想使用sbt部署并提交一个spark程序,但是它会抛出错误。

码:

package in.goai.spark

import org.apache.spark.{SparkContext, SparkConf}

object SparkMeApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("First Spark")
    val sc = new SparkContext(conf)
    val fileName = args(0)
    val lines = sc.textFile(fileName).cache
    val c = lines.count
    println(s"There are $c lines in $fileName")
  }
}
Run Code Online (Sandbox Code Playgroud)

build.sbt

    name := "First Spark"

    version := "1.0"

    organization := "in.goai"

    scalaVersion := "2.11.8"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"

    resolvers += Resolver.mavenLocal
Run Code Online (Sandbox Code Playgroud)

在第一/项目目录下

build.properties

bt.version=0.13.9
Run Code Online (Sandbox Code Playgroud)

当我尝试运行sbt package下面给出的抛出错误时。

[root@hadoop first]# sbt package …
Run Code Online (Sandbox Code Playgroud)

scala sbt apache-spark

5
推荐指数
1
解决办法
1563
查看次数

对未使用的变量使用占位符时出现 MatchError

使用 Scala 2.13.x,scala.MatchError: null当我对未使用的变量使用占位符时,我得到了:

scala> object Test {
     |   val _: Any = null
     | }
object Test

scala> Test
scala.MatchError: null
  ... 41 elided
Run Code Online (Sandbox Code Playgroud)

但是使用 Scala 2.12.x,我没有得到scala.MatchError: null

scala> object Test {
     |   val _: Any = null
     | }
defined object Test

scala> Test
res1: Test.type = Test$@784c5ef5
Run Code Online (Sandbox Code Playgroud)

任何原因?

null scala pattern-matching unused-variables scala-2.13

5
推荐指数
1
解决办法
209
查看次数

如何模拟一个Akka演员单元测试一个类?

我有一个Controller类,它控制发送给Akka actor的请求,该请求被注入控制器.

控制器代码:

class Controller(actor: ActorRef) {
  def control(msg: String): Future[String] = {
    actor.ask(msg)(Timeout(2 seconds)).mapTo[String]
  }
}
Run Code Online (Sandbox Code Playgroud)

我的演员代码是:

class ActorA extends Actor {
  override def receive: Receive = {
    case msg: String => sender ! msg
    case msg: Int => sender ! msg.toString
    case _ => "Invalid command!"
}
Run Code Online (Sandbox Code Playgroud)

现在我需要将ActorA的行为模拟为单元测试Controller.有没有办法通过Akka TestKit这样做?

unit-testing dependency-injection scala akka akka-testkit

4
推荐指数
1
解决办法
4253
查看次数

如何在 Spark Scala 中转换为 Long?

这似乎是一个简单的任务,但我不知道如何在 Spark(而不是 PySpark)中使用 Scala 来完成它。我有一个df包含不同列的数据框。其中一列的类型String应更改为Long。我该怎么做?

如果我执行此代码,我会收到编译错误Cannot resolve symbol col

df.withColumn("timestamp", col("timestamp").cast(LongType))
Run Code Online (Sandbox Code Playgroud)

sql scala apache-spark apache-spark-sql

4
推荐指数
1
解决办法
1万
查看次数

spark java中的性能问题

我正在使用spark 2.11版本,我在我的应用程序中只做了3个基本操作:

  1. 记录数据库:220万
  2. 使用contains检查数据库(220万)中存在的文件(5 000)中的记录
  3. 将匹配的记录写入CSV格式的文件

但是对于这3个操作,它需要将近20分钟.如果我在SQL中执行相同的操作,则需要不到1分钟.

我已经开始使用spark因为它会产生非常快的结果,但是花费了太多时间.如何提高性能?

第1步:从数据库中获取记录.

        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "test");
        connectionProperties.put("password", "test##");
        String query="(SELECT * from items)
        dataFileContent= spark.read().jdbc("jdbc:oracle:thin:@//172.20.0.11/devad", query,connectionProperties);
Run Code Online (Sandbox Code Playgroud)

步骤2:使用contains检查文件B(2M)中存在的文件A(5k)的记录

Dataset<Row> NewSet=source.join(target,target.col("ItemIDTarget").contains(source.col("ItemIDSource")),"inner");
Run Code Online (Sandbox Code Playgroud)

步骤3:将匹配的记录写入CSV格式的文件

 NewSet.repartition(1).select("*")
        .write().format("com.databricks.spark.csv")
        .option("delimiter", ",")
        .option("header", "true")
        .option("treatEmptyValuesAsNulls", "true")  
        .option("nullValue", "")  
        .save(fileAbsolutePath);
Run Code Online (Sandbox Code Playgroud)

为了提高性能,我尝试了一些设置Cache,数据序列化等功能

set("spark.serializer","org.apache.spark.serializer.KryoSerializer")),
Run Code Online (Sandbox Code Playgroud)

随机播放时间

sqlContext.setConf("spark.sql.shuffle.partitions", "10"),
Run Code Online (Sandbox Code Playgroud)

数据结构调整

-XX:+UseCompressedOops ,
Run Code Online (Sandbox Code Playgroud)

没有一种方法不会产生更好的性能.

java performance apache-spark apache-spark-sql

3
推荐指数
1
解决办法
754
查看次数

从其他列在 Apache Spark 中创建映射列

我搜索了很多,但找不到任何可以适应我的情况的东西。我有一个像这样的数据框:

+-----------------+---------------+
|             keys|         values|
+-----------------+---------------+
|[one, two, three]|[101, 202, 303]|
+-----------------+---------------+
Run Code Online (Sandbox Code Playgroud)

键有一个字符串数组,值有一个整数数组。

我想创建一个包含键到值的映射的新列,如下所示:

+-----------------+---------------+---------------------------+
|             keys|         values|                        map|
+-----------------+---------------+---------------------------+
|[one, two, three]|[101, 202, 303]|Map(one->101, two->202, etc|
+-----------------+---------------+---------------------------+
Run Code Online (Sandbox Code Playgroud)

我一直在看这个问题,但不确定它可以用作我的情况的起点:Spark DataFrame columns transform to Map type and List of Map Type

我需要在 Scala 中使用这个。

谢谢!

scala apache-spark spark-dataframe

3
推荐指数
1
解决办法
6311
查看次数

df.select() 和 df.agg() 有什么区别?

我有一个数据框,我想从中提取最大值、最小值并计算其中的记录数。

数据框是:

scala> val df = spark.range(10000)
df: org.apache.spark.sql.Dataset[Long] = [id: bigint]
Run Code Online (Sandbox Code Playgroud)

为了获取我正在使用的所需值df.select(),如下所示:

scala> df.select(min("id"), max("id"), count("id")).show
+-------+-------+---------+
|min(id)|max(id)|count(id)|
+-------+-------+---------+
|      0|   9999|    10000|
+-------+-------+---------+
Run Code Online (Sandbox Code Playgroud)

这给了我正确的结果,但是当我尝试时df.agg()它也给了我相同的答案。

scala> df.agg(min("id"), max("id"), count("id")).show
+-------+-------+---------+
|min(id)|max(id)|count(id)|
+-------+-------+---------+
|      0|   9999|    10000|
+-------+-------+---------+
Run Code Online (Sandbox Code Playgroud)

所以,我的问题是它们之间有什么区别df.select()df.agg()如果它们提供相同的结果,我应该使用哪一个以获得更好的性能?

scala aggregate-functions apache-spark apache-spark-sql

2
推荐指数
1
解决办法
3179
查看次数