https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams上的spark-streaming网站提到了以下代码:
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
Run Code Online (Sandbox Code Playgroud)
我试图使用org.apache.commons.pool2来实现它,但运行应用程序失败,出现了预期的java.io.NotSerializableException:
15/05/26 08:06:21 ERROR OneForOneStrategy: org.apache.commons.pool2.impl.GenericObjectPool
java.io.NotSerializableException: org.apache.commons.pool2.impl.GenericObjectPool
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
...
Run Code Online (Sandbox Code Playgroud)
我想知道实现可序列化的连接池是多么现实.有人有成功吗?
谢谢.
我有火花流设置,以便它从套接字读取,在将数据发布到兔子队列之前对数据进行一些丰富.在通过在设置流上下文之前读取常规文本文件(Source.fromFile ...)来实例化的Map中,富集查找信息.
我有一种感觉,这不是应该做的事情.另一方面,当使用StreamingContext时,我只能读取流,而不能读取静态文件,因为我可以使用SparkContext.
我可以尝试允许多个上下文,但我不确定这是否也是正确的方法.
任何建议将不胜感激.