9 min read

Python SDK

Official Python library for DocuRift API

Integrate DocuRift document processing into your Python applications. While an official SDK is in development, you can use the requests library for seamless API integration.

Installation

Install the required dependencies:

pip install requests python-dotenv

Quick Start

Here's a minimal example to process a document:

import os
import requests
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

API_KEY = os.getenv('DOCURIFT_API_KEY')
API_URL = 'https://api.docurift.com/v1'

headers = {
    'X-API-Key': API_KEY
}

# Process a document
with open('invoice.pdf', 'rb') as f:
    files = {'file': ('invoice.pdf', f, 'application/pdf')}
    data = {'documentType': 'invoice'}

    response = requests.post(
        f'{API_URL}/documents/process',
        headers=headers,
        files=files,
        data=data
    )

result = response.json()
print(result)

Authentication

DocuRift uses API key authentication. Always store your API key securely using environment variables.

Setting Up Your API Key

Create a .env file in your project root:

# .env
DOCURIFT_API_KEY=frc_your_api_key_here

Load and use the API key:

import os
from dotenv import load_dotenv

load_dotenv()

API_KEY = os.getenv('DOCURIFT_API_KEY')

if not API_KEY:
    raise ValueError("DOCURIFT_API_KEY environment variable is required")

headers = {
    'X-API-Key': API_KEY
}

Processing Documents

Synchronous Processing

For small documents that process quickly, use synchronous processing:

import os
import requests

API_KEY = os.getenv('DOCURIFT_API_KEY')
API_URL = 'https://api.docurift.com/v1'

def process_document_sync(file_path: str, document_type: str = 'general') -> dict:
    """
    Process a document synchronously.

    Args:
        file_path: Path to the document file
        document_type: Type of document (invoice, receipt, contract, general)

    Returns:
        Extracted document data
    """
    headers = {'X-API-Key': API_KEY}

    with open(file_path, 'rb') as f:
        files = {'file': (os.path.basename(file_path), f)}
        data = {'documentType': document_type}

        response = requests.post(
            f'{API_URL}/documents/process/sync',
            headers=headers,
            files=files,
            data=data,
            timeout=120  # 2 minute timeout for sync processing
        )

    response.raise_for_status()
    return response.json()

# Usage
result = process_document_sync('invoice.pdf', 'invoice')
print(f"Document ID: {result['data']['documentId']}")
print(f"Extracted data: {result['data']['result']}")

Asynchronous Processing

For larger documents or batch processing, use async mode with polling:

import os
import time
import requests

API_KEY = os.getenv('DOCURIFT_API_KEY')
API_URL = 'https://api.docurift.com/v1'

def process_document_async(file_path: str, document_type: str = 'general') -> str:
    """
    Submit a document for async processing.

    Returns:
        Job ID for status polling
    """
    headers = {'X-API-Key': API_KEY}

    with open(file_path, 'rb') as f:
        files = {'file': (os.path.basename(file_path), f)}
        data = {'documentType': document_type}

        response = requests.post(
            f'{API_URL}/documents/process/async',
            headers=headers,
            files=files,
            data=data
        )

    response.raise_for_status()
    return response.json()['data']['jobId']

def poll_job_status(job_id: str, max_attempts: int = 30, interval: int = 2) -> dict:
    """
    Poll for job completion.

    Args:
        job_id: The job ID to poll
        max_attempts: Maximum number of polling attempts
        interval: Seconds between polls

    Returns:
        Completed job result
    """
    headers = {'X-API-Key': API_KEY}

    for attempt in range(max_attempts):
        response = requests.get(
            f'{API_URL}/jobs/{job_id}',
            headers=headers
        )
        response.raise_for_status()

        job = response.json()['data']
        status = job['status']

        if status == 'completed':
            return job
        elif status == 'failed':
            raise Exception(f"Job failed: {job.get('error', 'Unknown error')}")

        print(f"Job status: {status} (attempt {attempt + 1}/{max_attempts})")
        time.sleep(interval)

    raise TimeoutError(f"Job {job_id} did not complete within timeout")

# Usage
job_id = process_document_async('large-document.pdf', 'contract')
print(f"Job submitted: {job_id}")

result = poll_job_status(job_id)
print(f"Processing complete!")
print(f"Extracted data: {result['result']}")

Batch Processing

Process multiple documents efficiently:

import os
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

API_KEY = os.getenv('DOCURIFT_API_KEY')
API_URL = 'https://api.docurift.com/v1'

def process_single_document(file_path: str, document_type: str) -> dict:
    """Process a single document and return result."""
    headers = {'X-API-Key': API_KEY}

    with open(file_path, 'rb') as f:
        files = {'file': (os.path.basename(file_path), f)}
        data = {'documentType': document_type}

        response = requests.post(
            f'{API_URL}/documents/process/sync',
            headers=headers,
            files=files,
            data=data,
            timeout=120
        )

    response.raise_for_status()
    return {'file': file_path, 'result': response.json()}

def process_batch(file_paths: list, document_type: str = 'invoice', max_workers: int = 5) -> list:
    """
    Process multiple documents concurrently.

    Args:
        file_paths: List of file paths to process
        document_type: Type of documents
        max_workers: Maximum concurrent requests

    Returns:
        List of processing results
    """
    results = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(process_single_document, fp, document_type): fp
            for fp in file_paths
        }

        for future in as_completed(futures):
            file_path = futures[future]
            try:
                result = future.result()
                results.append(result)
                print(f"Processed: {file_path}")
            except Exception as e:
                results.append({'file': file_path, 'error': str(e)})
                print(f"Failed: {file_path} - {e}")

    return results

# Usage
files = ['invoice1.pdf', 'invoice2.pdf', 'invoice3.pdf']
results = process_batch(files, document_type='invoice')

for r in results:
    if 'error' in r:
        print(f"{r['file']}: Error - {r['error']}")
    else:
        print(f"{r['file']}: Success")

Error Handling

Implement robust error handling for production applications:

import os
import requests
from requests.exceptions import HTTPError, Timeout, ConnectionError

API_KEY = os.getenv('DOCURIFT_API_KEY')
API_URL = 'https://api.docurift.com/v1'

class DocuRiftError(Exception):
    """Base exception for DocuRift API errors."""
    def __init__(self, message: str, code: str = None, status_code: int = None):
        self.message = message
        self.code = code
        self.status_code = status_code
        super().__init__(message)

class AuthenticationError(DocuRiftError):
    """Raised when API key is invalid or missing."""
    pass

class InsufficientCreditsError(DocuRiftError):
    """Raised when account has insufficient credits."""
    pass

class RateLimitError(DocuRiftError):
    """Raised when rate limit is exceeded."""
    pass

class ValidationError(DocuRiftError):
    """Raised when request validation fails."""
    pass

def process_document(file_path: str, document_type: str = 'general') -> dict:
    """
    Process a document with comprehensive error handling.
    """
    headers = {'X-API-Key': API_KEY}

    try:
        with open(file_path, 'rb') as f:
            files = {'file': (os.path.basename(file_path), f)}
            data = {'documentType': document_type}

            response = requests.post(
                f'{API_URL}/documents/process/sync',
                headers=headers,
                files=files,
                data=data,
                timeout=120
            )

        # Parse response
        result = response.json()

        # Check for API errors
        if not result.get('success', True):
            error = result.get('error', {})
            code = error.get('code', 'UNKNOWN_ERROR')
            message = error.get('message', 'An unknown error occurred')

            if response.status_code == 401:
                raise AuthenticationError(message, code, 401)
            elif response.status_code == 402:
                raise InsufficientCreditsError(message, code, 402)
            elif response.status_code == 429:
                raise RateLimitError(message, code, 429)
            elif response.status_code == 400:
                raise ValidationError(message, code, 400)
            else:
                raise DocuRiftError(message, code, response.status_code)

        return result

    except FileNotFoundError:
        raise DocuRiftError(f"File not found: {file_path}")
    except Timeout:
        raise DocuRiftError("Request timed out. Try async processing for large documents.")
    except ConnectionError:
        raise DocuRiftError("Failed to connect to DocuRift API. Check your internet connection.")

# Usage with error handling
try:
    result = process_document('invoice.pdf', 'invoice')
    print("Success:", result)
except AuthenticationError as e:
    print(f"Authentication failed: {e.message}")
    print("Please check your API key in the environment variables.")
except InsufficientCreditsError as e:
    print(f"Insufficient credits: {e.message}")
    print("Add credits at https://docurift.com/dashboard/billing")
except RateLimitError as e:
    print(f"Rate limit exceeded: {e.message}")
    print("Wait a moment and try again, or upgrade your plan.")
except ValidationError as e:
    print(f"Validation error: {e.message}")
except DocuRiftError as e:
    print(f"API error: {e.message}")

Complete Example

A production-ready class for DocuRift integration:

"""
DocuRift Python Client

A production-ready client for the DocuRift document processing API.
"""

import os
import time
import logging
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class ProcessingResult:
    """Represents a document processing result."""
    document_id: str
    status: str
    document_type: str
    confidence: float
    data: Dict[str, Any]
    pages_processed: int

class DocuRiftClient:
    """
    DocuRift API client with retry logic and comprehensive error handling.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        base_url: str = 'https://api.docurift.com/v1',
        timeout: int = 120,
        max_retries: int = 3
    ):
        self.api_key = api_key or os.getenv('DOCURIFT_API_KEY')
        if not self.api_key:
            raise ValueError("API key is required")

        self.base_url = base_url
        self.timeout = timeout

        # Configure session with retry logic
        self.session = requests.Session()
        retry_strategy = Retry(
            total=max_retries,
            backoff_factor=1,
            status_forcelist=[500, 502, 503, 504],
            allowed_methods=["GET", "POST"]
        )
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("https://", adapter)
        self.session.headers.update({'X-API-Key': self.api_key})

    def process_sync(
        self,
        file_path: str,
        document_type: str = 'general'
    ) -> ProcessingResult:
        """
        Process a document synchronously.
        """
        logger.info(f"Processing document: {file_path}")

        with open(file_path, 'rb') as f:
            files = {'file': (os.path.basename(file_path), f)}
            data = {'documentType': document_type}

            response = self.session.post(
                f'{self.base_url}/documents/process/sync',
                files=files,
                data=data,
                timeout=self.timeout
            )

        self._handle_response(response)
        result = response.json()['data']

        return ProcessingResult(
            document_id=result['documentId'],
            status=result['status'],
            document_type=result.get('documentType', document_type),
            confidence=result.get('confidence', 0.0),
            data=result.get('result', {}),
            pages_processed=result.get('pagesProcessed', 1)
        )

    def process_async(
        self,
        file_path: str,
        document_type: str = 'general',
        webhook_url: Optional[str] = None
    ) -> str:
        """
        Submit a document for async processing.

        Returns:
            Job ID for status polling
        """
        logger.info(f"Submitting document for async processing: {file_path}")

        with open(file_path, 'rb') as f:
            files = {'file': (os.path.basename(file_path), f)}
            data = {'documentType': document_type}
            if webhook_url:
                data['webhookUrl'] = webhook_url

            response = self.session.post(
                f'{self.base_url}/documents/process/async',
                files=files,
                data=data,
                timeout=30
            )

        self._handle_response(response)
        return response.json()['data']['jobId']

    def get_job_status(self, job_id: str) -> Dict[str, Any]:
        """Get the status of an async job."""
        response = self.session.get(
            f'{self.base_url}/jobs/{job_id}',
            timeout=30
        )
        self._handle_response(response)
        return response.json()['data']

    def wait_for_completion(
        self,
        job_id: str,
        max_wait: int = 300,
        poll_interval: int = 2
    ) -> ProcessingResult:
        """
        Wait for an async job to complete.

        Args:
            job_id: Job ID to wait for
            max_wait: Maximum wait time in seconds
            poll_interval: Seconds between status checks
        """
        start_time = time.time()

        while time.time() - start_time < max_wait:
            job = self.get_job_status(job_id)
            status = job['status']

            if status == 'completed':
                return ProcessingResult(
                    document_id=job['documentId'],
                    status=status,
                    document_type=job.get('documentType', 'general'),
                    confidence=job.get('confidence', 0.0),
                    data=job.get('result', {}),
                    pages_processed=job.get('pagesProcessed', 1)
                )
            elif status == 'failed':
                raise Exception(f"Processing failed: {job.get('error', 'Unknown error')}")

            logger.debug(f"Job {job_id} status: {status}")
            time.sleep(poll_interval)

        raise TimeoutError(f"Job {job_id} did not complete within {max_wait} seconds")

    def get_document(self, document_id: str) -> Dict[str, Any]:
        """Retrieve a processed document by ID."""
        response = self.session.get(
            f'{self.base_url}/documents/{document_id}',
            timeout=30
        )
        self._handle_response(response)
        return response.json()['data']

    def list_documents(
        self,
        limit: int = 20,
        offset: int = 0
    ) -> List[Dict[str, Any]]:
        """List processed documents."""
        response = self.session.get(
            f'{self.base_url}/documents',
            params={'limit': limit, 'offset': offset},
            timeout=30
        )
        self._handle_response(response)
        return response.json()['data']

    def get_credit_balance(self) -> Dict[str, Any]:
        """Get current credit balance."""
        response = self.session.get(
            f'{self.base_url}/credits/balance',
            timeout=30
        )
        self._handle_response(response)
        return response.json()['data']

    def _handle_response(self, response: requests.Response) -> None:
        """Handle API response and raise appropriate exceptions."""
        if response.status_code == 401:
            raise Exception("Authentication failed. Check your API key.")
        elif response.status_code == 402:
            raise Exception("Insufficient credits. Add credits to continue.")
        elif response.status_code == 429:
            raise Exception("Rate limit exceeded. Please wait and retry.")
        elif response.status_code >= 400:
            try:
                error = response.json().get('error', {})
                message = error.get('message', 'Request failed')
            except:
                message = f"Request failed with status {response.status_code}"
            raise Exception(message)


# Usage example
if __name__ == '__main__':
    # Initialize client
    client = DocuRiftClient()

    # Check balance
    balance = client.get_credit_balance()
    print(f"Credit balance: {balance['balance']} credits")

    # Process document synchronously
    result = client.process_sync('invoice.pdf', 'invoice')
    print(f"Document ID: {result.document_id}")
    print(f"Confidence: {result.confidence:.2%}")
    print(f"Extracted data: {result.data}")

    # Process document asynchronously
    job_id = client.process_async('large-contract.pdf', 'contract')
    result = client.wait_for_completion(job_id)
    print(f"Async processing complete: {result.document_id}")

Next Steps