小编him*_*ian的帖子

无法在ScalaTest中导入Spark Implicits

我正在使用ScalaTest为Spark编写测试用例.

import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterAll, FlatSpec}

class ClassNameSpec extends FlatSpec with BeforeAndAfterAll {
  var spark: SparkSession = _
  var className: ClassName = _

  override def beforeAll(): Unit = {
    spark = SparkSession.builder().master("local").appName("class-name-test").getOrCreate()
    className = new ClassName(spark)
  }

  it should "return data" in {
    import spark.implicits._
    val result = className.getData(input)

    assert(result.count() == 3)
  }

  override def afterAll(): Unit = {
    spark.stop()
  }
}
Run Code Online (Sandbox Code Playgroud)

当我尝试编译测试套件时,它会给我以下错误:

stable identifier required, but ClassNameSpec.this.spark.implicits found.
[error]     import spark.implicits._
[error]                  ^
[error] one error found
[error] (test:compileIncremental) …
Run Code Online (Sandbox Code Playgroud)

scala implicit scalatest apache-spark apache-spark-sql

13
推荐指数
1
解决办法
7832
查看次数

为什么我们需要在运行Spark SBT应用程序时添加"fork in run:= true"?

我已经构建了一个简单的Spark应用程序sbt.这是我的代码:

import org.apache.spark.sql.SparkSession

object HelloWorld {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("BigApple").getOrCreate()

    import spark.implicits._

    val ds = Seq(1, 2, 3).toDS()
    ds.map(_ + 1).foreach(x => println(x))
  }
}
Run Code Online (Sandbox Code Playgroud)

以下是我的 build.sbt

name := """sbt-sample-app"""

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6" % "test"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.1.1"
Run Code Online (Sandbox Code Playgroud)

现在,当我尝试这样做时sbt run,它会给我以下错误:

$ sbt run
[info] Loading global plugins from /home/user/.sbt/0.13/plugins
[info] Loading project definition from /home/user/Projects/sample-app/project
[info] …
Run Code Online (Sandbox Code Playgroud)

scala sbt apache-spark

13
推荐指数
1
解决办法
4788
查看次数

附加模式下的水印聚合查询的空输出

我使用Spark 2.2.0-rc1.

我有一个卡夫卡topic这我查询运行水印聚集,有1 minute水印,发出来consoleappend输出模式.

import org.apache.spark.sql.types._
val schema = StructType(StructField("time", TimestampType) :: Nil)
val q = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("startingOffsets", "earliest").
  option("subscribe", "topic").
  load.
  select(from_json(col("value").cast("string"), schema).as("value"))
  select("value.*").
  withWatermark("time", "1 minute").
  groupBy("time").
  count.
  writeStream.
  outputMode("append").
  format("console").
  start
Run Code Online (Sandbox Code Playgroud)

我在Kafka推送以下数据topic:

{"time":"2017-06-07 10:01:00.000"}
{"time":"2017-06-07 10:02:00.000"}
{"time":"2017-06-07 10:03:00.000"}
{"time":"2017-06-07 10:04:00.000"}
{"time":"2017-06-07 10:05:00.000"}
Run Code Online (Sandbox Code Playgroud)

我得到以下输出:

scala> -------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 2 …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-structured-streaming

12
推荐指数
2
解决办法
1516
查看次数

来自单个主主题的多个流

如何从单个主题创建多个流?当我做这样的事情时:

KStreamBuilder builder = new KStreamBuilder();

builder.stream(Serdes.String(), Serdes.String(), "master")
            /* Filtering logic */
            .to(Serdes.String(), Serdes.String(), "output1");

builder.stream(Serdes.String(), Serdes.String(), "master")
            /* Filtering logic */
            .to(Serdes.String(), Serdes.String(), "output2");

KafkaStreams streams = new KafkaStreams(builder, /* config */);
streams.start();
Run Code Online (Sandbox Code Playgroud)

我收到以下错误:

org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic master has already been registered by another source.
    at org.apache.kafka.streams.processor.TopologyBuilder.addSource(TopologyBuilder.java:347)
    at org.apache.kafka.streams.kstream.KStreamBuilder.stream(KStreamBuilder.java:92)
Run Code Online (Sandbox Code Playgroud)

我是否需要为"master"中的每个流创建另一个KafkaStream实例?

java apache-kafka apache-kafka-streams

11
推荐指数
1
解决办法
3019
查看次数

为什么重写的变量在Scala中得到错误的值?

我在Scala中有一个A类,就像这个:

class A {
  val a = 3
  lazy val b = 2
  println("a = " + a)
  println("b = " + b)
}
Run Code Online (Sandbox Code Playgroud)

接下来,我将这个类扩展到另一个B类:

class B extends A {
  override val a = 4
  override lazy val b = 3
}
Run Code Online (Sandbox Code Playgroud)

现在,当我创建一个对象时class B,我得到以下输出:

a = 0  //the default value of int is zero `0` in Scala
b = 3
Run Code Online (Sandbox Code Playgroud)

而我期望输出为:

a = 3
b = 2
Run Code Online (Sandbox Code Playgroud)

我的问题是如何知道println()函数中class A定义的值class B,但仅限于b和不是 …

scala

10
推荐指数
1
解决办法
352
查看次数

通过Spark读取保存在文件夹中的所有Parquet文件

我有一个包含Parquet文件的文件夹.像这样的东西:

scala> val df = sc.parallelize(List(1,2,3,4)).toDF()
df: org.apache.spark.sql.DataFrame = [value: int]

scala> df.write.parquet("/tmp/test/df/1.parquet")

scala> val df = sc.parallelize(List(5,6,7,8)).toDF()
df: org.apache.spark.sql.DataFrame = [value: int]

scala> df.write.parquet("/tmp/test/df/2.parquet")
Run Code Online (Sandbox Code Playgroud)

当我去读取文件df夹中的所有镶木地板文件时保存数据帧后,它给了我错误.

scala> val read = spark.read.parquet("/tmp/test/df")
org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:189)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$8.apply(DataSource.scala:189)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$getOrInferFileFormatSchema(DataSource.scala:188)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:387)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:441)
  at org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:425)
  ... 48 elided
Run Code Online (Sandbox Code Playgroud)

我知道我可以通过提供完整路径来阅读Parquet文件,但如果有办法读取文件夹中的所有镶木地板文件会更好.

scala apache-spark apache-spark-sql

9
推荐指数
1
解决办法
1万
查看次数

使用Scala/Spark提取Teradata表后出现NullPointerException

我需要从Teradata(只读访问)中提取一个表到Scala(2.11)/ Spark(2.1.0).我正在构建一个可以成功加载的数据框

val df = spark.read.format("jdbc").options(options).load()
Run Code Online (Sandbox Code Playgroud)

但是df.show给了我一个NullPointerException:

java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:210)
Run Code Online (Sandbox Code Playgroud)

我做了一个df.printSchema,我发现这个NPE的原因是数据集包含列的null(nullable = false)(看起来Teradata给我错误的信息).实际上,df.show如果我删除有问题的列,我可以实现.

所以,我尝试指定一个新架构,所有列都设置为(nullable = true):

val new_schema = StructType(df.schema.map {
  case StructField(n,d,nu,m) => StructField(n,d,true,m)
})

val new_df = spark.read.format("jdbc").schema(new_schema).options(options).load()
Run Code Online (Sandbox Code Playgroud)

但后来我得到了:

org.apache.spark.sql.AnalysisException: JDBC does not allow user-specified schemas.;
Run Code Online (Sandbox Code Playgroud)

我还尝试从前一个创建一个新的Dataframe,指定所需的模式:

val new_df = df.sqlContext.createDataFrame(df.rdd, new_schema)
Run Code Online (Sandbox Code Playgroud)

但是在对数据帧采取行动时我仍然有一个NPE.

关于如何解决这个问题的任何想法?

scala teradata dataframe apache-spark apache-spark-sql

9
推荐指数
1
解决办法
999
查看次数

JMockit - 期望与MockUp <T>为什么一个工作而另一个不工作?

我正在尝试(仍然)学习JMockit的细节.这是JMockit奇怪的另一个例子,我只是没有得到.使用NonStrictExpectations运行测试工作正常.但是,使用MockUp运行却没有.我不知道为什么.有任何想法吗?我正在运行JMockit 1.5.

测试方法:

private List<Foo> getFooList(List<FooStatement> fooStatements){
    List<Foo> FooList = new ArrayList<Foo>();

    for(FooStatement at: fooStatements){
        List<Foo> aList = at.getFoos();
        FooList.addAll(aList);
    }

    return FooList;
}
Run Code Online (Sandbox Code Playgroud)

成功的期望测试

@Test
public void getFooListWithExpectationsTest(
        @Mocked final FooStatement mockFooStatement,
        @Mocked final Foo mockFoo
){

    List<FooStatement> fooStatementList = new ArrayList<>(Arrays.asList(
            mockFooStatement,
            mockFooStatement
    ));

    new NonStrictExpectations(){{
        mockFooStatement.getFoos();
        result = new ArrayList<Foo>(Arrays.asList(mockFoo));
    }};

    List<Foo> fooList = Deencapsulation.invoke(handler, "getFooList", fooStatementList);
    Assert.assertTrue(fooList.size() == 2);
}
Run Code Online (Sandbox Code Playgroud)

使用MockUp断言错误(0!= 2)

@Test
public void getFooListWithMockUpTest(
        @Mocked final FooStatement mockFooStatement,
        @Mocked final Foo mockFoo
){

    new …
Run Code Online (Sandbox Code Playgroud)

java unit-testing jmockit

7
推荐指数
1
解决办法
7311
查看次数

Apache Spark 读取 UTF-16 CSV 文件

我正在尝试读取以 UTF-16 编码的 CSV 文件。

val test = spark.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter",";")
.option("dateFormat", "yyyy-MM-dd HH:mm:ss.SSS")
.option("encoding", "UTF-16")
.option("charset", "ISO-8859-1")
.load("...")
Run Code Online (Sandbox Code Playgroud)

结果我得到了额外的行:

在此处输入图片说明

Spark 是否可能只能使用 UTF-8 编码?或者还有其他方法可以将 UTF-16 CSV 读入数据帧?

scala apache-spark apache-spark-sql spark-dataframe databricks

6
推荐指数
1
解决办法
4542
查看次数

如果我不断在其中添加列,我应该坚持Spark数据帧吗?

我在网上搜索的任何论坛都找不到关于以下主题的讨论.这可能是因为我是Spark和Scala的新手,我不是在问一个有效的问题.如果有任何现有的线程讨论相同或类似的主题,链接将非常有用.:)

我正在开发一个使用Spark和Scala的过程,并通过读取大量表来创建一个文件,并通过将逻辑应用于从表中获取的数据来获取大量字段.所以,我的代码结构是这样的:

val driver_sql = "SELECT ...";

var df_res = spark.sql(driver_sql)

var df_res = df_res.withColumn("Col1", <logic>)

var df_res = df_res.withColumn("Col2", <logic>)

var df_res = df_res.withColumn("Col3", <logic>)
.
.
.

var df_res = df_res.withColumn("Col20", <logic>)
Run Code Online (Sandbox Code Playgroud)

基本上,有一个驱动程序查询,它创建"驱动程序"数据帧.之后,基于驱动程序数据帧中的一个或多个键执行单独的逻辑(函数)以添加新的列/字段."逻辑"部分并不总是单行代码,有时,它是一个单独的函数,它运行另一个查询并在df_res上进行某种连接并添加一个新列.记录计数也发生了变化,因为在某些情况下我使用"内部"连接与其他表/数据帧.

所以,这是我的问题:

  • 我应该坚持df_res任何时间点吗?
  • df_res添加列后,我可以一次又一次地坚持下去吗?我的意思是,它增加了价值吗?
  • 如果df_res每次添加新列时我仍然存在(仅磁盘),是否更换了磁盘中的数据?或者它是否df_res在磁盘中创建了新的副本/版本?
  • 是否有更好的技术可以在这样的场景中持久保存/缓存数据(以避免在内存中做很多事情)?

scala persist dataframe apache-spark apache-spark-sql

6
推荐指数
1
解决办法
4490
查看次数