Microsoft Fabric: Incremental Data Load Using Watermark Column


Introduction

Incremental loading is one of the most important design patterns in data integration workflows. Instead of reloading everything on every run, an incremental load fetches only what has changed, saving time, cost, and compute resources.

In this guide, we’ll walk through a full incremental load pattern using Microsoft Fabric Data Factory Pipelines and Fabric SQL Database.

We’ll build the tables, set up watermark logic, configure Script and Copy activities, and validate the behavior by inserting and updating data.

Everything is demonstrated using a simple Employee dataset so you can easily replicate and extend the pattern.

What We Will Build

We’ll create a pipeline that:
Reads the largest LastModifiedDateTime from the Staging table
Fetches the last watermark stored in a log table
Copies all records between the two timestamps into a Target table
Updates the watermark so the next run only picks up new changes
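
Conceptually, each run selects the rows whose modification time falls between the stored watermark (lower bound) and the staging table's current maximum (upper bound). A rough T-SQL sketch of that window, using the table and column names created later in this guide:

```sql
-- Conceptual sketch of the incremental window.
-- In the pipeline, these two values are produced by separate Script activities.
DECLARE @LowerBound DATETIME =
    (SELECT ISNULL(LastWaterMarkDate, '1900-01-01')
     FROM WaterMarkLog
     WHERE TableName = 'Employee_Target' AND IsActive = 1);

DECLARE @UpperBound DATETIME =
    (SELECT MAX(LastModifiedDateTime) FROM Employee_Staging);

SELECT *
FROM Employee_Staging
WHERE LastModifiedDateTime BETWEEN @LowerBound AND @UpperBound;
```

Note that BETWEEN is inclusive at both ends, so the row at the previous watermark is re-selected on the next run; the Upsert write behavior configured in Step 3 makes this harmless.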

Creating the Required Tables

Before diving into pipeline creation, let's first prepare the tables we will use throughout this pipeline's development.
Employee_Staging: Contains new or modified rows

DROP TABLE IF EXISTS Employee_Staging;
GO

CREATE TABLE Employee_Staging (
    EmployeeID INT PRIMARY KEY,
    FirstName VARCHAR(50),
    LastName VARCHAR(50),
    Department VARCHAR(50),
    Position VARCHAR(50),
    HireDate DATE,
    Salary DECIMAL(10, 2),
    LastModifiedDateTime DATETIME 
);
GO
INSERT INTO Employee_Staging (EmployeeID, FirstName, LastName, Department, Position, HireDate, Salary,LastModifiedDateTime)
VALUES
(101, 'Amit', 'Sharma', 'Finance', 'Analyst', '2020-03-15', 65000.00,GETDATE()),
(102, 'Priya', 'Verma', 'HR', 'Manager', '2018-07-01', 85000.00,GETDATE()),
(103, 'Rahul', 'Mehta', 'IT', 'Developer', '2021-01-20', 72000.00,GETDATE()),
(104, 'Sneha', 'Reddy', 'Marketing', 'Executive', '2019-11-10', 60000.00,GETDATE()),
(105, 'Vikram', 'Singh', 'IT', 'Lead Developer', '2017-05-25', 95000.00,GETDATE());

Employee_Target: Final table where we maintain the latest data

DROP TABLE IF EXISTS Employee_Target;
GO
CREATE TABLE Employee_Target (
    EmployeeID INT PRIMARY KEY,
    FirstName VARCHAR(50),
    LastName VARCHAR(50),
    Department VARCHAR(50),
    Position VARCHAR(50),
    HireDate DATE,
    Salary DECIMAL(10, 2),
    LastModifiedDateTime DATETIME 
);

WaterMarkLog: Stores the last successful load timestamp (our “watermark”)

DROP TABLE IF EXISTS WaterMarkLog;
GO
CREATE TABLE WaterMarkLog
(
ID INT IDENTITY(1,1),
TableName NVARCHAR(150),
LastWaterMarkDate DATETIME,
IsActive BIT
)
GO
INSERT INTO WaterMarkLog
(TableName,LastWaterMarkDate,IsActive)
VALUES ('Employee_Target',NULL,1)

Pipeline Development

The pipeline consists of four activities; the following steps show each activity and its setup in detail.

Step 1: Fetch Max LastModifiedDate from Staging

  • Add a Script activity and rename it; here we name it StagingLMD.
  • Select the Fabric SQL connection. If you need more information on how to set up the Fabric SQL connection, see my earlier blog post.
  • Choose the appropriate database.
  • Switch to Query mode and use the SQL below. This query retrieves the latest timestamp present in the staging table; this value becomes the upper bound for the incremental window.
  • SELECT MAX(LastModifiedDateTime) AS StagingLMDT FROM Employee_Staging
    

Step 2: Retrieve Last Processed Watermark

  • Add a second Script activity and name it; here we use WaterMarkLMD.
  • Select the Fabric SQL connection. If you need more information on how to set up the Fabric SQL connection, see my earlier blog post.
  • Choose the appropriate database.
  • Switch to Query mode and use the SQL below. This query retrieves the latest watermark stored in the log table; this value becomes the lower bound for the incremental window.
  • SELECT ISNULL(LastWaterMarkDate,'1900-01-01') AS WaterMarkLMDT FROM WaterMarkLog WHERE TableName = 'Employee_Target' AND IsActive = 1
    

Why ISNULL 🤔? During the initial execution, the watermark table will not contain a date. The fallback value ensures the first run loads all available records into the Target table.
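
As a quick illustration, ISNULL returns the second argument only when the first is NULL:

```sql
-- First run: stored watermark is NULL, so the fallback date is used
SELECT ISNULL(NULL, '1900-01-01');          -- returns 1900-01-01
-- Subsequent runs: the stored watermark passes through unchanged
SELECT ISNULL('2024-06-01', '1900-01-01');  -- returns 2024-06-01
```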

Step 3: Copy Incremental Records

  • Add a Copy Data activity and name it CopyIncrementalData.
  • Source Configuration
    Select the Fabric SQL connection
    Choose your database
    Set mode to Query
    Use the following SQL:
  • SELECT * FROM Employee_Staging
    WHERE LastModifiedDateTime BETWEEN
    CAST('@{activity('WaterMarkLMD').output.resultSets[0].rows[0].WaterMarkLMDT}' AS DATETIME)
    AND
    CAST('@{activity('StagingLMD').output.resultSets[0].rows[0].StagingLMDT}' AS DATETIME)
    
  • Destination Configuration
    Reuse the same Fabric SQL connection
    Choose the target database
    Select Use Existing Table
    Pick the target table, here Employee_Target
    Under Advanced, choose Upsert as the write behavior (this ensures new rows are inserted and modified rows are updated based on the key columns)
    Select the key column, here `EmployeeID`
    Enable Bulk Insert Table Lock if you are loading large volumes of data into the target table
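
Under the hood, the Upsert write behavior acts like a keyed merge. A rough T-SQL equivalent of what the Copy activity performs, shown here only as a sketch (the activity's actual implementation may differ, and in the pipeline only the windowed rows are merged):

```sql
-- Sketch of upsert semantics on the EmployeeID key column
MERGE Employee_Target AS tgt
USING Employee_Staging AS src
    ON tgt.EmployeeID = src.EmployeeID
WHEN MATCHED THEN
    -- Modified rows are updated in place
    UPDATE SET tgt.FirstName            = src.FirstName,
               tgt.LastName             = src.LastName,
               tgt.Department           = src.Department,
               tgt.Position             = src.Position,
               tgt.HireDate             = src.HireDate,
               tgt.Salary               = src.Salary,
               tgt.LastModifiedDateTime = src.LastModifiedDateTime
WHEN NOT MATCHED THEN
    -- New rows are inserted
    INSERT (EmployeeID, FirstName, LastName, Department, Position,
            HireDate, Salary, LastModifiedDateTime)
    VALUES (src.EmployeeID, src.FirstName, src.LastName, src.Department,
            src.Position, src.HireDate, src.Salary, src.LastModifiedDateTime);
```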

Step 4: Update Watermark After Successful Load

  • Add a third Script activity and name it; here we use UpdateWaterMark.
  • Use the following SQL to update the watermark value in the log table:
  • UPDATE WaterMarkLog
    SET LastWaterMarkDate = CAST('@{activity('StagingLMD').output.resultSets[0].rows[0].StagingLMDT}' AS DATETIME)
    WHERE TableName = 'Employee_Target'
    AND IsActive = 1
    

Updating the watermark marks the completion of the current load cycle. The next run will use this updated timestamp as its starting point.
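
After a successful run, you can confirm the watermark advanced with a quick query:

```sql
-- The LastWaterMarkDate should now match the staging table's max timestamp
SELECT TableName, LastWaterMarkDate, IsActive
FROM WaterMarkLog
WHERE TableName = 'Employee_Target' AND IsActive = 1;
```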

Executing and Validating the Pipeline

Once the pipeline development is complete:
Save the pipeline
Trigger a manual run
Confirm that all activities complete successfully

You can now query the target table Employee_Target to validate that the initial load has populated all records.
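
For example:

```sql
-- After the initial load, all five staging rows should be present
SELECT COUNT(*) AS TargetRowCount FROM Employee_Target;  -- expect 5

SELECT * FROM Employee_Target ORDER BY EmployeeID;
```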

Testing Incremental Behaviour

To validate that the pipeline behaves incrementally, perform the following actions on the staging table Employee_Staging, re-trigger the pipeline, and then verify the target table.

INSERT INTO Employee_Staging (EmployeeID, FirstName, LastName, Department, Position, HireDate, Salary,LastModifiedDateTime)
VALUES
(106, 'Amitdgd', 'Sharmaji', 'Finance', 'Analyst', '2020-03-15', 65000.00,GETDATE())

UPDATE Employee_Staging SET Department = 'HR',LastModifiedDateTime = GETDATE()
WHERE EmployeeID = 101
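
After re-running the pipeline, the target table should reflect only these two changes:

```sql
-- Expect EmployeeID 106 as a newly inserted row,
-- and EmployeeID 101 updated with Department = 'HR'
SELECT EmployeeID, FirstName, Department, LastModifiedDateTime
FROM Employee_Target
WHERE EmployeeID IN (101, 106)
ORDER BY EmployeeID;
```

All other rows remain untouched, confirming that only records inside the new watermark window were copied.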

Conclusion

The incremental load pattern implemented here is simple, predictable, and production-friendly. By maintaining a clean watermark mechanism and relying on upsert operations, this approach ensures that target datasets remain synchronized with upstream changes without unnecessary full reloads.



