Docling - Asynchronous Processing
The component supports asynchronous document conversion when using docling-serve API mode. This is particularly useful for:
- Large documents that take a long time to process
- High-volume batch processing scenarios
- Better resource utilization on the server side
Enabling Async Mode
Java
from("file:///data/documents?include=.*\\.pdf")
    .to("docling:CONVERT_TO_MARKDOWN?" +
        "useDoclingServe=true&" +
        "useAsyncMode=true&" +
        "asyncPollInterval=2000&" +
        "asyncTimeout=300000&" +
        "contentInBody=true")
    .to("file:///data/output");

YAML
- route:
    from:
      uri: file:///data/documents
      parameters:
        include: ".*\\.pdf"
      steps:
        - to:
            uri: docling:CONVERT_TO_MARKDOWN
            parameters:
              useDoclingServe: true
              useAsyncMode: true
              asyncPollInterval: 2000
              asyncTimeout: 300000
              contentInBody: true
        - to:
            uri: file:///data/output

Async Processing with Custom Timeout
For very large documents, you may need to increase the timeout:
Java
from("file:///data/large-documents?include=.*\\.pdf")
    .to("docling:CONVERT_TO_MARKDOWN?" +
        "useDoclingServe=true&" +
        "useAsyncMode=true&" +
        "asyncPollInterval=5000&" +
        "asyncTimeout=600000&" + // 10 minutes
        "contentInBody=true")
    .to("file:///data/output");

YAML
- route:
    from:
      uri: file:///data/large-documents
      parameters:
        include: ".*\\.pdf"
      steps:
        - to:
            uri: docling:CONVERT_TO_MARKDOWN
            parameters:
              useDoclingServe: true
              useAsyncMode: true
              asyncPollInterval: 5000
              asyncTimeout: 600000
              contentInBody: true
        - to:
            uri: file:///data/output

Using Headers to Control Async Behavior
You can override async settings per-message using headers:
Java
from("file:///data/documents?include=.*\\.pdf")
    .process(exchange -> {
        File file = exchange.getIn().getBody(File.class);
        // Use async mode only for large files
        if (file.length() > 10 * 1024 * 1024) { // > 10MB
            exchange.getIn().setHeader("CamelDoclingUseAsyncMode", true);
            exchange.getIn().setHeader("CamelDoclingAsyncTimeout", 600000L);
        }
    })
    .to("docling:CONVERT_TO_MARKDOWN?useDoclingServe=true&contentInBody=true")
    .to("file:///data/output");

YAML
- route:
    from:
      uri: file:///data/documents
      parameters:
        include: ".*\\.pdf"
      steps:
        # asyncDecisionProcessor refers to a Processor bean in the registry
        # that sets the async headers, as the Java lambda does
        - process:
            ref: "asyncDecisionProcessor"
        - to:
            uri: docling:CONVERT_TO_MARKDOWN
            parameters:
              useDoclingServe: true
              contentInBody: true
        - to:
            uri: file:///data/output

Custom Async Workflows
For advanced use cases, you can use the SUBMIT_ASYNC_CONVERSION and CHECK_CONVERSION_STATUS operations to build custom async workflows with full control over task submission and status polling.
When to use custom workflows:
- You need custom polling intervals that vary per task
- You want to implement custom retry or backoff strategies
- You need to coordinate multiple async tasks
- You want to store task IDs in a database for later retrieval
- You need fine-grained control over timeout and error handling
When to use built-in async mode (useAsyncMode=true):
- Standard use cases where automatic polling is sufficient
- You want the simplest configuration
- Default polling intervals and timeouts work for your needs
Note: Custom polling workflows require Java processors and are more complex. The built-in async mode (useAsyncMode=true) is recommended for most use cases.
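Where a custom retry or backoff strategy is needed, the delay between status checks can grow with each attempt instead of staying fixed. The following is a minimal sketch using only the JDK. BackoffPoller, delayFor, and poll are hypothetical names that are not part of camel-docling, and the check supplier stands in for a CHECK_CONVERSION_STATUS call that yields a value once the conversion has finished.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical helper (not part of camel-docling): polls with capped exponential backoff. */
final class BackoffPoller {

    /** Delay before retrying after the given 0-based attempt: initial * 2^attempt, capped at max. */
    static Duration delayFor(int attempt, Duration initial, Duration max) {
        long millis = initial.toMillis();
        for (int i = 0; i < attempt && millis < max.toMillis(); i++) {
            millis *= 2;
        }
        return Duration.ofMillis(Math.min(millis, max.toMillis()));
    }

    /**
     * Calls check until it returns a value or maxAttempts is reached.
     * An empty Optional from check means "still in progress".
     * Returns empty if the attempts run out or the thread is interrupted.
     */
    static <T> Optional<T> poll(Supplier<Optional<T>> check, int maxAttempts,
                                Duration initial, Duration max) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Optional<T> result = check.get();
            if (result.isPresent()) {
                return result;
            }
            try {
                Thread.sleep(delayFor(attempt, initial, max).toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return Optional.empty();
            }
        }
        return Optional.empty();
    }
}
```

In a real workflow the supplier would wrap the status request and return the converted content only when the status reports completion. Capping the delay keeps long-running conversions from being checked too rarely.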
Simple Manual Polling (Java)
The simplest custom workflow uses a Java loop to poll for status:
// Submit conversion
String taskId = template.requestBody(
    "docling:convert?operation=SUBMIT_ASYNC_CONVERSION&useDoclingServe=true",
    "/path/to/document.pdf", String.class);

// Poll for completion
ConversionStatus status;
int attempts = 0;
do {
    Thread.sleep(1000);
    status = template.requestBody(
        "docling:convert?operation=CHECK_CONVERSION_STATUS&useDoclingServe=true",
        taskId, ConversionStatus.class);
    attempts++;
} while (status.isInProgress() && attempts < 60);

// Get result
if (status.isCompleted()) {
    String result = status.getResult();
    // Process result...
}

Submit and Poll Pattern (Camel Route)
Java
// Submit async conversion and poll until complete
from("file:///data/documents?include=.*\\.pdf")
    .log("Starting async conversion for: ${header.CamelFileName}")
    // Step 1: Submit conversion
    .to("docling:convert?operation=SUBMIT_ASYNC_CONVERSION&useDoclingServe=true")
    .log("Submitted conversion with task ID: ${body}")
    .setHeader("taskId", body())
    .setProperty("maxAttempts", constant(60))
    .setProperty("attempt", constant(0))
    // Step 2: Poll for completion
    .loopDoWhile(method(MyPollingHelper.class, "shouldContinuePolling"))
        .process(exchange -> {
            // Increment attempt counter
            Integer attempt = exchange.getProperty("attempt", Integer.class);
            exchange.setProperty("attempt", attempt != null ? attempt + 1 : 1);
        })
        .log("Polling attempt ${exchangeProperty.attempt} of ${exchangeProperty.maxAttempts}")
        .setBody(header("taskId"))
        .to("docling:convert?operation=CHECK_CONVERSION_STATUS&useDoclingServe=true")
        .setProperty("conversionStatus", body())
        .process(exchange -> {
            ConversionStatus status = exchange.getProperty("conversionStatus", ConversionStatus.class);
            if (status.isCompleted()) {
                exchange.setProperty("isCompleted", true);
            } else if (status.isFailed()) {
                exchange.setProperty("isFailed", true);
                exchange.setProperty("errorMessage", status.getErrorMessage());
            }
        })
        // Do not call stop() on completion: stop() ends routing for the whole
        // exchange, so Step 3 would never run. The loop condition checks
        // isCompleted and ends the loop by itself.
        .choice()
            .when(exchangeProperty("isFailed").isEqualTo(true))
                .throwException(new RuntimeException("Conversion failed"))
        .end()
        .delay(1000)
    .end()
    // Step 3: Extract result
    .process(exchange -> {
        ConversionStatus status = exchange.getProperty("conversionStatus", ConversionStatus.class);
        if (status != null && status.isCompleted() && status.getResult() != null) {
            exchange.getIn().setBody(status.getResult());
        } else {
            throw new RuntimeException("Conversion did not complete");
        }
    })
    .to("file:///data/output");

// Helper class for loop condition
public class MyPollingHelper {
    public static boolean shouldContinuePolling(Exchange exchange) {
        Integer attempt = exchange.getProperty("attempt", Integer.class);
        Integer maxAttempts = exchange.getProperty("maxAttempts", Integer.class);
        Boolean isCompleted = exchange.getProperty("isCompleted", Boolean.class);
        Boolean isFailed = exchange.getProperty("isFailed", Boolean.class);
        // Stop once the task has reached a final state
        if (Boolean.TRUE.equals(isCompleted) || Boolean.TRUE.equals(isFailed)) {
            return false;
        }
        // Stop once the attempt budget is used up
        if (attempt != null && maxAttempts != null && attempt >= maxAttempts) {
            return false;
        }
        return true;
    }
}

YAML
# Note: For YAML, consider using the built-in async mode (useAsyncMode=true)
# which handles polling automatically. Custom polling is easier in Java DSL.
- route:
    id: async-with-custom-polling
    from:
      uri: file:///data/documents
      parameters:
        include: ".*\\.pdf"
      steps:
        - log:
            message: "Starting async conversion for: ${header.CamelFileName}"
        - to:
            uri: docling:convert
            parameters:
              operation: "SUBMIT_ASYNC_CONVERSION"
              useDoclingServe: true
        - log:
            message: "Submitted conversion with task ID: ${body}"
        - setHeader:
            name: "taskId"
            expression:
              simple:
                expression: "${body}"
        # For YAML, simpler to use Java processor bean or built-in async mode
        - to:
            uri: bean:asyncPollingProcessor
        - to:
            uri: file:///data/output

ConversionStatus Object
The CHECK_CONVERSION_STATUS operation returns a ConversionStatus object with the following properties:
- taskId (String) - The task identifier
- status (enum) - PENDING, IN_PROGRESS, COMPLETED, FAILED, or UNKNOWN
- result (String) - Converted document content (available when status is COMPLETED)
- errorMessage (String) - Error details (available when status is FAILED)
- progress (Integer) - Task queue position
Helper methods:
- isCompleted() - Returns true if conversion completed successfully
- isFailed() - Returns true if conversion failed
- isInProgress() - Returns true if conversion is still processing
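The properties and helper methods above suggest roughly the following shape. This is an illustrative stand-in, not the component's actual class: the name ConversionStatusSketch and the nextAction() method are invented for this example, and treating PENDING as "in progress" is an assumption.

```java
/** Illustrative stand-in for the status object described above; not the real camel-docling class. */
final class ConversionStatusSketch {

    enum Status { PENDING, IN_PROGRESS, COMPLETED, FAILED, UNKNOWN }

    private final String taskId;
    private final Status status;
    private final String result;        // set when status is COMPLETED
    private final String errorMessage;  // set when status is FAILED
    private final Integer progress;     // task queue position

    ConversionStatusSketch(String taskId, Status status, String result,
                           String errorMessage, Integer progress) {
        this.taskId = taskId;
        this.status = status;
        this.result = result;
        this.errorMessage = errorMessage;
        this.progress = progress;
    }

    String getTaskId() { return taskId; }
    Status getStatus() { return status; }
    String getResult() { return result; }
    String getErrorMessage() { return errorMessage; }
    Integer getProgress() { return progress; }

    boolean isCompleted() { return status == Status.COMPLETED; }
    boolean isFailed() { return status == Status.FAILED; }

    // Assumption: a queued (PENDING) task counts as still in progress.
    boolean isInProgress() { return status == Status.PENDING || status == Status.IN_PROGRESS; }

    /** Hypothetical helper: what a polling loop should do next for this status. */
    String nextAction() {
        if (isCompleted()) return "use result";
        if (isFailed()) return "report error";
        if (isInProgress()) return "poll again";
        return "give up"; // UNKNOWN is none of the three states above
    }
}
```

Under these assumptions an UNKNOWN status is neither completed, failed, nor in progress, so a loop written as while (status.isInProgress()) exits on it. Code that reads the result afterwards should still check isCompleted() first.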
Parallel Processing with Custom Workflow
Java
// Submit multiple conversions
from("file:///data/documents?include=.*\\.pdf")
    .to("docling:convert?operation=SUBMIT_ASYNC_CONVERSION&useDoclingServe=true")
    .to("seda:task-queue");

// Process task queue with multiple threads
from("seda:task-queue?concurrentConsumers=5")
    .log("Processing task: ${body}")
    .setHeader("taskId", body())
    .setProperty("maxAttempts", constant(60))
    .setProperty("attempt", constant(0))
    .loopDoWhile(method(MyPollingHelper.class, "shouldContinuePolling"))
        .process(exchange -> {
            Integer attempt = exchange.getProperty("attempt", Integer.class);
            exchange.setProperty("attempt", attempt != null ? attempt + 1 : 1);
        })
        .setBody(header("taskId"))
        .to("docling:convert?operation=CHECK_CONVERSION_STATUS&useDoclingServe=true")
        .setProperty("conversionStatus", body())
        .process(exchange -> {
            ConversionStatus status = exchange.getProperty("conversionStatus", ConversionStatus.class);
            if (status.isCompleted()) {
                exchange.setProperty("isCompleted", true);
            } else if (status.isFailed()) {
                exchange.setProperty("isFailed", true);
            }
        })
        // Only a failed task stops routing here. Calling stop() on completion
        // would also skip the file write below; the loop condition ends the
        // loop once isCompleted is set.
        .choice()
            .when(exchangeProperty("isFailed").isEqualTo(true))
                .stop()
        .end()
        .delay(1000)
    .end()
    .process(exchange -> {
        ConversionStatus status = exchange.getProperty("conversionStatus", ConversionStatus.class);
        // Body becomes the result on success, otherwise null so that nothing is written
        exchange.getIn().setBody(status != null && status.isCompleted() ? status.getResult() : null);
    })
    .choice()
        .when(body().isNotNull())
            .to("file:///data/output?fileName=${header.CamelFileName}")
    .end();

YAML
# For parallel processing in YAML, recommend using built-in async mode
# which is simpler and handles concurrency automatically
- route:
    from:
      uri: file:///data/documents
      parameters:
        include: ".*\\.pdf"
      steps:
        - to:
            uri: docling:convert
            parameters:
              operation: "CONVERT_TO_MARKDOWN"
              useDoclingServe: true
              useAsyncMode: true
              asyncPollInterval: 1000
              asyncTimeout: 120000
              contentInBody: true
        - to:
            uri: file:///data/output
            parameters:
              fileName: "${header.CamelFileName}"

For a complete working example of a custom polling workflow, see the testCustomPollingWorkflowWithRoute() test in DoclingServeProducerIT.java in the camel-docling test sources.
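The seda:task-queue route above polls several tasks at once by using concurrent consumers. The same coordination idea can be sketched outside a route with a fixed thread pool, which may help when reasoning about how many status checks run in parallel. TaskCoordinator, awaitAll, and the fetchResult function are hypothetical names and not part of camel-docling. Here fetchResult stands in for "check the status and return the content when completed, otherwise null".

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical coordinator (not part of camel-docling): waits for several tasks in parallel. */
final class TaskCoordinator {

    /**
     * Polls each task ID on a worker thread and returns taskId -> result.
     * A task that never completes within maxAttempts maps to null.
     */
    static Map<String, String> awaitAll(List<String> taskIds, Function<String, String> fetchResult,
                                        int maxAttempts, long pollMillis, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            Map<String, CompletableFuture<String>> futures = new LinkedHashMap<>();
            for (String id : taskIds) {
                futures.put(id, CompletableFuture.supplyAsync(
                        () -> pollOne(id, fetchResult, maxAttempts, pollMillis), pool));
            }
            Map<String, String> results = new LinkedHashMap<>();
            futures.forEach((id, future) -> results.put(id, future.join()));
            return results;
        } finally {
            pool.shutdown();
        }
    }

    /** Fixed-interval polling for one task; returns null on timeout or interruption. */
    private static String pollOne(String id, Function<String, String> fetchResult,
                                  int maxAttempts, long pollMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String result = fetchResult.apply(id);
            if (result != null) {
                return result;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return null;
    }
}
```

The workers argument plays the role of concurrentConsumers in the route: it bounds how many tasks are polled at the same time, while the remaining tasks wait in the pool's queue.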