AWS S3 - Streaming Upload

Streaming Upload mode

With streaming upload mode enabled, you can upload data to S3 without knowing its size ahead of time, because the producer uses multipart upload. The upload of a file is completed when the buffered data reaches batchSize or the number of messages reaches batchMessageNumber. There are two naming strategies: progressive and random. With the progressive strategy, each file name is composed of the keyName option and a progressive counter, followed by the file extension (if any). With the random strategy, a UUID is added after the keyName, followed by the file extension (if any).
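The following is a minimal sketch of how the two naming strategies could derive object keys from a fixed keyName. The progressive scheme follows the camel.txt, camel-1.txt, camel-2.txt pattern used in the restart example on this page. The class StreamingKeyNames and the "-" separator placed before the UUID are assumptions made for illustration; this is not the component's actual code.

```java
import java.util.UUID;

// Hypothetical sketch of the progressive and random naming strategies.
// Not the Camel implementation; it only illustrates the resulting key names.
final class StreamingKeyNames {

    private StreamingKeyNames() {
    }

    // Splits "dir/camel.txt" into {"dir/camel", ".txt"}. A key whose last
    // path segment has no dot gets an empty extension.
    static String[] splitExtension(String keyName) {
        int slash = keyName.lastIndexOf('/');
        int dot = keyName.lastIndexOf('.');
        if (dot <= slash) {
            return new String[] {keyName, ""};
        }
        return new String[] {keyName.substring(0, dot), keyName.substring(dot)};
    }

    // Progressive: the first file keeps keyName as-is; later files get
    // "-<counter>" before the extension (camel.txt, camel-1.txt, ...).
    static String progressive(String keyName, int counter) {
        if (counter == 0) {
            return keyName;
        }
        String[] parts = splitExtension(keyName);
        return parts[0] + "-" + counter + parts[1];
    }

    // Random: a UUID is added after keyName, then the extension.
    // The "-" separator is an assumption of this sketch.
    static String random(String keyName) {
        String[] parts = splitExtension(keyName);
        return parts[0] + "-" + UUID.randomUUID() + parts[1];
    }
}
```

Because the keyName is static, the counter (or the UUID) is the only part of the key that changes between files.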

Additionally, streaming upload mode supports timestamp-based file grouping, which allows messages to be automatically grouped into time windows based on their timestamps.

As an example:

Java-only: Endpoint DSL builder style
from(kafka("topic1").brokers("localhost:9092"))
        .log("Kafka Message is: ${body}")
        .to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25)
                .namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive)
                .keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt"));

from(kafka("topic2").brokers("localhost:9092"))
        .log("Kafka Message is: ${body}")
        .to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25)
                .namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.random)
                .keyName("{{kafkaTopic2}}/{{kafkaTopic2}}.txt"));

The default batch size is 1 MB, but you can adjust it to your requirements with the batchSize option.

When you stop the producer route, the producer flushes the remaining buffered messages and completes the upload.

In streaming upload mode, you can restart the producer from the point where it left off. This feature is relevant only when using the progressive naming strategy.

By setting restartingPolicy to lastPart, the producer resumes uploading files and content from the last part number it had reached before stopping.

As an example:

  • Start the route with the progressive naming strategy, keyName set to camel.txt, batchMessageNumber set to 20, and restartingPolicy set to lastPart.

  • Send 70 messages.

  • Stop the route.

  • Your S3 bucket should now contain four files: camel.txt, camel-1.txt, camel-2.txt and camel-3.txt. The first three contain 20 messages each, while the last one contains only 10.

  • Restart the route.

  • Send 25 messages.

  • Stop the route.

  • You will now have two more files in your bucket: camel-5.txt and camel-6.txt, the first with 20 messages and the second with 5 messages.

  • And so on.
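The file layout after the first stop follows from simple arithmetic: 70 messages split into batches of 20 give three full files plus a final file of 10, which is completed when the route stops and the producer flushes its buffer. The sketch below reproduces only that first phase. ProgressiveBatchPlan is a hypothetical helper that models batchMessageNumber alone; batchSize, timeouts, and the restart behavior are deliberately left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: which progressively named files a run of messages
// produces when only batchMessageNumber triggers completion.
final class ProgressiveBatchPlan {

    private ProgressiveBatchPlan() {
    }

    // Returns file name -> message count, in upload order. The last, partial
    // batch corresponds to the flush performed when the route is stopped.
    static Map<String, Integer> plan(String baseName, String extension,
                                     int totalMessages, int batchMessageNumber) {
        Map<String, Integer> files = new LinkedHashMap<>();
        int counter = 0;
        int remaining = totalMessages;
        while (remaining > 0) {
            int inThisFile = Math.min(batchMessageNumber, remaining);
            // First file has no suffix; later files get "-<counter>".
            String name = counter == 0
                    ? baseName + extension
                    : baseName + "-" + counter + extension;
            files.put(name, inThisFile);
            remaining -= inThisFile;
            counter++;
        }
        return files;
    }
}
```

For example, plan("camel", ".txt", 70, 20) yields camel.txt, camel-1.txt and camel-2.txt with 20 messages each, and camel-3.txt with 10.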

This won’t be needed when using the random naming strategy.

Alternatively, you can set restartingPolicy to override. In that case, whatever you have previously written to your bucket for that particular keyName will be overwritten.

In streaming upload mode, the only keyName that is taken into account is the endpoint option. Using the header throws an NPE, and this is by design: setting the header could change the file name on each exchange, which defeats the purpose of the streaming upload producer. The keyName must be fixed and static; the selected naming strategy does the rest of the work.

Another option is to specify a streamingUploadTimeout together with the batchMessageNumber and batchSize options. With this option, the upload of a file is completed once a certain amount of time has passed. Upload completion is then based on three criteria: the timeout, the number of messages, and the batch size.

As an example:

Java-only: Endpoint DSL builder style
from(kafka("topic1").brokers("localhost:9092"))
        .log("Kafka Message is: ${body}")
        .to(aws2S3("camel-bucket").streamingUploadMode(true).batchMessageNumber(25)
                .streamingUploadTimeout(10000)
                .namingStrategy(AWS2S3EndpointBuilderFactory.AWSS3NamingStrategyEnum.progressive)
                .keyName("{{kafkaTopic1}}/{{kafkaTopic1}}.txt"));

In this case, the upload is completed after 10 seconds (10000 ms), or earlier if 25 messages have been received or the batch size has been reached.
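To make the "whichever comes first" behavior concrete, here is a hypothetical model of the three completion criteria. StreamingCompletion and its parameters are illustrative names, not Camel classes, and the assumption that the timeout is measured from the start of the current upload is not taken from the component.

```java
// Hypothetical model of the three completion criteria for one streaming upload.
// StreamingCompletion is an illustrative name, not a Camel class.
final class StreamingCompletion {

    private final int batchMessageNumber;
    private final long batchSizeBytes;
    // A value <= 0 means no streamingUploadTimeout is configured.
    private final long streamingUploadTimeoutMillis;

    StreamingCompletion(int batchMessageNumber, long batchSizeBytes, long streamingUploadTimeoutMillis) {
        this.batchMessageNumber = batchMessageNumber;
        this.batchSizeBytes = batchSizeBytes;
        this.streamingUploadTimeoutMillis = streamingUploadTimeoutMillis;
    }

    // Returns true as soon as any one limit is reached. The elapsed time is
    // assumed to be measured from the start of the current upload.
    boolean shouldComplete(int messagesBuffered, long bytesBuffered, long elapsedMillis) {
        if (messagesBuffered >= batchMessageNumber) {
            return true;
        }
        if (bytesBuffered >= batchSizeBytes) {
            return true;
        }
        return streamingUploadTimeoutMillis > 0 && elapsedMillis >= streamingUploadTimeoutMillis;
    }
}
```

With batchMessageNumber set to 25 and a 10000 ms timeout, as in the route above, a file holding only three small messages is still completed once 10 seconds have elapsed.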

Timestamp Grouping

The streaming upload mode also supports timestamp-based file grouping, which allows messages to be automatically grouped into time windows and written to the same S3 file based on their timestamps. This feature enables append-like behavior where messages with timestamps falling within the same time window are combined into a single file.

To enable timestamp grouping, use the following configuration options:

  • timestampGroupingEnabled: Set to true to enable timestamp-based grouping (default: false)

  • timestampWindowSizeMillis: The size of the time window in milliseconds (default: 300000 = 5 minutes)

  • timestampHeaderName: The name of the message header containing the timestamp (default: Exchange.MESSAGE_TIMESTAMP, that is, the CamelMessageTimestamp header)

Messages are grouped based on their timestamps extracted from the specified header. The timestamp can be provided as:

  • Long: Unix timestamp in milliseconds

  • Date: Java Date object

  • String: String representation of Unix timestamp in milliseconds
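A sketch of how a timestamp could be resolved from the three supported header types, including the fallback to the current system time described under Fallback Behavior. TimestampResolver is a hypothetical helper; the plain Map stands in for the exchange headers, and the injected LongSupplier clock replaces System.currentTimeMillis() so the fallback can be tested. The warning log is omitted.

```java
import java.util.Date;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical helper, not Camel code: resolves a timestamp (epoch millis)
// from a header value of type Long, Date, or String.
final class TimestampResolver {

    private TimestampResolver() {
    }

    static long resolve(Map<String, Object> headers, String headerName, LongSupplier clock) {
        Object value = headers.get(headerName);
        if (value instanceof Long) {
            return (Long) value;
        }
        if (value instanceof Date) {
            return ((Date) value).getTime();
        }
        if (value instanceof String) {
            try {
                return Long.parseLong(((String) value).trim());
            } catch (NumberFormatException e) {
                // Invalid data: fall through to the clock below.
            }
        }
        // Missing header or invalid/unsupported value: use the current time.
        // (The component logs a warning in this case; omitted here.)
        return clock.getAsLong();
    }
}
```

In a real producer the clock would simply be System::currentTimeMillis.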

Files are automatically named using a timestamp-based pattern that includes the time window information.

Example Configuration

Basic timestamp grouping with 5-minute windows:

Java:

from("timer:messages?period=10000")
    .setHeader(Exchange.MESSAGE_TIMESTAMP, simple("${date:now:timestamp}"))
    .setBody(constant("Message with timestamp"))
    .to("aws2-s3://my-bucket?streamingUploadMode=true&timestampGroupingEnabled=true&timestampWindowSizeMillis=300000&keyName=grouped-messages.txt");

XML:

<route>
  <from uri="timer:messages?period=10000"/>
  <setHeader name="CamelMessageTimestamp">
    <simple>${date:now:timestamp}</simple>
  </setHeader>
  <setBody>
    <constant>Message with timestamp</constant>
  </setBody>
  <to uri="aws2-s3://my-bucket?streamingUploadMode=true&amp;timestampGroupingEnabled=true&amp;timestampWindowSizeMillis=300000&amp;keyName=grouped-messages.txt"/>
</route>

YAML:

- route:
    from:
      uri: timer:messages
      parameters:
        period: 10000
      steps:
        - setHeader:
            name: CamelMessageTimestamp
            simple: "${date:now:timestamp}"
        - setBody:
            constant: Message with timestamp
        - to:
            uri: aws2-s3://my-bucket
            parameters:
              streamingUploadMode: true
              timestampGroupingEnabled: true
              timestampWindowSizeMillis: 300000
              keyName: grouped-messages.txt

Custom window size (1 minute) with custom header name:

Java:

from("direct:timestamped")
    .setHeader("MyTimestamp", simple("${date:now:timestamp}"))
    .setBody(constant("Custom timestamped message"))
    .to("aws2-s3://my-bucket?streamingUploadMode=true&timestampGroupingEnabled=true&timestampWindowSizeMillis=60000&timestampHeaderName=MyTimestamp&keyName=custom-grouped.txt");

XML:

<route>
  <from uri="direct:timestamped"/>
  <setHeader name="MyTimestamp">
    <simple>${date:now:timestamp}</simple>
  </setHeader>
  <setBody>
    <constant>Custom timestamped message</constant>
  </setBody>
  <to uri="aws2-s3://my-bucket?streamingUploadMode=true&amp;timestampGroupingEnabled=true&amp;timestampWindowSizeMillis=60000&amp;timestampHeaderName=MyTimestamp&amp;keyName=custom-grouped.txt"/>
</route>

YAML:

- route:
    from:
      uri: direct:timestamped
      steps:
        - setHeader:
            name: MyTimestamp
            simple: "${date:now:timestamp}"
        - setBody:
            constant: Custom timestamped message
        - to:
            uri: aws2-s3://my-bucket
            parameters:
              streamingUploadMode: true
              timestampGroupingEnabled: true
              timestampWindowSizeMillis: 60000
              timestampHeaderName: MyTimestamp
              keyName: custom-grouped.txt

Large files with multipart upload and timestamp grouping (30-minute windows):

Java:

from("direct:large-timestamped")
    .setHeader(Exchange.MESSAGE_TIMESTAMP, simple("${date:now:timestamp}"))
    .setBody(constant("Large message content..."))
    .to("aws2-s3://my-bucket?streamingUploadMode=true&timestampGroupingEnabled=true&timestampWindowSizeMillis=1800000&multiPartUpload=true&partSize=5242880&keyName=large-grouped.txt");

XML:

<route>
  <from uri="direct:large-timestamped"/>
  <setHeader name="CamelMessageTimestamp">
    <simple>${date:now:timestamp}</simple>
  </setHeader>
  <setBody>
    <constant>Large message content...</constant>
  </setBody>
  <to uri="aws2-s3://my-bucket?streamingUploadMode=true&amp;timestampGroupingEnabled=true&amp;timestampWindowSizeMillis=1800000&amp;multiPartUpload=true&amp;partSize=5242880&amp;keyName=large-grouped.txt"/>
</route>

YAML:

- route:
    from:
      uri: direct:large-timestamped
      steps:
        - setHeader:
            name: CamelMessageTimestamp
            simple: "${date:now:timestamp}"
        - setBody:
            constant: "Large message content..."
        - to:
            uri: aws2-s3://my-bucket
            parameters:
              streamingUploadMode: true
              timestampGroupingEnabled: true
              timestampWindowSizeMillis: 1800000
              multiPartUpload: true
              partSize: 5242880
              keyName: large-grouped.txt

File Naming

Files are automatically named using the following pattern:

{baseFileName}_{YYYYMMDD}_{HHMM}_{HHMM-HHMM}.{extension}

For time windows smaller than 1 minute, seconds precision is used:

{baseFileName}_{YYYYMMDD}_{HHMMSS}_{HHMMSS-HHMMSS}.{extension}

Example file names:

  • 5-minute window: messages_20240101_0800_0800-0805.txt

  • 1-minute window: data_20240315_1430_1430-1431.log

  • 5-second window: events_20241225_000000_000000-000005.json
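A sketch of the naming pattern that reproduces the example names above. It assumes window boundaries are rendered in UTC (the time zone the component uses is not stated on this page) and that the extension is passed without its leading dot. WindowFileNames is a hypothetical helper.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: builds {base}_{YYYYMMDD}_{start}_{start-end}.{ext},
// switching to seconds precision for windows shorter than one minute.
// Time zone is assumed to be UTC for this sketch.
final class WindowFileNames {

    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);
    private static final DateTimeFormatter MINUTES =
            DateTimeFormatter.ofPattern("HHmm").withZone(ZoneOffset.UTC);
    private static final DateTimeFormatter SECONDS =
            DateTimeFormatter.ofPattern("HHmmss").withZone(ZoneOffset.UTC);

    private WindowFileNames() {
    }

    static String fileName(String baseFileName, String extension,
                           long windowStartMillis, long windowSizeMillis) {
        DateTimeFormatter time = windowSizeMillis < 60_000L ? SECONDS : MINUTES;
        Instant start = Instant.ofEpochMilli(windowStartMillis);
        Instant end = Instant.ofEpochMilli(windowStartMillis + windowSizeMillis);
        String startText = time.format(start);
        return baseFileName + "_" + DAY.format(start) + "_" + startText
                + "_" + startText + "-" + time.format(end) + "." + extension;
    }
}
```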

Time Window Examples

With a 5-minute window (300000 ms):

  • Window 1: 08:00:00 - 08:04:59 → file_20240101_0800_0800-0805.txt

  • Window 2: 08:05:00 - 08:09:59 → file_20240101_0805_0805-0810.txt

  • Window 3: 08:10:00 - 08:14:59 → file_20240101_0810_0810-0815.txt
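The windows above start on multiples of the window size. Assuming windows are aligned to the Unix epoch, assigning a timestamp to its window is a floor division. TimeWindows is a hypothetical helper, and treating the end as exclusive matches the 08:04:59 / 08:05:00 boundary in the list.

```java
// Hypothetical helper: maps a timestamp to its window, assuming windows are
// aligned to the Unix epoch. The window end is exclusive.
final class TimeWindows {

    private TimeWindows() {
    }

    // Rounds the timestamp down to a multiple of the window size.
    // Math.floorDiv keeps this correct for negative timestamps too.
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return Math.floorDiv(timestampMillis, windowSizeMillis) * windowSizeMillis;
    }

    // First instant that no longer belongs to the window.
    static long windowEnd(long timestampMillis, long windowSizeMillis) {
        return windowStart(timestampMillis, windowSizeMillis) + windowSizeMillis;
    }
}
```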

Fallback Behavior

  • If no timestamp header is found, the current system time is used

  • If the timestamp header contains invalid data, the current system time is used

  • Warning messages are logged when fallback behavior occurs

Performance Considerations

  • Multiple concurrent time windows are supported

  • Each window maintains its own upload state

  • Memory usage scales with the number of active time windows

  • Completed windows are automatically cleaned up

  • Works with all existing streaming upload features (multipart uploads, timeouts, etc.)
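The points above can be illustrated with a small in-memory model: one buffer per active window, keyed by window start, so memory grows with the number of open windows, and completed windows are removed from the map. WindowBuffers is hypothetical. It treats a window as complete once the given time has passed its end, which is an assumption; the real component also uploads each window to S3 (for example via multipart upload) instead of returning the content.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of per-window upload state. Not Camel code: a window is
// assumed complete once "now" reaches its end, and completion just returns
// the buffered content instead of uploading it.
final class WindowBuffers {

    private final long windowSizeMillis;
    // Active windows, keyed by window start (epoch millis).
    private final TreeMap<Long, StringBuilder> active = new TreeMap<>();

    WindowBuffers(long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    // Appends a message body to the buffer of the window it falls into.
    void append(long timestampMillis, String body) {
        long start = Math.floorDiv(timestampMillis, windowSizeMillis) * windowSizeMillis;
        active.computeIfAbsent(start, k -> new StringBuilder()).append(body).append('\n');
    }

    int activeWindows() {
        return active.size();
    }

    // Removes and returns every window whose end (start + size) is <= nowMillis.
    Map<Long, String> completeExpired(long nowMillis) {
        Map<Long, String> completed = new TreeMap<>();
        Map<Long, StringBuilder> expired = active.headMap(nowMillis - windowSizeMillis, true);
        for (Map.Entry<Long, StringBuilder> e : expired.entrySet()) {
            completed.put(e.getKey(), e.getValue().toString());
        }
        // Clearing the head-map view also removes the entries from "active".
        expired.clear();
        return completed;
    }
}
```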