Keycloak Consumer Operations

The Keycloak consumer allows you to poll and consume events from a Keycloak instance. This is useful for monitoring user activities, admin actions, and implementing event-driven workflows based on Keycloak events.

For an overview of the Keycloak component, see Keycloak Component.

Overview

The consumer supports two types of events:

  • User Events (events) - Login attempts, logout events, register events, etc.

  • Admin Events (admin-events) - User created, role assigned, realm updated, etc.

The consumer uses a polling mechanism with fingerprint-based deduplication to ensure events are not processed multiple times.

Configuration

The consumer supports the same authentication methods as the producer (access token, refresh token, username/password, or client credentials):

  • Java (Access Token)

  • Java (Username/Password)

  • YAML

// Configure Keycloak component for consuming with access token
KeycloakComponent keycloak = context.getComponent("keycloak", KeycloakComponent.class);
KeycloakConfiguration config = new KeycloakConfiguration();
config.setServerUrl("http://localhost:8080");
config.setRealm("master"); // Auth realm
config.setAccessToken("eyJhbGciOiJSUzI1NiIsInR5cC...");
keycloak.setConfiguration(config);
// Configure Keycloak component for consuming with username/password
KeycloakComponent keycloak = context.getComponent("keycloak", KeycloakComponent.class);
KeycloakConfiguration config = new KeycloakConfiguration();
config.setServerUrl("http://localhost:8080");
config.setRealm("master"); // Auth realm
config.setUsername("admin");
config.setPassword("admin");
keycloak.setConfiguration(config);
# Configuration in application.yaml (using access token)
camel:
  component:
    keycloak:
      server-url: "http://localhost:8080"
      realm: "master"
      access-token: "eyJhbGciOiJSUzI1NiIsInR5cC..."

# Or using username/password
camel:
  component:
    keycloak:
      server-url: "http://localhost:8080"
      realm: "master"
      username: "admin"
      password: "admin"

Consuming Admin Events

Admin events are generated when administrative operations are performed in Keycloak, such as creating users, assigning roles, or updating realm settings.

  • Java

  • YAML

// Consume admin events from a specific realm
from("keycloak:adminEvents"
     + "?realm=my-realm"
     + "&eventType=admin-events"
     + "&maxResults=50"
     + "&initialDelay=1000"
     + "&delay=5000")
    .log("Received admin event: ${body}")
    .choice()
        .when(simple("${body.operationType} == 'CREATE'"))
            .log("Resource created: ${body.resourceType} at ${body.resourcePath}")
        .when(simple("${body.operationType} == 'UPDATE'"))
            .log("Resource updated: ${body.resourceType}")
        .when(simple("${body.operationType} == 'DELETE'"))
            .log("Resource deleted: ${body.resourceType}")
        .otherwise()
            .log("Other operation: ${body.operationType}")
    .end()
    .to("direct:process-admin-event");

// Process specific admin events
from("direct:process-admin-event")
    .process(exchange -> {
        AdminEventRepresentation event = exchange.getIn().getBody(AdminEventRepresentation.class);

        // Access event details
        String operationType = event.getOperationType();
        String resourceType = event.getResourceType();
        String resourcePath = event.getResourcePath();
        long timestamp = event.getTime();

        // Get auth details
        if (event.getAuthDetails() != null) {
            String userId = event.getAuthDetails().getUserId();
            String realmId = event.getAuthDetails().getRealmId();
            log.info("Operation performed by user: {} in realm: {}", userId, realmId);
        }

        // Process the event
        log.info("Processing {} operation on {} at {}", operationType, resourceType, timestamp);
    });
# Consume admin events
- route:
    id: consume-admin-events
    from:
      uri: >
        keycloak:adminEvents?
        realm=my-realm&
        eventType=admin-events&
        maxResults=50&
        initialDelay=1000&
        delay=5000
      steps:
        - log:
            message: "Received admin event: ${body}"
        - choice:
            when:
              - expression:
                  simple:
                    expression: "${body.operationType} == 'CREATE'"
                steps:
                  - log:
                      message: "Resource created: ${body.resourceType} at ${body.resourcePath}"
              - expression:
                  simple:
                    expression: "${body.operationType} == 'UPDATE'"
                steps:
                  - log:
                      message: "Resource updated: ${body.resourceType}"
              - expression:
                  simple:
                    expression: "${body.operationType} == 'DELETE'"
                steps:
                  - log:
                      message: "Resource deleted: ${body.resourceType}"
            otherwise:
              steps:
                - log:
                    message: "Other operation: ${body.operationType}"
        - to:
            uri: direct:process-admin-event

# Process admin events
- route:
    id: process-admin-events
    from:
      uri: direct:process-admin-event
      steps:
        - log:
            message: "Processing admin event: ${body.operationType} on ${body.resourceType}"
        - to:
            uri: bean:auditService
            parameters:
              method: recordAdminEvent

Consuming User Events

User events track user activities such as logins, logouts, registration, password changes, and more.

  • Java

  • YAML

// Consume user events from a specific realm
from("keycloak:userEvents"
     + "?realm=my-realm"
     + "&eventType=events"
     + "&maxResults=50"
     + "&initialDelay=1000"
     + "&delay=5000")
    .log("Received user event: ${body}")
    .choice()
        .when(simple("${body.type} == 'LOGIN'"))
            .log("User logged in: ${body.userId} from IP ${body.ipAddress}")
            .to("direct:handle-login")
        .when(simple("${body.type} == 'LOGIN_ERROR'"))
            .log("Failed login attempt: ${body.userId}")
            .to("direct:handle-failed-login")
        .when(simple("${body.type} == 'LOGOUT'"))
            .log("User logged out: ${body.userId}")
            .to("direct:handle-logout")
        .when(simple("${body.type} == 'REGISTER'"))
            .log("New user registered: ${body.userId}")
            .to("direct:handle-registration")
        .otherwise()
            .log("Other event: ${body.type}")
    .end();

// Handle login events
from("direct:handle-login")
    .process(exchange -> {
        EventRepresentation event = exchange.getIn().getBody(EventRepresentation.class);

        String userId = event.getUserId();
        String ipAddress = event.getIpAddress();
        long timestamp = event.getTime();

        // Access event details
        if (event.getDetails() != null) {
            String username = event.getDetails().get("username");
            log.info("User {} logged in from {} at {}", username, ipAddress, timestamp);
        }
    })
    .to("bean:analyticsService?method=recordLogin");

// Handle failed login attempts
from("direct:handle-failed-login")
    .process(exchange -> {
        EventRepresentation event = exchange.getIn().getBody(EventRepresentation.class);
        String ipAddress = event.getIpAddress();

        // Check for suspicious activity
        log.warn("Failed login attempt from IP: {}", ipAddress);
    })
    .to("bean:securityService?method=checkFailedAttempts");
# Consume user events
- route:
    id: consume-user-events
    from:
      uri: >
        keycloak:userEvents?
        realm=my-realm&
        eventType=events&
        maxResults=50&
        initialDelay=1000&
        delay=5000
      steps:
        - log:
            message: "Received user event: ${body}"
        - choice:
            when:
              - expression:
                  simple:
                    expression: "${body.type} == 'LOGIN'"
                steps:
                  - log:
                      message: "User logged in: ${body.userId} from IP ${body.ipAddress}"
                  - to:
                      uri: direct:handle-login
              - expression:
                  simple:
                    expression: "${body.type} == 'LOGIN_ERROR'"
                steps:
                  - log:
                      message: "Failed login attempt: ${body.userId}"
                  - to:
                      uri: direct:handle-failed-login
              - expression:
                  simple:
                    expression: "${body.type} == 'LOGOUT'"
                steps:
                  - log:
                      message: "User logged out: ${body.userId}"
                  - to:
                      uri: direct:handle-logout
              - expression:
                  simple:
                    expression: "${body.type} == 'REGISTER'"
                steps:
                  - log:
                      message: "New user registered: ${body.userId}"
                  - to:
                      uri: direct:handle-registration
            otherwise:
              steps:
                - log:
                    message: "Other event: ${body.type}"

# Handle login events
- route:
    id: handle-login
    from:
      uri: direct:handle-login
      steps:
        - log:
            message: "Processing login event for user ${body.userId}"
        - to:
            uri: bean:analyticsService
            parameters:
              method: recordLogin

# Handle failed login
- route:
    id: handle-failed-login
    from:
      uri: direct:handle-failed-login
      steps:
        - log:
            message: "Processing failed login from ${body.ipAddress}"
        - to:
            uri: bean:securityService
            parameters:
              method: checkFailedAttempts

Filtering Events

You can filter events using various options to narrow down the events you want to consume:

  • Java

  • YAML

// Filter admin events by operation type
from("keycloak:adminEvents"
     + "?realm=my-realm"
     + "&eventType=admin-events"
     + "&operationTypes=CREATE,UPDATE,DELETE"
     + "&maxResults=100")
    .log("Filtered admin event: ${body}");

// Filter user events by type
from("keycloak:userEvents"
     + "?realm=my-realm"
     + "&eventType=events"
     + "&types=LOGIN,LOGOUT,REGISTER"
     + "&maxResults=100")
    .log("Filtered user event: ${body}");

// Filter by date range
from("keycloak:adminEvents"
     + "?realm=my-realm"
     + "&eventType=admin-events"
     + "&dateFrom=1609459200000"  // milliseconds since epoch
     + "&dateTo=1640995200000"
     + "&maxResults=100")
    .log("Events in date range: ${body}");

// Filter by user and client
from("keycloak:userEvents"
     + "?realm=my-realm"
     + "&eventType=events"
     + "&user=user-id-123"
     + "&client=my-client-id"
     + "&ipAddress=192.168.1.100"
     + "&maxResults=50")
    .log("Specific user events: ${body}");
# Filter admin events by operation type
- route:
    from:
      uri: >
        keycloak:adminEvents?
        realm=my-realm&
        eventType=admin-events&
        operationTypes=CREATE,UPDATE,DELETE&
        maxResults=100
      steps:
        - log:
            message: "Filtered admin event: ${body}"

# Filter user events by type
- route:
    from:
      uri: >
        keycloak:userEvents?
        realm=my-realm&
        eventType=events&
        types=LOGIN,LOGOUT,REGISTER&
        maxResults=100
      steps:
        - log:
            message: "Filtered user event: ${body}"

# Filter by date range
- route:
    from:
      uri: >
        keycloak:adminEvents?
        realm=my-realm&
        eventType=admin-events&
        dateFrom=1609459200000&
        dateTo=1640995200000&
        maxResults=100
      steps:
        - log:
            message: "Events in date range: ${body}"

# Filter by user and client
- route:
    from:
      uri: >
        keycloak:userEvents?
        realm=my-realm&
        eventType=events&
        user=user-id-123&
        client=my-client-id&
        ipAddress=192.168.1.100&
        maxResults=50
      steps:
        - log:
            message: "Specific user events: ${body}"

Event Processing Patterns

Audit Trail

  • Java

  • YAML

// Create comprehensive audit trail from admin events
from("keycloak:adminEvents"
     + "?realm=my-realm"
     + "&eventType=admin-events"
     + "&maxResults=100"
     + "&delay=10000")
    .process(exchange -> {
        AdminEventRepresentation event = exchange.getIn().getBody(AdminEventRepresentation.class);

        // Build audit record
        Map<String, Object> auditRecord = new HashMap<>();
        auditRecord.put("timestamp", new Date(event.getTime()));
        auditRecord.put("operation", event.getOperationType());
        auditRecord.put("resourceType", event.getResourceType());
        auditRecord.put("resourcePath", event.getResourcePath());

        if (event.getAuthDetails() != null) {
            auditRecord.put("userId", event.getAuthDetails().getUserId());
            auditRecord.put("ipAddress", event.getAuthDetails().getIpAddress());
        }

        exchange.getIn().setBody(auditRecord);
    })
    .marshal().json()
    .to("kafka:audit-trail?brokers=localhost:9092")
    .to("jdbc:dataSource?useHeadersAsParameters=true");
# Audit trail pattern
- route:
    from:
      uri: >
        keycloak:adminEvents?
        realm=my-realm&
        eventType=admin-events&
        maxResults=100&
        delay=10000
      steps:
        - setBody:
            simple: >
              {
                "timestamp": ${body.time},
                "operation": "${body.operationType}",
                "resourceType": "${body.resourceType}",
                "resourcePath": "${body.resourcePath}",
                "userId": "${body.authDetails.userId}",
                "ipAddress": "${body.authDetails.ipAddress}"
              }
        - marshal:
            json: {}
        - to:
            uri: kafka:audit-trail
            parameters:
              brokers: "localhost:9092"
        - to:
            uri: jdbc:dataSource
            parameters:
              useHeadersAsParameters: true

Security Monitoring

  • Java

  • YAML

// Monitor for security-relevant events
from("keycloak:userEvents"
     + "?realm=my-realm"
     + "&eventType=events"
     + "&types=LOGIN_ERROR,UPDATE_PASSWORD,UPDATE_EMAIL"
     + "&maxResults=50"
     + "&delay=5000")
    .filter(simple("${body.type} == 'LOGIN_ERROR'"))
    .aggregate(simple("${body.ipAddress}"), new ArrayListAggregationStrategy())
        .completionSize(5)      // 5 failed attempts
        .completionTimeout(300000) // within 5 minutes
    .process(exchange -> {
        List<EventRepresentation> failedAttempts = exchange.getIn().getBody(List.class);
        String ipAddress = failedAttempts.get(0).getIpAddress();

        log.warn("SECURITY ALERT: {} failed login attempts from IP: {}",
                 failedAttempts.size(), ipAddress);
    })
    .to("direct:block-ip")
    .to("direct:send-security-alert");
# Security monitoring pattern
- route:
    from:
      uri: >
        keycloak:userEvents?
        realm=my-realm&
        eventType=events&
        types=LOGIN_ERROR,UPDATE_PASSWORD,UPDATE_EMAIL&
        maxResults=50&
        delay=5000
      steps:
        - filter:
            expression:
              simple:
                expression: "${body.type} == 'LOGIN_ERROR'"
        - aggregate:
            correlationExpression:
              expression:
                simple:
                  expression: "${body.ipAddress}"
            aggregationStrategy: "#arrayListAggregation"
            completionSize: 5
            completionTimeout: 300000
            steps:
              - log:
                  message: "SECURITY ALERT: Multiple failed login attempts from ${body[0].ipAddress}"
              - to:
                  uri: direct:block-ip
              - to:
                  uri: direct:send-security-alert

User Activity Analytics

  • Java

  • YAML

// Track user activity for analytics
from("keycloak:userEvents"
     + "?realm=my-realm"
     + "&eventType=events"
     + "&types=LOGIN,LOGOUT"
     + "&maxResults=100"
     + "&delay=60000")
    .process(exchange -> {
        EventRepresentation event = exchange.getIn().getBody(EventRepresentation.class);

        // Extract analytics data
        Map<String, Object> analytics = new HashMap<>();
        analytics.put("userId", event.getUserId());
        analytics.put("eventType", event.getType());
        analytics.put("timestamp", new Date(event.getTime()));
        analytics.put("ipAddress", event.getIpAddress());
        analytics.put("sessionId", event.getSessionId());

        if (event.getDetails() != null) {
            analytics.put("username", event.getDetails().get("username"));
            analytics.put("clientId", event.getDetails().get("client_id"));
        }

        exchange.getIn().setBody(analytics);
    })
    .to("bean:analyticsService?method=recordActivity")
    .to("elasticsearch://keycloak-events?operation=Index&indexName=user-activity");
# User activity analytics
- route:
    from:
      uri: >
        keycloak:userEvents?
        realm=my-realm&
        eventType=events&
        types=LOGIN,LOGOUT&
        maxResults=100&
        delay=60000
      steps:
        - setBody:
            simple: >
              {
                "userId": "${body.userId}",
                "eventType": "${body.type}",
                "timestamp": ${body.time},
                "ipAddress": "${body.ipAddress}",
                "sessionId": "${body.sessionId}"
              }
        - to:
            uri: bean:analyticsService
            parameters:
              method: recordActivity
        - to:
            uri: elasticsearch://keycloak-events
            parameters:
              operation: Index
              indexName: user-activity

Consumer Options

The consumer supports the following configuration options:

Option Default Description

realm

The Keycloak realm to consume events from (required)

eventType

events

Type of events to consume: events or admin-events

maxResults

100

Maximum number of events to retrieve per poll

first

0

Offset for pagination (first result index)

initialDelay

1000

Delay before first poll (milliseconds)

delay

500

Delay between polls (milliseconds)

Common Filter Options

client

Filter events by client ID

user

Filter events by user ID

dateFrom

Filter events from this timestamp (milliseconds since epoch)

dateTo

Filter events until this timestamp (milliseconds since epoch)

ipAddress

Filter events by IP address

User Event Filters

types

Filter by event types (comma-separated, e.g., LOGIN,LOGOUT,REGISTER)

Admin Event Filters

operationTypes

Filter by operation types (comma-separated, e.g., CREATE,UPDATE,DELETE)

authRealmFilter

Filter by authentication realm

authClient

Filter by authentication client ID

authUser

Filter by authentication user ID

authIpAddress

Filter by authentication IP address

resourcePath

Filter by resource path

Exchange Headers

The consumer sets the following headers on the exchange:

Header Description

CamelKeycloakEventType

Type of event: event or admin-event

CamelKeycloakEventId

Event timestamp (milliseconds since epoch)

CamelKeycloakRealmName

Realm name where the event occurred

Message Body

The message body contains:

  • For user events: org.keycloak.representations.idm.EventRepresentation

  • For admin events: org.keycloak.representations.idm.AdminEventRepresentation

Complete Consumer Example

  • Java

  • YAML

// Configure Keycloak component
KeycloakComponent keycloak = getContext().getComponent("keycloak", KeycloakComponent.class);
KeycloakConfiguration config = new KeycloakConfiguration();
config.setServerUrl("http://localhost:8080");
config.setRealm("master");
config.setUsername("admin");
config.setPassword("admin");
keycloak.setConfiguration(config);

// Consume admin events and send to audit system
from("keycloak:adminEvents"
             + "?realm=production-realm"
             + "&eventType=admin-events"
             + "&operationTypes=CREATE,UPDATE,DELETE"
             + "&maxResults=100"
             + "&delay=10000")
            .routeId("admin-events-audit")
            .log("Admin event: ${body.operationType} on ${body.resourceType}")
            .to("direct:audit-trail");

        // Consume user login events for analytics
        from("keycloak:userEvents"
             + "?realm=production-realm"
             + "&eventType=events"
             + "&types=LOGIN,LOGOUT"
             + "&maxResults=50"
             + "&delay=30000")
            .routeId("user-activity-tracking")
            .log("User activity: ${body.type} for user ${body.userId}")
            .to("direct:analytics");

        // Monitor failed logins for security
        from("keycloak:userEvents"
             + "?realm=production-realm"
             + "&eventType=events"
             + "&types=LOGIN_ERROR"
             + "&maxResults=100"
             + "&delay=5000")
            .routeId("security-monitoring")
            .log("Failed login from IP: ${body.ipAddress}")
            .to("direct:security-check");

        // Process audit trail
        from("direct:audit-trail")
            .marshal().json()
            .to("kafka:admin-audit?brokers=localhost:9092")
            .to("log:audit");

        // Process analytics
        from("direct:analytics")
            .to("bean:analyticsService?method=processUserActivity")
            .to("log:analytics");

// Process security alerts
from("direct:security-check")
    .to("bean:securityService?method=checkFailedLogin")
    .to("log:security");
# Component configuration
camel:
  component:
    keycloak:
      server-url: "http://localhost:8080"
      realm: "master"
      username: "admin"
      password: "admin"

# Routes
- route:
    id: admin-events-audit
    from:
      uri: >
        keycloak:adminEvents?
        realm=production-realm&
        eventType=admin-events&
        operationTypes=CREATE,UPDATE,DELETE&
        maxResults=100&
        delay=10000
      steps:
        - log:
            message: "Admin event: ${body.operationType} on ${body.resourceType}"
        - to:
            uri: direct:audit-trail

- route:
    id: user-activity-tracking
    from:
      uri: >
        keycloak:userEvents?
        realm=production-realm&
        eventType=events&
        types=LOGIN,LOGOUT&
        maxResults=50&
        delay=30000
      steps:
        - log:
            message: "User activity: ${body.type} for user ${body.userId}"
        - to:
            uri: direct:analytics

- route:
    id: security-monitoring
    from:
      uri: >
        keycloak:userEvents?
        realm=production-realm&
        eventType=events&
        types=LOGIN_ERROR&
        maxResults=100&
        delay=5000
      steps:
        - log:
            message: "Failed login from IP: ${body.ipAddress}"
        - to:
            uri: direct:security-check

# Processing routes
- route:
    id: process-audit-trail
    from:
      uri: direct:audit-trail
      steps:
        - marshal:
            json: {}
        - to:
            uri: kafka:admin-audit
            parameters:
              brokers: "localhost:9092"
        - to:
            uri: log:audit

- route:
    id: process-analytics
    from:
      uri: direct:analytics
      steps:
        - to:
            uri: bean:analyticsService
            parameters:
              method: processUserActivity
        - to:
            uri: log:analytics

- route:
    id: process-security-check
    from:
      uri: direct:security-check
      steps:
        - to:
            uri: bean:securityService
            parameters:
              method: checkFailedLogin
        - to:
            uri: log:security

Enabling Events in Keycloak

Before consuming events, you must enable event logging in Keycloak:

Enable Admin Events

  1. Login to Keycloak Admin Console

  2. Select your realm

  3. Go to Realm SettingsEvents tab

  4. In Admin Events Settings section:

    • Toggle Save Events: ON

    • Toggle Include Representation: ON (optional, for detailed event data)

Enable User Events

  1. In the same Events tab

  2. In User Events Settings section:

    • Toggle Save Events: ON

    • Set Expiration (e.g., 30 days)

    • Add Event Listeners: jboss-logging (default)

    • Select Saved Types: Choose which event types to save (LOGIN, LOGOUT, etc.)

Event Deduplication

The consumer uses fingerprint-based deduplication to prevent processing the same event multiple times:

  • Events are uniquely identified by combining timestamp and event-specific properties

  • Fingerprints are cached per timestamp and cleared when moving to newer timestamps

  • Maximum cache size is 1000 fingerprints to prevent memory issues

  • This ensures reliable event processing even with high event volumes