Azure DocumentDB bulk insert using a stored procedure

var*_*hak 7 bulkinsert azure azure-cosmosdb

Hi, I am using 16 collections to insert about 3-4 million JSON documents, each between 5 and 10 KB. I use a stored procedure to insert these documents, and I have 22 capacity units.

function bulkImport(docs) {
    var collection = getContext().getCollection();
    var collectionLink = collection.getSelfLink();

    // The count of imported docs, also used as current doc index.
    var count = 0;

    // Validate input.
    if (!docs) throw new Error("The array is undefined or null.");

    var docsLength = docs.length;
    if (docsLength == 0) {
        getContext().getResponse().setBody(0);
        return;
    }

    // Call the CRUD API to create a document.
    tryCreateOrUpdate(docs[count], callback);

    // Note that there are 2 exit conditions:
    // 1) The createDocument request was not accepted. 
    //    In this case the callback will not be called, we just call setBody and we are done.
    // 2) The callback was called docs.length times.
    //    In this case all documents were created and we don't need to call tryCreate anymore. Just call setBody and we are done.
    function tryCreateOrUpdate(doc, callback) {
        var isAccepted = true;
        var isFound = collection.queryDocuments(collectionLink, 'SELECT * FROM root r WHERE r.id = "' + doc.id + '"', function (err, feed, options) {
            if (err) throw err;
            if (!feed || !feed.length) {
                isAccepted = collection.createDocument(collectionLink, doc, callback);
            }
            else {
                // The metadata document.
                var existingDoc = feed[0];
                isAccepted = collection.replaceDocument(existingDoc._self, doc, callback);
            }
        });

        // If the request was accepted, callback will be called.
        // Otherwise report current count back to the client, 
        // which will call the script again with remaining set of docs.
        // This condition will happen when this stored procedure has been running too long
        // and is about to get cancelled by the server. This will allow the calling client
        // to resume this batch from the point we got to before isAccepted was set to false
        if (!isFound && !isAccepted) getContext().getResponse().setBody(count);
    }

    // This is called when collection.createDocument is done and the document has been persisted.
    function callback(err, doc, options) {
        if (err) throw err;

        // One more document has been inserted, increment the count.
        count++;

        if (count >= docsLength) {
            // If we have created all documents, we are done. Just set the response.
            getContext().getResponse().setBody(count);
        } else {
            // Create next document.
            tryCreateOrUpdate(docs[count], callback);
        }
    }
}

My C# code looks like this:

public async Task<int> Add(List<JobDTO> entities)
{
    int currentCount = 0;
    int documentCount = entities.Count;

    while (currentCount < documentCount)
    {
        string argsJson = JsonConvert.SerializeObject(entities.Skip(currentCount).ToArray());
        var args = new dynamic[] { JsonConvert.DeserializeObject<dynamic[]>(argsJson) };

        // Execute the batch.
        StoredProcedureResponse<int> scriptResult = await DocumentDBRepository.Client.ExecuteStoredProcedureAsync<int>(sproc.SelfLink, args);

        // Prepare for the next batch.
        int currentlyInserted = scriptResult.Response;
        currentCount += currentlyInserted;
    }

    return currentCount;
}

The problem I am facing is that out of the 400k documents I try to insert, documents sometimes get missed without any error being raised.

The application is a worker role deployed on the cloud. If I increase the number of threads or instances writing to DocumentDB, the number of missed documents is much higher.

How can I figure out what the problem is? Thank you.

bra*_*321 9

I found that when trying this code I would get an error at docs.length stating that length was undefined.

function bulkImport(docs) {
    var collection = getContext().getCollection();
    var collectionLink = collection.getSelfLink();

    // The count of imported docs, also used as current doc index.
    var count = 0;

    // Validate input.
    if (!docs) throw new Error("The array is undefined or null.");

    var docsLength = docs.length; // length is undefined
}

After much testing (and finding nothing in the Azure documentation), I realized I could not pass an array directly as suggested. The parameter had to be an object. I had to modify the batch code like this to get it to run.

I also found that I could not simply pass an array of documents in the DocumentDB Script Explorer (Input box) either, even though the placeholder help text says you can.

This code worked for me:

// pseudo object for reference only
docObject = {
  "items": [{doc}, {doc}, {doc}]
}

function bulkImport(docObject) {
    var context = getContext();
    var collection = context.getCollection();
    var collectionLink = collection.getSelfLink();
    var count = 0;

    // Check input
    if (!docObject.items || !docObject.items.length) throw new Error("invalid document input parameter or undefined.");
    var docs = docObject.items;
    var docsLength = docs.length;
    if (docsLength == 0) {
        context.getResponse().setBody(0);
        return;
    }

    // Call the function to create a document.
    tryCreateOrUpdate(docs[count], callback);

    // Obviously I have truncated this function. The above code should help you understand what has to change.
}
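As a rough illustration of the shape difference (plain JavaScript, names illustrative; the real SDK handles the serialization for you), wrapping the documents in an object means the stored procedure's single parameter has an `items` property that is a plain array with a defined length:

```javascript
// Illustrative sketch only: the object wrapper that the stored procedure
// receives as its single parameter.
const docs = [
  { id: "1", name: "doc one" },
  { id: "2", name: "doc two" },
];

// Wrap the array in an object, as described above.
const docObject = { items: docs };

// Arguments are serialized to JSON on the way to the server...
const payload = JSON.stringify([docObject]);

// ...and the stored procedure works with the deserialized first argument.
const received = JSON.parse(payload)[0];
console.log(received.items.length); // 2
```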

Hopefully, if I have missed something, the Azure documentation will catch up or become easier to find.

I will also file a bug report for the Script Explorer in the hope that the Azurites will update it.


And*_*Liu 4

It is important to note that stored procedures have bounded execution, where all operations must complete within the server-specified request timeout. If an operation does not complete within that time limit, the transaction is automatically rolled back. To simplify development around the time limit, all CRUD (create, read, update, and delete) operations return a boolean value that indicates whether the operation will complete. This boolean can be used as a signal to wrap up execution and to implement a continuation-based model to resume execution (this is illustrated in the code sample below).


The bulk-insert stored procedure provided above implements the continuation model by returning the number of documents that were successfully created. This is noted in the stored procedure's comments:

    // If the request was accepted, callback will be called.
    // Otherwise report current count back to the client,
    // which will call the script again with remaining set of docs.
    // This condition will happen when this stored procedure has been running too long
    // and is about to get cancelled by the server. This will allow the calling client
    // to resume this batch from the point we got to before isAccepted was set to false
    if (!isFound && !isAccepted) getContext().getResponse().setBody(count);

If the output document count is less than the input document count, you will need to re-run the stored procedure with the remaining set of documents.
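The resume loop can be sketched as a self-contained simulation (illustrative only: `fakeBulkImport` stands in for the real stored procedure, and `budget` stands in for the server's execution limit):

```javascript
// Fake "stored procedure" that can only accept a limited number of
// operations per invocation and returns how many documents it created.
function fakeBulkImport(docObject, budget) {
  let count = 0;
  const docs = docObject.items;
  while (count < docs.length && count < budget) {
    // ...createDocument would happen here...
    count++;
  }
  // Report progress; the client resumes from this count.
  return count;
}

// Client-side continuation loop: keep calling with the remaining documents
// until the output count matches the input count.
function importAll(allDocs, budget) {
  let currentCount = 0;
  while (currentCount < allDocs.length) {
    const batch = { items: allDocs.slice(currentCount) };
    const inserted = fakeBulkImport(batch, budget);
    if (inserted === 0) throw new Error("no progress; aborting to avoid an infinite loop");
    currentCount += inserted;
  }
  return currentCount;
}

const docs = Array.from({ length: 10 }, (_, i) => ({ id: String(i) }));
console.log(importAll(docs, 3)); // 10 (four calls: 3 + 3 + 3 + 1)
```

Note the zero-progress guard: the real C# loop in the question has no such check, so a stored procedure that repeatedly returns 0 would spin forever.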
