Batch Processing

Process large volumes of text efficiently.

Overview

For high-volume verification, use these strategies:

  • Combine text into larger payloads
  • Use async/concurrent requests
  • Implement request queuing
  • Cache results

Combining Text

Instead of one request per item, combine related text:

# Slow: 100 requests
for article in articles:
    result = client.verify(article.text)
 
# Fast: 1 request (or a few, if the batch needs chunking)
combined = "\n\n---\n\n".join(a.text for a in articles)
result = client.verify(combined)
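
If the combined payload grows too large for a single request, split it into a few chunks instead. A minimal sketch, assuming the same client object as above; the 50,000-character budget is an illustrative placeholder, not a documented limit:

def chunk_texts(texts: list[str], max_chars: int = 50_000) -> list[str]:
    """Greedily pack texts into combined payloads under a character budget."""
    chunks: list[str] = []
    current: list[str] = []
    size = 0
    for text in texts:
        # Start a new chunk when adding this text would exceed the budget
        if current and size + len(text) > max_chars:
            chunks.append("\n\n---\n\n".join(current))
            current, size = [], 0
        current.append(text)
        size += len(text)
    if current:
        chunks.append("\n\n---\n\n".join(current))
    return chunks

# One request per chunk instead of one per article
for chunk in chunk_texts([a.text for a in articles]):
    result = client.verify(chunk)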

With Claim Mapping

Track which claims belong to which source:

import requests

def batch_verify(items: list[dict]) -> dict:
    # Combine with markers
    combined_text = ""
    markers = []
 
    for i, item in enumerate(items):
        start_pos = len(combined_text)
        combined_text += f"\n\n[ITEM_{i}]\n{item['text']}"
        markers.append({
            "id": item["id"],
            "start": start_pos,
            "end": len(combined_text)
        })
 
    # Verify combined text
    result = requests.post(
        "https://api.glyphnet.io/v1/verify",
        headers={"X-API-Key": API_KEY},
        json={"text": combined_text}
    ).json()
 
    # Map claims back to items
    for claim in result["claims"]:
        claim_pos = combined_text.find(claim["text"])
        for marker in markers:
            if marker["start"] <= claim_pos < marker["end"]:
                claim["source_id"] = marker["id"]
                break
 
    return result
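
A quick usage sketch (the item texts here are made up; batch_verify only relies on the "id" and "text" keys):

items = [
    {"id": "article-1", "text": "The Eiffel Tower is 330 metres tall."},
    {"id": "article-2", "text": "Mount Everest is the tallest mountain on Earth."},
]

result = batch_verify(items)

# Each claim now carries the id of the item it came from
for claim in result["claims"]:
    print(claim["source_id"], claim["text"])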

Async Processing

Use asyncio for concurrent requests:

import asyncio
import aiohttp
 
async def verify_batch_async(texts: list[str], max_concurrent: int = 10) -> list[dict]:
    semaphore = asyncio.Semaphore(max_concurrent)
 
    async def verify_one(session: aiohttp.ClientSession, text: str) -> dict:
        async with semaphore:
            async with session.post(
                "https://api.glyphnet.io/v1/verify",
                headers={"X-API-Key": API_KEY, "Content-Type": "application/json"},
                json={"text": text}
            ) as response:
                return await response.json()
 
    async with aiohttp.ClientSession() as session:
        tasks = [verify_one(session, text) for text in texts]
        return await asyncio.gather(*tasks)
 
# Usage
results = asyncio.run(verify_batch_async(texts, max_concurrent=20))
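
On a large batch, a single failed request makes asyncio.gather raise and the rest of the batch's results are lost. A sketch of a more tolerant variant that records per-item errors instead (the 30-second timeout is an arbitrary choice):

async def verify_batch_tolerant(texts: list[str], max_concurrent: int = 10) -> list[dict]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def verify_one(session: aiohttp.ClientSession, text: str) -> dict:
        try:
            async with semaphore:
                async with session.post(
                    "https://api.glyphnet.io/v1/verify",
                    headers={"X-API-Key": API_KEY, "Content-Type": "application/json"},
                    json={"text": text},
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    response.raise_for_status()
                    return await response.json()
        except Exception as exc:
            # Keep the batch going; record the failure for this item instead
            return {"error": str(exc)}

    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(verify_one(session, t) for t in texts))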

Rate-Limited Queue

Process within rate limits:

import asyncio
from collections import deque

import aiohttp
 
class RateLimitedVerifier:
    def __init__(self, requests_per_minute: int = 200):
        self.rate = requests_per_minute
        self.queue = deque()
        self.results = {}
        self.interval = 60.0 / requests_per_minute
 
    async def add(self, id: str, text: str):
        self.queue.append((id, text))
 
    async def process(self):
        while self.queue:
            id, text = self.queue.popleft()
 
            result = await self._verify(text)
            self.results[id] = result
 
            await asyncio.sleep(self.interval)
 
    async def _verify(self, text: str) -> dict:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://api.glyphnet.io/v1/verify",
                headers={"X-API-Key": API_KEY},
                json={"text": text}
            ) as response:
                return await response.json()
 
# Usage — the queue methods are coroutines, so drive them from an async entry point
async def run_batch():
    verifier = RateLimitedVerifier(requests_per_minute=200)

    # Add items
    for article in articles:
        await verifier.add(article.id, article.text)

    # Process all
    await verifier.process()

    # Get results
    for article in articles:
        result = verifier.results[article.id]

asyncio.run(run_batch())

Worker Pool Pattern

For production systems:

import queue
import threading
import time

import requests
 
class VerificationWorkerPool:
    def __init__(self, num_workers: int = 5, rate_limit: int = 200):
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.workers = []
        self.rate_limit = rate_limit
        self.lock = threading.Lock()
        self.request_times = []
 
        for _ in range(num_workers):
            worker = threading.Thread(target=self._worker)
            worker.daemon = True
            worker.start()
            self.workers.append(worker)
 
    def _wait_for_rate_limit(self):
        with self.lock:
            now = time.time()
            # Remove requests older than 1 minute
            self.request_times = [t for t in self.request_times if now - t < 60]
 
            if len(self.request_times) >= self.rate_limit:
                sleep_time = 60 - (now - self.request_times[0])
                time.sleep(max(0, sleep_time))
 
            self.request_times.append(time.time())
 
    def _worker(self):
        while True:
            task_id, text = self.task_queue.get()
            if task_id is None:
                break
 
            self._wait_for_rate_limit()
 
            try:
                response = requests.post(
                    "https://api.glyphnet.io/v1/verify",
                    headers={"X-API-Key": API_KEY},
                    json={"text": text}
                )
                self.result_queue.put((task_id, response.json()))
            except Exception as e:
                self.result_queue.put((task_id, {"error": str(e)}))
 
            self.task_queue.task_done()
 
    def submit(self, task_id: str, text: str):
        self.task_queue.put((task_id, text))
 
    def get_results(self, count: int) -> dict:
        results = {}
        for _ in range(count):
            task_id, result = self.result_queue.get()
            results[task_id] = result
        return results
 
# Usage
pool = VerificationWorkerPool(num_workers=10, rate_limit=200)
 
for article in articles:
    pool.submit(article.id, article.text)
 
results = pool.get_results(len(articles))
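
The worker loop already exits when it dequeues a None task id, so the pool can be drained cleanly with one poison pill per worker. A sketch of a shutdown method you could add to VerificationWorkerPool (not part of the class above):

# Hypothetical addition to VerificationWorkerPool
def shutdown(self):
    # One poison pill per worker thread; each worker exits on a None task_id
    for _ in self.workers:
        self.task_queue.put((None, None))
    for worker in self.workers:
        worker.join()

# After collecting results
pool.shutdown()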

Database Integration

Store results for later use:

import sqlite3
import hashlib
import json

import requests
 
class VerificationCache:
    def __init__(self, db_path: str = "verification_cache.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS verifications (
                text_hash TEXT PRIMARY KEY,
                result TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
 
    def _hash(self, text: str) -> str:
        return hashlib.sha256(text.encode()).hexdigest()
 
    def get(self, text: str) -> dict | None:
        cursor = self.conn.execute(
            "SELECT result FROM verifications WHERE text_hash = ?",
            (self._hash(text),)
        )
        row = cursor.fetchone()
        return json.loads(row[0]) if row else None
 
    def set(self, text: str, result: dict):
        self.conn.execute(
            "INSERT OR REPLACE INTO verifications (text_hash, result) VALUES (?, ?)",
            (self._hash(text), json.dumps(result))
        )
        self.conn.commit()
 
# Usage with batch processing
cache = VerificationCache()
 
def verify_with_cache(text: str) -> dict:
    cached = cache.get(text)
    if cached:
        return cached
 
    result = requests.post(
        "https://api.glyphnet.io/v1/verify",
        headers={"X-API-Key": API_KEY},
        json={"text": text}
    ).json()
 
    cache.set(text, result)
    return result
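
Caching pairs naturally with the concurrent helpers above: only texts that miss the cache need to hit the API. A sketch reusing verify_batch_async from the async section (assumes those imports are in scope):

def verify_all_cached(texts: list[str]) -> list[dict]:
    # Only call the API for texts that aren't already cached
    uncached = [t for t in texts if cache.get(t) is None]
    if uncached:
        fresh = asyncio.run(verify_batch_async(uncached))
        for text, result in zip(uncached, fresh):
            cache.set(text, result)
    # Every text is now cached, so this is a pure local lookup
    return [cache.get(t) for t in texts]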

Monitoring Batch Jobs

Track progress, counts, and errors while a batch runs:

import logging
from dataclasses import dataclass
from datetime import datetime
 
@dataclass
class BatchStats:
    total: int = 0
    completed: int = 0
    verified: int = 0
    flagged: int = 0
    errors: int = 0
    start_time: datetime | None = None
 
    @property
    def progress(self) -> float:
        return (self.completed / self.total * 100) if self.total > 0 else 0
 
    @property
    def elapsed(self) -> float:
        if self.start_time:
            return (datetime.now() - self.start_time).total_seconds()
        return 0
 
def batch_with_monitoring(texts: list[str]) -> tuple[list, BatchStats]:
    stats = BatchStats(total=len(texts), start_time=datetime.now())
    results = []
 
    for text in texts:
        try:
            result = verify_with_cache(text)
            results.append(result)
 
            stats.completed += 1
            if result.get("flagged"):
                stats.flagged += 1
            else:
                stats.verified += 1
 
            if stats.completed % 100 == 0:
                logging.info(f"Progress: {stats.progress:.1f}% ({stats.completed}/{stats.total})")
 
        except Exception as e:
            stats.errors += 1
            logging.error(f"Error: {e}")
 
    logging.info(f"Batch complete: {stats.verified} verified, {stats.flagged} flagged, {stats.errors} errors")
    return results, stats
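
A short usage sketch (articles is the same collection as in the earlier examples):

logging.basicConfig(level=logging.INFO)

results, stats = batch_with_monitoring([a.text for a in articles])
print(f"{stats.completed}/{stats.total} items in {stats.elapsed:.1f}s: "
      f"{stats.verified} verified, {stats.flagged} flagged, {stats.errors} errors")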