Version: 1.5.1

Amazon MSK

Amazon AWS Message Queue

Synopsis

Creates a target that writes log messages to Amazon Managed Streaming for Apache Kafka (MSK) topics, with support for batching, compression, and the authentication methods MSK offers. Messages are delivered in batches, with limits configurable by size or event count. Amazon MSK is a fully managed Apache Kafka service on AWS.

Schema

- name: <string>
  description: <string>
  type: amazonmsk
  pipelines: <pipeline[]>
  status: <boolean>
  properties:
    address: <string>
    port: <numeric>
    client_id: <string>
    topic: <string>
    algorithm: <string>
    username: <string>
    password: <string>
    compression: <string>
    compression_level: <string>
    acknowledgments: <string>
    allow_auto_topic_creation: <boolean>
    disable_idempotent_write: <boolean>
    max_bytes: <numeric>
    max_events: <numeric>
    field_format: <string>
    tls:
      status: <boolean>
      insecure_skip_verify: <boolean>
      min_tls_version: <string>
      max_tls_version: <string>
      cert_name: <string>
      key_name: <string>
      passphrase: <string>
    interval: <string|numeric>
    cron: <string>

Configuration

The following fields are used to define the target:

| Field | Required | Default | Description |
|---|---|---|---|
| name | Y | - | Target name |
| description | N | - | Optional description |
| type | Y | - | Must be amazonmsk |
| pipelines | N | - | Optional post-processor pipelines |
| status | N | true | Enable/disable the target |

Amazon MSK Connection

| Field | Required | Default | Description |
|---|---|---|---|
| address | Y | - | MSK broker bootstrap server address (e.g., b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com) |
| port | N | 9092 | MSK broker port (9092 for plaintext, 9094 for TLS, 9096 for SASL/SCRAM, 9098 for IAM) |
| client_id | N | - | Client identifier for connection tracking |
| topic | Y | - | Kafka topic name for message delivery |

Authentication

| Field | Required | Default | Description |
|---|---|---|---|
| algorithm | N | "none" | Authentication mechanism: none, plain, scram-sha-256, scram-sha-512 |
| username | N* | - | Username for SASL authentication |
| password | N* | - | Password for SASL authentication |

* = Conditionally required when using SASL authentication.

Note: For IAM authentication, use AWS IAM roles and policies. IAM authentication is configured at the Amazon MSK cluster level and does not require a username or password in the target configuration.
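
The conditional requirement on username and password can be checked before a configuration is deployed. The following is a minimal sketch, not part of the product: the function name validate_auth and the error wording are hypothetical, and it assumes that plain, scram-sha-256, and scram-sha-512 are the algorithms that count as SASL authentication.

```python
# Hypothetical pre-deployment check; not an API of the product.
SASL_ALGORITHMS = {"plain", "scram-sha-256", "scram-sha-512"}
ALL_ALGORITHMS = SASL_ALGORITHMS | {"none"}


def validate_auth(properties: dict) -> list[str]:
    """Return a list of problems found; an empty list means the settings are consistent."""
    errors = []
    # The documented default for algorithm is "none".
    algorithm = properties.get("algorithm", "none")
    if algorithm not in ALL_ALGORITHMS:
        errors.append(f"unsupported algorithm: {algorithm!r}")
    elif algorithm in SASL_ALGORITHMS:
        # username and password are only required for SASL mechanisms.
        for field in ("username", "password"):
            if not properties.get(field):
                errors.append(f"{field} is required when algorithm is {algorithm!r}")
    return errors
```

Calling validate_auth on the properties mapping of a target returns an empty list when no credentials are needed or both are present.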

Producer Settings

| Field | Required | Default | Description |
|---|---|---|---|
| compression | N | "none" | Message compression: none, gzip, snappy, lz4, zstd |
| compression_level | N | - | Compression level (algorithm-specific) |
| acknowledgments | N | "leader" | Acknowledgment level: none, leader, all |
| allow_auto_topic_creation | N | false | Allow automatic topic creation if the topic doesn't exist |
| disable_idempotent_write | N | false | Disable idempotent producer (not recommended) |

Batch Configuration

| Field | Required | Default | Description |
|---|---|---|---|
| max_bytes | N | 0 | Maximum batch size in bytes (0 = unlimited) |
| max_events | N | 1000 | Maximum number of events per batch |
| field_format | N | - | Data normalization format. See the applicable Normalization section |

Note: Batches are sent when either the max_bytes or the max_events limit is reached, whichever comes first.
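
The "whichever comes first" rule can be illustrated with a small sketch. This is an assumption-laden model, not the target's implementation: the Batcher class is hypothetical, the limits are checked after an event is buffered (the product may check before), and only max_bytes treats 0 as unlimited because that is the only case the table documents.

```python
class Batcher:
    """Hypothetical model of size- and count-limited batching."""

    def __init__(self, max_bytes: int = 0, max_events: int = 1000):
        self.max_bytes = max_bytes      # 0 means no byte limit
        self.max_events = max_events
        self.events = []
        self.size = 0

    def add(self, payload: bytes):
        """Buffer one event; return the finished batch if a limit was hit, else None."""
        self.events.append(payload)
        self.size += len(payload)
        bytes_hit = self.max_bytes > 0 and self.size >= self.max_bytes
        events_hit = len(self.events) >= self.max_events
        if bytes_hit or events_hit:
            # Hand the batch to the caller and start a new, empty one.
            batch, self.events, self.size = self.events, [], 0
            return batch
        return None
```

With max_bytes left at 0, only the event count triggers a flush; with a small max_bytes and a large max_events, the byte limit triggers first.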

TLS Configuration

| Field | Required | Default | Description |
|---|---|---|---|
| tls.status | N | false | Enable TLS/SSL encryption |
| tls.insecure_skip_verify | N | false | Skip certificate verification (not recommended) |
| tls.min_tls_version | N | "tls1.2" | Minimum TLS version: tls1.2, tls1.3 |
| tls.max_tls_version | N | "tls1.3" | Maximum TLS version: tls1.2, tls1.3 |
| tls.cert_name | N | "cert.pem" | Client certificate file name for mTLS |
| tls.key_name | N | "key.pem" | Private key file name for mTLS |
| tls.passphrase | N | - | Passphrase for encrypted private key |

Scheduler

| Field | Required | Default | Description |
|---|---|---|---|
| interval | N | realtime | Execution frequency. See Interval for details |
| cron | N | - | Cron expression for scheduled execution. See Cron for details |

Details

Amazon MSK is a fully managed Apache Kafka service on AWS. This target type allows you to connect to MSK clusters using the standard Kafka protocol.

Authentication Methods

Amazon MSK supports multiple authentication methods:

Unauthenticated Access (plaintext)

  • No authentication required
  • Port: 9092
  • Set algorithm: "none"
  • Not recommended for production

SASL/SCRAM Authentication

  • Username/password credentials, stored in AWS Secrets Manager
  • Port: 9096
  • Supported algorithms: scram-sha-256, scram-sha-512
  • Requires TLS encryption

IAM Authentication

  • Uses AWS IAM roles and policies for authentication
  • Port: 9098
  • Configured at the AWS level, not in target configuration
  • Requires AWS credentials configured on the DataStream host
  • TLS encryption required

Connection Requirements

MSK cluster bootstrap servers are available in the AWS MSK Console. The address format is typically:

b-1.clustername.xxxxxx.kafka.region.amazonaws.com

For multi-broker clusters, you can use any broker from the bootstrap server list.
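
The bootstrap string shown by AWS is a comma-separated list of host:port entries, while the examples in this document pass a single host in address and the port separately. The sketch below shows one way to split such a string; the helper name split_bootstrap is hypothetical, and the fallback port of 9092 is simply the documented default for port.

```python
def split_bootstrap(bootstrap: str, default_port: int = 9092):
    """Split 'host1:port,host2:port' into a list of (host, port) tuples."""
    brokers = []
    for entry in bootstrap.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing commas and stray whitespace
        host, sep, port = entry.rpartition(":")
        if sep:
            brokers.append((host, int(port)))
        else:
            # No explicit port in this entry; fall back to the default.
            brokers.append((entry, default_port))
    return brokers


bootstrap = (
    "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9092,"
    "b-2.mycluster.abc123.kafka.us-east-1.amazonaws.com:9092"
)
address, port = split_bootstrap(bootstrap)[0]  # any entry in the list would do
```

The resulting address and port values can then be copied into the target's properties.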

TLS Configuration

MSK supports both plaintext and TLS-encrypted connections:

  • Plaintext: Port 9092 (unauthenticated)
  • TLS: Port 9094 (TLS without SASL), 9096 (SASL/SCRAM), or 9098 (IAM)

Enable TLS with tls.status: true for encrypted connections.
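
To make the effect of the tls fields concrete, the sketch below maps them onto a Python ssl.SSLContext. It only illustrates what the options mean for a generic TLS client; how the target applies them internally is not described here, and the function name build_tls_context is hypothetical. Client certificate loading (cert_name, key_name, passphrase) is left out.

```python
import ssl

# Map the documented version strings to Python's TLS version constants.
TLS_VERSIONS = {
    "tls1.2": ssl.TLSVersion.TLSv1_2,
    "tls1.3": ssl.TLSVersion.TLSv1_3,
}


def build_tls_context(tls: dict):
    """Return an SSLContext for the given tls settings, or None when TLS is disabled."""
    if not tls.get("status", False):
        return None
    ctx = ssl.create_default_context()
    # Defaults mirror the documented defaults for min/max TLS version.
    ctx.minimum_version = TLS_VERSIONS[tls.get("min_tls_version", "tls1.2")]
    ctx.maximum_version = TLS_VERSIONS[tls.get("max_tls_version", "tls1.3")]
    if tls.get("insecure_skip_verify", False):
        # Hostname checking must be turned off before verification is disabled.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
    return ctx
```

Setting min_tls_version to "tls1.3" narrows the accepted range to TLS 1.3 only, which is what the High Reliability example relies on.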

Message Delivery Guarantees

The acknowledgments setting controls delivery guarantees:

| Level | Behavior | Use Case |
|---|---|---|
| none | No acknowledgment from broker | Maximum throughput, lowest durability |
| leader | Acknowledgment from partition leader only | Balanced throughput and durability |
| all | Acknowledgment from all in-sync replicas | Maximum durability |
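
These levels correspond to the standard Kafka producer acks values 0, 1, and all (written as -1 on the wire). The sketch below shows that correspondence; it assumes the target translates its level names this way, which the table implies but does not state, and the helper name to_kafka_acks is hypothetical.

```python
# Assumed correspondence between the target's level names and Kafka's acks values.
KAFKA_ACKS = {"none": 0, "leader": 1, "all": -1}


def to_kafka_acks(level: str = "leader") -> int:
    """Translate an acknowledgments level into the numeric Kafka acks value."""
    try:
        return KAFKA_ACKS[level]
    except KeyError:
        raise ValueError(f"unknown acknowledgments level: {level!r}") from None
```

The default argument matches the documented default of "leader".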

Compression

Message compression reduces network bandwidth and storage requirements:

| Algorithm | Compression Ratio | CPU Usage | Speed |
|---|---|---|---|
| none | None | Minimal | Fastest |
| gzip | High | High | Slow |
| snappy | Medium | Low | Fast |
| lz4 | Medium | Low | Fast |
| zstd | High | Medium | Medium |
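
The ratings above are relative, and actual savings depend on the data. A rough way to gauge the effect on your own logs is to compress a sample locally. The sketch below uses gzip only, because it ships with the Python standard library; the sample log lines are made up, and the level argument stands in for the idea behind compression_level rather than the target's exact scale.

```python
import gzip


def compressed_size(lines, level=6):
    """Return (raw_bytes, gzip_bytes) for newline-joined log lines."""
    payload = "\n".join(lines).encode("utf-8")
    return len(payload), len(gzip.compress(payload, compresslevel=level))


# Synthetic, repetitive log lines; real logs will compress differently.
lines = [
    f"2024-01-01T00:00:{i % 60:02d}Z host-1 app[123]: request {i} completed status=200"
    for i in range(1000)
]
raw, fast = compressed_size(lines, level=1)  # lowest gzip level, least CPU
_, best = compressed_size(lines, level=9)    # highest gzip level, most CPU
print(raw, fast, best)
```

Repetitive log data like this typically shrinks considerably even at the lowest level, so a higher level is worth its CPU cost mainly when bandwidth or storage is the bottleneck.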

Examples

Basic Configuration (Unauthenticated)

The minimum configuration for an MSK target without authentication:

targets:
  - name: basic_msk
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9092
      topic: "application-logs"
      algorithm: "none"

With SASL/SCRAM-SHA-512

Configuration using SASL/SCRAM-SHA-512 authentication:

targets:
  - name: msk_sasl
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9096
      topic: "authenticated-logs"
      algorithm: "scram-sha-512"
      username: "kafka-producer"
      password: "stored-in-secrets-manager"
      tls:
        status: true

With SASL/SCRAM-SHA-256

Configuration using SASL/SCRAM-SHA-256 authentication:

targets:
  - name: msk_scram256
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9096
      topic: "secure-logs"
      algorithm: "scram-sha-256"
      username: "kafka-user"
      password: "secure-password"
      compression: "snappy"
      tls:
        status: true

IAM Authentication

Configuration for MSK with IAM authentication:

targets:
  - name: msk_iam
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9098
      topic: "iam-logs"
      algorithm: "none"
      tls:
        status: true

Note: For IAM authentication, configure AWS credentials on the DataStream host using environment variables, an IAM role, or an AWS credentials file. The IAM policy must grant the appropriate Kafka permissions.

With Compression

Configuration with zstd compression:

targets:
  - name: compressed_msk
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9096
      topic: "compressed-logs"
      algorithm: "scram-sha-256"
      username: "kafka-producer"
      password: "password-here"
      compression: "zstd"
      tls:
        status: true

High Throughput

Configuration optimized for maximum throughput:

targets:
  - name: high_throughput_msk
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9096
      topic: "high-volume-logs"
      algorithm: "scram-sha-512"
      username: "kafka-producer"
      password: "password-here"
      compression: "lz4"
      acknowledgments: "leader"
      max_bytes: 1048576
      max_events: 10000
      tls:
        status: true

High Reliability

Configuration optimized for maximum durability:

targets:
  - name: reliable_msk
    type: amazonmsk
    pipelines:
      - checkpoint
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9096
      topic: "critical-logs"
      algorithm: "scram-sha-512"
      username: "kafka-producer"
      password: "password-here"
      compression: "zstd"
      acknowledgments: "all"
      max_events: 100
      disable_idempotent_write: false
      tls:
        status: true
        min_tls_version: "tls1.3"

With Field Normalization

Using field normalization for standard format:

targets:
  - name: normalized_msk
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9096
      topic: "normalized-logs"
      algorithm: "scram-sha-256"
      username: "kafka-producer"
      password: "password-here"
      field_format: "cim"
      compression: "snappy"
      tls:
        status: true

MSK Serverless

Configuration for MSK Serverless clusters:

targets:
  - name: msk_serverless
    type: amazonmsk
    properties:
      address: "boot-abc123.c1.kafka-serverless.us-east-1.amazonaws.com"
      port: 9098
      topic: "serverless-logs"
      algorithm: "none"
      compression: "zstd"
      tls:
        status: true

With Client ID

Configuration with client ID for tracking:

targets:
  - name: msk_tracked
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9096
      topic: "tracked-logs"
      client_id: "datastream-producer-01"
      algorithm: "scram-sha-256"
      username: "kafka-producer"
      password: "password-here"
      compression: "lz4"
      tls:
        status: true

Scheduled Batching

Configuration with scheduled batch delivery:

targets:
  - name: scheduled_msk
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9096
      topic: "scheduled-logs"
      algorithm: "scram-sha-512"
      username: "kafka-producer"
      password: "password-here"
      max_events: 5000
      interval: "5m"
      compression: "gzip"
      tls:
        status: true

Auto Topic Creation

Configuration with automatic topic creation enabled:

targets:
  - name: auto_topic_msk
    type: amazonmsk
    properties:
      address: "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com"
      port: 9096
      topic: "dynamic-logs"
      algorithm: "scram-sha-256"
      username: "kafka-producer"
      password: "password-here"
      allow_auto_topic_creation: true
      compression: "snappy"
      max_events: 500
      tls:
        status: true