嗨,有一个主题是使用MultipleTextOutputFormat在一个spark作业中将文本数据写入多个输出目录
我会问是否有类似的方法将avro数据写入多个目录
我想要的是将avro文件中的数据写入不同的目录(基于时间戳字段,时间戳中的同一天转到同一目录)
我对此没有很好的了解HttpUrlConnection.setChunkedStreamingMode,此模式的作用是什么?
我有以下示例代码:
HttpURLConnection conn = getHttpURLConnection(_url);
conn.setChunkedStreamingMode(4096); //4k
conn.setConnectTimeout(3000);
conn.setDoInput(true);
conn.setDoOutput(true);
conn.setRequestMethod("POST");
OutputStream out = conn.getOutputStream();
byte[] buffer = new byte[1024 * 10];//10k
FileInputStream in= new FileInputStream(file); //Write the content of the file to the server
int len;
while ((len = in.read(buffer)) != -1) {
out.write(buffer, 0, len);
}
out.flush();
in.close();
Run Code Online (Sandbox Code Playgroud)
说,文件大小为101k,我将块大小设置为4096。
每次写入时,HttpUrlConnection会向服务器发送4096字节吗?最后一次是1k?
请注意,我已经使用了10k缓冲区写入输出流,块大小和缓冲区大小不同是否重要?
如果我在代码中禁用ChunkedStreamMode,与设置4096的代码相比有什么效果?
我有两个简单的代码:
IntFunction<String> f1 = Integer::toString;
Function<Integer, String> f2 = Integer::toString;
Run Code Online (Sandbox Code Playgroud)
我认为这两个定义都是正确的,并且等价相同,但第二个定义了编译错误,抱怨说 Required Function<Integer, String>,but Method Reference is found.
运行以下代码:
val sales = Seq(
(0, 0, 0, 5),
(1, 0, 1, 3),
(2, 0, 2, 1),
(3, 1, 0, 2),
(4, 2, 0, 8),
(5, 2, 2, 8))
.toDF("id", "orderID", "prodID", "orderQty")
val orderedByID = Window.orderBy('id)
val totalQty = sum('orderQty).over(orderedByID).as('running_total)
val salesTotalQty = sales.select('*, totalQty).orderBy('id)
salesTotalQty.show
Run Code Online (Sandbox Code Playgroud)
结果是:
+---+-------+------+--------+-------------+
| id|orderID|prodID|orderQty|running_total|
+---+-------+------+--------+-------------+
| 0| 0| 0| 5| 5|
| 1| 0| 1| 3| 8|
| 2| 0| 2| 1| 9|
| 3| 1| 0| 2| 11|
| 4| 2| 0| …Run Code Online (Sandbox Code Playgroud) 我知道SparkLauncher用于以编程方式而不是使用spark-submit脚本来启动Spark应用程序,但是何时使用SparkLauncher或有什么好处,我感到有些困惑。
以下代码使用SparkLauncher启动主类为的Spark应用程序"org.apache.spark.launcher.WordCountApp:
代码是:
object WordCountSparkLauncher {
def main(args: Array[String]) {
val proc = new SparkLauncher()
.setAppName("WordCountSparkLauncherApp")
.setMaster("local")
.setSparkHome("D:/spark-2.2.0-bin-hadoop2.7")
.setAppResource("file:///d:/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.0.jar")
.setVerbose(true)
.setMainClass("org.apache.spark.launcher.WordCountApp")
.launch()
new Thread(new IORunnable(proc.getInputStream, "proc-input-stream")).start()
new Thread(new IORunnable(proc.getErrorStream, "proc-error-input-stream")).start()
proc.waitFor()
}
}
Run Code Online (Sandbox Code Playgroud)
它工作正常,但还有另一种选择:
使用maven shade插件创建一个可运行的胖罐,将所有与spark相关的依赖项打包到一个jar中,这样,我仍然可以使用来运行spark应用程序java -jar thefatjar。
SparkLaunchervs胖子罐子有什么好处?
Array(7,8,9) map (x:Int=>x+1) //1).error, identifier expected but integer literal found.
Array(7,8,9) map {x:Int=>x+1} //2) correct
Array(7,8,9) map ((x:Int)=>x+1) //3) correct
Array(7,8,9) map (x=>x+1) //4 correct
Array(7,8,9) map {x=>x+1} //5 correct
Array(7,8,9) map x=>x+1 //6 error
Run Code Online (Sandbox Code Playgroud)
我会问上述情况?为什么有些工作而其他人则不像评论所示
我有以下旨在在同一提取器中定义unapply和unapplySeq的代码
test("pattern matching define unapply and unapplySeq") {
object A {
def unapply(arg: String): Option[(String, String)] = Some((arg, arg))
def unapplySeq(arg: String): Option[Seq[String]] = Some(arg.split(" "))
}
def x = "hello world"
x match {
case A(a, b) => println("unapply matched", a, b)
case A(a, b, _*) => println("unapplySeq matched", a, b)
}
}
Run Code Online (Sandbox Code Playgroud)
但是似乎不起作用,当我运行此测试用例时,它会导致编译错误并抱怨
Error:(292, 12) Star pattern must correspond with varargs or unapplySeq
case A(a, b, _*) => println("unapplySeq matched", a, b)
Run Code Online (Sandbox Code Playgroud)
我想知道是否可以在同一提取器中同时定义unapply和unapplySeq?
Intellij Idea scala插件会自动以灰色显示该方法的返回类型,这在大多数情况下很烦人。
如何取消此功能
我有以下仿函数定义
trait Functor[F[_]] {
def map[A, B](fa: F[A])(f: A => B): F[B]
}
object ListFunctor extends Functor[List] { //
def map[A, B](f: A => B)(data: List[A]): List[B] = data map f
}
Run Code Online (Sandbox Code Playgroud)
在scala中,很常见的F是集合类型,例如List,Seq,Option,我会问为什么Functor必须是更高级的类型,类型参数F究竟是什么意思?
应用聚合,该聚合根据给定键给出给定字段表达式处的数据流的当前最大值。每个键都保留一个独立的聚合。字段表达式可以是公共字段的名称,也可以是带有 {@link DataStream} 基础类型括号的 getter 方法。点可用于深入查看对象,如 {@code "field1.fieldxy" } 中所示。
应用聚合,通过给定键为当前元素提供给定位置处的最大值。每个键都保留一个独立的聚合。如果有多个元素在给定位置具有最大值,则该运算符默认返回第一个。
这两个API的javadoc看起来非常相似,我想问一下它们之间有什么区别,以及何时选择这个或那个