小编Vij*_*uri的帖子

如何覆盖spark中的输出目录

我有一个火花流应用程序,它可以生成每分钟的数据集.我需要保存/覆盖已处理数据的结果.

当我试图覆盖数据集org.apache.hadoop.mapred.FileAlreadyExistsException时停止执行.

我设置了Spark属性set("spark.files.overwrite","true"),但没有运气.

如何覆盖或预先删除spark中的文件?

apache-spark

91
推荐指数
5
解决办法
14万
查看次数

如何在Apache Spark应用程序中优化shuffle溢出

我正在运行带有2名工作人员的Spark流应用程序.应用程序具有联接和联合操作.

所有批次都已成功完成,但注意到随机溢出指标与输入数据大小或输出数据大小不一致(溢出内存超过20次).

请在下图中找到火花阶段的详细信息: 在此输入图像描述

经过对此的研究,发现了

当没有足够的内存用于随机数据时,会发生随机溢出.

Shuffle spill (memory) - 溢出时内存中数据的反序列化形式的大小

shuffle spill (disk) - 溢出后磁盘上数据序列化形式的大小

由于反序列化数据比序列化数据占用更多空间.所以,Shuffle溢出(内存)更多.

注意到这个溢出内存大小非常大,输入数据很大.

我的疑问是:

这种溢出是否会对性能产生很大影响?

如何优化内存和磁盘的溢出?

是否有可以减少/控制这种巨大溢出的Spark Properties?

apache-spark spark-streaming apache-spark-1.4

39
推荐指数
2
解决办法
2万
查看次数

按火花对RDD中的值排序

我有一个火花对RDD(键,计数)如下

Array[(String, Int)] = Array((a,1), (b,2), (c,1), (d,3))
Run Code Online (Sandbox Code Playgroud)

使用spark scala API如何获取按值排序的新对RDD?

要求的结果: Array((d,3), (b,2), (a,1), (c,1))

scala apache-spark

18
推荐指数
2
解决办法
3万
查看次数

如何在RDD对中找到最大值?

我有一个火花对RDD(键,计数)如下

Array[(String, Int)] = Array((a,1), (b,2), (c,1), (d,3))
Run Code Online (Sandbox Code Playgroud)

如何使用spark scala API查找具有最高计数的密钥?

编辑:对RDD的数据类型是org.apache.spark.rdd.RDD [(String,Int)]

scala apache-spark pyspark

15
推荐指数
4
解决办法
4万
查看次数

hadoop writables Apache Spark API的NotSerializableException

Spark Java应用程序在hadoop可写入时抛出NotSerializableException.

public final class myAPP {
  public static void main(String[] args) throws Exception {    
    if (args.length < 1) {
      System.err.println("Usage: myAPP <file>");
      System.exit(1);
    }
    SparkConf sparkConf = new SparkConf().setAppName("myAPP").setMaster("local");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);
    Configuration conf = new Configuration();
    JavaPairRDD<LongWritable,Text> lines = ctx.newAPIHadoopFile(args[0], TextInputFormat.class, LongWritable.class, Text.class, conf);
    System.out.println(    lines.collect().toString());
    ctx.stop();
  }
Run Code Online (Sandbox Code Playgroud)

.

java.io.NotSerializableException: org.apache.hadoop.io.LongWritable
Serialization stack:
    - object not serializable (class: org.apache.hadoop.io.LongWritable, value: 15227295)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, (15227295,))
    - …
Run Code Online (Sandbox Code Playgroud)

java apache-spark

11
推荐指数
2
解决办法
6863
查看次数

通过opencv检测停车场

如果对象是单行(较小的图像),则该程序识别对象.

from __future__ import division 
from collections import defaultdict
from collections import OrderedDict
from cv2 import line
import cv2
from matplotlib import pyplot as plt
from networkx.algorithms import swap
from numpy import mat
from skimage.exposure import exposure
import numpy as np
from org import imutils
from numpy.core.defchararray import rindex
import sys

def line(p1, p2):
    A = (p1[1] - p2[1])
    B = (p2[0] - p1[0])
    C = (p1[0]*p2[1] - p2[0]*p1[1])
    return A, B, -C

def intersection(L1, L2):
    D  = L1[0] * …
Run Code Online (Sandbox Code Playgroud)

opencv python-2.7 opencv3.0

11
推荐指数
1
解决办法
5895
查看次数

如何在Spark中处理多行输入记录

我将每个记录分布在输入文件中的多行(非常大的文件).

例如:

Id:   2
ASIN: 0738700123
  title: Test tile for this product
  group: Book
  salesrank: 168501
  similar: 5  0738700811  1567184912  1567182813  0738700514  0738700915
  categories: 2
   |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Earth-Based Religions[12472]|Wicca[12484]
   |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Earth-Based Religions[12472]|Witchcraft[12486]
  reviews: total: 12  downloaded: 12  avg rating: 4.5
    2001-12-16  cutomer: A11NCO6YTE4BTJ  rating: 5  votes:   5  helpful:   4
    2002-1-7  cutomer:  A9CQ3PLRNIR83  rating: 4  votes:   5  helpful:   5
Run Code Online (Sandbox Code Playgroud)

如何识别和处理spark中的每个多行记录?

scala apache-spark

10
推荐指数
2
解决办法
6263
查看次数

Spark流式传输DStream RDD以获取文件名

Spark流式传输textFileStream,fileStream可以监控目录并处理Dstream RDD中的新文件.

如何在特定时间间隔内获取DStream RDD正在处理的文件名?

scala apache-spark

9
推荐指数
1
解决办法
2358
查看次数

如何在一个字符串中读取整个文件

我想在pyspark.lf中读取json或xml文件,我的文件被分成多行

rdd= sc.textFIle(json or xml) 
Run Code Online (Sandbox Code Playgroud)

输入

{
" employees":
[
 {
 "firstName":"John",
 "lastName":"Doe" 
},
 { 
"firstName":"Anna"
  ]
}
Run Code Online (Sandbox Code Playgroud)

输入分布在多条线上.

预期产出 {"employees:[{"firstName:"John",......]}

如何使用pyspark在一行中获取完整的文件?

请帮助我,我是新来的火花.

apache-spark apache-spark-sql

8
推荐指数
4
解决办法
1万
查看次数

批处理之间的Spark流数据共享

Spark流以微批处理数据.

使用RDD并行处理每个间隔数据,而不在每个间隔之间共享任何数据.

但我的用例需要在间隔之间共享数据.

考虑网络WordCount示例,该示例生成在该间隔中接收的所有单词的计数.

我如何生成以下字数?

  • 具有先前间隔计数的单词"hadoop"和"spark"的相对计数

  • 所有其他单词的正常字数.

注意:UpdateStateByKey执行有状态处理,但这会在每条记录上应用函数而不是特定记录.

因此,UpdateStateByKey不适合此要求.

更新:

考虑以下示例

间隔1

输入:

Sample Input with Hadoop and Spark on Hadoop
Run Code Online (Sandbox Code Playgroud)

输出:

hadoop  2
sample  1
input   1
with    1
and 1
spark   1
on  1
Run Code Online (Sandbox Code Playgroud)

间隔2

输入:

Another Sample Input with Hadoop and Spark on Hadoop and another hadoop another spark spark
Run Code Online (Sandbox Code Playgroud)

输出:

another 3
hadoop  1
spark   2
and 2
sample  1
input   1
with    1
on  1
Run Code Online (Sandbox Code Playgroud)

说明:

第一个间隔给出所有单词的正常单词计数.

在第二个间隔中,hadoop发生了3次,但输出应为1(3-2)

火花发生3次,但输出应为2(3-1)

对于所有其他单词,它应该给出正常的单词计数.

因此,在处理第二个Interval数据时,它应该具有hadoop和 …

apache-spark spark-streaming

7
推荐指数
1
解决办法
6328
查看次数