I'm fairly sure bootstrap.servers is correct. Is there some conflict in Maven, or a problem with Kafka?
It worked fine before this. I added some Maven or Spark dependencies and then something went wrong.
Does anyone know how to fix it?
Here is the Kafka code in Java:
Properties props = new Properties();
props.put("bootstrap.servers", "x.xx.xxx.xxx:9092");
props.put("metadata.broker.list", "x.xx.xxx.xxx:9091, x.xx.xxx.xxx:9092, x.xx.xxx.xxx:9093");
props.put("producer.type", "async");
props.put("batch.size", "500");
props.put("compression.codec", "1");
props.put("compression.topic", topicName);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<String, String>(props);
I am getting the error "No resolvable bootstrap urls given in bootstrap.servers":
[err] at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:335)
[err] at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:188)
[err] at com.wra.controller.ParserController.GetResumeUpload(ParserController.java:98)
[err] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[err] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
[err] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
[err] at java.lang.reflect.Method.invoke(Method.java:508)
[err] at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
[err] at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
[err] at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:114)
[err] at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
[err] at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
[err] at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
[err] at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:963)
[err] …
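For context, the new-clients KafkaProducer reads only bootstrap.servers; properties such as metadata.broker.list, producer.type, compression.codec and compression.topic belong to the old Scala producer API and are silently ignored by it. The "No resolvable bootstrap urls" message is thrown when no host:port entry in bootstrap.servers can be parsed and resolved, and a placeholder hostname, a host not resolvable via DNS from the application's machine, or stray whitespace around commas are typical causes. The sketch below illustrates why whitespace breaks a comma-separated server list; it is an illustration of the parsing problem, not Kafka's actual validation code, and the class and method names are made up:

```java
import java.util.ArrayList;
import java.util.List;

public class BootstrapListCheck {
    // Roughly mimics how a comma-separated host:port list is split;
    // entries with stray spaces come out with a malformed host part.
    static List<String> hosts(String bootstrapServers) {
        List<String> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            int colon = entry.lastIndexOf(':');
            // The host part is everything before the last ':'
            result.add(colon < 0 ? entry : entry.substring(0, colon));
        }
        return result;
    }

    public static void main(String[] args) {
        // Without trimming, the space after the comma becomes part of the
        // hostname " b", which can never resolve via DNS.
        System.out.println(hosts("a:9091, b:9092"));
        System.out.println(hosts("a:9091,b:9092"));
    }
}
```

If the redacted host in bootstrap.servers does not resolve from the machine the application runs on, the same KafkaProducer constructor exception results even with a perfectly formatted list.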
Only a few serializers are available, such as:
org.apache.kafka.common.serialization.StringSerializer
How can we create our own custom serializer?
I'm fairly sure I am only pushing String data and deserializing it as String; the records I push also show the error.
But why does this type of error suddenly appear? Is there something I'm missing?
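On the custom-serializer question: for the new clients API you implement org.apache.kafka.common.serialization.Serializer&lt;T&gt; and register the class name via the key.serializer / value.serializer properties. Since the kafka-clients dependency cannot be assumed here, the sketch below declares a stand-in interface with the same shape; the interface, the Person class, and the encoding are assumptions for illustration only:

```java
import java.nio.charset.StandardCharsets;

public class CustomSerializerSketch {
    // Stand-in for org.apache.kafka.common.serialization.Serializer<T>;
    // the real interface also has configure() and close() methods.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // A hypothetical payload type, just for illustration.
    static class Person {
        final String name;
        final int age;
        Person(String name, int age) { this.name = name; this.age = age; }
    }

    // Encode the object as "name,age" in UTF-8; a real implementation
    // would typically use JSON, Avro, or similar.
    static class PersonSerializer implements Serializer<Person> {
        @Override
        public byte[] serialize(String topic, Person data) {
            if (data == null) return null; // null data maps to a null record value
            return (data.name + "," + data.age).getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = new PersonSerializer().serialize("my-topic", new Person("alice", 30));
        System.out.println(new String(bytes, StandardCharsets.UTF_8)); // alice,30
        // With the real kafka-clients dependency you would register it via:
        // props.put("value.serializer", "com.example.PersonSerializer");
    }
}
```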
Here is the code:
import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import scala.Tuple2;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka10.*;
import org.apache.spark.streaming.kafka.OffsetRange;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
public final class KafkaConsumerDirectStream {
    public static void main(String[] args) throws Exception {
        try {
            SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount11").setMaster("local[*]");
            sparkConf.set("spark.streaming.concurrentJobs", "3");
            // Create the context with a 2-second batch interval
            JavaStreamingContext jssc = new …
I'm working on a Spark-Scala application and build the project with sbt. My tree structure is:
projectFilms/src/main/scala/AppFilms
I have 3 files in HDFS, in these directories: hdfs/tmp/projetFilms/<my_3_Files>.
When I run my code with the command "sbt run", it throws this error:
java.lang.IllegalArgumentException: java.net.UnknownHostException: tmp
and this:
[trace] Stack trace suppressed: run last compile:run for the full output.
ERROR Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.InterruptedException
Here is my code:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd._
object appFilms {
  val conf = new SparkConf().setAppName("system of recommandation").setMaster("local[*]")
  val sc = new SparkContext(conf)

  def main(args: Array[String]) {
    val files = sc.wholeTextFiles("hdfs://tmp/ProjetFilm/*.dat")
    //val nbfiles = files.count
    println("Hello my application!")
    sc.stop()
  } …
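The UnknownHostException: tmp strongly suggests the problem is in the URI "hdfs://tmp/ProjetFilm/*.dat": in a URI the segment after "//" is the authority, so "tmp" is interpreted as a host name to resolve, not as a directory. java.net.URI shows the same behavior, which the runnable check below demonstrates; the plausible fix is "hdfs:///tmp/..." (use the default filesystem from core-site.xml) or "hdfs://<namenode-host>:<port>/tmp/..." (the namenode host and port being deployment-specific values not given in the question):

```java
import java.net.URI;

public class HdfsUriCheck {
    public static void main(String[] args) {
        // In "hdfs://tmp/ProjetFilm", "tmp" sits in the authority slot, so it
        // is treated as a host to resolve -- hence UnknownHostException: tmp.
        URI wrong = URI.create("hdfs://tmp/ProjetFilm");
        System.out.println(wrong.getHost()); // tmp
        System.out.println(wrong.getPath()); // /ProjetFilm

        // With a third slash the authority is empty and "tmp" is part of the
        // path, so the HDFS client falls back to the configured default filesystem.
        URI fixed = URI.create("hdfs:///tmp/ProjetFilm");
        System.out.println(fixed.getHost()); // null
        System.out.println(fixed.getPath()); // /tmp/ProjetFilm
    }
}
```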
Spring Integration supports multiple channel types.
What are the differences between these channels?
An answer with real-world examples would be much appreciated.
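Broadly, Spring Integration channels differ along two axes: point-to-point vs publish-subscribe, and synchronous vs buffered. A DirectChannel hands the message to one handler on the sender's own thread (like a phone call), a QueueChannel buffers messages until a consumer polls (like a mailbox), and a PublishSubscribeChannel delivers a copy to every subscriber (like a mailing list). The sketch below mimics the first two semantics in plain Java to make the contrast concrete; the class names are analogies invented here, not Spring Integration's API:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

public class ChannelSemantics {
    // Like DirectChannel: send() invokes the handler immediately, on the caller's thread.
    static class DirectLikeChannel {
        private final Consumer<String> handler;
        DirectLikeChannel(Consumer<String> handler) { this.handler = handler; }
        void send(String msg) { handler.accept(msg); }
    }

    // Like QueueChannel: send() only buffers; nothing happens until receive() is called.
    static class QueueLikeChannel {
        private final Queue<String> buffer = new ArrayDeque<>();
        void send(String msg) { buffer.add(msg); }
        String receive() { return buffer.poll(); }
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();

        DirectLikeChannel direct = new DirectLikeChannel(m -> log.append("handled:").append(m));
        direct.send("order-1"); // the handler runs right now, in this thread
        System.out.println(log); // handled:order-1

        QueueLikeChannel queue = new QueueLikeChannel();
        queue.send("order-2");  // buffered; no consumer has seen it yet
        System.out.println(queue.receive()); // order-2 -- delivered only when polled
    }
}
```

The practical consequence: with a direct-style channel a handler failure rolls back into the sender (useful for transactions), while a queue-style channel decouples sender and consumer in time (useful for load smoothing).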