使用pyspark流式传输到HBase

swi*_*ish 3 hadoop hbase spark-streaming pyspark

关于使用Scala进行Spark流式传输( 两个特别有用)以及Java的一些信息,有大量关于批量加载到HBase的信息,但似乎缺乏使用PySpark进行此操作的信息.所以我的问题是:

  • 如何使用PySpark将数据批量加载到HBase中?
  • 任何语言中的大多数示例仅显示每行插入一列.如何每行插入多个列?

我目前的代码如下:

if __name__ == "__main__":

    context = SparkContext(appName="PythonHBaseBulkLoader")
    streamingContext = StreamingContext(context, 5)

    stream = streamingContext.textFileStream("file:///test/input");

    stream.foreachRDD(bulk_load)

    streamingContext.start()
    streamingContext.awaitTermination()
Run Code Online (Sandbox Code Playgroud)

我需要帮助的是批量加载功能

def bulk_load(rdd):
    #???
Run Code Online (Sandbox Code Playgroud)

我以前取得了一些进展,有很多不同的错误(如此此处所述)

swi*_*ish 8

所以经过多次试验和错误,我在这里呈现了我想出的最好成绩.它运行良好,并成功批量加载数据(使用Puts或HFiles)我完全愿意相信它不是最好的方法,所以欢迎任何评论/其他答案.这假设您使用CSV作为数据.

使用Puts进行批量加载

到目前为止,这是最简单的批量加载方式,这只是Put为CSV中的每个单元创建一个请求,并将它们排队到HBase.

def bulk_load(rdd):
    #Your configuration will likely be different. Insert your own quorum and parent node and table name
    conf = {"hbase.zookeeper.qourum": "localhost:2181",\
            "zookeeper.znode.parent": "/hbase-unsecure",\
            "hbase.mapred.outputtable": "Test",\
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",\
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",\
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}

    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

    load_rdd = rdd.flatMap(lambda line: line.split("\n"))\#Split the input into individual lines
                  .flatMap(csv_to_key_value)#Convert the CSV line to key value pairs
    load_rdd.saveAsNewAPIHadoopDataset(conf=conf,keyConverter=keyConv,valueConverter=valueConv)
Run Code Online (Sandbox Code Playgroud)

这个功能csv_to_key_value是魔术发生的地方:

def csv_to_key_value(row):
    cols = row.split(",")#Split on commas.
    #Each cell is a tuple of (key, [key, column-family, column-descriptor, value])
    #Works well for n>=1 columns
    result = ((cols[0], [cols[0], "f1", "c1", cols[1]]),
              (cols[0], [cols[0], "f2", "c2", cols[2]]),
              (cols[0], [cols[0], "f3", "c3", cols[3]]))
    return result
Run Code Online (Sandbox Code Playgroud)

我们前面定义的值转换器将这些元组转换为HBase的Put小号

用HFiles批量装载

使用HFile进行批量加载更有效:不是Put对每个单元的请求,而是直接写入HFile,并简单地告诉RegionServer指向新的HFile.这将使用Py4J,所以在Python代码之前我们必须编写一个小的Java程序:

import py4j.GatewayServer;
import org.apache.hadoop.hbase.*;

public class GatewayApplication {

    public static void main(String[] args)
    {
        GatewayApplication app = new GatewayApplication();
        GatewayServer server = new GatewayServer(app);
        server.start();
    }
}
Run Code Online (Sandbox Code Playgroud)

编译它,然后运行它.只要您的流式传输正在进行,就让它保持运行.现在更新bulk_load如下:

def bulk_load(rdd):
    #The output class changes, everything else stays
    conf = {"hbase.zookeeper.qourum": "localhost:2181",\
            "zookeeper.znode.parent": "/hbase-unsecure",\
            "hbase.mapred.outputtable": "Test",\
            "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",\
            "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",\
            "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}#"org.apache.hadoop.hbase.client.Put"}

    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"

    load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
                  .flatMap(csv_to_key_value)\
                  .sortByKey(True)
    #Don't process empty RDDs
    if not load_rdd.isEmpty():
        #saveAsNewAPIHadoopDataset changes to saveAsNewAPIHadoopFile
        load_rdd.saveAsNewAPIHadoopFile("file:///tmp/hfiles" + startTime,
                                        "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2",
                                        conf=conf,
                                        keyConverter=keyConv,
                                        valueConverter=valueConv)
        #The file has now been written, but HBase doesn't know about it

        #Get a link to Py4J
        gateway = JavaGateway()
        #Convert conf to a fully fledged Configuration type
        config = dict_to_conf(conf)
        #Set up our HTable
        htable = gateway.jvm.org.apache.hadoop.hbase.client.HTable(config, "Test")
        #Set up our path
        path = gateway.jvm.org.apache.hadoop.fs.Path("/tmp/hfiles" + startTime)
        #Get a bulk loader
        loader = gateway.jvm.org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles(config)
        #Load the HFile
        loader.doBulkLoad(path, htable)
    else:
        print("Nothing to process")
Run Code Online (Sandbox Code Playgroud)

最后,相当简单dict_to_conf:

def dict_to_conf(conf):
    gateway = JavaGateway()
    config = gateway.jvm.org.apache.hadoop.conf.Configuration()
    keys = conf.keys()
    vals = conf.values()
    for i in range(len(keys)):
        config.set(keys[i], vals[i])
    return config
Run Code Online (Sandbox Code Playgroud)

正如您所看到的,使用HFile批量加载比使用Puts 更复杂,但是根据您的数据加载,它可能是值得的,因为一旦您使用它并不困难.

关于让我措手不及的事情的最后一点说明:HFiles希望他们收到的数据是按照词汇顺序写的.这并不总是保证是真的,特别是因为"10"<"9".如果您将密钥设计为唯一,那么可以轻松修复:

load_rdd = rdd.flatMap(lambda line: line.split("\n"))\
              .flatMap(csv_to_key_value)\
              .sortByKey(True)#Sort in ascending order
Run Code Online (Sandbox Code Playgroud)