
Ingestion Module Playbook

Mission Control for Data Ingestion

This document is the single source of truth for the Labeeb Ingestion Module. It provides a deep dive into the architecture, API contracts, and operational procedures required to manage the flow of data into the platform. It is designed for SREs, on-call engineers, and developers who need to understand, operate, or extend the ingestion pipeline.


1. Mission & Guiding Principles

The Ingestion Module's mission is to provide a single, reliable, and extensible gateway for all external data entering the Labeeb platform.

Its architecture is founded on these core principles:

  • Asynchronous & Resilient: All processing is handled via a robust, queue-based system (Laravel Horizon & Redis). This allows the API to acknowledge ingestion requests instantly while complex validation, normalization, and persistence tasks are performed reliably in the background. This design prevents data loss during downstream failures and ensures high throughput.
  • Strategy Pattern for Extensibility: The module uses the Strategy design pattern to decouple the core ingestion logic from the specifics of any single data source. New data providers (e.g., for a new news API or a website's sitemap) can be added as self-contained "Adapter" classes with zero changes to the core orchestration logic.
  • Strict, Versioned Contracts: All data enters through a single, versioned API endpoint (/v1/ingest/articles). This strict contract ensures data integrity and allows the Scraper and other client services to evolve independently without breaking the core platform.
  • Database as the Source of Truth: The module is the sole writer to the core articles and sources tables in the PostgreSQL database, ensuring data consistency and a single, authoritative system of record.
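To make the versioned-contract and single-gateway principles concrete, here is a minimal sketch of how such an endpoint could be registered in Laravel. It is illustrative only: the route file layout and the middleware aliases (auth.ingest, throttle:ingest) are assumptions, not the project's actual routes/api.php.

<?php
// Illustrative sketch only; the middleware aliases are assumptions.

use App\Http\Controllers\IngestArticlesController;
use Illuminate\Support\Facades\Route;

Route::prefix('v1')->group(function () {
    // Single, versioned gateway for all external article data.
    Route::post('/ingest/articles', IngestArticlesController::class)
        ->middleware(['auth.ingest', 'throttle:ingest'])
        ->name('ingest.articles');
});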

2. Architecture Deep Dive

The module is composed of several key components that work together to create a flexible and robust pipeline.

flowchart TD
  %% Styles
  classDef ext fill:#e0f2fe,stroke:#0ea5e9,stroke-width:1px;
  classDef store fill:#f0fdf4,stroke:#22c55e,stroke-width:1px;
  classDef process fill:#f8fafc,stroke:#64748b,stroke-width:1px;
  classDef optional stroke-dasharray: 5 5;

  %% External producers
  subgraph Producers
    SCR["Scraper Service
POST /ingest"]:::ext BCK["Backfill/CLI"]:::ext WH["Partner Webhook (optional)"]:::ext end %% Ingestion service (API) subgraph Ingestion Service API direction LR ING_API["/ingest Controller
• JSON schema & per-item RFC7807
• size caps / auth"]:::process ING_Q[(Redis Queue)]:::store WKR["Persist Worker
• Normalize (clean, lang)
• Canonicalize URL
• content_hash"]:::process IDED{"Idempotent?
(canonical_url + content_hash)"}:::process OS[("OpenSearch `news_docs`")]:::store OUT[(Outbox/Event
article.upserted / deleted)]:::store METRICS[(Prometheus /metrics)]:::store LOGS[(Structured Logs)]:::store end %% Vectorize sync (downstream) subgraph Cloudflare Sync direction LR CHUNK["Chunker"]:::process SYNC["Vectorize Syncer
• embed (Workers AI)
• upsert/delete (Vectorize)"]:::process VEC[("Vectorize Index")]:::store end %% Flows SCR --> ING_API BCK --> ING_API WH --> ING_API ING_API --> ING_Q ING_Q --> WKR --> IDED IDED -- duplicate --> METRICS IDED -- new/updated --> OS IDED -- deleted --> OUT OS --> OUT OUT --> CHUNK --> SYNC --> VEC %% Obs ING_API -. "request metrics / logs" .-> METRICS ING_API -. logs .-> LOGS WKR -. job stats .-> METRICS SYNC -. sync stats .-> METRICS

Data Flow Diagram (End-to-End)

The following DFD complements the architecture view by showing all operational entrypoints, including CLI commands, push/pull ingestion paths, queues, stores, and the analysis/indexing steps. It aligns with api/openapi.json and scraper/openapi.json.

flowchart TD
  classDef ext fill:#e0f2fe,stroke:#0ea5e9,stroke-width:1px;
  classDef process fill:#f8fafc,stroke:#64748b,stroke-width:1px;
  classDef store fill:#f0fdf4,stroke:#22c55e,stroke-width:1px;
  classDef queue fill:#fff7ed,stroke:#f97316,stroke-width:1px;
  classDef optional stroke-dasharray: 5 5;

  %% External systems
  subgraph External
    SCR[Scraper Service
Endpoints: /scrape, /profiles, /profiles/reload]:::ext
    PARTNER[Partner Webhooks]:::ext
    NEWSAPI[News Providers
e.g. NewsData, MediaStack, RSS]:::ext
  end

  %% CLI entrypoints
  subgraph CLI Commands
    CS["crawl:source
• SourceRegistrar (--create)
• CrawlOrchestrator (--sync)"]:::process
    SYNC["scraper:sync-sources
• ScraperPushSyncService -> GET /profiles
• optional prune/dry-run"]:::process
    NP["news:pull
• Provider latest/archive
• Normalize -> upsert"]:::process
    OS_SETUP["opensearch:setup
• Templates + ISM policies"]:::process
  end

  %% Ingestion API (push)
  subgraph Ingestion API
    CTRL["IngestArticlesController
(POST /v1/ingest/articles)"]:::process
    IDEMP[(Redis Idempotency Cache
Idempotency-Key)]:::store
    VAL["Validate JSON -> DTO (IngestedItem)"]:::process
    SR["SourceResolver"]:::process
    APROC_SYNC["ArticleProcessor.normalizeAndDeduplicate
(sync)"]:::process
  end

  %% Pull orchestration
  subgraph Pull Orchestration
    ORCH["CrawlOrchestrator"]:::process
    LOCK[(Per-source Lock)]:::store
    ADP["SourceAdapter(s)
• RssAdapter
• NewsDataSourceAdapter
• ScraperPullAdapter"]:::process
  end

  %% Core infra
  PG[(PostgreSQL
sources, articles, contents, entities, crawl_runs)]:::store
  Q[(Redis Queue 'ingestion')]:::queue

  %% Workers
  subgraph Workers
    NSJ["NormalizeAndStoreJob
(pull path)"]:::process
    APROC_ASYNC["ArticleProcessor.normalizeAndDeduplicate
(async)"]:::process
    AAIJ["AnalyzeAndIndexJob"]:::process
    NER["NerService"]:::process
    EMB["EmbeddingsService"]:::process
  end

  %% Search/Index
  OS[(OpenSearch)]:::store

  %% Observability
  METRICS[(Prometheus /metrics)]:::store
  LOGS[(Structured Logs)]:::store

  %% Flows: CLI
  CS --> ORCH
  CS -- create --> PG
  SYNC --> SCR:::ext
  SYNC -- upsert/prune --> PG
  NP -- fetch via provider --> NEWSAPI
  NP -- normalize/upsert --> PG
  OS_SETUP --> OS

  %% Flows: Push ingestion
  SCR -- push --> CTRL
  PARTNER --> CTRL
  CTRL -. idempotency .- IDEMP
  CTRL --> VAL --> SR --> APROC_SYNC --> PG
  APROC_SYNC -- accepted --> AAIJ
  CTRL -. req metrics/logs .-> METRICS
  CTRL -. logs .-> LOGS

  %% Flows: Pull ingestion
  CS --> ORCH
  ORCH --> LOCK
  ORCH --> ADP
  ADP -- discover --> NEWSAPI
  ADP -- yield DTOs --> ORCH --> Q
  ORCH -. run metrics/logs .-> METRICS

  %% Queue consumption
  Q --> NSJ --> APROC_ASYNC --> PG
  NSJ -- enqueue --> AAIJ

  %% Analysis + Indexing
  AAIJ --> NER --> PG
  AAIJ --> EMB --> OS
  AAIJ --> OS
  NSJ -. job stats .-> METRICS
  AAIJ -. job stats .-> METRICS

Architectural Zones

A zone-based view that groups components by purpose and boundary.

%% ----------------------------------------------------------------------------
%% Project Labeeb - Master Data Flow Diagram (DFD) - v3.1 (Corrected)
%% ----------------------------------------------------------------------------

flowchart LR
%% ---- STYLING & VISUAL LANGUAGE ----
%% A rich visual language is defined for maximum clarity and professional aesthetic.
    classDef platform stroke:#475569,stroke-width:2px,fill:#f8fafc,color:#334155;
    classDef cloudPlatform stroke:#059669,stroke-width:2px,fill:#f0fdf4,color:#047857;
    classDef live fill:#ecfdf5,stroke:#22c55e,stroke-width:1.5px,color:#15803d;
    classDef mvp fill:#fefce8,stroke:#eab308,stroke-width:1.5px,color:#713f12;
    classDef later fill:#fee2e2,stroke:#ef4444,stroke-width:1.5px,color:#991b1b;
    classDef service fill:#eef2ff,stroke:#6366f1,stroke-width:1.5px;
    classDef worker fill:#fffbeb,stroke:#f59e0b,stroke-width:1.5px;
    classDef datastore fill:#f1f5f9,stroke:#64748b,stroke-width:1.5px;
    classDef queue fill:#fff7ed,stroke:#f97316,stroke-width:1.5px;
    classDef trigger fill:#f5f3ff,stroke:#8b5cf6,stroke-width:1.5px;
    classDef external fill:#f8fafc,stroke:#9ca3af,stroke-width:1.5px,stroke-dasharray:3 3;
    classDef optional stroke-dasharray: 3 3, fill:#f4f4f5;

%% ---- ARCHITECTURAL ZONES ----
%% The system is divided into clear, logical zones for easy understanding.

    subgraph Inputs ["fa:fa-sign-in-alt Inputs & Triggers"]
        direction TB
        CLI["CLI Commands
(artisan)"]:::trigger USER_QUERY["User Query
(via Public API)"]:::trigger end subgraph CorePlatform ["fa:fa-server Core Platform (DigitalOcean)"] direction TB subgraph ScraperService ["Scraper Service (Python/FastAPI)"] direction LR SCR_API["API Server
/profiles, /scrape"]:::service SCR_Workers["Fetcher Workers
Parses articles into DTOs"]:::worker end subgraph ApiService ["API Service (PHP/Laravel)"] direction LR API_Orchestrator["Crawl Orchestrator
Manages pull-based ingestion"]:::service API_Ingest["Ingest Controller
POST /v1/ingest/articles"]:::service API_Processor["Article Processor
Normalizes, dedupes & persists"]:::service end subgraph CoreAI ["Core AI Logic & Orchestration (in API Service)"] direction LR RAG_Orchestrator["RAG Orchestrator
Hybrid Search & Reranking"]:::service Analysis_Orchestrator["Analysis Orchestrator
Sends prompts to AI Gateway"]:::service end subgraph Async ["Async Processing"] direction LR RedisQueues[("fa:fa-layer-group Redis Queues
'ingestion', 'analysis'")]:::queue AsyncWorkers["Job Workers
• NormalizeAndStoreJob
• AnalyzeAndIndexJob"]:::worker end end subgraph Cloudflare ["fa:fa-cloudflare Edge AI Platform (Cloudflare)"] direction TB CF_Gateway["AI Gateway
Universal LLM Endpoint"]:::service CF_RAG_Worker["RAG Query Worker
Embeds query, searches Vectorize"]:::worker CF_Vectorize["fa:fa-database Cloudflare Vectorize
Vector Store"]:::datastore end subgraph Persistence ["fa:fa-database Data & Persistence"] direction TB PG["PostgreSQL
Authoritative Datastore"]:::datastore OS["OpenSearch
Keyword Search & Analytics"]:::datastore Observability["Observability
Prometheus & Logs"]:::datastore end %% ---- DATA FLOW PATHWAYS ---- %% We now trace the primary "stories" of how data moves through the system. %% Path 1: The Ingestion Pipeline (Getting data IN) subgraph IngestionPipeline [" "] style IngestionPipeline fill:none,stroke:none CLI -- "crawl:source" --> API_Orchestrator; API_Orchestrator -- "Pulls from" --> EXT_Providers["fa:fa-newspaper News Providers
(RSS, APIs)"]:::external; API_Orchestrator -- "Or triggers" --> SCR_API; SCR_API -- "Dispatches jobs to" --> SCR_Workers; SCR_Workers -- "Fetches from" --> EXT_Providers; SCR_Workers -- "Pushes DTOs to" --> API_Ingest; EXT_Providers -- " " --> SCR_Workers API_Orchestrator -- "Enqueues job" --> RedisQueues; RedisQueues -- "Processed by" --> AsyncWorkers; AsyncWorkers -- "Uses" --> API_Processor; API_Ingest -- "Uses" --> API_Processor; API_Processor -- "1. Persists to" --> PG; API_Processor -- "2. Enqueues job" --> RedisQueues; AsyncWorkers -- "3. Indexes in" --> OS; AsyncWorkers -- "4. (Optional) Upserts to" --> CF_Vectorize; end %% Path 2: The RAG & Analysis Pipeline (Getting answers OUT) subgraph AnalysisPipeline [" "] style AnalysisPipeline fill:none,stroke:none USER_QUERY -- "Initiates request" --> RAG_Orchestrator; RAG_Orchestrator -- "a. Keyword search" --> OS; RAG_Orchestrator -- "b. Vector search" --> CF_RAG_Worker; CF_RAG_Worker -- "Queries" --> CF_Vectorize; OS -- "BM25 Results" --> RAG_Orchestrator; CF_RAG_Worker -- "Vector Results" --> RAG_Orchestrator; RAG_Orchestrator -- "Builds" --> EvidencePack["fa:fa-file-alt Evidence Pack
Ranked, relevant context"]; EvidencePack --> Analysis_Orchestrator; Analysis_Orchestrator -- "Prompts for Stance,
Conflict, Explanation" --> CF_Gateway; CF_Gateway -- "Routes to" --> EXT_LLMs["fa:fa-brain LLMs
(Llama, Claude, etc.)"]:::external; EXT_LLMs -- "Returns analysis" --> Analysis_Orchestrator; Analysis_Orchestrator -- "Generates" --> Verdict["fa:fa-gavel Final Verdict
ClaimReview JSON-LD"]; Verdict -- "Persists to" --> OS; Verdict -- "Serves via" --> FinalAPI["fa:fa-broadcast-tower Public API & UI"]; end %% Path 3: Observability ApiService & ScraperService & AsyncWorkers -- "Emit Metrics & Logs" --> Observability %% ---- APPLY STATUS & COMPONENT STYLES ---- classDef platform stroke:#475569,stroke-width:2px,fill:#f8fafc,color:#334155; classDef cloudPlatform stroke:#059669,stroke-width:2px,fill:#f0fdf4,color:#047857; classDef live fill:#ecfdf5,stroke:#22c55e,stroke-width:1.5px,color:#15803d; classDef mvp fill:#fefce8,stroke:#eab308,stroke-width:1.5px,color:#713f12; classDef later fill:#fee2e2,stroke:#ef4444,stroke-width:1.5px,color:#991b1b; classDef service fill:#eef2ff,stroke:#6366f1,stroke-width:1.5px; classDef worker fill:#fffbeb,stroke:#f59e0b,stroke-width:1.5px; classDef datastore fill:#f1f5f9,stroke:#64748b,stroke-width:1.5px; classDef queue fill:#fff7ed,stroke:#f97316,stroke-width:1.5px; classDef trigger fill:#f5f3ff,stroke:#8b5cf6,stroke-width:1.5px; classDef external fill:#f8fafc,stroke:#9ca3af,stroke-width:1.5px,stroke-dasharray:3 3; classDef optional stroke-dasharray: 3 3, fill:#f4f4f5; %% Statuses class SCR_API,SCR_Workers,API_Orchestrator,API_Ingest,API_Processor,RedisQueues,AsyncWorkers,PG,OS live; class RAG_Orchestrator,CF_RAG_Worker,CF_Vectorize,EvidencePack mvp; class Analysis_Orchestrator,CF_Gateway,EXT_LLMs,Verdict,FinalAPI later;

Key Components

  • CrawlOrchestrator (Service): The main entry point. It acquires a lock on a Source to prevent concurrent runs, uses the ProviderFactory to get the correct adapter, and dispatches jobs to the queue for each item discovered by the adapter.
  • ProviderFactory (Service): A factory that dynamically instantiates the correct SourceAdapter based on the provider key in a Source model (e.g., newsdata, rss).
  • SourceAdapter (Contract): A PHP interface that defines the contract for all data providers. Each provider must implement a discover() method that yields a stream of IngestedItem DTOs (see the sketch after this list).
  • IngestedItem (DTO): A simple, immutable Data Transfer Object that represents a single scraped article, ensuring structured data is passed between components.
  • ArticleRepository (Repository): A class that encapsulates all the complex database logic for creating, updating, and deduplicating articles, their content, and their relationships.
  • AnalyzeAndIndexJob (Job): The primary background job. It takes an IngestedItem, normalizes its content, uses the ArticleRepository to persist it, and then triggers indexing in OpenSearch.
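
For orientation, here is a minimal sketch of the two contracts that everything else plugs into. The discover() signature and the IngestedItem constructor fields mirror the provider example in Section 4; anything beyond that (property types, nullability) is an assumption.

<?php
// Sketch only: the real definitions live in the API codebase; details beyond
// what the Section 4 example shows are assumptions.

// in /api/app/Ingestion/Contracts/SourceAdapter.php
namespace App\Ingestion\Contracts;

use App\Models\Source;
use Generator;

interface SourceAdapter
{
    /** Yields one IngestedItem per article discovered for the given source. */
    public function discover(Source $source): Generator;
}

// in /api/app/Ingestion/DTO/IngestedItem.php
namespace App\Ingestion\DTO;

final class IngestedItem
{
    public function __construct(
        public readonly string $externalId,
        public readonly string $url,
        public readonly string $title,
        public readonly ?string $summary,
        public readonly string $content,
        public readonly ?string $publishedAt,
        public readonly array $sourceMeta = [],
        public readonly ?string $raw = null,
    ) {}
}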

3. Ingestion API Contract (v1)

This contract defines the rules for all data submitted to the platform.

  • Endpoint: POST /v1/ingest/articles
  • Authentication: Authorization: Bearer <INGEST_TOKEN>
  • Headers:
    • Idempotency-Key: <source_id>:<external_id> (Recommended for single-article posts)
    • X-Contract-Version: 1 (Optional)

Provider category mapping

When a source only supplies a provider-specific label (e.g., "Politics"), send it as category_label along with the provider. The API consults config/category_map.php to translate the label into our canonical category_id, sets category_source to provider, clamps any category_confidence into the 0–100 range, and fills taxonomy_version automatically.
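
As a rough illustration, config/category_map.php could be shaped like the sketch below; the provider keys, labels, and canonical category IDs shown here are assumptions, not the real mapping.

<?php
// Hypothetical shape of config/category_map.php; the real file may differ.
// Maps provider-specific labels to canonical category IDs.

return [
    'rss' => [
        'Politics' => 'politics',
        'سياسة' => 'politics',
    ],
    'newsdata' => [
        'World' => 'international',
    ],
];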

Request Payloads

The endpoint accepts either a batch of articles wrapped in an articles array (first example below) or a single article object (second example).

{
  "articles": [
    {
      "source_id": "verify-sy",
      "external_id": "abc-123",
      "url": "https://example.com/post/1",
      "canonical_url": null,
      "title": "Example title",
      "content": "Plain text body only (no HTML).",
      "lang": "ar",
      "published_at": "2025-08-14T09:10:00Z",
      "authors": ["Author Name"],
      "images": ["https://example.com/cover.jpg"],
      "raw": { "provider": "rss", "feed": "..." }
    }
  ]
}
{
  "source_id": "verify-sy",
  "external_id": "abc-123",
  "url": "https://example.com/post/1",
  "title": "Example title",
  "content": "Plain text body only (no HTML).",
  "lang": "ar",
  "published_at": "2025-08-14T09:10:00Z",
  "authors": ["Author Name"],
  "images": [],
  "raw": { "provider": "rss" }
}

Field Constraints

  • title: 1–512 characters
  • content: 200–200,000 characters (longer content will be truncated)
  • lang: ar, en, or null
  • published_at: ISO 8601 format or null
  • authors, images: Max 25 items per array
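
The constraints above map naturally onto Laravel validation rules. The snippet below is a sketch of that mapping, not the controller's actual FormRequest:

<?php
// Sketch only: validation rules mirroring the documented constraints.
// The real request validation in the API may be organised differently.

$rules = [
    'articles.*.source_id'    => ['required', 'string'],
    'articles.*.external_id'  => ['required', 'string'],
    'articles.*.url'          => ['required', 'url'],
    'articles.*.title'        => ['required', 'string', 'min:1', 'max:512'],
    'articles.*.content'      => ['required', 'string', 'min:200'], // content above 200k chars is truncated, not rejected
    'articles.*.lang'         => ['nullable', 'in:ar,en'],
    'articles.*.published_at' => ['nullable', 'date'],
    'articles.*.authors'      => ['sometimes', 'array', 'max:25'],
    'articles.*.images'       => ['sometimes', 'array', 'max:25'],
];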

Response Body

The API responds with 202 Accepted when the request is well-formed. If any item conflicts with existing content, the endpoint returns 409 and an application/problem+json body summarizing the batch. Each item’s status is detailed in the results array.

Example Response
{
  "schema": "ingest.v1",
  "summary": {"request_id": "<uuid>", "received": 3, "accepted": 2, "failed": 1, "conflicts": 0},
  "results": [
    {"external_id":"abc-123","status":"accepted","article_id":12345},
    {"external_id":"abc-124","status":"duplicate","article_id":10221},
    {"external_id":"abc-125","status":"failed","error":{"type":"/errors/invalid-url","title":"Invalid Payload"}}
  ]
}
Item statuses:

  • accepted: The article was new and has been successfully persisted and queued for analysis.
  • duplicate: An article with the same content_hash or unique identifiers already exists. No action was taken.
  • conflict: The (source_id, external_id) pair already exists but points to different content. The batch is rejected with 409.
  • failed: The article failed validation (e.g., missing a required field, content too short). See the error object for details.
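
Callers can use the per-item results to retry selectively. Below is a minimal client-side sketch using Laravel's Http facade; the config('services.ingest.token') key and the $batch variable are assumptions:

<?php
// Sketch: post a batch, then collect the external_ids of failed items so a
// later pass can retry only those.

use Illuminate\Support\Facades\Http;

$response = Http::withToken(config('services.ingest.token')) // assumed config key
    ->post('http://localhost:8000/v1/ingest/articles', ['articles' => $batch]);

$failedIds = collect($response->json('results', []))
    ->where('status', 'failed')
    ->pluck('external_id')
    ->all();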

Quick cURL Examples

Single article:

curl -X POST http://localhost:8000/v1/ingest/articles \
 -H "Authorization: Bearer $INGEST_TOKEN" \
 -H "Content-Type: application/json" \
 -H "Idempotency-Key: verify-sy:abc-123" \
 -d '{
  "source_id":"verify-sy",
  "external_id":"abc-123",
  "url":"https://example.com/post/1",
  "title":"Example title",
  "content":"Plain text...",
  "lang":"ar",
  "published_at":"2025-08-14T09:10:00Z",
  "authors":["Name"],
  "images":[],
  "raw":{"provider":"rss"}
 }'

Batch:

curl -X POST http://localhost:8000/v1/ingest/articles \
 -H "Authorization: Bearer $INGEST_TOKEN" \
 -H "Content-Type: application/json" \
 -d '{
  "articles":[{ "source_id":"verify-sy","external_id":"abc-123","url":"https://example.com/post/1","title":"t1","content":"..."},
           { "source_id":"verify-sy","external_id":"abc-124","url":"https://example.com/post/2","title":"t2","content":"..."}]
 }'

4. How to Add a New Data Provider

The ingestion module is designed for easy extensibility. To add support for a new data source (e.g., a new news API or a custom RSS feed), follow these steps:

Step 1: Create the Provider Class

First, create a new PHP class in the /api/app/Ingestion/Providers directory. This class must implement the SourceAdapter contract.

Example: SimpleRssProvider.php

<?php
// in /api/app/Ingestion/Providers/SimpleRssProvider.php

namespace App\Ingestion\Providers;

use App\Ingestion\Contracts\SourceAdapter;
use App\Ingestion\DTO\IngestedItem;
use App\Models\Source;
use Generator;
use Illuminate\Support\Facades\Http;
use Illuminate\Support\Facades\Log;
use SimpleXMLElement;

class SimpleRssProvider implements SourceAdapter
{
    public function discover(Source $source): Generator
    {
        Log::info("SimpleRssProvider: Discovering articles for source '{$source->key}'.");

        try {
            $response = Http::timeout(30)->get($source->feed_url);
            $xml = new SimpleXMLElement($response->body());

            foreach ($xml->channel->item as $item) {
                yield new IngestedItem(
                    externalId: (string) $item->guid,
                    url: (string) $item->link,
                    title: (string) $item->title,
                    summary: (string) $item->description,
                    content: (string) ($item->children('content', true)->encoded ?? ''),
                    publishedAt: (string) $item->pubDate,
                    sourceMeta: [
                        'provider' => 'simple-rss',
                        'name' => (string) $xml->channel->title,
                    ],
                    raw: $item->asXML()
                );
            }
        } catch (\Throwable $e) {
            Log::error("SimpleRssProvider: Failed to fetch or parse feed for source '{$source->key}'.", [
                'error' => $e->getMessage(),
            ]);
            return;
        }
    }
}

Step 2: Register the Provider in the Factory

Next, open the ProviderFactory and add your new provider to the match statement.

// in /api/app/Ingestion/Services/ProviderFactory.php

// ...
public function make(Source $source): SourceAdapter
{
    return match ($source->provider) {
        'newsdata' => new NewsDataProvider($this->config->get('services.newsdata')),
        'rss' => new RssAdapter(),

        // Add your new provider here
        'simple-rss' => new SimpleRssProvider(),

        default => throw new \InvalidArgumentException(
            "Unsupported provider type: [{$source->provider}] for source [{$source->key}]"
        ),
    };
}
// ...

Step 3: Create a Source in the Database

Finally, add a new record to your sources table. The provider column must match the key you registered in the factory.

Example SQL:

INSERT INTO sources (
    "key", "provider", "name", "feed_url", "language", "active", "created_at", "updated_at"
) VALUES (
    'example_rss_feed', 'simple-rss', 'Example Blog RSS', 'https://example.com/rss.xml', 'en', true, NOW(), NOW()
);

Step 4: Run the Ingestion

That's it! You can now run the ingestion for your new source.

docker compose exec api php artisan crawl:source example_rss_feed


5. Scraper Integration (Push/Pull)

The Labeeb Scraper service plays a crucial role in the ingestion pipeline by fetching and normalizing articles from external sources. It integrates with the API's ingestion module through two primary mechanisms:

  • Pull Mode (ScraperPullAdapter): In this mode, the API's CrawlOrchestrator initiates a scrape job on the Scraper service. The Scraper then fetches data and returns it to the API, which processes it through the ingestion pipeline. This is handled by the App\Ingestion\Adapters\ScraperPullAdapter.
  • Push Mode (ScraperPushAdapter): In this mode, the Scraper service autonomously fetches data and pushes it directly to the API's ingestion endpoint (/v1/ingest/articles). This is handled by the App\Ingestion\Adapters\ScraperPushAdapter, which acts as a no-op adapter within the API's discovery process, as the data is already being pushed.

All incoming articles, whether from pull or push mechanisms, are received by the App\Http\Controllers\IngestArticlesController at the /v1/ingest/articles endpoint (defined in api/routes/api.php). This controller is responsible for validating the incoming data and dispatching it for further processing.
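
For illustration, the pull path boils down to an adapter that calls the Scraper's /scrape endpoint and yields DTOs. The sketch below assumes a config('services.scraper.url') entry and a JSON response with an items array; neither is confirmed by this document, so treat it as a shape, not the real ScraperPullAdapter.

<?php
// Illustrative sketch of a pull-mode adapter. Only the SourceAdapter contract
// and the Scraper's /scrape endpoint come from this document; the request and
// response shapes and the config key are assumptions.

namespace App\Ingestion\Adapters;

use App\Ingestion\Contracts\SourceAdapter;
use App\Ingestion\DTO\IngestedItem;
use App\Models\Source;
use Generator;
use Illuminate\Support\Facades\Http;

class ExampleScraperPullAdapter implements SourceAdapter
{
    public function discover(Source $source): Generator
    {
        // Ask the Scraper to run the profile that matches this source.
        $response = Http::timeout(60)
            ->post(config('services.scraper.url') . '/scrape', [
                'profile' => $source->key,
            ]);

        foreach ($response->json('items', []) as $item) {
            yield new IngestedItem(
                externalId: (string) $item['external_id'],
                url: (string) $item['url'],
                title: (string) $item['title'],
                summary: (string) ($item['summary'] ?? ''),
                content: (string) ($item['content'] ?? ''),
                publishedAt: (string) ($item['published_at'] ?? ''),
                sourceMeta: ['provider' => 'scraper'],
                raw: json_encode($item) ?: null
            );
        }
    }
}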

For detailed information on the Scraper service's architecture, endpoints, and operational procedures, please refer to its dedicated documentation.


6. Relations Enrichment Callback

Endpoint Overview

  • Endpoint: POST /ingest/relations
  • Authentication: Authorization: Bearer <INGEST_TOKEN> (same credential as /v1/ingest/articles).
  • Producer: AI-Box relations worker posting the RelationsPackage payload.
  • Purpose: Persist typed event–argument relations extracted by NLP Lab, while maintaining request-tracing metadata.

Request Contract (summary)

{
  "doc_id": "123",
  "lang": "ar",
  "task_version": "re-hadath-v1:model-x",
  "nlp_lab_version": "2.3.4",
  "enrichment_status": "ok | cached | timeout | error | skipped",
  "relations": [
    {
      "type": "hasAgent",
      "args": {
        "agent": "وزارة الصحة",
        "event": "أعلنت",
        "location": "غزة"
      },
      "confidence": 0.86,
      "offsets": {"start": 120, "end": 168}
    }
  ],
  "enrich_ms": 143
}

Persistence Rules

  • The controller validates the payload, propagates X-Request-Id, and stores the full package under article.raw.ai.relations along with raw.ai.re_rid.
  • When enrichment_status === "ok":
    • Existing article_relations rows are replaced with the new set, preserving sidecar-provided {start,end} offsets.
    • Entities referenced in relation arguments are upserted via EntityRepository, and their IDs are tracked in each row's meta.entities map.
    • article.raw.ai.ie.relations_summary is updated with type counts for fast reads.
  • For non-ok statuses, the payload is captured in raw.ai.relations but previously stored relations are left untouched; the API responds with 202 Accepted so upstream workers can retry later.
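
As an illustration of the summary step, the per-type counts stored under article.raw.ai.ie.relations_summary can be derived directly from the incoming payload. A minimal sketch using a standard Laravel collection helper (the surrounding persistence code is omitted):

<?php
// Sketch: derive per-type counts from the relations array of the payload.

$relationsSummary = collect($payload['relations'] ?? [])
    ->countBy('type')   // e.g. ['hasAgent' => 1, 'hasLocation' => 2]
    ->all();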

Observability

  • X-Request-Id is echoed for tracing across AI-Box, the worker, and the API.
  • Relation inserts participate in the existing Prometheus instrumentation via the ingestion service's database metrics.