Apache Pulsar: Read / Consume messages from an integer specified message ID to an end message ID?

With Kafka, I can specify an integer message ID to begin consuming from and an end message to stop at, e.g., as follows:

 kafkacat -b kafka:9092 -t messages -o 11000 -c 11333

However, it appears the same functionality to specify integer start and stop messages is not available in Apache Pulsar!

To be fair, it's possible to specify a start message id and end message id, if these have been tracked and saved in a byte format, using a very convoluted process which is bound to affect performance and code complexity.

As in this example:

// Connect to the Pulsar cluster at lookupURL.
client, err := pulsar.NewClient(pulsar.ClientOptions{
    URL: lookupURL,
})
if err != nil {
    log.Fatal(err)
}
defer client.Close()

topic := "topic-1"
ctx := context.Background()

// create producer (batching is disabled in this example)
producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic:           topic,
    DisableBatching: true,
})
if err != nil {
    log.Fatal(err)
}
defer producer.Close()

// send 10 messages, remembering the ID returned for each one
msgIDs := [10]pulsar.MessageID{}
for i := 0; i < 10; i++ {
    msgID, err := producer.Send(ctx, &pulsar.ProducerMessage{
        Payload: []byte(fmt.Sprintf("hello-%d", i)),
    })
    if err != nil {
        log.Fatal(err)
    }
    if msgID == nil {
        log.Fatal("producer returned a nil message ID")
    }
    msgIDs[i] = msgID
}

// create reader on 5th message (not included)
reader, err := client.CreateReader(pulsar.ReaderOptions{
    Topic:          topic,
    StartMessageID: msgIDs[4],
})
if err != nil {
    log.Fatal(err)
}
defer reader.Close()

// receive the remaining 5 messages
for i := 5; i < 10; i++ {
    msg, err := reader.Next(ctx)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("received %q\n", msg.Payload())
}

// create reader on 5th message (included)
readerInclusive, err := client.CreateReader(pulsar.ReaderOptions{
    Topic:                   topic,
    StartMessageID:          msgIDs[4],
    StartMessageIDInclusive: true,
})
if err != nil {
    log.Fatal(err)
}
defer readerInclusive.Close()

However, this is complicated and unreliable for multiple concurrent readers, and it requires an external construct to track the processed messages before they can be retrieved using the start/end semantics.

Is there any way to achieve this (preferably via Go)?



Solution 1:[1]

I've found that the following simple approach is sufficient (proof of concept script):

package main

import (
    "context"
    "fmt"
    "io/ioutil"
    "log"
    "os"
    "strconv"
    "strings"
    "time"

    "github.com/apache/pulsar-client-go/pulsar"
)

// writeBytesToFile stores byteSlice in the file named f inside the ./data/
// directory and returns the number of bytes written.
//
// The directory is created when it does not exist yet, and an existing file
// with the same name is truncated first. Any failure (creating the directory,
// opening, writing or closing the file) terminates the program via log.Fatal.
func writeBytesToFile(f string, byteSlice []byte) int {
    const dataDir = "./data/"

    // The target directory may be missing (e.g. on a first run); without it
    // the OpenFile call below fails.
    if err := os.MkdirAll(dataDir, 0755); err != nil {
        log.Fatal(err)
    }

    f = dataDir + f

    // Open (or create) the file for writing only, discarding old content.
    file, err := os.OpenFile(
        f,
        os.O_WRONLY|os.O_TRUNC|os.O_CREATE,
        0666,
    )
    if err != nil {
        log.Fatal(err)
    }

    bytesWritten, err := file.Write(byteSlice)
    if err != nil {
        file.Close()
        log.Fatal(err)
    }

    // Close explicitly rather than with defer so that an error reported at
    // close time is not silently dropped.
    if err := file.Close(); err != nil {
        log.Fatal(err)
    }

    log.Printf("Wrote %d bytes.\n", bytesWritten)

    return bytesWritten
}

// readBackByEntryId returns the bytes previously stored for msgIndex.
//
// By convention the data lives in "<msgDir>/<msgIndex>.dat". When the file
// cannot be read, the failure is logged together with its cause and nil is
// returned; the program is not terminated.
func readBackByEntryId(msgDir string, msgIndex string) (yourBytes []byte) {
    // We know the file name by convention.
    fname := msgDir + "/" + msgIndex + ".dat"

    yourBytes, err := ioutil.ReadFile(fname)
    if err != nil {
        // Include the underlying error so the reason (missing file,
        // permissions, ...) is visible in the log.
        log.Printf("error reading %s: %v", fname, err)
        return nil
    }

    return yourBytes
}

// getFiles returns the names (without any directory prefix) of the regular
// entries found directly inside aDir, in the order ioutil.ReadDir reports
// them. Sub-directories are skipped. The result is nil when there is nothing
// to return. If aDir cannot be listed the program terminates via log.Fatal.
func getFiles(aDir string) []string {
    var theFiles []string

    // List the directory the caller asked for rather than a fixed path.
    files, err := ioutil.ReadDir(aDir)
    if err != nil {
        log.Fatal(err)
    }

    for _, f := range files {
        if f.IsDir() {
            continue
        }
        theFiles = append(theFiles, f.Name())
    }

    return theFiles
}

// streamAll reads every message the reader still has available and, for each
// message whose entry ID lies in the inclusive range
// [startMsgIndex, stopMsgIndex], writes the serialized message ID to
// "<entryID>.dat" via writeBytesToFile and prints the message. All other
// messages are reported as skipped.
//
// A stored ID can later be turned back into a message ID with
// pulsar.DeserializeMessageID and passed as ReaderOptions.StartMessageID
// (with StartMessageIDInclusive set) to re-read that exact message; see
// https://githubmemory.com/@storm-5 for the original note.
//
// Membership is decided per message with a range check instead of a latch
// that switches on only when the entry ID equals startMsgIndex exactly, so a
// missing start entry or a restart of the entry numbering cannot leave the
// selection stuck in the wrong state.
// NOTE(review): presumably entry IDs are not unique across the whole topic;
// confirm against the Pulsar documentation before relying on them as a
// global index.
//
// A failed read terminates the program via log.Fatal.
func streamAll(reader pulsar.Reader, startMsgIndex int64, stopMsgIndex int64) {
    ctx := context.Background()

    for reader.HasNext() {
        msg, err := reader.Next(ctx)
        if err != nil {
            log.Fatal(err)
        }

        msgID := msg.ID()
        msgIndex := msgID.EntryID()

        // Show the entry ID next to the full message ID.
        fmt.Printf("%v -> %#v\n", msgIndex, msgID)

        if msgIndex == startMsgIndex {
            fmt.Println("start read: ", msgIndex)
        }
        if msgIndex > stopMsgIndex {
            fmt.Println("stop reading: ", msgIndex)
        }

        if msgIndex < startMsgIndex || msgIndex > stopMsgIndex {
            fmt.Println("skipping ", msgIndex)
            continue
        }

        // Persist the serialized message ID under a name derived from the
        // entry ID so it can be looked up by index later.
        fname := strconv.FormatInt(msgIndex, 10) + ".dat"
        fmt.Println("written bytes: ", writeBytesToFile(fname, msgID.Serialize()))

        fmt.Printf("Received message msgId: %#v -- content: '%s' published at %v\n",
            msgID, string(msg.Payload()), msg.PublishTime())
    }
}

// retrieveRange re-reads, one by one, every message whose serialized ID was
// saved as "<index>.dat" in ./data. Entries that do not carry the ".dat"
// suffix are ignored.
func retrieveRange(client pulsar.Client) {
    for _, f := range getFiles("./data/") {
        if !strings.HasSuffix(f, ".dat") {
            continue
        }
        retrieveStoredMessage(client, strings.TrimSuffix(f, ".dat"))
    }
}

// retrieveStoredMessage loads the message ID saved under fIndex, opens a
// reader positioned on that ID (inclusive), fetches the single message and
// prints it. It is a separate function so the deferred Close releases each
// reader as soon as its message has been handled, rather than keeping every
// reader open until the whole range is done. Any failure terminates the
// program via log.Fatal.
func retrieveStoredMessage(client pulsar.Client, fIndex string) {
    fmt.Println("re-reading message index -> ", fIndex)

    msgIdBytes := readBackByEntryId("./data", fIndex)

    fmt.Printf("boom -> %#v\n", msgIdBytes)

    idNew, err := pulsar.DeserializeMessageID(msgIdBytes)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println("Got message entry id => ", idNew.EntryID())

    readerInclusive, err := client.CreateReader(pulsar.ReaderOptions{
        Topic:                   "ragnarok/transactions/requests",
        StartMessageID:          idNew,
        StartMessageIDInclusive: true,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer readerInclusive.Close()

    fmt.Println("bleep!")

    msg, err := readerInclusive.Next(context.Background())
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Retrieved message ID message msgId: %#v -- content: '%s' published at %v\n",
        msg.ID(), string(msg.Payload()), msg.PublishTime())
}

// main connects to the Pulsar broker at pulsar://localhost:6650, streams the
// topic "ragnarok/transactions/requests" from the earliest message, saves the
// message IDs whose entry ID lies between startMsgID and stopMsgID, and then
// re-reads exactly those messages from the saved IDs.
func main() {
    client, err := pulsar.NewClient(
        pulsar.ClientOptions{
            URL:               "pulsar://localhost:6650",
            OperationTimeout:  30 * time.Second,
            ConnectionTimeout: 30 * time.Second,
        })
    if err != nil {
        log.Fatalf("Could not instantiate Pulsar client: %v", err)
    }
    defer client.Close()

    reader, err := client.CreateReader(pulsar.ReaderOptions{
        Topic:          "ragnarok/transactions/requests",
        StartMessageID: pulsar.EarliestMessageID(),
    })
    if err != nil {
        log.Fatal(err)
    }
    defer reader.Close()

    // Inclusive bounds of the entry IDs to pick out of the stream.
    var startMsgID int64 = 55
    var stopMsgID int64 = 66

    // Stream all the messages from the earliest to the latest and pick the
    // subset between the start and stop IDs.
    streamAll(reader, startMsgID, stopMsgID)

    // Retrieve the picked range.
    retrieveRange(client)
}



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Traiano Welcome