SQLAlchemy multi-table mapping: insert, not update, on attribute change

I have an entity called a Report which points to a report stored in some repository. Those repositories can have versioning, therefore reports have an optional version.

I am now trying to track the reports in SQL via SQLAlchemy. Because one report title can have several versions, I wanted to map the entity over two tables: one for the report identification (title and other data I do not include here for simplicity) and one for report versions, each of which references a report (again, plus other data not included here).

I have managed all of the above in the following code. But now I am blocked by the fact that setting Report.version to a newer version causes an UPDATE of the existing report_version row rather than the INSERT of a new one, which means I'll only ever track one version.

from dataclasses import dataclass, field

from sqlalchemy import Column, ForeignKey, Integer, String, Table, \
    create_engine, join, select, text
from sqlalchemy.orm import Session, column_property, registry

mapper_registry = registry()

@dataclass
class Report:
    title: str
    version: str | None = field(default=None)  # versioning is optional

# Parent table holding the report title.
report = Table(
    "report",
    mapper_registry.metadata,
    Column("report_pk", Integer, primary_key=True),
    Column("title", String(35), nullable=False),
)

# Child table holding version rows; each row references a report via report_fk.
report_version = Table(
    "report_version",
    mapper_registry.metadata,
    Column("version_pk", Integer, primary_key=True),
    Column("report_fk", ForeignKey("report.report_pk"), nullable=False),
    Column("version_id", String(1024), nullable=True),  # nullable: versioning is optional
)

# Map Report onto the join of both tables rather than onto a single table.
mapper_registry.map_imperatively(
    Report,
    report.join(report_version),
    properties={
        # Expose the parent primary key and the child foreign key as one attribute.
        "id": column_property(report.c.report_pk, report_version.c.report_fk),
        # Expose the version_id column under the dataclass field name `version`.
        "version": report_version.c.version_id,
    },
)

# In-memory SQLite database; echo=True logs the emitted SQL.
engine = create_engine("sqlite://", echo=True, future=True)

mapper_registry.metadata.create_all(engine)

session = Session(engine)

# Problem: setting the attribute causes the ORM to UPDATE the existing
# `report_version` row instead of inserting a new one.
r1 = Report(title="r1", version="a")
session.add(r1)
session.flush()  # flush r1a
r1.version = "b"
session.flush()  # flush r1b (overwrites version "a")

# this is what I would like to achieve
r2 = Report(title="r2", version=".1")
session.add(r2)
session.flush()  # flush r2.1
# Add the second version by hand with raw SQL, bypassing the ORM mapping.
session.execute(
    text(
        "INSERT INTO report_version (report_fk, version_id) VALUES (:report_id, '.2')"
    ),
    {"report_id": r2.id},
)
session.flush()  # flush r2.2

session.execute(text("SELECT * FROM report")).all()  # r1 and r2
session.execute(text("SELECT * FROM report_version")).all()  # b, .1 and .2

session.execute(select(Report)).scalars().all()  # r1b, r2.1 and r2.2

session.close()


Solution 1:[1]

In the end, I did not manage to get the behaviour I wanted across two different tables, so I went with the simpler Slowly Changing Dimension (SCD) type 2 approach: a new row is added each time the report gets updated. The SQLAlchemy documentation describes this under "Versioning using Temporal Rows".

from dataclasses import dataclass, field
from datetime import datetime

from sqlalchemy import (Boolean, Column, DateTime, Integer, String, Table,
                        create_engine, event, select, text)
from sqlalchemy.orm import Session, attributes, make_transient, registry

mapper_registry = registry()

@dataclass
class Report:
    title: str
    version: str | None = field(default=None)  # versioning is optional

# Single table for reports: title, optional version, creation timestamp and
# a flag marking the current row.
report = Table(
    "report",
    mapper_registry.metadata,
    Column("id", Integer, primary_key=True),
    Column("title", String(35), nullable=False),
    Column("version_id", String(1024), nullable=True),  # nullable: versioning is optional
    # Filled in at INSERT time when no value is supplied.
    # NOTE(review): datetime.utcnow is deprecated since Python 3.12 - consider
    # a timezone-aware replacement.
    Column("created_at", DateTime, nullable=False, default=datetime.utcnow),
    # Defaults to True; the before_flush listener below sets it to False on
    # the row being superseded.
    Column("current_flag", Boolean, nullable=False, index=True, default=True),
)

# Map Report onto the table, exposing version_id under the dataclass field
# name `version`.
mapper_registry.map_imperatively(
    Report, report, properties={"version": report.c.version_id}
)

# SCD type 2 handling for Report: a modified, already-persisted report is
# written as a new row instead of updating the existing one.
@event.listens_for(Session, "before_flush")
def before_flush(session, flush_context, instances):
    """Re-add modified persistent Report objects as new rows before a flush.

    For every dirty object that is a Report, has real modifications and
    already has a database identity, the row it currently points at gets its
    ``current_flag`` set to False, and the object itself is made transient
    and added back to the session so that the flush inserts it as a new row.

    Args:
        session: The Session about to flush.
        flush_context: Unused; part of the ``before_flush`` event signature.
        instances: Unused; part of the ``before_flush`` event signature.
    """
    for obj in session.dirty:
        # Only modified Report objects that already exist in the database
        # need a new row.
        if not isinstance(obj, Report):
            continue
        if not session.is_modified(obj):
            continue
        if not attributes.instance_state(obj).has_identity:
            continue

        # Mark the row this object currently points at as no longer current.
        previous_row = session.query(Report).filter_by(id=obj.id)
        previous_row.update(
            values={"current_flag": False}, synchronize_session=False
        )

        # Drop the object's database identity so it is treated as brand new.
        make_transient(obj)

        # Clear id and created_at; new values are generated on insert.
        obj.id = None
        obj.created_at = None

        # Back into the session: the flush now inserts it as a new row.
        session.add(obj)

# In-memory SQLite database; echo=True logs the emitted SQL.
engine = create_engine("sqlite://", echo=True, future=True)

mapper_registry.metadata.create_all(engine)

session = Session(engine)

# Setting the attribute on a persisted report now goes through the
# before_flush listener, which re-adds the object as a new `report` row.
r1 = Report(title="r1", version="a")
session.add(r1)
session.flush()  # flush r1a
r1.version = "b"
session.flush()  # flush r1b

session.execute(select(Report)).scalars().all()  # r1a, r1b
session.execute(select(Report).filter_by(current_flag=True)).scalars().all()  # r1b

# Raw view of the table.
session.execute(text("SELECT * FROM report")).all()

session.close()

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 ljmc