我创建了一个Spark Dataset[Long]:
scala> val ds = spark.range(100000000)
ds: org.apache.spark.sql.Dataset[Long] = [id: bigint]
Run Code Online (Sandbox Code Playgroud)
当我运行ds.count它给我结果0.2s(在4 Core 8GB机器上).此外,它创建的DAG如下:
但是,当我跑的ds.rdd.count时候给了我结果4s(同一台机器).但它创建的DAG如下:
所以,我的怀疑是:
ds.rdd.count只创造一个阶段而ds.count创造两个阶段?ds.rdd.count只有一个阶段时,为什么它比ds.count两个阶段慢?performance scala apache-spark apache-spark-sql apache-spark-dataset
我有一个从csv读取的数据帧.
CSV:
name,age,pets
Alice,23,dog
Bob,30,dog
Charlie,35,
Reading this into a DataFrame called myData:
+-------+---+----+
| name|age|pets|
+-------+---+----+
| Alice| 23| dog|
| Bob| 30| dog|
|Charlie| 35|null|
+-------+---+----+
Run Code Online (Sandbox Code Playgroud)
现在,我想将此数据帧的每一行转换为json myData.toJSON.我得到的是以下jsons.
{"name":"Alice","age":"23","pets":"dog"}
{"name":"Bob","age":"30","pets":"dog"}
{"name":"Charlie","age":"35"}
Run Code Online (Sandbox Code Playgroud)
我希望第三行的json包含null值.防爆.
{"name":"Charlie","age":"35", "pets":null}
Run Code Online (Sandbox Code Playgroud)
但是,这似乎不可能.我通过代码调试并看到Spark的org.apache.spark.sql.catalyst.json.JacksonGenerator类具有以下实现
private def writeFields(
row: InternalRow, schema: StructType, fieldWriters:
Seq[ValueWriter]): Unit = {
var i = 0
while (i < row.numFields) {
val field = schema(i)
if (!row.isNullAt(i)) {
gen.writeFieldName(field.name)
fieldWriters(i).apply(row, i)
}
i += 1
}
}
Run Code Online (Sandbox Code Playgroud)
如果列为null,这似乎正在跳过列.我不太清楚为什么这是默认行为,但有没有办法在使用Spark的json中打印空值toJSON? …
我的 Spark 应用程序中有一个方法可以从 MySQL 数据库加载数据。该方法看起来像这样。
trait DataManager {
val session: SparkSession
def loadFromDatabase(input: Input): DataFrame = {
session.read.jdbc(input.jdbcUrl, s"(${input.selectQuery}) T0",
input.columnName, 0L, input.maxId, input.parallelism, input.connectionProperties)
}
}
Run Code Online (Sandbox Code Playgroud)
该方法除了执行jdbc方法并从数据库加载数据之外不执行任何其他操作。我该如何测试这个方法?标准方法是创建对象的模拟,session该对象是 的实例SparkSession。但由于SparkSession有一个私有构造函数,我无法使用 ScalaMock 来模拟它。
这里的主要问题是我的函数是一个纯粹的副作用函数(副作用是从关系数据库中提取数据),并且鉴于我在模拟时遇到问题,我如何对该函数进行单元测试SparkSession。
那么有什么方法可以模拟SparkSession或者比模拟更好的方法来测试这个方法呢?
我想使用sbt部署并提交一个spark程序,但是它会抛出错误。
码:
package in.goai.spark
import org.apache.spark.{SparkContext, SparkConf}
object SparkMeApp {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("First Spark")
val sc = new SparkContext(conf)
val fileName = args(0)
val lines = sc.textFile(fileName).cache
val c = lines.count
println(s"There are $c lines in $fileName")
}
}
Run Code Online (Sandbox Code Playgroud)
build.sbt
name := "First Spark"
version := "1.0"
organization := "in.goai"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"
resolvers += Resolver.mavenLocal
Run Code Online (Sandbox Code Playgroud)
在第一/项目目录下
build.properties
bt.version=0.13.9
Run Code Online (Sandbox Code Playgroud)
当我尝试运行sbt package下面给出的抛出错误时。
[root@hadoop first]# sbt package …Run Code Online (Sandbox Code Playgroud) 使用 Scala 2.13.x,scala.MatchError: null当我对未使用的变量使用占位符时,我得到了:
scala> object Test {
| val _: Any = null
| }
object Test
scala> Test
scala.MatchError: null
... 41 elided
Run Code Online (Sandbox Code Playgroud)
但是使用 Scala 2.12.x,我没有得到scala.MatchError: null:
scala> object Test {
| val _: Any = null
| }
defined object Test
scala> Test
res1: Test.type = Test$@784c5ef5
Run Code Online (Sandbox Code Playgroud)
任何原因?
我有一个Controller类,它控制发送给Akka actor的请求,该请求被注入控制器.
控制器代码:
class Controller(actor: ActorRef) {
def control(msg: String): Future[String] = {
actor.ask(msg)(Timeout(2 seconds)).mapTo[String]
}
}
Run Code Online (Sandbox Code Playgroud)
我的演员代码是:
class ActorA extends Actor {
override def receive: Receive = {
case msg: String => sender ! msg
case msg: Int => sender ! msg.toString
case _ => "Invalid command!"
}
Run Code Online (Sandbox Code Playgroud)
现在我需要将ActorA的行为模拟为单元测试Controller.有没有办法通过Akka TestKit这样做?
这似乎是一个简单的任务,但我不知道如何在 Spark(而不是 PySpark)中使用 Scala 来完成它。我有一个df包含不同列的数据框。其中一列的类型String应更改为Long。我该怎么做?
如果我执行此代码,我会收到编译错误Cannot resolve symbol col:
df.withColumn("timestamp", col("timestamp").cast(LongType))
Run Code Online (Sandbox Code Playgroud) 我正在使用spark 2.11版本,我在我的应用程序中只做了3个基本操作:
但是对于这3个操作,它需要将近20分钟.如果我在SQL中执行相同的操作,则需要不到1分钟.
我已经开始使用spark因为它会产生非常快的结果,但是花费了太多时间.如何提高性能?
第1步:从数据库中获取记录.
Properties connectionProperties = new Properties();
connectionProperties.put("user", "test");
connectionProperties.put("password", "test##");
String query="(SELECT * from items)
dataFileContent= spark.read().jdbc("jdbc:oracle:thin:@//172.20.0.11/devad", query,connectionProperties);
Run Code Online (Sandbox Code Playgroud)
步骤2:使用contains检查文件B(2M)中存在的文件A(5k)的记录
Dataset<Row> NewSet=source.join(target,target.col("ItemIDTarget").contains(source.col("ItemIDSource")),"inner");
Run Code Online (Sandbox Code Playgroud)
步骤3:将匹配的记录写入CSV格式的文件
NewSet.repartition(1).select("*")
.write().format("com.databricks.spark.csv")
.option("delimiter", ",")
.option("header", "true")
.option("treatEmptyValuesAsNulls", "true")
.option("nullValue", "")
.save(fileAbsolutePath);
Run Code Online (Sandbox Code Playgroud)
为了提高性能,我尝试了一些设置Cache,数据序列化等功能
set("spark.serializer","org.apache.spark.serializer.KryoSerializer")),
Run Code Online (Sandbox Code Playgroud)
随机播放时间
sqlContext.setConf("spark.sql.shuffle.partitions", "10"),
Run Code Online (Sandbox Code Playgroud)
数据结构调整
-XX:+UseCompressedOops ,
Run Code Online (Sandbox Code Playgroud)
没有一种方法不会产生更好的性能.
我搜索了很多,但找不到任何可以适应我的情况的东西。我有一个像这样的数据框:
+-----------------+---------------+
| keys| values|
+-----------------+---------------+
|[one, two, three]|[101, 202, 303]|
+-----------------+---------------+
Run Code Online (Sandbox Code Playgroud)
键有一个字符串数组,值有一个整数数组。
我想创建一个包含键到值的映射的新列,如下所示:
+-----------------+---------------+---------------------------+
| keys| values| map|
+-----------------+---------------+---------------------------+
|[one, two, three]|[101, 202, 303]|Map(one->101, two->202, etc|
+-----------------+---------------+---------------------------+
Run Code Online (Sandbox Code Playgroud)
我一直在看这个问题,但不确定它可以用作我的情况的起点:Spark DataFrame columns transform to Map type and List of Map Type
我需要在 Scala 中使用这个。
谢谢!
我有一个数据框,我想从中提取最大值、最小值并计算其中的记录数。
数据框是:
scala> val df = spark.range(10000)
df: org.apache.spark.sql.Dataset[Long] = [id: bigint]
Run Code Online (Sandbox Code Playgroud)
为了获取我正在使用的所需值df.select(),如下所示:
scala> df.select(min("id"), max("id"), count("id")).show
+-------+-------+---------+
|min(id)|max(id)|count(id)|
+-------+-------+---------+
| 0| 9999| 10000|
+-------+-------+---------+
Run Code Online (Sandbox Code Playgroud)
这给了我正确的结果,但是当我尝试时df.agg()它也给了我相同的答案。
scala> df.agg(min("id"), max("id"), count("id")).show
+-------+-------+---------+
|min(id)|max(id)|count(id)|
+-------+-------+---------+
| 0| 9999| 10000|
+-------+-------+---------+
Run Code Online (Sandbox Code Playgroud)
所以,我的问题是它们之间有什么区别df.select(),df.agg()如果它们提供相同的结果,我应该使用哪一个以获得更好的性能?
scala ×9
apache-spark ×8
performance ×2
unit-testing ×2
akka ×1
akka-testkit ×1
java ×1
json ×1
mocking ×1
null ×1
sbt ×1
scala-2.13 ×1
scalamock ×1
sql ×1