# Data Flow
This page describes how messages flow through Lango, how the application bootstraps and wires its components, and how async processing buffers work.
## Message Flow
The following diagram shows the end-to-end path of a user message from input to response.
```mermaid
sequenceDiagram
    participant User
    participant Channel as Channel / Gateway
    participant App
    participant ADK as ADK Agent
    participant CTX as ContextAwareModel
    participant Provider as AI Provider
    participant Tools as Tool Execution

    User->>Channel: Send message
    Channel->>App: Forward to app.runAgent()
    App->>ADK: agent.Run(sessionID, input)
    ADK->>CTX: GenerateContent(LLMRequest)

    par Parallel Context Retrieval
        CTX->>CTX: Knowledge retrieval<br/>(8-layer ContextRetriever)
        CTX->>CTX: RAG / Graph RAG search<br/>(vector + graph expansion)
        CTX->>CTX: Memory retrieval<br/>(observations + reflections)
    end

    CTX->>CTX: Assemble augmented<br/>system prompt
    CTX->>Provider: GenerateContent<br/>(augmented request)
    Provider-->>CTX: LLM response (streaming)

    alt Tool call requested
        CTX-->>ADK: Response with tool calls
        ADK->>Tools: Execute tool(s)
        Tools-->>ADK: Tool results
        ADK->>CTX: GenerateContent<br/>(with tool results)
        CTX->>Provider: Forward to provider
        Provider-->>CTX: Final response
    end

    CTX-->>ADK: Final response
    ADK-->>App: Event stream
    App-->>Channel: Response text
    Channel-->>User: Display response

    Note over Channel,App: After turn completes:
    Channel->>App: OnTurnComplete callback

    par Async Post-Processing
        App->>App: MemoryBuffer.Trigger()
        App->>App: AnalysisBuffer.Trigger()
        App->>App: LibrarianBuffer.Trigger()
    end
```

### Flow Description
- Input. A user sends a message through any channel (Telegram, Discord, Slack, CLI, WebSocket, or HTTP). The channel adapter normalizes the message and forwards it to the application.
- Session resolution. The application resolves a session key from the channel context and injects it into the Go context via `session.SessionKeyFromContext`.
- Agent execution. ADK `Agent.Run()` creates user content and passes it to the ADK runner, which calls `GenerateContent` on the model.
- Context assembly. The `ContextAwareModelAdapter` intercepts `GenerateContent`. It extracts the last user message and runs three retrieval pipelines in parallel using `errgroup` (see the sketch after this list):
    - Knowledge retrieval queries the 8-layer `ContextRetriever` for relevant knowledge entries, tool descriptions, runtime context, and skill patterns.
    - RAG / Graph RAG performs vector similarity search. If Graph RAG is enabled, it additionally expands results through the triple store to discover structurally connected context.
    - Memory retrieval loads recent observations and reflections for the current session.
- Prompt augmentation. The retrieved context is appended to the base system prompt as structured sections (Knowledge Context, Semantic Context, Conversation Memory). The augmented prompt is set as the `SystemInstruction` on the request.
- Provider call. The augmented request is forwarded to the configured AI provider (OpenAI, Gemini, or Claude) via the `Supervisor.ProviderProxy`.
- Tool execution. If the provider's response contains tool calls, the ADK runner executes each tool and feeds results back for another LLM round. Tools may go through the approval provider first if approval policy is enabled.
- Response delivery. The final text response streams back through the ADK runner to the channel, which formats and delivers it to the user.
- Post-processing. After the turn completes, the Gateway fires `OnTurnComplete` callbacks that trigger async buffers for memory observation, conversation analysis, and proactive librarian processing.
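
The adapter's retrieval step is the most involved part of this flow. The sketch below shows what the parallel fan-out and prompt assembly might look like in Go; the `Retriever` interface, field names, and section formatting are illustrative assumptions rather than Lango's actual API, and only the `errgroup` fan-out and augmentation steps mirror the description above.

```go
// Package contextaware: illustrative sketch only. The Retriever interface,
// field names, and section headings are assumptions, not Lango's actual API;
// the errgroup fan-out and prompt augmentation mirror the "Context assembly"
// and "Prompt augmentation" steps above.
package contextaware

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// Retriever is a hypothetical common shape for the three pipelines.
type Retriever interface {
	Retrieve(ctx context.Context, query string) (string, error)
}

type ContextAwareModelAdapter struct {
	basePrompt string
	knowledge  Retriever // 8-layer ContextRetriever
	rag        Retriever // vector search + optional graph expansion
	memory     Retriever // observations + reflections
}

// retrieveContext runs the three retrieval pipelines in parallel and
// assembles the augmented system prompt.
func (a *ContextAwareModelAdapter) retrieveContext(ctx context.Context, query string) (string, error) {
	var knowledgeCtx, semanticCtx, memoryCtx string

	g, gctx := errgroup.WithContext(ctx)
	g.Go(func() error {
		var err error
		knowledgeCtx, err = a.knowledge.Retrieve(gctx, query)
		return err
	})
	g.Go(func() error {
		var err error
		semanticCtx, err = a.rag.Retrieve(gctx, query)
		return err
	})
	g.Go(func() error {
		var err error
		memoryCtx, err = a.memory.Retrieve(gctx, query)
		return err
	})
	if err := g.Wait(); err != nil {
		return "", err
	}

	// Append the retrieved context to the base system prompt as structured
	// sections; the caller sets the result as the request's SystemInstruction.
	return fmt.Sprintf(
		"%s\n\n## Knowledge Context\n%s\n\n## Semantic Context\n%s\n\n## Conversation Memory\n%s",
		a.basePrompt, knowledgeCtx, semanticCtx, memoryCtx,
	), nil
}
```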
## Bootstrap and Wiring
The application startup follows a two-phase process: bootstrap (database and crypto) followed by component wiring.
```mermaid
graph TD
    BOOT["bootstrap.Run()"] --> DB["Open SQLite Database"]
    BOOT --> CRYPTO["Initialize Crypto Provider<br/>(passphrase prompt)"]
    BOOT --> PROFILE["Load Config Profile"]
    DB --> RESULT["bootstrap.Result<br/>(DBClient, Crypto, Config)"]
    CRYPTO --> RESULT
    PROFILE --> RESULT
    RESULT --> APP["app.New(boot)"]
    APP --> S1["1. initSupervisor()"]
    APP --> S2["2. initSessionStore()"]
    APP --> S3["3. initSecurity()"]
    APP --> S4["4. buildTools()"]
    APP --> S5["5. initGraphStore()"]
    APP --> S6["6. initSkills()"]
    APP --> S7["7. initKnowledge()"]
    APP --> S8["8. initMemory()"]
    APP --> S9["9. initEmbedding()"]
    APP --> S10["10. wireGraphCallbacks()"]
    APP --> S11["11. initConversationAnalysis()"]
    APP --> S12["12. initLibrarian()"]
    APP --> S13["13. initPayment() + initX402()"]
    APP --> S14["14. initCron() / initBackground() / initWorkflow()"]
    APP --> S15["15. initAuth()"]
    APP --> S16["16. initGateway()"]
    APP --> S17["17. Approval wrapping"]
    APP --> S18["18. initAgent()"]
    APP --> S19["19. initChannels()"]
    APP --> S20["20. Wire turn callbacks"]
```

### Phase 1: Bootstrap (`bootstrap.Run()`)
Before the application starts, the bootstrap phase handles three critical tasks:
- Database. Opens the SQLite database and runs Ent schema migrations. The raw `*sql.DB` handle is preserved for SQLite-vec (vector store).
- Crypto provider. For the `local` provider, prompts for a passphrase and derives encryption keys. The initialized `CryptoProvider` is passed through to avoid re-prompting.
- Config profile. Loads the active configuration profile from the encrypted config store.

The result is a `bootstrap.Result` struct containing `DBClient`, `Crypto`, `RawDB`, and `Config` that gets passed to `app.New()`.
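
A rough sketch of that handoff is shown below; the field names come from the description above, but the concrete types behind them are placeholders, not Lango's real definitions.

```go
// Illustrative sketch: the field names (DBClient, Crypto, RawDB, Config)
// come from the description above; the types behind them are placeholders.
package bootstrap

import "database/sql"

type (
	EntClient      struct{} // stands in for the generated Ent client
	Config         struct{} // stands in for the active profile
	CryptoProvider interface{ Encrypt(plain []byte) ([]byte, error) }
)

// Result carries everything app.New() needs to wire components.
type Result struct {
	DBClient *EntClient     // Ent client, schema already migrated
	Crypto   CryptoProvider // initialized once, so app.New() never re-prompts
	RawDB    *sql.DB        // raw handle kept for SQLite-vec
	Config   *Config        // active configuration profile
}

// Run is the bootstrap entry point.
func Run() (*Result, error) {
	// 1. Open SQLite and run Ent schema migrations (keep the raw *sql.DB).
	// 2. Initialize the crypto provider (passphrase prompt for "local").
	// 3. Load the active config profile from the encrypted store.
	return &Result{}, nil // fields elided in this sketch
}
```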
### Phase 2: Component Wiring (`app.New()`)
The `app.New()` function initializes components in dependency order. Each `init*` function follows the same pattern (sketched in code after the list below):
- Check the configuration flag (e.g., `cfg.Knowledge.Enabled`).
- If disabled, log and return `nil`.
- If initialization fails, log a warning and return `nil` (graceful degradation).
- If successful, return a component struct.
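
As a sketch of that pattern, assuming placeholder types and a hypothetical `initKnowledge` constructor (the real signatures are not shown on this page):

```go
// Sketch of the shared init* pattern; every type and constructor here is a
// placeholder. Only the flag check / warn-and-degrade / return-component
// shape reflects the list above.
package app

import "log/slog"

type (
	KnowledgeConfig struct{ Enabled bool }
	Config          struct{ Knowledge KnowledgeConfig }
	Knowledge       struct{}
	App             struct{ log *slog.Logger }
)

// newKnowledge stands in for the real constructor.
func newKnowledge() (*Knowledge, error) { return &Knowledge{}, nil }

func (a *App) initKnowledge(cfg Config) *Knowledge {
	if !cfg.Knowledge.Enabled { // 1. check the configuration flag
		a.log.Info("knowledge subsystem disabled")
		return nil
	}
	k, err := newKnowledge() // 2. attempt initialization
	if err != nil {
		// 3. graceful degradation: warn and continue without the component
		a.log.Warn("knowledge init failed, continuing without it", "err", err)
		return nil
	}
	return k // 4. success: hand the component struct back to app.New()
}
```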
This ordering ensures that dependencies are available when needed:
- Supervisor is created first because it holds provider credentials needed by tools and the agent.
- Session Store is created early because most subsystems need database access.
- Security (crypto, keys, secrets) is set up before tools that need encryption.
- Graph Store is created before Knowledge because the `GraphEngine` variant needs a graph store reference.
- Knowledge is created before Embedding because the embed callback needs the knowledge store.
- Embedding is created after Knowledge and Memory so it can wire `SetEmbedCallback` on both stores.
- Graph callbacks are wired after all stores exist, connecting the entity extractor pipeline.
- Approval wrapping happens near the end, wrapping all tools with the composite approval provider.
- Agent is created last with all tools and the fully-configured LLM (possibly wrapped with `ContextAwareModelAdapter` and `PIIRedactingModelAdapter`); see the sketch below.
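
The final bullet amounts to a decorator chain around the provider-backed model. A minimal sketch of that idea, with an assumed `Model` interface and invented wrapper types (the real adapters carry far more state), might look like this:

```go
// Decorator-style sketch of the model wrapping at the tail of app.New().
// The Model interface, wrapper types, and wrapping order are assumptions;
// the page only states that the agent may receive an LLM wrapped with
// ContextAwareModelAdapter and PIIRedactingModelAdapter.
package app

type Model interface {
	GenerateContent(req string) (string, error) // deliberately simplified
}

type contextAware struct{ next Model } // stands in for ContextAwareModelAdapter
type piiRedacting struct{ next Model } // stands in for PIIRedactingModelAdapter

func (c *contextAware) GenerateContent(req string) (string, error) {
	// retrieve context, augment the system prompt, then delegate
	return c.next.GenerateContent(req)
}

func (p *piiRedacting) GenerateContent(req string) (string, error) {
	// redact sensitive values before delegating to the inner model
	return p.next.GenerateContent(req)
}

// wrapModel layers the adapters over the provider proxy when enabled.
func wrapModel(base Model, contextEnabled, piiEnabled bool) Model {
	m := base
	if contextEnabled {
		m = &contextAware{next: m}
	}
	if piiEnabled {
		m = &piiRedacting{next: m}
	}
	return m
}
```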
## Async Buffer Pattern
Lango uses async buffers to decouple expensive processing (embedding generation, graph indexing, memory observation) from the request/response cycle. All buffers follow the same lifecycle pattern:
```mermaid
graph LR
    SAVE["Store.Save()"] -->|callback| ENQUEUE["Buffer.Enqueue()"]
    ENQUEUE --> QUEUE["Buffered Channel<br/>(capacity 256)"]
    QUEUE --> WORKER["Background Worker"]
    WORKER -->|batch| PROCESS["Process Batch"]
    START["Buffer.Start(wg)"] -.->|launches| WORKER
    STOP["Buffer.Stop()"] -.->|signals| WORKER
```

### Lifecycle
| Method | Description |
|---|---|
| `Start(wg *sync.WaitGroup)` | Launches the background goroutine and increments the WaitGroup for graceful shutdown tracking |
| `Enqueue(request)` | Non-blocking send to the buffered channel. If the channel is full, the request is dropped and a drop counter is incremented |
| `Stop()` | Closes the stop channel, signaling the worker to drain remaining items and exit |
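
A stripped-down version of this lifecycle, written as a generic Go type, is sketched below. The capacity of 256, the batch sizes, and the 2-second flush timeout echo the numbers on this page; everything else (type names, fields, the use of generics) is illustrative.

```go
// Generic reduction of the buffer lifecycle. Type names and fields are
// illustrative; the capacity (256), batch sizes (32/64), and 2-second
// flush timeout echo the numbers documented on this page.
package buffer

import (
	"sync"
	"sync/atomic"
	"time"
)

type Buffer[T any] struct {
	queue        chan T
	stop         chan struct{}
	dropped      atomic.Int64
	process      func(batch []T) // e.g. generate embeddings and store vectors
	batchSize    int
	flushTimeout time.Duration
}

func New[T any](batchSize int, process func([]T)) *Buffer[T] {
	return &Buffer[T]{
		queue:        make(chan T, 256), // buffered channel, capacity 256
		stop:         make(chan struct{}),
		process:      process,
		batchSize:    batchSize,       // 32 for embeddings, 64 for graph triples
		flushTimeout: 2 * time.Second, // flush partial batches after 2s
	}
}

// Start launches the background worker; the WaitGroup lets shutdown wait
// for the final drain.
func (b *Buffer[T]) Start(wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		b.worker()
	}()
}

// Enqueue is a non-blocking send: if the channel is full, the item is
// dropped and the drop counter is incremented.
func (b *Buffer[T]) Enqueue(item T) {
	select {
	case b.queue <- item:
	default:
		b.dropped.Add(1)
	}
}

// Stop signals the worker to drain remaining items and exit.
func (b *Buffer[T]) Stop() { close(b.stop) }

func (b *Buffer[T]) worker() {
	ticker := time.NewTicker(b.flushTimeout)
	defer ticker.Stop()

	var batch []T
	flush := func() {
		if len(batch) > 0 {
			b.process(batch)
			batch = nil
		}
	}

	for {
		select {
		case item := <-b.queue:
			if batch = append(batch, item); len(batch) >= b.batchSize {
				flush() // size-based flush
			}
		case <-ticker.C:
			flush() // time-based flush for partial batches
		case <-b.stop:
			for { // drain whatever is still queued, then exit
				select {
				case item := <-b.queue:
					batch = append(batch, item)
				default:
					flush()
					return
				}
			}
		}
	}
}
```

The `select` with a `default` branch is what makes `Enqueue` non-blocking; dropping under backpressure keeps the request/response path from ever waiting on post-processing.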
### Buffer Types
`EmbeddingBuffer` (`internal/embedding/buffer.go`)

- Collects `EmbedRequest` items (ID, collection, content, metadata).
- Processes in batches of 32 with a 2-second flush timeout.
- Calls the embedding provider to generate vectors and stores them in SQLite-vec.
- Triggered via `SetEmbedCallback` on Knowledge Store and Memory Store.
`GraphBuffer` (`internal/graph/buffer.go`)

- Collects `GraphRequest` items (lists of triples).
- Processes in batches of 64 with a 2-second flush timeout.
- Writes triples to the BoltDB triple store.
- Triggered via `SetGraphCallback` on Knowledge Store and Memory Store, plus the Entity Extractor and Learning GraphEngine.
`MemoryBuffer` (`internal/memory/buffer.go`)

- Triggered by `OnTurnComplete` gateway callback.
- Checks token thresholds (message tokens, observation tokens) to decide when to run (see the sketch below).
- Calls `Observer` to extract observations from recent messages.
- Calls `Reflector` to synthesize reflections when observation count exceeds threshold.
- Supports memory compaction via `SetCompactor()` wired to `EntStore.CompactMessages`.
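
The threshold-gated flow could be pictured roughly like this; the interface shapes, field names, parameters, and token heuristic are assumptions made for illustration, not the actual `MemoryBuffer` API.

```go
// Rough sketch of the threshold-gated observe/reflect flow; interfaces,
// fields, parameters, and the token heuristic are assumptions.
package memory

import "context"

type (
	Observer  interface{ Observe(ctx context.Context, msgs []string) ([]string, error) }
	Reflector interface{ Reflect(ctx context.Context, sessionID string) error }
)

type MemoryBuffer struct {
	observer             Observer
	reflector            Reflector
	messageTokenLimit    int // run the observer only after this many message tokens
	observationThreshold int // synthesize reflections after this many observations
}

// maybeObserve sketches what the buffer's Trigger path would do once it has
// loaded the session's recent messages and observation count.
func (b *MemoryBuffer) maybeObserve(ctx context.Context, sessionID string, msgs []string, observationCount int) error {
	if approxTokens(msgs) < b.messageTokenLimit {
		return nil // not enough new conversation to observe yet
	}
	obs, err := b.observer.Observe(ctx, msgs)
	if err != nil {
		return err
	}
	if observationCount+len(obs) >= b.observationThreshold {
		return b.reflector.Reflect(ctx, sessionID)
	}
	return nil
}

// approxTokens is a crude stand-in for real token counting.
func approxTokens(msgs []string) int {
	n := 0
	for _, m := range msgs {
		n += len(m) / 4
	}
	return n
}
```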
`AnalysisBuffer` (`internal/learning/analysis_buffer.go`)

- Triggered by `OnTurnComplete` gateway callback.
- Checks turn count and token thresholds.
- Runs `ConversationAnalyzer` for pattern extraction and `SessionLearner` for knowledge generation.
`LibrarianProactiveBuffer` (`internal/librarian/buffer.go`)

- Triggered by `OnTurnComplete` gateway callback.
- Runs `ObservationAnalyzer` to identify knowledge gaps.
- Runs `InquiryProcessor` to generate and auto-resolve inquiries above a confidence threshold.
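
The last three buffers all hang off the same gateway hook, as shown in the message-flow diagram. A minimal sketch of that fan-out, assuming a simplified shared `Trigger` signature and a hypothetical `OnTurnComplete` registration method, looks like this:

```go
// Sketch of the OnTurnComplete fan-out from the message-flow diagram.
// The Gateway hook and the shared Trigger signature are simplifications.
package app

import "context"

type (
	Gateway interface {
		OnTurnComplete(cb func(ctx context.Context, sessionID string))
	}
	triggerable interface {
		Trigger(ctx context.Context, sessionID string)
	}
)

// wireTurnCallbacks registers one gateway callback that kicks off the three
// async buffers; none of them block the user-facing response path.
func wireTurnCallbacks(gw Gateway, memoryBuf, analysisBuf, librarianBuf triggerable) {
	gw.OnTurnComplete(func(ctx context.Context, sessionID string) {
		memoryBuf.Trigger(ctx, sessionID)
		analysisBuf.Trigger(ctx, sessionID)
		librarianBuf.Trigger(ctx, sessionID)
	})
}
```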
### Callback Wiring
The callback pattern avoids import cycles between intelligence subsystems. Instead of direct function calls, stores expose setter methods for callbacks:
```text
Knowledge Store ──SetEmbedCallback──> EmbeddingBuffer.Enqueue()
Knowledge Store ──SetGraphCallback──> GraphBuffer.Enqueue()
Memory Store    ──SetEmbedCallback──> EmbeddingBuffer.Enqueue()
Memory Store    ──SetGraphCallback──> GraphBuffer.Enqueue()
Memory Store    ──SetGraphHooks────>  GraphBuffer.Enqueue()   (temporal/session triples)
Learning Engine ──SetGraphCallback──> GraphBuffer.Enqueue()   (confidence triples)
```
This wiring happens in `app.New()` after all components are initialized. The stores have no compile-time dependency on the buffers -- they only know about the callback function signature.
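
A minimal sketch of this decoupling is shown below, assuming a hypothetical `EmbedRequest` definition and heavily simplified store and buffer types: the store holds only a function value, and `app.New()` points that value at the buffer's `Enqueue` method.

```go
// Minimal sketch of the setter-based wiring; the EmbedRequest fields come
// from the EmbeddingBuffer description above, everything else is simplified.
package app

import "context"

type EmbedRequest struct {
	ID         string
	Collection string
	Content    string
	Metadata   map[string]string
}

// KnowledgeStore knows only the callback's function signature, so it has
// no compile-time dependency on the embedding package.
type KnowledgeStore struct {
	embedCallback func(EmbedRequest)
}

func (s *KnowledgeStore) SetEmbedCallback(cb func(EmbedRequest)) { s.embedCallback = cb }

func (s *KnowledgeStore) Save(ctx context.Context, id, content string) error {
	// ...persist the entry, then hand it off for async embedding.
	if s.embedCallback != nil {
		s.embedCallback(EmbedRequest{ID: id, Collection: "knowledge", Content: content})
	}
	return nil
}

// EmbeddingBuffer stand-in whose Enqueue method is used as the callback.
type EmbeddingBuffer struct{ queue chan EmbedRequest }

func (b *EmbeddingBuffer) Enqueue(req EmbedRequest) {
	select {
	case b.queue <- req:
	default: // channel full: drop (see the lifecycle table above)
	}
}

// In app.New(), after both components exist:
//
//	knowledgeStore.SetEmbedCallback(embeddingBuffer.Enqueue)
```

Because the store depends only on the `func(EmbedRequest)` signature, the buffer side can be swapped out or disabled without touching the store packages.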