Docling - Batch Processing

The component supports batch processing of multiple documents when using docling-serve API mode. This is particularly useful for:

- Processing multiple documents efficiently with parallel execution
- Queue-based document processing workflows
- High-volume document conversion scenarios
- Better resource utilization with configurable parallelism

Batch Operations

The following batch operations are available (all require useDoclingServe=true):

Operation Description

BATCH_CONVERT_TO_MARKDOWN

Convert multiple documents to Markdown format in parallel

BATCH_CONVERT_TO_HTML

Convert multiple documents to HTML format in parallel

BATCH_CONVERT_TO_JSON

Convert multiple documents to JSON format in parallel

BATCH_EXTRACT_TEXT

Extract text from multiple documents in parallel

BATCH_EXTRACT_STRUCTURED_DATA

Extract structured data from multiple documents in parallel with table structure recognition enabled by default

Basic Batch Processing

  • Java

  • YAML

// Basic batch route: hand a fixed list of document paths to docling-serve
// and report the outcome of every conversion.
from("direct:documents")
    .process(ex -> {
        // The batch operation reads the list of paths from the message body
        List<String> inputPaths = Arrays.asList(
            "/data/doc1.pdf",
            "/data/doc2.pdf",
            "/data/doc3.docx"
        );
        ex.getIn().setBody(inputPaths);
    })
    .to("docling:convert?" +
        "operation=BATCH_CONVERT_TO_MARKDOWN&" +
        "useDoclingServe=true&" +
        "batchParallelism=4&" +
        "batchFailOnFirstError=true")
    .process(ex -> {
        BatchProcessingResults batch = ex.getIn().getBody(BatchProcessingResults.class);

        // Batch-level summary first
        log.info("Processed {} documents, {} succeeded, {} failed",
            batch.getTotalDocuments(),
            batch.getSuccessCount(),
            batch.getFailureCount());

        // Then one log line per document
        batch.getResults().forEach(item -> {
            if (!item.isSuccess()) {
                log.error("Document {} failed: {}", item.getOriginalPath(), item.getErrorMessage());
                return;
            }
            log.info("Document {}: {}", item.getOriginalPath(), item.getResult());
        });
    });
# YAML equivalent of the basic batch route: convert the incoming documents
# in one batch, log the summary, then handle every result individually.
- route:
    id: batch-convert
    from:
      uri: direct:documents
    steps:
      - to:
          uri: docling:convert
          parameters:
            operation: "BATCH_CONVERT_TO_MARKDOWN"
            useDoclingServe: true
            batchParallelism: 4
            batchFailOnFirstError: true
      # Batch statistics are read from the CamelDoclingBatch* output headers
      - log:
          message: "Processed ${header.CamelDoclingBatchSuccessCount}/${header.CamelDoclingBatchTotalDocuments} documents successfully"
      # One exchange per entry of the results list
      - split:
          expression:
            simple:
              expression: "${body.results}"
          steps:
            - choice:
                when:
                  # Successful conversion: write to /data/output as <documentId>.md
                  - expression:
                      simple:
                        expression: "${body.success}"
                    steps:
                      - to:
                          uri: file:///data/output
                          parameters:
                            fileName: "${body.documentId}.md"
                # Failed conversion: only log the original path and the error
                otherwise:
                  steps:
                    - log:
                        message: "Failed: ${body.originalPath} - ${body.errorMessage}"

Queue-Based Batch Processing

This example shows a queue-based batch processing workflow:

  • Java

  • YAML

// Route 1: Collect documents from file system and send to queue.
// Only the absolute path of each file is queued: the batch operation expects
// document paths, so the file content is deliberately not loaded here.
from("file:///data/incoming?noop=true&maxMessagesPerPoll=50")
    .setHeader("documentPath", simple("${file:absolute.path}"))
    .setBody(header("documentPath"))
    .to("seda:document-queue?waitForTaskToComplete=Never");

// Route 2: Aggregate documents from queue into batches.
// The aggregate EIP requires an AggregationStrategy. This one groups the
// incoming exchanges and exposes them as a List<Exchange> message body once
// the batch completes.
from("seda:document-queue?concurrentConsumers=1")
    .aggregate(constant(true),
            new org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy())
        .completionSize(10)          // Batch size
        .completionTimeout(5000)     // Or timeout after 5 seconds
    .process(exchange -> {
        // Convert aggregated exchanges to document list
        @SuppressWarnings("unchecked")
        List<Exchange> exchanges = exchange.getIn().getBody(List.class);
        List<String> documentPaths = exchanges.stream()
            .map(e -> e.getIn().getHeader("documentPath", String.class))
            .collect(Collectors.toList());
        exchange.getIn().setBody(documentPaths);
    })
    .to("direct:batch-process");

// Route 3: Process batch with docling.
// batchFailOnFirstError=false keeps the batch going so that failed documents
// can be routed to /data/failed afterwards.
from("direct:batch-process")
    .to("docling:convert?" +
        "operation=BATCH_CONVERT_TO_MARKDOWN&" +
        "useDoclingServe=true&" +
        "batchParallelism=5&" +
        "batchFailOnFirstError=false")
    .process(exchange -> {
        BatchProcessingResults results = exchange.getIn().getBody(BatchProcessingResults.class);
        log.info("Batch completed: {}/{} successful",
            results.getSuccessCount(), results.getTotalDocuments());
    })
    // One exchange per BatchConversionResult
    .split(simple("${body.results}"))
        .choice()
            .when(simple("${body.success}"))
                .to("file:///data/output?fileName=${body.documentId}.md")
            .otherwise()
                .to("file:///data/failed?fileName=${body.documentId}.error");
# Define beans for processing.
# GroupedBodyAggregationStrategy collects the body of every aggregated message
# (here: one document path per message) into a single List. It needs no
# additional properties.
- beans:
  - name: documentListProcessor
    type: "#class:org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy"

# Route 1: Collect documents
- route:
    from:
      uri: file:///data/incoming
      parameters:
        noop: true
        maxMessagesPerPoll: 50
    steps:
      # Queue the absolute path of the file, not its content: the batch
      # operation expects a list of document paths.
      - setHeader:
          name: "documentPath"
          expression:
            simple:
              expression: "${file:absolute.path}"
      - setBody:
          expression:
            simple:
              expression: "${header.documentPath}"
      - to:
          uri: seda:document-queue
          parameters:
            waitForTaskToComplete: "Never"

# Route 2: Aggregate into batches
- route:
    from:
      uri: seda:document-queue
      parameters:
        concurrentConsumers: 1
    steps:
      - aggregate:
          # The strategy is given as a bean reference (string)
          aggregationStrategy: "#bean:documentListProcessor"
          correlationExpression:
            constant: true
          # Complete a batch after 10 documents or 5 seconds, whichever is first
          completionSize: 10
          completionTimeout: 5000
      - to:
          uri: direct:batch-process

# Route 3: Process batch
- route:
    from:
      uri: direct:batch-process
    steps:
      - to:
          uri: docling:convert
          parameters:
            operation: "BATCH_CONVERT_TO_MARKDOWN"
            useDoclingServe: true
            batchParallelism: 5
            batchFailOnFirstError: false
      # One exchange per entry of the results list
      - split:
          expression:
            simple:
              expression: "${body.results}"
          steps:
            - choice:
                when:
                  - expression:
                      simple:
                        expression: "${body.success}"
                    steps:
                      - to:
                          uri: file:///data/output
                          parameters:
                            fileName: "${body.documentId}.md"
                otherwise:
                  steps:
                    - to:
                        uri: file:///data/failed
                        parameters:
                          fileName: "${body.documentId}.error"
For the aggregation example above, you can also use a custom processor. Create a Java class:
Java-only: Processor implementation
/**
 * Replaces the message body with the list of document paths taken from a
 * group of aggregated exchanges.
 *
 * <p>Each grouped exchange is expected to carry its document path in the
 * {@code documentPath} header. The grouped exchanges are looked up in the
 * {@link Exchange#GROUPED_EXCHANGE} property first and, when that property is
 * absent, in the message body. Which of the two locations holds the list
 * depends on the aggregation strategy and Camel version in use.
 */
public class DocumentListProcessor implements Processor {

    /**
     * Collects the {@code documentPath} header of every grouped exchange into
     * a {@code List<String>} and sets it as the message body.
     *
     * @param exchange the aggregated exchange holding the grouped exchanges
     * @throws IllegalStateException if no list of grouped exchanges is found
     */
    @Override
    public void process(Exchange exchange) throws Exception {
        @SuppressWarnings("unchecked")
        List<Exchange> exchanges = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
        if (exchanges == null) {
            // Fall back to the body instead of failing with a NullPointerException
            @SuppressWarnings("unchecked")
            List<Exchange> fromBody = exchange.getIn().getBody(List.class);
            exchanges = fromBody;
        }
        if (exchanges == null) {
            throw new IllegalStateException(
                "No grouped exchanges found in property " + Exchange.GROUPED_EXCHANGE
                    + " or in the message body");
        }

        List<String> documentPaths = exchanges.stream()
            .map(e -> e.getIn().getHeader("documentPath", String.class))
            .collect(Collectors.toList());
        exchange.getIn().setBody(documentPaths);
    }
}

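Then register it as a bean in the YAML and call it from a process step placed after the aggregation. Because the class implements Processor rather than AggregationStrategy, it cannot be used as the aggregationStrategy of the aggregate step itself: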

# Registers the custom class under the name documentListProcessor;
# type is the fully-qualified class name.
- beans:
  - name: documentListProcessor
    type: "com.example.DocumentListProcessor"

Batch Processing with Error Handling

Control how errors are handled during batch processing:

  • Java

  • YAML

// Strict mode: batchFailOnFirstError=true fails the entire batch on the
// first error.
from("direct:batch-strict")
    .to("docling:convert?" +
        "operation=BATCH_CONVERT_TO_MARKDOWN&" +
        "useDoclingServe=true&" +
        "batchFailOnFirstError=true")
    .log("All documents converted successfully");

// Lenient mode: batchFailOnFirstError=false continues processing on errors,
// and failures are inspected afterwards.
from("direct:batch-lenient")
    .to("docling:convert?" +
        "operation=BATCH_CONVERT_TO_MARKDOWN&" +
        "useDoclingServe=true&" +
        "batchFailOnFirstError=false")
    .process(ex -> {
        BatchProcessingResults batch = ex.getIn().getBody(BatchProcessingResults.class);

        // Nothing to report when every document converted
        if (!batch.hasAnyFailures()) {
            return;
        }

        log.warn("Batch completed with {} failures", batch.getFailureCount());

        // One error line per failed document
        batch.getFailed().forEach(failed ->
            log.error("Failed: {} - {}",
                failed.getOriginalPath(),
                failed.getErrorMessage()));
    });
# Strict mode: batchFailOnFirstError=true fails the entire batch on the
# first error
- route:
    id: batch-strict
    from:
      uri: direct:batch-strict
    steps:
      - to:
          uri: docling:convert
          parameters:
            operation: "BATCH_CONVERT_TO_MARKDOWN"
            useDoclingServe: true
            batchFailOnFirstError: true
      - log:
          message: "All documents converted successfully"

# Lenient mode: continue on errors, then process the failures
- route:
    id: batch-lenient
    from:
      uri: direct:batch-lenient
    steps:
      - to:
          uri: docling:convert
          parameters:
            operation: "BATCH_CONVERT_TO_MARKDOWN"
            useDoclingServe: true
            batchFailOnFirstError: false
      # Success and failure counts come from the batch output headers
      - log:
          message: "Batch completed: ${header.CamelDoclingBatchSuccessCount} succeeded, ${header.CamelDoclingBatchFailureCount} failed"
      - choice:
          when:
            # At least one document failed
            - expression:
                simple:
                  expression: "${header.CamelDoclingBatchFailureCount} > 0"
              steps:
                # One exchange per failed result
                - split:
                    expression:
                      simple:
                        expression: "${body.failed}"
                    steps:
                      - log:
                          message: "Failed document: ${body.originalPath} - ${body.errorMessage}"
                      # Written to /data/failed as <documentId>.error
                      - to:
                          uri: file:///data/failed
                          parameters:
                            fileName: "${body.documentId}.error"
          otherwise:
            steps:
              - log:
                  message: "All documents processed successfully"

Batch Configuration Parameters

Parameter Default Description

batchSize

10

Maximum number of documents in a single batch

batchParallelism

4

Number of parallel threads for processing documents

batchFailOnFirstError

true

If true, fail entire batch on first error; if false, continue processing

batchTimeout

300000

Maximum time to wait for batch completion in milliseconds

splitBatchResults

false

If true, return a List<BatchConversionResult> (one element per document, ready to be split into individual exchanges) instead of a single BatchProcessingResults object

Batch Processing Headers

Headers can be used to override batch configuration per-message:

Header Type Description

CamelDoclingBatchSize

Integer

Override batch size for this operation

CamelDoclingBatchParallelism

Integer

Override parallelism for this operation

CamelDoclingBatchFailOnFirstError

Boolean

Override fail-on-first-error setting

CamelDoclingBatchTimeout

Long

Override batch timeout in milliseconds

CamelDoclingBatchTotalDocuments

Integer

Total documents in batch (output header)

CamelDoclingBatchSuccessCount

Integer

Number of successful conversions (output header)

CamelDoclingBatchFailureCount

Integer

Number of failed conversions (output header)

CamelDoclingBatchProcessingTime

Long

Total processing time in milliseconds (output header)

CamelDoclingBatchSplitResults

Boolean

Override splitBatchResults setting for this operation

Input Formats for Batch Processing

The batch operations accept multiple input formats:

Java-only: Java collection types
// Each declaration below shows one message body shape accepted by the
// batch operations.

// List of file paths (as strings)
List<String> paths = Arrays.asList("/data/doc1.pdf", "/data/doc2.pdf");

// List of File objects (relative paths resolve against the JVM working directory)
List<File> files = Arrays.asList(new File("doc1.pdf"), new File("doc2.pdf"));

// Array of paths
String[] pathArray = {"/data/doc1.pdf", "/data/doc2.pdf"};

// Array of File objects
File[] fileArray = {new File("doc1.pdf"), new File("doc2.pdf")};

// Directory path (processes all files in directory)
String dirPath = "/data/documents";

BatchProcessingResults Object

The batch operations return a BatchProcessingResults object with:

Properties:

- results: List of individual BatchConversionResult objects
- totalDocuments: Total number of documents processed
- successCount: Number of successful conversions
- failureCount: Number of failed conversions
- totalProcessingTimeMs: Total processing time in milliseconds

Helper Methods:

- getSuccessful(): Returns list of successful results
- getFailed(): Returns list of failed results
- isAllSuccessful(): Returns true if all documents succeeded
- hasAnySuccessful(): Returns true if at least one document succeeded
- hasAnyFailures(): Returns true if at least one document failed
- getSuccessRate(): Returns success rate as percentage (0.0-100.0)

BatchConversionResult Properties:

- documentId: Unique identifier for the document
- originalPath: Original file path or URL
- result: Converted content (if successful)
- success: Whether conversion succeeded
- errorMessage: Error message (if failed)
- processingTimeMs: Processing time for this document in milliseconds
- batchIndex: Index in the batch (0-based)

Splitting Batch Results into Individual Exchanges

By default, batch operations return a single BatchProcessingResults object containing all results. You can enable splitBatchResults=true to return a List<BatchConversionResult> instead, allowing you to process each document individually using Camel’s split EIP.

Use Cases:

- Process each document result independently
- Route successful and failed documents to different destinations
- Apply individual transformations per document
- Integrate with streaming or async processing patterns

  • Java

  • YAML

// Example 1: Split and process each document individually.
// With splitBatchResults=true the body is a List<BatchConversionResult>,
// so split(body()) yields one exchange per document.
from("direct:batch-documents")
    .to("docling:convert?" +
        "operation=BATCH_CONVERT_TO_MARKDOWN&" +
        "useDoclingServe=true&" +
        "splitBatchResults=true&" +
        "contentInBody=true")
    .split(body())
        .process(ex -> {
            BatchConversionResult item = ex.getIn().getBody(BatchConversionResult.class);
            log.info("Processing document: {}", item.getDocumentId());

            // Handle failed conversion first and stop
            if (!item.isSuccess()) {
                log.error("Failed to convert {}: {}",
                    item.getOriginalPath(), item.getErrorMessage());
                return;
            }

            // Process successful conversion
            String content = item.getResult();
            // ... do something with content
        })
    .end();

// Example 2: Route based on success/failure.
// Successful results go to /data/success, failed ones to /data/failed.
from("direct:batch-with-routing")
    .to("docling:convert?" +
        "operation=BATCH_CONVERT_TO_MARKDOWN&" +
        "useDoclingServe=true&" +
        "splitBatchResults=true&" +
        "batchFailOnFirstError=false&" +
        "contentInBody=true")
    .split(body())
        .choice()
            .when(simple("${body.success} == true"))
                .log("Success: ${body.documentId}")
                .to("file:///data/success?fileName=${body.documentId}.md")
            .otherwise()
                .log("Failed: ${body.documentId} - ${body.errorMessage}")
                .to("file:///data/failed?fileName=${body.documentId}.error")
        .end()
    .end();

// Example 3: Parallel processing with threads.
// Each split result is handed to processDocument(...).
from("direct:batch-parallel-individual")
    .to("docling:convert?" +
        "operation=BATCH_CONVERT_TO_MARKDOWN&" +
        "useDoclingServe=true&" +
        "splitBatchResults=true&" +
        "contentInBody=true")
    .split(body())
        .parallelProcessing()
        .threads(5)
        // Process each document in parallel
        .process(ex -> processDocument(ex.getIn().getBody(BatchConversionResult.class)))
    .end();
# Example 1: Split and route based on success.
# splitBatchResults=true makes the body a list of results, so the split
# expression is the body itself.
- route:
    from:
      uri: direct:batch-with-split
    steps:
      - to:
          uri: docling:convert
          parameters:
            operation: "BATCH_CONVERT_TO_MARKDOWN"
            useDoclingServe: true
            splitBatchResults: true
            contentInBody: true
      - split:
          expression:
            simple:
              expression: "${body}"
          steps:
            - choice:
                when:
                  # Successful result: log and write to /data/success
                  - expression:
                      simple:
                        expression: "${body.success}"
                    steps:
                      - log:
                          message: "Success: ${body.documentId}"
                      - to:
                          uri: file:///data/success
                          parameters:
                            fileName: "${body.documentId}.md"
                # Failed result: log and write to /data/failed
                otherwise:
                  steps:
                    - log:
                        message: "Failed: ${body.documentId}"
                    - to:
                        uri: file:///data/failed
                        parameters:
                          fileName: "${body.documentId}.error"

# Example 2: Split with parallel processing
- route:
    id: batch-split-parallel
    from:
      uri: direct:batch-parallel
    steps:
      - to:
          uri: docling:convert
          parameters:
            operation: "BATCH_CONVERT_TO_MARKDOWN"
            useDoclingServe: true
            splitBatchResults: true
            batchParallelism: 4
            contentInBody: true
      - split:
          expression:
            simple:
              expression: "${body}"
          # Split results are processed in parallel
          parallelProcessing: true
          steps:
            # batchIndex is the 0-based position of the document in the batch
            - log:
                message: "Processing document ${body.documentId} (index ${body.batchIndex})"
            - choice:
                when:
                  - expression:
                      simple:
                        expression: "${body.success}"
                    steps:
                      - log:
                          message: "Successfully converted ${body.documentId}"
                      - to:
                          uri: file:///data/processed
                          parameters:
                            fileName: "${body.documentId}.md"
                otherwise:
                  steps:
                    - log:
                        message: "Failed to convert ${body.documentId}: ${body.errorMessage}"
                    - to:
                        uri: file:///data/errors
                        parameters:
                          fileName: "${body.documentId}.error"

Comparison: BatchProcessingResults vs Split Results

Scenario splitBatchResults=false splitBatchResults=true

Return type

BatchProcessingResults

List<BatchConversionResult>

Number of exchanges

1 exchange with all results

Use .split(body()) to create 1 exchange per document

Use case

Aggregate statistics, batch-level processing

Individual document processing, routing per result

Access to batch stats

Direct via object methods

Via headers (CamelDoclingBatch*)

Camel pattern

Process entire batch together

Split and process individually

Note: When using splitBatchResults=true, batch statistics are still available via headers:

- CamelDoclingBatchTotalDocuments
- CamelDoclingBatchSuccessCount
- CamelDoclingBatchFailureCount
- CamelDoclingBatchProcessingTime