Batch Processing
Process large volumes of text efficiently.
Overview
For high-volume verification, use these strategies:
- Combine text into larger payloads
- Use async/concurrent requests
- Implement request queuing
- Cache results
Combining Text
Instead of one request per item, combine related text:
# Slow: 100 requests
for article in articles:
    result = client.verify(article.text)

# Fast: 1 request (or fewer)
combined = "\n\n---\n\n".join(a.text for a in articles)
result = client.verify(combined)

With Claim Mapping
Track which claims belong to which source:
import requests

def batch_verify(items: list[dict]) -> dict:
    # Combine with markers
    combined_text = ""
    markers = []
    for i, item in enumerate(items):
        start_pos = len(combined_text)
        combined_text += f"\n\n[ITEM_{i}]\n{item['text']}"
        markers.append({
            "id": item["id"],
            "start": start_pos,
            "end": len(combined_text)
        })

    # Verify combined text
    result = requests.post(
        "https://api.glyphnet.io/v1/verify",
        headers={"X-API-Key": API_KEY},
        json={"text": combined_text}
    ).json()

    # Map claims back to items
    for claim in result["claims"]:
        claim_pos = combined_text.find(claim["text"])
        for marker in markers:
            if marker["start"] <= claim_pos < marker["end"]:
                claim["source_id"] = marker["id"]
                break
    return result
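The response carries one combined claims list, so a caller typically wants the claims regrouped by source. A minimal usage sketch, reusing the articles objects from the earlier example (the grouping code itself is illustrative):
# Regroup verified claims by the article they came from
items = [{"id": article.id, "text": article.text} for article in articles]
result = batch_verify(items)

claims_by_source = {}
for claim in result["claims"]:
    # source_id was attached by batch_verify; unmapped claims land under None
    claims_by_source.setdefault(claim.get("source_id"), []).append(claim)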
Async Processing
Use asyncio for concurrent requests:
import asyncio
import aiohttp

async def verify_batch_async(texts: list[str], max_concurrent: int = 10) -> list[dict]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def verify_one(session: aiohttp.ClientSession, text: str) -> dict:
        async with semaphore:
            async with session.post(
                "https://api.glyphnet.io/v1/verify",
                headers={"X-API-Key": API_KEY, "Content-Type": "application/json"},
                json={"text": text}
            ) as response:
                return await response.json()

    async with aiohttp.ClientSession() as session:
        tasks = [verify_one(session, text) for text in texts]
        return await asyncio.gather(*tasks)

# Usage
results = asyncio.run(verify_batch_async(texts, max_concurrent=20))
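With the default settings, asyncio.gather propagates the first exception and the caller loses the other results. If partial results matter, one option is to pass return_exceptions=True and filter afterwards; a sketch of that variant of the same pattern (function name and error shape are illustrative):
async def verify_batch_tolerant(texts: list[str], max_concurrent: int = 10) -> list[dict]:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def verify_one(session: aiohttp.ClientSession, text: str) -> dict:
        async with semaphore:
            async with session.post(
                "https://api.glyphnet.io/v1/verify",
                headers={"X-API-Key": API_KEY},
                json={"text": text}
            ) as response:
                response.raise_for_status()
                return await response.json()

    async with aiohttp.ClientSession() as session:
        raw = await asyncio.gather(
            *(verify_one(session, t) for t in texts),
            return_exceptions=True,
        )
    # Exceptions come back in place of results, so one failure does not
    # discard the rest of the batch
    return [r if not isinstance(r, Exception) else {"error": str(r)} for r in raw]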
Rate-Limited Queue
Process within rate limits:
import asyncio
import aiohttp
from collections import deque

class RateLimitedVerifier:
    def __init__(self, requests_per_minute: int = 200):
        self.rate = requests_per_minute
        self.queue = deque()
        self.results = {}
        self.interval = 60.0 / requests_per_minute

    async def add(self, id: str, text: str):
        self.queue.append((id, text))

    async def process(self):
        while self.queue:
            id, text = self.queue.popleft()
            result = await self._verify(text)
            self.results[id] = result
            await asyncio.sleep(self.interval)

    async def _verify(self, text: str) -> dict:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://api.glyphnet.io/v1/verify",
                headers={"X-API-Key": API_KEY},
                json={"text": text}
            ) as response:
                return await response.json()

# Usage (run inside an async function; see the driver sketch below)
verifier = RateLimitedVerifier(requests_per_minute=200)

# Add items
for article in articles:
    await verifier.add(article.id, article.text)

# Process all
await verifier.process()

# Get results
for article in articles:
    result = verifier.results[article.id]
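The await calls above only work inside a coroutine, so a script needs an async driver. A minimal sketch, assuming articles is a list of objects with id and text attributes as in the other examples:
# Driver coroutine wrapping the queue workflow above
async def run_queue(articles) -> dict:
    verifier = RateLimitedVerifier(requests_per_minute=200)
    for article in articles:
        await verifier.add(article.id, article.text)
    await verifier.process()
    return verifier.results

results = asyncio.run(run_queue(articles))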
Worker Pool Pattern
For production systems:
import queue
import threading
import time

import requests

class VerificationWorkerPool:
    def __init__(self, num_workers: int = 5, rate_limit: int = 200):
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.workers = []
        self.rate_limit = rate_limit
        self.lock = threading.Lock()
        self.request_times = []
        for _ in range(num_workers):
            worker = threading.Thread(target=self._worker)
            worker.daemon = True
            worker.start()
            self.workers.append(worker)

    def _wait_for_rate_limit(self):
        with self.lock:
            now = time.time()
            # Remove requests older than 1 minute
            self.request_times = [t for t in self.request_times if now - t < 60]
            if len(self.request_times) >= self.rate_limit:
                sleep_time = 60 - (now - self.request_times[0])
                time.sleep(max(0, sleep_time))
            self.request_times.append(time.time())

    def _worker(self):
        while True:
            task_id, text = self.task_queue.get()
            if task_id is None:
                break
            self._wait_for_rate_limit()
            try:
                response = requests.post(
                    "https://api.glyphnet.io/v1/verify",
                    headers={"X-API-Key": API_KEY},
                    json={"text": text}
                )
                self.result_queue.put((task_id, response.json()))
            except Exception as e:
                self.result_queue.put((task_id, {"error": str(e)}))
            self.task_queue.task_done()

    def submit(self, task_id: str, text: str):
        self.task_queue.put((task_id, text))

    def get_results(self, count: int) -> dict:
        results = {}
        for _ in range(count):
            task_id, result = self.result_queue.get()
            results[task_id] = result
        return results

# Usage
pool = VerificationWorkerPool(num_workers=10, rate_limit=200)

for article in articles:
    pool.submit(article.id, article.text)
results = pool.get_results(len(articles))
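The worker loop exits when it pulls a None task id off the queue, so a clean stop only needs one sentinel per worker. A sketch of such a shutdown helper (not part of the class above; add it as a method if you need it):
# Graceful shutdown via the (None, None) sentinel the workers already check for
def shutdown(pool: VerificationWorkerPool):
    for _ in pool.workers:
        pool.task_queue.put((None, None))
    for worker in pool.workers:
        worker.join()

shutdown(pool)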
Database Integration
Store results for later use:
import sqlite3
import hashlib
import json

import requests

class VerificationCache:
    def __init__(self, db_path: str = "verification_cache.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS verifications (
                text_hash TEXT PRIMARY KEY,
                result TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

    def _hash(self, text: str) -> str:
        return hashlib.sha256(text.encode()).hexdigest()

    def get(self, text: str) -> dict | None:
        cursor = self.conn.execute(
            "SELECT result FROM verifications WHERE text_hash = ?",
            (self._hash(text),)
        )
        row = cursor.fetchone()
        return json.loads(row[0]) if row else None

    def set(self, text: str, result: dict):
        self.conn.execute(
            "INSERT OR REPLACE INTO verifications (text_hash, result) VALUES (?, ?)",
            (self._hash(text), json.dumps(result))
        )
        self.conn.commit()

# Usage with batch processing
cache = VerificationCache()

def verify_with_cache(text: str) -> dict:
    cached = cache.get(text)
    if cached:
        return cached
    result = requests.post(
        "https://api.glyphnet.io/v1/verify",
        headers={"X-API-Key": API_KEY},
        json={"text": text}
    ).json()
    cache.set(text, result)
    return result
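Identical text then costs only one API call; later lookups are served from SQLite. A small illustration (the duplicate string is hypothetical):
# First call hits the API and stores the result; the second is a cache hit
first = verify_with_cache("The Eiffel Tower is 330 metres tall.")
second = verify_with_cache("The Eiffel Tower is 330 metres tall.")
assert first == second  # served from the local cache, no extra request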
Monitoring Batch Jobs
Track progress, outcomes, and errors while a batch runs:
import logging
from dataclasses import dataclass
from datetime import datetime

@dataclass
class BatchStats:
    total: int = 0
    completed: int = 0
    verified: int = 0
    flagged: int = 0
    errors: int = 0
    start_time: datetime | None = None

    @property
    def progress(self) -> float:
        return (self.completed / self.total * 100) if self.total > 0 else 0

    @property
    def elapsed(self) -> float:
        if self.start_time:
            return (datetime.now() - self.start_time).total_seconds()
        return 0

def batch_with_monitoring(texts: list[str]) -> tuple[list, BatchStats]:
    stats = BatchStats(total=len(texts), start_time=datetime.now())
    results = []
    for text in texts:
        try:
            result = verify_with_cache(text)
            results.append(result)
            stats.completed += 1
            if result.get("flagged"):
                stats.flagged += 1
            else:
                stats.verified += 1
            if stats.completed % 100 == 0:
                logging.info(f"Progress: {stats.progress:.1f}% ({stats.completed}/{stats.total})")
        except Exception as e:
            stats.errors += 1
            logging.error(f"Error: {e}")
    logging.info(f"Batch complete: {stats.verified} verified, {stats.flagged} flagged, {stats.errors} errors")
    return results, stats
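A short usage sketch that runs a monitored batch and reports throughput from the collected stats (the articles list and logging setup are assumptions):
# Usage: run a monitored batch and report throughput
logging.basicConfig(level=logging.INFO)

results, stats = batch_with_monitoring([a.text for a in articles])
if stats.elapsed > 0:
    logging.info(f"Throughput: {stats.completed / stats.elapsed:.1f} items/sec")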