Implementing Louvain in PySpark using DataFrames

Hel*_*ili 6 pyspark

I am trying to implement the Louvain algorithm in PySpark using DataFrames. The problem is that my implementation is really slow. This is how I do it:

  1. I collect all vertices and communityIds into plain Python lists
  2. For each vertex-communityId pair, I compute the modularity gain using DataFrames (just a fancy formula involving sums and differences of edge weights)
  3. Repeat until nothing changes

What exactly am I doing wrong?

I suppose performance would improve if I could somehow parallelize the for-each loop, but how can I do that?

Later edit: I could use vertices.foreach(changeCommunityId) instead of the for-each loop, but then I would have to compute the modularity gain (that fancy formula) without DataFrames.

Please see the code sample below:

def louvain(self):

        oldModularity = 0 # since initially each node represents a community

        graph = self.graph

        # retrieve graph vertices and edges dataframes
        vertices = verticesDf = self.graph.vertices
        aij = edgesDf = self.graph.edges

        canOptimize = True

        allCommunityIds = [row['communityId'] for row in verticesDf.select('communityId').distinct().collect()]
        verticesIdsCommunityIds = [(row['id'], row['communityId']) for row in verticesDf.select('id', 'communityId').collect()]

        allEdgesSum = self.graph.edges.groupBy().sum('weight').collect()
        m = allEdgesSum[0]['sum(weight)']/2

        def computeModularityGain(vertexId, newCommunityId):

            # the sum of all weights of the edges within C
            sourceNodesNewCommunity = vertices.join(aij, vertices.id == aij.src) \
                                .select('weight', 'src', 'communityId') \
                                .where(vertices.communityId == newCommunityId)
            destinationNodesNewCommunity = vertices.join(aij, vertices.id == aij.dst) \
                                .select('weight', 'dst', 'communityId') \
                                .where(vertices.communityId == newCommunityId)

            k_in = sourceNodesNewCommunity.join(destinationNodesNewCommunity, sourceNodesNewCommunity.communityId == destinationNodesNewCommunity.communityId) \
                        .count()
            # the rest of the formula computation goes here, I just wanted to show you an example
            # just return some value for the modularity
            return 0.9  

        def changeCommunityId(vertexId, currentCommunityId):

            maxModularityGain = 0
            maxModularityGainCommunityId = None
            for newCommunityId in allCommunityIds:
                if (newCommunityId != currentCommunityId):
                    modularityGain = computeModularityGain(vertexId, newCommunityId)
                    if (modularityGain > maxModularityGain):
                        maxModularityGain = modularityGain
                        maxModularityGainCommunityId = newCommunityId

            if (maxModularityGain > 0):
                return maxModularityGainCommunityId
            return currentCommunityId

        while canOptimize:

            while self.changeInModularity:

                self.changeInModularity = False

                for vertexCommunityIdPair in verticesIdsCommunityIds:
                    vertexId = vertexCommunityIdPair[0]
                    currentCommunityId = vertexCommunityIdPair[1]
                    newCommunityId = changeCommunityId(vertexId, currentCommunityId)

                self.changeInModularity = False

            canOptimize = False

Art*_*Sbr 1

As pointed out in a Spark implementation of this algorithm, the trick is to reassign all the nodes simultaneously rather than iteratively.

In the distributed version, all vertices make this choice at the same time rather than in serial order, and the graph state is updated after each change. Because the choices are made in parallel, some of them may be incorrect and will not maximize the modularity, but after repeated iterations the community choices become more stable and the result closely mirrors that of the serial algorithm.

So changeCommunityID() should be mapped over all vertices simultaneously, or better yet, you can write a function that computes the modularity change caused by switching every node to each of its neighbouring communities at once (the algebra is a bit involved; just remember that the double sum is essentially a group by + sum). See the sketch below.
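Here is a minimal sketch of that second option. It assumes GraphFrames-style DataFrames like the question's: verticesDf(id, communityId) and edgesDf(src, dst, weight), with an undirected graph whose edges are stored once. It uses the common simplification of the per-move gain, deltaQ = k_i_in/m - sigma_tot(C)*k_i/(2*m*m). The function name one_louvain_pass and the parameter m (the total edge weight, passed in by the caller) are placeholders introduced here, not part of the question's code:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

def one_louvain_pass(vertices, edges, m):
    # vertices: DataFrame(id, communityId); edges: DataFrame(src, dst, weight),
    # assumed undirected and stored once per edge; m: total edge weight.

    # Make the adjacency symmetric so every incident edge is visible from both endpoints.
    sym = edges.select('src', 'dst', 'weight') \
               .union(edges.select(F.col('dst').alias('src'),
                                   F.col('src').alias('dst'),
                                   'weight'))

    # k_i: weighted degree of each vertex.
    degrees = sym.groupBy('src').agg(F.sum('weight').alias('k_i')) \
                 .withColumnRenamed('src', 'id')

    # sigma_tot(C): sum of the degrees of the members of each community.
    sigma_tot = degrees.join(vertices, 'id') \
                       .groupBy('communityId') \
                       .agg(F.sum('k_i').alias('sigma_tot'))

    # k_i_in(i, C): weight from vertex i into each neighbouring community C.
    # This is where the double sum of the modularity formula becomes a group by + sum.
    k_i_in = sym.join(vertices.select(F.col('id').alias('dst'),
                                      F.col('communityId').alias('targetCommunity')),
                      'dst') \
                .groupBy('src', 'targetCommunity') \
                .agg(F.sum('weight').alias('k_i_in')) \
                .withColumnRenamed('src', 'id')

    # Simplified gain for moving i into C: k_i_in/m - sigma_tot(C)*k_i/(2*m*m).
    gains = k_i_in.join(degrees, 'id') \
                  .join(sigma_tot.withColumnRenamed('communityId', 'targetCommunity'),
                        'targetCommunity') \
                  .join(vertices, 'id') \
                  .where(F.col('targetCommunity') != F.col('communityId')) \
                  .withColumn('gain',
                              F.col('k_i_in') / m
                              - F.col('sigma_tot') * F.col('k_i') / (2 * m * m))

    # Every vertex picks its best neighbouring community at the same time.
    best = Window.partitionBy('id').orderBy(F.desc('gain'))
    moves = gains.withColumn('rank', F.row_number().over(best)) \
                 .where((F.col('rank') == 1) & (F.col('gain') > 0)) \
                 .select('id', F.col('targetCommunity').alias('newCommunityId'))

    # Vertices with a positive best gain move; everyone else keeps their community.
    return vertices.join(moves, 'id', 'left') \
                   .withColumn('communityId',
                               F.coalesce('newCommunityId', 'communityId')) \
                   .drop('newCommunityId')

Roughly, you would call this inside the outer loop, e.g. verticesDf = one_louvain_pass(verticesDf, edgesDf, m), and repeat until no vertex changes community. The window-based argmax is what replaces the per-vertex Python for loop: every vertex evaluates all of its candidate communities in a single set of joins and aggregations instead of one DataFrame query per (vertex, community) pair.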