spark-streaming and connection pool implementation

bot*_*kop 8 apache-spark spark-streaming

The spark-streaming website at https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams mentions the following code:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

I tried to implement this using org.apache.commons.pool2, but running the application fails with the expected java.io.NotSerializableException:

15/05/26 08:06:21 ERROR OneForOneStrategy: org.apache.commons.pool2.impl.GenericObjectPool
java.io.NotSerializableException: org.apache.commons.pool2.impl.GenericObjectPool
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
 ...

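I am wondering how realistic it is to implement a connection pool that is serializable. Has anyone succeeded in doing this?

Thank you.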

maa*_*asg 13

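To address this "local resource" problem, what is needed is a singleton object, i.e. an object that is guaranteed to be instantiated once and only once in the JVM. Luckily, a Scala object provides this functionality out of the box.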
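To see why the singleton sidesteps the exception from the question, here is a minimal sketch that uses plain Java serialization instead of Spark. FakePool, FakePoolHolder, CapturingTask and SingletonTask are hypothetical names made up for illustration; FakePool merely stands in for a non-serializable pool such as GenericObjectPool.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a pool such as GenericObjectPool: NOT Serializable.
class FakePool { def borrow(): String = "connection" }

// Singleton holder: created lazily in whichever JVM touches it, never shipped with a task.
object FakePoolHolder { lazy val pool = new FakePool }

// Carries the pool as a field, like a closure that captures a driver-side val.
class CapturingTask(pool: FakePool) extends Serializable {
  def run(): String = pool.borrow()
}

// Looks the pool up through the singleton only when it runs.
class SingletonTask extends Serializable {
  def run(): String = FakePoolHolder.pool.borrow()
}

object SerializationDemo {
  // Returns true if Java serialization of `obj` succeeds.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

CapturingTask fails to serialize because the pool travels with it, while SingletonTask carries no state at all and resolves the pool on the JVM where it executes.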

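The second thing to consider is that this singleton will serve all tasks running on the same JVM that hosts it, so it MUST take care of concurrency and resource management.

Let's try to sketch(*) such a service: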

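import java.net.Socket
import org.apache.commons.pool2.ObjectPool

class ManagedSocket(private val pool: ObjectPool[Socket], val socket: Socket) {
    // hand the socket back to the pool it was borrowed from
    def release(): Unit = pool.returnObject(socket)
}

// singleton object: instantiated once per JVM, i.e. once per executor
object SocketPool {
    private var hostPortPool: Map[(String, Int), ObjectPool[Socket]] = Map()
    sys.addShutdownHook {
        hostPortPool.values.foreach(_.close()) // terminate each pool
    }

    // factory method
    def apply(host: String, port: Int): ManagedSocket = {
        // guard the shared map, since several tasks may call this at once
        val pool = synchronized {
            hostPortPool.getOrElse((host, port), {
                val p: ObjectPool[Socket] = ??? // create new pool for (host, port)
                hostPortPool += ((host, port) -> p)
                p
            })
        }
        new ManagedSocket(pool, pool.borrowObject())
    }
}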

The usage then becomes:

val host = ???
val port = ???
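stream.foreachRDD { rdd =>
    rdd.foreachPartition { partition =>
        // runs on the executor, so the pool is looked up in the executor's JVM
        val mSocket = SocketPool(host, port)
        try {
            val os = mSocket.socket.getOutputStream()
            partition.foreach { elem =>
                // do stuff with os + elem
            }
        } finally {
            mSocket.release() // return the socket even if writing fails
        }
    }
}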

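I'm assuming that the GenericObjectPool used in the question takes care of concurrency. Otherwise, access to each pool instance needs to be guarded with some form of synchronization.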
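The registry of pools inside the singleton is shared state as well, independent of how the pools themselves behave. One way to guard it without an explicit lock is ConcurrentHashMap.computeIfAbsent, sketched below with the standard library only. PoolRegistry, DummyPool and the created counter are hypothetical names for illustration; DummyPool stands in for a real pool type.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.{Function => JFunction}

object PoolRegistry {
  // Hypothetical stand-in for a real pool type.
  final class DummyPool(val host: String, val port: Int)

  // Counts how often a pool was constructed, only to make the behaviour observable.
  val created = new AtomicInteger(0)

  private val pools = new ConcurrentHashMap[(String, Int), DummyPool]()

  // Returns the pool for (host, port), creating it atomically on first use.
  def poolFor(host: String, port: Int): DummyPool =
    pools.computeIfAbsent((host, port), new JFunction[(String, Int), DummyPool] {
      override def apply(key: (String, Int)): DummyPool = {
        created.incrementAndGet()
        new DummyPool(key._1, key._2)
      }
    })
}
```

Concurrent tasks asking for the same (host, port) all receive the same instance, and the constructor runs at most once per key.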

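(*) The code is provided to illustrate the idea of how such an object could be designed; additional effort is needed to turn it into a working version.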
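As one possible direction for that extra effort, the pool creation left open in the sketch could be built on commons-pool2, roughly as follows. This is an assumption about how to fill the gap, not the answer's own code: SocketFactory and SocketPools are hypothetical names, and the pool uses the library defaults.

```scala
import java.net.Socket
import org.apache.commons.pool2.{BasePooledObjectFactory, ObjectPool, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}

// Tells commons-pool2 how to create, wrap and destroy sockets for one endpoint.
class SocketFactory(host: String, port: Int) extends BasePooledObjectFactory[Socket] {
  // Opens a new connection when the pool has no idle socket to hand out.
  override def create(): Socket = new Socket(host, port)

  // Wraps the socket so the pool can track its state.
  override def wrap(socket: Socket): PooledObject[Socket] =
    new DefaultPooledObject[Socket](socket)

  // Closes the underlying connection when the pool discards the socket.
  override def destroyObject(p: PooledObject[Socket]): Unit = p.getObject.close()
}

object SocketPools {
  // Builds a pool with default settings; no socket is opened until the first borrow.
  def newPool(host: String, port: Int): ObjectPool[Socket] =
    new GenericObjectPool[Socket](new SocketFactory(host, port))
}
```

Limits such as the maximum number of sockets per endpoint are left at their defaults here and would likely need tuning for a real job.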

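  • This approach seems to work. Thanks a bunch. I'm marking it as the preferred answer. For a concrete implementation of this answer, see https://gist.github.com/koen-dejonghe/39c10357607c698c0b04 (3 upvotes)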