小编Rus*_*huk的帖子

如何使用IntelliJ IDEA执行mvn命令?

我正在尝试在我的本地Maven仓库中添加Oracle JDBC驱动程序.我找到了这样做的链接.

我想在Inside IntelliJ IDEA中做同样的事情.有没有办法从IntelliJ IDEA执行mvn命令?

java maven-2 intellij-idea maven-3 maven

34
推荐指数
4
解决办法
5万
查看次数

Spark流媒体+ Kafka vs Just Kafka

为什么以及何时会选择使用Kafka的Spark流媒体?

假设我有一个系统通过Kafka每秒获得数千条消息.我需要对这些消息应用一些实时分析,并将结果存储在数据库中.

我有两个选择:

  1. 创建我自己的工作人员,从Kafka读取消息,运行分析算法并将结果存储在DB中.在Docker时代,只需使用scale命令就可以轻松地在整个集群中扩展此工作程序.我只需要确保我的分区数量等于或大于我的工作者,并且一切都很好并且我有一个真正的并发性.

  2. 使用Kafka流输入创建Spark群集.让Spark集群进行分析计算,然后存储结果.

有没有第二种选择是更好的选择?听起来像是一个额外的开销.

apache-kafka apache-spark spark-streaming spark-streaming-kafka

13
推荐指数
1
解决办法
1862
查看次数

Spark Streaming Kafka流

我试图用kafka读取火花流时遇到一些问题.

我的代码是:

val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaIngestor")
val ssc = new StreamingContext(sparkConf, Seconds(2))

val kafkaParams = Map[String, String](
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> "consumergroup",
  "metadata.broker.list" -> "localhost:9092",
  "zookeeper.connection.timeout.ms" -> "10000"
  //"kafka.auto.offset.reset" -> "smallest"
)

val topics = Set("test")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
Run Code Online (Sandbox Code Playgroud)

我之前在端口2181启动了zookeeper,在端口9092启动了Kafka服务器0.9.0.0.但是我在Spark驱动程序中遇到以下错误:

Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:90)
at scala.Option.map(Option.scala:145)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:90)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:87)
Run Code Online (Sandbox Code Playgroud)

Zookeeper日志:

[2015-12-08 00:32:08,226] INFO Got user-level KeeperException when processing sessionid:0x1517ec89dfd0000 type:create cxid:0x34 zxid:0x1d3 …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-spark spark-streaming spark-streaming-kafka

6
推荐指数
1
解决办法
7181
查看次数

Spark Streaming Kafka背压

我们有一个Spark Streaming应用程序,它从接收器中的Kafka队列读取数据并进行一些转换并输出到HDFS.批量间隔为1分钟,我们已经调整了背压和spark.streaming.receiver.maxRate参数,因此大部分时间都可以正常工作.

但我们仍有一个问题.当HDFS完全关闭时,批处理作业将挂起很长时间(让我们说HDFS工作4小时不起作业,并且作业将挂起4小时),但是接收器不知道作业没有完成,所以它仍在接收未来4小时的数据.这导致OOM异常,并且整个应用程序都关闭,我们丢失了大量数据.

所以,我的问题是:是否有可能让接收者知道作业没有完成,因此它将收到更少(甚至没有)数据,并且当作业完成时,它将开始接收更多数据以赶上.在上述情况下,当HDFS关闭时,接收器将从Kafka读取较少的数据,并且在接下来的4小时内生成的块非常小,接收器和整个应用程序没有关闭,在HDFS正常后,接收器将读取更多数据并开始追赶.

streaming apache-kafka backpressure apache-spark spark-streaming-kafka

5
推荐指数
1
解决办法
3758
查看次数

Spark Streaming与Kafka 2.0.0依赖项

我正在尝试使用spark-streaming2.0.0来使用kafka 0.8主题,我正在尝试识别我在build.sbt文件中尝试使用这些依赖项所需的依赖项

libraryDependencies += "org.apache.spark" %% "spark-streaming_2.11" % "2.0.0"
Run Code Online (Sandbox Code Playgroud)

当我运行sbt包时,我得到所有这三个罐子的未解决的依赖关系,

但这些罐子确实存在

https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.0.0

请帮忙调试这个问题,我是Scala的新手,所以如果我没有做对,请告诉我

sbt apache-spark spark-streaming spark-streaming-kafka

5
推荐指数
1
解决办法
3150
查看次数

从spark流中的kafka消息中提取时间戳?

尝试从卡夫卡源读取。我想从收到的消息中提取时间戳以进行结构化火花流处理。kafka(版本 0.10.0.0) Spark Streaming(版本 2.0.1)

apache-kafka spark-streaming spark-streaming-kafka

5
推荐指数
1
解决办法
5010
查看次数

Kafka Spark流媒体:无法阅读消息

我正在整合Kafka和Spark,使用spark-streaming.我创建了一个作为kafka制作人的主题:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test 
Run Code Online (Sandbox Code Playgroud)

我正在kafka发布消息并尝试使用spark-streaming java代码读取它们并在屏幕上显示它们.
守护进程全都出现了:Spark-master,worker; 动物园管理员; 卡夫卡.
我正在编写一个java代码,使用KafkaUtils.createStream
代码如下:

public class SparkStream {
    public static void main(String args[])
    {
        if(args.length != 3)
        {
            System.out.println("SparkStream <zookeeper_ip> <group_nm> <topic1,topic2,...>");
            System.exit(1);
        }


        Map<String,Integer> topicMap = new HashMap<String,Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            topicMap.put(t, new Integer(1));
        }

        JavaStreamingContext jssc = new JavaStreamingContext("spark://192.168.88.130:7077", "SparkStream", new Duration(3000));
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap );

        System.out.println("Connection done++++++++++++++");
        JavaDStream<String> data = messages.map(new Function<Tuple2<String, String>, …
Run Code Online (Sandbox Code Playgroud)

hadoop apache-kafka spark-streaming spark-streaming-kafka

4
推荐指数
1
解决办法
5213
查看次数

Spark Streaming Kafka Consumer

我正在尝试设置一个Spark Streaming简单应用程序,它将从Kafka主题中读取消息.

经过大量工作后,我处于这个阶段,但得到下面显示的例外情况.

码:

public static void main(String[] args) throws Exception {

    String brokers = "my.kafka.broker" + ":" + "6667";
    String topics = "MyKafkaTopic";

    // Create context with a 2 seconds batch interval
    SparkConf sparkConf = new SparkConf().setAppName("StreamingE")
            .setMaster("local[1]")
            ;
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", brokers);
    System.out.println("Brokers: " + brokers);

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
            jssc,
            String.class,
            String.class, …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka spark-streaming spark-streaming-kafka

4
推荐指数
1
解决办法
4155
查看次数

Apache Kafka和Spark流

我正在阅读这篇博客文章:

http://blog.jaceklaskowski.pl/2015/07/20/real-time-data-processing-using-apache-kafka-and-spark-streaming.html

它讨论了有关使用Spark Streaming和Apache Kafka进行一些近实时处理的问题。我完全理解这篇文章。它确实显示了如何使用Spark Streaming从主题读取消息。我想知道是否有一个Spark Streaming API,可用于将消息写入Kakfa主题?

我的用例非常简单。我有一组数据,可以以固定的时间间隔(例如每秒)从给定的源读取数据。我使用反应式流进行此操作。我想使用Spark对这些数据进行一些分析。我想要容错,所以卡夫卡开始发挥作用。因此,我基本上要做的是以下操作(如果我输入错了,请纠正我):

  1. 使用反应流以固定间隔从外部源获取数据
  2. 将结果传递给Kafka主题
  3. 使用Spark Streaming为消费者创建流上下文
  4. 对消耗的数据执行分析

但是,另一个问题是,Spark中的Streaming API是否是反应式流规范的实现?是否具有反压处理功能(Spark Streaming v1.5)?

reactive-programming apache-kafka apache-spark spark-streaming spark-streaming-kafka

3
推荐指数
1
解决办法
1875
查看次数

spark streaming + kafka - spark session API

感谢您使用spark 2.0.2运行火花流程序的帮助.

运行错误"java.lang.ClassNotFoundException: Failed to find data source: kafka".修改后的POM文件如下.

正在创建Spark,但是在调用来自kafka的负载时出现错误.

创建火花会话:

 val spark = SparkSession
            .builder()
            .master(master)
            .appName("Apache Log Analyzer Streaming from Kafka")
            .config("hive.metastore.warehouse.dir", hiveWarehouse)
            .config("fs.defaultFS", hdfs_FS)
            .enableHiveSupport()
            .getOrCreate()
Run Code Online (Sandbox Code Playgroud)

创建kafka流媒体:

    val logLinesDStream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:2181")
      .option("subscribe", topics)
      .load()
Run Code Online (Sandbox Code Playgroud)

错误信息:

Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark-packages.org
Run Code Online (Sandbox Code Playgroud)

pom.xml中:

    <scala.version>2.10.4</scala.version>
        <scala.compat.version>2.10</scala.compat.version>
        <spark.version>2.0.2</spark.version>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency> …
Run Code Online (Sandbox Code Playgroud)

scala apache-kafka apache-spark spark-streaming-kafka

3
推荐指数
1
解决办法
2472
查看次数

永远不会使用赋值 - Java

在我的代码中,我有以下几行:

 BinaryTreeNode temp = new BinaryTreeNode();
 temp = left;
 while(temp != null)
     temp = temp.left;

 temp.left = newNode; 
Run Code Online (Sandbox Code Playgroud)

我的IDE现在说:"第一行上没有使用指定的值".我可能会说错了.

java

0
推荐指数
1
解决办法
1756
查看次数

带Kafka连接器的Spark流式传输停止

我开始使用Spark流媒体.我想从Kafka获取一个流,其中包含我在Spark文档中找到的示例代码:https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html

这是我的代码:

object SparkStreaming {

  def main(args: Array[String]) {

    val conf = new SparkConf().setAppName("Test_kafka_spark").setMaster("local[*]") // local parallelism 1
    val ssc = new StreamingContext(conf, Seconds(1))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9093",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("spark")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.map(record => (record.key, record.value))

  }
}
Run Code Online (Sandbox Code Playgroud)

所有人似乎都开始很好,但工作立即停止,记录如下:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties …
Run Code Online (Sandbox Code Playgroud)

scala apache-kafka spark-streaming spark-streaming-kafka

0
推荐指数
1
解决办法
479
查看次数