Data Lineage
Tracking the provenance of AI-enriched data — the DataLineage table, what enrichment events are recorded, how to query the lineage for a specific record, and why lineage is required for GDPR accountability.
What Is Data Lineage?
Data lineage is the documented trail of where data came from and what transformations it has undergone. In the context of AI enrichment, lineage answers the question "How did this classification label get here?" by capturing the source record, the AI model that produced the enrichment, a fingerprint of the inputs used, and the timestamp.
Lineage is needed to demonstrate GDPR compliance (the accuracy principle in Article 5(1)(d) and the accountability principle in Article 5(2)), to meet enterprise audit standards, and for model governance: it lets you identify all records enriched by a specific model version and re-enrich them if that model is updated or found to be biased.
DataLineage Table Schema
CREATE TABLE DataLineage (
-- Primary key
LineageId uniqueidentifier NOT NULL DEFAULT NEWSEQUENTIALID(),
-- Record reference
TenantId int NOT NULL,
SourceTable nvarchar(200) NOT NULL, -- e.g., 'Lead', 'Customer'
SourceRecordId nvarchar(500) NOT NULL, -- GUID or int of the enriched record
DatasourceId nvarchar(200) NOT NULL, -- e.g., 'sales-data-db'
-- Enrichment event details
EnrichmentType nvarchar(100) NOT NULL, -- 'Classification', 'Summarization', 'Embedding', 'Sentiment'
WorkflowId nvarchar(200) NOT NULL, -- Flow Studio workflow that performed enrichment
WorkflowExecutionId nvarchar(500) NOT NULL, -- Specific execution ID for traceability
-- AI model details
ModelId nvarchar(200) NOT NULL, -- e.g., 'gpt-4o-2024-08-06'
ModelProvider nvarchar(100) NOT NULL, -- e.g., 'Azure OpenAI', 'OpenAI'
-- Content fingerprinting
InputHash nvarchar(64) NULL, -- SHA-256 of the text that was processed
OutputHash nvarchar(64) NULL, -- SHA-256 of the AI output
-- Result summary (no PII — just metadata)
ResultSummary nvarchar(max) NULL, -- JSON: { "label": "...", "confidence": 0.9 }
TokensUsed int NULL,
-- Timestamp and outcome
ProcessedAt datetime2 NOT NULL DEFAULT GETUTCDATE(),
IsSuccess bit NOT NULL DEFAULT 1,
ErrorMessage nvarchar(max) NULL,
CONSTRAINT PK_DataLineage PRIMARY KEY CLUSTERED (LineageId)
);
CREATE INDEX IX_DataLineage_TenantId_SourceRecord
ON DataLineage (TenantId, SourceTable, SourceRecordId, ProcessedAt DESC);
CREATE INDEX IX_DataLineage_TenantId_ModelId
ON DataLineage (TenantId, ModelId, ProcessedAt DESC);
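The InputHash and OutputHash columns are sized for a SHA-256 digest written as 64 hexadecimal characters. The document does not say where the hash is computed, so the following is only a minimal sketch of one option, assuming the fingerprint is produced in T-SQL; the @inputText value is a made-up example.

```sql
-- Hypothetical input; in practice this would be the exact text sent to the model.
DECLARE @inputText nvarchar(max) = N'Interested in the enterprise plan, 500 seats';

-- HASHBYTES returns 32 bytes for SHA2_256. CONVERT style 2 renders them as
-- 64 hexadecimal characters without the leading '0x'.
DECLARE @inputHash nvarchar(64) =
    CONVERT(nvarchar(64), HASHBYTES('SHA2_256', @inputText), 2);

SELECT @inputHash AS InputHash, LEN(@inputHash) AS HashLength;
```

Because @inputText is nvarchar, the digest is taken over its UTF-16 bytes. A hash computed elsewhere over UTF-8 bytes of the same text will differ, so compute the fingerprint in one place only and compare like with like.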
Recording a Lineage Event in Workflow
// SqlUpdateNode: insert a lineage record at the end of every enrichment workflow.
// LineageId and ProcessedAt are omitted so the column defaults (NEWSEQUENTIALID(), GETUTCDATE()) apply.
{
"nodeType": "SqlUpdateNode",
"nodeId": "record-lineage",
"datasourceId": "sales-data-db",
"command": "INSERT INTO DataLineage (TenantId, SourceTable, SourceRecordId, DatasourceId, EnrichmentType, WorkflowId, WorkflowExecutionId, ModelId, ModelProvider, InputHash, ResultSummary, TokensUsed, IsSuccess) VALUES (@tenantId, @sourceTable, @sourceRecordId, @datasourceId, @enrichmentType, @workflowId, @executionId, @modelId, @modelProvider, @inputHash, @resultSummary, @tokensUsed, @isSuccess)",
"parameters": {
"tenantId": "{{workflow.tenantId}}",
"sourceTable": "Lead",
"sourceRecordId": "{{input.leadId}}",
"datasourceId": "sales-data-db",
"enrichmentType": "Classification",
"workflowId": "lead-enrich",
"executionId": "{{workflow.executionId}}",
"modelId": "{{variables.modelVersion}}",
"modelProvider": "Azure OpenAI",
"inputHash": "{{variables.inputHash}}",
"resultSummary": "{{variables.lineageSummaryJson}}",
"tokensUsed": "{{variables.tokensUsed}}",
"isSuccess": 1
}
}
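The schema also has IsSuccess and ErrorMessage columns, which suggests that failed enrichments are meant to be recorded too. A sketch of what such an insert might look like, assuming the workflow has some error path that can supply an @errorMessage parameter (a hypothetical name, not confirmed by the node example above):

```sql
-- Record a failed enrichment attempt. All parameters are assumed to be
-- supplied by the workflow; @errorMessage is a hypothetical parameter name.
INSERT INTO DataLineage
    (TenantId, SourceTable, SourceRecordId, DatasourceId, EnrichmentType,
     WorkflowId, WorkflowExecutionId, ModelId, ModelProvider,
     InputHash, IsSuccess, ErrorMessage)
VALUES
    (@tenantId, @sourceTable, @sourceRecordId, @datasourceId, @enrichmentType,
     @workflowId, @executionId, @modelId, @modelProvider,
     @inputHash, 0, @errorMessage);
-- ResultSummary, OutputHash, and TokensUsed stay NULL because no output was produced.
```

Keeping failures in the same table means the history of a record shows every attempt, while queries that filter on IsSuccess = 1 ignore these rows.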
Querying Data Lineage
Full Lineage History for a Record
SELECT
EnrichmentType,
ModelId,
ModelProvider,
ResultSummary,
TokensUsed,
ProcessedAt,
IsSuccess
FROM DataLineage
WHERE TenantId = @tenantId
AND SourceTable = 'Lead'
AND SourceRecordId = @leadId
ORDER BY ProcessedAt DESC;
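When only the current state matters, the same history can be reduced to the most recent successful event per enrichment type. The sketch below assumes ResultSummary holds JSON in the shape shown in the schema comment ({ "label": "...", "confidence": 0.9 }) and that the server supports JSON_VALUE (SQL Server 2016 or later).

```sql
-- Latest successful enrichment of each type for one record.
WITH Ranked AS (
    SELECT
        EnrichmentType,
        ModelId,
        ResultSummary,
        ProcessedAt,
        ROW_NUMBER() OVER (
            PARTITION BY EnrichmentType
            ORDER BY ProcessedAt DESC
        ) AS rn
    FROM DataLineage
    WHERE TenantId = @tenantId
      AND SourceTable = 'Lead'
      AND SourceRecordId = @leadId
      AND IsSuccess = 1
)
SELECT
    EnrichmentType,
    ModelId,
    ProcessedAt,
    JSON_VALUE(ResultSummary, '$.label') AS Label,
    -- TRY_CONVERT yields NULL instead of an error if the value is not numeric.
    TRY_CONVERT(decimal(5, 4), JSON_VALUE(ResultSummary, '$.confidence')) AS Confidence
FROM Ranked
WHERE rn = 1;
```

JSON_VALUE returns NULL for a NULL ResultSummary but raises an error on text that is not valid JSON, so this query relies on workflows always writing well-formed summaries.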
All Records Enriched by a Specific Model Version
-- Use case: a model version is deprecated or found inaccurate
-- Find all records enriched by it for re-processing
SELECT DISTINCT SourceTable, SourceRecordId
FROM DataLineage
WHERE TenantId = @tenantId
AND ModelId = 'gpt-4o-2024-05-13' -- Deprecated model
AND EnrichmentType = 'Classification'
AND IsSuccess = 1;
Enrichment Volume and Cost Tracking
SELECT
ModelId,
EnrichmentType,
COUNT(*) AS EnrichmentCount,
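SUM(CAST(TokensUsed AS bigint)) AS TotalTokens, -- bigint avoids int overflow on large volumes
AVG(CAST(TokensUsed AS float)) AS AvgTokens, -- float keeps the fractional part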
MIN(ProcessedAt) AS FirstRun,
MAX(ProcessedAt) AS LastRun
FROM DataLineage
WHERE TenantId = @tenantId
AND ProcessedAt >= DATEADD(DAY, -30, GETUTCDATE())
AND IsSuccess = 1
GROUP BY ModelId, EnrichmentType
ORDER BY TotalTokens DESC;
Because each record can accumulate several enrichment types, the DataLineage table grows quickly on large datasets. Implement an archival strategy: keep the last 12 months of lineage in the active table and move older rows to DataLineage_Archive, where they remain queryable.
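One way to implement the move is a batched DELETE with an OUTPUT ... INTO clause, which deletes and copies each batch in a single atomic statement. This is a sketch under stated assumptions: DataLineage_Archive is assumed to have the same columns as DataLineage and no triggers or foreign keys, and the batch size of 5000 is an arbitrary illustrative value.

```sql
DECLARE @cutoff datetime2 = DATEADD(YEAR, -1, GETUTCDATE());
DECLARE @batchSize int = 5000;  -- illustrative value; tune for your workload
DECLARE @moved int = 1;

-- Move rows in small batches to keep each transaction and its locks short.
WHILE @moved > 0
BEGIN
    DELETE TOP (@batchSize) FROM DataLineage
    OUTPUT
        DELETED.LineageId, DELETED.TenantId, DELETED.SourceTable,
        DELETED.SourceRecordId, DELETED.DatasourceId, DELETED.EnrichmentType,
        DELETED.WorkflowId, DELETED.WorkflowExecutionId, DELETED.ModelId,
        DELETED.ModelProvider, DELETED.InputHash, DELETED.OutputHash,
        DELETED.ResultSummary, DELETED.TokensUsed, DELETED.ProcessedAt,
        DELETED.IsSuccess, DELETED.ErrorMessage
    INTO DataLineage_Archive
        (LineageId, TenantId, SourceTable,
         SourceRecordId, DatasourceId, EnrichmentType,
         WorkflowId, WorkflowExecutionId, ModelId,
         ModelProvider, InputHash, OutputHash,
         ResultSummary, TokensUsed, ProcessedAt,
         IsSuccess, ErrorMessage)
    WHERE ProcessedAt < @cutoff;

    SET @moved = @@ROWCOUNT;  -- 0 when no rows older than the cutoff remain
END;
```

Neither index defined above leads with ProcessedAt, so each batch may scan the table. If archival runs regularly, an index on ProcessedAt or a partitioned table would likely be worth evaluating.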