Kafka+Kerberos

Summary

Here we document how to secure a Kafka cluster with Kerberos. Kerberos is one of the most widely used security protocols in corporate networks, thanks largely to the widespread adoption of Microsoft Active Directory for directory-based identity services. A fair number of Kafka installations live alongside Hadoop in a Big Data ecosystem. Kerberos is the only way to provide strong authentication in Hadoop, so it seems natural to use the same stack to secure Kafka installations too. This blog is the result of an implementation that was done recently at a customer site. The customer was already running a Kerberized Hadoop cluster that was used as one of the sinks for the streaming data. Please note that we used Confluent licensed components (Schema Registry, REST Proxy, Group Authorization, Control Center) for this setup. At the time of writing, that is the only way to achieve end-to-end security with Kerberos.

Architecture

[Figure: Detailed Kafka Security Architecture]

Details


Zookeeper

We will run the Zookeeper service with a dedicated SPN for each instance of the service. Zookeeper will be started with a JAAS file that looks like the one below.

Server {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/kafka/keytabs/zookeeper.keytab"
  storeKey=true
  principal="zookeeper/<HOST>@<DOMAIN>";
};

The following snippet shows the security-related configuration for Zookeeper. The Zookeeper instance is configured to require valid authentication from any client that interacts with it. This is important since Zookeeper stores the Kafka Access Control Lists (ACLs) that are used to enforce authorization on resources.

# Zookeeper SASL Configuration
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
kerberos.removeHostFromPrincipal=true
kerberos.removeRealmFromPrincipal=true

Brokers

Since we use the PLAINTEXT port for inter-broker communication, the ANONYMOUS user will be used between brokers (for replication, for example). To allow that, add the following ACL.

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:ANONYMOUS --allow-host <BROKER_HOST1> --allow-host <BROKER_HOST2> --allow-host <BROKER_HOSTn> --operation All --cluster
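To confirm the ACL took effect, you can list the cluster ACLs (a quick sanity check; the exact output format varies by Kafka version):

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --list --cluster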

Add/edit the security-related parameters for each broker. Note that we set the zookeeper.set.acl parameter to true. This ensures that only privileged users can manipulate the Zookeeper internal structures, enhancing security.

#############################################
#               KERBEROS                    #
#############################################
sasl.enabled.mechanisms=GSSAPI,PLAIN
# Inter broker communication is done using PLAINTEXT protocol
security.inter.broker.protocol=PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
# PLAINTEXT and SASL_PLAINTEXT
listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
advertised.listeners=PLAINTEXT://<BROKER_HOST>:9092,SASL_PLAINTEXT://<BROKER_HOST>:9093
sasl.kerberos.service.name=kafka
# Enable ACLs on ZooKeeper znodes
zookeeper.set.acl=true
# https://community.hortonworks.com/articles/14463/auth-to-local-rules-syntax.html
# We just want UPN
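# e.g. kafka/<HOST>@<DOMAIN> and kafka@<DOMAIN> both map to plain "kafka"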
sasl.kerberos.principal.to.local.rules=RULE:[1:$1],RULE:[2:$1],DEFAULT

Create an SPN for each broker, and start each broker instance with the proper JAAS file.

// Broker
KafkaServer {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/kafka.keytab"
  storeKey=true
  principal="kafka/<HOST>@<DOMAIN>"
};

// Broker is Zookeeper Client
Client {
  com.sun.security.auth.module.Krb5LoginModule required 
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/kafka.keytab"
  storeKey=true
  principal="kafka/<HOST>@<DOMAIN>";
};
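At this point it's worth validating the SASL_PLAINTEXT listener with a console client before moving on. Below is a minimal sketch, assuming a hypothetical test principal "appuser1" with its own keytab; adjust the paths, principal, and topic to your environment. The client JAAS file follows the same KafkaClient pattern shown later in this post.

# /tmp/client.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka

KAFKA_OPTS=-Djava.security.auth.login.config=/tmp/client_jaas.conf kafka-console-producer --broker-list <BROKER_HOST>:9093 --topic test --producer.config /tmp/client.properties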

Run Zookeeper migration tool

If you’re already running a non-secure cluster and migrating to security, run the following tool. It will set the ACLs on the Zookeeper znodes recursively.

# cat /etc/kafka/zookeeper_client_jaas.properties 
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/kafka.keytab"
  storeKey=true
  principal="kafka/<ZK_HOST>@<DOMAIN>";
};

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties /bin/zookeeper-security-migration --zookeeper.acl secure --zookeeper.connect <ZK_HOST>:2181
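To spot-check the result, you can inspect a znode's ACL with zookeeper-shell, using the same JAAS file. The exact output depends on your Zookeeper version, but a secured znode should show a sasl entry for the kafka principal:

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties zookeeper-shell <ZK_HOST>:2181 getAcl /brokers/topics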

Schema Registry

The Schema Registry is a very important component to secure from a data governance point of view. By default, client applications register new schemas automatically: if they produce messages to a new topic, they will try to register the new schema on the fly. With security enabled, we disable this behavior so that only cluster admins can register schemas. For applications that interact with the Schema Registry to keep working, they need to turn off automatic schema registration by setting the configuration parameter auto.register.schemas=false. For example, in a Java application, this is how one would do it.

producerProperties.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, "false");
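For context, here is a minimal sketch of how that setting fits into a Kerberized Avro producer's configuration. The bootstrap servers and URLs are placeholders; the class name AbstractKafkaAvroSerDeConfig matches the Confluent serializer versions current at the time of writing.

import java.util.Properties;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;

Properties producerProperties = new Properties();
producerProperties.put("bootstrap.servers", "<BROKER_HOST1>:9093");
// Kerberos settings, mirroring the client JAAS/SASL setup described in this post
producerProperties.put("security.protocol", "SASL_PLAINTEXT");
producerProperties.put("sasl.mechanism", "GSSAPI");
producerProperties.put("sasl.kerberos.service.name", "kafka");
// Avro serialization against the secured Schema Registry
producerProperties.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
producerProperties.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
producerProperties.put("schema.registry.url", "https://<SR_HOST>:<SR_PORT>");
// Schemas are registered out of band by the admin account, never by the application
producerProperties.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, "false");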

Run each instance of the Schema Registry service with a dedicated SPN.

The following listing shows the relevant parameters for the Schema Registry. Note that the registry talks to the secure Kafka endpoints. In addition, it is configured so that end clients authenticate to it over SSL.

kafkastore.bootstrap.servers=SASL_PLAINTEXT://<BROKER_HOST1>:9093,SASL_PLAINTEXT://<BROKER_HOST2>:9093,SASL_PLAINTEXT://<BROKER_HOST3>:9093
kafkastore.security.protocol=SASL_PLAINTEXT
# Configure a service name that matches the primary name of the Kafka server configured in the broker JAAS file.
kafkastore.sasl.kerberos.service.name=kafka
kafkastore.sasl.mechanism=GSSAPI

# Schema Registry Security Plugin
schema.registry.resource.extension.class=io.confluent.kafka.schemaregistry.security.SchemaRegistrySecurityResourceExtension
confluent.schema.registry.auth.mechanism=SSL
ssl.client.auth=true
schema.registry.inter.instance.protocol=HTTPS
confluent.license=<LICENSE>
confluent.schema.registry.authorizer.class=io.confluent.kafka.schemaregistry.security.authorizer.schemaregistryacl.SchemaRegistryAclAuthorizer

# Additional configurations for HTTPS
ssl.truststore.location=<FILE>
ssl.truststore.password=<PASSWORD>
ssl.keystore.location=<FILE>
ssl.keystore.password=<PASSWORD>
ssl.key.password=<PASSWORD>
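Once the registry is up, a quick way to verify the mutual TLS setup is to list subjects with curl, presenting a client certificate the registry trusts. This is a sketch with placeholder paths; curl expects PEM files, so you may need to export the certificate and key from your JKS keystore first.

curl --cacert /path/to/ca.pem --cert /path/to/client-cert.pem --key /path/to/client-key.pem https://<SR_HOST>:<SR_PORT>/subjects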

Next, we need to add Schema Registry ACLs so that only a designated admin account (let’s call it “sradmin”) can register/delete/update schemas. In addition, since we use Control Center (C3) in this setup, it needs an admin account too (let’s call this account “c3admin”).

# cat /etc/schema-registry/schemaregistry_client_jaas.properties
// Kafka Client
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/schemaregistry.keytab"
  storeKey=true
  principal="schemaregistry/<SR_HOST>@<DOMAIN>";
};


# Provide admin privileges to "sradmin" and "c3admin" users
SECURITY_PLUGINS_OPTS=-Djava.security.auth.login.config=/etc/schema-registry/schemaregistry_client_jaas.properties /bin/sr-acl-cli --add --config /etc/schema-registry/schema-registry.properties -o \* -s \* -p "CN=sradmin.mycompany.com,O=My Company,L=My City,ST=California,C=US" -p "CN=c3admin.mycompany.com,O=My Company,L=My City,ST=California,C=US"

# Now, provide read privileges to everyone
SECURITY_PLUGINS_OPTS=-Djava.security.auth.login.config=/etc/schema-registry/schemaregistry_client_jaas.properties /bin/sr-acl-cli --add --config /etc/schema-registry/schema-registry.properties -o GLOBAL_COMPATIBILITY_READ -o GLOBAL_SUBJECTS_READ -p \*

SECURITY_PLUGINS_OPTS=-Djava.security.auth.login.config=/etc/schema-registry/schemaregistry_client_jaas.properties /bin/sr-acl-cli --add --config /etc/schema-registry/schema-registry.properties -o SUBJECT_COMPATIBILITY_READ -o SUBJECT_READ -s \* -p \*

We also need to grant the “schemaregistry” principal access to the “_schemas” topic, so it can create/update schemas as needed. In addition, we need to grant the same access to the “c3admin” user, as it’s the account used by Control Center.

# Grant the "schemaregistry" user the following ACLs on the "_schemas" topic. Create the topic (with 1 partition, compact cleanup policy) if it doesn't exist.

cat /etc/kafka/zookeeper_client_jaas.properties
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/kafka.keytab"
  storeKey=true
  debug=true
  principal="kafka/<ZK_HOST>@<DOMAIN>";
};

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:schemaregistry --operation DescribeConfigs --topic _schemas --cluster
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:schemaregistry --producer --topic _schemas

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:schemaregistry --consumer --topic _schemas --group schema-registry

# For C3, add all permissions from C3 host
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:c3admin --allow-host '<C3_HOST>' --topic _schemas

Start the service with the appropriate JAAS file.

// Kafka Client
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/schemaregistry.keytab"
  storeKey=true
  principal="schemaregistry/<SR_HOST>@<DOMAIN>";
};

// Zookeeper Client
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/kafka.keytab"
  storeKey=true
  principal="kafka/<SR_HOST>@<DOMAIN>";
};

REST Proxy

REST Proxy security uses Confluent licensed plugins. The following listing shows the relevant parameters that need to be set up.

bootstrap.servers=SASL_PLAINTEXT://<BROKER_HOST1>:9093,SASL_PLAINTEXT://<BROKER_HOST2>:9093,SASL_PLAINTEXT://<BROKER_HOST3>:9093
schema.registry.url=https://<SR_HOST>:<SR_PORT>
producer.auto.register.schemas=false
# License for security plugins
confluent.license=<LICENSE>

#############################################
#                SSL/Kerberos               #
#############################################
kafka.rest.resource.extension.class=io.confluent.kafkarest.security.KafkaRestSecurityResourceExtension
# The authentication mechanism for the incoming requests
confluent.rest.auth.propagate.method=SSL
client.security.protocol=SASL_PLAINTEXT
client.sasl.mechanism=GSSAPI
client.sasl.kerberos.service.name=kafka
# A list of rules to map from the distinguished name (DN) in the client certificate to a
# short name principal for authentication with the Kafka broker. Rules are tested
# from left to right. The first rule that matches is applied.
confluent.rest.auth.ssl.principal.mapping.rules=RULE:^CN=(.*?).mycompany.com,.*$/$1/,RULE:^CN=(.*?),.*$/$1/
# Require REST clients to authenticate using SSL
ssl.client.auth=true

# SSL Stores used for REST clients for Authentication
ssl.keystore.location=<FILE>
ssl.keystore.password=<PASSWORD>
ssl.key.password=<PASSWORD>
ssl.truststore.location=<FILE>
ssl.truststore.password=<PASSWORD>
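To exercise the proxy end to end, you can POST a message with curl, presenting a client certificate whose CN maps (via the rules above) to a Kafka principal with produce rights on the topic. The paths, host, and topic below are placeholders; 8082 is the default REST Proxy port.

curl -X POST --cacert /path/to/ca.pem --cert /path/to/appuser1-cert.pem --key /path/to/appuser1-key.pem -H "Content-Type: application/vnd.kafka.json.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"records":[{"value":{"name":"test"}}]}' https://<RESTPROXY_HOST>:8082/topics/test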

REST clients authenticate with the proxy using SSL certificates. The security plugin then translates the DN (Distinguished Name) that comes with the certificate into a Kafka principal, which is then used to authenticate with the brokers. For example, with the mapping rules above, a client certificate with DN “CN=appuser1.mycompany.com,O=My Company,…” maps to the Kafka principal “appuser1”. For this to work, we need to ensure all the REST client principals are configured in the JAAS file that the proxy uses at startup. Note that if you add a new client, you’ll have to reconfigure this.

// Make sure the principal names don't have the domain specified.
// Principal lookup for REST clients fails if it is set.
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/appuser1.keytab"
  storeKey=true
  principal="appuser1";
  
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/appuser2.keytab"
  storeKey=true
  principal="appuser2";

  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/dimadmin.keytab"
  storeKey=true
  principal="sradmin";
};

Connect

The following steps describe how to set up a distributed Connect cluster to sink secure topics to HDFS. Let’s assume “connectuser” is the principal that runs the connectors. This account needs the proper ACLs on the Connect-related topics.

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:connectuser --operation Create --cluster
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:connectuser --operation Read --topic connect-status --group connect-cluster
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:connectuser --operation Read --topic connect-offsets --group connect-cluster
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:connectuser --operation Read --topic connect-configs --group connect-cluster
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:connectuser --operation Write --topic connect-status
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:connectuser --operation Write --topic connect-offsets
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:connectuser --operation Write --topic connect-configs

Grant topic ACLs for the Schema Registry user as well.

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:schemaregistry --operation Describe --topic connect-status --topic connect-configs --topic connect-offsets --allow-host '<SR_HOST>'

Set up the following properties for the distributed Connect workers.

#############################################
#          Security Settings                #
#############################################
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
security.protocol=SASL_PLAINTEXT
consumer.sasl.mechanism=GSSAPI
consumer.sasl.kerberos.service.name=kafka
consumer.security.protocol=SASL_PLAINTEXT
#############################################
#             Other Settings                #
#############################################
bootstrap.servers=<BROKER_HOST1>:9093,<BROKER_HOST2>:9093,<BROKER_HOST3>:9093
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
group.id=connect-cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=https://<SR_HOST>:<SR_PORT>
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=https://<SR_HOST>:<SR_PORT>
value.converter.schemas.enable=true
plugin.path=/usr/share/java

Now, start the Connect worker with the right JAAS file.

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/connectuser.keytab"
  storeKey=true
  principal="connectuser@<DOMAIN>";
};
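With the workers running, submit the HDFS sink connector through the Connect REST API. The sketch below uses the standard Kerberos options of the Confluent HDFS connector (hdfs.authentication.kerberos, connect.hdfs.principal, connect.hdfs.keytab, hdfs.namenode.principal); the connector name, hosts, and topic are placeholders for your environment. Note that “connectuser” also needs Read ACLs on the sink topic and on the connector’s consumer group (connect-hdfs-sink-canary_topic).

curl -X POST -H "Content-Type: application/json" http://<CONNECT_HOST>:8083/connectors --data '{
  "name": "hdfs-sink-canary_topic",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "canary_topic",
    "hdfs.url": "hdfs://<NAMENODE_HOST>:8020",
    "flush.size": "1000",
    "hdfs.authentication.kerberos": "true",
    "connect.hdfs.principal": "connectuser@<DOMAIN>",
    "connect.hdfs.keytab": "/etc/security/keytabs/connectuser.keytab",
    "hdfs.namenode.principal": "nn/_HOST@<DOMAIN>"
  }
}'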

Control Center

Finally, let’s configure Confluent Control Center (C3) to talk to the Kerberos-enabled cluster. Remember that this is also a licensed component.

Set the initialization parameters (notice that C3 talks to the secure broker endpoints).

confluent.controlcenter.kafka.development.bootstrap.servers=<BROKER_HOST1>:9093,<BROKER_HOST2>:9093,<BROKER_HOST3>:9093
confluent.controlcenter.kafka.development.sasl.mechanism=GSSAPI
confluent.controlcenter.kafka.development.security.protocol=SASL_PLAINTEXT
confluent.controlcenter.kafka.development.sasl.kerberos.service.name=kafka

# License string for the Control Center
confluent.license=<LICENSE>

# Schema Registry cluster URL
confluent.controlcenter.schema.registry.url=https://<SR_HOST>:<SR_PORT>

#######################################   https  ############################################
confluent.controlcenter.rest.listeners=https://0.0.0.0:9021
confluent.controlcenter.rest.ssl.keystore.location=<FILE>
confluent.controlcenter.rest.ssl.keystore.password=<PASSWORD>
confluent.controlcenter.rest.ssl.key.password=<PASSWORD>
confluent.controlcenter.rest.ssl.truststore.location=<FILE>
confluent.controlcenter.rest.ssl.truststore.password=<PASSWORD>

Make sure the c3admin principal has ACLs on the cluster, the _schemas topic, the secure topics, the Connect topics, and the Connect consumer groups.

KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties" kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:c3admin --allow-host '<C3_HOST>' --operation Describe --operation DescribeConfigs --operation Create --cluster

KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties" kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:c3admin --allow-host '<C3_HOST>' --operation Read --operation Describe --topic _schemas

KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties" kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:c3admin --allow-host '<C3_HOST>' --operation Read --operation Describe --topic connect-configs --topic connect-offsets --topic connect-status

KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties" kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:c3admin --allow-host '<C3_HOST>' --operation Read --operation Describe --operation DescribeConfigs --topic canary_topic

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:c3admin --consumer --allow-host '<C3_HOST>' --topic canary_topic --group connect-hdfs-sink-canary_topic

Finally, start C3 with the right JAAS file.

// Kafka Client
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/c3admin.keytab"
  storeKey=true
  principal="c3admin/<C3_HOST>@<DOMAIN>";
};

// ZK Client
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/c3admin.keytab"
  storeKey=true
  principal="c3admin/<C3_HOST>@<DOMAIN>";
};

