Portal Community

What Is Data Lineage?

Data lineage is the documented trail of where data came from and what transformations it has undergone. In the context of AI enrichment, lineage answers the question: How did this classification label get here? — capturing the source record, the AI model that produced the enrichment, the inputs used, and the timestamp.

Lineage is required for GDPR accountability (Article 5 principle of accuracy and accountability), enterprise audit standards, and model governance — so you can identify all records enriched by a specific model version and re-enrich them if that model is updated or found to be biased.

DataLineage Table Schema

CREATE TABLE DataLineage (
    -- Primary key
    LineageId           uniqueidentifier NOT NULL DEFAULT NEWSEQUENTIALID(),

    -- Record reference
    TenantId            int              NOT NULL,
    SourceTable         nvarchar(200)    NOT NULL,   -- e.g., 'Lead', 'Customer'
    SourceRecordId      nvarchar(500)    NOT NULL,   -- GUID or int of the enriched record
    DatasourceId        nvarchar(200)    NOT NULL,   -- e.g., 'sales-data-db'

    -- Enrichment event details
    EnrichmentType      nvarchar(100)    NOT NULL,   -- 'Classification', 'Summarization', 'Embedding', 'Sentiment'
    WorkflowId          nvarchar(200)    NOT NULL,   -- Flow Studio workflow that performed enrichment
    WorkflowExecutionId nvarchar(500)    NOT NULL,   -- Specific execution ID for traceability

    -- AI model details
    ModelId             nvarchar(200)    NOT NULL,   -- e.g., 'gpt-4o-2024-08-06'
    ModelProvider       nvarchar(100)    NOT NULL,   -- e.g., 'Azure OpenAI', 'OpenAI'

    -- Content fingerprinting
    InputHash           nvarchar(64)     NULL,       -- SHA-256 of the text that was processed
    OutputHash          nvarchar(64)     NULL,       -- SHA-256 of the AI output

    -- Result summary (no PII — just metadata)
    ResultSummary       nvarchar(max)    NULL,       -- JSON: { "label": "...", "confidence": 0.9 }
    TokensUsed          int              NULL,

    -- Timestamps
    ProcessedAt         datetime2        NOT NULL DEFAULT GETUTCDATE(),
    IsSuccess           bit              NOT NULL DEFAULT 1,
    ErrorMessage        nvarchar(max)    NULL,

    CONSTRAINT PK_DataLineage PRIMARY KEY CLUSTERED (LineageId)
);

CREATE INDEX IX_DataLineage_TenantId_SourceRecord
    ON DataLineage (TenantId, SourceTable, SourceRecordId, ProcessedAt DESC);

CREATE INDEX IX_DataLineage_TenantId_ModelId
    ON DataLineage (TenantId, ModelId, ProcessedAt DESC);

Recording a Lineage Event in Workflow

// SqlUpdateNode — insert lineage record at end of every enrichment workflow
{
  "nodeType": "SqlUpdateNode",
  "nodeId": "record-lineage",
  "datasourceId": "sales-data-db",
  "command": "INSERT INTO DataLineage (LineageId, TenantId, SourceTable, SourceRecordId, DatasourceId, EnrichmentType, WorkflowId, WorkflowExecutionId, ModelId, ModelProvider, InputHash, ResultSummary, TokensUsed, ProcessedAt, IsSuccess) VALUES (NEWID(), @tenantId, @sourceTable, @sourceRecordId, @datasourceId, @enrichmentType, @workflowId, @executionId, @modelId, @modelProvider, @inputHash, @resultSummary, @tokensUsed, GETUTCDATE(), @isSuccess)",
  "parameters": {
    "tenantId": "{{workflow.tenantId}}",
    "sourceTable": "Lead",
    "sourceRecordId": "{{input.leadId}}",
    "datasourceId": "sales-data-db",
    "enrichmentType": "Classification",
    "workflowId": "lead-enrich",
    "executionId": "{{workflow.executionId}}",
    "modelId": "{{variables.modelVersion}}",
    "modelProvider": "Azure OpenAI",
    "inputHash": "{{variables.inputHash}}",
    "resultSummary": "{{variables.lineageSummaryJson}}",
    "tokensUsed": "{{variables.tokensUsed}}",
    "isSuccess": 1
  }
}

Querying Data Lineage

Full Lineage History for a Record

SELECT
    EnrichmentType,
    ModelId,
    ModelProvider,
    ResultSummary,
    TokensUsed,
    ProcessedAt,
    IsSuccess
FROM DataLineage
WHERE TenantId = @tenantId
  AND SourceTable = 'Lead'
  AND SourceRecordId = @leadId
ORDER BY ProcessedAt DESC;

All Records Enriched by a Specific Model Version

-- Use case: a model version is deprecated or found inaccurate
-- Find all records enriched by it for re-processing
SELECT DISTINCT SourceTable, SourceRecordId
FROM DataLineage
WHERE TenantId = @tenantId
  AND ModelId = 'gpt-4o-2024-05-13'  -- Deprecated model
  AND EnrichmentType = 'Classification'
  AND IsSuccess = 1;

Enrichment Volume and Cost Tracking

SELECT
    ModelId,
    EnrichmentType,
    COUNT(*) AS EnrichmentCount,
    SUM(TokensUsed) AS TotalTokens,
    AVG(TokensUsed) AS AvgTokens,
    MIN(ProcessedAt) AS FirstRun,
    MAX(ProcessedAt) AS LastRun
FROM DataLineage
WHERE TenantId = @tenantId
  AND ProcessedAt >= DATEADD(DAY, -30, GETUTCDATE())
  AND IsSuccess = 1
GROUP BY ModelId, EnrichmentType
ORDER BY TotalTokens DESC;
Lineage Table Grows Fast

With multiple enrichment types per record across large datasets, the DataLineage table can grow quickly. Implement an archival strategy: move lineage rows older than 1 year to DataLineage_Archive. Recent lineage (last 12 months) stays in the active table. Historical lineage remains queryable in the archive table.