'Make R2DBC use the same connection for all interactions with the database in a @Transactional method (Java, Sprint)
Motivation
I have a service which I want to make @Transactional
. My service is storing complex data in multiple tables, there is a referential integrity between the tables.
public class MyData { // simplified
MasterData masterData;
List<Detail> details;
public static class MasterData {
UUID id;
String field1;
String field2;
String field3;
}
public static class Detail {
UUID masterId;
String fieldA;
String fieldB;
}
}
- First I save the master data into one table using
R2dbcRepository<MasterData, UUID>
. TheINSERT
command is simple and I may use theR2dbcRepository
. - Then a list of details into another table using
DatabaseClient
. Each detail has a foreign key constraint to the master table. I want to use batchINSERT
and I complete the SQL using more complex approach inDatabaseClient
.
Problem
The problem is that I cannot save the detail data - I get the error
insert or update on table "detail" violates foreign key constraint
I suspect that the reason is that each SQL command is executed in a different connection so the master data are not yet visible when the details are stored.
Question
Is it really the root cause? How to make R2DBC always use the same connection across all the calls to the database inside one @Transactional
service call, even if it goes via various instances of R2dbcRepository
and DatabaseClient
?
If the solution is completely wrong, how to correctly implement @Transactional
in R2DBC?
I prefer calling all the INSERT
s into the detail
table in a batch.
Code
My (simplified) code looks like this:
@Service
public class MyService {
private final MasterRepository masterRepository;
private final DbConnector dbConnector;
@Transactional
public Mono<Void> saveMasterAndDetails(MyData data) {
return Mono.just(data)
.map(MyData::getMaster)
.flatMap(masterRepository::insertMasterData)
.thenReturn(data)
.map(MyData::getDetails)
.flatMap(dbConnector::insertDetails)
.then()
;
}
}
The code of MasterRepository
is something like
import org.springframework.data.r2dbc.repository.R2dbcRepository;
public interface MasterRepository extends R2dbcRepository<MasterData, UUID> {
@Query("""
INSERT INTO master(id, col_1, col_2, col_3)
VALUES (
:#{#masterData.id},
:#{#masterData.field1},
:#{#masterData.field2},
:#{#masterData.field3})
""")
Mono<Void> insertMasterData(MasterData masterData);
}
And the code of DbConnector
is more complex - but maybe overly complex? There is still missing direct support for batches and prepared statements in DatabaseClient
: spring-data-r2dbc-259, spring-framework-27229
import org.springframework.r2dbc.core.DatabaseClient;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.Statement;
public class DbConnector {
private final DatabaseClient databaseClient;
public Mono<Integer> insertDetails(List<Detail> details) {
return Flux.usingWhen(
databaseClient.getConnectionFactory().create(),
connection -> {
final Statement statement = connection.createStatement("""
insert into detail (masterId, col_A, col_B)
values ($1, $2, $3)
""");
details.forEach(detail ->
statement
.bind("$1", detail.getMasterId())
.bind("$2", detail.getColA())
.bind("$3", detail.getColB())
.add()
);
return statement.execute();
},
Connection::close)
.flatMap(Result::getRowsUpdated)
.reduce(0, Integer::sum);
}
}
Solution 1:[1]
TL;DR:
I replaced
return Flux.usingWhen(
databaseClient.getConnectionFactory().create(),
connection -> {
statement = ... // prepare the statement
return statement.execute();
},
Connection::close
)
with
return databaseClient.inConnection(connection -> {
statement = ... // prepare the statement
return statement.execute();
);
Detailed answer
I have discovered a method which I was not aware of: DatabaseConnection.inConnection(). The DatabaseConnection
interface inherits it from ConnectionAccessor
:
Execute a callback Function within a Connection scope. The function is responsible for creating a Mono. The connection is released after the Mono terminates (or the subscription is cancelled). Connection resources must not be passed outside of the Function closure, otherwise resources may get defunct.
I changed my code using the DatabaseClient
and it seems that the SQL commands are executed in the same connection.
However, I would still like to understand it better. I am not sure, if I am just lucky now and if it can change with the next implementation. I still do not know how to have the full control over the connections and hence over the transactional code.
import org.springframework.r2dbc.core.DatabaseClient;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.Statement;
public class DbConnector {
private final DatabaseClient databaseClient;
public Mono<Integer> insertDetails(List<Detail> details) {
return databaseClient.inConnection(connection -> {
final Statement statement = connection.createStatement("""
insert into detail (masterId, col_A, col_B)
values ($1, $2, $3)
""");
details.forEach(detail ->
statement
.bind("$1", detail.getMasterId())
.bind("$2", detail.getColA())
.bind("$3", detail.getColB())
.add()
);
return Flux.from(statement.execute())
.flatMap(Result::getRowsUpdated)
.reduce(0, Integer::sum)
;
});
}
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 |