Amazon MSK
Synopsis
Creates a target that writes log messages to Amazon Managed Streaming for Apache Kafka (MSK) topics, with support for batching, compression, and several authentication methods. Messages are delivered in batches that are flushed when a configurable size or event-count limit is reached.
Schema
- name: <string>
  description: <string>
  type: amazonmsk
  pipelines: <pipeline[]>
  status: <boolean>
  properties:
    address: <string>
    port: <numeric>
    client_id: <string>
    topic: <string>
    algorithm: <string>
    username: <string>
    password: <string>
    compression: <string>
    compression_level: <string>
    acknowledgments: <string>
    allow_auto_topic_creation: <boolean>
    disable_idempotent_write: <boolean>
    max_bytes: <numeric>
    max_events: <numeric>
    field_format: <string>
    tls:
      status: <boolean>
      insecure_skip_verify: <boolean>
      min_tls_version: <string>
      max_tls_version: <string>
      cert_name: <string>
      key_name: <string>
      passphrase: <string>
    interval: <string|numeric>
    cron: <string>
Configuration
The following fields are used to define the target:
| Field | Required | Default | Description |
|---|---|---|---|
| name | Y | - | Target name |
| description | N | - | Optional description |
| type | Y | - | Must be amazonmsk |
| pipelines | N | - | Optional post-processor pipelines |
| status | N | true | Enable/disable the target |
Amazon MSK Connection
| Field | Required | Default | Description |
|---|---|---|---|
| address | Y | - | MSK broker bootstrap server address (e.g., b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com) |
| port | N | 9092 | MSK broker port (9092 for plaintext, 9094 for TLS, 9096 for SASL/SCRAM, 9098 for IAM) |
| client_id | N | - | Client identifier for connection tracking |
| topic | Y | - | Kafka topic name for message delivery |
Authentication
| Field | Required | Default | Description |
|---|---|---|---|
| algorithm | N | "none" | Authentication mechanism: none, plain, scram-sha-256, scram-sha-512 |
| username | N* | - | Username for SASL authentication |
| password | N* | - | Password for SASL authentication |
* = Conditionally required when using SASL authentication.
For IAM authentication, use AWS IAM roles and policies. IAM authentication is configured at the AWS MSK cluster level and does not require username/password in the target configuration.
Producer Settings
| Field | Required | Default | Description |
|---|---|---|---|
| compression | N | "none" | Message compression: none, gzip, snappy, lz4, zstd |
| compression_level | N | - | Compression level (algorithm-specific) |
| acknowledgments | N | "leader" | Acknowledgment level: none, leader, all |
| allow_auto_topic_creation | N | false | Allow automatic topic creation if topic doesn't exist |
| disable_idempotent_write | N | false | Disable idempotent producer (not recommended) |
Batch Configuration
| Field | Required | Default | Description |
|---|---|---|---|
| max_bytes | N | 0 | Maximum batch size in bytes (0 = unlimited) |
| max_events | N | 1000 | Maximum number of events per batch |
| field_format | N | - | Data normalization format. See applicable Normalization section |
A batch is sent as soon as either the max_bytes or the max_events limit is reached, whichever comes first. With the default max_bytes of 0 (unlimited), only max_events triggers a send.
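As a minimal sketch of how the two limits combine, the fragment below sets both. The target name and the limit values are illustrative assumptions, not tuning recommendations; port and algorithm are left at their defaults.

```yaml
targets:
  - name: batched_msk            # hypothetical target name
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      topic: "application-logs"
      max_bytes: 524288          # send once the batch reaches 512 KiB ...
      max_events: 2000           # ... or 2000 events, whichever happens first
```

With small events the event count will usually be hit first; with large events the byte limit is more likely to trigger the send.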
TLS Configuration
| Field | Required | Default | Description |
|---|---|---|---|
| tls.status | N | false | Enable TLS/SSL encryption |
| tls.insecure_skip_verify | N | false | Skip certificate verification (not recommended) |
| tls.min_tls_version | N | "tls1.2" | Minimum TLS version: tls1.2, tls1.3 |
| tls.max_tls_version | N | "tls1.3" | Maximum TLS version: tls1.2, tls1.3 |
| tls.cert_name | N | "cert.pem" | Client certificate file name for mTLS |
| tls.key_name | N | "key.pem" | Private key file name for mTLS |
| tls.passphrase | N | - | Passphrase for encrypted private key |
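The mTLS fields can be combined as in the following sketch. The target name and the certificate and key file names are hypothetical, and where DataStream looks these files up is not stated here, so treat the values as placeholders. Port 9094 is assumed because it is the MSK listener for TLS connections inside AWS.

```yaml
targets:
  - name: msk_mtls                     # hypothetical target name
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9094                       # assumed: MSK TLS listener
      topic: "application-logs"
      algorithm: "none"                # no SASL; the client certificate identifies the producer
      tls:
        status: true
        min_tls_version: "tls1.2"
        cert_name: "client-cert.pem"   # hypothetical file name
        key_name: "client-key.pem"     # hypothetical file name
        passphrase: "key-passphrase"   # only needed when the private key is encrypted
```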
Scheduler
| Field | Required | Default | Description |
|---|---|---|---|
| interval | N | realtime | Execution frequency. See Interval for details |
| cron | N | - | Cron expression for scheduled execution. See Cron for details |
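A cron-based schedule might look like the sketch below. It assumes the common five-field cron syntax; the Cron section is the authority on the format actually accepted. The target name is hypothetical.

```yaml
targets:
  - name: cron_msk               # hypothetical target name
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      topic: "scheduled-logs"
      max_events: 5000
      cron: "0 * * * *"          # assumed five-field syntax: at minute 0 of every hour
```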
Details
Amazon MSK is a fully managed Apache Kafka service on AWS. This target type allows you to connect to MSK clusters using the standard Kafka protocol.
Authentication Methods
Amazon MSK supports multiple authentication methods:
Unauthenticated Access (plaintext)
- No authentication required
- Port: 9092
- Set algorithm: "none"
- Not recommended for production
SASL/SCRAM Authentication
- Username and password credentials stored in AWS Secrets Manager
- Port: 9096
- Supported algorithms: scram-sha-256, scram-sha-512
- TLS encryption required (MSK accepts SASL/SCRAM connections only over TLS)
IAM Authentication
- Uses AWS IAM roles and policies for authentication
- Port: 9098
- Configured at the AWS level, not in target configuration
- Requires AWS credentials configured on the DataStream host
- TLS encryption required
Connection Requirements
MSK cluster bootstrap servers are available in the AWS MSK Console. The address format is typically:
b-1.clustername.xxxxxx.kafka.region.amazonaws.com
For multi-broker clusters, you can use any broker from the bootstrap server list.
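As an illustration, assume the console returns a comma-separated list of host:port pairs (the broker names below are placeholders). One entry then maps onto the target as shown. Whether address accepts more than one broker is not stated here, so this sketch uses a single host.

```yaml
# Bootstrap string as copied from the console (placeholder values):
#   b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9092,b-2.mycluster.abc123.kafka.us-east-1.amazonaws.com:9092
targets:
  - name: msk_second_broker      # hypothetical target name
    type: amazonmsk
    properties:
      address: "b-2.mycluster.abc123.kafka.us-east-1.amazonaws.com"   # host part of one entry
      port: 9092                                                      # port part of the same entry
      topic: "application-logs"
```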
TLS Configuration
MSK supports both plaintext and TLS-encrypted connections:
- Plaintext: Port 9092 (unauthenticated)
- TLS: Port 9094 (unauthenticated or mutual TLS), 9096 (SASL/SCRAM), or 9098 (IAM)
Enable TLS with tls.status: true for encrypted connections.
Message Delivery Guarantees
The acknowledgments setting controls delivery guarantees:
| Level | Behavior | Use Case |
|---|---|---|
| none | No acknowledgment from broker | Maximum throughput, lowest durability |
| leader | Acknowledgment from partition leader only | Balanced throughput and durability |
| all | Acknowledgment from all in-sync replicas | Maximum durability |
Compression
Message compression reduces network bandwidth and storage requirements:
| Algorithm | Compression Ratio | CPU Usage | Speed |
|---|---|---|---|
| none | None | Minimal | Fastest |
| gzip | High | High | Slow |
| snappy | Medium | Low | Fast |
| lz4 | Medium | Low | Fast |
| zstd | High | Medium | Medium |
Examples
Basic Configuration (Unauthenticated)
The minimum configuration for an MSK target without authentication:
targets:
  - name: basic_msk
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9092
      topic: "application-logs"
      algorithm: "none"
With SASL/SCRAM-SHA-512
Configuration using SASL/SCRAM-SHA-512 authentication:
targets:
  - name: msk_sasl
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9096
      topic: "authenticated-logs"
      algorithm: "scram-sha-512"
      username: "kafka-producer"
      password: "stored-in-secrets-manager"
      tls:
        status: true
With SASL/SCRAM-SHA-256
Configuration using SASL/SCRAM-SHA-256 authentication:
targets:
  - name: msk_scram256
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9096
      topic: "secure-logs"
      algorithm: "scram-sha-256"
      username: "kafka-user"
      password: "secure-password"
      compression: "snappy"
      tls:
        status: true
IAM Authentication
Configuration for MSK with IAM authentication:
targets:
  - name: msk_iam
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9098
      topic: "iam-logs"
      algorithm: "none"
      tls:
        status: true
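For IAM authentication, configure AWS credentials on the DataStream host using environment variables, an IAM role, or an AWS credentials file. The IAM policy must grant the Kafka permissions the producer needs, for example kafka-cluster:Connect, kafka-cluster:DescribeTopic, and kafka-cluster:WriteData.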
With Compression
Configuration with zstd compression:
targets:
  - name: compressed_msk
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9096
      topic: "compressed-logs"
      algorithm: "scram-sha-256"
      username: "kafka-producer"
      password: "password-here"
      compression: "zstd"
      tls:
        status: true
High Throughput
Configuration optimized for maximum throughput:
targets:
  - name: high_throughput_msk
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9096
      topic: "high-volume-logs"
      algorithm: "scram-sha-512"
      username: "kafka-producer"
      password: "password-here"
      compression: "lz4"
      acknowledgments: "leader"
      max_bytes: 1048576
      max_events: 10000
      tls:
        status: true
High Reliability
Configuration optimized for maximum durability:
targets:
  - name: reliable_msk
    type: amazonmsk
    pipelines:
      - checkpoint
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9096
      topic: "critical-logs"
      algorithm: "scram-sha-512"
      username: "kafka-producer"
      password: "password-here"
      compression: "zstd"
      acknowledgments: "all"
      max_events: 100
      disable_idempotent_write: false
      tls:
        status: true
        min_tls_version: "tls1.3"
With Field Normalization
Using field normalization for standard format:
targets:
  - name: normalized_msk
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9096
      topic: "normalized-logs"
      algorithm: "scram-sha-256"
      username: "kafka-producer"
      password: "password-here"
      field_format: "cim"
      compression: "snappy"
      tls:
        status: true
MSK Serverless
Configuration for MSK Serverless clusters:
targets:
  - name: msk_serverless
    type: amazonmsk
    properties:
      address: "boot-abc123.c1.kafka-serverless.us-east-1.amazonaws.com"
      port: 9098
      topic: "serverless-logs"
      algorithm: "none"
      compression: "zstd"
      tls:
        status: true
With Client ID
Configuration with client ID for tracking:
targets:
  - name: msk_tracked
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9096
      topic: "tracked-logs"
      client_id: "datastream-producer-01"
      algorithm: "scram-sha-256"
      username: "kafka-producer"
      password: "password-here"
      compression: "lz4"
      tls:
        status: true
Scheduled Batching
Configuration with scheduled batch delivery:
targets:
  - name: scheduled_msk
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9096
      topic: "scheduled-logs"
      algorithm: "scram-sha-512"
      username: "kafka-producer"
      password: "password-here"
      max_events: 5000
      interval: "5m"
      compression: "gzip"
      tls:
        status: true
Auto Topic Creation
Configuration with automatic topic creation enabled:
targets:
  - name: auto_topic_msk
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9096
      topic: "dynamic-logs"
      algorithm: "scram-sha-256"
      username: "kafka-producer"
      password: "password-here"
      allow_auto_topic_creation: true
      compression: "snappy"
      max_events: 500
      tls:
        status: true