Incomplete SQL Server CDC Change Table Extraction Using Batches

Basic issue: I have a process to extract records from a CDC table which is 'missing' records.

I am pulling from an MS SQL Server 2019 (Data Center Ed) DB with CDC enabled on 67 tables. One table in particular houses 323 million rows and is ~125 columns wide. During a nightly process, around 12 million of these rows are updated, so around 20 million rows are generated in the _CT table. While this nightly process runs, CDC capture is still running with default settings. It can 'get behind', but we check for this.

After the nightly process is complete, I have a Python 3.6 extractor that connects to the SQL Server instance using ODBC. A loop iterates over each of the 67 source tables. Before the loop begins, I ensure that CDC capture is 'caught up'.
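
For reference, the 'caught up' check could look something like the sketch below. This is illustrative only: it assumes pandas is imported as pd, self.cnxn is the open ODBC connection, and it uses the latency reported by sys.dm_cdc_log_scan_sessions with an arbitrary threshold; the actual production check may differ.

def capture_is_caught_up(self, max_latency_seconds=60):
    # Sketch only: approximate "capture is caught up" by checking the latency
    # (in seconds) reported for the most recent CDC log scan session.
    # The threshold is arbitrary and for illustration.
    latency_query = """
        select top 1 latency
        from sys.dm_cdc_log_scan_sessions
        order by session_id desc;
    """
    latency_df = pd.read_sql_query(latency_query, self.cnxn)
    return latency_df['latency'][0] <= max_latency_seconds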

For each table, the extractor begins the process by reading the last successfully loaded LSN from the target database, which is in Snowflake.
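
As an illustration of that bookmark read, it could look roughly like the following. The ODS.LOG_CDC control table is the one mentioned in the code comments further down; the column names and the snowflake.connector connection (self.sf_cnxn) are placeholders here.

def get_last_loaded_lsn(self, table_name):
    # Sketch only: read the last successfully loaded LSN for this table from
    # the ODS.LOG_CDC control table in Snowflake. Column names are placeholders;
    # self.sf_cnxn is assumed to be a snowflake.connector connection.
    cur = self.sf_cnxn.cursor()
    cur.execute(
        "select last_loaded_lsn from ODS.LOG_CDC where table_name = %s",
        (table_name,)
    )
    row = cur.fetchone()
    return row[0] if row else None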

The Python script passes the table name, last loaded LSN, and table PKEY to the following function, which looks up the current MAX_LSN for the table and counts the changed rows in that range:

def get_incr_count(self, table_name, pk, last_loaded_lsn):
    # Count the net changes (one row per PKEY) between the last loaded LSN and
    # the table's current max LSN. Assumes pandas is imported as pd and
    # self.cnxn is an open ODBC connection.
    try:
        cdc_table_name = self.get_cdc_table(table_name)
        max_lsn = self.get_max_lsn(table_name)

        incr_count_query = """with incr as
        (
            select
                row_number() over
                    (
                        partition by """ + pk + """
                        order by
                            __$start_lsn desc,
                            __$seqval desc
                    ) as __$rn,
                *
            from """ + cdc_table_name + """
            where
                __$operation <> 3 and
                __$start_lsn > """ + last_loaded_lsn + """ and
                __$start_lsn <= """ + max_lsn + """
        )
        select COUNT(1) as count from incr where __$rn = 1 ;
        """

        lsn_df = pd.read_sql_query(incr_count_query, self.cnxn)
        incr_count = lsn_df['count'][0]

        return incr_count

    except Exception as e:
        raise Exception('Could not get the count of the incremental load for ' + table_name + ': ' + str(e))

If this query finds records to process, the extractor then runs the following function. The 500,000-row batch size is a memory limitation of the virtual machine that runs this code; anything more than that maxes out the available memory.

def get_cdc_data(self, table_name, pk, last_loaded_lsn, offset_iterator=0, fetch_count=500000):

    try: 
        cdc_table_name = self.get_cdc_table(table_name)
        max_lsn = self.get_max_lsn(table_name)

        # Get the last LSN loaded from the ODS.LOG_CDC table for the current table
        last_lsn = last_loaded_lsn

        incremental_pull_query = """with incr as
        (
            select
                row_number() over
                    (
                        partition by """ + pk + """
                        order by
                            __$start_lsn desc,
                            __$seqval desc
                    ) as __$rn,
                *
            from """ + cdc_table_name + """
            where
                __$operation <> 3 and
                __$start_lsn > """ + last_lsn + """ and
                __$start_lsn <= """ + max_lsn + """
        )
        select CONVERT(VARCHAR(max), __$start_lsn, 1) as __$conv_lsn, * 
        from incr where __$rn = 1 
        order by __$conv_lsn
        offset """ + str(offset_iterator) + """ rows
        fetch first """ + str(fetch_count) + """ rows only;
        """
        # Load the incremental data into a dataframe, full_df, using the SQL Server connection and the incremental query
        full_df = pd.read_sql_query(incremental_pull_query, self.cnxn)

        # Trim all cdc columns except __$operation
        df = full_df.drop(['__$conv_lsn', '__$rn', '__$start_lsn', '__$end_lsn', '__$seqval', '__$update_mask', '__$command_id'], axis=1)

        return df


    except Exception as e:
        raise Exception('Could not get the incremental load dataframe for ' + table_name + ': ' + str(e))

The file is then moved into Snowflake and merged into a table. If every import loop succeeds, we update the MAX LSN in the target DB to set the next starting point. If any fail, we leave the max unchanged and retry on the next pass. In the scenario described here, there are no identified errors.
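
Conceptually, the merge-and-checkpoint step looks something like the sketch below. This is illustrative only: the staging table naming, the example COL_A column, and the quoting of "__$operation" are placeholders, and self.sf_cnxn is assumed to be a snowflake.connector connection.

def merge_and_advance_lsn(self, table_name, pk, max_lsn):
    # Sketch only: merge the staged batch into the target table, then advance
    # the LSN bookmark in ODS.LOG_CDC once every batch for this table succeeded.
    # Table, stage, and column names are placeholders for illustration.
    cur = self.sf_cnxn.cursor()
    cur.execute("""
        merge into ODS.""" + table_name + """ t
        using ODS.""" + table_name + """_STAGE s
            on t.""" + pk + """ = s.""" + pk + """
        when matched and s."__$operation" = 1 then delete
        when matched then update set t.COL_A = s.COL_A
        when not matched and s."__$operation" <> 1 then
            insert (""" + pk + """, COL_A) values (s.""" + pk + """, s.COL_A);
    """)
    # Only move the bookmark forward after all batches loaded without error.
    cur.execute(
        "update ODS.LOG_CDC set last_loaded_lsn = %s where table_name = %s",
        (max_lsn, table_name)
    )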

We are finding evidence that this second query is not pulling every valid record between the starting and MAX LSN as it loops through. There is no discernible pattern to which records are missed, other than that if one LSN is missed, all changes within it are missed.

I think it may have something to do with how we are ordering records: order by __$conv_lsn. This value is the binary __$start_lsn converted to VARCHAR(MAX), so I am wondering whether ordering on a more reliable key would be advisable (a sketch of what I mean follows). I cannot think of a way to audit this without adding additional work to this process, which is extremely time-sensitive, and that makes troubleshooting much more difficult.
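
For illustration, the variation I have in mind would make the paging order fully deterministic by ordering on the raw binary CDC columns plus the primary key, instead of the converted string alone. This is untested and only a sketch of the tail of incremental_pull_query:

# Sketch only (untested): order on the raw binary columns plus the PKEY so the
# OFFSET/FETCH paging sees one stable, total ordering of the deduplicated rows.
incremental_pull_query_tail = """
    select CONVERT(VARCHAR(max), __$start_lsn, 1) as __$conv_lsn, *
    from incr where __$rn = 1
    order by __$start_lsn, __$seqval, """ + pk + """
    offset """ + str(offset_iterator) + """ rows
    fetch first """ + str(fetch_count) + """ rows only;
"""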



Solution 1:[1]

I suspect that your problem is here.

row_number() over
(
    partition by """ + pk + """
    order by
        __$start_lsn desc,
        __$seqval desc
) as __$rn,
...
from incr where __$rn = 1

If a given transaction affected more than one row, they'll be enumerated 1-N. Even that is a little hand-wavy; I'm not sure what happens if a row is affected more than once in a transaction (I'd need to set up a test and... well... I'm lazy).


But all that said, this workflow feels weird to me. I've worked with CDC in the past, and while admittedly I wasn't targeting Snowflake, the extraction part should be similar and fairly straightforward.

  1. Get the max LSN using sys.fn_cdc_get_max_lsn() (i.e. no need to query the CDC data itself to obtain this value).
  2. Select from cdc.fn_cdc_get_all_changes_«capture_instance»() or cdc.fn_cdc_get_net_changes_«capture_instance»() using the LSN endpoints (min from either the previous run for that table or from sys.fn_cdc_get_min_lsn(«capture_instance») for a first run; max from above).
  3. Stream the results to wherever (i.e. you shouldn't need to hold a significant number of change records in memory at once); a rough sketch follows this list.
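
To make that concrete, here is a rough sketch of what such an extraction loop could look like with pyodbc. It is not drop-in code: the capture_instance value, the load_batch_to_snowflake call, and the batch size are placeholders, and a real implementation would likely advance the lower bound with sys.fn_cdc_increment_lsn() so the previous endpoint is not reprocessed.

import pyodbc

def extract_changes(cnxn, capture_instance, from_lsn_hex=None, batch_size=50000):
    # Sketch of the suggested flow: bound the window with the CDC helper
    # functions, then stream rows out in modest batches instead of holding
    # the entire change set in memory. Names and parameters are illustrative.
    cur = cnxn.cursor()

    # 1. Upper bound of the window (no need to query the change table itself).
    cur.execute("select CONVERT(VARCHAR(max), sys.fn_cdc_get_max_lsn(), 1)")
    to_lsn_hex = cur.fetchone()[0]

    # Lower bound: the previous run's LSN, or the capture instance minimum on a
    # first run. (In practice you would pass the previous endpoint through
    # sys.fn_cdc_increment_lsn(), since fn_cdc_get_all_changes is inclusive.)
    if from_lsn_hex is None:
        cur.execute(
            "select CONVERT(VARCHAR(max), sys.fn_cdc_get_min_lsn(?), 1)",
            capture_instance,
        )
        from_lsn_hex = cur.fetchone()[0]

    # 2. Pull the changes in the window via the generated table-valued function.
    changes_sql = (
        "declare @from_lsn binary(10) = CONVERT(BINARY(10), ?, 1); "
        "declare @to_lsn binary(10) = CONVERT(BINARY(10), ?, 1); "
        "select * from cdc.fn_cdc_get_all_changes_" + capture_instance
        + "(@from_lsn, @to_lsn, 'all');"
    )
    cur.execute(changes_sql, from_lsn_hex, to_lsn_hex)

    # 3. Stream the results to the target in batches.
    while True:
        rows = cur.fetchmany(batch_size)
        if not rows:
            break
        load_batch_to_snowflake(rows)  # placeholder for the existing load step

    return to_lsn_hex  # persist as the starting point for the next run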

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1