I am new to Spark, and I want to use group-by and reduce to find the following from a CSV (one line per employee):
Department, Designation, costToCompany, State
Sales, Trainee, 12000, UP
Sales, Lead, 32000, AP
Sales, Lead, 32000, LA
Sales, Lead, 32000, TN
Sales, Lead, 32000, AP
Sales, Lead, 32000, TN
Sales, Lead, 32000, LA
Sales, Lead, 32000, LA
Marketing, Associate, 18000, TN
Marketing, Associate, 18000, TN
HR, Manager, 58000, TN
I would like to simplify the CSV by grouping on Department, Designation, and State, with additional columns for sum(costToCompany) and TotalEmployeeCount.
The result should look like this:
Dept, Desg, state, empCount, totalCost
Sales,Lead,AP,2,64000
Sales,Lead,LA,3,96000
Sales,Lead,TN,2,64000
Is there any way to achieve this using transformations and actions, or should we go for RDD operations?
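One way to see the shape of this aggregation, independent of Spark, is a small plain-Java sketch: key each row by (Department, Designation, State), then reduce the rows that share a key by adding their employee counts and costs. The names DeptAggregation and aggregate are made up for illustration and are not part of any Spark API. In Spark's Java API the same shape would presumably map onto mapToPair followed by reduceByKey on a JavaPairRDD, which is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: plain Java only, no Spark dependency.
class DeptAggregation {

    /**
     * Takes data rows (no header) of the form
     * "Department, Designation, costToCompany, State" and returns
     * "Dept,Desg,state,empCount,totalCost" lines, sorted by key.
     */
    static List<String> aggregate(List<String> rows) {
        // key -> {empCount, totalCost}; TreeMap keeps the output ordered by key
        Map<String, long[]> totals = new TreeMap<>();
        for (String row : rows) {
            String[] f = row.trim().split("\\s*,\\s*");
            String key = f[0] + "," + f[1] + "," + f[3];
            long cost = Long.parseLong(f[2]);
            // The "reduce" step: combine two partial results for the same key
            totals.merge(key, new long[] {1, cost},
                    (a, b) -> new long[] {a[0] + b[0], a[1] + b[1]});
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, long[]> e : totals.entrySet()) {
            out.add(e.getKey() + "," + e.getValue()[0] + "," + e.getValue()[1]);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList(
                "Sales, Trainee, 12000, UP",
                "Sales, Lead, 32000, AP",
                "Sales, Lead, 32000, LA",
                "Sales, Lead, 32000, TN",
                "Sales, Lead, 32000, AP",
                "Sales, Lead, 32000, TN",
                "Sales, Lead, 32000, LA",
                "Sales, Lead, 32000, LA",
                "Marketing, Associate, 18000, TN",
                "Marketing, Associate, 18000, TN",
                "HR, Manager, 58000, TN");
        for (String line : aggregate(rows)) {
            System.out.println(line);
        }
    }
}
```

For the sample rows above, the Sales/Lead groups come out as Sales,Lead,AP,2,64000, Sales,Lead,LA,3,96000 and Sales,Lead,TN,2,64000, matching the expected result. The combine function is associative and commutative, which is the property a distributed reduce by key relies on.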

I am using Spark 1.3.1, the prebuilt version spark-1.3.1-bin-hadoop2.6.tgz, and I get the following exception:
Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
at org.apache.spark.util.Utils$.getSystemProperties(Utils.scala:1418)
at org.apache.spark.SparkConf.<init>(SparkConf.scala:58)
at org.apache.spark.SparkConf.<init>(SparkConf.scala:52)
at com.zoho.zbi.Testing.test(Testing.java:43)
at com.zoho.zbi.Testing.main(Testing.java:39)
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
I am trying a simple demo application that saves data to Cassandra:
SparkConf batchConf= new SparkConf()
.setSparkHome(sparkHome)
.setJars(jars)
.setAppName(ZohoBIConstants.getAppName("cassandra"))//NO I18N
.setMaster(master).set("spark.cassandra.connection.host", "localhost");
JavaSparkContext sc = new JavaSparkContext(batchConf);
// here we are going to save some data to Cassandra...
List<Person> people = Arrays.asList(
Person.newInstance(1, "John", new Date()),
Person.newInstance(2, "Anna", new Date()),
Person.newInstance(3, "Andrew", new Date())
);
// Person test = Person.newInstance(1, "vini", new Date())
System.out.println("Inside Java API Demo : "+people);
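JavaRDD<Person> rdd = …

I am running Spark Streaming 24x7 and using the updateStateByKey function to keep the computed historical data, as in the NetworkWordCount example.
I tried streaming a file with 3 lakh (300,000) records, sleeping for 1 second after every 1,500 records. I am using 3 workers.
ERROR Executor: Exception in task ID 1635 java.lang.ArrayIndexOutOfBoundsException: 3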
14/10/23 21:20:43 ERROR TaskSetManager: Task 29170.0:2 failed 1 times; aborting job
14/10/23 21:20:43 ERROR DiskBlockManager: Exception while deleting local spark dir: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141023204346-b232
java.io.IOException: Failed to delete: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141023204346-b232/24
14/10/23 21:20:43 ERROR Executor: Exception in task ID 8037
java.io.FileNotFoundException: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141023204346-b232/22/shuffle_81_0_1 (No such file or directory)
at java.io.FileOutputStream.open(Native Method)
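What should I do? I think the updateStateByKey state should be reset periodically because it grows quickly. Please share some examples of when and how to reset updateStateByKey. Or do I have some other problem? Please shed some light.
Any help is much appreciated. Thanks for your time.

I am using Maven and have added the following dependencies: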
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.1.0</version>
</dependency> <dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.1.0</version>
</dependency>
I have also added the jar in the code:
SparkConf sparkConf = new SparkConf().setAppName("KafkaSparkTest");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
sc.addJar("/home/test/.m2/repository/org/apache/spark/spark-streaming-kafka_2.10/1.0.2/spark-streaming-kafka_2.10-1.0.2.jar");
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(5000));
It compiles fine without any errors, but when I run it through spark-submit I get the following error. Any help is much appreciated. Thanks for your time.
bin/spark-submit --class "KafkaSparkStreaming" --master local[4] try/simple-project/target/simple-project-1.0.jar
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils
at KafkaSparkStreamingTest(KafkaSparkStreaming.java:40)
at KafkaSparkStreaming.main(KafkaSparkStreaming.java:23)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)

I want to read only the latest messages from Kafka in Spark Streaming, but it also fetches past data.
How do I set auto.offset.reset in KafkaUtils for Spark?
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
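How do I set the configuration so that only current messages are fetched? Please give an example.
There is another thread on this, but it was not enough for me. Please help. Thanks in advance.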
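A minimal sketch of the consumer settings involved, assuming the receiver uses the old high-level Kafka consumer, where auto.offset.reset takes the values "smallest" or "largest". The class KafkaParamsSketch and the method latestOnlyParams are hypothetical names. The commented call shows the createStream overload that, as far as I recall, accepts a kafkaParams map in this Spark version; treat that signature as an assumption and check it against the API docs for the release in use.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds the settings only, so it needs no Spark or Kafka jars.
class KafkaParamsSketch {

    /** Consumer settings that ask for the newest offset when the group has none stored. */
    static Map<String, String> latestOnlyParams(String zkQuorum, String groupId) {
        Map<String, String> params = new HashMap<>();
        params.put("zookeeper.connect", zkQuorum); // same value as args[0] in the question
        params.put("group.id", groupId);           // same value as args[1] in the question
        // "largest" = start at the newest offset; only consulted when this
        // group.id has no committed offset yet
        params.put("auto.offset.reset", "largest");
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> kafkaParams = latestOnlyParams("localhost:2181", "fresh-group");
        System.out.println(kafkaParams);
        // Assumed overload (not compiled here):
        // KafkaUtils.createStream(jssc, String.class, String.class,
        //         StringDecoder.class, StringDecoder.class,
        //         kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());
    }
}
```

Because the setting is only used when no offset is stored for the consumer group, a group that has already consumed from the topic would likely resume from its stored offset; using a new group.id is one way around that.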
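
I am running Spark Streaming 24/7 and using updateStateByKey. Is it possible to run Spark Streaming 24/7? If so, won't the updateStateByKey state grow large, and how should that be handled? Do we have to reset or delete the updateStateByKey state periodically when running 24/7? If not, how and when does it get reset? Or does Spark handle this in a distributed way? How can memory or storage be increased dynamically?
When the updateStateByKey state grows, I get the following errors: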
Array out of bound exception
Exception while deleting local spark dir: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141026101251-cfb4
java.io.IOException: Failed to delete: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141026101251-cfb4
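How do I handle this? Please point me to any documentation, if there is any. I am completely stuck, so any help is much appreciated. Thanks for your time.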
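One common way to keep this kind of state bounded is to let the update function itself drop keys that have gone quiet. Spark Streaming calls the update function for every key that already has state, even in batches with no new data for that key, and removes the key when the function returns an empty result. The sketch below shows that logic in plain Java. StatePruningSketch, KeyState, and MAX_IDLE_BATCHES are hypothetical names, and java.util.Optional stands in for the Optional type that Spark's Java API expects, so this is not drop-in code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of an update function that expires idle keys.
class StatePruningSketch {

    // Assumed policy: forget a key after this many consecutive batches without data
    static final int MAX_IDLE_BATCHES = 3;

    /** Per-key state: the running count plus how long the key has been idle. */
    static final class KeyState {
        final long count;
        final int idleBatches;

        KeyState(long count, int idleBatches) {
            this.count = count;
            this.idleBatches = idleBatches;
        }
    }

    /** (values seen in this batch, previous state) -> new state, or empty to drop the key. */
    static Optional<KeyState> update(List<Long> newValues, Optional<KeyState> previous) {
        if (newValues.isEmpty()) {
            if (!previous.isPresent()) {
                return Optional.empty();
            }
            int idle = previous.get().idleBatches + 1;
            if (idle >= MAX_IDLE_BATCHES) {
                return Optional.empty(); // an empty result removes the key from the state
            }
            return Optional.of(new KeyState(previous.get().count, idle));
        }
        long count = previous.isPresent() ? previous.get().count : 0L;
        for (long v : newValues) {
            count += v;
        }
        return Optional.of(new KeyState(count, 0)); // fresh data resets the idle counter
    }

    public static void main(String[] args) {
        Optional<KeyState> state = update(Arrays.asList(2L, 3L), Optional.empty());
        System.out.println("count after first batch: " + state.get().count);
        for (int i = 0; i < MAX_IDLE_BATCHES; i++) {
            state = update(Collections.<Long>emptyList(), state);
        }
        System.out.println("still tracked after idle batches: " + state.isPresent());
    }
}
```

Whether expiring idle keys is acceptable depends on the application. If the history must be kept indefinitely, the state has to fit in the cluster's memory and storage, or be moved to an external store.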
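
I have 50 MB of CSV data. Is there any way to compress the data when loading it into d3.js/dc.js charts? Right now the page is too slow and I want to optimize it. Any help is much appreciated.
Thanks in advance.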