Docling - Batch Processing
The component supports batch processing of multiple documents when using docling-serve API mode. This is particularly useful for:
- Processing multiple documents efficiently with parallel execution
- Queue-based document processing workflows
- High-volume document conversion scenarios
- Better resource utilization with configurable parallelism
Batch Operations
The following batch operations are available (all require useDoclingServe=true):
| Operation | Description |
|---|---|
| `BATCH_CONVERT_TO_MARKDOWN` | Convert multiple documents to Markdown format in parallel |
| `BATCH_CONVERT_TO_HTML` | Convert multiple documents to HTML format in parallel |
| `BATCH_CONVERT_TO_JSON` | Convert multiple documents to JSON format in parallel |
| `BATCH_EXTRACT_TEXT` | Extract text from multiple documents in parallel |
| `BATCH_EXTRACT_STRUCTURED_DATA` | Extract structured data from multiple documents in parallel with table structure recognition enabled by default |
Basic Batch Processing
-
Java
-
YAML
from("direct:documents")
.process(exchange -> {
List<String> documents = Arrays.asList(
"/data/doc1.pdf",
"/data/doc2.pdf",
"/data/doc3.docx"
);
exchange.getIn().setBody(documents);
})
.to("docling:convert?" +
"operation=BATCH_CONVERT_TO_MARKDOWN&" +
"useDoclingServe=true&" +
"batchParallelism=4&" +
"batchFailOnFirstError=true")
.process(exchange -> {
BatchProcessingResults results = exchange.getIn().getBody(BatchProcessingResults.class);
log.info("Processed {} documents, {} succeeded, {} failed",
results.getTotalDocuments(),
results.getSuccessCount(),
results.getFailureCount());
// Access individual results
for (BatchConversionResult result : results.getResults()) {
if (result.isSuccess()) {
log.info("Document {}: {}", result.getOriginalPath(), result.getResult());
} else {
log.error("Document {} failed: {}", result.getOriginalPath(), result.getErrorMessage());
}
}
});
- route:
id: batch-convert
from:
uri: direct:documents
steps:
- to:
uri: docling:convert
parameters:
operation: "BATCH_CONVERT_TO_MARKDOWN"
useDoclingServe: true
batchParallelism: 4
batchFailOnFirstError: true
- log:
message: "Processed ${header.CamelDoclingBatchSuccessCount}/${header.CamelDoclingBatchTotalDocuments} documents successfully"
- split:
expression:
simple:
expression: "${body.results}"
steps:
- choice:
when:
- expression:
simple:
expression: "${body.success}"
steps:
- to:
uri: file:///data/output
parameters:
fileName: "${body.documentId}.md"
otherwise:
steps:
- log:
message: "Failed: ${body.originalPath} - ${body.errorMessage}"
Queue-Based Batch Processing
This example shows a queue-based batch processing workflow:
-
Java
-
YAML
// Route 1: Collect documents from file system and send to queue
from("file:///data/incoming?noop=true&maxMessagesPerPoll=50")
.convertBodyTo(String.class)
.setHeader("documentPath", simple("${body}"))
.to("seda:document-queue?waitForTaskToComplete=Never");
// Route 2: Aggregate documents from queue into batches
from("seda:document-queue?concurrentConsumers=1")
.aggregate(constant(true))
.completionSize(10) // Batch size
.completionTimeout(5000) // Or timeout after 5 seconds
.process(exchange -> {
// Convert aggregated exchanges to document list
@SuppressWarnings("unchecked")
List<Exchange> exchanges = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
List<String> documentPaths = exchanges.stream()
.map(e -> e.getIn().getHeader("documentPath", String.class))
.collect(Collectors.toList());
exchange.getIn().setBody(documentPaths);
})
.to("direct:batch-process");
// Route 3: Process batch with docling
from("direct:batch-process")
.to("docling:convert?" +
"operation=BATCH_CONVERT_TO_MARKDOWN&" +
"useDoclingServe=true&" +
"batchParallelism=5&" +
"batchFailOnFirstError=false")
.process(exchange -> {
BatchProcessingResults results = exchange.getIn().getBody(BatchProcessingResults.class);
log.info("Batch completed: {}/{} successful",
results.getSuccessCount(), results.getTotalDocuments());
})
.split(simple("${body.results}"))
.choice()
.when(simple("${body.success}"))
.to("file:///data/output?fileName=${body.documentId}.md")
.otherwise()
.to("file:///data/failed?fileName=${body.documentId}.error");
# Define beans for processing
- beans:
- name: documentListProcessor
type: "#class:org.apache.camel.processor.aggregate.GroupedBodyAggregationStrategy"
properties:
strategyMethodName: "aggregate"
# Route 1: Collect documents
- route:
from:
uri: file:///data/incoming
parameters:
noop: true
maxMessagesPerPoll: 50
steps:
- convertBodyTo:
type: "java.lang.String"
- setHeader:
name: "documentPath"
expression:
simple:
expression: "${body}"
- to:
uri: seda:document-queue
parameters:
waitForTaskToComplete: "Never"
# Route 2: Aggregate into batches
- route:
from:
uri: seda:document-queue
parameters:
concurrentConsumers: 1
steps:
- aggregate:
aggregationStrategy:
bean: "documentListProcessor"
correlationExpression:
constant: true
completionSize: 10
completionTimeout: 5000
- to:
uri: direct:batch-process
# Route 3: Process batch
- route:
from:
uri: direct:batch-process
steps:
- to:
uri: docling:convert
parameters:
operation: "BATCH_CONVERT_TO_MARKDOWN"
useDoclingServe: true
batchParallelism: 5
batchFailOnFirstError: false
- split:
expression:
simple:
expression: "${body.results}"
steps:
- choice:
when:
- expression:
simple:
expression: "${body.success}"
steps:
- to:
uri: file:///data/output
parameters:
fileName: "${body.documentId}.md"
otherwise:
steps:
- to:
uri: file:///data/failed
parameters:
fileName: "${body.documentId}.error"
For the aggregation example above, you can also use a custom processor. Create a Java class:
public class DocumentListProcessor implements Processor {
@Override
public void process(Exchange exchange) throws Exception {
@SuppressWarnings("unchecked")
List<Exchange> exchanges = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
List<String> documentPaths = exchanges.stream()
.map(e -> e.getIn().getHeader("documentPath", String.class))
.collect(Collectors.toList());
exchange.getIn().setBody(documentPaths);
}
}
Then reference it in the YAML:
- beans:
- name: documentListProcessor
type: "com.example.DocumentListProcessor"
Batch Processing with Error Handling
Control how errors are handled during batch processing:
-
Java
-
YAML
// Fail entire batch on first error
from("direct:batch-strict")
.to("docling:convert?" +
"operation=BATCH_CONVERT_TO_MARKDOWN&" +
"useDoclingServe=true&" +
"batchFailOnFirstError=true")
.log("All documents converted successfully");
// Continue processing on errors
from("direct:batch-lenient")
.to("docling:convert?" +
"operation=BATCH_CONVERT_TO_MARKDOWN&" +
"useDoclingServe=true&" +
"batchFailOnFirstError=false")
.process(exchange -> {
BatchProcessingResults results = exchange.getIn().getBody(BatchProcessingResults.class);
if (results.hasAnyFailures()) {
log.warn("Batch completed with {} failures", results.getFailureCount());
// Handle failed documents
for (BatchConversionResult failure : results.getFailed()) {
log.error("Failed: {} - {}",
failure.getOriginalPath(),
failure.getErrorMessage());
}
}
});
# Fail on first error
- route:
id: batch-strict
from:
uri: direct:batch-strict
steps:
- to:
uri: docling:convert
parameters:
operation: "BATCH_CONVERT_TO_MARKDOWN"
useDoclingServe: true
batchFailOnFirstError: true
- log:
message: "All documents converted successfully"
# Continue on errors and process failures
- route:
id: batch-lenient
from:
uri: direct:batch-lenient
steps:
- to:
uri: docling:convert
parameters:
operation: "BATCH_CONVERT_TO_MARKDOWN"
useDoclingServe: true
batchFailOnFirstError: false
- log:
message: "Batch completed: ${header.CamelDoclingBatchSuccessCount} succeeded, ${header.CamelDoclingBatchFailureCount} failed"
- choice:
when:
- expression:
simple:
expression: "${header.CamelDoclingBatchFailureCount} > 0"
steps:
- split:
expression:
simple:
expression: "${body.failed}"
steps:
- log:
message: "Failed document: ${body.originalPath} - ${body.errorMessage}"
- to:
uri: file:///data/failed
parameters:
fileName: "${body.documentId}.error"
otherwise:
steps:
- log:
message: "All documents processed successfully"
Batch Configuration Parameters
| Parameter | Default | Description |
|---|---|---|
| `batchSize` | 10 | Maximum number of documents in a single batch |
| `batchParallelism` | 4 | Number of parallel threads for processing documents |
| `batchFailOnFirstError` | true | If true, fail entire batch on first error; if false, continue processing |
| `batchTimeout` | 300000 | Maximum time to wait for batch completion in milliseconds |
| `splitBatchResults` | false | Split batch results into individual exchanges (List) instead of single BatchProcessingResults object |
Batch Processing Headers
Headers can be used to override batch configuration per-message:
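| Header | Type | Description |
|---|---|---|
| `CamelDoclingBatchSize` | Integer | Override batch size for this operation |
| `CamelDoclingBatchParallelism` | Integer | Override parallelism for this operation |
| `CamelDoclingBatchFailOnFirstError` | Boolean | Override fail-on-first-error setting |
| `CamelDoclingBatchTimeout` | Long | Override batch timeout in milliseconds |
| `CamelDoclingBatchTotalDocuments` | Integer | Total documents in batch (output header) |
| `CamelDoclingBatchSuccessCount` | Integer | Number of successful conversions (output header) |
| `CamelDoclingBatchFailureCount` | Integer | Number of failed conversions (output header) |
| `CamelDoclingBatchProcessingTime` | Long | Total processing time in milliseconds (output header) |
| `CamelDoclingBatchSplitResults` | Boolean | Override splitBatchResults setting for this operation |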
Input Formats for Batch Processing
The batch operations accept multiple input formats:
// List of file paths
List<String> paths = Arrays.asList("/data/doc1.pdf", "/data/doc2.pdf");
// List of File objects
List<File> files = Arrays.asList(new File("doc1.pdf"), new File("doc2.pdf"));
// Array of paths
String[] pathArray = {"/data/doc1.pdf", "/data/doc2.pdf"};
// Array of File objects
File[] fileArray = {new File("doc1.pdf"), new File("doc2.pdf")};
// Directory path (processes all files in directory)
String dirPath = "/data/documents";
BatchProcessingResults Object
The batch operations return a BatchProcessingResults object with:
Properties:
- results: List of individual BatchConversionResult objects
- totalDocuments: Total number of documents processed
- successCount: Number of successful conversions
- failureCount: Number of failed conversions
- totalProcessingTimeMs: Total processing time in milliseconds
Helper Methods:
- getSuccessful(): Returns list of successful results
- getFailed(): Returns list of failed results
- isAllSuccessful(): Returns true if all documents succeeded
- hasAnySuccessful(): Returns true if at least one document succeeded
- hasAnyFailures(): Returns true if at least one document failed
- getSuccessRate(): Returns success rate as percentage (0.0-100.0)
BatchConversionResult Properties:
- documentId: Unique identifier for the document
- originalPath: Original file path or URL
- result: Converted content (if successful)
- success: Whether conversion succeeded
- errorMessage: Error message (if failed)
- processingTimeMs: Processing time for this document
- batchIndex: Index in the batch (0-based)
Splitting Batch Results into Individual Exchanges
By default, batch operations return a single BatchProcessingResults object containing all results. You can enable splitBatchResults=true to return a List<BatchConversionResult> instead, allowing you to process each document individually using Camel’s split EIP.
Use Cases:
- Process each document result independently
- Route successful and failed documents to different destinations
- Apply individual transformations per document
- Integrate with streaming or async processing patterns
-
Java
-
YAML
// Example 1: Split and process each document individually
from("direct:batch-documents")
.to("docling:convert?" +
"operation=BATCH_CONVERT_TO_MARKDOWN&" +
"useDoclingServe=true&" +
"splitBatchResults=true&" +
"contentInBody=true")
.split(body())
.process(exchange -> {
BatchConversionResult result = exchange.getIn().getBody(BatchConversionResult.class);
log.info("Processing document: {}", result.getDocumentId());
if (result.isSuccess()) {
// Process successful conversion
String content = result.getResult();
// ... do something with content
} else {
// Handle failed conversion
log.error("Failed to convert {}: {}",
result.getOriginalPath(), result.getErrorMessage());
}
})
.end();
// Example 2: Route based on success/failure
from("direct:batch-with-routing")
.to("docling:convert?" +
"operation=BATCH_CONVERT_TO_MARKDOWN&" +
"useDoclingServe=true&" +
"splitBatchResults=true&" +
"batchFailOnFirstError=false&" +
"contentInBody=true")
.split(body())
.choice()
.when(simple("${body.success} == true"))
.log("Success: ${body.documentId}")
.to("file:///data/success?fileName=${body.documentId}.md")
.otherwise()
.log("Failed: ${body.documentId} - ${body.errorMessage}")
.to("file:///data/failed?fileName=${body.documentId}.error")
.end()
.end();
// Example 3: Parallel processing with threads
from("direct:batch-parallel-individual")
.to("docling:convert?" +
"operation=BATCH_CONVERT_TO_MARKDOWN&" +
"useDoclingServe=true&" +
"splitBatchResults=true&" +
"contentInBody=true")
.split(body())
.parallelProcessing()
.threads(5)
.process(exchange -> {
BatchConversionResult result = exchange.getIn().getBody(BatchConversionResult.class);
// Process each document in parallel
processDocument(result);
})
.end();
# Example 1: Split and route based on success
- route:
from:
uri: direct:batch-with-split
steps:
- to:
uri: docling:convert
parameters:
operation: "BATCH_CONVERT_TO_MARKDOWN"
useDoclingServe: true
splitBatchResults: true
contentInBody: true
- split:
expression:
simple:
expression: "${body}"
steps:
- choice:
when:
- expression:
simple:
expression: "${body.success}"
steps:
- log:
message: "Success: ${body.documentId}"
- to:
uri: file:///data/success
parameters:
fileName: "${body.documentId}.md"
otherwise:
steps:
- log:
message: "Failed: ${body.documentId}"
- to:
uri: file:///data/failed
parameters:
fileName: "${body.documentId}.error"
# Example 2: Split with parallel processing
- route:
id: batch-split-parallel
from:
uri: direct:batch-parallel
steps:
- to:
uri: docling:convert
parameters:
operation: "BATCH_CONVERT_TO_MARKDOWN"
useDoclingServe: true
splitBatchResults: true
batchParallelism: 4
contentInBody: true
- split:
expression:
simple:
expression: "${body}"
parallelProcessing: true
steps:
- log:
message: "Processing document ${body.documentId} (index ${body.batchIndex})"
- choice:
when:
- expression:
simple:
expression: "${body.success}"
steps:
- log:
message: "Successfully converted ${body.documentId}"
- to:
uri: file:///data/processed
parameters:
fileName: "${body.documentId}.md"
otherwise:
steps:
- log:
message: "Failed to convert ${body.documentId}: ${body.errorMessage}"
- to:
uri: file:///data/errors
parameters:
fileName: "${body.documentId}.error"
Comparison: BatchProcessingResults vs Split Results
| Scenario | splitBatchResults=false | splitBatchResults=true |
|---|---|---|
| Return type | `BatchProcessingResults` | `List<BatchConversionResult>` |
| Number of exchanges | 1 exchange with all results | 1 exchange carrying the list; use `split(body())` to create one exchange per result |
| Use case | Aggregate statistics, batch-level processing | Individual document processing, routing per result |
| Access to batch stats | Direct via object methods | Via headers (CamelDoclingBatch*) |
| Camel pattern | Process entire batch together | Split and process individually |
Note: When using splitBatchResults=true, batch statistics are still available via headers:
- CamelDoclingBatchTotalDocuments
- CamelDoclingBatchSuccessCount
- CamelDoclingBatchFailureCount
- CamelDoclingBatchProcessingTime