Automating Incremental Data Load from CSV to OneLake with Microsoft Fabric


Introduction

Incremental data load is all about transferring only the data that’s changed since the last update, which helps save time and resources. In this case, we’re focusing on loading CSV files that customers place in a specific location into OneLake. With Microsoft Fabric, this process can be automated, ensuring only new or updated files are loaded—no need to reload everything. This approach cuts down on data processing time and keeps everything up-to-date without wasting resources. It’s a simple and effective way to maintain a smooth data pipeline for ongoing analysis.

Prerequisites

Before we dive into the process of incremental data loading into OneLake using Microsoft Fabric, there are a few things we need to set up:

  1. A Lakehouse Table to Track Loaded File Information:
    First, we need a table to log information about the files being loaded. This table will store details like the file name, last modified date, and whether the file has been processed. Here’s the SQL to create the FileLoadLog table:
    DROP TABLE IF EXISTS FileLoadLog;
    CREATE TABLE FileLoadLog
    (
        ID int,
        FileName string,
        LastModifiedDate timestamp,
        IsProcessed int
    );
    
  2. Create a Folder in the Lakehouse for User CSV Files:
    You’ll also need a dedicated folder in your Lakehouse where users can regularly upload their CSV files. This will be the source folder for incremental loading, and it’s where new or modified files land. (A short notebook sketch for creating this folder is shown after the list.)
  3. Final Lakehouse Table for Storing User Data:
    Finally, we’ll need a table in the Lakehouse to store the actual user data. This table will hold information like the date, order ID, product, quantity, and amount from each file. Below is the schema for this table, which we’ll create in the Lakehouse.
    schema = "Date STRING, OrderID INT, Product STRING, Quantity INT, Amount INT"
    
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS fsales
        ({schema})
        USING DELTA
        """)
    

Once the above prerequisites are ready, let’s start with the step-by-step pipeline implementation.

Step 1: Get the Last Modified Date Time:

The first step in our incremental data load pipeline is to retrieve the Last Modified Date Time (LMDT) of the files that were previously processed. This helps us identify which files have been updated or added since the last load.

To achieve this, we’ll use a Lookup activity connected to the Lakehouse SQL analytics endpoint. The Lookup activity will run a query to fetch the maximum LastModifiedDate from the FileLoadLog table. This is the SQL query we’ll use:

IF EXISTS (SELECT 1 FROM [FabricOfData].[dbo].[fileloadlog])
BEGIN
    SELECT MAX(LastModifiedDate) AS LMDT FROM [FabricOfData].[dbo].[fileloadlog]
END
ELSE
BEGIN   
    SELECT CAST('1900-01-01' AS DATE) AS LMDT
END

You might be wondering why we’re using the date '1900-01-01' as a default value in case no records exist. The reason is simple: if we’re running the pipeline for the first time, the FileLoadLog table won’t have any data. So, to ensure the process doesn’t break, we use this default date to trigger the process as if it’s the first time files are being loaded.

This step is crucial to ensure that we only load new or updated files, avoiding unnecessary processing of files that have already been handled.
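
If you want to sanity-check the same fallback logic from a Fabric notebook before wiring up the Lookup activity, a minimal PySpark sketch (assuming the FileLoadLog table from the prerequisites lives in the attached Lakehouse) looks like this:

from pyspark.sql import functions as F

# Highest LastModifiedDate logged so far (NULL when the table is empty).
max_lmdt = (
    spark.table("FileLoadLog")
    .agg(F.max("LastModifiedDate").alias("LMDT"))
    .collect()[0]["LMDT"]
)

# Fall back to 1900-01-01, mirroring the ELSE branch of the T-SQL above.
if max_lmdt is None:
    max_lmdt = "1900-01-01"

print(max_lmdt)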

Step 2: Get All File Details with LMDT Greater Than Max LMDT:

After retrieving the Max Last Modified Date Time (LMDT) in the previous step, the next task is to identify the files that have been added or updated since the last time the pipeline ran. This is where the Get Metadata activity comes in.


To begin, we add the Get Metadata activity to the pipeline canvas. Configure it to connect to the Lakehouse, select the Files option so it reads the files stored in the Lakehouse folder, and add Child Items under the Field list so the file details are returned as output.


By default, the Get Metadata activity will read all files in the folder, but we want to filter it so that it only picks up files that have a Last Modified Date Time (LMDT) greater than the last processed file’s LMDT. To do this, you’ll add the following expression to the Start Time section of the Filter by Last Modified property in the Get Metadata activity:

@addSeconds(activity('GetMaxLMDT').output.firstRow.LMDT, 1)

Here’s how it works: the expression takes the output from the Lookup activity (which fetched the Max LMDT) and adds one extra second. This small adjustment ensures that only files modified after the last processed file are picked up by the Get Metadata activity. Without that extra second, the activity could end up reading the last processed file again, which we want to avoid.

Step 3: Loop Through Each Child Item:

Next, we need to loop through each file returned by the Get Metadata activity. To do this, add a For Each loop to your pipeline and configure it to process the files sequentially, meaning one file at a time.


In the For Each activity, use the following expression under Items to reference the list of child items returned by the Get Metadata activity:

@activity('GetLatestFiles').output.childItems

Step 4: Add If Condition to Check File Type

Within the For Each loop, add an If Condition activity to check whether the current child item is a file. If it is, the pipeline proceeds down the "True" path, where the file is loaded into the Lakehouse table.


Use the following pipeline expression in the If Condition activity to check if the item is a file:


@equals(item().type,'File')
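
To make Steps 3 and 4 concrete, here is a plain-Python mock of what the For Each loop and If Condition do together. The child_items list is a hypothetical example of the Get Metadata output; in the real pipeline this logic is handled entirely by the activities themselves:

# Hypothetical sample of @activity('GetLatestFiles').output.childItems
child_items = [
    {"name": "sales_2025_10_14.csv", "type": "File"},
    {"name": "archive", "type": "Folder"},
]

for item in child_items:                    # the For Each loop
    if item["type"] == "File":              # @equals(item().type,'File')
        print(f"Load {item['name']} into the Lakehouse table")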

Step 5: Data Load to Lakehouse Table

  • Get the Last Modified Date of the File
    Before loading the data into the final table, we use another Get Metadata activity. This activity connects to the file we’re about to load and retrieves its Last Modified date. We then use this value when inserting the file details into the FileLoadLog table via a Fabric notebook.
  • Add a Copy Activity to Load the Data
    Next, add a Copy Activity to the pipeline. Set the source to the Lakehouse file returned by the For Each loop and the destination to the final Lakehouse table (fsales). This loads the file’s data into the final table.
  • Insert File Details into the FileLoadLog Table
    Finally, use a Fabric Notebook to insert the file details (the file name and its last modified date) into the FileLoadLog table. Because Lakehouse tables are Delta tables, the pipeline writes these log rows through a notebook, which is currently the most straightforward supported approach.
    In the notebook, add FileName and LastModifiedDate (named LMDFT in the code below) as parameters by toggling the cell to a parameter cell, and pass in the values from the previous activities’ outputs (example expressions are shown after the code below). This ensures the file information gets inserted correctly into the FileLoadLog table.
  • Here is the insert code used inside the Fabric Notebook:

    # Parameters (these defaults are overridden at runtime by the pipeline's notebook activity)
    FileName = 'sales.csv'
    LMDFT = '2025-10-14'

    # Insert a log record for the processed file. The ID is generated as
    # MAX(ID) + 1, starting at 1 when the log table is still empty.
    spark.sql("""
        INSERT INTO FileLoadLog
        SELECT
            (SELECT CASE WHEN (SELECT MAX(ID) FROM FileLoadLog) IS NULL
                         THEN 1
                         ELSE (SELECT MAX(ID) + 1 FROM FileLoadLog) END) AS ID,
            '{0}' AS FileName,
            CAST('{1}' AS TIMESTAMP) AS LastModifiedDate,
            1 AS IsProcessed
    """.format(FileName, LMDFT))
    

Conclusion

This post showed how to set up an incremental data load pipeline in Microsoft Fabric. By using activities like Get Metadata and Copy Activity, along with a notebook for logging, we ensure only updated files are loaded. This makes the pipeline more efficient and keeps your data current with minimal effort.



