'MongoDB change stream returns empty fullDocument on insert

Mongo 4.4 and respective Golang driver are used. Database’s replica set is being run locally at localhost:27017, localhost:27020. I’ve also tried using Atlas’s sandbox cluster which gave me the same results.

According to Mongo's documentation when handling insertion of a new document fullDocument field of event data is supposed to contain newly inserted document which for some reason is not the case for me. ns field where database and collection name are supposed to be and documentKey where affected document _id is stored are empty as well. operationType field contains correct operation type. In another test it appeared that update operations do not appear in a change stream at all.

It used to work as it should but now it doesn't. Why does it happen and what am I doing wrong?

Code

// ds is the connection to discord, required for doing stuff inside handlers
func iterateChangeStream(stream *mongo.ChangeStream, ds *discordgo.Session, ctx context.Context, cancel context.CancelFunc) {
    defer stream.Close(ctx)
    defer cancel() // for graceful crashing

    for stream.Next(ctx) {
        var event bson.M
        err := stream.Decode(&event)
        if err != nil {
            log.Print(errors.Errorf("Failed to decode event: %w\n", err))
            return
        }

        rv := reflect.ValueOf(event["operationType"]) // getting operation type
        opType, ok := rv.Interface().(string)
        if !ok {
            log.Print("String expected in operationType\n")
            return
        }
        
        // event["fullDocument"] will be empty even when handling insertion
        // models.Player is a struct representing a document of the collection
        // I'm watching over
        doc, ok := event["fullDocument"].(models.Player)
        if !ok {
            log.Print("Failed to convert document into Player type")
            return
        }
        handlerCtx := context.WithValue(ctx, "doc", doc)
        // handlerToEvent maps operationType to respective handler
        go handlerToEvent[opType](ds, handlerCtx, cancel)
    }
}

func WatchEvents(ds *discordgo.Session, ctx context.Context, cancel context.CancelFunc) {

    pipeline := mongo.Pipeline{
        bson.D{{
            "$match",
            bson.D{{
                "$or", bson.A{
                    bson.D{{"operationType", "insert"}}, // !!!
                    bson.D{{"operationType", "delete"}},
                    bson.D{{"operationType", "invalidate"}},
                },
            }},
        }},
    }
    // mongo instance is initialized on program startup and stored in a global variable
    opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
    stream, err := db.Instance.Collection.Watch(ctx, pipeline, opts)
    if err != nil {
        log.Panic(err)
    }
    defer stream.Close(ctx)

    iterateChangeStream(stream, ds, ctx, cancel)
}

My issue might be related to this, except that it consistently occurs on insertion instead ocuring sometimes on updates. If you know how to enable change stream optimization feature flag mentioned inside link above, let me know.

Feel free to ask for more clarifications.



Solution 1:[1]

The question was answered here.

TLDR

You need to create the following structure to unmarshal event into:

type CSEvent struct {
    OperationType string        `bson:"operationType"`
    FullDocument  models.Player `bson:"fullDocument"`
}
var event CSEvent
err := stream.Decode(&event)

event will contain a copy of the inserted document.

Solution 2:[2]

From sample events that I see from this link we can see that fullDocument exists only on operationType: 'insert'.

 { 
     _id: { _data: '825DE67A42000000072B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67A42113EA7DE6472E7680004' },
    operationType: 'insert',
    clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 7, high_: 1575385666 },
    fullDocument: { 
        _id: 5de67a42113ea7de6472e768,
        name: 'Sydney Harbour Home',
        bedrooms: 4,
        bathrooms: 2.5,
        address: { market: 'Sydney', country: 'Australia' } },
        ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' },
        documentKey: { _id: 5de67a42113ea7de6472e768 } 
 }
 { 
    _id: { _data: '825DE67A42000000082B022C0100296E5A10046BBC1C6A9CBB4B6E9CA9447925E693EF46645F696400645DE67A42113EA7DE6472E7680004' },
    operationType: 'delete',
    clusterTime: Timestamp { _bsontype: 'Timestamp', low_: 8, high_: 1575385666 },
    ns: { db: 'sample_airbnb', coll: 'listingsAndReviews' },
    documentKey: { _id: 5de67a42113ea7de6472e768 } 
 }

So I recommend You

  1. to limit Your $match to insert
  2. or add if statement to operationType.
      if opType == "insert" {
        doc, ok := event["fullDocument"].(models.Player)
        if !ok {
            log.Print("Failed to convert document into Player type")
            return
        }
        handlerCtx := context.WithValue(ctx, "doc", doc)
        // handlerToEvent maps operationType to respective handler
        go handlerToEvent[opType](ds, handlerCtx, cancel)
        return
      }
  1. or make sure You're getting document using id of document from event["documentKey"]["_id"] and call playersCollection.findOne({_id: event["documentKey"]["_id"]})

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 mcv_dev
Solution 2