JDBC Kafka Connect using multiple tables with resulting nested JSON and arrays
I am using Kafka Connect with the JDBC Source Connector, and I am trying to produce nested JSON with arrays from the following tables:
/* Create tables, in this case DB2 */
CREATE TABLE contacts(
contact_id INT NOT NULL GENERATED ALWAYS AS IDENTITY,
first_name VARCHAR(100) NOT NULL,
last_name VARCHAR(100) NOT NULL,
modified_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(contact_id)
);
CREATE TABLE phones(
phone_id INT NOT NULL GENERATED ALWAYS AS IDENTITY,
phone_no VARCHAR(20) NOT NULL,
phone_type VARCHAR(10) NOT NULL,
contact_id INT NOT NULL,
modified_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY(phone_id),
FOREIGN KEY (contact_id)
REFERENCES contacts(contact_id)
ON UPDATE NO ACTION
ON DELETE CASCADE
);
/* Insert some data */
INSERT INTO contacts(first_name, last_name)
VALUES
('John','Doe');
INSERT INTO phones(phone_no, phone_type, contact_id)
VALUES
('Johns phone #1','HOME',1),
('Johns phone #2','MOBILE',1),
('Johns phone #3','WORK',1);
The JSON I would like to get out on a Kafka topic via Kafka Connect is something like this (minor adjustments are possible):
{
"contact_id": 1,
"first_name": "John",
"last_name": "Doe",
"modified_at": "2022-03-16T13:33:04.276",
"phones":
[
{
"phone_id": 1,
"phone_no": "Johns phone #1",
"phone_type": "HOME",
"contact_id": 1,
"modified_at": "2022-03-16T13:33:05.101"
},
{
"phone_id": 2,
"phone_no": "Johns phone #2",
"phone_type": "MOBILE",
"contact_id": 1,
"modified_at": "2022-03-16T13:33:05.210"
},
{
"phone_id": 3,
"phone_no": "Johns phone #3",
"phone_type": "WORK",
"contact_id": 1,
"modified_at": "2022-03-16T13:33:05.673"
}
]
}
How can I do that with Kafka Connect (i.e., what should the Kafka Connect config look like)?
Solution 1:[1]
The JDBC connector does not support array types.
You could write a VIEW that defines a JOIN between your tables, then query that view in the connector; this produces one flat event per joined row, e.g.
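A sketch of such a view follows; the view name and the column aliases (needed because both tables have `contact_id` and `modified_at` columns) are assumptions, and the DB2 syntax is standard SQL:

```sql
-- Hypothetical view flattening contacts and phones into one row per phone
CREATE VIEW contact_phones_v AS
SELECT
  c.contact_id,
  c.first_name,
  c.last_name,
  c.modified_at AS contact_modified_at,
  p.phone_id,
  p.phone_no,
  p.phone_type,
  p.modified_at AS phone_modified_at
FROM contacts c
JOIN phones p ON p.contact_id = c.contact_id;
```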
Record 1
{
"contact_id": 1,
"first_name": "John",
"last_name": "Doe",
"contact_modified_at": "2022-03-16T13:33:04.276",
"phone_id": 1,
"phone_no": "Johns phone #1",
"phone_type": "HOME",
"phone_modified_at": "2022-03-16T13:33:05.101"
}
Record 2
{
"contact_id": 1,
"first_name": "John",
"last_name": "Doe",
"contact_modified_at": "2022-03-16T13:33:04.276",
"phone_id": 2,
"phone_no": "Johns phone #2",
"phone_type": "MOBILE",
"phone_modified_at": "2022-03-16T13:33:05.210"
}
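With such a view in place, a minimal JDBC Source Connector configuration could query it. This is a sketch, not a verified config: the connector name, connection details, view name, and timestamp column are all assumptions; when `query` is set, `topic.prefix` is used as the full topic name:

```json
{
  "name": "contacts-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:db2://db2host:50000/MYDB",
    "connection.user": "db2inst1",
    "connection.password": "********",
    "mode": "timestamp",
    "timestamp.column.name": "PHONE_MODIFIED_AT",
    "query": "SELECT * FROM contact_phones_v",
    "topic.prefix": "contact_phones",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
```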
You would then need to use a stream-processing library to group by contact_id, for example. Or you can use Debezium / JDBC Source on the individual tables and do the JOIN in Kafka Streams / ksqlDB, building the arrays there.
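As a ksqlDB sketch of the grouping step (the stream/table names and the JSON value format are assumptions, and a real setup would also join against the contacts data): `COLLECT_LIST` aggregates values per key into an array, here just the phone numbers:

```sql
-- Hypothetical ksqlDB statements; assumes a 'phones' topic fed by a source connector
CREATE STREAM phones_s (phone_id INT, phone_no VARCHAR, phone_type VARCHAR, contact_id INT)
  WITH (KAFKA_TOPIC = 'phones', VALUE_FORMAT = 'JSON');

-- Group the phones of each contact into an array
CREATE TABLE contact_phones_t AS
  SELECT contact_id, COLLECT_LIST(phone_no) AS phones
  FROM phones_s
  GROUP BY contact_id;
```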
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | OneCricketeer |
