JDBC Kafka Connect using multiple tables with resulting nested JSON and arrays

I am using Kafka Connect with the JDBC Source Connector and am trying to get nested JSON with arrays out of the following tables:

/* Create tables, in this case DB2 */
CREATE TABLE contacts(
    contact_id INT NOT NULL GENERATED ALWAYS AS IDENTITY,
    first_name VARCHAR(100) NOT NULL,
    last_name VARCHAR(100) NOT NULL,
    modified_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(contact_id)
);

CREATE TABLE phones(
    phone_id INT NOT NULL GENERATED ALWAYS AS IDENTITY,
    phone_no VARCHAR(20) NOT NULL,
    phone_type VARCHAR(10) NOT NULL,
    contact_id INT NOT NULL,
    modified_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY(phone_id),
    FOREIGN KEY (contact_id)
        REFERENCES contacts(contact_id)
        ON UPDATE NO ACTION
        ON DELETE CASCADE
);

/* Insert some data */
INSERT INTO contacts(first_name, last_name) 
    VALUES
        ('John','Doe');

INSERT INTO phones(phone_no, phone_type, contact_id)
    VALUES
        ('Johns phone #1','HOME',1),
        ('Johns phone #2','MOBILE',1),
        ('Johns phone #3','WORK',1);

The JSON I would like to get out on a Kafka topic via Kafka Connect is something like this (minor adjustments are acceptable):

{
    "contact_id": 1,
    "first_name": "John",
    "last_name": "Doe",
    "modified_at": "2022-03-16T13:33:04.276",
    "phones":
    [
        {
            "phone_id": 1,
            "phone_no": "Johns phone #1",
            "phone_type": "HOME",
            "contact_id": 1,
            "modified_at": "2022-03-16T13:33:05.101"
        },
        {
            "phone_id": 2,
            "phone_no": "Johns phone #2",
            "phone_type": "MOBILE",
            "contact_id": 1,
            "modified_at": "2022-03-16T13:33:05.210"
        },
        {
            "phone_id": 3,
            "phone_no": "Johns phone #3",
            "phone_type": "WORK",
            "contact_id": 1,
            "modified_at": "2022-03-16T13:33:05.673"
        }
    ]
}

How can I do that with Kafka Connect (i.e. via the Kafka Connect configuration)?



Solution 1:[1]

The JDBC connector does not support array types.

You could write a VIEW that defines a JOIN between your tables (aliasing duplicated column names such as modified_at), then query that view in the connector; a sketch of such a view and connector configuration follows the example records below. Each joined row then becomes its own flat event, e.g.

Record 1

{
    "contact_id": 1,
    "first_name": "John",
    "last_name": "Doe",
    "modified_at": "2022-03-16T13:33:04.276",
    "phone_id": 1,
    "phone_no": "Johns phone #1",
    "phone_type": "HOME",
    "phone_modified_at": "2022-03-16T13:33:05.101"
}

Record 2

{
    "contact_id": 1,
    "first_name": "John",
    "last_name": "Doe",
    "modified_at": "2022-03-16T13:33:04.276",
    "phone_id": 2,
    "phone_no": "Johns phone #2",
    "phone_type": "MOBILE",
    "phone_modified_at": "2022-03-16T13:33:05.210"
}
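
As a rough sketch only (the view name contacts_with_phones, the connection URL, credentials, and topic name are assumptions, not part of the original answer), the view could look like this:

/* Hypothetical DB2 view; name and column aliases are illustrative */
CREATE VIEW contacts_with_phones AS
    SELECT c.contact_id,
           c.first_name,
           c.last_name,
           c.modified_at,
           p.phone_id,
           p.phone_no,
           p.phone_type,
           p.modified_at AS phone_modified_at
    FROM contacts c
    JOIN phones p ON p.contact_id = c.contact_id;

A corresponding Confluent JDBC Source Connector configuration might then look like the following (recent connector versions accept a comma-separated list for timestamp.column.name, so changes to either table re-emit the row; check this against the version you run):

{
    "name": "contacts-with-phones-source",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:db2://db2-host:50000/MYDB",
        "connection.user": "db2user",
        "connection.password": "********",
        "mode": "timestamp",
        "timestamp.column.name": "modified_at,phone_modified_at",
        "query": "SELECT * FROM contacts_with_phones",
        "topic.prefix": "contacts_with_phones",
        "poll.interval.ms": "5000"
    }
}

When query is set, the connector publishes to the single topic named by topic.prefix rather than prefixing table names.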

You would then need to use a stream-processing library to group those events by contact_id. Alternatively, you can run Debezium or the JDBC Source on the individual tables, do the JOIN in Kafka Streams / KSQL, and build the arrays there.
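
For illustration only, a sketch of that second approach in ksqlDB (formerly KSQL) could look as follows. The topic, stream, and table names are assumptions, the per-table topics are assumed to be keyed by contact_id, and whether whole phone structs can be collected into exactly the nested shape from the question depends on your ksqlDB version, so this sketch collects individual columns into parallel arrays instead.

-- Hypothetical ksqlDB sketch; topics are assumed to be fed per table
-- by Debezium or the JDBC source.
CREATE STREAM phones_raw (
    phone_id INT,
    phone_no VARCHAR,
    phone_type VARCHAR,
    contact_id INT,
    modified_at VARCHAR
) WITH (KAFKA_TOPIC='phones', VALUE_FORMAT='JSON');

-- The contacts topic is assumed to be keyed by contact_id.
CREATE TABLE contacts_raw (
    contact_id INT PRIMARY KEY,
    first_name VARCHAR,
    last_name VARCHAR,
    modified_at VARCHAR
) WITH (KAFKA_TOPIC='contacts', VALUE_FORMAT='JSON');

-- Collapse the phone rows into arrays keyed by contact_id.
CREATE TABLE phones_by_contact AS
    SELECT contact_id,
           COLLECT_LIST(phone_no)   AS phone_nos,
           COLLECT_LIST(phone_type) AS phone_types
    FROM phones_raw
    GROUP BY contact_id
    EMIT CHANGES;

-- Join the aggregated phones back onto the contact record.
CREATE TABLE contacts_with_phones_agg AS
    SELECT c.contact_id,
           c.first_name,
           c.last_name,
           c.modified_at,
           p.phone_nos,
           p.phone_types
    FROM contacts_raw c
    JOIN phones_by_contact p ON c.contact_id = p.contact_id
    EMIT CHANGES;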

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

[1] Solution 1: OneCricketeer