Enrichment Pipeline Pattern
The standard enrichment workflow: a trigger fires when a new record arrives, downstream AI nodes classify, summarize, and embed the content, and the results are written back to the SQL record — all asynchronously without blocking the creating user.
The Core Pattern
All enrichment workflows follow the same four-stage pattern, regardless of which table they enrich or which AI operations they perform:
Trigger
An event fires that a new or updated record needs enrichment. This can be an async sub-workflow call from a CRUD workflow, a scheduled scan for un-enriched rows, or an event message from EdgeStream.
Read Raw Data
A SqlQueryNode reads the current state of the record. This retrieves all text fields that will be processed: notes, descriptions, names, free-form content.
AI Enhancement (parallel)
One or more AI nodes process the raw content in parallel: classification node, summarization node, embedding generation node, sentiment node. Each runs concurrently to minimize total enrichment latency.
Write Enriched Fields
A SqlUpdateNode writes all AI-generated fields back to the record: ClassificationLabel, SummaryText, SentimentScore, EmbeddingRef, AiProcessedAt, AiModelVersion.
Record Lineage
A SqlUpdateNode inserts a row into the DataLineage table recording what was enriched, by which model, at what time, with what input/output hash — for audit and GDPR compliance.
Trigger Patterns
Three patterns for triggering enrichment workflows:
Pattern A: Async Sub-Workflow (Recommended)
The CRUD create workflow triggers the enrichment workflow asynchronously as its final step. The create workflow returns success to the user immediately — enrichment runs in parallel without delaying the response:
// In lead-create workflow — final step after INSERT:
{
"nodeType": "WorkflowTriggerNode",
"nodeId": "trigger-enrichment",
"workflowId": "lead-enrich",
"triggerMode": "async", // Fire and forget — do not wait for completion
"inputs": {
"leadId": "{{variables.insertResult.generatedId}}",
"tenantId": "{{workflow.tenantId}}"
}
}
Pattern B: Scheduled Batch Scan
A scheduled workflow runs on a cron schedule and finds all records with AiProcessedAt IS NULL, then enriches them in batches:
// Scheduled enrichment batch workflow
// Step 1: Find un-enriched records
SqlQueryNode: SELECT TOP 100 LeadId FROM Lead
WHERE TenantId = @tenantId AND AiProcessedAt IS NULL AND IsDeleted = 0
ORDER BY CreatedAt ASC
// Step 2: Loop through results
// For each leadId → trigger lead-enrich async
// Configured schedule: "0 */6 * * *" (every 6 hours)
Pattern C: EdgeStream Event
The enrichment workflow subscribes to an EdgeStream data channel. When a new record event is published to the channel, the workflow fires immediately:
// Workflow trigger: EdgeStream subscription
{
"triggerType": "EdgeStream",
"channel": "data-ocean-events",
"eventFilter": {
"eventType": "lead.created lead.updated",
"condition": "event.data.hasNotes == true"
}
}
Idempotency
Enrichment workflows must be idempotent — running the same enrichment twice on the same record must be safe. Achieve this by checking AiProcessedAt at the start of the workflow:
// Enrichment workflow — idempotency check
SqlQueryNode: SELECT AiProcessedAt FROM Lead
WHERE TenantId = @tenantId AND LeadId = @leadId
// Condition Node:
// If AiProcessedAt IS NOT NULL AND DATEDIFF(HOUR, AiProcessedAt, GETUTCDATE()) < 24
// → Exit workflow (already enriched recently)
// Else
// → Proceed with enrichment
Error Handling
Enrichment failures must not affect the business data. Enrich workflows handle errors by:
- Catching AI node failures (model API errors, rate limits, timeouts) with a catch/finally pattern
- Writing partial enrichment results even if some nodes fail — a classification without an embedding is better than no enrichment at all
- Logging enrichment failures to an
EnrichmentErrortable for the scheduled batch scan to retry - Never leaving a record in a partial state visible to users — all AI column updates happen in a single SqlUpdateNode
A record is fully usable — queryable, displayed in forms, passed to other workflows — before enrichment completes. The AI columns are all nullable. Application code must handle null AI columns gracefully (e.g., display "Pending classification" instead of a null ClassificationLabel).