Lab: QUALIFY for deduplication and temporal pricing

Hands-on lab with synthetic data to reproduce duplicate source exports and date-valid supplier price selection with QUALIFY.

Prerequisites

Goal

Reproduce two production-shaped problems with realistic synthetic data:

- duplicate rows produced by repeated source exports of the same business orders
- margin errors caused by joining orders to the wrong supplier price version

The examples are intentionally small enough to inspect row by row, but structured like real warehouse incidents.

Download the notebook

Import this notebook into Jupyter, VS Code, or Databricks:

Download qualify-dedup-and-temporal-pricing.ipynb

The notebook generates fake but plausible datasets for:

- duplicated lottery sales exports (bronze_lottery_sales)
- B2B orders with a supplier cost history (b2b_orders, supplier_price_history)
- customer store visits (customer_shop_visits)
- a pipeline run event log (pipeline_run_log)

It also includes optional DuckDB cells so readers can execute the QUALIFY queries directly if duckdb is installed in the notebook kernel.

Step 1: Inspect duplicate export groups

The lottery scenario simulates a sales platform that produced two export files for the same business orders after a release changed an internal timestamp.

Start by finding suspicious groups:

SELECT
  order_id,
  COUNT(*) AS row_count,
  MIN(source_exported_at) AS first_export_ts,
  MAX(source_exported_at) AS last_export_ts
FROM bronze_lottery_sales
GROUP BY order_id
HAVING COUNT(*) > 1
ORDER BY row_count DESC, last_export_ts DESC;

Expected outcome on the sample data: each duplicated order_id appears with row_count = 2, reflecting the two export files, and with two distinct source_exported_at values.
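The same duplicate-group check can be emulated in plain Python on a tiny hand-made sample (the rows below are invented for illustration; column names follow the SQL above):

```python
from collections import defaultdict

# Tiny stand-in for bronze_lottery_sales: (order_id, source_exported_at)
rows = [
    ("A-100", "2024-05-01T10:00:00"),
    ("A-100", "2024-05-01T11:30:00"),  # second export of the same order
    ("A-101", "2024-05-01T10:00:00"),
]

groups = defaultdict(list)
for order_id, exported_at in rows:
    groups[order_id].append(exported_at)

# Keep only order_ids exported more than once (the HAVING COUNT(*) > 1 part)
suspicious = {
    oid: {"row_count": len(ts), "first": min(ts), "last": max(ts)}
    for oid, ts in groups.items()
    if len(ts) > 1
}
print(suspicious)
```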

Step 2: Compare DISTINCT vs QUALIFY

This looks tempting but does not solve the problem:

SELECT DISTINCT *
FROM bronze_lottery_sales;

It fails because source_exported_at and ingestion_ts differ between the duplicate rows, so the rows are not byte-identical and DISTINCT keeps both copies.

Use the ranking rule explicitly:

SELECT
  order_id,
  customer_id,
  game_code,
  gross_amount,
  source_created_at,
  source_exported_at,
  ingestion_ts
FROM bronze_lottery_sales
QUALIFY ROW_NUMBER() OVER (
  PARTITION BY order_id
  ORDER BY source_exported_at DESC, ingestion_ts DESC
) = 1;

This keeps the latest technical export per business order.
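The ROW_NUMBER() rule amounts to "keep the row with the greatest (source_exported_at, ingestion_ts) pair per order". A minimal plain-Python sketch of that semantics, on made-up data:

```python
# Emulate QUALIFY ROW_NUMBER() OVER (PARTITION BY order_id
# ORDER BY source_exported_at DESC, ingestion_ts DESC) = 1.
# Rows are (order_id, source_exported_at, ingestion_ts); data is invented.
rows = [
    ("A-100", "2024-05-01T10:00:00", "2024-05-01T10:05:00"),
    ("A-100", "2024-05-01T11:30:00", "2024-05-01T11:35:00"),
    ("A-101", "2024-05-01T10:00:00", "2024-05-01T10:05:00"),
]

winners = {}
for order_id, exported_at, ingested_at in rows:
    key = (exported_at, ingested_at)  # ORDER BY ... DESC: the max key wins
    if order_id not in winners or key > winners[order_id]:
        winners[order_id] = key

print(winners)
```

ISO-8601 timestamps sort correctly as strings, which is why plain tuple comparison matches the SQL ORDER BY here.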

Step 3: Review rows that would be deleted

Before running any delete, review the rows that would lose the ranking:

SELECT
  order_id,
  source_exported_at,
  ingestion_ts
FROM bronze_lottery_sales
QUALIFY ROW_NUMBER() OVER (
  PARTITION BY order_id
  ORDER BY source_exported_at DESC, ingestion_ts DESC
) > 1
ORDER BY order_id, source_exported_at;

Only after that review should you convert the selection into a DELETE, MERGE, or quarantine step.

Step 4: Recompute margin with date-valid pricing

The B2B supplier example uses restaurant orders and a product cost history. The common mistake is to join every order to the most recent price in the dimension.

Wrong approach, which prices every order at today's cost regardless of when it was booked:

SELECT
  o.order_id,
  o.product_id,
  o.sale_price,
  p.unit_cost,
  o.sale_price - p.unit_cost AS gross_margin
FROM b2b_orders o
JOIN latest_product_cost p
  ON o.product_id = p.product_id;

Correct rule:

SELECT
  o.order_id,
  o.order_date,
  o.product_id,
  o.sale_price,
  p.unit_cost,
  o.sale_price - p.unit_cost AS gross_margin
FROM b2b_orders o
JOIN supplier_price_history p
  ON o.product_id = p.product_id
 AND p.effective_date <= o.order_date
QUALIFY ROW_NUMBER() OVER (
  PARTITION BY o.order_id
  ORDER BY p.effective_date DESC, p.updated_at DESC
) = 1;

This returns the cost version that was actually valid when the order was booked.
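The date-valid lookup is an "as-of" selection: the latest price version whose effective_date is not after the order date. A minimal stdlib sketch, assuming a per-product history sorted by effective_date (the product id and prices are made up):

```python
from bisect import bisect_right

# Hypothetical cost history per product: (effective_date, unit_cost),
# sorted ascending by effective_date.
price_history = {
    "P1": [("2024-01-01", 4.00), ("2024-03-01", 4.50)],
}

def cost_as_of(product_id, order_date):
    """Latest price version with effective_date <= order_date (the QUALIFY rule)."""
    history = price_history[product_id]
    dates = [d for d, _ in history]
    idx = bisect_right(dates, order_date) - 1
    if idx < 0:
        raise LookupError("no price version valid on that date")
    return history[idx][1]

# An order booked in February must use the January cost, not the March one.
print(cost_as_of("P1", "2024-02-15"))  # 4.0
```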

Step 5: Validate the difference

Once both queries are available, compare the gross_margin each one produces for the same order_id and note which orders change, and by how much.

In the provided synthetic dataset, a few orders show a material overstatement when the wrong price version is used. That is the exact kind of quiet analytics defect this pattern is meant to prevent.

Step 6: Keep preferred stores with DENSE_RANK()

For customer segmentation, it is common to define a preferred store based on the highest visit volume.

When ties are meaningful, keep all top stores:

WITH customer_store_visits AS (
  SELECT
    customer_id,
    store_id,
    COUNT(*) AS visit_count,
    MAX(visit_date) AS last_visit_date
  FROM customer_shop_visits
  GROUP BY customer_id, store_id
)
SELECT
  customer_id,
  store_id,
  visit_count,
  last_visit_date
FROM customer_store_visits
QUALIFY DENSE_RANK() OVER (
  PARTITION BY customer_id
  ORDER BY visit_count DESC
) = 1;

If the business insists on one forced preferred store, switch to ROW_NUMBER() and add a tie-breaker like last_visit_date DESC.
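The DENSE_RANK() behavior, keeping every store tied at the maximum visit count, can be sketched in plain Python (visit counts below are invented):

```python
from collections import Counter

# Visit counts per (customer_id, store_id); data is made up.
visits = Counter({
    ("C1", "S1"): 5,
    ("C1", "S2"): 5,  # tied with S1
    ("C1", "S3"): 2,
})

# Find each customer's maximum visit count...
per_customer_max = {}
for (cust, _), n in visits.items():
    per_customer_max[cust] = max(per_customer_max.get(cust, 0), n)

# ...then keep every store matching it (DENSE_RANK() ... = 1 keeps ties).
preferred = {
    cust: sorted(store for (c, store), n in visits.items()
                 if c == cust and n == per_customer_max[cust])
    for cust in per_customer_max
}
print(preferred)  # {'C1': ['S1', 'S2']}: both tied stores survive
```

A ROW_NUMBER()-style rule would instead pick exactly one of S1/S2 via a tie-breaker.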

Step 7: Keep the final pipeline status and detect recoveries

To get the latest status per pipeline run:

SELECT
  run_id,
  pipeline_name,
  status,
  event_ts,
  attempt_no
FROM pipeline_run_log
QUALIFY ROW_NUMBER() OVER (
  PARTITION BY run_id
  ORDER BY event_ts DESC, attempt_no DESC
) = 1;

To find runs that recovered right after a failure:

SELECT
  run_id,
  pipeline_name,
  status,
  event_ts,
  LAG(status) OVER (
    PARTITION BY run_id
    ORDER BY event_ts, attempt_no
  ) AS previous_status
FROM pipeline_run_log
QUALIFY previous_status = 'failed'
    AND status IN ('retried', 'success');

This is a practical pattern for operational dashboards, retry-rate KPIs, and escalation rules.
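The LAG()-based recovery detection boils down to a one-event lookback per run_id partition. A minimal sketch with invented log events:

```python
# Hypothetical pipeline_run_log events: (run_id, event_ts, attempt_no, status).
events = [
    ("run-1", "10:00", 1, "failed"),
    ("run-1", "10:05", 2, "success"),
    ("run-2", "10:00", 1, "success"),
]

# Emulate LAG(status) OVER (PARTITION BY run_id ORDER BY event_ts, attempt_no)
# and keep only failed -> retried/success transitions.
recoveries = []
last_status = {}
for run_id, ts, attempt, status in sorted(events, key=lambda e: (e[0], e[1], e[2])):
    previous_status = last_status.get(run_id)  # LAG within the run_id partition
    if previous_status == "failed" and status in ("retried", "success"):
        recoveries.append((run_id, ts, status))
    last_status[run_id] = status

print(recoveries)  # [('run-1', '10:05', 'success')]
```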

What to keep from this lab