'How to parse MongoDB aggregation step containing functions?

Looking at com.mongodb.reactivestreams.client.MongoCollection interface we see that aggregation can be invoked using list of Bson elements.

public interface MongoCollection<TDocument> {
  ...
  AggregatePublisher<TDocument> aggregate(List<? extends Bson> list);
}

It is clear how to use it when aggregation steps are JSONs (see Example with simple JSONs)

Unfortunately, when any aggregation step contains a function (which is allowed by native MongoDB query), for instance, $accumulator the same approach cannot be applied due to it causes violation of Bson format (org.bson.json.JsonParseException) (see Example with functions)


What is the best way to convert a native MongoDB aggregation query into a result in Java?

(suppose that queries are complex and it is not expedient to rewrite them with Mongo aggregation builders in Java)

Example with simple JSONs:

ReactiveMongoOperations mongo = /* ... */;

var match = BasicDBObject.parse("{ $match: {name: \"Jack\"} }");
var project = BasicDBObject.parse("{ $project: {_id: 0, age: 1, name: 1} }";

var queryParts = List.of(match, project);

Flux<PersonInfo> infoFlux = mongo
                      .getCollection("person")
                      .flatMapMany(person -> person.aggregate(queryParts).toFlux())
                      .map(it -> objectMapper.readValue(it.toJson(), PersonInfo.class))
                      .collectList()

Example with functions:
// here for conciseness it is just a counting accumulator; generally functions are more complex

var match = BasicDBObject.parse("""
          {
            $group: {
              _id: "$token",
              count: {
                $accumulator: {
                  init: function() {
                    return {owned: 0, total: 0}
                  },
                  accumulate: function(state, owner) {
                    return {
                      total: state.total + 1
                    }
                  },
                  accumulateArgs: ["$owner"],
                  merge: function(a, b) {
                    return {
                      total: a.total + b.total
                    }
                  },
                  lang: "js"
                }
              },
              minPriceEth: {$min: "$priceEth"}
            }
          }
 """);


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source