小编Shu*_*tal的帖子

如何在apache spark(scala)中迭代RDD

我使用以下命令用一堆包含2个字符串["filename","content"]的数组填充RDD.

现在,我希望迭代每个事件,以便对每个文件名和内容执行某些操作.

val someRDD = sc.wholeTextFiles("hdfs://localhost:8020/user/cloudera/*")
Run Code Online (Sandbox Code Playgroud)

我似乎无法找到有关如何执行此操作的任何文档.

所以我想要的是:

foreach occurrence-in-the-rdd{
   //do stuff with the array found on loccation n of the RDD
} 
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

21
推荐指数
2
解决办法
7万
查看次数

使用泛型类型和通用字段名称进行GSON反序列化

假设我们有这样的结构:

JSON:

{
  "body": {
    "cats": [
      {
        "cat": {
          "id": 1,
          "title": "cat1"
        }
      },
      {
        "cat": {
          "id": 2,
          "title": "cat2"
        }
      }
    ]
  }
}
Run Code Online (Sandbox Code Playgroud)

和相应的POJO:

Response.class

private final Body body;

Body.class

private final Collection<CatWrapper> cats

CatWrapper.class

private final Cat cat

Cat.class

private final int id;
private final String title;
Run Code Online (Sandbox Code Playgroud)

但是,现在我们说我们有相同的结构,但不是Cat我们收到的truck

{
  "body": {
    "trucks": [
      {
        "truck": {
          "id": 1,
          "engine": "big",
          "wheels": 12
        }
      },
      {
        "truck": {
          "id": 2,
          "engine": "super …
Run Code Online (Sandbox Code Playgroud)

java generics android json gson

8
推荐指数
1
解决办法
5756
查看次数

基于Bellman ford队列的方法来自Sedgewick和Wayne - Algorithms,第4版

我正在研究 这个链接http://algs4.cs.princeton.edu/44sp/BellmanFordSP.java上的来自书籍的算法源代码的单源最短路径的queue-based方法.Bellman-Ford algorithmRobert Sedgewick and Kevin Wayne - Algorithms, 4th edition

我有两点是一个疑问,另一个是代码改进建议.

  1. 在上述方法中,以下是用于放松到顶点的距离的松弛方法的代码.

    for (DirectedEdge e : G.adj(v)) {
        int w = e.to();
        if (distTo[w] > distTo[v] + e.weight()) {
            distTo[w] = distTo[v] + e.weight();
            edgeTo[w] = e;
            if (!onQueue[w]) {
                queue.enqueue(w);
                onQueue[w] = true;
            }
        }
        if (cost++ % G.V() == 0){
            findNegativeCycle();
        }
    }
    
    Run Code Online (Sandbox Code Playgroud)

在该方法中,在找到负循环之前使用以下条件.

if (cost++ % G.V() == 0){
     findNegativeCycle();
}
Run Code Online (Sandbox Code Playgroud)

以上他们将成本除以vertices图表中的数量来检查negative cycle.这可能是因为放松是在V-1时间上进行的,并且如果放松Vth …

algorithm shortest-path graph-algorithm bellman-ford

5
推荐指数
1
解决办法
1628
查看次数

无界表是火花结构流

我开始学习Spark,并且很难理解Spark中Structured Streaming背后的合理性.结构化流处理作为无界输入表到达的所有数据,其中数据流中的每个新项被视为表中的新行.我有以下代码来读入传入的文件csvFolder.

val spark = SparkSession.builder.appName("SimpleApp").getOrCreate()

val csvSchema = new StructType().add("street", "string").add("city", "string")
.add("zip", "string").add("state", "string").add("beds", "string")
.add("baths", "string").add("sq__ft", "string").add("type", "string")
.add("sale_date", "string").add("price", "string").add("latitude", "string")
.add("longitude", "string")

val streamingDF = spark.readStream.schema(csvSchema).csv("./csvFolder/")

val query = streamingDF.writeStream
  .format("console")
  .start()
Run Code Online (Sandbox Code Playgroud)

如果我将1GB文件转储到该文件夹​​会发生什么.根据规范,流媒体作业每隔几毫秒触发一次.如果Spark在下一个瞬间遇到如此庞大的文件,在尝试加载文件时不会耗尽内存.还是自动批量处理?如果是,该批处理参数是否可配置?

scala apache-spark spark-structured-streaming

5
推荐指数
1
解决办法
913
查看次数