我正在使用0.9.0.0版本的Kafka,我想在不使用管理脚本kafka-console-consumer.sh的情况下计算主题中的消息数.
我已经尝试了答案Java中的所有命令,如何在apache kafka中获取主题中的消息数量, 但没有人产生结果.有人可以帮我从这里出去吗?
*大家好,
我对大家有一个简单的问题。我有一个使用createStream方法从kafka流创建的RDD。现在我想在转换为数据帧之前将时间戳记作为此rdd的值添加。我尝试使用withColumn()向数据框添加值,但返回此错误*
val topicMaps = Map("topic" -> 1)
val now = java.util.Calendar.getInstance().getTime()
val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)
messages.foreachRDD(rdd =>
{
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val dataframe = sqlContext.read.json(rdd.map(_._2))
val d =dataframe.withColumn("timeStamp_column",dataframe.col("now"))
Run Code Online (Sandbox Code Playgroud)
val d = dataframe.withColumn(“ timeStamp_column”,dataframe.col(“ now”))org.apache.spark.sql.AnalysisException:无法解析(动作,device_os_ver,device_type,event_name,item_name,lat, ,lon,memberid,productUpccd,tenantid);在org.apache.spark.sql.DataFrame $$ anonfun $ resolve $ 1.apply(DataFrame.scala:15
据我所知,DataFrames不可更改,因为它们是不可变的,但RDD也是不可变的。那么什么是最好的方法。如何将值添加到RDD(将时间戳动态添加到RDD)。
我又来了,我尝试使用用 scala -2.10.5 编写的 Spark Streaming_1.6.1 类从 kafka_0.9.0.0 主题读取数据。这是一个简单的程序,我在 sbt_0.13.12 中构建了它。当我运行该程序时,我收到此异常
(run-main-0) org.apache.spark.SparkException: 由于阶段失败,作业中止:阶段 1.0 中的任务 0 失败 1 次,最近一次失败:阶段 1.0 中丢失任务 0.0 (TID 1,本地主机):java.lang. lang.ClassCastException:[B 无法在 org.kafka.receiver.AvroCons$$anonfun$1.apply(AvroConsumer.scala:54) [错误] 在 org.kafka.receiver.AvroCons 处转换为 java.lang.String [错误] $$anonfun$1.apply(AvroConsumer.scala:54) [错误] 位于 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) [错误]
位于 org.apache.spark.util.Utils$。 getIteratorSize(Utils.scala:1597) [错误] 在 org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157) [错误] 在 org.apache.spark.rdd.RDD$ $anonfun$count$1.apply(RDD.scala:1157) [错误] 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) [错误] 在 org.apache.spark。 SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) [错误] 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) [错误] 在 org.apache.spark.scheduler。 Task.run(Task.scala:89) [错误] 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) [错误] 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor. java:1145) [错误] 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [错误] 在 java.lang.Thread.run(Thread.java:745) [错误] [错误]驱动程序堆栈跟踪:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 1.0 中的任务 …