我正在尝试在我的本地Maven仓库中添加Oracle JDBC驱动程序.我找到了这样做的链接.
我想在Inside IntelliJ IDEA中做同样的事情.有没有办法从IntelliJ IDEA执行mvn命令?
为什么以及何时会选择使用Kafka的Spark流媒体?
假设我有一个系统通过Kafka每秒获得数千条消息.我需要对这些消息应用一些实时分析,并将结果存储在数据库中.
我有两个选择:
创建我自己的工作人员,从Kafka读取消息,运行分析算法并将结果存储在DB中.在Docker时代,只需使用scale命令就可以轻松地在整个集群中扩展此工作程序.我只需要确保我的分区数量等于或大于我的工作者,并且一切都很好并且我有一个真正的并发性.
使用Kafka流输入创建Spark群集.让Spark集群进行分析计算,然后存储结果.
有没有第二种选择是更好的选择?听起来像是一个额外的开销.
apache-kafka apache-spark spark-streaming spark-streaming-kafka
我试图用kafka读取火花流时遇到一些问题.
我的代码是:
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaIngestor")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val kafkaParams = Map[String, String](
"zookeeper.connect" -> "localhost:2181",
"group.id" -> "consumergroup",
"metadata.broker.list" -> "localhost:9092",
"zookeeper.connection.timeout.ms" -> "10000"
//"kafka.auto.offset.reset" -> "smallest"
)
val topics = Set("test")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
Run Code Online (Sandbox Code Playgroud)
我之前在端口2181启动了zookeeper,在端口9092启动了Kafka服务器0.9.0.0.但是我在Spark驱动程序中遇到以下错误:
Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
at scala.Option.map(Option.scala:145)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
Run Code Online (Sandbox Code Playgroud)
Zookeeper日志:
[2015-12-08 00:32:08,226] INFO Got user-level KeeperException when processing sessionid:0x1517ec89dfd0000 type:create cxid:0x34 zxid:0x1d3 …Run Code Online (Sandbox Code Playgroud) apache-kafka apache-spark spark-streaming spark-streaming-kafka
我们有一个Spark Streaming应用程序,它从接收器中的Kafka队列读取数据并进行一些转换并输出到HDFS.批量间隔为1分钟,我们已经调整了背压和spark.streaming.receiver.maxRate参数,因此大部分时间都可以正常工作.
但我们仍有一个问题.当HDFS完全关闭时,批处理作业将挂起很长时间(让我们说HDFS工作4小时不起作业,并且作业将挂起4小时),但是接收器不知道作业没有完成,所以它仍在接收未来4小时的数据.这导致OOM异常,并且整个应用程序都关闭,我们丢失了大量数据.
所以,我的问题是:是否有可能让接收者知道作业没有完成,因此它将收到更少(甚至没有)数据,并且当作业完成时,它将开始接收更多数据以赶上.在上述情况下,当HDFS关闭时,接收器将从Kafka读取较少的数据,并且在接下来的4小时内生成的块非常小,接收器和整个应用程序没有关闭,在HDFS正常后,接收器将读取更多数据并开始追赶.
streaming apache-kafka backpressure apache-spark spark-streaming-kafka
我正在尝试使用spark-streaming2.0.0来使用kafka 0.8主题,我正在尝试识别我在build.sbt文件中尝试使用这些依赖项所需的依赖项
libraryDependencies += "org.apache.spark" %% "spark-streaming_2.11" % "2.0.0"
Run Code Online (Sandbox Code Playgroud)
当我运行sbt包时,我得到所有这三个罐子的未解决的依赖关系,
但这些罐子确实存在
https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.0.0
请帮忙调试这个问题,我是Scala的新手,所以如果我没有做对,请告诉我
尝试从卡夫卡源读取。我想从收到的消息中提取时间戳以进行结构化火花流处理。kafka(版本 0.10.0.0) Spark Streaming(版本 2.0.1)
我正在整合Kafka和Spark,使用spark-streaming.我创建了一个作为kafka制作人的主题:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Run Code Online (Sandbox Code Playgroud)
我正在kafka发布消息并尝试使用spark-streaming java代码读取它们并在屏幕上显示它们.
守护进程全都出现了:Spark-master,worker; 动物园管理员; 卡夫卡.
我正在编写一个java代码,使用KafkaUtils.createStream
代码如下:
public class SparkStream {
public static void main(String args[])
{
if(args.length != 3)
{
System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
System.exit(1);
}
Map<String,Integer> topicMap = new HashMap<String,Integer>();
String[] topic = args[2].split(",");
for(String t: topic)
{
topicMap.put(t, new Integer(1));
}
JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap );
System.out.println("Connection done++++++++++++++");
JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, …Run Code Online (Sandbox Code Playgroud) 我正在尝试设置一个Spark Streaming简单应用程序,它将从Kafka主题中读取消息.
经过大量工作后,我处于这个阶段,但得到下面显示的例外情况.
码:
public static void main(String[] args) throws Exception {
String brokers = "my.kafka.broker" + ":" + "6667";
String topics = "MyKafkaTopic";
// Create context with a 2 seconds batch interval
SparkConf sparkConf = new SparkConf().setAppName("StreamingE")
.setMaster("local[1]")
;
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", brokers);
System.out.println("Brokers: " + brokers);
// Create direct kafka stream with brokers and topics
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class, …Run Code Online (Sandbox Code Playgroud) 我正在阅读这篇博客文章:
它讨论了有关使用Spark Streaming和Apache Kafka进行一些近实时处理的问题。我完全理解这篇文章。它确实显示了如何使用Spark Streaming从主题读取消息。我想知道是否有一个Spark Streaming API,可用于将消息写入Kakfa主题?
我的用例非常简单。我有一组数据,可以以固定的时间间隔(例如每秒)从给定的源读取数据。我使用反应式流进行此操作。我想使用Spark对这些数据进行一些分析。我想要容错,所以卡夫卡开始发挥作用。因此,我基本上要做的是以下操作(如果我输入错了,请纠正我):
但是,另一个问题是,Spark中的Streaming API是否是反应式流规范的实现?是否具有反压处理功能(Spark Streaming v1.5)?
reactive-programming apache-kafka apache-spark spark-streaming spark-streaming-kafka
感谢您使用spark 2.0.2运行火花流程序的帮助.
运行错误"java.lang.ClassNotFoundException: Failed to find data source: kafka".修改后的POM文件如下.
正在创建Spark,但是在调用来自kafka的负载时出现错误.
创建火花会话:
val spark = SparkSession
.builder()
.master(master)
.appName("Apache Log Analyzer Streaming from Kafka")
.config("hive.metastore.warehouse.dir", hiveWarehouse)
.config("fs.defaultFS", hdfs_FS)
.enableHiveSupport()
.getOrCreate()
Run Code Online (Sandbox Code Playgroud)
创建kafka流媒体:
val logLinesDStream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:2181")
.option("subscribe", topics)
.load()
Run Code Online (Sandbox Code Playgroud)
错误信息:
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark-packages.org
Run Code Online (Sandbox Code Playgroud)
pom.xml中:
<scala.version>2.10.4</scala.version>
<scala.compat.version>2.10</scala.compat.version>
<spark.version>2.0.2</spark.version>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
<version>${spark.version}</version>
</dependency> …Run Code Online (Sandbox Code Playgroud) 在我的代码中,我有以下几行:
BinaryTreeNode temp = new BinaryTreeNode();
temp = left;
while(temp != null)
temp = temp.left;
temp.left = newNode;
Run Code Online (Sandbox Code Playgroud)
我的IDE现在说:"第一行上没有使用指定的值".我可能会说错了.
我开始使用Spark流媒体.我想从Kafka获取一个流,其中包含我在Spark文档中找到的示例代码:https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html
这是我的代码:
object SparkStreaming {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Test_kafka_spark").setMaster("local[*]") // local parallelism 1
val ssc = new StreamingContext(conf, Seconds(1))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9093",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "test",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("spark")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
}
}
Run Code Online (Sandbox Code Playgroud)
所有人似乎都开始很好,但工作立即停止,记录如下:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties …Run Code Online (Sandbox Code Playgroud) apache-kafka ×9
apache-spark ×6
java ×3
scala ×2
backpressure ×1
hadoop ×1
maven ×1
maven-2 ×1
maven-3 ×1
sbt ×1
streaming ×1