小编Ram*_*ram的帖子

java.lang.NoClassDefFoundError:在scala代码中通过spark-submit启动spark作业时无法初始化类

我有一个代码,如下所示

 object ErrorTest {
case class APIResults(status:String, col_1:Long, col_2:Double, ...)

def funcA(rows:ArrayBuffer[Row])(implicit defaultFormats:DefaultFormats):ArrayBuffer[APIResults] = {
  //call some API ang get results and return APIResults
  ...
}

// MARK: load properties
val props = loadProperties()
private def loadProperties(): Properties =  {
  val configFile = new File("config.properties")
  val reader = new FileReader(configFile)
  val props = new Properties()
  props.load(reader)
  props
}

def main(args: Array[String]): Unit = {
  val prop_a = props.getProperty("prop_a")

  val session = Context.initialSparkSession();
  import session.implicits._

  val initialSet = ArrayBuffer.empty[Row]
  val addToSet …

java scala apache-spark apache-spark-sql spark-dataframe

8
推荐指数
1
解决办法
5201
查看次数

flatten和flatMap(身份)之间有什么区别吗?

scala> List(List(1), List(2), List(3), List(4))
res18: List[List[Int]] = List(List(1), List(2), List(3), List(4))

scala> res18.flatten
res19: List[Int] = List(1, 2, 3, 4)

scala> res18.flatMap(identity)
res20: List[Int] = List(1, 2, 3, 4)
Run Code Online (Sandbox Code Playgroud)

这两个功能有什么区别吗?何时使用一个而不是另一个?有任何权衡吗?

functional-programming scala flatmap

7
推荐指数
2
解决办法
5968
查看次数

无法找到Kafka Producer - org.apache.kafka.common.serialization.StringSerializer

我创建了一个简单的Kafka Producer&Consumer.我使用的是kafka_2.11-0.9.0.0.这是我的制作人代码,

public class KafkaProducerTest {
public static String topicName = "test-topic-2";
public static void main(String[] args) {
    // TODO Auto-generated method stub
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer",
            StringSerializer.class.getName());
    props.put("value.serializer",
            StringSerializer.class.getName());

    Producer<String, String> producer = new KafkaProducer(props);
    for (int i = 0; i < 100; i++) {
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
                topicName, Integer.toString(i), Integer.toString(i));
        System.out.println(producerRecord);
        producer.send(producerRecord);
    }

    producer.close();
}

}
Run Code Online (Sandbox Code Playgroud)

在启动捆绑ia时遇到以下错误,

2016-05-20 09:44:57,792 | …
Run Code Online (Sandbox Code Playgroud)

apache-karaf apache-kafka apache-spark spark-streaming-kafka

7
推荐指数
3
解决办法
1万
查看次数

组合器和分区器之间的区别

我是MapReduce的新手,我无法弄清楚分区器和组合器的区别.我知道两者都在map和reduce任务之间的中间步骤中运行,并且都减少了reduce任务要处理的数据量.请用一个例子来解释差异.

hadoop mapreduce partitioner

7
推荐指数
2
解决办法
6441
查看次数

Spark 2.0中的全阶段代码生成

我听说过Whole-Stage Code Generationsql来优化查询.通过p539-neumann.pdfsparksql-sql-codegen-is-not-giving-any-improvemnt

但不幸的是,没有人回答上述问题.

很想知道使用Spark 2.0这个功能的场景是什么.但谷歌搜索后没有得到正确的用例.

每当我们使用sql时,我们可以使用此功能吗?如果是这样,任何正确的用例看到这个工作?

apache-spark apache-spark-sql

7
推荐指数
1
解决办法
3309
查看次数

Spark:当加入2个大型DF时,大小超过Integer.MAX_VALUE

伙计们,

我正在尝试在每行一个单个密钥标识符的spark中加入2个大型数据帧(每个100GB +)时遇到此问题.

我在EMR上使用Spark 1.6,这就是我正在做的事情:

val df1 = sqlContext.read.json("hdfs:///df1/")
val df2 = sqlContext.read.json("hdfs:///df2/")

// clean up and filter steps later 

df1.registerTempTable("df1")
df2.registerTempTable("df2")

val df3 = sql("select df1.*, df2.col1 from df1 left join df2 on df1.col3 = df2.col4")

df3.write.json("hdfs:///df3/")
Run Code Online (Sandbox Code Playgroud)

这基本上是我正在做的事情的要点,以及最终加入df1和df2之间的其他清理和过滤步骤.

我看到的错误是:

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:860)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:127)
    at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:115)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1250)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:129)
    at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:136)
    at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:503)
    at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:420)
    at org.apache.spark.storage.BlockManager.get(BlockManager.scala:625)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

7
推荐指数
1
解决办法
2516
查看次数

在java中计算S3对象(文件夹)大小

我将所有类型的文件存储在亚马逊 S3 上。在亚马逊S3存储桶中,所有文件都存储在不同的文件夹中,我知道亚马逊S3中没有文件夹的概念。对象只能通过它们的键来识别。如果我存储任何带有“mydocs/personal/profile-pic.jpg”之类的密钥的文件,这意味着将在那里创建两个父文件夹(mydocs 文件夹内的个人文件夹)。

我想计算java中像“mydocs”这样的任何文件夹的大小。我使用下面给出的代码计算了存储桶总大小:

public long calculateBucketSize(String bucketName) {
long totalSize = 0;
    int totalItems = 0;
    ObjectListing objects = listObjects(bucketName);
    do {
        for (S3ObjectSummary objectSummary : objects.getObjectSummaries()) {
            totalSize += objectSummary.getSize();
            totalItems++;
        }
        objects = listNextBatchOfObjects(objects);
    } while (objects.isTruncated());
    System.out.println("Amazon S3 bucket: " + bucketName + " containing "
            + totalItems + " objects with a total size of " + totalSize
            + " bytes.");

    return totalSize;
}
Run Code Online (Sandbox Code Playgroud)

此方法将返回存储桶的总大小。我想计算任何单个文件夹的大小。任何帮助将不胜感激。

java amazon-s3 amazon-web-services aws-sdk

6
推荐指数
1
解决办法
8323
查看次数

java.lang.StackOverflowError使用Kryo序列化彼此引用的对象时

我有一个类似图形的对象,我从服务器发送到包含nodes它的客户端adjacencyLists.

我有类似的东西:

Clearing c1 = new Clearing(1, 134, 151);
Clearing c6 = new Clearing(6, 250, 88);

c1.adjacentByPath.add(new Path(1, c6));
c6.adjacentByPath.add(new Path(1, c1));
Run Code Online (Sandbox Code Playgroud)

每次我发送包含这些清除的对象时,我收到以下错误:

Exception in thread "Server" java.lang.StackOverflowError
at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:448)
at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
    ......
Run Code Online (Sandbox Code Playgroud)

在Kryonet有解决方法吗?谢谢

java kryo kryonet apache-spark

6
推荐指数
1
解决办法
1179
查看次数

Spark列出所有缓存的RDD名称

我是Apache Spark的新手,我创建了几个RDD和DataFrames,缓存它们,现在我想通过使用下面的命令来解决它们中的一些问题

rddName.unpersist()
Run Code Online (Sandbox Code Playgroud)

但我不记得他们的名字.我使用sc.getPersistentRDDs但输出不包括名称.我还使用浏览器查看缓存的rdds,但同样没有名称信息.我错过了什么吗?

java scala dataframe apache-spark rdd

6
推荐指数
2
解决办法
4061
查看次数

如何拆分逗号分隔的字符串并在Spark Scala数据帧中获取n个值?

如何从arraytypeSpark Scala中的列中仅获取2个数据?我得到的数据就像val df = spark.sqlContext.sql("select col1, col2 from test_tbl").

我有以下数据:

col1  | col2                              
---   | ---
a     | [test1,test2,test3,test4,.....]   
b     | [a1,a2,a3,a4,a5,.....]       
Run Code Online (Sandbox Code Playgroud)

我想获得如下数据:

col1| col2
----|----
a   | test1,test2
b   | a1,a2
Run Code Online (Sandbox Code Playgroud)

当我这样做时df.withColumn("test", col("col2").take(5))它不起作用.它给出了这个错误:

value take不是org.apache.spark.sql.ColumnName的成员

如何按上述顺序获取数据?

scala dataframe apache-spark apache-spark-sql spark-dataframe

6
推荐指数
1
解决办法
2844
查看次数