小编Tom*_*Tom的帖子

如何使用spark将avro写入多个输出目录

嗨,有一个主题是使用MultipleTextOutputFormat在一个spark作业中将文本数据写入多个输出目录

通过键Spark写入多个输出 - 一个Spark作业

我会问是否有类似的方法将avro数据写入多个目录

我想要的是将avro文件中的数据写入不同的目录(基于时间戳字段,时间戳中的同一天转到同一目录)

avro apache-spark spark-avro

5
推荐指数
1
解决办法
509
查看次数

HttpUrlConnection.setChunkedStreamingMode的效果

我对此没有很好的了解HttpUrlConnection.setChunkedStreamingMode,此模式的作用是什么?

我有以下示例代码:

HttpURLConnection conn = getHttpURLConnection(_url);
conn.setChunkedStreamingMode(4096); //4k
conn.setConnectTimeout(3000);
conn.setDoInput(true);
conn.setDoOutput(true);
conn.setRequestMethod("POST");
OutputStream out = conn.getOutputStream();
byte[] buffer = new byte[1024 * 10];//10k
FileInputStream in= new FileInputStream(file); //Write the content of the file to the server
int len;
while ((len = in.read(buffer)) != -1) {
    out.write(buffer, 0, len);
}

out.flush();
in.close();
Run Code Online (Sandbox Code Playgroud)

说,文件大小为101k,我将块大小设置为4096。

  1. 每次写入时,HttpUrlConnection会向服务器发送4096字节吗?最后一次是1k?

  2. 请注意,我已经使用了10k缓冲区写入输出流,块大小和缓冲区大小不同是否重要?

  3. 如果我在代码中禁用ChunkedStreamMode,与设置4096的代码相比有什么效果?

java apache-httpclient-4.x

5
推荐指数
1
解决办法
2379
查看次数

IntFunction <String>和Function <Integer,String>

我有两个简单的代码:

IntFunction<String> f1 = Integer::toString;
Function<Integer, String> f2 = Integer::toString;
Run Code Online (Sandbox Code Playgroud)

我认为这两个定义都是正确的,并且等价相同,但第二个定义了编译错误,抱怨说 Required Function<Integer, String>,but Method Reference is found.

java java-8 method-reference

5
推荐指数
1
解决办法
494
查看次数

窗口函数的默认窗口框架是什么

运行以下代码:

val sales = Seq(
  (0, 0, 0, 5),
  (1, 0, 1, 3),
  (2, 0, 2, 1),
  (3, 1, 0, 2),
  (4, 2, 0, 8),
  (5, 2, 2, 8))
  .toDF("id", "orderID", "prodID", "orderQty")

val orderedByID = Window.orderBy('id)

val totalQty = sum('orderQty).over(orderedByID).as('running_total)
val salesTotalQty = sales.select('*, totalQty).orderBy('id)
salesTotalQty.show
Run Code Online (Sandbox Code Playgroud)

结果是:

+---+-------+------+--------+-------------+
| id|orderID|prodID|orderQty|running_total|
+---+-------+------+--------+-------------+
|  0|      0|     0|       5|            5|
|  1|      0|     1|       3|            8|
|  2|      0|     2|       1|            9|
|  3|      1|     0|       2|           11|
|  4|      2|     0| …
Run Code Online (Sandbox Code Playgroud)

sql window-functions apache-spark apache-spark-sql

5
推荐指数
1
解决办法
1885
查看次数

SparkLauncher和Java -jar fat-jar相比有什么好处?

我知道SparkLauncher用于以编程方式而不是使用spark-submit脚本来启动Spark应用程序,但是何时使用SparkLauncher或有什么好处,我感到有些困惑。

以下代码使用SparkLauncher启动主类为的Spark应用程序"org.apache.spark.launcher.WordCountApp

代码是:

object WordCountSparkLauncher {
  def main(args: Array[String]) {
    val proc = new SparkLauncher()
      .setAppName("WordCountSparkLauncherApp")
      .setMaster("local")
      .setSparkHome("D:/spark-2.2.0-bin-hadoop2.7")
      .setAppResource("file:///d:/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.0.jar")
      .setVerbose(true)
      .setMainClass("org.apache.spark.launcher.WordCountApp")
      .launch()

    new Thread(new IORunnable(proc.getInputStream, "proc-input-stream")).start()

    new Thread(new IORunnable(proc.getErrorStream, "proc-error-input-stream")).start()

    proc.waitFor()

  }
}
Run Code Online (Sandbox Code Playgroud)

它工作正常,但还有另一种选择:

使用maven shade插件创建一个可运行的胖罐,将所有与spark相关的依赖项打包到一个jar中,这样,我仍然可以使用来运行spark应用程序java -jar thefatjar

SparkLaunchervs胖子罐子有什么好处?

apache-spark

5
推荐指数
1
解决办法
1178
查看次数

scala 映射的花括号和括号

 Array(7,8,9) map (x:Int=>x+1) //1).error, identifier expected but integer literal found.
 Array(7,8,9) map {x:Int=>x+1} //2) correct  
 Array(7,8,9) map ((x:Int)=>x+1) //3) correct
 Array(7,8,9) map (x=>x+1) //4 correct
 Array(7,8,9) map {x=>x+1} //5 correct
 Array(7,8,9) map x=>x+1   //6 error
Run Code Online (Sandbox Code Playgroud)

我会问上述情况?为什么有些工作而其他人则不像评论所示

scala

5
推荐指数
1
解决办法
353
查看次数

是否可以在同一提取器中定义unapply和unapplySeq

我有以下旨在在同一提取器中定义unapply和unapplySeq的代码

  test("pattern matching define unapply and unapplySeq") {
    object A {
      def unapply(arg: String): Option[(String, String)] = Some((arg, arg))

      def unapplySeq(arg: String): Option[Seq[String]] = Some(arg.split(" "))
    }

    def x = "hello world"

    x match {
      case A(a, b) => println("unapply matched", a, b)
      case A(a, b, _*) => println("unapplySeq matched", a, b)
    }


  }
Run Code Online (Sandbox Code Playgroud)

但是似乎不起作用,当我运行此测试用例时,它会导致编译错误并抱怨

Error:(292, 12) Star pattern must correspond with varargs or unapplySeq
      case A(a, b, _*) => println("unapplySeq matched", a, b)
Run Code Online (Sandbox Code Playgroud)

我想知道是否可以在同一提取器中同时定义unapply和unapplySeq?

scala

5
推荐指数
1
解决办法
168
查看次数

如何在IntellijIdea Scala插件中禁用方法返回类型提示

Intellij Idea scala插件会自动以灰色显示该方法的返回类型,这在大多数情况下很烦人。

如何取消此功能

scala intellij-idea

5
推荐指数
1
解决办法
1438
查看次数

为什么Functor是一个更高级的类型

我有以下仿函数定义

trait Functor[F[_]] { 
    def map[A, B](fa: F[A])(f: A => B): F[B]
}
object ListFunctor extends Functor[List] { //
    def map[A, B](f: A => B)(data: List[A]): List[B] = data map f
}
Run Code Online (Sandbox Code Playgroud)

在scala中,很常见的F是集合类型,例如List,Seq,Option,我会问为什么Functor必须是更高级的类型,类型参数F究竟是什么意思?

scala category-theory

4
推荐指数
1
解决办法
649
查看次数

KeyedStream 中的 max 和 maxBy 有什么区别

  1. KeyedStream#max(字符串字段)

应用聚合,该聚合根据给定键给出给定字段表达式处的数据流的当前最大值。每个键都保留一个独立的聚合。字段表达式可以是公共字段的名称,也可以是带有 {@link DataStream} 基础类型括号的 getter 方法。点可用于深入查看对象,如 {@code "field1.fieldxy" } 中所示。

  1. KeyedStream#maxBy(字符串字段)

应用聚合,通过给定键为当前元素提供给定位置处的最大值。每个键都保留一个独立的聚合。如果有多个元素在给定位置具有最大值,则该运算符默认返回第一个。

这两个API的javadoc看起来非常相似,我想问一下它们之间有什么区别,以及何时选择这个或那个

apache-flink

4
推荐指数
1
解决办法
2247
查看次数