Thursday, 30 April 2026

Self-Healing Pipeline Architecture

The last blog discussed an autonomous SDLC system. In this one we make it self-healing: autonomous error detection, classification, recovery, and KB-backed continuous improvement — zero human intervention from failure to resolution.

The self-healing solution — 7 new files plus a V18 migration — builds on top of every existing component: PipelineLog checkpoints, WorkflowFailedEvent, KnowledgeBaseService RAG, KnowledgeFeedbackService S3 writes, BedrockClient fallback, and the KbMaintenanceAgent schedule pattern. No new infrastructure is required.
LAYER 1 — Detection & Watchdog
Pipeline execution (ProposalService, CodeGenerationService, RepoAnalysisService, AgentLoopService) throws an exception → stepFailed() → PipelineLog persisted (FAILED) → SSE broadcast to all subscribers → WorkflowFailedEvent (requirementId, failedStep, errorMessage) published as a Spring ApplicationEvent.
PipelineHealthMonitor: @Scheduled every 60s scans FAILED requirements; @Scheduled every 5 min scans hung requirements.

LAYER 2 — Checkpoint Ledger (V18 Migration)
PipelineLog (enhanced): step: String, stepOrder: int, status: enum, plus retry_count: int and recovery_session_id: varchar (both new in V18).
getLastCheckpoint(reqId) finds the last COMPLETED step order in PipelineLog — e.g. story 3 done at order 113 → resume from order 114 (story 4).
alreadyCompleted(reqId, stepKey) checks PipelineLogRepository for a COMPLETED status on the step name. Used in the CodeGenerationService and ProposalService loops, it prevents duplicate work: stories 1–3 are skipped on resume, story 4 is retried.

LAYER 3 — Recovery Engine: PipelineRecoveryService
@EventListener(WorkflowFailedEvent), @Async("jarvisTaskExecutor"), with a retry guard: Set<String> recentlyAttempted with a 30-minute TTL.
ErrorClassifier → RecoveryStrategy:
TRANSIENT_BEDROCK (ThrottlingException, timeout, 503) → exponential backoff (2s→4s→8s), retry the same step
MALFORMED_AI_RESPONSE → retry Bedrock with the suffix "Respond ONLY with valid JSON"
GIT_CONFLICT (422, ref exists) → rename the branch to {name}-retry-{N}, re-attempt the push
STALE_CLONE → delete the local dir, re-clone from remote
JIRA_UNAVAILABLE → skip JIRA calls, continue the pipeline (non-critical)
CONFIG_MISSING → escalate to Teams — credentials cannot be auto-fixed
UNKNOWN → KB lookup first; on a hit apply the past fix, otherwise retry once, then escalate
Max retry guard: TRANSIENT max 3, MALFORMED max 2, GIT max 3, UNKNOWN max 1. After the max: PipelineRecoveryExhaustedEvent → Teams alert.
Checkpoint resume:
1. getLastCheckpoint(reqId) → stepOrder N
2. startDevelopment(reqId, resumeFromOrder=N)
3. CodeGenerationService story loop: if alreadyCompleted(reqId, "CODE_GEN_STORY_"+N), continue — skip, no duplicate commit; else generate + commit (first time)
4. The pipeline continues naturally to PR creation.
New event records (v1.3): PipelineRecoveryRequestedEvent (reqId, failedStep, errorMsg, attemptNumber); PipelineRecoveredEvent (reqId, recoveredStep, strategy, attemptsNeeded, durationMs); PipelineRecoveryExhaustedEvent (reqId, step, allStrategiesFailed → escalate). All via Spring ApplicationEventPublisher — no new infra needed.

LAYER 4 — Knowledge Base Lookup (existing KnowledgeBaseService)
resolveFromKB("pipeline failure step:" + step + " error:" + errorClass, reqId) queries the Bedrock KB Retrieve API (k=20) with the metadata filter type=pipeline-incident. A score ≥ 0.80 means a past fix was found → apply the documented strategy immediately, skipping trial-and-error.
KbIncidentMatch (value object): strategy: RecoveryStrategy, confidence: float, backoffMs: int, maxRetries: int, notes: String — all parsed from the incident.md frontmatter. The KB lookup runs before every retry.

LAYER 5 — Continuous Improvement: KbHealingFeedbackService
Incident document (S3 upload) at s3://.../learnings/{reqId}/incidents/{ts}-{step}.md. YAML frontmatter: type: pipeline-incident, step, error-class, fix-strategy, attempts, duration. Body: Error, Root Cause, Fix Applied, Prevention. → KB sync → vector indexed → available for future RAG.
@EventListener coverage: WorkflowFailedEvent → write an incident stub immediately; PipelineRecoveredEvent → complete the incident doc (fix worked); PipelineRecoveryExhaustedEvent → escalation doc (fix failed). All @Async("jarvisTaskExecutor") — non-blocking — and guarded by isEnabled(), silently skipping if KB/S3 is not configured.
Continuous improvement loop: the 1st occurrence uses the default strategy (trial-and-error); from the 2nd occurrence on, a KB hit applies the pre-validated fix instantly. Each cycle writes an incident doc and triggers KnowledgeBaseService.startSync(), so the KB is continuously enriched with real production failure data and recovery time decreases with every resolved incident.

End-to-end timeline: ① Fail (0s) → ② Detect (≤60s) → ③ KB lookup (1–2s) → ④ Classify + backoff (2–8s) → ⑤ Resume from checkpoint (0s overhead) → ⑥ Recovery complete + KB written → ⑦ Improved.

Component Deep Dive

🔍 Layer 1 — PipelineHealthMonitor

Type: @Component with two @Scheduled jobs
Job 1 — Active Failures (every 60s): Queries all RequirementStatus.FAILED requirements updated in the last 2 hours that haven't been attempted in the last 30 minutes. Publishes PipelineRecoveryRequestedEvent.
Job 2 — Silent Hangs (every 5 min): Finds requirements in ANALYZING or IN_DEVELOPMENT with no PipelineLog update for more than 15 minutes. Synthesizes a WorkflowFailedEvent("TIMEOUT: no pipeline progress for 15m").
Guard: ConcurrentHashMap<String, Instant> recentlyAttempted — TTL 30 min prevents retry storms.
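A minimal sketch of that TTL guard, assuming a hypothetical `RetryGuard` wrapper (the post only names the map, not the surrounding code):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the recentlyAttempted guard: a requirement may only
// be re-attempted once its last attempt is older than the TTL (30 min).
public class RetryGuard {
    private final ConcurrentHashMap<String, Instant> recentlyAttempted = new ConcurrentHashMap<>();
    private final Duration ttl;

    public RetryGuard(Duration ttl) {
        this.ttl = ttl;
    }

    /** Records the attempt and returns true only if reqId is outside the TTL window. */
    public boolean tryAcquire(String reqId, Instant now) {
        Instant last = recentlyAttempted.get(reqId);
        if (last != null && last.plus(ttl).isAfter(now)) {
            return false; // attempted too recently -> suppress the retry storm
        }
        recentlyAttempted.put(reqId, now);
        return true;
    }
}
```

Passing `now` explicitly keeps the guard unit-testable; the real monitor would call it with `Instant.now()`.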

📋 Layer 2 — Checkpoint Ledger

V18 Migration adds: retry_count INT DEFAULT 0 and recovery_session_id VARCHAR(36) to pipeline_logs. Also adds recovery_attempt_count INT and last_recovery_at TIMESTAMP to requirements.
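The V18 migration described above might look like this (a sketch — the file name and column defaults beyond those stated are assumptions):

```sql
-- V18__add_recovery_columns.sql (hypothetical Flyway file name)
ALTER TABLE pipeline_logs ADD COLUMN retry_count INT DEFAULT 0;
ALTER TABLE pipeline_logs ADD COLUMN recovery_session_id VARCHAR(36);
ALTER TABLE requirements  ADD COLUMN recovery_attempt_count INT DEFAULT 0;
ALTER TABLE requirements  ADD COLUMN last_recovery_at TIMESTAMP;
```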
New repo method: findTopByRequirementIdAndStatusOrderByStepOrderDesc(reqId, COMPLETED) — returns last completed step.
Skip logic in loops: alreadyCompleted(reqId, "CODE_GEN_STORY_4") checks for a COMPLETED PipelineLog with that step name. On resume: stories 1–3 are skipped in microseconds, story 4 is retried from scratch.
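To illustrate those two queries, here is an in-memory stand-in for the repository (hypothetical class; the real `getLastCheckpoint`/`alreadyCompleted` run against the `pipeline_logs` table):

```java
import java.util.List;

// In-memory sketch of the checkpoint queries; the real implementation uses
// PipelineLogRepository against the pipeline_logs table.
public class CheckpointLedger {
    public record PipelineLog(String reqId, String step, int stepOrder, String status) {}

    private final List<PipelineLog> logs;

    public CheckpointLedger(List<PipelineLog> logs) {
        this.logs = logs;
    }

    /** Highest COMPLETED stepOrder for the requirement, or -1 if nothing completed yet. */
    public int getLastCheckpoint(String reqId) {
        return logs.stream()
                .filter(l -> l.reqId().equals(reqId) && "COMPLETED".equals(l.status()))
                .mapToInt(PipelineLog::stepOrder)
                .max().orElse(-1);
    }

    /** True if a COMPLETED log exists for this exact step key -> skip it on resume. */
    public boolean alreadyCompleted(String reqId, String stepKey) {
        return logs.stream()
                .anyMatch(l -> l.reqId().equals(reqId)
                        && l.step().equals(stepKey)
                        && "COMPLETED".equals(l.status()));
    }
}
```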

⚙️ Layer 3 — PipelineRecoveryService

Entry point: @EventListener on WorkflowFailedEvent, @Async("jarvisTaskExecutor")
Flow: (1) Check retry guard → (2) Query KB for past fix → (3) ErrorClassifier.classify(step, errorMsg) → RecoveryStrategy enum → (4) Apply strategy with backoff → (5) Validate success → (6) Publish PipelineRecoveredEvent or PipelineRecoveryExhaustedEvent
Max retries per strategy: TRANSIENT=3, MALFORMED=2, GIT_CONFLICT=3, STALE_CLONE=2, UNKNOWN=1
Reuses: BedrockClient.invokeWithFallback(), GitHubClient.createBranch(), existing clone/delete logic
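The retry caps and the 2s→4s→8s schedule can be sketched together (hypothetical `RetryPolicy` class; the caps for JIRA_UNAVAILABLE and CONFIG_MISSING are my assumption of 0, since the post says those strategies skip or escalate rather than retry):

```java
// Sketch of the per-strategy retry caps and exponential backoff described above.
public class RetryPolicy {
    public enum RecoveryStrategy {
        TRANSIENT_BEDROCK(3), MALFORMED_AI_RESPONSE(2), GIT_CONFLICT(3),
        STALE_CLONE(2),
        JIRA_UNAVAILABLE(0),  // assumed: not retried, pipeline continues without JIRA
        CONFIG_MISSING(0),    // assumed: not retried, escalated to Teams
        UNKNOWN(1);

        public final int maxRetries;
        RecoveryStrategy(int maxRetries) { this.maxRetries = maxRetries; }
    }

    /** Backoff for attempt 1, 2, 3 ... = 2000, 4000, 8000 ms. */
    public static long backoffMs(int attempt) {
        return 2000L << (attempt - 1);
    }

    /** A strategy may run again only while attempts remain under its cap. */
    public static boolean mayRetry(RecoveryStrategy s, int attemptsSoFar) {
        return attemptsSoFar < s.maxRetries;
    }
}
```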

🧠 Layer 4 — KB Lookup

Called before every recovery attempt: knowledgeBaseService.resolveFromKB("pipeline failure step:CODE_GEN_STORY error:TRANSIENT_BEDROCK", reqId)
Metadata filter: source-uri startsWith s3://.../learnings/.../incidents/
Threshold: confidence ≥ 0.80 → use documented strategy; < 0.80 → use default ErrorClassifier strategy
KbIncidentMatch parses frontmatter: fix-strategy, backoff-ms, max-retries → PipelineRecoveryService applies them directly
Zero new AWS resources — reuses existing KB ID, AOSS index, embeddings model
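Parsing the frontmatter into a `KbIncidentMatch` might look like the following (a sketch with assumed defaults; the real parser and the exact value-object shape aren't shown in the post):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: extract fix-strategy, backoff-ms and max-retries from
// an incident.md YAML frontmatter block delimited by the first two '---' lines.
public class IncidentFrontmatter {
    public record KbIncidentMatch(String strategy, int backoffMs, int maxRetries) {}

    public static KbIncidentMatch parse(String markdown) {
        // Assumes the document starts with a '---' ... '---' frontmatter block.
        String[] parts = markdown.split("---", 3);
        Map<String, String> kv = new HashMap<>();
        for (String line : parts[1].strip().split("\n")) {
            int colon = line.indexOf(':');
            if (colon > 0) {
                kv.put(line.substring(0, colon).strip(), line.substring(colon + 1).strip());
            }
        }
        return new KbIncidentMatch(
                kv.getOrDefault("fix-strategy", "UNKNOWN"),
                Integer.parseInt(kv.getOrDefault("backoff-ms", "2000")),   // assumed default
                Integer.parseInt(kv.getOrDefault("max-retries", "1")));    // assumed default
    }
}
```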

📚 Layer 5 — KbHealingFeedbackService

New class extending the existing S3 upload pattern from KnowledgeFeedbackService.
Listens to 3 events: WorkflowFailedEvent (stub), PipelineRecoveredEvent (complete), PipelineRecoveryExhaustedEvent (escalation).
Upload path: s3://.../learnings/{reqId}/incidents/{yyyyMMdd-HHmmss}-{step}.md
YAML frontmatter enables metadata filtering in KB: type: pipeline-incident, step, error-class, fix-strategy, resolved: true/false
After upload → knowledgeBaseService.startSync() → new incident indexed within ~30s
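Under these conventions, a completed incident document might look like this (all values illustrative, borrowed from the story-4 throttling example used elsewhere in this post):

```markdown
---
type: pipeline-incident
step: CODE_GEN_STORY_4
error-class: TRANSIENT_BEDROCK
fix-strategy: TRANSIENT_BEDROCK
attempts: 2
duration: 9s
resolved: true
---
# Error
ThrottlingException from Bedrock during story 4 generation.

# Root Cause
Burst of parallel invocations exceeded the model's rate limit.

# Fix Applied
4s backoff, retried the same step once; resumed from checkpoint order 113.

# Prevention
KB now recommends a 4s backoff for this error class.
```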

🗂️ ErrorClassifier (standalone)

Pure utility class — no Spring dependencies, fully unit-testable.
classify(String step, String errorMessage) → RecoveryStrategy
Uses ordered regex/contains checks:
ThrottlingException|Rate exceeded|503 → TRANSIENT_BEDROCK
< (i.e. HTML where JSON was expected)|Unexpected character|parse error → MALFORMED_AI_RESPONSE
422|Reference already exists|already exists → GIT_CONFLICT
Remote mismatch|no commits|stale clone → STALE_CLONE
jira|401|403.*atlassian → JIRA_UNAVAILABLE
bucket|credentials|NoSuch.*Key → CONFIG_MISSING
fallthrough → UNKNOWN
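A runnable sketch of that classifier, using the patterns listed above (the real ErrorClassifier may differ in detail, but the key property is the same: rules are checked in order and the first match wins):

```java
import java.util.regex.Pattern;

// Sketch of the ordered classification rules; first matching rule wins,
// anything unmatched falls through to UNKNOWN.
public class ErrorClassifier {
    public enum RecoveryStrategy {
        TRANSIENT_BEDROCK, MALFORMED_AI_RESPONSE, GIT_CONFLICT,
        STALE_CLONE, JIRA_UNAVAILABLE, CONFIG_MISSING, UNKNOWN
    }

    private static final Object[][] RULES = {
        {Pattern.compile("ThrottlingException|Rate exceeded|503"), RecoveryStrategy.TRANSIENT_BEDROCK},
        // a leading '<' usually means HTML came back where JSON was expected
        {Pattern.compile("<|Unexpected character|parse error"), RecoveryStrategy.MALFORMED_AI_RESPONSE},
        {Pattern.compile("422|Reference already exists|already exists"), RecoveryStrategy.GIT_CONFLICT},
        {Pattern.compile("Remote mismatch|no commits|stale clone"), RecoveryStrategy.STALE_CLONE},
        {Pattern.compile("jira|401|403.*atlassian"), RecoveryStrategy.JIRA_UNAVAILABLE},
        {Pattern.compile("bucket|credentials|NoSuch.*Key"), RecoveryStrategy.CONFIG_MISSING},
    };

    public static RecoveryStrategy classify(String step, String errorMessage) {
        for (Object[] rule : RULES) {
            if (((Pattern) rule[0]).matcher(errorMessage).find()) {
                return (RecoveryStrategy) rule[1];
            }
        }
        return RecoveryStrategy.UNKNOWN;
    }
}
```

Because it has no Spring dependencies, the whole table can be exercised in a plain unit test.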

Recovery Time Budget (TRANSIENT_BEDROCK Example)

0s   → CODE_GEN_STORY_4 throws ThrottlingException → stepFailed() persists PipelineLog
0s   → WorkflowFailedEvent published → KbHealingFeedbackService writes incident stub to S3
≤60s → PipelineHealthMonitor detects FAILED status → publishes PipelineRecoveryRequestedEvent
+1s  → PipelineRecoveryService.onRecovery() → KB lookup: "pipeline failure step:CODE_GEN_STORY error:TRANSIENT_BEDROCK"
+2s  → KB HIT (2nd+ occurrence): past incident found, confidence=0.87 → apply: 4s backoff, retry once
       OR KB MISS (1st occurrence): ErrorClassifier → TRANSIENT_BEDROCK → default 2s→4s→8s backoff
+4s  → getLastCheckpoint() → order 113 (story 3 done) → alreadyCompleted() checks stories 1–3 = skip
+5s  → Retry story 4 → Bedrock invoked → success
+8s  → Stories 5–8 continue normally → PR created → PipelineRecoveredEvent published
+9s  → KbHealingFeedbackService completes incident.md → knowledgeBaseService.startSync()
+40s → KB re-indexed → next ThrottlingException anywhere → instant KB-guided recovery

Design Principles

✓ Zero New Infrastructure

Reuses existing thread pool, KB, S3, SSE stream, PipelineLog. Flyway V18 is the only schema change. No Redis, no Kafka, no distributed lock manager needed.

✓ Idempotent Resume

alreadyCompleted() is the single guard. A story that succeeded before recovery will never be re-run, guaranteeing no duplicate commits or duplicate JIRA transitions.

✓ Self-Improving KB

Every incident enriches the KB. First failure is trial-and-error. Every subsequent identical failure is resolved instantly from the KB. Recovery time strictly decreases over time.

⚠️ Non-Blocking Side Effects

All KB writes, incident uploads, and sync triggers are @Async. A KB outage during recovery does NOT block the recovery itself — the pipeline resumes regardless.

⚠️ Guardrail: Max Retries

Strict per-strategy retry caps prevent runaway loops. After exhaustion: RequirementStatus.FAILED is set permanently, Teams alert sent, full incident logged. Human can restart manually.

📡 Full Observability

Every recovery attempt creates a PipelineLog entry visible in the SSE pipeline viewer with step name RECOVERY_ATTEMPT_N. Users see healing happen in real time in the UI.
