Skip to main content

Managing Documents Example

Complete examples for managing RAG documents.

List All Documents

import requests

def list_all_documents(collection_name):
    """Return the document records of a RAG collection.

    Args:
        collection_name: Name of the collection to list.

    Returns:
        The value stored under the "documents" key of the JSON response.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
    """
    response = requests.get(
        f"http://localhost/api/v1/rag/collections/{collection_name}/documents",
        headers={"Authorization": "Bearer YOUR_API_KEY"},
        timeout=10,  # never hang forever on an unresponsive server
    )
    # Surface HTTP errors here rather than as a confusing KeyError below.
    response.raise_for_status()

    return response.json()["documents"]

# Fetch the collection's documents and print a short report for each one
documents = list_all_documents(collection_name="documentation")

print(f"Total documents: {len(documents)}\n")
for entry in documents:
    print(f"ID: {entry['id']}")
    print(f"Name: {entry['name']}")
    print(f"Chunks: {entry['chunk_count']}")
    print(f"Uploaded: {entry['created_at']}")
    print("-" * 40)  # visual separator between documents

Paginated List

def list_documents_paginated(collection_name, page_size=50):
    """Collect every document in a collection by walking the paginated API.

    Pages are requested starting at page 1 until a page comes back with
    fewer than ``page_size`` documents. A progress line is printed after
    each page is fetched.

    Args:
        collection_name: Name of the collection to list.
        page_size: Number of documents requested per page; must be >= 1.

    Returns:
        A list holding the documents of all pages, in the order received.

    Raises:
        ValueError: If ``page_size`` is smaller than 1.
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
    """
    # With page_size <= 0 the "short page" stop condition can never be met.
    if page_size < 1:
        raise ValueError("page_size must be a positive integer")

    all_documents = []
    page = 1

    while True:
        response = requests.get(
            f"http://localhost/api/v1/rag/collections/{collection_name}/documents",
            headers={"Authorization": "Bearer YOUR_API_KEY"},
            params={
                "page": page,
                "page_size": page_size,
            },
            timeout=10,
        )
        response.raise_for_status()

        documents = response.json()["documents"]
        all_documents.extend(documents)
        # Report the page that was just fetched, before advancing the counter.
        print(f"Loaded page {page} ({len(all_documents)} documents)...")

        # A short (or empty) page means there is nothing left to fetch.
        if len(documents) < page_size:
            break

        page += 1

    return all_documents

# Walk every page of a large collection, then report the overall count
documents = list_documents_paginated(collection_name="large_collection")
print(f"\nTotal: {len(documents)} documents")

Search and Delete

def find_and_delete(collection_name, search_term):
    """Interactively delete documents whose name contains a search term.

    The match is a case-insensitive substring test against each document's
    name. The matches are listed and nothing is deleted unless the user
    answers "yes" at the prompt.

    Args:
        collection_name: Name of the collection to search.
        search_term: Text to look for in document names.

    Returns:
        None. Progress and results are printed.
    """
    # Find documents matching search term
    documents = list_all_documents(collection_name)

    needle = search_term.lower()
    matches = [doc for doc in documents if needle in doc['name'].lower()]

    if not matches:
        print(f"No documents found matching '{search_term}'")
        return

    print(f"Found {len(matches)} documents matching '{search_term}':")
    for i, doc in enumerate(matches, 1):
        print(f"{i}. {doc['name']} ({doc['id']})")

    # Confirm deletion
    confirm = input(f"\nDelete all {len(matches)} documents? (yes/no): ")
    if confirm.strip().lower() != "yes":
        print("Cancelled.")
        return

    # Delete documents
    deleted = 0
    for doc in matches:
        try:
            response = requests.delete(
                f"http://localhost/api/v1/rag/documents/{doc['id']}",
                headers={"Authorization": "Bearer YOUR_API_KEY"},
                timeout=10,
            )
            # requests does not raise on 4xx/5xx by itself; without this a
            # rejected delete would be counted as a success.
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"Failed to delete {doc['name']}: {e}")
        else:
            deleted += 1
            print(f"Deleted: {doc['name']}")

    print(f"\nDeleted {deleted} documents.")

# Look for documents named like "test" and delete them after confirmation
find_and_delete(collection_name="documentation", search_term="test")

Document Statistics

def get_document_stats(collection_name):
    """Print summary statistics for a collection.

    Reports the document and chunk totals, the average chunks per document,
    a count per file extension, and how many documents fall into each age
    bucket (today / week / month / older) based on ``created_at``.

    Args:
        collection_name: Name of the collection to analyse.

    Returns:
        None. The report is printed.
    """
    from datetime import datetime, timezone

    documents = list_all_documents(collection_name)

    total_docs = len(documents)
    total_chunks = sum(doc['chunk_count'] for doc in documents)
    # An empty collection must not crash the report with ZeroDivisionError.
    avg_chunks = total_chunks / total_docs if total_docs else 0.0

    # Document types
    types = {}
    for doc in documents:
        _, dot, ext = doc['name'].rpartition('.')
        # A name without a dot has no extension; do not count the whole
        # name as if it were one.
        label = f".{ext.lower()}" if dot else "(no extension)"
        types[label] = types.get(label, 0) + 1

    # Age distribution
    # Aware and naive datetimes cannot be subtracted from each other, so keep
    # one reference of each kind and pick the one matching each timestamp.
    now_aware = datetime.now(timezone.utc)
    now_naive = datetime.now()
    age_ranges = {
        "today": 0,
        "week": 0,
        "month": 0,
        "older": 0
    }

    for doc in documents:
        uploaded = datetime.fromisoformat(doc['created_at'].replace('Z', '+00:00'))
        reference = now_naive if uploaded.tzinfo is None else now_aware
        delta = reference - uploaded

        if delta.days == 0:
            age_ranges["today"] += 1
        elif delta.days <= 7:
            age_ranges["week"] += 1
        elif delta.days <= 30:
            age_ranges["month"] += 1
        else:
            age_ranges["older"] += 1

    print(f"Collection: {collection_name}")
    print("\nSummary:")
    print(f" Total Documents: {total_docs}")
    print(f" Total Chunks: {total_chunks}")
    print(f" Avg Chunks/Doc: {avg_chunks:.1f}")

    print("\nDocument Types:")
    for label, count in sorted(types.items()):
        print(f" {label}: {count}")

    print("\nAge Distribution:")
    for range_name, count in age_ranges.items():
        print(f" {range_name}: {count}")

# Print the statistics report for the "documentation" collection
get_document_stats(collection_name="documentation")

Bulk Delete with Backup

import requests
import json
from datetime import datetime

def bulk_delete_with_backup(collection_name, document_ids):
    """Back up the metadata of the given documents, then delete them.

    The metadata of every listed document found in the collection is written
    to ``backup_<timestamp>.json`` in the current directory before any delete
    request is sent.

    Args:
        collection_name: Collection the documents belong to.
        document_ids: Iterable of document IDs to delete.

    Returns:
        A dict with keys "deleted" (number of successful deletes), "failed"
        (list of ``(doc_id, error_message)`` tuples) and "backup" (path of
        the backup file).
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    backup_file = f"backup_{timestamp}.json"

    # Backup document metadata
    documents = list_all_documents(collection_name)
    # Build the lookup set once so each membership test is O(1).
    wanted_ids = set(document_ids)
    backup_data = [doc for doc in documents if doc['id'] in wanted_ids]

    # Save backup
    with open(backup_file, 'w', encoding='utf-8') as f:
        json.dump(backup_data, f, indent=2)

    print(f"Backup saved to: {backup_file}")

    # Delete documents
    deleted = 0
    failed = []

    for doc_id in document_ids:
        try:
            response = requests.delete(
                f"http://localhost/api/v1/rag/documents/{doc_id}",
                headers={"Authorization": "Bearer YOUR_API_KEY"},
                timeout=10
            )
            # requests does not raise on 4xx/5xx by itself; without this a
            # rejected delete would be counted as a success.
            response.raise_for_status()
        except requests.RequestException as e:
            failed.append((doc_id, str(e)))
            print(f"Failed: {doc_id} - {e}")
        else:
            deleted += 1
            print(f"Deleted: {doc_id}")

    print("\nSummary:")
    print(f"Deleted: {deleted}")
    print(f"Failed: {len(failed)}")
    print(f"Backup: {backup_file}")

    return {
        "deleted": deleted,
        "failed": failed,
        "backup": backup_file
    }

# Delete three documents by ID, saving their metadata to a backup file first
docs_to_delete = ["doc_1", "doc_2", "doc_3"]
result = bulk_delete_with_backup(
    collection_name="documentation",
    document_ids=docs_to_delete,
)

JavaScript Document Management

/**
 * Small client for the RAG document endpoints.
 */
class DocumentManager {
  /**
   * @param {string} apiKey - Bearer token sent with every request.
   */
  constructor(apiKey) {
    this.apiKey = apiKey;
    this.baseUrl = 'http://localhost/api/v1';
  }

  /**
   * Send an authenticated request and return the parsed JSON body.
   *
   * @param {string} path - Path appended to the base URL.
   * @param {object} [options] - Extra options passed to fetch().
   * @returns {Promise<any>} Parsed body, or null when the body is empty.
   * @throws {Error} When the server answers with a non-2xx status.
   */
  async _request(path, options = {}) {
    const response = await fetch(`${this.baseUrl}${path}`, {
      ...options,
      headers: {
        ...options.headers,
        'Authorization': `Bearer ${this.apiKey}`
      }
    });

    // fetch() only rejects on network failure; turn HTTP errors into
    // rejections so deleteMultiple() does not record them as successes.
    if (!response.ok) {
      throw new Error(`Request failed: ${response.status} ${response.statusText}`);
    }

    // A successful DELETE may have no body (e.g. 204 No Content), which
    // response.json() would reject.
    const text = await response.text();
    return text ? JSON.parse(text) : null;
  }

  /**
   * List one page of documents in a collection.
   *
   * @param {string} collectionName
   * @param {number} [page=1]
   * @param {number} [pageSize=50]
   * @returns {Promise<any>} Parsed response body.
   */
  async listDocuments(collectionName, page = 1, pageSize = 50) {
    const collection = encodeURIComponent(collectionName);
    const query = `page=${encodeURIComponent(page)}&page_size=${encodeURIComponent(pageSize)}`;
    return this._request(`/rag/collections/${collection}/documents?${query}`);
  }

  /**
   * Fetch a single document.
   *
   * @param {string} documentId
   * @returns {Promise<any>} Parsed response body.
   */
  async getDocument(documentId) {
    return this._request(`/rag/documents/${encodeURIComponent(documentId)}`);
  }

  /**
   * Delete a single document.
   *
   * @param {string} documentId
   * @returns {Promise<any>} Parsed response body, or null if it is empty.
   */
  async deleteDocument(documentId) {
    return this._request(
      `/rag/documents/${encodeURIComponent(documentId)}`,
      { method: 'DELETE' }
    );
  }

  /**
   * Delete several documents one after another, collecting per-ID results
   * instead of stopping at the first failure.
   *
   * @param {string[]} documentIds
   * @returns {Promise<Array<{success: boolean, id: string, error?: string}>>}
   */
  async deleteMultiple(documentIds) {
    const results = [];

    for (const id of documentIds) {
      try {
        await this.deleteDocument(id);
        results.push({ success: true, id });
      } catch (error) {
        results.push({ success: false, id, error: error.message });
      }
    }

    return results;
  }
}

// Usage (relies on top-level await)
const manager = new DocumentManager('YOUR_API_KEY');

// Fetch the first page of the collection and report how many came back
const listing = await manager.listDocuments('documentation');
console.log(`Found ${listing.documents.length} documents`);

// Look up one document by ID
const details = await manager.getDocument('doc_123');
console.log('Document:', details);

// Remove several documents and show the per-ID outcome
const idsToDelete = ['doc_1', 'doc_2', 'doc_3'];
const outcome = await manager.deleteMultiple(idsToDelete);
console.log('Delete results:', outcome);

React Document Management

import React, { useState, useEffect } from 'react';

function DocumentManager() {
const [documents, setDocuments] = useState([]);
const [selected, setSelected] = useState([]);
const [loading, setLoading] = useState(false);

const loadDocuments = async () => {
setLoading(true);
try {
const response = await fetch(
'http://localhost/api/v1/rag/collections/documentation/documents',
{
headers: {
'Authorization': 'Bearer YOUR_API_KEY'
}
}
);
const data = await response.json();
setDocuments(data.documents || []);
} catch (error) {
console.error('Error:', error);
} finally {
setLoading(false);
}
};

const deleteSelected = async () => {
if (!confirm(`Delete ${selected.length} documents?`)) return;

setLoading(true);
try {
for (const docId of selected) {
await fetch(
`http://localhost/api/v1/rag/documents/${docId}`,
{
method: 'DELETE',
headers: {
'Authorization': 'Bearer YOUR_API_KEY'
}
}
);
}
setSelected([]);
await loadDocuments();
} catch (error) {
console.error('Error:', error);
} finally {
setLoading(false);
}
};

useEffect(() => {
loadDocuments();
}, []);

const toggleSelect = (id) => {
setSelected(prev =>
prev.includes(id) ? prev.filter(x => x !== id) : [...prev, id]
);
};

return (
<div className="p-6">
<h2 className="text-2xl font-bold mb-4">Document Manager</h2>

<div className="mb-4 flex space-x-2">
<button
onClick={loadDocuments}
disabled={loading}
className="px-4 py-2 bg-gray-500 text-white rounded"
>
Refresh
</button>
{selected.length > 0 && (
<button
onClick={deleteSelected}
disabled={loading}
className="px-4 py-2 bg-red-500 text-white rounded"
>
Delete Selected ({selected.length})
</button>
)}
</div>

{loading && <p>Loading...</p>}

<div className="border rounded">
<table className="w-full">
<thead>
<tr className="bg-gray-100">
<th className="p-2 text-left">
<input
type="checkbox"
onChange={(e) => {
if (e.target.checked) {
setSelected(documents.map(d => d.id));
} else {
setSelected([]);
}
}}
/>
</th>
<th className="p-2 text-left">Name</th>
<th className="p-2 text-left">Chunks</th>
<th className="p-2 text-left">Uploaded</th>
</tr>
</thead>
<tbody>
{documents.map(doc => (
<tr key={doc.id} className="border-t">
<td className="p-2">
<input
type="checkbox"
checked={selected.includes(doc.id)}
onChange={() => toggleSelect(doc.id)}
/>
</td>
<td className="p-2">{doc.name}</td>
<td className="p-2">{doc.chunk_count}</td>
<td className="p-2">{new Date(doc.created_at).toLocaleDateString()}</td>
</tr>
))}
</tbody>
</table>
</div>

{documents.length === 0 && !loading && (
<p className="text-gray-500">No documents found.</p>
)}
</div>
);
}

export default DocumentManager;

Document Health Check

def check_document_health(collection_name):
    """Print a health report for the documents of a collection.

    Two checks run per document: a document with zero chunks is reported
    with severity "high", and a document older than 365 days (based on
    ``created_at``) is reported with severity "info".

    Args:
        collection_name: Name of the collection to check.

    Returns:
        None. The report is printed.
    """
    from datetime import datetime, timezone

    documents = list_all_documents(collection_name)

    # Aware and naive datetimes cannot be subtracted from each other, so keep
    # one reference of each kind and pick the one matching each timestamp.
    now_aware = datetime.now(timezone.utc)
    now_naive = datetime.now()

    issues = []

    for doc in documents:
        # Check chunk count
        if doc['chunk_count'] == 0:
            issues.append({
                "doc_id": doc['id'],
                "name": doc['name'],
                "issue": "No chunks",
                "severity": "high"
            })

        # Check very old documents
        uploaded = datetime.fromisoformat(doc['created_at'].replace('Z', '+00:00'))
        reference = now_naive if uploaded.tzinfo is None else now_aware
        age_days = (reference - uploaded).days

        if age_days > 365:
            issues.append({
                "doc_id": doc['id'],
                "name": doc['name'],
                "issue": f"Very old ({age_days} days)",
                "severity": "info"
            })

    # Summary
    print(f"Health check for '{collection_name}':")
    print(f"Total documents: {len(documents)}")
    print(f"Issues found: {len(issues)}\n")

    if issues:
        print("Issues:")
        for issue in issues:
            print(f" [{issue['severity'].upper()}] {issue['name']}")
            print(f" {issue['issue']}\n")
    else:
        print("No issues found.")

# Print the health report for the "documentation" collection
check_document_health(collection_name="documentation")

Best Practices

  1. Back up before deleting - Always back up document metadata
  2. Confirm deletion - Use confirmation prompts
  3. Monitor collection size - Keep collections manageable
  4. Regular cleanup - Remove outdated or duplicate documents
  5. Track changes - Log deletions for audit trail
  6. Test operations - Test on small batches first

Next Steps