
Bulk Upload Example

Efficiently upload multiple documents to RAG collections.

Batch Upload with Python

import requests
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

def upload_single_document(file_path, collection_name, description=None):
    try:
        with open(file_path, "rb") as f:
            data = {"collection_name": collection_name}
            if description:
                data["description"] = description

            response = requests.post(
                "http://localhost/api/v1/rag/upload",
                headers={"Authorization": "Bearer YOUR_API_KEY"},
                files={"file": f},
                data=data,
                timeout=60
            )

        response.raise_for_status()
        return {
            "success": True,
            "file": os.path.basename(file_path),
            "result": response.json()
        }
    except Exception as e:
        return {
            "success": False,
            "file": os.path.basename(file_path),
            "error": str(e)
        }

def bulk_upload(directory, collection_name, max_workers=3):
    supported_extensions = [".pdf", ".txt", ".md", ".docx", ".json"]
    file_paths = []

    # Collect supported files from the directory
    for filename in os.listdir(directory):
        file_path = os.path.join(directory, filename)
        if os.path.isfile(file_path):
            ext = os.path.splitext(filename)[1].lower()
            if ext in supported_extensions:
                file_paths.append(file_path)

    print(f"Found {len(file_paths)} files to upload")

    # Upload in parallel
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(
                upload_single_document,
                path,
                collection_name
            ): path for path in file_paths
        }

        for future in as_completed(futures):
            result = future.result()
            results.append(result)
            status = "✓" if result["success"] else "✗"
            print(f"{status} {result['file']}")

    # Summary
    successful = sum(1 for r in results if r["success"])
    failed = len(results) - successful

    print("\nSummary:")
    print(f"Total: {len(results)}")
    print(f"Successful: {successful}")
    print(f"Failed: {failed}")

    return results

# Usage
results = bulk_upload("./documents", "knowledge_base", max_workers=3)

Upload with Progress Tracking

import requests
import os
from tqdm import tqdm

def upload_with_progress(file_path, collection_name):
    file_size = os.path.getsize(file_path)
    file_name = os.path.basename(file_path)

    with open(file_path, "rb") as f:
        # Wrap the file object so each read() call advances the progress bar.
        # Note: requests builds the multipart body with a single read(), so the
        # bar may jump straight to 100% rather than advance chunk by chunk.
        with tqdm.wrapattr(f, "read", total=file_size, desc=file_name) as wrapped:
            response = requests.post(
                "http://localhost/api/v1/rag/upload",
                headers={"Authorization": "Bearer YOUR_API_KEY"},
                files={"file": (file_name, wrapped)},
                data={"collection_name": collection_name},
                timeout=120
            )

    response.raise_for_status()
    return response.json()

def bulk_upload_with_progress(directory, collection_name):
    files = [f for f in os.listdir(directory)
             if os.path.isfile(os.path.join(directory, f))]

    results = []

    for filename in tqdm(files, desc="Uploading files"):
        file_path = os.path.join(directory, filename)
        try:
            result = upload_with_progress(file_path, collection_name)
            results.append({
                "file": filename,
                "success": True,
                "id": result.get("document_id")
            })
        except Exception as e:
            results.append({
                "file": filename,
                "success": False,
                "error": str(e)
            })

    return results

# Usage
results = bulk_upload_with_progress("./docs", "documentation")

Organized Upload by Type

import requests
import json
import os
import shutil

def organize_and_upload(source_dir, collection_name):
    # Organize files by type
    type_dirs = {
        "pdf": "./pdfs",
        "text": "./text",
        "markdown": "./markdown"
    }

    for dir_path in type_dirs.values():
        os.makedirs(dir_path, exist_ok=True)

    # Copy files to organized directories
    for filename in os.listdir(source_dir):
        file_path = os.path.join(source_dir, filename)

        if not os.path.isfile(file_path):
            continue

        ext = os.path.splitext(filename)[1].lower()

        if ext == ".pdf":
            shutil.copy(file_path, os.path.join(type_dirs["pdf"], filename))
        elif ext in [".txt", ".json"]:
            shutil.copy(file_path, os.path.join(type_dirs["text"], filename))
        elif ext == ".md":
            shutil.copy(file_path, os.path.join(type_dirs["markdown"], filename))

    # Upload each type with a metadata tag
    for type_name, dir_path in type_dirs.items():
        print(f"\nUploading {type_name} files...")

        files = [f for f in os.listdir(dir_path)
                 if os.path.isfile(os.path.join(dir_path, f))]

        for filename in files:
            file_path = os.path.join(dir_path, filename)
            try:
                with open(file_path, "rb") as f:
                    response = requests.post(
                        "http://localhost/api/v1/rag/upload",
                        headers={"Authorization": "Bearer YOUR_API_KEY"},
                        files={"file": f},
                        data={
                            "collection_name": collection_name,
                            "metadata": json.dumps({"type": type_name})
                        },
                        timeout=60
                    )
                response.raise_for_status()
                print(f"  Uploaded: {filename}")
            except Exception as e:
                print(f"  Failed: {filename} - {e}")

# Usage
organize_and_upload("./unorganized", "organized_docs")

Upload with Error Recovery

import requests
import json
import os

def upload_with_recovery(directory, collection_name, log_file="upload_log.json"):
    # Load existing log so already-uploaded files are skipped on re-runs
    uploaded = set()
    if os.path.exists(log_file):
        with open(log_file, "r") as f:
            log = json.load(f)
            uploaded = set(log.get("uploaded", []))

    files = [f for f in os.listdir(directory)
             if os.path.isfile(os.path.join(directory, f)) and
             f not in uploaded]

    print(f"Files to upload: {len(files)}")
    print(f"Already uploaded: {len(uploaded)}")

    results = {"uploaded": list(uploaded), "failed": []}

    for filename in files:
        file_path = os.path.join(directory, filename)

        try:
            with open(file_path, "rb") as f:
                response = requests.post(
                    "http://localhost/api/v1/rag/upload",
                    headers={"Authorization": "Bearer YOUR_API_KEY"},
                    files={"file": f},
                    data={"collection_name": collection_name},
                    timeout=60
                )

            response.raise_for_status()
            results["uploaded"].append(filename)
            print(f"Uploaded: {filename}")

        except Exception as e:
            error = {"file": filename, "error": str(e)}
            results["failed"].append(error)
            print(f"Failed: {filename} - {e}")

        # Save progress after every file so an interrupted run can resume
        with open(log_file, "w") as f:
            json.dump(results, f, indent=2)

    return results

# Usage
results = upload_with_recovery("./large_dataset", "docs")
print(f"\nUploaded: {len(results['uploaded'])}")
print(f"Failed: {len(results['failed'])}")

JavaScript Bulk Upload

class BulkUploader {
  constructor(apiKey) {
    this.apiKey = apiKey;
    this.baseUrl = 'http://localhost/api/v1';
  }

  async uploadFile(file, collectionName, options = {}) {
    const formData = new FormData();
    formData.append('file', file);
    formData.append('collection_name', collectionName);

    if (options.description) {
      formData.append('description', options.description);
    }

    if (options.metadata) {
      formData.append('metadata', JSON.stringify(options.metadata));
    }

    const response = await fetch(`${this.baseUrl}/rag/upload`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.apiKey}`
      },
      body: formData
    });

    if (!response.ok) {
      const error = await response.json();
      throw new Error(error.message || 'Upload failed');
    }

    return await response.json();
  }

  async bulkUpload(files, collectionName, options = {}) {
    const {
      batchSize = 3,
      onProgress,
      onComplete
    } = options;

    const results = [];
    let completed = 0;
    let failed = 0;

    for (let i = 0; i < files.length; i += batchSize) {
      const batch = files.slice(i, i + batchSize);

      const batchPromises = batch.map(async (file) => {
        try {
          const result = await this.uploadFile(file, collectionName);
          return {
            success: true,
            file: file.name,
            result
          };
        } catch (error) {
          failed++;
          return {
            success: false,
            file: file.name,
            error: error.message
          };
        }
      });

      const batchResults = await Promise.all(batchPromises);
      results.push(...batchResults);

      completed += batchResults.length;
      if (onProgress) {
        onProgress(completed, files.length, failed);
      }
    }

    if (onComplete) {
      onComplete(results);
    }

    return results;
  }
}

// Usage
const uploader = new BulkUploader('YOUR_API_KEY');

const fileInput = document.getElementById('file-input');
const files = Array.from(fileInput.files);

const results = await uploader.bulkUpload(files, 'documentation', {
  batchSize: 3,
  onProgress: (completed, total, failed) => {
    console.log(`Progress: ${completed}/${total} (Failed: ${failed})`);
  },
  onComplete: (results) => {
    console.log('All uploads complete:', results);
  }
});

// Show summary
const successful = results.filter(r => r.success);
const failedUploads = results.filter(r => !r.success);

console.log(`Successful: ${successful.length}`);
console.log(`Failed: ${failedUploads.length}`);

React Bulk Upload Component

import React, { useState } from 'react';

function BulkUpload() {
  const [files, setFiles] = useState([]);
  const [progress, setProgress] = useState({ completed: 0, total: 0, failed: 0 });
  const [results, setResults] = useState([]);

  const handleFileSelect = (e) => {
    setFiles(Array.from(e.target.files));
  };

  const uploadFiles = async () => {
    const uploadResults = [];
    let completed = 0;
    let failed = 0;

    for (const file of files) {
      const formData = new FormData();
      formData.append('file', file);
      formData.append('collection_name', 'documentation');

      try {
        const response = await fetch('http://localhost/api/v1/rag/upload', {
          method: 'POST',
          headers: {
            'Authorization': 'Bearer YOUR_API_KEY'
          },
          body: formData
        });

        if (!response.ok) {
          throw new Error(`Upload failed with status ${response.status}`);
        }

        const result = await response.json();
        uploadResults.push({ success: true, file: file.name, result });
      } catch (error) {
        failed++;
        uploadResults.push({ success: false, file: file.name, error: error.message });
      }

      completed++;
      setProgress({ completed, total: files.length, failed });
    }

    setResults(uploadResults);
  };

  return (
    <div className="p-6">
      <h2 className="text-2xl font-bold mb-4">Bulk Upload</h2>

      <input
        type="file"
        multiple
        onChange={handleFileSelect}
        className="mb-4"
      />

      <p>Selected: {files.length} files</p>

      {files.length > 0 && (
        <button
          onClick={uploadFiles}
          className="px-4 py-2 bg-blue-500 text-white rounded"
        >
          Upload All
        </button>
      )}

      {progress.total > 0 && (
        <div className="mt-4">
          <p>Progress: {progress.completed} / {progress.total}</p>
          {progress.failed > 0 && (
            <p className="text-red-500">Failed: {progress.failed}</p>
          )}
        </div>
      )}

      {results.length > 0 && (
        <div className="mt-6">
          <h3 className="font-bold mb-2">Results</h3>
          {results.map((r, i) => (
            <div key={i} className={r.success ? 'text-green-600' : 'text-red-600'}>
              {r.file}: {r.success ? '✓' : '✗'}
              {!r.success && <span className="ml-2">{r.error}</span>}
            </div>
          ))}
        </div>
      )}
    </div>
  );
}

export default BulkUpload;

Best Practices

  1. Batch size - Use 3-5 concurrent uploads to avoid timeouts
  2. Error handling - Log failed uploads for retry
  3. Progress tracking - Show users upload progress
  4. File validation - Check file types (and sizes) before upload; see the sketch after this list
  5. Recovery mechanism - Resume interrupted uploads
  6. Testing - Test with small batches first
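
A minimal pre-flight validation sketch to pair with the batch example above. The 50 MB limit and the extension set here are illustrative placeholders, not documented API limits:

import os

SUPPORTED_EXTENSIONS = {".pdf", ".txt", ".md", ".docx", ".json"}
MAX_SIZE_BYTES = 50 * 1024 * 1024  # illustrative cap, adjust to your deployment

def validate_files(directory):
    # Split directory contents into uploadable files and skipped files
    valid, skipped = [], []
    for filename in os.listdir(directory):
        path = os.path.join(directory, filename)
        if not os.path.isfile(path):
            continue
        ext = os.path.splitext(filename)[1].lower()
        if ext not in SUPPORTED_EXTENSIONS:
            skipped.append((filename, "unsupported type"))
        elif os.path.getsize(path) > MAX_SIZE_BYTES:
            skipped.append((filename, "too large"))
        else:
            valid.append(path)
    return valid, skipped

# Usage
valid, skipped = validate_files("./documents")
print(f"Ready to upload: {len(valid)}, skipped: {len(skipped)}")

Validating first keeps unsupported or oversized files out of the queue, so any failures in the upload summary reflect real API errors.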

Troubleshooting

Timeouts on Large Files

Solution:

  • Increase the timeout value (see the sketch below)
  • Upload in smaller batches
  • Reduce concurrent uploads
  • Split large files
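
A sketch of the first two suggestions, using the same endpoint as the examples above. requests accepts a (connect, read) timeout tuple, and the bulk_upload helper defined earlier already exposes max_workers; the file path here is illustrative:

import requests

# Separate connect and read timeouts: connect quickly, but give the server
# a long window to process a large document.
with open("./documents/large_report.pdf", "rb") as f:
    response = requests.post(
        "http://localhost/api/v1/rag/upload",
        headers={"Authorization": "Bearer YOUR_API_KEY"},
        files={"file": f},
        data={"collection_name": "knowledge_base"},
        timeout=(10, 300)  # 10 s to connect, up to 300 s to read the response
    )
response.raise_for_status()

# Fewer concurrent uploads also reduces pressure on the server.
results = bulk_upload("./documents", "knowledge_base", max_workers=1)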

Rate Limiting

Solution:

  • Reduce batch size
  • Add delays between batches
  • Implement exponential backoff (see the sketch below)
  • Monitor API rate limits
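
A minimal exponential-backoff sketch; it assumes the API signals rate limiting with HTTP 429, which you should confirm for your deployment:

import time
import requests

def upload_with_backoff(file_path, collection_name, max_retries=5):
    delay = 1  # seconds before the first retry
    for attempt in range(max_retries):
        with open(file_path, "rb") as f:
            response = requests.post(
                "http://localhost/api/v1/rag/upload",
                headers={"Authorization": "Bearer YOUR_API_KEY"},
                files={"file": f},
                data={"collection_name": collection_name},
                timeout=60
            )
        if response.status_code != 429:  # not rate limited
            response.raise_for_status()
            return response.json()
        time.sleep(delay)  # back off before retrying
        delay *= 2         # double the delay each attempt
    raise RuntimeError(f"Still rate limited after {max_retries} attempts: {file_path}")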

Memory Issues

Solution:

  • Process files one at a time
  • Reduce concurrent uploads
  • Use streaming uploads (see the sketch below)
  • Clear memory between uploads
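
One option for streaming is the third-party requests-toolbelt package (pip install requests-toolbelt), which streams the multipart body instead of loading the whole file into memory. A sketch against the same endpoint:

import os
import requests
from requests_toolbelt.multipart.encoder import MultipartEncoder

def upload_streaming(file_path, collection_name):
    with open(file_path, "rb") as f:
        # MultipartEncoder reads the file lazily while the request body is sent
        encoder = MultipartEncoder(fields={
            "file": (os.path.basename(file_path), f, "application/octet-stream"),
            "collection_name": collection_name
        })
        response = requests.post(
            "http://localhost/api/v1/rag/upload",
            headers={
                "Authorization": "Bearer YOUR_API_KEY",
                "Content-Type": encoder.content_type
            },
            data=encoder,
            timeout=300
        )
    response.raise_for_status()
    return response.json()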

Next Steps