Bulk Upload Example
Efficiently upload multiple documents to RAG collections.
Batch Upload with Python
```python
import requests
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

def upload_single_document(file_path, collection_name, description=None):
    try:
        with open(file_path, "rb") as f:
            data = {"collection_name": collection_name}
            if description:
                data["description"] = description

            response = requests.post(
                "http://localhost/api/v1/rag/upload",
                headers={"Authorization": "Bearer YOUR_API_KEY"},
                files={"file": f},
                data=data,
                timeout=60
            )
            response.raise_for_status()

            return {
                "success": True,
                "file": os.path.basename(file_path),
                "result": response.json()
            }
    except Exception as e:
        return {
            "success": False,
            "file": os.path.basename(file_path),
            "error": str(e)
        }

def bulk_upload(directory, collection_name, max_workers=3):
    supported_extensions = [".pdf", ".txt", ".md", ".docx", ".json"]
    file_paths = []

    # Collect files
    for filename in os.listdir(directory):
        file_path = os.path.join(directory, filename)
        if os.path.isfile(file_path):
            ext = os.path.splitext(filename)[1].lower()
            if ext in supported_extensions:
                file_paths.append(file_path)

    print(f"Found {len(file_paths)} files to upload")

    # Upload in parallel
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(
                upload_single_document,
                path,
                collection_name
            ): path for path in file_paths
        }

        for future in as_completed(futures):
            result = future.result()
            results.append(result)
            status = "✓" if result["success"] else "✗"
            print(f"{status} {result['file']}")

    # Summary
    successful = sum(1 for r in results if r["success"])
    failed = len(results) - successful

    print("\nSummary:")
    print(f"Total: {len(results)}")
    print(f"Successful: {successful}")
    print(f"Failed: {failed}")

    return results

# Usage
results = bulk_upload("./documents", "knowledge_base", max_workers=3)
```
Upload with Progress Tracking
```python
import requests
import os
from tqdm import tqdm

class ProgressReader:
    """File-like wrapper that advances a tqdm bar as the body is read."""
    def __init__(self, fileobj, pbar):
        self._f = fileobj
        self._pbar = pbar

    def read(self, size=-1):
        chunk = self._f.read(size)
        self._pbar.update(len(chunk))
        return chunk

def upload_with_progress(file_path, collection_name):
    file_size = os.path.getsize(file_path)

    with open(file_path, "rb") as f:
        with tqdm(
            total=file_size,
            unit="B",
            unit_scale=True,
            desc=os.path.basename(file_path)
        ) as pbar:
            # Note: requests reads the multipart body in a single pass, so the
            # bar completes in one step; a streaming encoder is needed for
            # finer-grained updates.
            response = requests.post(
                "http://localhost/api/v1/rag/upload",
                headers={"Authorization": "Bearer YOUR_API_KEY"},
                files={"file": (os.path.basename(file_path), ProgressReader(f, pbar))},
                data={"collection_name": collection_name},
                timeout=120
            )
            response.raise_for_status()
            return response.json()

def bulk_upload_with_progress(directory, collection_name):
    files = [f for f in os.listdir(directory)
             if os.path.isfile(os.path.join(directory, f))]

    results = []
    for filename in tqdm(files, desc="Uploading files"):
        file_path = os.path.join(directory, filename)
        try:
            result = upload_with_progress(file_path, collection_name)
            results.append({
                "file": filename,
                "success": True,
                "id": result.get("document_id")
            })
        except Exception as e:
            results.append({
                "file": filename,
                "success": False,
                "error": str(e)
            })

    return results

# Usage
results = bulk_upload_with_progress("./docs", "documentation")
```
Organized Upload by Type
```python
import requests
import os
import json
import shutil

def organize_and_upload(source_dir, collection_name):
    # Organize files by type
    type_dirs = {
        "pdf": "./pdfs",
        "text": "./text",
        "markdown": "./markdown"
    }

    for dir_path in type_dirs.values():
        os.makedirs(dir_path, exist_ok=True)

    # Copy files to organized directories
    for filename in os.listdir(source_dir):
        file_path = os.path.join(source_dir, filename)
        if not os.path.isfile(file_path):
            continue

        ext = os.path.splitext(filename)[1].lower()
        if ext == ".pdf":
            shutil.copy(file_path, os.path.join(type_dirs["pdf"], filename))
        elif ext in [".txt", ".json"]:
            shutil.copy(file_path, os.path.join(type_dirs["text"], filename))
        elif ext == ".md":
            shutil.copy(file_path, os.path.join(type_dirs["markdown"], filename))

    # Upload each type, tagging documents with their type in the metadata
    for type_name, dir_path in type_dirs.items():
        print(f"\nUploading {type_name} files...")
        files = [f for f in os.listdir(dir_path) if os.path.isfile(os.path.join(dir_path, f))]

        for filename in files:
            file_path = os.path.join(dir_path, filename)
            try:
                with open(file_path, "rb") as f:
                    response = requests.post(
                        "http://localhost/api/v1/rag/upload",
                        headers={"Authorization": "Bearer YOUR_API_KEY"},
                        files={"file": f},
                        data={
                            "collection_name": collection_name,
                            "metadata": json.dumps({"type": type_name})
                        },
                        timeout=60
                    )
                    response.raise_for_status()
                print(f"  Uploaded: {filename}")
            except Exception as e:
                print(f"  Failed: {filename} - {e}")

# Usage
organize_and_upload("./unorganized", "organized_docs")
```
Upload with Error Recovery
```python
import requests
import json
import os

def upload_with_recovery(directory, collection_name, log_file="upload_log.json"):
    # Load the existing log so previously uploaded files are skipped
    uploaded = set()
    if os.path.exists(log_file):
        with open(log_file, "r") as f:
            log = json.load(f)
            uploaded = set(log.get("uploaded", []))

    files = [f for f in os.listdir(directory)
             if os.path.isfile(os.path.join(directory, f)) and
             f not in uploaded]

    print(f"Files to upload: {len(files)}")
    print(f"Already uploaded: {len(uploaded)}")

    results = {"uploaded": list(uploaded), "failed": []}

    for filename in files:
        file_path = os.path.join(directory, filename)
        try:
            with open(file_path, "rb") as f:
                response = requests.post(
                    "http://localhost/api/v1/rag/upload",
                    headers={"Authorization": "Bearer YOUR_API_KEY"},
                    files={"file": f},
                    data={"collection_name": collection_name},
                    timeout=60
                )
                response.raise_for_status()

            results["uploaded"].append(filename)
            print(f"Uploaded: {filename}")
        except Exception as e:
            error = {"file": filename, "error": str(e)}
            results["failed"].append(error)
            print(f"Failed: {filename} - {e}")

        # Save progress after every file so an interrupted run can resume
        with open(log_file, "w") as f:
            json.dump(results, f, indent=2)

    return results

# Usage
results = upload_with_recovery("./large_dataset", "docs")
print(f"\nUploaded: {len(results['uploaded'])}")
print(f"Failed: {len(results['failed'])}")
```
JavaScript Bulk Upload
```javascript
class BulkUploader {
  constructor(apiKey) {
    this.apiKey = apiKey;
    this.baseUrl = 'http://localhost/api/v1';
  }

  async uploadFile(file, collectionName, options = {}) {
    const formData = new FormData();
    formData.append('file', file);
    formData.append('collection_name', collectionName);

    if (options.description) {
      formData.append('description', options.description);
    }
    if (options.metadata) {
      formData.append('metadata', JSON.stringify(options.metadata));
    }

    const response = await fetch(`${this.baseUrl}/rag/upload`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiKey}`
      },
      body: formData
    });

    if (!response.ok) {
      const error = await response.json();
      throw new Error(error.message || 'Upload failed');
    }

    return await response.json();
  }

  async bulkUpload(files, collectionName, options = {}) {
    const {
      batchSize = 3,
      onProgress,
      onComplete
    } = options;

    const results = [];
    let completed = 0;
    let failed = 0;

    // Upload in batches of `batchSize` concurrent requests
    for (let i = 0; i < files.length; i += batchSize) {
      const batch = files.slice(i, i + batchSize);

      const batchPromises = batch.map(async (file) => {
        try {
          const result = await this.uploadFile(file, collectionName);
          return {
            success: true,
            file: file.name,
            result
          };
        } catch (error) {
          failed++;
          return {
            success: false,
            file: file.name,
            error: error.message
          };
        }
      });

      const batchResults = await Promise.all(batchPromises);
      results.push(...batchResults);
      completed += batchResults.length;

      if (onProgress) {
        onProgress(completed, files.length, failed);
      }
    }

    if (onComplete) {
      onComplete(results);
    }

    return results;
  }
}

// Usage
const uploader = new BulkUploader('YOUR_API_KEY');

const fileInput = document.getElementById('file-input');
const files = Array.from(fileInput.files);

const results = await uploader.bulkUpload(files, 'documentation', {
  batchSize: 3,
  onProgress: (completed, total, failed) => {
    console.log(`Progress: ${completed}/${total} (Failed: ${failed})`);
  },
  onComplete: (results) => {
    console.log('All uploads complete:', results);
  }
});

// Show summary
const successful = results.filter(r => r.success);
const failedUploads = results.filter(r => !r.success);
console.log(`Successful: ${successful.length}`);
console.log(`Failed: ${failedUploads.length}`);
```
React Bulk Upload Component
```jsx
import React, { useState } from 'react';

function BulkUpload() {
  const [files, setFiles] = useState([]);
  const [progress, setProgress] = useState({ completed: 0, total: 0, failed: 0 });
  const [results, setResults] = useState([]);

  const handleFileSelect = (e) => {
    setFiles(Array.from(e.target.files));
  };

  const uploadFiles = async () => {
    const uploadResults = [];
    let completed = 0;
    let failed = 0;

    for (const file of files) {
      const formData = new FormData();
      formData.append('file', file);
      formData.append('collection_name', 'documentation');

      try {
        const response = await fetch('http://localhost/api/v1/rag/upload', {
          method: 'POST',
          headers: {
            'Authorization': 'Bearer YOUR_API_KEY'
          },
          body: formData
        });

        if (!response.ok) {
          throw new Error(`Upload failed with status ${response.status}`);
        }

        const result = await response.json();
        uploadResults.push({ success: true, file: file.name, result });
      } catch (error) {
        failed++;
        uploadResults.push({ success: false, file: file.name, error: error.message });
      }

      completed++;
      setProgress({ completed, total: files.length, failed });
    }

    setResults(uploadResults);
  };

  return (
    <div className="p-6">
      <h2 className="text-2xl font-bold mb-4">Bulk Upload</h2>

      <input
        type="file"
        multiple
        onChange={handleFileSelect}
        className="mb-4"
      />
      <p>Selected: {files.length} files</p>

      {files.length > 0 && (
        <button
          onClick={uploadFiles}
          className="px-4 py-2 bg-blue-500 text-white rounded"
        >
          Upload All
        </button>
      )}

      {progress.total > 0 && (
        <div className="mt-4">
          <p>Progress: {progress.completed} / {progress.total}</p>
          {progress.failed > 0 && (
            <p className="text-red-500">Failed: {progress.failed}</p>
          )}
        </div>
      )}

      {results.length > 0 && (
        <div className="mt-6">
          <h3 className="font-bold mb-2">Results</h3>
          {results.map((r, i) => (
            <div key={i} className={r.success ? 'text-green-600' : 'text-red-600'}>
              {r.file}: {r.success ? '✓' : '✗'}
              {!r.success && <span className="ml-2">{r.error}</span>}
            </div>
          ))}
        </div>
      )}
    </div>
  );
}

export default BulkUpload;
```
Best Practices
- Batch size - Use 3-5 concurrent uploads to avoid timeouts
- Error handling - Log failed uploads so they can be retried
- Progress tracking - Show users upload progress
- File validation - Check file types and sizes before upload (see the sketch after this list)
- Recovery mechanism - Resume interrupted uploads instead of starting over
- Testing - Test with a small batch first
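The file validation point deserves a concrete shape. Below is a minimal sketch of a pre-upload check; the extension allow-list mirrors the batch example above, and the `max_size_mb` cap is a hypothetical local safeguard, not a documented API limit.

```python
import os

# Hypothetical local limits; adjust to whatever your deployment actually accepts.
ALLOWED_EXTENSIONS = {".pdf", ".txt", ".md", ".docx", ".json"}

def validate_file(file_path, max_size_mb=50):
    """Return (ok, reason) before spending an upload slot on a bad file."""
    if not os.path.isfile(file_path):
        return False, "not a regular file"

    ext = os.path.splitext(file_path)[1].lower()
    if ext not in ALLOWED_EXTENSIONS:
        return False, f"unsupported extension: {ext}"

    size_mb = os.path.getsize(file_path) / (1024 * 1024)
    if size_mb > max_size_mb:
        return False, f"file too large: {size_mb:.1f} MB"

    return True, "ok"

# Usage: filter before calling bulk_upload()
candidates = [os.path.join("./documents", f) for f in os.listdir("./documents")]
valid = [p for p in candidates if validate_file(p)[0]]
```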
Troubleshooting
Timeouts on Large Files
Solution:
- Increase the timeout value (see the adaptive-timeout sketch below)
- Upload in smaller batches
- Reduce concurrent uploads
- Split large files
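One way to apply the first point is to scale the read timeout with file size rather than hard-coding 60 seconds. This is a rough sketch under an assumed throughput floor of about 1 MB/s plus a fixed allowance; `requests` accepts a `(connect, read)` timeout tuple.

```python
import os
import requests

def upload_with_adaptive_timeout(file_path, collection_name, min_read_timeout=60):
    # Heuristic (assumption): allow ~1 second per MB plus 30s of server-side processing.
    size_mb = os.path.getsize(file_path) / (1024 * 1024)
    read_timeout = max(min_read_timeout, int(size_mb) + 30)

    with open(file_path, "rb") as f:
        response = requests.post(
            "http://localhost/api/v1/rag/upload",
            headers={"Authorization": "Bearer YOUR_API_KEY"},
            files={"file": f},
            data={"collection_name": collection_name},
            timeout=(10, read_timeout)  # (connect, read)
        )
    response.raise_for_status()
    return response.json()
```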
Rate Limiting
Solution:
- Reduce batch size
- Add delays between batches
- Implement exponential backoff (see the sketch after this list)
- Monitor API rate limits
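A sketch of the backoff idea, wrapping the same upload call as the earlier examples. It assumes the API signals rate limiting with HTTP 429 and may send a `Retry-After` header; verify both against your deployment.

```python
import time
import requests

def upload_with_backoff(file_path, collection_name, max_retries=5):
    delay = 1  # seconds; doubled after every rate-limited attempt
    for attempt in range(max_retries):
        with open(file_path, "rb") as f:
            response = requests.post(
                "http://localhost/api/v1/rag/upload",
                headers={"Authorization": "Bearer YOUR_API_KEY"},
                files={"file": f},
                data={"collection_name": collection_name},
                timeout=60
            )

        if response.status_code != 429:
            response.raise_for_status()
            return response.json()

        # Honour Retry-After if the server sends one, otherwise back off exponentially.
        wait = float(response.headers.get("Retry-After", delay))
        print(f"Rate limited, retrying in {wait:.0f}s (attempt {attempt + 1}/{max_retries})")
        time.sleep(wait)
        delay *= 2

    raise RuntimeError(f"Gave up on {file_path} after {max_retries} rate-limited attempts")
```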
Memory Issues
Solution:
- Process files one at a time
- Reduce concurrent uploads
- Use streaming uploads (see the sketch after this list)
- Clear memory between uploads
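On the streaming point: `requests` assembles the whole multipart body in memory, which is what usually hurts with very large files. A sketch using `requests_toolbelt`'s `MultipartEncoder` (an extra dependency, `pip install requests-toolbelt`) streams the body from disk instead; it assumes the endpoint accepts an ordinary multipart form.

```python
import os
import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder

def streaming_upload(file_path, collection_name):
    with open(file_path, "rb") as f:
        encoder = MultipartEncoder(fields={
            "collection_name": collection_name,
            "file": (os.path.basename(file_path), f, "application/octet-stream"),
        })
        # The encoder is read lazily, so the file is never loaded into memory at once.
        response = requests.post(
            "http://localhost/api/v1/rag/upload",
            headers={
                "Authorization": "Bearer YOUR_API_KEY",
                "Content-Type": encoder.content_type,
            },
            data=encoder,
            timeout=300
        )
    response.raise_for_status()
    return response.json()
```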
Next Steps
- Search Example - Search uploaded documents
- RAG Chat Example - Use documents in AI chat
- Managing Example - Manage uploaded documents