如何批量上传数据到appengine数据存储区?较旧的方法不起作用

Cyg*_*ger 15 python google-app-engine google-cloud-storage google-cloud-datastore

这应该是一个相当普遍的要求,也是一个简单的过程:将数据批量上传到appengine数据存储区.

但是,stackoverflow(下面的链接*)中提到的旧解决方案似乎都不再有效.bulkloader方法是使用DB API上传到数据存储区时最合理的解决方案,不适用于NDB API

现在,批量加载器方法似乎已被弃用,并且文档中仍然存在的旧链接会导致页面错误.这是一个例子

https://developers.google.com/appengine/docs/python/tools/uploadingdata

以上链接仍显示在此页面上:https://developers.google.com/appengine/docs/python/tools/uploadinganapp

现在批量加载数据的推荐方法是什么?

两个可行的替代方案似乎是1)使用remote_api或2)将CSV文件写入GCS存储桶并从中读取.有没有人有成功使用这两种方法的经验?

任何指针将不胜感激.谢谢!

[*以下链接提供的解决方案已不再有效]

[1] 如何将数据批量上传到Google appengine数据存储区?

[2] 如何在Google App Engine数据存储中插入批量数据?

Sri*_*ram 9

方法1:使用remote_api

如何:编写一个bulkloader.yaml文件并使用终端中的" appcfg.py upload_data "命令直接运行它我不推荐这种方法有以下几个原因:1.巨大的延迟2.不支持NDB

方法2:GCS并使用mapreduce

将数据文件上传到GCS:

使用" storage-file-transfer-json-python "github项目(chunked_transfer.py)将文件从本地系统上传到gcs.确保从应用引擎管理控制台生成正确的"client-secrets.json"文件.

MapReduce的:

使用" appengine-mapreduce "github项目.将"mapreduce"文件夹复制到项目顶级文件夹.

将以下行添加到app.yaml文件中:

includes:
  - mapreduce/include.yaml
Run Code Online (Sandbox Code Playgroud)

下面是你的main.py文件

import cgi
import webapp2
import logging
import os, csv
from models import DataStoreModel
import StringIO
from google.appengine.api import app_identity
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op
from mapreduce.input_readers import InputReader

def testmapperFunc(newRequest):
    f = StringIO.StringIO(newRequest)
    reader = csv.reader(f, delimiter=',')
    for row in reader:
        newEntry = DataStoreModel(attr1=row[0], link=row[1])
        yield op.db.Put(newEntry)

class TestGCSReaderPipeline(base_handler.PipelineBase):
    def run(self, filename):
        yield mapreduce_pipeline.MapreducePipeline(
                "test_gcs",
                "testgcs.testmapperFunc",
                "mapreduce.input_readers.FileInputReader",
                mapper_params={
                    "files": [filename],
                    "format": 'lines'
                },
                shards=1)

class tempTestRequestGCSUpload(webapp2.RequestHandler):
    def get(self):
        bucket_name = os.environ.get('BUCKET_NAME',
                                     app_identity.get_default_gcs_bucket_name())

        bucket = '/gs/' + bucket_name
        filename = bucket + '/' + 'tempfile.csv'

        pipeline = TestGCSReaderPipeline(filename)
        pipeline.with_params(target="mapreducetestmodtest")
        pipeline.start()
        self.response.out.write('done')

application = webapp2.WSGIApplication([
    ('/gcsupload', tempTestRequestGCSUpload),
], debug=True)
Run Code Online (Sandbox Code Playgroud)

要记住:

  1. Mapreduce项目使用现已弃用的"Google Cloud Storage Files API".所以未来的支持不能保证.
  2. Map reduce为数据存储读取和写入增加了一点开销.

方法3:GCS和GCS客户端库

  1. 使用上述文件传输方法将csv/text文件上载到gcs.
  2. 使用gcs客户端库(将"cloudstorage"文件夹复制到应用程序顶级文件夹).

将以下代码添加到应用程序main.py文件中.

import cgi
import webapp2
import logging
import jinja2
import os, csv
import cloudstorage as gcs
from google.appengine.ext import ndb
from google.appengine.api import app_identity
from models import DataStoreModel

class UploadGCSData(webapp2.RequestHandler):
    def get(self):
        bucket_name = os.environ.get('BUCKET_NAME',
                                     app_identity.get_default_gcs_bucket_name())
        bucket = '/' + bucket_name
        filename = bucket + '/tempfile.csv'
        self.upload_file(filename)

    def upload_file(self, filename):
        gcs_file = gcs.open(filename)
        datareader = csv.reader(gcs_file)
        count = 0
        entities = []
        for row in datareader:
            count += 1
                newProd = DataStoreModel(attr1=row[0], link=row[1])
                entities.append(newProd)

            if count%50==0 and entities:
                ndb.put_multi(entities)
                entities=[]

        if entities:
            ndb.put_multi(entities)

application = webapp2.WSGIApplication([
    ('/gcsupload', UploadGCSData),
], debug=True)
Run Code Online (Sandbox Code Playgroud)


gre*_*ess 1

你们中的一些人可能会遇到我的情况:我无法使用数据存储区的导入/导出实用程序,因为我的数据在进入数据存储区之前需要进行转换。

我最终使用了apache-beam谷歌云数据流

只需要写几行“beam”代码即可

  • 读取您的数据(例如,托管在云存储上) - 您会得到一个PCollection字符串,
  • 做任何你想要的转换(这样你就得到了一个PCollection数据存储实体),
  • 将它们转储到数据存储接收器

请参阅如何使用多个工作人员加速批量导入到谷歌云数据存储?对于具体的用例。

我能够以每秒 800 个实体的速度向我的数据存储区写入 5 个工作人员。这使我能够在大约 5 小时内完成导入任务(1600 万行)。如果你想让它更快,请使用更多的工人:D