Ingestion Module Playbook¶
Mission Control for Data Ingestion
This document is the single source of truth for the Labeeb Ingestion Module. It provides a deep dive into the architecture, API contracts, and operational procedures required to manage the flow of data into the platform. It is designed for SREs, on-call engineers, and developers who need to understand, operate, or extend the ingestion pipeline.
1. Mission & Guiding Principles¶
The Ingestion Module's mission is to provide a single, reliable, and extensible gateway for all external data entering the Labeeb platform.
Its architecture is founded on these core principles:
- Asynchronous & Resilient: All processing is handled via a robust, queue-based system (Laravel Horizon & Redis). This allows the API to acknowledge ingestion requests instantly while complex validation, normalization, and persistence tasks are performed reliably in the background. This design prevents data loss during downstream failures and ensures high throughput.
- Strategy Pattern for Extensibility: The module uses the Strategy design pattern to decouple the core ingestion logic from the specifics of any single data source. New data providers (e.g., for a new news API or a website's sitemap) can be added as self-contained "Adapter" classes with zero changes to the core orchestration logic.
- Strict, Versioned Contracts: All data enters through a single, versioned API endpoint (/v1/ingest/articles). This strict contract ensures data integrity and allows the Scraper and other client services to evolve independently without breaking the core platform.
- Database as the Source of Truth: The module is the sole writer to the core articles and sources tables in the PostgreSQL database, ensuring data consistency and a single, authoritative system of record.
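To make the asynchronous principle concrete, the sketch below shows how the pull path hands a discovered item to the background pipeline so the caller is never blocked on persistence. NormalizeAndStoreJob and the Redis 'ingestion' queue appear in the data-flow diagram later in this document; the App\Jobs namespace and single-argument constructor used here are assumptions for illustration only.

// Illustrative sketch only: hand a discovered item to the background pipeline.
// The job class and queue name come from the DFD below; the App\Jobs namespace
// and constructor signature are assumptions.
use App\Ingestion\DTO\IngestedItem;
use App\Jobs\NormalizeAndStoreJob;

/** @var IngestedItem $item An item yielded by a SourceAdapter's discover() method. */
NormalizeAndStoreJob::dispatch($item)->onQueue('ingestion');
// Horizon workers drain the 'ingestion' queue, so transient downstream failures
// are retried without losing the item or blocking the producer.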
2. Architecture Deep Dive¶
The module is composed of several key components that work together to create a flexible and robust pipeline.
flowchart TD
%% Styles
classDef ext fill:#e0f2fe,stroke:#0ea5e9,stroke-width:1px;
classDef store fill:#f0fdf4,stroke:#22c55e,stroke-width:1px;
classDef process fill:#f8fafc,stroke:#64748b,stroke-width:1px;
classDef optional stroke-dasharray: 5 5;
%% External producers
subgraph Producers
SCR["Scraper Service
POST /ingest"]:::ext
BCK["Backfill/CLI"]:::ext
WH["Partner Webhook (optional)"]:::ext
end
%% Ingestion service (API)
subgraph Ingestion Service API
direction LR
ING_API["/ingest Controller
• JSON schema & per-item RFC7807
• size caps / auth"]:::process
ING_Q[(Redis Queue)]:::store
WKR["Persist Worker
• Normalize (clean, lang)
• Canonicalize URL
• content_hash"]:::process
IDED{"Idempotent?
(canonical_url + content_hash)"}:::process
OS[("OpenSearch `news_docs`")]:::store
OUT[(Outbox/Event
article.upserted / deleted)]:::store
METRICS[(Prometheus /metrics)]:::store
LOGS[(Structured Logs)]:::store
end
%% Vectorize sync (downstream)
subgraph Cloudflare Sync
direction LR
CHUNK["Chunker"]:::process
SYNC["Vectorize Syncer
• embed (Workers AI)
• upsert/delete (Vectorize)"]:::process
VEC[("Vectorize Index")]:::store
end
%% Flows
SCR --> ING_API
BCK --> ING_API
WH --> ING_API
ING_API --> ING_Q
ING_Q --> WKR --> IDED
IDED -- duplicate --> METRICS
IDED -- new/updated --> OS
IDED -- deleted --> OUT
OS --> OUT
OUT --> CHUNK --> SYNC --> VEC
%% Obs
ING_API -. "request metrics / logs" .-> METRICS
ING_API -. logs .-> LOGS
WKR -. job stats .-> METRICS
SYNC -. sync stats .-> METRICS
Data Flow Diagram (End-to-End)¶
The following DFD complements the architecture view by showing all operational entrypoints, including CLI commands, push/pull ingestion paths, queues, stores, and the analysis/indexing steps. It aligns with api/openapi.json and scraper/openapi.json.
flowchart TD
classDef ext fill:#e0f2fe,stroke:#0ea5e9,stroke-width:1px;
classDef process fill:#f8fafc,stroke:#64748b,stroke-width:1px;
classDef store fill:#f0fdf4,stroke:#22c55e,stroke-width:1px;
classDef queue fill:#fff7ed,stroke:#f97316,stroke-width:1px;
classDef optional stroke-dasharray: 5 5;
%% External systems
subgraph External
SCR[Scraper Service
Endpoints: /scrape, /profiles, /profiles/reload]:::ext
PARTNER[Partner Webhooks]:::ext
NEWSAPI[News Providers
e.g. NewsData, MediaStack, RSS]:::ext
end
%% CLI entrypoints
subgraph CLI Commands
CS["crawl:source
• SourceRegistrar (--create)
• CrawlOrchestrator (--sync)"]:::process
SYNC["scraper:sync-sources
• ScraperPushSyncService -> GET /profiles
• optional prune/dry-run"]:::process
NP["news:pull
• Provider latest/archive
• Normalize -> upsert"]:::process
OS_SETUP["opensearch:setup
• Templates + ISM policies"]:::process
end
%% Ingestion API (push)
subgraph Ingestion API
CTRL["IngestArticlesController
(POST /v1/ingest/articles)"]:::process
IDEMP[(Redis Idempotency Cache
Idempotency-Key)]:::store
VAL["Validate JSON -> DTO (IngestedItem)"]:::process
SR["SourceResolver"]:::process
APROC_SYNC["ArticleProcessor.normalizeAndDeduplicate
(sync)"]:::process
end
%% Pull orchestration
subgraph Pull Orchestration
ORCH["CrawlOrchestrator"]:::process
LOCK[(Per-source Lock)]:::store
ADP["SourceAdapter(s)
• RssAdapter
• NewsDataSourceAdapter
• ScraperPullAdapter"]:::process
end
%% Core infra
PG[(PostgreSQL
sources, articles, contents, entities, crawl_runs)]:::store
Q[(Redis Queue 'ingestion')]:::queue
%% Workers
subgraph Workers
NSJ["NormalizeAndStoreJob
(pull path)"]:::process
APROC_ASYNC["ArticleProcessor.normalizeAndDeduplicate
(async)"]:::process
AAIJ["AnalyzeAndIndexJob"]:::process
NER["NerService"]:::process
EMB["EmbeddingsService"]:::process
end
%% Search/Index
OS[(OpenSearch)]:::store
%% Observability
METRICS[(Prometheus /metrics)]:::store
LOGS[(Structured Logs)]:::store
%% Flows: CLI
CS --> ORCH
CS -- create --> PG
SYNC --> SCR:::ext
SYNC -- upsert/prune --> PG
NP -- fetch via provider --> NEWSAPI
NP -- normalize/upsert --> PG
OS_SETUP --> OS
%% Flows: Push ingestion
SCR -- push --> CTRL
PARTNER --> CTRL
CTRL -. idempotency .- IDEMP
CTRL --> VAL --> SR --> APROC_SYNC --> PG
APROC_SYNC -- accepted --> AAIJ
CTRL -. req metrics/logs .-> METRICS
CTRL -. logs .-> LOGS
%% Flows: Pull ingestion
CS --> ORCH
ORCH --> LOCK
ORCH --> ADP
ADP -- discover --> NEWSAPI
ADP -- yield DTOs --> ORCH --> Q
ORCH -. run metrics/logs .-> METRICS
%% Queue consumption
Q --> NSJ --> APROC_ASYNC --> PG
NSJ -- enqueue --> AAIJ
%% Analysis + Indexing
AAIJ --> NER --> PG
AAIJ --> EMB --> OS
AAIJ --> OS
NSJ -. job stats .-> METRICS
AAIJ -. job stats .-> METRICS
Architectural Zones¶
A zone-based view that groups components by purpose and boundary.
%% ----------------------------------------------------------------------------
%% Project Labeeb - Master Data Flow Diagram (DFD) - v3.1 (Corrected)
%% ----------------------------------------------------------------------------
flowchart LR
%% ---- STYLING & VISUAL LANGUAGE ----
%% Style definitions for component types and rollout status.
classDef platform stroke:#475569,stroke-width:2px,fill:#f8fafc,color:#334155;
classDef cloudPlatform stroke:#059669,stroke-width:2px,fill:#f0fdf4,color:#047857;
classDef live fill:#ecfdf5,stroke:#22c55e,stroke-width:1.5px,color:#15803d;
classDef mvp fill:#fefce8,stroke:#eab308,stroke-width:1.5px,color:#713f12;
classDef later fill:#fee2e2,stroke:#ef4444,stroke-width:1.5px,color:#991b1b;
classDef service fill:#eef2ff,stroke:#6366f1,stroke-width:1.5px;
classDef worker fill:#fffbeb,stroke:#f59e0b,stroke-width:1.5px;
classDef datastore fill:#f1f5f9,stroke:#64748b,stroke-width:1.5px;
classDef queue fill:#fff7ed,stroke:#f97316,stroke-width:1.5px;
classDef trigger fill:#f5f3ff,stroke:#8b5cf6,stroke-width:1.5px;
classDef external fill:#f8fafc,stroke:#9ca3af,stroke-width:1.5px,stroke-dasharray:3 3;
classDef optional stroke-dasharray: 3 3, fill:#f4f4f5;
%% ---- ARCHITECTURAL ZONES ----
%% The system is divided into clear, logical zones for easy understanding.
subgraph Inputs ["fa:fa-sign-in-alt Inputs & Triggers"]
direction TB
CLI["CLI Commands
(artisan)"]:::trigger
USER_QUERY["User Query
(via Public API)"]:::trigger
end
subgraph CorePlatform ["fa:fa-server Core Platform (DigitalOcean)"]
direction TB
subgraph ScraperService ["Scraper Service (Python/FastAPI)"]
direction LR
SCR_API["API Server
/profiles, /scrape"]:::service
SCR_Workers["Fetcher Workers
Parses articles into DTOs"]:::worker
end
subgraph ApiService ["API Service (PHP/Laravel)"]
direction LR
API_Orchestrator["Crawl Orchestrator
Manages pull-based ingestion"]:::service
API_Ingest["Ingest Controller
POST /v1/ingest/articles"]:::service
API_Processor["Article Processor
Normalizes, dedupes & persists"]:::service
end
subgraph CoreAI ["Core AI Logic & Orchestration (in API Service)"]
direction LR
RAG_Orchestrator["RAG Orchestrator
Hybrid Search & Reranking"]:::service
Analysis_Orchestrator["Analysis Orchestrator
Sends prompts to AI Gateway"]:::service
end
subgraph Async ["Async Processing"]
direction LR
RedisQueues[("fa:fa-layer-group Redis Queues
'ingestion', 'analysis'")]:::queue
AsyncWorkers["Job Workers
• NormalizeAndStoreJob
• AnalyzeAndIndexJob"]:::worker
end
end
subgraph Cloudflare ["fa:fa-cloudflare Edge AI Platform (Cloudflare)"]
direction TB
CF_Gateway["AI Gateway
Universal LLM Endpoint"]:::service
CF_RAG_Worker["RAG Query Worker
Embeds query, searches Vectorize"]:::worker
CF_Vectorize["fa:fa-database Cloudflare Vectorize
Vector Store"]:::datastore
end
subgraph Persistence ["fa:fa-database Data & Persistence"]
direction TB
PG["PostgreSQL
Authoritative Datastore"]:::datastore
OS["OpenSearch
Keyword Search & Analytics"]:::datastore
Observability["Observability
Prometheus & Logs"]:::datastore
end
%% ---- DATA FLOW PATHWAYS ----
%% We now trace the primary "stories" of how data moves through the system.
%% Path 1: The Ingestion Pipeline (Getting data IN)
subgraph IngestionPipeline [" "]
style IngestionPipeline fill:none,stroke:none
CLI -- "crawl:source" --> API_Orchestrator;
API_Orchestrator -- "Pulls from" --> EXT_Providers["fa:fa-newspaper News Providers
(RSS, APIs)"]:::external;
API_Orchestrator -- "Or triggers" --> SCR_API;
SCR_API -- "Dispatches jobs to" --> SCR_Workers;
SCR_Workers -- "Fetches from" --> EXT_Providers;
SCR_Workers -- "Pushes DTOs to" --> API_Ingest;
EXT_Providers -- " " --> SCR_Workers
API_Orchestrator -- "Enqueues job" --> RedisQueues;
RedisQueues -- "Processed by" --> AsyncWorkers;
AsyncWorkers -- "Uses" --> API_Processor;
API_Ingest -- "Uses" --> API_Processor;
API_Processor -- "1. Persists to" --> PG;
API_Processor -- "2. Enqueues job" --> RedisQueues;
AsyncWorkers -- "3. Indexes in" --> OS;
AsyncWorkers -- "4. (Optional) Upserts to" --> CF_Vectorize;
end
%% Path 2: The RAG & Analysis Pipeline (Getting answers OUT)
subgraph AnalysisPipeline [" "]
style AnalysisPipeline fill:none,stroke:none
USER_QUERY -- "Initiates request" --> RAG_Orchestrator;
RAG_Orchestrator -- "a. Keyword search" --> OS;
RAG_Orchestrator -- "b. Vector search" --> CF_RAG_Worker;
CF_RAG_Worker -- "Queries" --> CF_Vectorize;
OS -- "BM25 Results" --> RAG_Orchestrator;
CF_RAG_Worker -- "Vector Results" --> RAG_Orchestrator;
RAG_Orchestrator -- "Builds" --> EvidencePack["fa:fa-file-alt Evidence Pack
Ranked, relevant context"];
EvidencePack --> Analysis_Orchestrator;
Analysis_Orchestrator -- "Prompts for Stance,
Conflict, Explanation" --> CF_Gateway;
CF_Gateway -- "Routes to" --> EXT_LLMs["fa:fa-brain LLMs
(Llama, Claude, etc.)"]:::external;
EXT_LLMs -- "Returns analysis" --> Analysis_Orchestrator;
Analysis_Orchestrator -- "Generates" --> Verdict["fa:fa-gavel Final Verdict
ClaimReview JSON-LD"];
Verdict -- "Persists to" --> OS;
Verdict -- "Serves via" --> FinalAPI["fa:fa-broadcast-tower Public API & UI"];
end
%% Path 3: Observability
ApiService & ScraperService & AsyncWorkers -- "Emit Metrics & Logs" --> Observability
%% ---- APPLY STATUS & COMPONENT STYLES ----
%% Statuses
class SCR_API,SCR_Workers,API_Orchestrator,API_Ingest,API_Processor,RedisQueues,AsyncWorkers,PG,OS live;
class RAG_Orchestrator,CF_RAG_Worker,CF_Vectorize,EvidencePack mvp;
class Analysis_Orchestrator,CF_Gateway,EXT_LLMs,Verdict,FinalAPI later;
Key Components¶
- CrawlOrchestrator (Service): The main entry point. It acquires a lock on a Source to prevent concurrent runs, uses the ProviderFactory to get the correct adapter, and dispatches jobs to the queue for each item discovered by the adapter.
- ProviderFactory (Service): A factory that dynamically instantiates the correct SourceAdapter based on the provider key in a Source model (e.g., newsdata, rss).
- SourceAdapter (Contract): A PHP interface that defines the contract for all data providers. Each provider must implement a discover() method that yields a stream of IngestedItem DTOs.
- IngestedItem (DTO): A simple, immutable Data Transfer Object that represents a single scraped article, ensuring structured data is passed between components.
- ArticleRepository (Repository): A class that encapsulates all the complex database logic for creating, updating, and deduplicating articles, their content, and their relationships.
- AnalyzeAndIndexJob (Job): The primary background job. It takes an IngestedItem, normalizes its content, uses the ArticleRepository to persist it, and then triggers indexing in OpenSearch.
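For orientation, here is a minimal sketch of the SourceAdapter contract. The namespace and the discover() signature match the provider example in Section 4; the docblock wording is illustrative.

<?php
// Minimal sketch of the SourceAdapter contract (see the concrete provider in Section 4).
namespace App\Ingestion\Contracts;

use App\Ingestion\DTO\IngestedItem;
use App\Models\Source;
use Generator;

interface SourceAdapter
{
    /**
     * Discover articles for the given source, yielding one IngestedItem per article
     * so the orchestrator can enqueue items as they stream in rather than buffering
     * an entire feed in memory.
     *
     * @return Generator<IngestedItem>
     */
    public function discover(Source $source): Generator;
}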
3. Ingestion API Contract (v1)¶
This contract defines the rules for all data submitted to the platform.
- Endpoint: POST /v1/ingest/articles
- Authentication: Authorization: Bearer <INGEST_TOKEN>
- Headers:
  - Idempotency-Key: <source_id>:<external_id> (recommended for single-article posts)
  - X-Contract-Version: 1 (optional)
Provider category mapping¶
When a source only supplies a provider-specific label (e.g., "Politics"), send it as category_label along with the provider.
The API consults config/category_map.php to translate the label into our canonical category_id, sets category_source to
provider, clamps any category_confidence into the 0–100 range, and fills taxonomy_version automatically.
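The contents of config/category_map.php are not reproduced in this playbook; the hypothetical sketch below only illustrates the shape of the label-to-canonical-ID lookup the API performs (the labels and IDs shown are made up).

// Hypothetical sketch of config/category_map.php -- the real labels,
// canonical category IDs, and key normalization may differ.
return [
    // provider-specific label (normalized) => canonical category_id
    'politics' => 'politics',
    'sport'    => 'sports',
    'economy'  => 'business',
];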
Request Payloads¶
The endpoint accepts a single article object or a batch of articles.
{
"articles": [
{
"source_id": "verify-sy",
"external_id": "abc-123",
"url": "https://example.com/post/1",
"canonical_url": null,
"title": "Example title",
"content": "Plain text body only (no HTML).",
"lang": "ar",
"published_at": "2025-08-14T09:10:00Z",
"authors": ["Author Name"],
"images": ["https://example.com/cover.jpg"],
"raw": { "provider": "rss", "feed": "..." }
}
]
}
Field Constraints¶
- title: 1–512 characters
- content: 200–200,000 characters (longer content will be truncated)
- lang: ar, en, or null
- published_at: ISO 8601 format or null
- authors, images: max 25 items per array
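As a cross-check, the constraints above map roughly onto Laravel-style validation rules like the sketch below. This mirrors the documented limits only; it is not the controller's actual rule set, and which fields are strictly required is not fully specified here.

// Sketch of validation rules equivalent to the documented constraints
// (batch form shown; the endpoint also accepts a single article object).
$rules = [
    'articles'                => ['required', 'array', 'min:1'],
    'articles.*.source_id'    => ['required', 'string'],
    'articles.*.external_id'  => ['required', 'string'],
    'articles.*.url'          => ['required', 'url'],
    'articles.*.title'        => ['required', 'string', 'min:1', 'max:512'],
    'articles.*.content'      => ['required', 'string', 'min:200'], // >200,000 chars is truncated, not rejected
    'articles.*.lang'         => ['nullable', 'in:ar,en'],
    'articles.*.published_at' => ['nullable', 'date'],              // ISO 8601 or null
    'articles.*.authors'      => ['nullable', 'array', 'max:25'],
    'articles.*.images'       => ['nullable', 'array', 'max:25'],
];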
Response Body¶
The API responds with 202 Accepted when the request is well-formed. If any item conflicts with existing content, the endpoint returns 409 and an application/problem+json body summarizing the batch. Each item’s status is detailed in the results array.
{
"schema": "ingest.v1",
"summary": {"request_id": "<uuid>", "received": 3, "accepted": 2, "failed": 1, "conflicts": 0},
"results": [
{"external_id":"abc-123","status":"accepted","article_id":12345},
{"external_id":"abc-124","status":"duplicate","article_id":10221},
{"external_id":"abc-125","status":"failed","error":{"type":"/errors/invalid-url","title":"Invalid Payload"}}
]
}
| Item Status | Description |
|---|---|
| accepted | The article was new and has been successfully persisted and queued for analysis. |
| duplicate | An article with the same content_hash or unique identifiers already exists. No action was taken. |
| conflict | The (source_id, external_id) pair already exists but points to different content. The batch is rejected with 409. |
| failed | The article failed validation (e.g., missing a required field, content too short). See the error object for details. |
Quick cURL Examples¶
curl -X POST http://localhost:8000/v1/ingest/articles \
-H "Authorization: Bearer $INGEST_TOKEN" \
-H "Content-Type: application/json" \
-H "Idempotency-Key: verify-sy:abc-123" \
-d '{
"source_id":"verify-sy",
"external_id":"abc-123",
"url":"https://example.com/post/1",
"title":"Example title",
"content":"Plain text...",
"lang":"ar",
"published_at":"2025-08-14T09:10:00Z",
"authors":["Name"],
"images":[],
"raw":{"provider":"rss"}
}'
curl -X POST http://localhost:8000/v1/ingest/articles \
-H "Authorization: Bearer $INGEST_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"articles":[{ "source_id":"verify-sy","external_id":"abc-123","url":"https://example.com/post/1","title":"t1","content":"..."},
{ "source_id":"verify-sy","external_id":"abc-124","url":"https://example.com/post/2","title":"t2","content":"..."}]
}'
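When submitting from PHP instead of cURL (for example, from a backfill Artisan command), a minimal client sketch could look like this. Only the endpoint, the bearer token, the 202/409 semantics, and the per-item status values come from the contract above; the surrounding code and config key are illustrative.

use Illuminate\Support\Facades\Http;

// Illustrative client sketch; $articles is an array of article payloads as documented above.
$response = Http::withToken(config('services.ingest.token')) // assumed config key for INGEST_TOKEN
    ->acceptJson()
    ->post('http://localhost:8000/v1/ingest/articles', ['articles' => $articles]);

if ($response->status() === 409) {
    // At least one (source_id, external_id) pair conflicts with different content;
    // the application/problem+json body summarizes the batch.
    logger()->warning('Ingest batch rejected with conflicts', (array) $response->json());
} elseif ($response->status() === 202) {
    foreach ($response->json('results', []) as $result) {
        match ($result['status']) {
            'accepted'  => logger()->info("Accepted {$result['external_id']} as article {$result['article_id']}"),
            'duplicate' => null, // already known; nothing to do
            'failed'    => logger()->error('Item failed validation', $result['error'] ?? []),
            default     => null,
        };
    }
}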
4. How to Add a New Data Provider¶
The ingestion module is designed for easy extensibility. To add support for a new data source (e.g., a new news API or a custom RSS feed), follow these steps:
Step 1: Create the Provider Class¶
First, create a new PHP class in the /api/app/Ingestion/Providers directory. This class must implement the SourceAdapter contract.
Example: SimpleRssProvider.php
<?php
// in /api/app/Ingestion/Providers/SimpleRssProvider.php
namespace App\Ingestion\Providers;
use App\Ingestion\Contracts\SourceAdapter;
use App\Ingestion\DTO\IngestedItem;
use App\Models\Source;
use Generator;
use Illuminate\Support\Facades\Http;
use Illuminate\Support\Facades\Log;
use SimpleXMLElement;
class SimpleRssProvider implements SourceAdapter
{
public function discover(Source $source): Generator
{
Log::info("SimpleRssProvider: Discovering articles for source '{$source->key}'.");
try {
$response = Http::timeout(30)->get($source->feed_url);
$xml = new SimpleXMLElement($response->body());
foreach ($xml->channel->item as $item) {
yield new IngestedItem(
externalId: (string) $item->guid,
url: (string) $item->link,
title: (string) $item->title,
summary: (string) $item->description,
content: (string) ($item->children('content', true)->encoded ?? ''),
publishedAt: (string) $item->pubDate,
sourceMeta: [
'provider' => 'simple-rss',
'name' => (string) $xml->channel->title,
],
raw: $item->asXML()
);
}
} catch (\Throwable $e) {
Log::error("SimpleRssProvider: Failed to fetch or parse feed for source '{$source->key}'.", [
'error' => $e->getMessage(),
]);
return;
}
}
}
Step 2: Register the Provider in the Factory¶
Next, open the ProviderFactory and add your new provider to the match statement.
// in /api/app/Ingestion/Services/ProviderFactory.php
// ...
public function make(Source $source): SourceAdapter
{
return match ($source->provider) {
'newsdata' => new NewsDataProvider($this->config->get('services.newsdata')),
'rss' => new RssAdapter(),
// Add your new provider here
'simple-rss' => new SimpleRssProvider(),
default => throw new \InvalidArgumentException(
"Unsupported provider type: [{$source->provider}] for source [{$source->key}]"
),
};
}
// ...
Step 3: Create a Source in the Database¶
Finally, add a new record to your sources table. The provider column must match the key you registered in the factory.
Example SQL:
INSERT INTO sources (
"key", "provider", "name", "feed_url", "language", "active", "created_at", "updated_at"
) VALUES (
'example_rss_feed', 'simple-rss', 'Example Blog RSS', 'https://example.com/rss.xml', 'en', true, NOW(), NOW()
);
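Equivalently, you can seed the record from Tinker or a database seeder instead of raw SQL. The sketch below uses the Source model with the column names from the SQL above; depending on the model's $fillable configuration you may need to adjust it.

use App\Models\Source;

// Same record as the SQL example, created via Eloquent (illustrative).
Source::create([
    'key'      => 'example_rss_feed',
    'provider' => 'simple-rss',
    'name'     => 'Example Blog RSS',
    'feed_url' => 'https://example.com/rss.xml',
    'language' => 'en',
    'active'   => true,
]);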
Step 4: Run the Ingestion¶
That's it! You can now trigger ingestion for your new source with the crawl:source Artisan command shown in the CLI entrypoints of the DFD above (its --create flag registers sources via the SourceRegistrar, and --sync drives the CrawlOrchestrator).
5. Scraper Integration (Push/Pull)¶
The Labeeb Scraper service plays a crucial role in the ingestion pipeline by fetching and normalizing articles from external sources. It integrates with the API's ingestion module through two primary mechanisms:
- Pull Mode (ScraperPullAdapter): In this mode, the API's CrawlOrchestrator initiates a scrape job on the Scraper service. The Scraper then fetches data and returns it to the API, which processes it through the ingestion pipeline. This is handled by App\Ingestion\Adapters\ScraperPullAdapter.
- Push Mode (ScraperPushAdapter): In this mode, the Scraper service autonomously fetches data and pushes it directly to the API's ingestion endpoint (/v1/ingest/articles). This is handled by App\Ingestion\Adapters\ScraperPushAdapter, which acts as a no-op adapter within the API's discovery process, since the data is already being pushed.
All incoming articles, whether from pull or push mechanisms, are received by the App\Http\Controllers\IngestArticlesController at the /v1/ingest/articles endpoint (defined in api/routes/api.php). This controller is responsible for validating the incoming data and dispatching it for further processing.
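For context, the route wiring is conceptually as simple as the sketch below. The path, controller class, and file location come from this section; the invokable-controller form and the middleware name are assumptions, so treat this as a sketch rather than the actual contents of api/routes/api.php.

// Conceptual sketch of the route in api/routes/api.php (not the actual file).
use App\Http\Controllers\IngestArticlesController;
use Illuminate\Support\Facades\Route;

Route::post('/v1/ingest/articles', IngestArticlesController::class) // assumes an invokable controller
    ->middleware('auth.ingest'); // hypothetical middleware enforcing the Bearer INGEST_TOKEN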
For detailed information on the Scraper service's architecture, endpoints, and operational procedures, please refer to its dedicated documentation:
- Scraper Service Documentation
- Scraper Architecture
- Scraper Endpoints
- Scraper OpenAPI Specification
- API Ingestion Endpoint OpenAPI Specification (for the receiving end)
6. Relations Enrichment Callback¶
Endpoint Overview¶
- Endpoint: POST /ingest/relations
- Authentication: Authorization: Bearer <INGEST_TOKEN> (same credential as /v1/ingest/articles).
- Producer: AI-Box relations worker posting the RelationsPackage payload.
- Purpose: Persist typed event–argument relations extracted by NLP Lab, while maintaining request-tracing metadata.
Request Contract (summary)¶
{
"doc_id": "123",
"lang": "ar",
"task_version": "re-hadath-v1:model-x",
"nlp_lab_version": "2.3.4",
"enrichment_status": "ok | cached | timeout | error | skipped",
"relations": [
{
"type": "hasAgent",
"args": {
"agent": "وزارة الصحة",
"event": "أعلنت",
"location": "غزة"
},
"confidence": 0.86,
"offsets": {"start": 120, "end": 168}
}
],
"enrich_ms": 143
}
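For local debugging you can replay a captured package with a call shaped like the sketch below. Only the endpoint, the bearer token, and the X-Request-Id header are taken from this section; the real producer is the AI-Box relations worker, not a PHP client, so this is illustrative only and the config key is an assumption.

use Illuminate\Support\Facades\Http;
use Illuminate\Support\Str;

// Illustrative replay of a captured RelationsPackage ($package is the decoded JSON above).
Http::withToken(config('services.ingest.token')) // assumed config key for INGEST_TOKEN
    ->withHeaders(['X-Request-Id' => (string) Str::uuid()])
    ->acceptJson()
    ->post('http://localhost:8000/ingest/relations', $package);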
Persistence Rules¶
- The controller validates the payload, propagates X-Request-Id, and stores the full package under article.raw.ai.relations along with raw.ai.re_rid.
- When enrichment_status === "ok":
  - Existing article_relations rows are replaced with the new set, preserving sidecar-provided {start, end} offsets.
  - Entities referenced in relation arguments are upserted via EntityRepository, and their IDs are tracked in each row's meta.entities map.
  - article.raw.ai.ie.relations_summary is updated with type counts for fast reads.
- For non-ok statuses, the payload is captured in raw.ai.relations but previously stored relations are left untouched; the API responds with 202 Accepted so upstream workers can retry later.
Observability¶
- X-Request-Id is echoed for tracing across AI-Box, the worker, and the API.
- Relation inserts participate in the existing Prometheus instrumentation via the ingestion service's database metrics.