小编Vim*_*duk的帖子

Kafka获取错误在引导服务器中给出了无可解析的引导程序URL

我很确定bootstrap.servers是正确的.Maven中有什么冲突或Kafka有什么问题吗?

在此之前它成功地运作了.我添加了一些Maven或Spark然后出了点问题..

谁能知道如何解决它?

这是java中的kafka代码

Properties props = new Properties();
        props.put("bootstrap.servers", "x.xx.xxx.xxx:9092");
        props.put("metadata.broker.list", "x.xx.xxx.xxx:9091, x.xx.xxx.xxx:9092, x.xx.xxx.xxx:9093");

        props.put("producer.type", "async");
        props.put("batch.size", "500");
        props.put("compression.codec", "1");
        props.put("compression.topic", topicName);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<String, String>(
                    props);
Run Code Online (Sandbox Code Playgroud)

获取错误在引导服务器中没有给出可解析的引导程序URL,

[err]   at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:335)
    [err]   at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:188)
    [err]   at com.wra.controller.ParserController.GetResumeUpload(ParserController.java:98)
    [err]   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    [err]   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
    [err]   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
    [err]   at java.lang.reflect.Method.invoke(Method.java:508)
    [err]   at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
    [err]   at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
    [err]   at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:114)
    [err]   at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
    [err]   at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
    [err]   at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
    [err]   at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:963)
    [err] …
Run Code Online (Sandbox Code Playgroud)

java maven apache-kafka kafka-producer-api

10
推荐指数
3
解决办法
2万
查看次数

如何在kafka中创建自定义序列化程序?

只有很少的序列化器可用,如,

org.apache.kafka.common.serialization.StringSerializer
org.apache.kafka.common.serialization.StringSerializer 
Run Code Online (Sandbox Code Playgroud)

我们如何创建自己的自定义序列化程序?

java distributed-computing apache-kafka kafka-consumer-api

7
推荐指数
2
解决办法
9301
查看次数

Java中的对象不可序列化(org.apache.kafka.clients.consumer.ConsumerRecord)spark kafka流

我很确定我只推送数据字符串并反序列化为String.记录我推它也显示错误.

但为什么突然出现这种类型的错误,我有什么遗漏的吗?

这是下面的代码,

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.regex.Pattern;

    import scala.Tuple2;

    import kafka.serializer.StringDecoder;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.streaming.api.java.*;
    import org.apache.spark.streaming.kafka.HasOffsetRanges;
    import org.apache.spark.streaming.kafka10.*;
    import org.apache.spark.streaming.kafka.OffsetRange;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.Durations;

public final class KafkaConsumerDirectStream {
    public static void main(String[] args) throws Exception { 
       try {
                    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount11").setMaster("local[*]");
                    sparkConf.set("spark.streaming.concurrentJobs", "3");

                    // Create the context with 2 seconds batch size
                    JavaStreamingContext jssc = new …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-spark kafka-consumer-api

6
推荐指数
1
解决办法
3318
查看次数

java.lang.IllegalArgumentException: java.net.UnknownHostException: tmp

我工作一个应用程序 Spark-scala 并使用 sbt 构建项目,我的树状结构是: projectFilms/src/main/scala/AppFilms 我在 HDFS 中有 3 个文件,这些目录是:hdfs/tmp/projetFilms/<my_3_Files>,当我通过此命令行“sbt run”运行我的代码时,它生成一个错误:

java.lang.IllegalArgumentException: java.net.UnknownHostException: tmp
Run Code Online (Sandbox Code Playgroud)

和这个:

  [trace] Stack trace suppressed: run last compile:run for the full output.
 ERROR Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.InterruptedException
Run Code Online (Sandbox Code Playgroud)

这是我的代码:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd._   

object appFilms {

val conf = new SparkConf().setAppName("system of recommandation").setMaster("local[*]")
val sc = new SparkContext(conf)
def main(args: Array[String]) {

val files = sc.wholeTextFiles("hdfs://tmp/ProjetFilm/*.dat")
//val nbfiles = files.count
println("Hello my application!")
sc.stop()
} …
Run Code Online (Sandbox Code Playgroud)

scala sbt apache-spark

4
推荐指数
1
解决办法
1万
查看次数

PollableChannel 和 DirectChannel 有什么区别?

Spring Integration 支持多渠道。

这些渠道有什么区别?

如果用真实世界的例子回答,将不胜感激。

java spring spring-integration

2
推荐指数
1
解决办法
545
查看次数