How can I save the offset of ScyllaDB CDC after consumption?

I have read the Scylla CDC documentation and the Java demo. Despite a lot of local debugging, I could not find how to save the CDC consumption offset. When my service restarts, it consumes from the very beginning again, which confuses me.

This is my local Java code:

public static void main(String[] args) {
    // Builds one consumer per worker thread; it only prints ROW_INSERT changes.
    RawChangeConsumerProvider changeConsumerProvider = threadId -> {
        RawChangeConsumer changeConsumer = change -> {
            if (change.getOperationType().equals(RawChange.OperationType.ROW_INSERT)) {
                ChangeId changeId = change.getId();
                StreamId streamId = changeId.getStreamId();
                ChangeTime changeTime = changeId.getChangeTime();
                System.out.println("-------- change ------------ ttl is " + change.getTTL() + ", changeId is " + changeId + ", streamId  is " + streamId + ",changeTime is " + changeTime);
            }

            return CompletableFuture.completedFuture(null);
        };
        return changeConsumer;
    };

    try (CDCConsumer consumer = CDCConsumer.builder()
            .addContactPoint("127.0.0.1", 9042)
            .withCredentials("cassandra", "cassandra")
            .addTable(new TableName("cds", "outbox_event"))
            .withConsumerProvider(changeConsumerProvider)
            .withWorkersCount(1)
            .withSleepBeforeFirstGenerationMs(500)
            //.withQueryTimeWindowSizeMs(5000)
            .withConfidenceWindowSizeMs(500)
            .build()) {

        consumer.start();
        // Block until Ctrl+C (SIGINT) so the consumer keeps running.
        CountDownLatch terminationLatch = new CountDownLatch(1);
        Signal.handle(new Signal("INT"), signal -> terminationLatch.countDown());
        terminationLatch.await();
    } catch (InterruptedException ex) {
        System.err.println("Exception occurred while running the Printer: " + ex.getMessage());
    }
}
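
To make clearer what I mean by "saving the offset", here is a minimal sketch of the behavior I am after. It assumes the progress can be tracked as the last processed change timestamp per stream. `CheckpointStore`, `shouldProcess`, and `commit` are hypothetical names of my own, not part of the Scylla CDC library, and the checkpoint file is just a local properties file.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Properties;

/**
 * Hypothetical helper (NOT part of scylla-cdc-java): remembers, per stream,
 * the timestamp of the last change that was fully processed.
 */
public class CheckpointStore {
    private final Path file;
    private final Properties offsets = new Properties();

    /** Loads previously saved checkpoints if the file already exists. */
    public CheckpointStore(Path file) throws IOException {
        this.file = file;
        if (Files.exists(file)) {
            try (InputStream in = Files.newInputStream(file)) {
                offsets.load(in);
            }
        }
    }

    /** Returns true if the change is newer than the saved checkpoint for its stream. */
    public synchronized boolean shouldProcess(String streamId, long changeTimestamp) {
        String saved = offsets.getProperty(streamId);
        return saved == null || changeTimestamp > Long.parseLong(saved);
    }

    /** Records the change as processed and writes the checkpoints to disk. */
    public synchronized void commit(String streamId, long changeTimestamp) throws IOException {
        offsets.setProperty(streamId, Long.toString(changeTimestamp));
        // Write to a temporary file first, then replace the old checkpoint file.
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        try (OutputStream out = Files.newOutputStream(tmp)) {
            offsets.store(out, "CDC checkpoints");
        }
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }
}
```

Inside the consumer lambda I would call `shouldProcess` with the stream ID (as a string) and a timestamp derived from the change time before handling a change, and `commit` afterwards. This only filters out changes that were already handled; the library would still re-read them after a restart, and two changes with an identical timestamp in one stream would not be told apart, so I would prefer a mechanism where the consumer itself resumes from the saved position.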


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
