从 JPEG 创建 Spark 对象并在非翻译函数上使用 spark_apply()

Mat*_*ach 15 binary jpeg r sparklyr

通常,当人们想要sparklyr在自定义函数( ** 非翻译函数)上使用时,他们会将它们放在spark_apply(). 然而,我只遇到例子,其中一个单一的本地数据帧或者是copy_to()spark_read_csv()到远程数据源,然后使用spark_apply()它。一个示例,仅用于说明目的:

library(sparklyr)
sc <- spark_connect(master = "local")

n_sim = 100
iris_samps <- iris %>% dplyr::filter(Species == "virginica") %>%
  sapply(rep.int, times=n_sim) %>% cbind(replicate = rep(1:n_sim, each = 50)) %>% 
  data.frame() %>%
  dplyr::group_by(replicate) %>%
  dplyr::sample_n(50, replace = TRUE)

iris_samps_tbl <- copy_to(sc, iris_samps)

iris_samps_tbl %>% 
  spark_apply(function(x) {mean(x$Petal_Length)}, 
    group_by = "replicate") %>%
  ggplot(aes(x = result)) + geom_histogram(bins = 20) + ggtitle("Histogram of 100 Bootstrapped Means using sparklyr")
Run Code Online (Sandbox Code Playgroud)

因此,只要数据驻留在 Spark 对象中,似乎就可以在来自CRANBioconductor包的任何范围的非翻译函数上使用它。

我想出了一个特定问题.jpeg的图像,因为我读的是SparkR可以加载压缩的图像(.jpeg.png通过)为原始图像表示ImageIO在Java库-它似乎有可能是sparklyr可以做到这一点。

RsimMosaic::composeMosaicFromImageRandom(inputImage, outputImage, pathToTilesLibrary)函数采用输入图像和用于创建照片马赛克的图块路径并输出图像(示例此处)。

如果这个函数只拍摄一张图像并且我知道如何将它变成火花对象,我可能会想象命令看起来像:composeMosaicFromImageRandom(inputImage, outputImage, spark_obj). 但是,此功能正在获取 30,000 个图像的路径。

如何从这些图块 ( .jpegs)的路径创建 30,000 个 Spark 对象,然后使用此功能?

如果底层代码实际上需要修改,我曾经jimhester/lookup提供源代码:

function (originalImageFileName, outputImageFileName, imagesToUseInMosaic, 
        useGradients = FALSE, removeTiles = TRUE, fracLibSizeThreshold = 0.7, 
        repFracSize = 0.25, verbose = TRUE) 
{
        if (verbose) {
                cat(paste("\n ------------------------------------------------ \n"))
                cat(paste("    R Simple Mosaic composer - random version   \n"))
                cat(paste(" ------------------------------------------------ \n\n"))
        }
        if (verbose) {
                cat(paste("    Creating the library... \n"))
        }
        libForMosaicFull <- createLibraryIndexDataFrame(imagesToUseInMosaic, 
                saveLibraryIndex = F, useGradients = useGradients)
        libForMosaic <- libForMosaicFull
        filenameArray <- list.files(imagesToUseInMosaic, full.names = TRUE)
        originalImage <- jpeg::readJPEG(filenameArray[1])
        xTileSize <- dim(originalImage[, , 1])[1]
        yTileSize <- dim(originalImage[, , 1])[2]
        if (verbose) {
                cat(paste("    -- Tiles in the Library : ", length(libForMosaic[, 
                        1]), "\n"))
                cat(paste("    -- Tile dimensions : ", xTileSize, " x ", 
                        yTileSize, "\n"))
        }
        if (verbose) {
                cat(paste("\n"))
                cat(paste("    Reading the original image... \n"))
        }
        originalImage <- jpeg::readJPEG(originalImageFileName)
        xOrigImgSize <- dim(originalImage[, , 1])[1]
        yOrigImgSize <- dim(originalImage[, , 1])[2]
        if (verbose) {
                cat(paste("    -- Original image dimensions : ", xOrigImgSize, 
                        " x ", yOrigImgSize, "\n"))
                cat(paste("    -- Output image dimensions : ", ((xOrigImgSize - 
                        2) * xTileSize), " x ", ((yOrigImgSize - 2) * yTileSize), 
                        "\n"))
        }
        if (verbose) {
                cat(paste("\n"))
                cat(paste("    Computing the mosaic... \n"))
        }
        outputImage <- array(dim = c(((xOrigImgSize - 2) * xTileSize), 
                ((yOrigImgSize - 2) * yTileSize), 3))
        removedList <- c()
        l <- 1
        pCoord <- matrix(nrow = ((xOrigImgSize - 2) * (yOrigImgSize - 
                2)), ncol = 2)
        for (i in 2:(xOrigImgSize - 1)) {
                for (j in 2:(yOrigImgSize - 1)) {
                        pCoord[l, 1] <- i
                        pCoord[l, 2] <- j
                        l <- l + 1
                }
        }
        npixels <- length(pCoord[, 1])
        for (i in 1:npixels) {
                idx <- round(runif(1, 1, length(pCoord[, 1])))
                pixelRGBandNeigArray <- computeStatisticalQuantitiesPixel(pCoord[idx, 
                        1], pCoord[idx, 2], originalImage, useGradients)
                tileFilename <- getCloseMatch(pixelRGBandNeigArray, 
                        libForMosaic)
                startI <- (pCoord[idx, 1] - 2) * xTileSize + 1
                startJ <- (pCoord[idx, 2] - 2) * yTileSize + 1
                outputImage[startI:(startI + xTileSize - 1), startJ:(startJ + 
                        yTileSize - 1), ] <- jpeg::readJPEG(tileFilename)
                if (removeTiles) {
                        libForMosaic <- removeTile(tileFilename, libForMosaic)
                        removedList <- c(removedList, tileFilename)
                        if (length(libForMosaic[, 1]) < (fracLibSizeThreshold * 
                                length(libForMosaicFull[, 1]))) {
                                idxs <- runif(round(0.25 * length(libForMosaicFull[, 
                                        1])), 1, length(removedList))
                                for (ii in 1:length(idxs)) {
                                        libForMosaic <- addBackTile(removedList[idxs[ii]], 
                                                libForMosaic, libForMosaicFull)
                                }
                                removedList <- removedList[-idxs]
                        }
                }
                if (length(pCoord[, 1]) > 2) {
                        pCoord <- pCoord[-idx, ]
                }
        }
        if (verbose) {
                cat(paste("\n"))
                cat(paste("    Done!\n\n"))
        }
        jpeg::writeJPEG(outputImage, outputImageFileName)
}
Run Code Online (Sandbox Code Playgroud)

请注意:我第一次尝试加速此代码是 1) 使用profvis查找瓶颈(for 循环) 2)foreach在 for 循环上使用包。这导致代码变慢,这表明我在太低的水平上进行了并行化。据我所知sparklyr,更多的是关于分布式计算而不是并行化,所以这可能会奏效。