java.lang.ClassCastException: [解析 json[String,String] 时无法将 B 转换为 java.lang.String

jac*_*hik 1 scala sbt spark-streaming kafka-consumer-api

我又来了,我尝试使用用 scala -2.10.5 编写的 Spark Streaming_1.6.1 类从 kafka_0.9.0.0 主题读取数据。这是一个简单的程序,我在 sbt_0.13.12 中构建了它。当我运行该程序时,我收到此异常

(run-main-0) org.apache.spark.SparkException: 由于阶段失败,作业中止:阶段 1.0 中的任务 0 失败 1 次,最近一次失败:阶段 1.0 中丢失任务 0.0 (TID 1,本地主机):java.lang. lang.ClassCastException:[B 无法在 org.kafka.receiver.AvroCons$$anonfun$1.apply(AvroConsumer.scala:54) [错误] 在 org.kafka.receiver.AvroCons 处转换为 java.lang.String [错误] $$anonfun$1.apply(AvroConsumer.scala:54) [错误] 位于 scala.collection.Iterator$$anon$11.next(Iterator.scala:328) [错误]
位于 org.apache.spark.util.Utils$。 getIteratorSize(Utils.scala:1597) [错误] 在 org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1157) [错误] 在 org.apache.spark.rdd.RDD$ $anonfun$count$1.apply(RDD.scala:1157) [错误] 在 org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) [错误] 在 org.apache.spark。 SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) [错误] 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) [错误] 在 org.apache.spark.scheduler。 Task.run(Task.scala:89) [错误] 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) [错误] 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor. java:1145) [错误] 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [错误] 在 java.lang.Thread.run(Thread.java:745) [错误] [错误]驱动程序堆栈跟踪:org.apache.spark.SparkException:作业因阶段失败而中止:阶段 1.0 中的任务 0 失败 1 次,最近一次失败:阶段 1.0 中丢失任务 0.0(TID 1,本地主机):java.lang.ClassCastException: [B 无法转换为 java.lang.String

这是scala程序,

1 package org.kafka.receiver
      2 case class mobileData(action: String, tenantid: Int, lat: Float, lon: Float, memberid: Int, event_name: String, productUpccd: Int, device_type: String, device_os_ver: Float, item_na        me: String)
      3 import java.util.HashMap
      4 import org.apache.avro.SchemaBuilder
      5 import org.apache.spark.SparkConf
      6 import org.apache.spark.SparkContext
      7 import org.apache.spark.serializer.KryoSerializer
      8 import org.apache.spark.storage.StorageLevel
      9 import org.apache.spark.streaming.Seconds
     10 import org.apache.spark.streaming.StreamingContext
     11 import org.apache.spark.streaming.StreamingContext._
     12 import org.apache.spark.SparkContext._
     13 import org.apache.spark.streaming.dstream.ReceiverInputDStream
     14 import org.apache.spark.streaming.kafka.KafkaUtils
     15 import kafka.serializer.DefaultDecoder
     16 import org.apache.spark.sql.SQLContext
     17 import com.sun.istack.internal.logging.Logger
     18 object AvroCons {
     19   val eventSchema = SchemaBuilder.record("eventRecord").fields
     20     .name("action").`type`().stringType().noDefault()
     21     .name("tenantid").`type`().intType().noDefault()
     22     .name("lat").`type`().doubleType().noDefault()
     23     .name("lon").`type`().doubleType().noDefault()
     24     .name("memberid").`type`().intType().noDefault()
     25     .name("event_name").`type`().stringType().noDefault()
     26     .name("productUpccd").`type`().intType().noDefault()
     27     .name("device_type").`type`().stringType().noDefault()
     28     .name("device_os_ver").`type`().stringType().noDefault()
     29     .name("item_name").`type`().stringType().noDefault().endRecord
     30     def main(args: Array[String]): Unit = {
     31
     32     val sparkConf = new SparkConf().setAppName("Avro Consumer").
     33       set("spark.driver.allowMultipleContexts", "true").setMaster("local[2]")
     34     sparkConf.set("spark.cores.max", "2")
     35     sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
     36     sparkConf.set("spark.sql.tungsten.enabled", "true")
     37     sparkConf.set("spark.eventLog.enabled", "true")
     38     sparkConf.set("spark.app.id", "KafkaConsumer")
     39     sparkConf.set("spark.io.compression.codec", "snappy")
     40     sparkConf.set("spark.rdd.compress", "true")
     41     sparkConf.set("spark.streaming.backpressure.enabled", "true")
     42     sparkConf.set("spark.sql.avro.compression.codec", "snappy")
     43     sparkConf.set("spark.sql.avro.mergeSchema", "true")
     44     sparkConf.set("spark.sql.avro.binaryAsString", "true")
     45       val sc = new SparkContext(sparkConf)
     46     sc.hadoopConfiguration.set("avro.enable.summary-metadata", "false")
     47     val ssc = new StreamingContext(sc, Seconds(2))
     48     val kafkaConf = Map[String, String]("metadata.broker.list" -> "############:9092",
     49         "zookeeper.connect" -> "#############",
     50         "group.id" -> "KafkaConsumer",
     51         "zookeeper.connection.timeout.ms" -> "1000000")
     52       val topicMaps = Map("fishbowl" -> 1)
     53       val messages  = KafkaUtils.createStream[String, String,DefaultDecoder, DefaultDecoder](ssc, kafkaConf, topicMaps, StorageLevel.MEMORY_ONLY_SER)

     54   messages.print()    
     55   val lines = messages.map(x=>x._2);  lines.foreachRDD((rdd,time)=>{
     56 val count = rdd.count()
     57 if(count>0)
     58 rdd.foreach(record=>{println(record)})})
     59
     60   ssc.start()
     61     ssc.awaitTermination()
     62     }
     63
     64 }
Run Code Online (Sandbox Code Playgroud)

这是我的 build.sbt

name := "AvroConsumer" 
version := "1.0" 
scalaVersion := "2.10.6"
jarName in assembly := "AvroConsumer.jar" 

libraryDependencies  += "org.apache.spark" % "spark-core_2.10" % "1.6.1" % "provided"    

libraryDependencies  += "org.apache.spark" % "spark-sql_2.10" % "1.6.1" % "provided" 

libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.1"

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-assembly_2.10" % "1.6.1"

libraryDependencies  += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.1" 

libraryDependencies  += "org.codehaus.jackson" % "jackson-mapper-asl" % "1.9.13"

libraryDependencies += "org.openrdf.sesame" % "sesame-rio-api" % "2.7.2" 

libraryDependencies += "com.databricks" % "spark-csv_2.10" %  "0.1"

libraryDependencies += "org.apache.avro" % "avro" % "1.8.1"

libraryDependencies += "log4j" % "log4j" % "1.2.17"

libraryDependencies += "org.apache.avro" % "avro-tools" % "1.7.4"

assemblyMergeStrategy in assembly := {  case PathList("META-INF", xs @
_*) => MergeStrategy.discard  case x => MergeStrategy.first }   
Run Code Online (Sandbox Code Playgroud)

我正在准备此代码以从 kafka 主题创建 DF,因此我必须在 SparkConf() 中设置所有这些属性。这是我传入数据的架构,

{
  "action": "AppEvent",
  "tenantid": 299,
  "lat": 0.0,
  "lon": 0.0,
  "memberid": 16445,
  "event_name": "CATEGORY_CLICK",
  "productUpccd": 0,
  "device_type": "iPhone",
  "device_os_ver": "10.1",
  "item_name": "CHICKEN"
}
Run Code Online (Sandbox Code Playgroud)

这是我的卡夫卡生产者课程。

public class KafkaAvroProducer {

    /* case class
     TopicData("action":"AppEvent","tenantid":1173,"lat":0.0,"lon":0.0,"memberid":55,
     "event_name":"CATEGORY_CLICK",
     "productUpccd":0,"device_type":"iPhone","device_os_ver":"10.1","item_name":"CHICKEN",*/

    public static final String EVENT_SCHEMA = "{" + "\"type\":\"record\","
            + "\"name\":\"eventrecord\"," + "\"fields\":["
            + "  { \"name\":\"action\", \"type\":\"string\" },"
            + "  { \"name\":\"tenantid\", \"type\":\"int\" },"
            + "  { \"name\":\"lat\", \"type\":\"double\" },"
            + "  { \"name\":\"lon\", \"type\":\"double\" },"
            + "  { \"name\":\"memberid\", \"type\":\"int\" },"
            + "  { \"name\":\"event_name\", \"type\":\"string\" },"
            + "  { \"name\":\"productUpccd\", \"type\":\"int\" },"
            + "  { \"name\":\"device_type\", \"type\":\"string\" },"
            + "  { \"name\":\"device_os_ver\", \"type\":\"string\" },"
            + "{ \"name\":\"item_name\", \"type\":\"string\" }" + "]}";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "##########:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("producer.type", "async");
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(EVENT_SCHEMA);
        Injection<GenericRecord, String> avroRecords = GenericAvroCodecs.toJson(schema);
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for(int i = 0; i<300;i++){
            GenericData.Record avroRecord = new GenericData.Record(schema);
            setEventValues(i, avroRecord);
            String messages = avroRecords.apply(avroRecord);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("fishbowl",String.valueOf(i),messages);
            System.out.println(producerRecord);
            producer.send(producerRecord);

        }
        producer.close();
    }

    private static void setEventValues(int i, Record avroRecord) {

        avroRecord.put("action", "AppEvent");
        avroRecord.put("tenantid", i);
        avroRecord.put("lat", i*0.0);
        avroRecord.put("lon", 0.0);
        avroRecord.put("memberid", i*55);
        avroRecord.put("event_name", "CATEGORY_CLICK");
        avroRecord.put("productUpccd", 0);
        avroRecord.put("device_type", "iPhone");
        avroRecord.put("device_os_ver", "10.1");
        avroRecord.put("item_name", "CHICKEN");
    }

}
Run Code Online (Sandbox Code Playgroud)

maa*_*asg 5

Kafka 消费者应该配置为使用正确的解码器:

代替:

KafkaUtils.createStream[String, String,DefaultDecoder, DefaultDecoder]
Run Code Online (Sandbox Code Playgroud)

因为String它应该是:

KafkaUtils.createStream[String, String,StringDecoder, StringDecoder]
Run Code Online (Sandbox Code Playgroud)