小编mit*_*hra的帖子

使用Apache Spark和Java将CSV解析为DataFrame/DataSet

我是新来的火花,我想使用group-by&reduce从CSV中找到以下内容(使用一行):

  Department, Designation, costToCompany, State
  Sales, Trainee, 12000, UP
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, TN
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, TN 
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, LA
  Marketing, Associate, 18000, TN
  Marketing, Associate, 18000, TN
  HR, Manager, 58000, TN
Run Code Online (Sandbox Code Playgroud)

我想通过Department,Designation,State简化包含sum(costToCompany)TotalEmployeeCount的附加列的CSV

应得到如下结果:

  Dept, Desg, state, empCount, totalCost
  Sales,Lead,AP,2,64000
  Sales,Lead,LA,3,96000  
  Sales,Lead,TN,2,64000
Run Code Online (Sandbox Code Playgroud)

有没有办法使用转换和动作来实现这一点.或者我们应该进行RDD操作?

java hadoop hdfs apache-spark apache-spark-sql

19
推荐指数
2
解决办法
6万
查看次数

Spark提交失败,包含java.lang.NoSuchMethodError:scala.Predef $.$ conforms()Lscala/Predef $$ less $ colon $ less;

我正在使用spark 1.3.1 prebuild version spark-1.3.1-bin-hadoop2.6.tgz

线程"main"中的异常java.lang.NoSuchMethodError:scala.Predef $.$ conforms()Lscala/Predef $$ less $ colon $ less; org.apache.spark.util.Utils $ .getSystemProperties(Utils.scala:1418)org.apache.spark.SparkConf.(SparkConf.scala:58)org.apache.spark.SparkConf.(SparkConf.scala: 52)在com.zoho.zbi.Testing.test(Testing.java:43)com.zoho.zbi.Testing.main(Testing.java:39)使用Spark的默认log4j配置文件:org/apache/spark/log4j- defaults.properties

我正在尝试一个简单的演示应用程序来保存到cassandra

SparkConf batchConf= new SparkConf()
            .setSparkHome(sparkHome)
            .setJars(jars)
            .setAppName(ZohoBIConstants.getAppName("cassandra"))//NO I18N
            .setMaster(master).set("spark.cassandra.connection.host", "localhost");

            JavaSparkContext sc = new JavaSparkContext(batchConf);
            // here we are going to save some data to Cassandra...
            List<Person> people = Arrays.asList(
                    Person.newInstance(1, "John", new Date()),
                    Person.newInstance(2, "Anna", new Date()),
                    Person.newInstance(3, "Andrew", new Date())
            );
//          Person test = Person.newInstance(1, "vini", new Date())''
            System.out.println("Inside Java API Demo : "+people);
            JavaRDD<Person> rdd = …
Run Code Online (Sandbox Code Playgroud)

java maven cassandra-2.0 apache-spark

11
推荐指数
1
解决办法
3万
查看次数

Spark Streaming UpdateStateByKey

我正在运行24X7火花流,并使用updateStateByKey函数来保存计算的历史数据,就像在NetworkWordCount示例中一样.

我尝试使用3lac记录流式传输文件,每1500条记录休眠1秒.我正在使用3名工人

  1. 在一段时间内updateStateByKey正在增长,然后程序抛出以下异常

错误执行程序:任务ID 1635中的异常java.lang.ArrayIndexOutOfBoundsException:3

14/10/23 21:20:43 ERROR TaskSetManager: Task 29170.0:2 failed 1 times; aborting job
14/10/23 21:20:43 ERROR DiskBlockManager: Exception while deleting local spark dir: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141023204346-b232
java.io.IOException: Failed to delete: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141023204346-b232/24

14/10/23 21:20:43 ERROR Executor: Exception in task ID 8037
java.io.FileNotFoundException: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141023204346-b232/22/shuffle_81_0_1 (No such file or directory)
    at java.io.FileOutputStream.open(Native Method)
Run Code Online (Sandbox Code Playgroud)

怎么办呢?我想updateStateByKey应该随着它的快速增长而定期重置,请分享关于何时以及如何重置updateStateByKey的一些例子..还是我有任何其他问题?一些启发.

任何帮助深表感谢.谢谢你的时间

spark-streaming

5
推荐指数
1
解决办法
777
查看次数

Spark Kafka Streaming Issue

我正在使用maven

我添加了以下依赖项

   <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.1.0</version>
    </dependency>   <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.1.0</version>
    </dependency>
Run Code Online (Sandbox Code Playgroud)

我还在代码中添加了jar

SparkConf sparkConf = new SparkConf().setAppName("KafkaSparkTest");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
sc.addJar("/home/test/.m2/repository/org/apache/spark/spark-streaming-kafka_2.10/1.0.2/spark-streaming-kafka_2.10-1.0.2.jar");
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(5000)); 
Run Code Online (Sandbox Code Playgroud)

它可以很好地解决任何错误,当我通过spark-submit运行时,我收到以下错误,非常感谢任何帮助.谢谢你的时间.

bin/spark-submit --class "KafkaSparkStreaming" --master local[4] try/simple-project/target/simple-project-1.0.jar
Run Code Online (Sandbox Code Playgroud)

线程"main"中的异常java.lang.NoClassDefFoundError:org/apache/spark/streaming/kafka/KafkaUtils位于KafkaSparkStreamingTest(KafkaSparkStreaming.java:40),位于sun.reflect的KafkaSparkStreaming.main(KafkaSparkStreaming.java:23). sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)中的NativeMethodAccessorImpl.invoke0(Native Method)位于sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method. java:606)org.apache.spark.deploy.SparkSubmit $ .launch(SparkSubmit.scala:303)atg.apache.spark.deploy.SparkSubmit $ .main(SparkSubmit.scala:55)at org.apache.spark .deploy.SparkSubmit.main(SparkSubmit.scala)引起:java.lang.ClassNotFoundException:java.net.URLClassLoader $ 1.run(URLClassLoader.java:366)中的org.apache.spark.streaming.kafka.KafkaUtils

java maven apache-kafka apache-spark

4
推荐指数
1
解决办法
6153
查看次数

阅读最新的spark kafka流媒体

我希望只使用kafka读取火花流中的最新消息,但它也会获取过去的数据

如何在KafkaUtil中为spark设置auto.offset.reset

JavaPairReceiverInputDStream<String, String> messages =
            KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
Run Code Online (Sandbox Code Playgroud)

如何将conf设置为仅获取当前消息.请举个例子.

在此先感谢,还有另一个线程

但还不够,请帮助我.提前致谢.

apache-kafka apache-spark spark-streaming

4
推荐指数
1
解决办法
1140
查看次数

带有updateStateByKey问题的Spark 24X7流式传输

我正在运行一个24/7的火花流和使用updateStateByKey是可以24/7运行火花流?如果是,则updateStateByKey不会变大,如何处理它?我们是否必须在24/7运行时定期重置/删除updateStateByKey,如果没有重置方式和时间?或Spark以分布式方式处理?如何动态增加内存/存储空间.

当updateStateByKey增长时,我得到以下错误

Array out of bound exception

Exception while deleting local spark dir: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141026101251-cfb4
java.io.IOException: Failed to delete: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141026101251-cfb4
Run Code Online (Sandbox Code Playgroud)

如何处理这个..请指出我是否有任何文件?我完全陷入困境,非常感谢任何帮助..感谢您的时间

spark-streaming

3
推荐指数
1
解决办法
2233
查看次数

在d3中加载压缩的csv数据以进行页面优化

我有一个50MB的csv数据,有没有可能我可以压缩数据加载d3.js/dc.js图表​​,现在页面太慢我想优化它..任何帮助非常感谢

提前致谢

csv d3.js dc.js

1
推荐指数
1
解决办法
1424
查看次数