我有一个 kafka 流,正在加载到 Spark。来自 Kafka 主题的消息具有以下属性:bl_iban、blacklisted、timestamp。因此,有 IBANS、关于该 IBAN 是否被列入黑名单 (Y/N) 的标志,并且还有该记录的时间戳。问题是一个 IBAN 可以有多个记录,因为超时的 IBAN 可能会被列入黑名单或“删除”。我想要实现的目标是了解每个 IBANS 的当前状态。然而,我从更简单的目标开始,那就是列出每个最新的 IBAN timestamp(之后我也想添加blacklisted状态),所以我生成了以下代码(其中黑名单代表我从 Kafka 加载的数据集):
blackList = blackList.groupBy("bl_iban")
.agg(col("bl_iban"), max("timestamp"));
Run Code Online (Sandbox Code Playgroud)
之后我尝试使用以下代码将其打印到控制台:
StreamingQuery query = blackList.writeStream()
.format("console")
.outputMode(OutputMode.Append())
.start();
Run Code Online (Sandbox Code Playgroud)
我已经运行我的代码并收到以下错误:
Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark
所以我将水印添加到我的数据集中,如下所示:
blackList = blackList.withWatermark("timestamp", "2 seconds")
.groupBy("bl_iban")
.agg(col("bl_iban"), max("timestamp"));
Run Code Online (Sandbox Code Playgroud)
之后又出现同样的错误。我有什么想法可以解决这个问题吗?
更新:在迈克的帮助下,我成功地摆脱了这个错误。但问题是我仍然无法让我的黑名单发挥作用。我可以看到数据是如何从 Kafka 加载的,但之后从我的组操作中我得到了两个空批次,仅此而已。从Kafka打印的数据:
+-----------------------+-----------+-----------------------+
|bl_iban |blacklisted|timestamp |
+-----------------------+-----------+-----------------------+
|SK047047595122709025789|N …Run Code Online (Sandbox Code Playgroud) java apache-spark spark-streaming spark-structured-streaming
我想调试我的笔记本,因此我需要在笔记本控制台模式下打印流数据。我有两个问题: 1-是否可以这样做:
df.writeStream.format("console").start().awaitTermination()
Run Code Online (Sandbox Code Playgroud)
2-如果是,我在哪里可以看到输出?
谢谢!
我正在尝试将流数据帧与配置单元表连接起来,并将生成的数据帧插入到另一个 Kafka 主题中。下面是我实现的代码,它按照要求工作。
def write_stream_batches(kafka_df: DataFrame,table_config):
table_config = state_config
kafka_df.writeStream \
.format('kafka') \
.foreachBatch(join_kafka_streams_denorm) \
.option('checkpointLocation', table_config['checkpoint_location']) \
.start() \
.awaitTermination()
def join_kafka_streams_denorm(kafka_df, batch_id):
try:
table_config = state_config
kafka_config = kafkaconfig
filters = ata_filter(kafka_df=kafka_df)
main_df = spark.sql(f'select * from db.table where {filters}')
joined_df = join_remove_duplicate_col(kafka_df=kafka_df, denorm=main_df, table_config=table_config)
push_to_kafka(joined_df, kafka_config, table_config, 'state')
except Exception as error:
print(f'Join failed with the exception: {error}')
traceback.print_exc()
print('Stopping the application')
sys.exit(1)
Run Code Online (Sandbox Code Playgroud)
该方法write_stream_batches正在从 kafka 接收流数据帧。我正在将此主题数据合并到配置单元表中,并且我的表配置在从 config.py 文件导入的字典中,下面是该行。
table_config = state_config
Run Code Online (Sandbox Code Playgroud)
这里的问题是给出检查点配置,我在 write_stream_batches 中导入 state_config …
我是一个引发流媒体的初学者.因此对检查点有一个基本的疑问.我的用例是按天计算唯一用户的数量.我正在使用按键和窗口缩小.我的窗口持续时间为24小时,滑动持续时间为5分钟.我正在将处理过的记录更新为mongodb.目前我每次都会更换现有记录.但我看到记忆力随着时间的推移逐渐增加,并在1小时半后杀死这个过程(在小实例中).重新启动后DB写入清除所有旧数据.所以我理解检查点就是解决方案.但我怀疑的是
我有一个Spark (version 1.3.1)申请.其中,我试图将一个转换Java bean RDD JavaRDD<Message>为Dataframe,它有许多具有不同数据类型的字段(整数,字符串,列表,映射,双精度).
但是,当我执行我的代码时.
messages.foreachRDD(new Function2<JavaRDD<Message>,Time,Void>(){
@Override
public Void call(JavaRDD<Message> arg0, Time arg1) throws Exception {
SQLContext sqlContext = SparkConnection.getSqlContext();
DataFrame df = sqlContext.createDataFrame(arg0, Message.class);
df.registerTempTable("messages");
Run Code Online (Sandbox Code Playgroud)
我收到了这个错误
/06/12 17:27:40 INFO JobScheduler: Starting job streaming job 1434110260000 ms.0 from job set of time 1434110260000 ms
15/06/12 17:27:40 ERROR JobScheduler: Error running job streaming job 1434110260000 ms.1
scala.MatchError: interface java.util.List (of class java.lang.Class)
at org.apache.spark.sql.SQLContext$$anonfun$getSchema$1.apply(SQLContext.scala:1193)
at org.apache.spark.sql.SQLContext$$anonfun$getSchema$1.apply(SQLContext.scala:1192)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) …Run Code Online (Sandbox Code Playgroud) 下面的问题是类似的:使用Twitter的Spark Streaming - 没有注册输出流,所以没有什么可以执行,但我认为在线51使用wordCounts.print()我实际上输出了一些结果.
基本代码:
ssc.start()
ssc.awaitTermination()
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()
Run Code Online (Sandbox Code Playgroud)
或者我在这里误解了什么?要跟进:https: //github.com/dataplayground/playground/blob/master/app/actors/DirectStreamingActor.scala
我是spark和scala的新手,刚开始学习......我在CDH 5.1.3上使用spark 1.0.0
我得到了一个名为dbTableKeyValueMap的广播rdd:RDD [(String,String)],我想使用dbTableKeyValueMap来处理我的文件RDD(每行有300多列).这是代码:
val get = fileRDD.map({x =>
val tmp = dbTableKeyValueMap.lookup(x)
tmp
})
Run Code Online (Sandbox Code Playgroud)
在本地运行此挂起和/或在一段时间后出错:
scala.MatchError: null
at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:571)
Run Code Online (Sandbox Code Playgroud)
我可以理解访问一个RDD里面其他会有问题,如果集合的地点和大小进入图片..对于我采取笛卡尔产品不是选项,因为文件RDD中的记录是巨大的(每行有300+列)...就像我使用分布式缓存在setup方法中加载这个dbTableKeyValueMap并在hadoop java mapreduce代码的MAP中使用,我想在spark map中使用类似的方式...我找不到简单的例子来引用类似的用例...一个我想迭代文件RDD行并在"每一列"上进行一些转换,祝福,查找等以进行进一步处理......或者还有其他任何方式我可以使用dbTableKeyValueMap作为scala集合而不是spark RDD
请帮忙
我使用Spark 1.6.0和Cloudera 5.8.3.
我有一个DStream对象,并在其上定义了大量的转换,
val stream = KafkaUtils.createDirectStream[...](...)
val mappedStream = stream.transform { ... }.map { ... }
mappedStream.foreachRDD { ... }
mappedStream.foreachRDD { ... }
mappedStream.map { ... }.foreachRDD { ... }
Run Code Online (Sandbox Code Playgroud)
有没有办法注册foreachRDD保证最后执行的最后一个并且只有在上面的foreachRDDs完成执行时?
换句话说,当Spark UI显示作业已完成时 - 就在我想要执行轻量级函数时.
API中是否有允许我实现的内容?
谢谢
我开始使用Spark流媒体.我想从Kafka获取一个流,其中包含我在Spark文档中找到的示例代码:https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html
这是我的代码:
object SparkStreaming {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Test_kafka_spark").setMaster("local[*]") // local parallelism 1
val ssc = new StreamingContext(conf, Seconds(1))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9093",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "test",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("spark")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
}
}
Run Code Online (Sandbox Code Playgroud)
所有人似乎都开始很好,但工作立即停止,记录如下:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties …Run Code Online (Sandbox Code Playgroud) 我有一个简单的Spark程序,它读取JSON文件并发出CSV文件.在JSON数据中,值包含前导和尾随空格,当我发出CSV时,前导和尾随空格都消失了.有没有办法可以保留空间.我尝试了很多选项,如ignoreTrailingWhiteSpace,ignoreLeadingWhiteSpace,但没有运气
input.json
{"key" : "k1", "value1": "Good String", "value2": "Good String"}
{"key" : "k1", "value1": "With Spaces ", "value2": "With Spaces "}
{"key" : "k1", "value1": "with tab\t", "value2": "with tab\t"}
Run Code Online (Sandbox Code Playgroud)
output.csv
_corrupt_record,key,value1,value2
,k1,Good String,Good String
,k1,With Spaces,With Spaces
,k1,with tab,with tab
Run Code Online (Sandbox Code Playgroud)
expected.csv
_corrupt_record,key,value1,value2
,k1,Good String,Good String
,k1,With Spaces ,With Spaces
,k1,with tab\t,with tab\t
Run Code Online (Sandbox Code Playgroud)
我的代码:
public static void main(String[] args) {
SparkSession sparkSession = SparkSession
.builder()
.appName(TestSpark.class.getName())
.master("local[1]").getOrCreate();
SparkContext context = sparkSession.sparkContext();
context.setLogLevel("ERROR");
SQLContext sqlCtx = sparkSession.sqlContext();
System.out.println("Spark context established"); …Run Code Online (Sandbox Code Playgroud) apache-spark spark-streaming apache-spark-sql spark-dataframe apache-spark-mllib
spark-streaming ×10
apache-spark ×9
scala ×5
apache-kafka ×2
java ×2
amazon-emr ×1
broadcasting ×1
cloudera ×1
rdd ×1