Spring Data MongoDB Lookup with Pipeline Aggregation

Alw*_*ing 16 spring mongodb spring-data aggregation-framework spring-data-mongodb

How would I convert the following MongoDB query into a query my Java Spring application can use? I can't find a way to use pipeline with the provided lookup method.

Here is the query I am trying to convert. Note that I did not use $unwind, because I want deliveryZipCodeTimings to stay as a grouped collection in the returned object.

db.getCollection('fulfillmentChannel').aggregate([
    {
        $match: {
            "dayOfWeek": "SOME_VARIABLE_STRING_1"
        }
    },
    {
        $lookup: {
            from: "deliveryZipCodeTiming",
            let: { location_id: "$fulfillmentLocationId" },
            pipeline: [{
                $match: {
                    $expr: {
                        $and: [
                            {$eq: ["$fulfillmentLocationId", "$$location_id"]},
                            {$eq: ["$zipCode", "SOME_VARIABLE_STRING_2"]}
                        ]
                    }
                }
            },
            { 
                $project: { _id: 0, zipCode: 1, cutoffTime: 1 } 
            }],
            as: "deliveryZipCodeTimings"
        }
    },
    {
        $match: {
            "deliveryZipCodeTimings": {$ne: []}
        }
    }
])

Alw*_*ing 13

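Building on the information provided by @dnickless, I was able to solve this. I'll post the complete solution in the hope that it helps someone else in the future.

I'm using mongodb-driver:3.6.4.

First, I had to create a custom aggregation operation class so that I could pass a custom JSON MongoDB query into the aggregation. This lets me use pipeline inside a $lookup, which is not supported by the version I am using.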

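import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;

// Wraps a raw JSON aggregation stage so it can be mixed with Spring's built-in operations.
public class CustomProjectAggregationOperation implements AggregationOperation {
    private final String jsonOperation;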

    public CustomProjectAggregationOperation(String jsonOperation) {
        this.jsonOperation = jsonOperation;
    }

    @Override
    public Document toDocument(AggregationOperationContext aggregationOperationContext) {
        return aggregationOperationContext.getMappedObject(Document.parse(jsonOperation));
    }
}

Now that we can pass a custom JSON query into our Spring MongoDB implementation, all that is left is to plug those values into a TypedAggregation query.

public List<FulfillmentChannel> getFulfillmentChannels(
    String SOME_VARIABLE_STRING_1, 
    String SOME_VARIABLE_STRING_2) {

    AggregationOperation match = Aggregation.match(
            Criteria.where("dayOfWeek").is(SOME_VARIABLE_STRING_1));
    AggregationOperation match2 = Aggregation.match(
            Criteria.where("deliveryZipCodeTimings").ne(Collections.emptyList()));
    String query =
            "{ $lookup: { " +
                    "from: 'deliveryZipCodeTiming'," +
                    "let: { location_id: '$fulfillmentLocationId' }," +
                    "pipeline: [{" +
                    "$match: {$expr: {$and: [" +
                    "{ $eq: ['$fulfillmentLocationId', '$$location_id']}," +
                    "{ $eq: ['$zipCode', '" + SOME_VARIABLE_STRING_2 + "']}]}}}," +
                    "{ $project: { _id: 0, zipCode: 1, cutoffTime: 1 } }]," +
                    "as: 'deliveryZipCodeTimings'}}";

    TypedAggregation<FulfillmentChannel> aggregation = Aggregation.newAggregation(
            FulfillmentChannel.class,
            match,
            new CustomProjectAggregationOperation(query),
            match2
    );

    AggregationResults<FulfillmentChannel> results = 
        mongoTemplate.aggregate(aggregation, FulfillmentChannel.class);
    return results.getMappedResults();
}
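One caveat with concatenating SOME_VARIABLE_STRING_2 straight into the JSON: a value containing a single quote would break the parse, and a value starting with $ would be read by $expr as a field path rather than as data. Below is a minimal sketch of one way to guard against both, assuming the JSON parser behind Document.parse accepts backslash escapes inside single-quoted strings (worth verifying against your driver version). The class and method names (LookupStageJson, escape, literal, lookupStage) are hypothetical and not part of Spring Data or the MongoDB driver.

```java
// Hypothetical helper; none of these names come from Spring Data or the MongoDB driver.
final class LookupStageJson {

    private LookupStageJson() {
    }

    // Escapes backslashes, single quotes and control characters so the value
    // can sit inside a single-quoted JSON string without ending it early.
    static String escape(String value) {
        StringBuilder sb = new StringBuilder(value.length() + 8);
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (c == '\\' || c == '\'') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    // Wraps the value in $literal so that $expr treats it as plain data,
    // even when it starts with '$'.
    static String literal(String value) {
        return "{ $literal: '" + escape(value) + "' }";
    }

    // Builds the same $lookup stage as the answer above, with the zip code
    // passed through literal() instead of being concatenated raw.
    static String lookupStage(String zipCode) {
        return "{ $lookup: { "
                + "from: 'deliveryZipCodeTiming', "
                + "let: { location_id: '$fulfillmentLocationId' }, "
                + "pipeline: [ "
                + "{ $match: { $expr: { $and: [ "
                + "{ $eq: ['$fulfillmentLocationId', '$$location_id'] }, "
                + "{ $eq: ['$zipCode', " + literal(zipCode) + "] } ] } } }, "
                + "{ $project: { _id: 0, zipCode: 1, cutoffTime: 1 } } ], "
                + "as: 'deliveryZipCodeTimings' } }";
    }
}
```

With this in place, the query string in getFulfillmentChannels could be produced by LookupStageJson.lookupStage(SOME_VARIABLE_STRING_2) and handed to CustomProjectAggregationOperation unchanged.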


dma*_*a_k 5

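I would like to add my own solution, which repeats the previously posted solutions in some respects.

Mongo driver v3.x

For Mongo driver v3.x I arrived at the following solution: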

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.util.JSON;

import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;

public class JsonOperation implements AggregationOperation {

    private List<Document> documents;

    public JsonOperation(String json) {
        Object root = JSON.parse(json);

        documents = root instanceof BasicDBObject
                    ? Collections.singletonList(new Document(((BasicDBObject) root).toMap()))
                    : ((BasicDBList) root).stream().map(item -> new Document((Map<String, Object>) ((BasicDBObject) item).toMap())).collect(Collectors.toList());
    }

    @Override
    public Document toDocument(AggregationOperationContext context) {
        // Not necessary to return anything as we override toPipelineStages():
        return null;
    }

    @Override
    public List<Document> toPipelineStages(AggregationOperationContext context) {
        return documents;
    }
}

Then, provided the aggregation steps are given in some resource aggregations.json:

[
  {
    $match: {
      "userId": "..."
    }
  },
  {
    $lookup: {
      let: {
        ...
      },
      from: "another_collection",
      pipeline: [
        ...
      ],
      as: "things"
    }
  },
  {
    $sort: {
      "date": 1
    }
  }
]

the class above can be used as follows:

import static org.springframework.data.mongodb.core.aggregation.Aggregation.newAggregation;

Collection<ResultDao> results = mongoTemplate
        .aggregate(
                newAggregation(new JsonOperation(resourceToString("aggregations.json", StandardCharsets.UTF_8))),
                "some_collection",
                ResultDao.class)
        .getMappedResults();

Mongo driver v4.x

Since the JSON class was removed in Mongo v4, I have rewritten the class as follows:

import java.util.Collections;
import java.util.List;

import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;

public class JsonOperation implements AggregationOperation {

    private List<Document> documents;

    private static final String DUMMY_KEY = "dummy";

    public JsonOperation(String json) {
        documents = parseJson(json);
    }

    static final List<Document> parseJson(String json) {
        return (json.trim().startsWith("["))
                    ? Document.parse("{\"" + DUMMY_KEY + "\": " + json + "}").getList(DUMMY_KEY, Document.class)
                    : Collections.singletonList(Document.parse(json));
    }

    @Override
    public Document toDocument(AggregationOperationContext context) {
        // Not necessary to return anything as we override toPipelineStages():
        return null;
    }

    @Override
    public List<Document> toPipelineStages(AggregationOperationContext context) {
        return documents;
    }

    @Override
    public String getOperator() {
        return documents.iterator().next().keySet().iterator().next();
    }
}

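The implementation is now a bit ugly, though, because of the string manipulation. If somebody has a better idea of how to parse an array of objects more elegantly, please edit this post or leave a comment. Ideally there would be a method in the Mongo core that can parse either a JSON object or a list (returning BasicDBObject/BasicDBList or Document/List<Document>).

Also note that I skipped the step of converting the Document instances in toPipelineStages(), as it was not necessary in my case. With that step included, the method would look like this: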

@Override
public List<Document> toPipelineStages(AggregationOperationContext context) {
    return documents.stream().map(document -> context.getMappedObject(document)).collect(Collectors.toList());
}
