小编jac*_*hik的帖子

计数存储在kafka主题中的消息数

我正在使用0.9.0.0版本的Kafka,我想在不使用管理脚本kafka-console-consumer.sh的情况下计算主题中的消息数.

我已经尝试了答案Java中的所有命令,如何在apache kafka中获取主题中的消息数量, 但没有人产生结果.有人可以帮我从这里出去吗?

apache-kafka jms-topic kafka-consumer-api

23
推荐指数
3
解决办法
4万
查看次数

如何将时间戳记作为额外列添加到数据框

*大家好,

我对大家有一个简单的问题。我有一个使用createStream方法从kafka流创建的RDD。现在我想在转换为数据帧之前将时间戳记作为此rdd的值添加。我尝试使用withColumn()向数据框添加值,但返回此错误*

val topicMaps = Map("topic" -> 1)
    val now = java.util.Calendar.getInstance().getTime()

    val messages = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)

      messages.foreachRDD(rdd =>
          {

            val sqlContext = new org.apache.spark.sql.SQLContext(sc)
            import sqlContext.implicits._

            val dataframe = sqlContext.read.json(rdd.map(_._2))



        val d =dataframe.withColumn("timeStamp_column",dataframe.col("now"))
Run Code Online (Sandbox Code Playgroud)

val d = dataframe.withColumn(“ timeStamp_column”,dataframe.col(“ now”))org.apache.spark.sql.AnalysisException:无法解析(动作,device_os_ver,device_type,event_name,item_name,lat, ,lon,memberid,productUpccd,tenantid);在org.apache.spark.sql.DataFrame $$ anonfun $ resolve $ 1.apply(DataFrame.scala:15

据我所知,DataFrames不可更改,因为它们是不可变的,但RDD也是不可变的。那么什么是最好的方法。如何将值添加到RDD(将时间戳动态添加到RDD)。

immutability apache-spark rdd spark-dataframe

4
推荐指数
2
解决办法
9126
查看次数

java.lang.ClassCastException: [解析 json[String,String] 时无法将 B 转换为 java.lang.String

我又来了,我尝试使用用 scala -2.10.5 编写的 Spark Streaming_1.6.1 类从 kafka_0.9.0.0 主题读取数据。这是一个简单的程序,我在 sbt_0.13.12 中构建了它。当我运行该程序时,我收到此异常

(run-main-0) org.apache.spark.SparkException: 由于阶段失败,作业中止:阶段 1.0 中的任务 0 失败 1 次,最近一次失败:阶段 1.0 中丢失任务 0.0 (TID 1,本地主机):java.lang. lang.ClassCastException:[B 无法在 org.kafka.receiver.AvroCons$$anonfun$1.apply(AvroConsumer.scala:54) [错误] 在 org.kafka.receiver.AvroCons 处转换为 java.lang.String [错误] $$anonfun$1.apply(AvroConsumer.scala:54) [错误] 位于 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) [错误]
位于 org.apache.spark.util.Utils$。 getIteratorSize(Utils.scala:1597) [错误] 在 org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157) [错误] 在 org.apache.spark.rdd.RDD$ $anonfun$count$1.apply(RDD.scala:1157) [错误] 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) [错误] 在 org.apache.spark。 SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) [错误] 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) [错误] 在 org.apache.spark.scheduler。 Task.run(Task.scala:89) [错误] 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) [错误] 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor. java:1145) [错误] 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [错误] 在 java.lang.Thread.run(Thread.java:745) [错误] [错误]驱动程序堆栈跟踪:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 1.0 中的任务 …

scala sbt spark-streaming kafka-consumer-api

1
推荐指数
1
解决办法
9408
查看次数