Portal Community

ISemanticMemoryStore Write API

public interface ISemanticMemoryStore
{
    // Write a single memory record
    Task UpsertAsync(string collection, MemoryRecord record, CancellationToken ct);

    // Write a batch of memory records (more efficient)
    Task UpsertBatchAsync(string collection, IReadOnlyList<MemoryRecord> records, CancellationToken ct);

    // Delete all records for a specific source document
    Task DeleteBySourceAsync(string collection, string source, CancellationToken ct);

    // Delete a single record by ID
    Task DeleteAsync(string collection, string id, CancellationToken ct);

    // Search (retrieval — see Retrieval page)
    Task<IReadOnlyList<MemoryRecord>> SearchAsync(
        string collection, float[] queryEmbedding, int topK, float minScore, CancellationToken ct);
}

public class MemoryRecord
{
    public string         Id        { get; init; } = Guid.NewGuid().ToString();
    public string         Content   { get; init; } = string.Empty;
    public float[]        Embedding { get; init; } = Array.Empty<float>();
    public MemoryMetadata Metadata  { get; init; } = new();
    public float          Score     { get; set; }   // Populated by SearchAsync
}

public class MemoryMetadata
{
    public string Source     { get; set; } = string.Empty;
    public string Category   { get; set; } = string.Empty;
    public string AgentId    { get; set; } = string.Empty;
    public string TenantId   { get; set; } = string.Empty;
    public int    ChunkIndex { get; set; }
    public string IndexedAt  { get; set; } = string.Empty;
}

Document Indexing Service

public class DocumentIndexingService
{
    public async Task<IndexingResult> IndexAsync(
        Stream fileStream,
        DocumentMetadata metadata,
        Guid agentId,
        CancellationToken ct)
    {
        // 1. Extract text
        var ingester = _ingesters.First(i => i.CanHandle(metadata.Extension, metadata.MimeType));
        var doc = await ingester.IngestAsync(fileStream, metadata, ct);

        // 2. Chunk
        var chunks = _chunker.Chunk(doc.RawText, _chunkingConfig);

        // 3. Embed (batched)
        var embeddings = await _embedder.EmbedBatchAsync(
            chunks.Select(c => c.Content).ToList(), ct);

        // 4. Delete old chunks for this source (for re-indexing updates)
        string collection = $"agent_{agentId:N}";
        await _store.DeleteBySourceAsync(collection, metadata.Source, ct);

        // 5. Write new chunks
        var records = chunks.Select((chunk, i) => new MemoryRecord
        {
            Id        = Guid.NewGuid().ToString(),
            Content   = chunk.Content,
            Embedding = embeddings[i],
            Metadata  = new MemoryMetadata
            {
                Source     = metadata.Source,
                Category   = metadata.Category,
                AgentId    = agentId.ToString(),
                TenantId   = metadata.TenantId.ToString(),
                ChunkIndex = chunk.Index,
                IndexedAt  = DateTime.UtcNow.ToString("O")
            }
        }).ToList();

        await _store.UpsertBatchAsync(collection, records, ct);

        return new IndexingResult
        {
            DocumentId = metadata.DocumentId,
            ChunkCount = records.Count,
            TokenCount = chunks.Sum(c => c.TokenCount),
            IndexedAt  = DateTime.UtcNow
        };
    }
}

Re-indexing a Document

When a document is updated, re-indexing atomically replaces all its chunks:

// PATCH /api/octopus/knowledge/{agentId}/documents/{documentId}/reindex
// The service:
// 1. Deletes all existing chunks matching metadata.Source == documentFileName
// 2. Re-ingests, re-chunks, re-embeds, and writes the new chunks
// Result: no downtime — old chunks serve queries until new ones are written

Bulk Re-indexing Job

// POST /api/octopus/knowledge/{agentId}/reindex-all
// Triggers a background job that re-indexes all documents:
// Used when changing chunking config or embedding model

public class BulkReindexJob : IHostedService
{
    public async Task StartAsync(CancellationToken ct)
    {
        var documents = await _docStore.GetAllAsync(agentId, ct);
        var semaphore = new SemaphoreSlim(5);  // Max 5 concurrent indexing operations

        var tasks = documents.Select(async doc =>
        {
            await semaphore.WaitAsync(ct);
            try   { await _indexer.IndexAsync(doc.Stream, doc.Metadata, agentId, ct); }
            finally { semaphore.Release(); }
        });

        await Task.WhenAll(tasks);
    }
}

Indexing Status Tracking

StatusMeaning
PendingDocument uploaded; indexing queued but not started
IndexingActive — chunking, embedding, or writing in progress
IndexedComplete — chunks available for retrieval immediately
FailedError during indexing — check error details and retry
DeletedDocument and all its chunks have been removed