Summary
Here we document how to secure a Kafka cluster with Kerberos. Kerberos is one of the most widely used security protocols in corporate networks, thanks largely to the widespread adoption of Microsoft Active Directory for directory-based identity services. A fair number of Kafka installations live alongside Hadoop in a big data ecosystem, and Kerberos is the only way to provide authentication in Hadoop, so it seems natural to use the same stack to secure Kafka installations too. This blog is the result of an implementation done recently at a customer site. The customer was already running a Kerberized Hadoop cluster that was used as one of the sinks for streaming data. Please note that we used Confluent licensed components (Schema Registry, REST Proxy, Group Authorization, Control Center) for this setup; at the time of writing, it is the only way to achieve end-to-end security with Kerberos.
Architecture
Details
Zookeeper
We will run the Zookeeper service with a dedicated SPN for each instance. Zookeeper is started with a JAAS file that looks like the one below.
Server {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=false
    useKeyTab=true
    keyTab="/etc/kafka/keytabs/zookeeper.keytab"
    storeKey=true
    principal="zookeeper/<HOST>@<DOMAIN>";
};
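Point the Zookeeper JVM at this JAAS file when starting the service; with the Confluent scripts, one way to do this is via KAFKA_OPTS (the JAAS file path here is an assumption, adjust to your layout):

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/zookeeper_jaas.conf"
zookeeper-server-start /etc/kafka/zookeeper.properties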
The following snippet shows the security-related configuration for Zookeeper. The instance is configured to require valid authentication from anyone interacting with it. This is important because Zookeeper stores the Kafka Access Control Lists (ACLs) that are used to enforce authorization of resources.
# Zookeeper SASL Configuration
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
kerberos.removeHostFromPrincipal=true
kerberos.removeRealmFromPrincipal=true
Brokers
Since we use the PLAINTEXT port for inter-broker communication, the ANONYMOUS user will be used between brokers (for replication, for example). To allow that, add the following ACL.
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties \
  kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add \
  --allow-principal User:ANONYMOUS \
  --allow-host <BROKER_HOST1> --allow-host <BROKER_HOST2> --allow-host <BROKER_HOSTn> \
  --operation All --cluster
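To sanity check, the same tool can list what is now set on the cluster resource:

# Verify the cluster ACLs
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties \
  kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --list --cluster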
Add/edit the security-related parameters for each broker. Note that we set the zookeeper.set.acl parameter to true; this ensures only privileged users can manipulate Kafka's Zookeeper znodes, enhancing security.
#############################################
# KERBEROS
#############################################
sasl.enabled.mechanisms=GSSAPI,PLAIN

# Inter-broker communication is done using the PLAINTEXT protocol
security.inter.broker.protocol=PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN

# PLAINTEXT and SASL_PLAINTEXT
listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
advertised.listeners=PLAINTEXT://<BROKER_HOST>:9092,SASL_PLAINTEXT://<BROKER_HOST>:9093
sasl.kerberos.service.name=kafka

# Enable the ACL authorizer (required for the kafka-acls commands used throughout this post)
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

# Enable ACLs on ZooKeeper znodes
zookeeper.set.acl=true

# https://community.hortonworks.com/articles/14463/auth-to-local-rules-syntax.html
# We just want the UPN
sasl.kerberos.principal.to.local.rules=RULE:[1:$1],RULE:[2:$1],DEFAULT
Create an SPN for each broker, and start each broker instance with the proper JAAS file.
// Broker
KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=false
    useKeyTab=true
    keyTab="/etc/security/keytabs/kafka.keytab"
    storeKey=true
    principal="kafka/<HOST>@<DOMAIN>";
};

// Broker is a Zookeeper client
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=false
    useKeyTab=true
    keyTab="/etc/security/keytabs/kafka.keytab"
    storeKey=true
    principal="kafka/<HOST>@<DOMAIN>";
};
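At this point, any Kerberos principal with a keytab can authenticate to the SASL_PLAINTEXT listener. Below is a minimal client configuration sketch for a smoke test; the keytab path and principal are assumptions:

# client.properties (keytab path and principal are assumptions)
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
  useKeyTab=true storeKey=true \
  keyTab="/etc/security/keytabs/appuser1.keytab" \
  principal="appuser1@<DOMAIN>";

# Smoke test against the secure listener
kafka-console-producer --broker-list <BROKER_HOST1>:9093 --topic test --producer.config client.properties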
Run the Zookeeper security migration tool
If you're already running a non-secure cluster and are migrating to security, run the following tool. It sets the ACLs on the Zookeeper znodes recursively.
# cat /etc/kafka/zookeeper_client_jaas.properties
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=false
    useKeyTab=true
    keyTab="/etc/security/keytabs/kafka.keytab"
    storeKey=true
    principal="kafka/<ZK_HOST>@<DOMAIN>";
};

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties \
  /bin/zookeeper-security-migration --zookeeper.acl secure --zookeeper.connect <ZK_HOST>:2181
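You can verify the migration worked by inspecting a znode's ACL; a secured znode should show a sasl entry for the kafka principal rather than full world:anyone access:

# Inspect znode ACLs after migration (expect a sasl ACL entry)
zookeeper-shell <ZK_HOST>:2181 getAcl /brokers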
Schema Registry
Schema Registry is a very important component to secure from a data governance point of view. By default, client applications register new schemas automatically: if they produce messages to a new topic, they try to register the schema on the fly. With security enabled, we disable this behavior so that only cluster admins can register schemas. For applications that interact with Schema Registry to keep working, they must turn off automatic schema registration by setting the configuration parameter auto.register.schemas=false. For example, in a Java application:
producerProperties.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, "false");
Run each instance of the Schema Registry service using a dedicated SPN.
The following listing shows the relevant parameters for Schema Registry. Note that the registry talks to the secure Kafka endpoints. In addition, it is configured so that end clients authenticate to it over SSL.
kafkastore.bootstrap.servers=SASL_PLAINTEXT://<BROKER_HOST1>:9093,SASL_PLAINTEXT://<BROKER_HOST2>:9093,SASL_PLAINTEXT://<BROKER_HOST3>:9093
kafkastore.security.protocol=SASL_PLAINTEXT
# Configure a service name that matches the primary name of the Kafka server configured in the broker JAAS file.
kafkastore.sasl.kerberos.service.name=kafka
kafkastore.sasl.mechanism=GSSAPI

# Schema Registry Security Plugin
schema.registry.resource.extension.class=io.confluent.kafka.schemaregistry.security.SchemaRegistrySecurityResourceExtension
confluent.schema.registry.auth.mechanism=SSL
ssl.client.auth=true
schema.registry.inter.instance.protocol=HTTPS
confluent.license=<LICENSE>
confluent.schema.registry.authorizer.class=io.confluent.kafka.schemaregistry.security.authorizer.schemaregistryacl.SchemaRegistryAclAuthorizer

# Additional configurations for HTTPS
ssl.truststore.location=<FILE>
ssl.truststore.password=<PASSWORD>
ssl.keystore.location=<FILE>
ssl.keystore.password=<PASSWORD>
ssl.key.password=<PASSWORD>
Next, we need to add Schema Registry ACLs so that only a designated admin account (let's call it "sradmin") can register/delete/update schemas. In addition, since we use Control Center (C3) in this setup, it needs an admin account too (let's call it "c3admin").
# cat /etc/schema-registry/schemaregistry_client_jaas.properties
// Kafka Client
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=false
    useKeyTab=true
    keyTab="/etc/security/keytabs/schemaregistry.keytab"
    storeKey=true
    principal="schemaregistry/<SR_HOST>@<DOMAIN>";
};

# Provide admin privileges to the "sradmin" and "c3admin" users
SECURITY_PLUGINS_OPTS=-Djava.security.auth.login.config=/etc/schema-registry/schemaregistry_client_jaas.properties \
  /bin/sr-acl-cli --add --config /etc/schema-registry/schema-registry.properties -o \* -s \* \
  -p "CN=sradmin.mycompany.com,O=My Company,L=My City,ST=California,C=US" \
  -p "CN=c3admin.mycompany.com,O=My Company,L=My City,ST=California,C=US"

# Now, provide read privileges to everyone
SECURITY_PLUGINS_OPTS=-Djava.security.auth.login.config=/etc/schema-registry/schemaregistry_client_jaas.properties \
  /bin/sr-acl-cli --add --config /etc/schema-registry/schema-registry.properties \
  -o GLOBAL_COMPATIBILITY_READ -o GLOBAL_SUBJECTS_READ -p \*

SECURITY_PLUGINS_OPTS=-Djava.security.auth.login.config=/etc/schema-registry/schemaregistry_client_jaas.properties \
  /bin/sr-acl-cli --add --config /etc/schema-registry/schema-registry.properties \
  -o SUBJECT_COMPATIBILITY_READ -o SUBJECT_READ -s \* -p \*
We also need to grant the "schemaregistry" principal access to the "_schemas" topic, so it can create/update schemas as needed. In addition, we need to grant the same access to the "c3admin" user, as that account is used by Control Center.
# Provide the "schemaregistry" user the following ACLs on the "_schemas" topic.
# Create the topic (with 1 partition, compact cleanup policy) if it doesn't exist.

# cat /etc/kafka/zookeeper_client_jaas.properties
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=false
    useKeyTab=true
    keyTab="/etc/security/keytabs/kafka.keytab"
    storeKey=true
    debug=true
    principal="kafka/<ZK_HOST>@<DOMAIN>";
};

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties \
  kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add \
  --allow-principal User:schemaregistry --operation DescribeConfigs --topic _schemas --cluster

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties \
  kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add \
  --allow-principal User:schemaregistry --producer --topic _schemas

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties \
  kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add \
  --allow-principal User:schemaregistry --consumer --topic _schemas --group schema-registry

# For C3, add all permissions from the C3 host
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties \
  kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add \
  --allow-principal User:c3admin --allow-host '<C3_HOST>' --topic _schemas
Start the service with the appropriate JAAS file.
// Kafka Client
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=false
    useKeyTab=true
    keyTab="/etc/security/keytabs/schemaregistry.keytab"
    storeKey=true
    principal="schemaregistry/<SR_HOST>@<DOMAIN>";
};

// Zookeeper Client
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=false
    useKeyTab=true
    keyTab="/etc/security/keytabs/kafka.keytab"
    storeKey=true
    principal="kafka/<SR_HOST>@<DOMAIN>";
};
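With the plugin in place, schemas can only be registered by the admin principals over HTTPS. A quick way to test is to register a trivial schema as "sradmin" using its client certificate; this is a sketch, and the certificate and CA paths are assumptions:

# Register a test schema as "sradmin" (certificate paths are assumptions)
curl -X POST \
  --cert /etc/security/ssl/sradmin-cert.pem --key /etc/security/ssl/sradmin-key.pem \
  --cacert /etc/security/ssl/ca.pem \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"string\"}"}' \
  https://<SR_HOST>:<SR_PORT>/subjects/test-value/versions

The same request made with a non-admin certificate should be rejected by the SchemaRegistryAclAuthorizer.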
REST Proxy
REST Proxy security uses Confluent licensed plugins. The following listing shows the relevant parameters that need to be set up.
bootstrap.servers=SASL_PLAINTEXT://<BROKER_HOST1>:9093,SASL_PLAINTEXT://<BROKER_HOST2>:9093,SASL_PLAINTEXT://<BROKER_HOST3>:9093
schema.registry.url=https://<SR_HOST>:<SR_PORT>
producer.auto.register.schemas=false

# License for security plugins
confluent.license=<LICENSE>

#############################################
# SSL/Kerberos
#############################################
kafka.rest.resource.extension.class=io.confluent.kafkarest.security.KafkaRestSecurityResourceExtension

# The authentication mechanism for the incoming requests
confluent.rest.auth.propagate.method=SSL
client.security.protocol=SASL_PLAINTEXT
client.sasl.mechanism=GSSAPI
client.sasl.kerberos.service.name=kafka

# A list of rules to map from the distinguished name (DN) in the client certificate to a
# short name principal for authentication with the Kafka broker. Rules are tested
# from left to right. The first rule that matches is applied.
confluent.rest.auth.ssl.principal.mapping.rules=RULE:^CN=(.*?).mycompany.com,.*$/$1/,RULE:^CN=(.*?),.*$/$1/

# Require REST clients to authenticate using SSL
ssl.client.auth=true

# SSL stores used for REST client authentication
ssl.keystore.location=<FILE>
ssl.keystore.password=<PASSWORD>
ssl.key.password=<PASSWORD>
ssl.truststore.location=<FILE>
ssl.truststore.password=<PASSWORD>
REST clients authenticate with the proxy using an SSL certificate. The security plugin then translates the Distinguished Name (DN) from the certificate to a Kafka principal, which is used to authenticate with the brokers. For this to work, all REST client principals must be configured in the JAAS file the proxy uses at startup. Note that if you add a new client, you will have to reconfigure this and restart the proxy.
// Make sure principal names don't have a domain specified.
// Principal lookup for REST clients fails if that is set.
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=false
    useKeyTab=true
    keyTab="/etc/security/keytabs/appuser1.keytab"
    storeKey=true
    principal="appuser1";

    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=false
    useKeyTab=true
    keyTab="/etc/security/keytabs/appuser2.keytab"
    storeKey=true
    principal="appuser2";

    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=false
    useKeyTab=true
    keyTab="/etc/security/keytabs/sradmin.keytab"
    storeKey=true
    principal="sradmin";
};
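To exercise the proxy, a REST client simply presents its certificate on an HTTPS request; the plugin maps the certificate DN to "appuser1" (per the mapping rules above) and the produce request runs as that principal. A sketch, with certificate paths assumed:

# Produce a JSON message via the proxy as "appuser1" (certificate paths are assumptions)
curl -X POST \
  --cert /etc/security/ssl/appuser1-cert.pem --key /etc/security/ssl/appuser1-key.pem \
  --cacert /etc/security/ssl/ca.pem \
  -H "Content-Type: application/vnd.kafka.json.v2+json" \
  -H "Accept: application/vnd.kafka.v2+json" \
  --data '{"records":[{"value":{"hello":"world"}}]}' \
  https://<PROXY_HOST>:<PROXY_PORT>/topics/test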
Connect
The following steps describe how to set up a distributed Connect cluster to sink secure topics to HDFS. Let's assume "connectuser" is the principal that runs the Connect workers. This account needs the proper ACLs to access the Connect internal topics.
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties \
  kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer \
  --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add \
  --allow-principal User:connectuser --operation Create --cluster

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties \
  kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer \
  --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add \
  --allow-principal User:connectuser --operation Read --topic connect-status --group connect-cluster

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties \
  kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer \
  --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add \
  --allow-principal User:connectuser --operation Read --topic connect-offsets --group connect-cluster

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties \
  kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer \
  --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add \
  --allow-principal User:connectuser --operation Read --topic connect-configs --group connect-cluster

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties \
  kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer \
  --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add \
  --allow-principal User:connectuser --operation Write --topic connect-status

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties \
  kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer \
  --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add \
  --allow-principal User:connectuser --operation Write --topic connect-offsets

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties \
  kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer \
  --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add \
  --allow-principal User:connectuser --operation Write --topic connect-configs
Grant topic ACLs to the Schema Registry user as well.
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties \
  kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer \
  --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add \
  --allow-principal User:schemaregistry --operation Describe \
  --topic connect-status --topic connect-configs --topic connect-offsets \
  --allow-host '<SR_HOST>'
Set up the following properties for the distributed Connect worker.
#############################################
# Security Settings
#############################################
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
security.protocol=SASL_PLAINTEXT

consumer.sasl.mechanism=GSSAPI
consumer.sasl.kerberos.service.name=kafka
consumer.security.protocol=SASL_PLAINTEXT

#############################################
# Other Settings
#############################################
bootstrap.servers=<BROKER_HOST1>:9093,<BROKER_HOST2>:9093,<BROKER_HOST3>:9093
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
group.id=connect-cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=https://<SR_HOST>:<SR_PORT>
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=https://<SR_HOST>:<SR_PORT>
value.converter.schemas.enable=true
plugin.path=/usr/share/java
Now, start the Connect worker with the right JAAS file.
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=false
    useKeyTab=true
    keyTab="/etc/security/keytabs/connectuser.keytab"
    storeKey=true
    principal="connectuser@<DOMAIN>";
};
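Once the worker is up, the HDFS sink can be submitted through the Connect REST API. The sketch below uses the connector name implied by the consumer group seen later in this post; the HDFS URL, Kerberos principals, and paths are assumptions to adapt to your environment:

# Submit the HDFS sink connector (HDFS URL, principals, and paths are assumptions)
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "hdfs-sink-canary_topic",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "topics": "canary_topic",
    "hdfs.url": "hdfs://<NAMENODE>:8020",
    "flush.size": "1000",
    "hdfs.authentication.kerberos": "true",
    "connect.hdfs.principal": "connectuser@<DOMAIN>",
    "connect.hdfs.keytab": "/etc/security/keytabs/connectuser.keytab",
    "hdfs.namenode.principal": "nn/_HOST@<DOMAIN>"
  }
}' http://<CONNECT_HOST>:8083/connectors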
Control Center
Finally, let's configure Confluent Control Center to talk to the Kerberos-enabled cluster. Remember that this is also a licensed component.
Set the initialization parameters (notice that C3 talks to the secure broker endpoints).
confluent.controlcenter.kafka.development.bootstrap.servers=<BROKER_HOST1>:9093,<BROKER_HOST2>:9093,<BROKER_HOST3>:9093
confluent.controlcenter.kafka.development.sasl.mechanism=GSSAPI
confluent.controlcenter.kafka.development.security.protocol=SASL_PLAINTEXT
confluent.controlcenter.kafka.development.sasl.kerberos.service.name=kafka

# License string for the Control Center
confluent.license=<LICENSE>

# Schema Registry cluster URL
confluent.controlcenter.schema.registry.url=https://<SR_HOST>:<SR_PORT>

####################################### https ############################################
confluent.controlcenter.rest.listeners=https://0.0.0.0:9021
confluent.controlcenter.rest.ssl.keystore.location=<FILE>
confluent.controlcenter.rest.ssl.keystore.password=<PASSWORD>
confluent.controlcenter.rest.ssl.key.password=<PASSWORD>
confluent.controlcenter.rest.ssl.truststore.location=<FILE>
confluent.controlcenter.rest.ssl.truststore.password=<PASSWORD>
Make sure the c3admin principal has ACLs on the cluster, the _schemas topic, the secured data topics, the Connect topics, and the Connect consumer groups.
KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties" \
  kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add \
  --allow-principal User:c3admin --allow-host '<C3_HOST>' \
  --operation Describe --operation DescribeConfigs --operation Create --cluster

KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties" \
  kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add \
  --allow-principal User:c3admin --allow-host '<C3_HOST>' \
  --operation Read --operation Describe --topic _schemas

KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties" \
  kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add \
  --allow-principal User:c3admin --allow-host '<C3_HOST>' \
  --operation Read --operation Describe --topic connect-configs --topic connect-offsets --topic connect-status

KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties" \
  kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add \
  --allow-principal User:c3admin --allow-host '<C3_HOST>' \
  --operation Read --operation Describe --operation DescribeConfigs --topic canary_topic

KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties" \
  kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add \
  --allow-principal User:c3admin --consumer --allow-host '<C3_HOST>' \
  --topic canary_topic --group connect-hdfs-sink-canary_topic
Finally, start C3 with the right JAAS file.
// Kafka Client
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=false
    useKeyTab=true
    keyTab="/etc/security/keytabs/c3admin.keytab"
    storeKey=true
    principal="c3admin/<C3_HOST>@<DOMAIN>";
};

// ZK Client
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=false
    useKeyTab=true
    keyTab="/etc/security/keytabs/c3admin.keytab"
    storeKey=true
    principal="c3admin/<C3_HOST>@<DOMAIN>";
};