标签: spark-streaming

结构化流式 Kafka 源偏移存储

我正在使用 Kafka 的 Structured Streaming 源(集成指南),如前所述,它没有提交任何偏移量。

我的目标之一是监控它(检查它是否落后等)。即使它没有提交偏移量,它也会通过不时查询 kafka 并检查下一个要处理的偏移量来处理它们。根据文档,偏移量被写入 HDFS,因此在发生故障时可以恢复,但问题是:

它们存放在哪里?如果不提交偏移量,是否有任何方法可以监视火花流(结构化)的 kafka 消耗(从程序外部;所以 kafka cli 或类似的,每个记录附带的偏移量不适合用例) ?

干杯

offset apache-kafka apache-spark spark-streaming spark-structured-streaming

0
推荐指数
1
解决办法
2527
查看次数

如何在控制台中查看数据帧(相当于结构化流媒体的 .show())?

我想看看我的 DataFrame 会出现什么..

这是火花代码

from pyspark.sql import SparkSession
import pyspark.sql.functions as psf
import logging
import time

spark = SparkSession \
    .builder \
    .appName("Console Example") \
    .getOrCreate()

logging.info("started to listen to the host..")

lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "127.0.0.1") \
    .option("port", 9999) \
    .load()

data = lines.selectExpr("CAST(value AS STRING)")
query1 = data.writeStream.format("console").start()
time.sleep(10)
query1.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

我正在获取进度报告,但显然每个触发器的输入行都是 0。

2019-08-19 23:45:45 INFO  MicroBatchExecution:54 - Streaming query made progress: {
  "id" : "a4b26eaf-1032-4083-9e42-a9f2f0426eb7",
  "runId" : "35c2b82a-191d-4998-9c98-17b24f5e3e9d",
  "name" : null,
  "timestamp" : …
Run Code Online (Sandbox Code Playgroud)

apache-spark spark-streaming pyspark pyspark-sql

0
推荐指数
1
解决办法
1468
查看次数

Spark Structure Streaming 中的临时视图

在 ForeachBatch 函数结构化 Straming 中,我想创建微批次中接收的数据帧的临时视图

func(tabdf, epoch_id):
    tabaDf.createOrReplaceView("taba")
Run Code Online (Sandbox Code Playgroud)

但我收到以下错误:

org.apache.spark.sql.streaming.StreamingQueryException: Table or view not found: taba
Caused by: org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'taba' not found
Run Code Online (Sandbox Code Playgroud)

请任何人帮助我解决这个问题。

spark-streaming apache-spark-sql pyspark spark-structured-streaming

0
推荐指数
1
解决办法
3355
查看次数

SparkStreaming WordCount错误/语法

我指的是https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java并尝试构建Spark wordcount示例,但有些代码未在Eclipse中编译并显示以下错误.

抛出错误的代码是:


JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String x) {
        return Arrays.asList(SPACE.split(x)).iterator();
      }
    });
Run Code Online (Sandbox Code Playgroud)

编译错误:

返回类型与FlatMapFunction.call(String)不兼容

这里有详细信息:Spark 1.6.1,Java 1.7_67,Eclipse Kepler,CDH5.7我尝试过更改JDK版本并将所有Hadoop Jars添加为外部Jar,Maven依赖项,但此错误仍然存​​在.

java hadoop word-count apache-spark spark-streaming

-1
推荐指数
1
解决办法
1481
查看次数

PySpark 处理流数据并将处理后的数据保存到文件

我正在尝试复制正在传输其位置坐标的设备,然后处理数据并将其保存到文本文件中。我正在使用 Kafka 和 Spark 流(在 pyspark 上),这是我的架构:

1-Kafka生产者以以下字符串格式将数据发送到名为test的主题:

"LG float LT float" example : LG 8100.25191107 LT 8406.43141483
Run Code Online (Sandbox Code Playgroud)

生产者代码:

from kafka import KafkaProducer
import random

producer = KafkaProducer(bootstrap_servers='localhost:9092')

for i in range(0,10000):
    lg_value = str(random.uniform(5000, 10000))
    lt_value = str(random.uniform(5000, 10000))
producer.send('test', 'LG '+lg_value+' LT '+lt_value)

producer.flush()
Run Code Online (Sandbox Code Playgroud)

生产者工作正常,我在消费者中获取流数据(甚至在 Spark 中)

2- Spark Streaming正在接收这个流,我可以甚至pprint()

Spark流处理代码

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 1)
kvs = KafkaUtils.createDirectStream(ssc, ["test"], {"bootstrap.servers": "localhost:9092"})

lines = kvs.map(lambda …
Run Code Online (Sandbox Code Playgroud)

python-2.7 apache-spark spark-streaming pyspark kafka-python

-1
推荐指数
1
解决办法
3129
查看次数