I want to use Spark (1.6.2) Streaming to receive messages from a topic in Kafka (broker v0.10.2.1).
I am using the receiver-based approach. The code is as follows:
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.DefaultDecoder;
import scala.Tuple2;

public static void main(String[] args) throws Exception
{
    SparkConf sparkConf = new SparkConf().setAppName("SimpleStreamingApp");
    // 5-second batch interval
    JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, new Duration(5000));

    // topic name -> number of receiver threads
    Map<String, Integer> topicMap = new HashMap<>();
    topicMap.put("myTopic", 1);

    String zkQuorum = "host1:port1,host2:port2,host3:port3";

    Map<String, String> kafkaParamsMap = new HashMap<>();
    kafkaParamsMap.put("bootstraps.server", zkQuorum);
    kafkaParamsMap.put("metadata.broker.list", zkQuorum);
    kafkaParamsMap.put("zookeeper.connect", zkQuorum);
    kafkaParamsMap.put("group.id", "group_name");
    kafkaParamsMap.put("security.protocol", "SASL_PLAINTEXT");
    kafkaParamsMap.put("security.mechanism", "GSSAPI");
    kafkaParamsMap.put("ssl.kerberos.service.name", "kafka");
    kafkaParamsMap.put("key.deserializer", "kafka.serializer.StringDecoder");
    kafkaParamsMap.put("value.deserializer", "kafka.serializer.DefaultDecoder");

    // Receiver-based stream of raw byte[] keys and values
    JavaPairReceiverInputDStream<byte[], byte[]> stream = KafkaUtils.createStream(javaStreamingContext,
            byte[].class, byte[].class,
            DefaultDecoder.class, DefaultDecoder.class,
            kafkaParamsMap,
            topicMap,
            StorageLevel.MEMORY_ONLY());

    // Collect each batch to the driver and print its record count
    VoidFunction<JavaPairRDD<byte[], byte[]>> voidFunc = new VoidFunction<JavaPairRDD<byte[], byte[]>>()
    {
        public void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception
        {
            List<Tuple2<byte[], byte[]>> all = rdd.collect();
            System.out.println("size of rdd: " + all.size());
        }
    };
    stream.foreachRDD(voidFunc);

    javaStreamingContext.start();
    javaStreamingContext.awaitTermination();
}
Access to Kafka is Kerberized. I launch the job with:
spark-submit --verbose \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
  --files jaas.conf,privKey.der \
  --principal <accountName> \
  --keytab <path to keytab file> \
  --master yarn \
  --jars <comma separated path to all jars> \
  --class <fully qualified java main class> \
  <path to jar file containing main class>
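For reference, the jaas.conf passed via --files follows the usual Krb5LoginModule layout for a Kerberized Kafka client. This is only a sketch with placeholder principal and keytab names, not my exact file:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="./<keytab file>"
  principal="<accountName>"
  serviceName="kafka";
};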
When the job starts, Kafka's VerifiableProperties class logs warning messages about the properties contained in the kafkaParamsMap hashmap:

INFO KafkaReceiver: connecting to zookeeper: <the correct zookeeper quorum provided in kafkaParams map>
VerifiableProperties: Property auto.offset.reset is overridden to largest
VerifiableProperties: Property enable.auto.commit is not valid.
VerifiableProperties: Property sasl.kerberos.service.name is not valid
VerifiableProperties: Property key.deserializer is not valid
...
VerifiableProperties: Property zookeeper.connect is overridden to ....
I suspect that because these properties are not accepted, the stream processing may be affected.
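For comparison, a parameter map using only property names the old high-level consumer recognizes would look roughly like this. This is a sketch based on the assumption that the receiver-based API validates against the 0.8-style consumer config; oldStyleParams is a hypothetical name:

// Only old high-level consumer properties; new-consumer keys such as
// bootstrap.servers, key.deserializer and sasl.* are not part of this config.
Map<String, String> oldStyleParams = new HashMap<>();
oldStyleParams.put("zookeeper.connect", zkQuorum);   // ZooKeeper quorum, not the broker list
oldStyleParams.put("group.id", "group_name");
oldStyleParams.put("auto.offset.reset", "smallest"); // old consumer uses smallest/largest
oldStyleParams.put("zookeeper.connection.timeout.ms", "10000");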
**When I launch in cluster mode with --master yarn, these warning messages do not appear.**
After that, I see the following logs repeated every 5 seconds, matching the configured batch interval:
INFO BlockRDD: Removing RDD 4 from persistence list
INFO KafkaInputDStream: Removing blocks of RDD BlockRDD[4] at createStream at ...
INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
INFO ... INFO BlockManager: Removing RDD 4
However, I do not see any actual messages printed on the console.
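As a sanity check (a sketch, not part of my original code), the per-batch record count can be printed without collecting anything to the driver:

// Prints the number of records in each 5-second batch;
// a constant 0 would mean the receiver gets nothing from Kafka.
stream.count().print();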
Question: Why doesn't my code print any actual messages?
My Gradle dependencies are:
compile group: 'org.apache.spark', name: 'spark-core_2.10', version: '1.6.2'
compile group: 'org.apache.spark', name: 'spark-streaming_2.10', version: '1.6.2'
compile group: 'org.apache.spark', name: 'spark-streaming-kafka_2.10', version: '1.6.2'