9 min read
Python SDK
Official Python library for DocuRift API
Integrate DocuRift document processing into your Python applications. An official SDK is in development; until it ships, you can call the REST API directly with the requests library, as shown below.
Installation
Install the required dependencies:
pip install requests python-dotenv
Quick Start
Here's a minimal example to process a document:
import os
import requests
from dotenv import load_dotenv

# Load environment variables from a local .env file
load_dotenv()

API_KEY = os.getenv('DOCURIFT_API_KEY')
API_URL = 'https://api.docurift.com/v1'

# Fail fast with a clear message instead of a confusing 401 later.
if not API_KEY:
    raise ValueError("DOCURIFT_API_KEY environment variable is required")

headers = {
    'X-API-Key': API_KEY
}

# Process a document
with open('invoice.pdf', 'rb') as f:
    files = {'file': ('invoice.pdf', f, 'application/pdf')}
    data = {'documentType': 'invoice'}
    response = requests.post(
        f'{API_URL}/documents/process',
        headers=headers,
        files=files,
        data=data
    )

# Surface HTTP errors (4xx/5xx) as exceptions instead of printing an error body.
response.raise_for_status()
result = response.json()
print(result)
Authentication
DocuRift uses API key authentication. Always store your API key securely using environment variables.
Setting Up Your API Key
Create a .env file in your project root:
# .env
DOCURIFT_API_KEY=frc_your_api_key_here
Load and use the API key:
import os
from dotenv import load_dotenv

# Pull variables from the project's .env file into the process environment.
load_dotenv()

# os.environ.get behaves identically to os.getenv here.
API_KEY = os.environ.get('DOCURIFT_API_KEY')

# Reject both a missing and an empty key before making any request.
if not API_KEY:
    raise ValueError("DOCURIFT_API_KEY environment variable is required")

headers = {'X-API-Key': API_KEY}
Processing Documents
Synchronous Processing
For small documents that process quickly, use synchronous processing:
import os
import requests

API_KEY = os.getenv('DOCURIFT_API_KEY')
API_URL = 'https://api.docurift.com/v1'


def process_document_sync(file_path: str, document_type: str = 'general') -> dict:
    """
    Upload a document and block until extraction finishes.

    Args:
        file_path: Path to the document file
        document_type: Type of document (invoice, receipt, contract, general)

    Returns:
        Extracted document data
    """
    with open(file_path, 'rb') as fh:
        response = requests.post(
            f'{API_URL}/documents/process/sync',
            headers={'X-API-Key': API_KEY},
            files={'file': (os.path.basename(file_path), fh)},
            data={'documentType': document_type},
            timeout=120,  # 2 minute timeout for sync processing
        )
    response.raise_for_status()
    return response.json()


# Usage
result = process_document_sync('invoice.pdf', 'invoice')
print(f"Document ID: {result['data']['documentId']}")
print(f"Extracted data: {result['data']['result']}")
Asynchronous Processing
For larger documents or batch processing, use async mode with polling:
import os
import time
import requests

API_KEY = os.getenv('DOCURIFT_API_KEY')
API_URL = 'https://api.docurift.com/v1'


def process_document_async(file_path: str, document_type: str = 'general') -> str:
    """
    Submit a document for async processing.

    Args:
        file_path: Path to the document file
        document_type: Type of document (invoice, receipt, contract, general)

    Returns:
        Job ID for status polling
    """
    headers = {'X-API-Key': API_KEY}
    with open(file_path, 'rb') as f:
        files = {'file': (os.path.basename(file_path), f)}
        data = {'documentType': document_type}
        response = requests.post(
            f'{API_URL}/documents/process/async',
            headers=headers,
            files=files,
            data=data
        )
    response.raise_for_status()
    return response.json()['data']['jobId']


def poll_job_status(job_id: str, max_attempts: int = 30, interval: int = 2) -> dict:
    """
    Poll for job completion.

    Args:
        job_id: The job ID to poll
        max_attempts: Maximum number of polling attempts
        interval: Seconds between polls

    Returns:
        Completed job result

    Raises:
        RuntimeError: If the job reports a 'failed' status.
        TimeoutError: If the job does not complete within max_attempts polls.
    """
    headers = {'X-API-Key': API_KEY}
    for attempt in range(max_attempts):
        response = requests.get(
            f'{API_URL}/jobs/{job_id}',
            headers=headers
        )
        response.raise_for_status()
        job = response.json()['data']
        status = job['status']
        if status == 'completed':
            return job
        if status == 'failed':
            # Specific exception type (was a bare Exception) so callers can
            # distinguish job failure from unrelated errors.
            raise RuntimeError(f"Job failed: {job.get('error', 'Unknown error')}")
        print(f"Job status: {status} (attempt {attempt + 1}/{max_attempts})")
        # Don't sleep after the final attempt — no further poll will happen.
        if attempt < max_attempts - 1:
            time.sleep(interval)
    raise TimeoutError(f"Job {job_id} did not complete within timeout")


# Usage
job_id = process_document_async('large-document.pdf', 'contract')
print(f"Job submitted: {job_id}")
result = poll_job_status(job_id)
print("Processing complete!")
print(f"Extracted data: {result['result']}")
Batch Processing
Process multiple documents efficiently:
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

API_KEY = os.getenv('DOCURIFT_API_KEY')
API_URL = 'https://api.docurift.com/v1'


def process_single_document(file_path: str, document_type: str) -> dict:
    """Upload one document via the sync endpoint and return its result."""
    with open(file_path, 'rb') as fh:
        response = requests.post(
            f'{API_URL}/documents/process/sync',
            headers={'X-API-Key': API_KEY},
            files={'file': (os.path.basename(file_path), fh)},
            data={'documentType': document_type},
            timeout=120,
        )
    response.raise_for_status()
    return {'file': file_path, 'result': response.json()}


def process_batch(file_paths: list, document_type: str = 'invoice', max_workers: int = 5) -> list:
    """
    Process multiple documents concurrently.

    Args:
        file_paths: List of file paths to process
        document_type: Type of documents
        max_workers: Maximum concurrent requests

    Returns:
        List of processing results, in completion (not submission) order
    """
    outcomes = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {
            pool.submit(process_single_document, path, document_type): path
            for path in file_paths
        }
        for done in as_completed(pending):
            path = pending[done]
            try:
                outcomes.append(done.result())
                print(f"Processed: {path}")
            except Exception as exc:
                # Capture per-file failures so one bad document doesn't abort the batch.
                outcomes.append({'file': path, 'error': str(exc)})
                print(f"Failed: {path} - {exc}")
    return outcomes


# Usage
files = ['invoice1.pdf', 'invoice2.pdf', 'invoice3.pdf']
results = process_batch(files, document_type='invoice')
for r in results:
    if 'error' in r:
        print(f"{r['file']}: Error - {r['error']}")
    else:
        print(f"{r['file']}: Success")
Error Handling
Implement robust error handling for production applications:
import os
import requests
# NOTE(review): HTTPError is imported but never used in this example.
from requests.exceptions import HTTPError, Timeout, ConnectionError

# Module-level configuration; the key is read once at import time.
API_KEY = os.getenv('DOCURIFT_API_KEY')
API_URL = 'https://api.docurift.com/v1'
class DocuRiftError(Exception):
    """Base exception for DocuRift API errors.

    Carries the API's human-readable message plus the machine-readable
    error code and HTTP status when they are available.
    """

    def __init__(self, message: str, code: str = None, status_code: int = None):
        super().__init__(message)
        self.message = message
        self.code = code
        self.status_code = status_code


class AuthenticationError(DocuRiftError):
    """Raised when API key is invalid or missing (HTTP 401)."""


class InsufficientCreditsError(DocuRiftError):
    """Raised when account has insufficient credits (HTTP 402)."""


class RateLimitError(DocuRiftError):
    """Raised when rate limit is exceeded (HTTP 429)."""


class ValidationError(DocuRiftError):
    """Raised when request validation fails (HTTP 400)."""
def process_document(file_path: str, document_type: str = 'general') -> dict:
    """
    Process a document with comprehensive error handling.

    Args:
        file_path: Path to the document file
        document_type: Type of document (invoice, receipt, contract, general)

    Returns:
        Parsed API response on success.

    Raises:
        AuthenticationError, InsufficientCreditsError, RateLimitError,
        ValidationError, or DocuRiftError depending on the failure.
    """
    headers = {'X-API-Key': API_KEY}
    try:
        with open(file_path, 'rb') as f:
            files = {'file': (os.path.basename(file_path), f)}
            data = {'documentType': document_type}
            response = requests.post(
                f'{API_URL}/documents/process/sync',
                headers=headers,
                files=files,
                data=data,
                timeout=120
            )
        # Parse the body defensively: error responses (e.g. from a gateway)
        # are not guaranteed to be JSON, and json() would raise ValueError.
        try:
            result = response.json()
        except ValueError:
            result = {}
        # Treat an HTTP error status OR an explicit success=False as failure,
        # so non-conforming error bodies no longer slip through the
        # result.get('success', True) default.
        if response.status_code >= 400 or not result.get('success', True):
            error = result.get('error', {})
            code = error.get('code', 'UNKNOWN_ERROR')
            message = error.get('message', 'An unknown error occurred')
            if response.status_code == 401:
                raise AuthenticationError(message, code, 401)
            elif response.status_code == 402:
                raise InsufficientCreditsError(message, code, 402)
            elif response.status_code == 429:
                raise RateLimitError(message, code, 429)
            elif response.status_code == 400:
                raise ValidationError(message, code, 400)
            else:
                raise DocuRiftError(message, code, response.status_code)
        return result
    except FileNotFoundError:
        raise DocuRiftError(f"File not found: {file_path}")
    except Timeout:
        raise DocuRiftError("Request timed out. Try async processing for large documents.")
    except ConnectionError:
        raise DocuRiftError("Failed to connect to DocuRift API. Check your internet connection.")
# Usage with error handling
try:
    result = process_document('invoice.pdf', 'invoice')
    print("Success:", result)
except AuthenticationError as e:
    print(f"Authentication failed: {e.message}")
    print("Please check your API key in the environment variables.")
except InsufficientCreditsError as e:
    print(f"Insufficient credits: {e.message}")
    print("Add credits at https://docurift.com/dashboard/billing")
except RateLimitError as e:
    print(f"Rate limit exceeded: {e.message}")
    print("Wait a moment and try again, or upgrade your plan.")
except ValidationError as e:
    print(f"Validation error: {e.message}")
except DocuRiftError as e:
    # Catch-all for any other API error; must come after the subclasses.
    print(f"API error: {e.message}")
Complete Example
A production-ready class for DocuRift integration:
"""
DocuRift Python Client
A production-ready client for the DocuRift document processing API.
"""
import os
import time
import logging
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Configure logging
# NOTE(review): basicConfig at import time configures the root logger for the
# whole process; in a reusable library, prefer leaving configuration to the app.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ProcessingResult:
    """Represents a document processing result.

    Fields mirror the API's job/document payload keys (documentId, status,
    documentType, confidence, result, pagesProcessed).
    """
    document_id: str        # Server-assigned document ID
    status: str             # Processing status, e.g. 'completed'
    document_type: str      # invoice, receipt, contract, or general
    confidence: float       # Extraction confidence; presumably 0.0-1.0 (callers format it as a percentage)
    data: Dict[str, Any]    # Extracted fields from the document
    pages_processed: int    # Number of pages processed
class DocuRiftClient:
    """
    DocuRift API client with retry logic and comprehensive error handling.

    Transient server errors (HTTP 500/502/503/504) are retried automatically
    with exponential backoff via the underlying requests.Session.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        base_url: str = 'https://api.docurift.com/v1',
        timeout: int = 120,
        max_retries: int = 3
    ):
        """
        Args:
            api_key: API key; defaults to the DOCURIFT_API_KEY env variable.
            base_url: Root URL of the DocuRift API.
            timeout: Timeout in seconds for synchronous processing requests.
            max_retries: Number of retries for transient server errors.

        Raises:
            ValueError: If no API key is provided or found in the environment.
        """
        self.api_key = api_key or os.getenv('DOCURIFT_API_KEY')
        if not self.api_key:
            raise ValueError("API key is required")
        self.base_url = base_url
        self.timeout = timeout
        # Configure session with retry logic
        self.session = requests.Session()
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,  # waits 1s, 2s, 4s, ... between attempts
            status_forcelist=[500, 502, 503, 504],
            allowed_methods=["GET", "POST"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("https://", adapter)
        self.session.headers.update({'X-API-Key': self.api_key})

    def process_sync(
        self,
        file_path: str,
        document_type: str = 'general'
    ) -> ProcessingResult:
        """
        Process a document synchronously.

        Args:
            file_path: Path to the document file.
            document_type: invoice, receipt, contract, or general.

        Returns:
            ProcessingResult with the extracted data.
        """
        # Lazy %-style args avoid formatting when the level is disabled.
        logger.info("Processing document: %s", file_path)
        with open(file_path, 'rb') as f:
            files = {'file': (os.path.basename(file_path), f)}
            data = {'documentType': document_type}
            response = self.session.post(
                f'{self.base_url}/documents/process/sync',
                files=files,
                data=data,
                timeout=self.timeout
            )
        self._handle_response(response)
        result = response.json()['data']
        return ProcessingResult(
            document_id=result['documentId'],
            status=result['status'],
            document_type=result.get('documentType', document_type),
            confidence=result.get('confidence', 0.0),
            data=result.get('result', {}),
            pages_processed=result.get('pagesProcessed', 1)
        )

    def process_async(
        self,
        file_path: str,
        document_type: str = 'general',
        webhook_url: Optional[str] = None
    ) -> str:
        """
        Submit a document for async processing.

        Args:
            file_path: Path to the document file.
            document_type: invoice, receipt, contract, or general.
            webhook_url: Optional URL to notify when processing finishes.

        Returns:
            Job ID for status polling.
        """
        logger.info("Submitting document for async processing: %s", file_path)
        with open(file_path, 'rb') as f:
            files = {'file': (os.path.basename(file_path), f)}
            data = {'documentType': document_type}
            if webhook_url:
                data['webhookUrl'] = webhook_url
            response = self.session.post(
                f'{self.base_url}/documents/process/async',
                files=files,
                data=data,
                timeout=30  # submission is quick; processing happens server-side
            )
        self._handle_response(response)
        return response.json()['data']['jobId']

    def get_job_status(self, job_id: str) -> Dict[str, Any]:
        """Get the status payload of an async job."""
        response = self.session.get(
            f'{self.base_url}/jobs/{job_id}',
            timeout=30
        )
        self._handle_response(response)
        return response.json()['data']

    def wait_for_completion(
        self,
        job_id: str,
        max_wait: int = 300,
        poll_interval: int = 2
    ) -> ProcessingResult:
        """
        Block until an async job completes.

        Args:
            job_id: Job ID to wait for.
            max_wait: Maximum wait time in seconds.
            poll_interval: Seconds between status checks.

        Returns:
            ProcessingResult for the completed job.

        Raises:
            RuntimeError: If the job reports a 'failed' status.
            TimeoutError: If the job does not finish within max_wait seconds.
        """
        start_time = time.time()
        while time.time() - start_time < max_wait:
            job = self.get_job_status(job_id)
            status = job['status']
            if status == 'completed':
                return ProcessingResult(
                    document_id=job['documentId'],
                    status=status,
                    document_type=job.get('documentType', 'general'),
                    confidence=job.get('confidence', 0.0),
                    data=job.get('result', {}),
                    pages_processed=job.get('pagesProcessed', 1)
                )
            if status == 'failed':
                # RuntimeError (was a bare Exception) lets callers distinguish
                # job failure while keeping `except Exception` handlers working.
                raise RuntimeError(f"Processing failed: {job.get('error', 'Unknown error')}")
            logger.debug("Job %s status: %s", job_id, status)
            time.sleep(poll_interval)
        raise TimeoutError(f"Job {job_id} did not complete within {max_wait} seconds")

    def get_document(self, document_id: str) -> Dict[str, Any]:
        """Retrieve a processed document by ID."""
        response = self.session.get(
            f'{self.base_url}/documents/{document_id}',
            timeout=30
        )
        self._handle_response(response)
        return response.json()['data']

    def list_documents(
        self,
        limit: int = 20,
        offset: int = 0
    ) -> List[Dict[str, Any]]:
        """List processed documents, paginated via limit/offset."""
        response = self.session.get(
            f'{self.base_url}/documents',
            params={'limit': limit, 'offset': offset},
            timeout=30
        )
        self._handle_response(response)
        return response.json()['data']

    def get_credit_balance(self) -> Dict[str, Any]:
        """Get current credit balance."""
        response = self.session.get(
            f'{self.base_url}/credits/balance',
            timeout=30
        )
        self._handle_response(response)
        return response.json()['data']

    def _handle_response(self, response: requests.Response) -> None:
        """Raise a descriptive error for any non-success response.

        Raises RuntimeError (a subclass of Exception, replacing the original
        bare Exception) so existing `except Exception` handlers keep working.
        """
        if response.status_code == 401:
            raise RuntimeError("Authentication failed. Check your API key.")
        elif response.status_code == 402:
            raise RuntimeError("Insufficient credits. Add credits to continue.")
        elif response.status_code == 429:
            raise RuntimeError("Rate limit exceeded. Please wait and retry.")
        elif response.status_code >= 400:
            try:
                error = response.json().get('error', {})
                message = error.get('message', 'Request failed')
            except ValueError:  # body is not JSON (was a bare except:)
                message = f"Request failed with status {response.status_code}"
            raise RuntimeError(message)
# Usage example
if __name__ == '__main__':
    # Initialize client (reads DOCURIFT_API_KEY from the environment)
    client = DocuRiftClient()

    # Check balance before submitting work
    balance = client.get_credit_balance()
    print(f"Credit balance: {balance['balance']} credits")

    # Process document synchronously
    result = client.process_sync('invoice.pdf', 'invoice')
    print(f"Document ID: {result.document_id}")
    print(f"Confidence: {result.confidence:.2%}")
    print(f"Extracted data: {result.data}")

    # Process document asynchronously, then poll until the job finishes
    job_id = client.process_async('large-contract.pdf', 'contract')
    result = client.wait_for_completion(job_id)
    print(f"Async processing complete: {result.document_id}")
Next Steps
- Authentication Guide - Detailed authentication options
- API Reference - Full endpoint documentation
- Webhooks Setup - Receive notifications for async processing
- Error Codes - Complete error code reference