Keycloak Consumer Operations
The Keycloak consumer allows you to poll and consume events from a Keycloak instance. This is useful for monitoring user activities, admin actions, and implementing event-driven workflows based on Keycloak events.
For an overview of the Keycloak component, see Keycloak Component.
Overview
The consumer supports two types of events:
- User Events (`events`) - Login attempts, logout events, register events, etc.
- Admin Events (`admin-events`) - User created, role assigned, realm updated, etc.
The consumer uses a polling mechanism with fingerprint-based deduplication to ensure events are not processed multiple times.
Configuration
The consumer supports the same authentication methods as the producer (access token, refresh token, username/password, or client credentials):
- Java (Access Token)
- Java (Username/Password)
- YAML
// Configure Keycloak component for consuming with access token
KeycloakComponent keycloak = context.getComponent("keycloak", KeycloakComponent.class);
KeycloakConfiguration config = new KeycloakConfiguration();
config.setServerUrl("http://localhost:8080");
config.setRealm("master"); // Auth realm
config.setAccessToken("eyJhbGciOiJSUzI1NiIsInR5cC...");
keycloak.setConfiguration(config);
// Configure Keycloak component for consuming with username/password
KeycloakComponent keycloak = context.getComponent("keycloak", KeycloakComponent.class);
KeycloakConfiguration config = new KeycloakConfiguration();
config.setServerUrl("http://localhost:8080");
config.setRealm("master"); // Auth realm
config.setUsername("admin");
config.setPassword("admin");
keycloak.setConfiguration(config);
# Configuration in application.yaml (using access token)
camel:
component:
keycloak:
server-url: "http://localhost:8080"
realm: "master"
access-token: "eyJhbGciOiJSUzI1NiIsInR5cC..."
# Or using username/password
camel:
component:
keycloak:
server-url: "http://localhost:8080"
realm: "master"
username: "admin"
password: "admin"
Consuming Admin Events
Admin events are generated when administrative operations are performed in Keycloak, such as creating users, assigning roles, or updating realm settings.
- Java
- YAML
// Consume admin events from a specific realm
from("keycloak:adminEvents"
+ "?realm=my-realm"
+ "&eventType=admin-events"
+ "&maxResults=50"
+ "&initialDelay=1000"
+ "&delay=5000")
.log("Received admin event: ${body}")
.choice()
.when(simple("${body.operationType} == 'CREATE'"))
.log("Resource created: ${body.resourceType} at ${body.resourcePath}")
.when(simple("${body.operationType} == 'UPDATE'"))
.log("Resource updated: ${body.resourceType}")
.when(simple("${body.operationType} == 'DELETE'"))
.log("Resource deleted: ${body.resourceType}")
.otherwise()
.log("Other operation: ${body.operationType}")
.end()
.to("direct:process-admin-event");
// Process specific admin events
from("direct:process-admin-event")
.process(exchange -> {
AdminEventRepresentation event = exchange.getIn().getBody(AdminEventRepresentation.class);
// Access event details
String operationType = event.getOperationType();
String resourceType = event.getResourceType();
String resourcePath = event.getResourcePath();
long timestamp = event.getTime();
// Get auth details
if (event.getAuthDetails() != null) {
String userId = event.getAuthDetails().getUserId();
String realmId = event.getAuthDetails().getRealmId();
log.info("Operation performed by user: {} in realm: {}", userId, realmId);
}
// Process the event
log.info("Processing {} operation on {} at {}", operationType, resourceType, timestamp);
});
# Consume admin events
- route:
id: consume-admin-events
from:
uri: >
keycloak:adminEvents?
realm=my-realm&
eventType=admin-events&
maxResults=50&
initialDelay=1000&
delay=5000
steps:
- log:
message: "Received admin event: ${body}"
- choice:
when:
- expression:
simple:
expression: "${body.operationType} == 'CREATE'"
steps:
- log:
message: "Resource created: ${body.resourceType} at ${body.resourcePath}"
- expression:
simple:
expression: "${body.operationType} == 'UPDATE'"
steps:
- log:
message: "Resource updated: ${body.resourceType}"
- expression:
simple:
expression: "${body.operationType} == 'DELETE'"
steps:
- log:
message: "Resource deleted: ${body.resourceType}"
otherwise:
steps:
- log:
message: "Other operation: ${body.operationType}"
- to:
uri: direct:process-admin-event
# Process admin events
- route:
id: process-admin-events
from:
uri: direct:process-admin-event
steps:
- log:
message: "Processing admin event: ${body.operationType} on ${body.resourceType}"
- to:
uri: bean:auditService
parameters:
method: recordAdminEvent
Consuming User Events
User events track user activities such as logins, logouts, registration, password changes, and more.
- Java
- YAML
// Consume user events from a specific realm
from("keycloak:userEvents"
+ "?realm=my-realm"
+ "&eventType=events"
+ "&maxResults=50"
+ "&initialDelay=1000"
+ "&delay=5000")
.log("Received user event: ${body}")
.choice()
.when(simple("${body.type} == 'LOGIN'"))
.log("User logged in: ${body.userId} from IP ${body.ipAddress}")
.to("direct:handle-login")
.when(simple("${body.type} == 'LOGIN_ERROR'"))
.log("Failed login attempt: ${body.userId}")
.to("direct:handle-failed-login")
.when(simple("${body.type} == 'LOGOUT'"))
.log("User logged out: ${body.userId}")
.to("direct:handle-logout")
.when(simple("${body.type} == 'REGISTER'"))
.log("New user registered: ${body.userId}")
.to("direct:handle-registration")
.otherwise()
.log("Other event: ${body.type}")
.end();
// Handle login events
from("direct:handle-login")
.process(exchange -> {
EventRepresentation event = exchange.getIn().getBody(EventRepresentation.class);
String userId = event.getUserId();
String ipAddress = event.getIpAddress();
long timestamp = event.getTime();
// Access event details
if (event.getDetails() != null) {
String username = event.getDetails().get("username");
log.info("User {} logged in from {} at {}", username, ipAddress, timestamp);
}
})
.to("bean:analyticsService?method=recordLogin");
// Handle failed login attempts
from("direct:handle-failed-login")
.process(exchange -> {
EventRepresentation event = exchange.getIn().getBody(EventRepresentation.class);
String ipAddress = event.getIpAddress();
// Check for suspicious activity
log.warn("Failed login attempt from IP: {}", ipAddress);
})
.to("bean:securityService?method=checkFailedAttempts");
# Consume user events
- route:
id: consume-user-events
from:
uri: >
keycloak:userEvents?
realm=my-realm&
eventType=events&
maxResults=50&
initialDelay=1000&
delay=5000
steps:
- log:
message: "Received user event: ${body}"
- choice:
when:
- expression:
simple:
expression: "${body.type} == 'LOGIN'"
steps:
- log:
message: "User logged in: ${body.userId} from IP ${body.ipAddress}"
- to:
uri: direct:handle-login
- expression:
simple:
expression: "${body.type} == 'LOGIN_ERROR'"
steps:
- log:
message: "Failed login attempt: ${body.userId}"
- to:
uri: direct:handle-failed-login
- expression:
simple:
expression: "${body.type} == 'LOGOUT'"
steps:
- log:
message: "User logged out: ${body.userId}"
- to:
uri: direct:handle-logout
- expression:
simple:
expression: "${body.type} == 'REGISTER'"
steps:
- log:
message: "New user registered: ${body.userId}"
- to:
uri: direct:handle-registration
otherwise:
steps:
- log:
message: "Other event: ${body.type}"
# Handle login events
- route:
id: handle-login
from:
uri: direct:handle-login
steps:
- log:
message: "Processing login event for user ${body.userId}"
- to:
uri: bean:analyticsService
parameters:
method: recordLogin
# Handle failed login
- route:
id: handle-failed-login
from:
uri: direct:handle-failed-login
steps:
- log:
message: "Processing failed login from ${body.ipAddress}"
- to:
uri: bean:securityService
parameters:
method: checkFailedAttempts
Filtering Events
You can filter events using various options to narrow down the events you want to consume:
- Java
- YAML
// Filter admin events by operation type
from("keycloak:adminEvents"
+ "?realm=my-realm"
+ "&eventType=admin-events"
+ "&operationTypes=CREATE,UPDATE,DELETE"
+ "&maxResults=100")
.log("Filtered admin event: ${body}");
// Filter user events by type
from("keycloak:userEvents"
+ "?realm=my-realm"
+ "&eventType=events"
+ "&types=LOGIN,LOGOUT,REGISTER"
+ "&maxResults=100")
.log("Filtered user event: ${body}");
// Filter by date range
from("keycloak:adminEvents"
+ "?realm=my-realm"
+ "&eventType=admin-events"
+ "&dateFrom=1609459200000" // milliseconds since epoch
+ "&dateTo=1640995200000"
+ "&maxResults=100")
.log("Events in date range: ${body}");
// Filter by user and client
from("keycloak:userEvents"
+ "?realm=my-realm"
+ "&eventType=events"
+ "&user=user-id-123"
+ "&client=my-client-id"
+ "&ipAddress=192.168.1.100"
+ "&maxResults=50")
.log("Specific user events: ${body}");
# Filter admin events by operation type
- route:
from:
uri: >
keycloak:adminEvents?
realm=my-realm&
eventType=admin-events&
operationTypes=CREATE,UPDATE,DELETE&
maxResults=100
steps:
- log:
message: "Filtered admin event: ${body}"
# Filter user events by type
- route:
from:
uri: >
keycloak:userEvents?
realm=my-realm&
eventType=events&
types=LOGIN,LOGOUT,REGISTER&
maxResults=100
steps:
- log:
message: "Filtered user event: ${body}"
# Filter by date range
- route:
from:
uri: >
keycloak:adminEvents?
realm=my-realm&
eventType=admin-events&
dateFrom=1609459200000&
dateTo=1640995200000&
maxResults=100
steps:
- log:
message: "Events in date range: ${body}"
# Filter by user and client
- route:
from:
uri: >
keycloak:userEvents?
realm=my-realm&
eventType=events&
user=user-id-123&
client=my-client-id&
ipAddress=192.168.1.100&
maxResults=50
steps:
- log:
message: "Specific user events: ${body}"
Event Processing Patterns
Audit Trail
- Java
- YAML
// Create comprehensive audit trail from admin events
from("keycloak:adminEvents"
+ "?realm=my-realm"
+ "&eventType=admin-events"
+ "&maxResults=100"
+ "&delay=10000")
.process(exchange -> {
AdminEventRepresentation event = exchange.getIn().getBody(AdminEventRepresentation.class);
// Build audit record
Map<String, Object> auditRecord = new HashMap<>();
auditRecord.put("timestamp", new Date(event.getTime()));
auditRecord.put("operation", event.getOperationType());
auditRecord.put("resourceType", event.getResourceType());
auditRecord.put("resourcePath", event.getResourcePath());
if (event.getAuthDetails() != null) {
auditRecord.put("userId", event.getAuthDetails().getUserId());
auditRecord.put("ipAddress", event.getAuthDetails().getIpAddress());
}
exchange.getIn().setBody(auditRecord);
})
.marshal().json()
.to("kafka:audit-trail?brokers=localhost:9092")
.to("jdbc:dataSource?useHeadersAsParameters=true");
# Audit trail pattern
- route:
from:
uri: >
keycloak:adminEvents?
realm=my-realm&
eventType=admin-events&
maxResults=100&
delay=10000
steps:
- setBody:
simple: >
{
"timestamp": ${body.time},
"operation": "${body.operationType}",
"resourceType": "${body.resourceType}",
"resourcePath": "${body.resourcePath}",
"userId": "${body.authDetails.userId}",
"ipAddress": "${body.authDetails.ipAddress}"
}
- marshal:
json: {}
- to:
uri: kafka:audit-trail
parameters:
brokers: "localhost:9092"
- to:
uri: jdbc:dataSource
parameters:
useHeadersAsParameters: true
Security Monitoring
- Java
- YAML
// Monitor for security-relevant events
from("keycloak:userEvents"
+ "?realm=my-realm"
+ "&eventType=events"
+ "&types=LOGIN_ERROR,UPDATE_PASSWORD,UPDATE_EMAIL"
+ "&maxResults=50"
+ "&delay=5000")
.filter(simple("${body.type} == 'LOGIN_ERROR'"))
.aggregate(simple("${body.ipAddress}"), new ArrayListAggregationStrategy())
.completionSize(5) // 5 failed attempts
.completionTimeout(300000) // within 5 minutes
.process(exchange -> {
List<EventRepresentation> failedAttempts = exchange.getIn().getBody(List.class);
String ipAddress = failedAttempts.get(0).getIpAddress();
log.warn("SECURITY ALERT: {} failed login attempts from IP: {}",
failedAttempts.size(), ipAddress);
})
.to("direct:block-ip")
.to("direct:send-security-alert");
# Security monitoring pattern
- route:
from:
uri: >
keycloak:userEvents?
realm=my-realm&
eventType=events&
types=LOGIN_ERROR,UPDATE_PASSWORD,UPDATE_EMAIL&
maxResults=50&
delay=5000
steps:
- filter:
expression:
simple:
expression: "${body.type} == 'LOGIN_ERROR'"
- aggregate:
correlationExpression:
expression:
simple:
expression: "${body.ipAddress}"
aggregationStrategy: "#arrayListAggregation"
completionSize: 5
completionTimeout: 300000
steps:
- log:
message: "SECURITY ALERT: Multiple failed login attempts from ${body[0].ipAddress}"
- to:
uri: direct:block-ip
- to:
uri: direct:send-security-alert
User Activity Analytics
- Java
- YAML
// Track user activity for analytics
from("keycloak:userEvents"
+ "?realm=my-realm"
+ "&eventType=events"
+ "&types=LOGIN,LOGOUT"
+ "&maxResults=100"
+ "&delay=60000")
.process(exchange -> {
EventRepresentation event = exchange.getIn().getBody(EventRepresentation.class);
// Extract analytics data
Map<String, Object> analytics = new HashMap<>();
analytics.put("userId", event.getUserId());
analytics.put("eventType", event.getType());
analytics.put("timestamp", new Date(event.getTime()));
analytics.put("ipAddress", event.getIpAddress());
analytics.put("sessionId", event.getSessionId());
if (event.getDetails() != null) {
analytics.put("username", event.getDetails().get("username"));
analytics.put("clientId", event.getDetails().get("client_id"));
}
exchange.getIn().setBody(analytics);
})
.to("bean:analyticsService?method=recordActivity")
.to("elasticsearch://keycloak-events?operation=Index&indexName=user-activity");
# User activity analytics
- route:
from:
uri: >
keycloak:userEvents?
realm=my-realm&
eventType=events&
types=LOGIN,LOGOUT&
maxResults=100&
delay=60000
steps:
- setBody:
simple: >
{
"userId": "${body.userId}",
"eventType": "${body.type}",
"timestamp": ${body.time},
"ipAddress": "${body.ipAddress}",
"sessionId": "${body.sessionId}"
}
- to:
uri: bean:analyticsService
parameters:
method: recordActivity
- to:
uri: elasticsearch://keycloak-events
parameters:
operation: Index
indexName: user-activity
Consumer Options
The consumer supports the following configuration options:
| Option | Default | Description |
|---|---|---|
| `realm` | | The Keycloak realm to consume events from (required) |
| `eventType` | events | Type of events to consume: `events` or `admin-events` |
| `maxResults` | 100 | Maximum number of events to retrieve per poll |
| `first` | 0 | Offset for pagination (first result index) |
| `initialDelay` | 1000 | Delay before first poll (milliseconds) |
| `delay` | 500 | Delay between polls (milliseconds) |
| **Common Filter Options** | | |
| `client` | | Filter events by client ID |
| `user` | | Filter events by user ID |
| `dateFrom` | | Filter events from this timestamp (milliseconds since epoch) |
| `dateTo` | | Filter events until this timestamp (milliseconds since epoch) |
| `ipAddress` | | Filter events by IP address |
| **User Event Filters** | | |
| `types` | | Filter by event types (comma-separated, e.g., `LOGIN,LOGOUT,REGISTER`) |
| **Admin Event Filters** | | |
| `operationTypes` | | Filter by operation types (comma-separated, e.g., `CREATE,UPDATE,DELETE`) |
| `authRealm` | | Filter by authentication realm |
| `authClient` | | Filter by authentication client ID |
| `authUser` | | Filter by authentication user ID |
| `authIpAddress` | | Filter by authentication IP address |
| `resourcePath` | | Filter by resource path |
Exchange Headers
The consumer sets the following headers on the exchange:
| Header | Description |
|---|---|
| Type of event: |
| Event timestamp (milliseconds since epoch) |
| Realm name where the event occurred |
Message Body
The message body contains:
- For user events: `org.keycloak.representations.idm.EventRepresentation`
- For admin events: `org.keycloak.representations.idm.AdminEventRepresentation`
Complete Consumer Example
- Java
- YAML
// Configure Keycloak component
KeycloakComponent keycloak = getContext().getComponent("keycloak", KeycloakComponent.class);
KeycloakConfiguration config = new KeycloakConfiguration();
config.setServerUrl("http://localhost:8080");
config.setRealm("master");
config.setUsername("admin");
config.setPassword("admin");
keycloak.setConfiguration(config);
// Consume admin events and send to audit system
from("keycloak:adminEvents"
+ "?realm=production-realm"
+ "&eventType=admin-events"
+ "&operationTypes=CREATE,UPDATE,DELETE"
+ "&maxResults=100"
+ "&delay=10000")
.routeId("admin-events-audit")
.log("Admin event: ${body.operationType} on ${body.resourceType}")
.to("direct:audit-trail");
// Consume user login events for analytics
from("keycloak:userEvents"
+ "?realm=production-realm"
+ "&eventType=events"
+ "&types=LOGIN,LOGOUT"
+ "&maxResults=50"
+ "&delay=30000")
.routeId("user-activity-tracking")
.log("User activity: ${body.type} for user ${body.userId}")
.to("direct:analytics");
// Monitor failed logins for security
from("keycloak:userEvents"
+ "?realm=production-realm"
+ "&eventType=events"
+ "&types=LOGIN_ERROR"
+ "&maxResults=100"
+ "&delay=5000")
.routeId("security-monitoring")
.log("Failed login from IP: ${body.ipAddress}")
.to("direct:security-check");
// Process audit trail
from("direct:audit-trail")
.marshal().json()
.to("kafka:admin-audit?brokers=localhost:9092")
.to("log:audit");
// Process analytics
from("direct:analytics")
.to("bean:analyticsService?method=processUserActivity")
.to("log:analytics");
// Process security alerts
from("direct:security-check")
.to("bean:securityService?method=checkFailedLogin")
.to("log:security");
# Component configuration
camel:
component:
keycloak:
server-url: "http://localhost:8080"
realm: "master"
username: "admin"
password: "admin"
# Routes
- route:
id: admin-events-audit
from:
uri: >
keycloak:adminEvents?
realm=production-realm&
eventType=admin-events&
operationTypes=CREATE,UPDATE,DELETE&
maxResults=100&
delay=10000
steps:
- log:
message: "Admin event: ${body.operationType} on ${body.resourceType}"
- to:
uri: direct:audit-trail
- route:
id: user-activity-tracking
from:
uri: >
keycloak:userEvents?
realm=production-realm&
eventType=events&
types=LOGIN,LOGOUT&
maxResults=50&
delay=30000
steps:
- log:
message: "User activity: ${body.type} for user ${body.userId}"
- to:
uri: direct:analytics
- route:
id: security-monitoring
from:
uri: >
keycloak:userEvents?
realm=production-realm&
eventType=events&
types=LOGIN_ERROR&
maxResults=100&
delay=5000
steps:
- log:
message: "Failed login from IP: ${body.ipAddress}"
- to:
uri: direct:security-check
# Processing routes
- route:
id: process-audit-trail
from:
uri: direct:audit-trail
steps:
- marshal:
json: {}
- to:
uri: kafka:admin-audit
parameters:
brokers: "localhost:9092"
- to:
uri: log:audit
- route:
id: process-analytics
from:
uri: direct:analytics
steps:
- to:
uri: bean:analyticsService
parameters:
method: processUserActivity
- to:
uri: log:analytics
- route:
id: process-security-check
from:
uri: direct:security-check
steps:
- to:
uri: bean:securityService
parameters:
method: checkFailedLogin
- to:
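uri: log:security
Enabling Events in Keycloak
Keycloak does not store events unless event saving is turned on for the realm. Before using the consumer, enable "Save events" for user events and, separately, for admin events in the Keycloak Admin Console (Realm settings → Events) of the realm you want to consume from; otherwise the polls return no events.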
Event Deduplication
The consumer uses fingerprint-based deduplication to prevent processing the same event multiple times:
- Events are uniquely identified by combining timestamp and event-specific properties
- Fingerprints are cached per timestamp and cleared when moving to newer timestamps
- Maximum cache size is 1000 fingerprints to prevent memory issues
- This ensures reliable event processing even with high event volumes