October 23, 2024
SQL

SQL batch procedure for grouped data with SCD2 (DBT)


For context, while the logic is sound when building the table using all data, I am struggling to figure out how to set up the incremental conditions for a DBT model.

In this scenario we have log records for a machine and whether it is turned on or off (note that a record is not necessarily written only when there is a change). The task is to create a consolidated table that tracks the on/off flag as an SCD2. This requires grouping consecutive records with the same flag and computing start and end timestamps for past and active rows.
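As a minimal sketch of the full-history build described above (not the author's exact query), the script below runs gaps-and-islands SQL against an in-memory SQLite database. SQLite stands in for Snowflake and needs version 3.25 or later for window functions. The table `logs`, the column `loaded_at`, the function `build_scd2`, and the sample rows are hypothetical, with `on_flag` stored as 1 (on) / 0 (off). One deliberate swap from the question's query: `end_ts` is computed with `lead(start_ts)` over the grouped periods instead of `min(rn)` / `max(next_start_ts)`. Both approaches close a period at the next period's start and leave the active period NULL.

```python
import sqlite3

# Gaps-and-islands SCD2 build over the full log history (SQLite dialect).
# `logs` / `loaded_at` are hypothetical stand-ins for
# machine_on_flag_logs / snowflake_loaded_at.
SCD2_SQL = """
with lagged as (
    select
        machine_id,
        on_flag,
        loaded_at,
        lag(on_flag) over (partition by machine_id order by loaded_at) as previous_on_flag
    from logs
),
grouped as (
    -- a change flag of 1 marks the first record of a new on/off period;
    -- its running sum gives every period its own group number
    select
        machine_id,
        on_flag,
        loaded_at,
        sum(case when previous_on_flag is null or previous_on_flag != on_flag then 1 else 0 end)
            over (partition by machine_id order by loaded_at) as change_group
    from lagged
),
periods as (
    select machine_id, on_flag, min(loaded_at) as start_ts
    from grouped
    group by machine_id, on_flag, change_group
)
select
    machine_id,
    on_flag,
    start_ts,
    -- the next period's start closes this one; the active period gets NULL
    lead(start_ts) over (partition by machine_id order by start_ts) as end_ts
from periods
order by machine_id, start_ts
"""


def build_scd2(logs):
    """Return (machine_id, on_flag, start_ts, end_ts) rows for a list of log tuples."""
    conn = sqlite3.connect(":memory:")
    conn.execute("create table logs (machine_id integer, on_flag integer, loaded_at text)")
    conn.executemany("insert into logs values (?, ?, ?)", logs)
    rows = conn.execute(SCD2_SQL).fetchall()
    conn.close()
    return rows


# Hypothetical history consistent with the question's "existing data".
# Machine 3 logs "off" twice in a row, so not every record is a change.
HISTORY = [
    (1, 0, "2024-10-19"),
    (1, 1, "2024-10-20"),
    (2, 0, "2024-10-19"),
    (3, 0, "2024-10-17"),
    (3, 0, "2024-10-18"),
    (3, 1, "2024-10-19"),
]

for row in build_scd2(HISTORY):
    print(row)
```

On this sample the output reproduces the "existing data" table in the scenario: machine 3's repeated "off" record on 2024-10-18 is folded into the period that starts on 2024-10-17.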

This is the logic I have now, but on reflection it will not work on incremental runs: the grouping in the first CTE has no knowledge of the existing table, so the new rows will not match the existing active row on the unique key (the active row's start timestamp differs from the start timestamp computed from the new data).

Here is the current query:

{{ config(
    tags=['hourly'],
    materialized='incremental',
    unique_key=['machine_id', 'start_ts'],
) }}


with change_detection as (
    select
        machine_id,
        on_flag,
        snowflake_loaded_at,
        lag(on_flag)
            over (partition by machine_id order by snowflake_loaded_at) as previous_on_flag,
        lead(snowflake_loaded_at)
            over (partition by machine_id order by snowflake_loaded_at) as next_start_ts,
        row_number() over (partition by machine_id order by snowflake_loaded_at desc) as rn,
        case
            when previous_on_flag != on_flag or previous_on_flag is null then 1
            else 0
        end as change_flag
    from
        {{ ref('machine_on_flag_logs') }}
    where
        is_self_serve is not null

        {% if is_incremental() %}
            and snowflake_loaded_at > (select max(start_ts) from {{ this }})
        {% endif %}
),

grouped_data as (
    select
        machine_id,
        on_flag,
        snowflake_loaded_at,
        next_start_ts,
        rn,
        sum(change_flag) over (partition by machine_id order by snowflake_loaded_at) as change_group
    from
        change_detection
),

final_data as (
    select
        machine_id,
        on_flag,
        min(snowflake_loaded_at) as start_ts,
        case
            -- Set end_ts to NULL for the most recent period (active status)
            when min(rn) = 1 then null
            -- Otherwise, the end_ts is the next period's start_ts
            else max(next_start_ts)
        end as end_ts
    from
        grouped_data
    group by
        machine_id, on_flag, change_group
)

select * from final_data

{% if is_incremental() %}
    union all

    -- Pull active rows that may need to be updated (those with NULL end_ts)
    select
        machine_id,
        on_flag,
        start_ts,
        case
            when end_ts is null and exists (
                select 1 from final_data as fd
                where
                    fd.machine_id = existing.machine_id
                    and fd.on_flag != existing.on_flag
            ) then (select min(fd.start_ts) from final_data as fd where fd.machine_id = existing.machine_id)
            else end_ts
        end as end_ts
    from {{ this }} as existing
    where end_ts is null
{% endif %}
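The boundary problem can be reproduced without a database. The short sketch below mirrors only the `change_flag` CASE expression from `change_detection` (the function `change_flags` is a hypothetical stand-in, not part of the model) and applies it to machine 2, whose flag stays off from 2024-10-19 to 2024-10-20:

```python
def change_flags(flags):
    """Mirror the change_detection CASE: 1 when there is no previous record
    (lag() is NULL) or the flag differs from the previous record, else 0."""
    result = []
    previous = None
    for flag in flags:
        result.append(1 if previous is None or previous != flag else 0)
        previous = flag
    return result


# Machine 2 from the scenario: off on 2024-10-19 (already processed)
# and off again on 2024-10-20 (the new batch).
print(change_flags([False, False]))  # full history: [1, 0], 2024-10-20 continues the period
print(change_flags([False]))         # batch only: [1], 2024-10-20 opens a new period
```

Because `lag()` returns NULL for the first row of each machine in the batch, that row always opens a new group. A batch-only run therefore emits the key (2, 2024-10-20), which does not match the existing active row (2, 2024-10-19), so the merge would insert a second open period rather than leave the existing one untouched.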

Here is a scenario in which I expect an incremental run to behave incorrectly:
Existing Data:

machine_id   on_flag   start_ts     end_ts
1            true      2024-10-20   null
1            false     2024-10-19   2024-10-20
2            false     2024-10-19   null
3            true      2024-10-19   null
3            false     2024-10-17   2024-10-19

Incremental data after change_detection:

machine_id   on_flag   start_ts     end_ts
1            false     2024-10-22   null
1            true      2024-10-21   2024-10-22
2            false     2024-10-20   null
3            false     2024-10-20   null

As you can see, I am having a hard time coming up with incremental logic that then makes the necessary changes to the active rows.

(Note: start_ts and end_ts are timestamps in the real data, not dates.)
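One possible way to reason about this scenario, offered as a sketch under assumptions rather than a verified dbt recipe: feed each machine's currently active row back into the batch as if it were a log record, rerun the same gaps-and-islands logic over "active rows + new logs", and merge the result on (machine_id, start_ts). Because the seeded row carries the existing start_ts, the recomputed period keeps the same key, so the merge updates its end_ts instead of inserting a duplicate. In a dbt model the seed would presumably be a select from `{{ this }}` where `end_ts is null`, inside `{% if is_incremental() %}`. Here SQLite again stands in for Snowflake, and the tables `scd2` and `new_logs`, the function `incremental_merge`, and the dictionary-based merge (emulating "update if the key matches, insert otherwise") are hypothetical. The sketch also assumes each row of the incremental table above corresponds to one raw log record, and that `new_logs` holds exactly the not-yet-processed records, each later than its machine's active start_ts; choosing that watermark is not covered.

```python
import sqlite3

# Hypothetical sketch: seed each machine's active SCD2 row into the batch,
# then rerun gaps-and-islands over "active rows + new logs" only.
INCREMENTAL_SQL = """
with batch as (
    -- the active period per machine, re-read as if it were a log record
    select machine_id, on_flag, start_ts as loaded_at from scd2 where end_ts is null
    union all
    select machine_id, on_flag, loaded_at from new_logs
),
lagged as (
    select
        machine_id,
        on_flag,
        loaded_at,
        lag(on_flag) over (partition by machine_id order by loaded_at) as previous_on_flag
    from batch
),
grouped as (
    select
        machine_id,
        on_flag,
        loaded_at,
        sum(case when previous_on_flag is null or previous_on_flag != on_flag then 1 else 0 end)
            over (partition by machine_id order by loaded_at) as change_group
    from lagged
),
periods as (
    select machine_id, on_flag, min(loaded_at) as start_ts
    from grouped
    group by machine_id, on_flag, change_group
)
select
    machine_id,
    on_flag,
    start_ts,
    lead(start_ts) over (partition by machine_id order by start_ts) as end_ts
from periods
"""


def incremental_merge(existing, new_logs):
    """Run one incremental batch and return the merged table sorted by key."""
    conn = sqlite3.connect(":memory:")
    conn.execute("create table scd2 (machine_id integer, on_flag integer, start_ts text, end_ts text)")
    conn.execute("create table new_logs (machine_id integer, on_flag integer, loaded_at text)")
    conn.executemany("insert into scd2 values (?, ?, ?, ?)", existing)
    conn.executemany("insert into new_logs values (?, ?, ?)", new_logs)
    batch_rows = conn.execute(INCREMENTAL_SQL).fetchall()
    conn.close()
    # Emulate a merge on unique_key (machine_id, start_ts):
    # matching keys are updated, new keys are inserted.
    merged = {(m, s): (m, f, s, e) for m, f, s, e in existing}
    merged.update({(m, s): (m, f, s, e) for m, f, s, e in batch_rows})
    return [merged[key] for key in sorted(merged)]


# The scenario above, with on_flag as 1 (true) / 0 (false).
EXISTING = [
    (1, 0, "2024-10-19", "2024-10-20"),
    (1, 1, "2024-10-20", None),
    (2, 0, "2024-10-19", None),
    (3, 0, "2024-10-17", "2024-10-19"),
    (3, 1, "2024-10-19", None),
]
NEW_LOGS = [
    (1, 1, "2024-10-21"),
    (1, 0, "2024-10-22"),
    (2, 0, "2024-10-20"),
    (3, 0, "2024-10-20"),
]

for row in incremental_merge(EXISTING, NEW_LOGS):
    print(row)
```

Under these assumptions the run closes machine 1's "on" period at 2024-10-22 (the 2024-10-21 record is not a change), leaves machine 2's active row as it was, and closes machine 3's "on" period at 2024-10-20 while opening a new "off" period.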


