我正在使用 Kafka 的 Structured Streaming 源(集成指南),如前所述,它没有提交任何偏移量。
我的目标之一是监控它(检查它是否落后等)。即使它没有提交偏移量,它也会通过不时查询 kafka 并检查下一个要处理的偏移量来处理它们。根据文档,偏移量被写入 HDFS,因此在发生故障时可以恢复,但问题是:
它们存放在哪里?如果不提交偏移量,是否有任何方法可以监视火花流(结构化)的 kafka 消耗(从程序外部;所以 kafka cli 或类似的,每个记录附带的偏移量不适合用例) ?
干杯
offset apache-kafka apache-spark spark-streaming spark-structured-streaming
我想看看我的 DataFrame 会出现什么..
这是火花代码
from pyspark.sql import SparkSession
import pyspark.sql.functions as psf
import logging
import time
spark = SparkSession \
.builder \
.appName("Console Example") \
.getOrCreate()
logging.info("started to listen to the host..")
lines = spark \
.readStream \
.format("socket") \
.option("host", "127.0.0.1") \
.option("port", 9999) \
.load()
data = lines.selectExpr("CAST(value AS STRING)")
query1 = data.writeStream.format("console").start()
time.sleep(10)
query1.awaitTermination()
Run Code Online (Sandbox Code Playgroud)
我正在获取进度报告,但显然每个触发器的输入行都是 0。
2019-08-19 23:45:45 INFO MicroBatchExecution:54 - Streaming query made progress: {
"id" : "a4b26eaf-1032-4083-9e42-a9f2f0426eb7",
"runId" : "35c2b82a-191d-4998-9c98-17b24f5e3e9d",
"name" : null,
"timestamp" : …Run Code Online (Sandbox Code Playgroud) 在 ForeachBatch 函数结构化 Straming 中,我想创建微批次中接收的数据帧的临时视图
func(tabdf, epoch_id):
tabaDf.createOrReplaceView("taba")
Run Code Online (Sandbox Code Playgroud)
但我收到以下错误:
org.apache.spark.sql.streaming.StreamingQueryException: Table or view not found: taba
Caused by: org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'taba' not found
Run Code Online (Sandbox Code Playgroud)
请任何人帮助我解决这个问题。
spark-streaming apache-spark-sql pyspark spark-structured-streaming
我指的是https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java并尝试构建Spark wordcount示例,但有些代码未在Eclipse中编译并显示以下错误.
抛出错误的代码是:
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String x) {
return Arrays.asList(SPACE.split(x)).iterator();
}
});
Run Code Online (Sandbox Code Playgroud)
编译错误:
返回类型与FlatMapFunction.call(String)不兼容
这里有详细信息:Spark 1.6.1,Java 1.7_67,Eclipse Kepler,CDH5.7我尝试过更改JDK版本并将所有Hadoop Jars添加为外部Jar,Maven依赖项,但此错误仍然存在.
我正在尝试复制正在传输其位置坐标的设备,然后处理数据并将其保存到文本文件中。我正在使用 Kafka 和 Spark 流(在 pyspark 上),这是我的架构:
1-Kafka生产者以以下字符串格式将数据发送到名为test的主题:
"LG float LT float" example : LG 8100.25191107 LT 8406.43141483
Run Code Online (Sandbox Code Playgroud)
生产者代码:
from kafka import KafkaProducer
import random
producer = KafkaProducer(bootstrap_servers='localhost:9092')
for i in range(0,10000):
lg_value = str(random.uniform(5000, 10000))
lt_value = str(random.uniform(5000, 10000))
producer.send('test', 'LG '+lg_value+' LT '+lt_value)
producer.flush()
Run Code Online (Sandbox Code Playgroud)
生产者工作正常,我在消费者中获取流数据(甚至在 Spark 中)
2- Spark Streaming正在接收这个流,我可以甚至pprint()它
Spark流处理代码
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
ssc = StreamingContext(sc, 1)
kvs = KafkaUtils.createDirectStream(ssc, ["test"], {"bootstrap.servers": "localhost:9092"})
lines = kvs.map(lambda …Run Code Online (Sandbox Code Playgroud) python-2.7 apache-spark spark-streaming pyspark kafka-python
apache-spark ×4
pyspark ×3
apache-kafka ×1
hadoop ×1
java ×1
kafka-python ×1
offset ×1
pyspark-sql ×1
python-2.7 ×1
word-count ×1