Ship audit to Kafka

Problem: you want audit lines copied to a Kafka topic for SIEM, archival, or stream processing.

Solution: add a kafka audit sink. mcpgw keeps writing the local JSONL file first, then asynchronously publishes one Kafka record per audit line.

Confluent Cloud

Use SASL/PLAIN over TLS:

audit:
  path: /var/log/mcpgw/audit.jsonl
  max_size_mb: 100
  compress_rotated: true
  sinks:
    - type: kafka
      brokers: ["pkc-xxxxx.us-central1.gcp.confluent.cloud:9092"]
      topic: mcpgw-audit
      partition_strategy: hash
      partition_key: session_id
      acks: all
      compression: zstd
      flush_interval: 5s
      flush_batch_lines: 5000
      flush_batch_bytes: 1000000
      tls:
        enabled: true
      sasl:
        mechanism: plain
        username_env: KAFKA_USERNAME
        password_env: KAFKA_PASSWORD

Set the referenced environment variables on the mcpgw process:

export KAFKA_USERNAME='<api-key>'
export KAFKA_PASSWORD='<api-secret>'

AWS MSK

Use SASL/SCRAM over TLS:

audit:
  path: /var/log/mcpgw/audit.jsonl
  max_size_mb: 100
  sinks:
    - type: kafka
      brokers:
        - b-1.audit.example.kafka.us-east-1.amazonaws.com:9096
        - b-2.audit.example.kafka.us-east-1.amazonaws.com:9096
      topic: mcpgw-audit
      tls:
        enabled: true
      sasl:
        mechanism: scram-sha-512
        username_env: KAFKA_USERNAME
        password_env: KAFKA_PASSWORD

Self-hosted development

For an unauthenticated local broker:

audit:
  path: /var/log/mcpgw/audit.jsonl
  max_size_mb: 100
  sinks:
    - type: kafka
      brokers: ["localhost:9092"]
      topic: mcpgw-audit
      compression: gzip

Verify

Reload with SIGHUP:

docker kill --signal=HUP mcpgw

Consume the topic:

kafka-console-consumer.sh \
  --bootstrap-server kafka-1:9092 \
  --topic mcpgw-audit \
  --from-beginning

Each Kafka record value is one JSONL audit line. The record key defaults to session_id; empty keys fall back to Kafka’s balancer behavior.

Notes

  • The local audit file remains canonical.
  • Sink failures are logged and do not block MCP requests.
  • partition_key can be session_id, auth_key_id, tool_name, method, or request_id.
  • partition_strategy can be hash, round_robin, or sticky.
  • TLS client certificates require tls.cert_file and tls.key_file together.