Document Processing

Process documents through Extract and retrieve structured results.

Processing Flow

Document Upload → Validation → Image Conversion → Vision Model → JSON Extraction → Validation → Result
  1. Upload - Send file with template selection
  2. Validate - Check file type, size, and API key budget
  3. Convert - PDFs converted to images (200 DPI, max 1024x1024)
  4. Extract - Vision model processes images with template prompts
  5. Parse - Response parsed to JSON
  6. Validate - Output validated against template schema
  7. Return - Result with extracted data, validation status, and cost

Processing Documents

Basic Processing

import requests

with open("invoice.pdf", "rb") as f:
    response = requests.post(
        "http://localhost/api/v1/extract/process",
        headers={"Authorization": "Bearer YOUR_API_KEY"},
        files={"file": f},
        data={"template_id": "detailed_invoice"}
    )

result = response.json()
print(f"Status: {result['status']}")
print(f"Data: {result['result']['parsed_data']}")

Processing with Context

Pass context variables to fill template placeholders:

import json
import requests

context = {
    "company_name": "Acme Corp",
    "fiscal_year": "2024"
}

with open("invoice.pdf", "rb") as f:
    response = requests.post(
        "http://localhost/api/v1/extract/process",
        headers={"Authorization": "Bearer YOUR_API_KEY"},
        files={"file": f},
        data={
            "template_id": "detailed_invoice",
            "context": json.dumps(context)
        }
    )

Response Format

Successful Processing

{
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "completed",
  "file_metadata": {
    "original_name": "invoice.pdf",
    "file_type": "application/pdf",
    "file_size": 245632,
    "page_count": 2
  },
  "template_id": "detailed_invoice",
  "model_used": "gpt-4-vision-preview",
  "result": {
    "parsed_data": {
      "invoice_number": "INV-2024-001",
      "vendor": {
        "name": "Supplier Co",
        "address": "123 Main St, City, ST 12345"
      },
      "line_items": [
        {
          "description": "Widget A",
          "quantity": 10,
          "unit_price": 25.00,
          "amount": 250.00
        }
      ],
      "subtotal": 250.00,
      "tax": 20.00,
      "total": 270.00
    },
    "validation_errors": [],
    "validation_warnings": []
  },
  "usage": {
    "prompt_tokens": 1250,
    "completion_tokens": 380,
    "total_tokens": 1630,
    "cost": 0.0245
  },
  "created_at": "2024-01-15T10:30:00Z",
  "completed_at": "2024-01-15T10:30:05Z"
}
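
The nested fields can be consumed straight from the parsed JSON. A minimal follow-up sketch, assuming response holds the successful response above; the arithmetic cross-check is an optional client-side safeguard, not something the API performs for you:

result = response.json()
data = result["result"]["parsed_data"]

# Walk the extracted line items
for item in data["line_items"]:
    print(f"{item['description']}: {item['quantity']} x {item['unit_price']} = {item['amount']}")

# Optional client-side sanity check against the extracted subtotal (tolerance for rounding)
computed_subtotal = sum(item["amount"] for item in data["line_items"])
if abs(computed_subtotal - data["subtotal"]) > 0.01:
    print(f"Subtotal mismatch: computed {computed_subtotal}, extracted {data['subtotal']}")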

Validation Warnings

When extraction succeeds but data quality issues are detected:

{
  "result": {
    "parsed_data": {...},
    "validation_errors": [],
    "validation_warnings": [
      "Date format non-standard: '15 Jan 2024' (expected YYYY-MM-DD)",
      "Address incomplete: missing postal code"
    ]
  }
}

Validation Errors

When extracted data fails schema validation:

{
  "status": "completed",
  "result": {
    "parsed_data": {...},
    "validation_errors": [
      "Required field 'invoice_number' is missing",
      "Field 'total_amount' must be a number, got string"
    ],
    "validation_warnings": []
  }
}
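
Because the job status is still "completed" in this case, clients should inspect both lists before trusting the output. A minimal sketch of one possible policy (errors block, warnings only log); the routing decisions are illustrative, not API behavior:

extraction = response.json()["result"]

if extraction["validation_errors"]:
    # Schema violations: route the document for manual review (illustrative policy)
    print("Needs review:", extraction["validation_errors"])
elif extraction["validation_warnings"]:
    # Data quality issues: log and continue
    print("Accepted with warnings:", extraction["validation_warnings"])
else:
    print("Clean extraction")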

Job Management

For large documents or high volume, use async job management.

List Jobs

response = requests.get(
    "http://localhost/api/v1/extract/jobs",
    headers={"Authorization": "Bearer YOUR_API_KEY"},
    params={
        "status": "completed",  # pending, processing, completed, failed
        "limit": 10,
        "offset": 0
    }
)

for job in response.json()["jobs"]:
    print(f"{job['job_id']}: {job['status']} - {job['file_metadata']['original_name']}")

Get Job Details

job_id = "550e8400-e29b-41d4-a716-446655440000"

response = requests.get(
    f"http://localhost/api/v1/extract/jobs/{job_id}",
    headers={"Authorization": "Bearer YOUR_API_KEY"}
)

job = response.json()
print(f"Status: {job['status']}")
print(f"Result: {job['result']}")

Settings

Get Current Settings

response = requests.get(
    "http://localhost/api/v1/extract/settings",
    headers={"Authorization": "Bearer YOUR_API_KEY"}
)

settings = response.json()
print(f"Default model: {settings['default_vision_model']}")

Update Default Model

response = requests.put(
    "http://localhost/api/v1/extract/settings",
    headers={"Authorization": "Bearer YOUR_API_KEY"},
    json={"default_vision_model": "gpt-4o"}
)

List Available Models

response = requests.get(
    "http://localhost/api/v1/extract/models",
    headers={"Authorization": "Bearer YOUR_API_KEY"}
)

for model in response.json()["models"]:
    if "vision" in model.get("capabilities", []):
        print(f"{model['id']}: {model['name']}")

Processing Limits

Limit               | Value
--------------------|------------------------
Max file size       | 10 MB
Max PDF pages       | 20
Max image dimension | 1024 px (auto-resized)
Supported formats   | PDF, JPG, PNG
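
These limits are enforced server-side during validation, but checking them locally avoids a wasted upload. A small pre-check sketch that mirrors the limits above; the helper and its constants are illustrative, not part of any client library:

import os

MAX_FILE_SIZE = 10 * 1024 * 1024  # 10 MB, per the limits above
SUPPORTED_EXTENSIONS = {".pdf", ".jpg", ".jpeg", ".png"}

def precheck(filepath):
    """Local sanity check before uploading (illustrative helper)."""
    ext = os.path.splitext(filepath)[1].lower()
    if ext not in SUPPORTED_EXTENSIONS:
        raise ValueError(f"Unsupported format: {ext}")
    if os.path.getsize(filepath) > MAX_FILE_SIZE:
        raise ValueError("File exceeds the 10 MB limit")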

Error Handling

File Validation Errors

response = requests.post(
    "http://localhost/api/v1/extract/process",
    headers={"Authorization": "Bearer YOUR_API_KEY"},
    files={"file": open("document.docx", "rb")},  # Unsupported format
    data={"template_id": "detailed_invoice"}
)

if response.status_code == 400:
    error = response.json()
    print(f"Error: {error['detail']}")
    # "Unsupported file type: application/vnd.openxmlformats-officedocument.wordprocessingml.document"

Budget Exceeded

if response.status_code == 402:
    error = response.json()
    print(f"Budget exceeded: {error['detail']}")
    # "API key budget exceeded. Remaining: $0.00, Required: ~$0.05"

Template Not Found

if response.status_code == 404:
    error = response.json()
    print(f"Template not found: {error['detail']}")

Cost Tracking

Extract uses the same token-based pricing as other Enclava inference. Costs are tracked per job:

result = response.json()

print(f"Prompt tokens: {result['usage']['prompt_tokens']}")
print(f"Completion tokens: {result['usage']['completion_tokens']}")
print(f"Total cost: ${result['usage']['cost']:.4f}")

Costs are deducted from API key budgets before processing begins. If budget is insufficient, the request is rejected.

Batch Processing

For processing multiple documents:

import os
import requests
from concurrent.futures import ThreadPoolExecutor

def process_document(filepath):
    with open(filepath, "rb") as f:
        response = requests.post(
            "http://localhost/api/v1/extract/process",
            headers={"Authorization": "Bearer YOUR_API_KEY"},
            files={"file": f},
            data={"template_id": "detailed_invoice"}
        )
    return filepath, response.json()

# Process all PDFs in a directory
pdf_files = [f for f in os.listdir("invoices/") if f.endswith(".pdf")]
pdf_paths = [os.path.join("invoices/", f) for f in pdf_files]

with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(process_document, pdf_paths))

for filepath, result in results:
    print(f"{filepath}: {result['status']}")

Next Steps