OAuth Integration for Kafka

Summary

We recently pushed OAuth2 authentication for Kafka into production for a Fortune 500 client. KIP-255 allows one to implement OAuth2 authentication from Java clients to brokers and for inter-broker authentication. In addition to these, we also implemented OAuth2 authentication for REST Proxy clients. In this post, we give enough details for anyone interested to do this on their own (short of handing over the code). If you’re interested, we would love to get engaged with you to implement this for your Kafka clusters. Keep in mind that there’s no official implementation of this available anywhere yet, including the Confluent distribution of Kafka.

Once the client is authenticated, we can use the SimpleAclAuthorizer that ships with Kafka to authorize operations on Kafka resources (topics, for example). This helps implement security in Kafka clusters, which is very important in many customer installations, especially multi-tenant ones.

Architecture

The following picture depicts how a Java client interacts with the brokers using credentials configured in a JAAS file (credentials can also be configured programmatically).

The following diagram illustrates how brokers authenticate with each other for inter-broker communication. In reality, this is very similar to how Java clients authenticate with brokers.

Finally, the following diagram shows how a REST client uses the OAuth2 authentication mechanism.
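
As noted above, client credentials can also be supplied programmatically instead of through a JAAS file. Below is a minimal, hedged sketch of what that can look like using Kafka's built-in OAUTHBEARER plumbing (available since Kafka 2.0); the callback handler class name is a placeholder for whatever your implementation provides, not the class we shipped.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

public class OAuthClientExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<BROKER_HOST>:9093");   // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // SASL/OAUTHBEARER configured in code rather than via a JAAS file
        props.put("security.protocol", "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");
        // Hypothetical handler that fetches tokens from your OAuth2 provider
        props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
                "com.example.OAuthTokenLoginCallbackHandler");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // produce records as usual
        }
    }
}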

Implementation Notes

  • The AuthenticateCallbackHandler interface is implemented for Java client authentication (an inter-broker connection is really just a Java client); a sketch follows this list
  • The RestResourceExtension interface is implemented for REST client authentication
  • Useful JMX metrics are provided to let operators know how security is behaving (successful and failed request counts at the cluster level as well as the topic level)
  • Security can be rolled into an existing cluster in a phased manner. For example, the cluster can initially be set up to serve both authenticated and unauthenticated traffic. You can then work with tenants to move their applications to OAuth2 authentication, and once all tenants (or applications) are authenticating, you can turn off unauthenticated traffic completely.
  • An additional scope check (shown in the diagram) can be added if you want extra security.
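
To make the first bullet concrete, here is a heavily hedged sketch of a server-side OAUTHBEARER validator callback handler built against Kafka's public interfaces. The interface and callback classes are Kafka's own (org.apache.kafka.common.security.oauthbearer), but the class name and the token validation are placeholders; the real handler validates the token against the OAuth2 provider and is not reproduced here.

import java.util.List;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;

public class ExampleOAuthTokenValidatorHandler implements AuthenticateCallbackHandler {

    @Override
    public void configure(Map<String, ?> configs, String saslMechanism,
                          List<AppConfigurationEntry> jaasConfigEntries) {
        // Read the OAuth2 provider endpoint / signing keys from configs (omitted)
    }

    @Override
    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
        for (Callback callback : callbacks) {
            if (callback instanceof OAuthBearerValidatorCallback) {
                OAuthBearerValidatorCallback cb = (OAuthBearerValidatorCallback) callback;
                // Placeholder: validate cb.tokenValue() against the OAuth2 provider and,
                // on success, hand Kafka an OAuthBearerToken implementation via cb.token(...).
                cb.error("invalid_token", null, null); // this sketch rejects everything
            } else {
                throw new UnsupportedCallbackException(callback);
            }
        }
    }

    @Override
    public void close() { }
}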

Feel free to leave comments. If you’re really interested in implementing this, drop me a line at manoj@upala.com

Kafka+Kerberos

Summary

Here we document how to secure a Kafka cluster with Kerberos. Kerberos is one of the most widely used security protocols in corporate networks, thanks largely to the widespread adoption of Microsoft Active Directory for directory-based identity services. A fair number of Kafka installations live alongside Hadoop in a big data ecosystem, and Kerberos is the only way to provide strong authentication in Hadoop, so it is natural to use the same stack to secure Kafka installations too. This blog is the result of an implementation done recently at a customer site. The customer was already running a Kerberized Hadoop cluster that was used as one of the sinks for streaming data. Please note that we used Confluent licensed components (Schema Registry, REST Proxy, Group Authorization, Control Center) for this setup; at the time of writing, that is the only way to achieve end-to-end security with Kerberos.

Architecture

 

Detailed Kafka Security Architecture

Details

 

Zookeeper

We will run the Zookeeper service with a dedicated SPN for each instance of the service. Zookeeper will be started with a JAAS file that looks like the one below.

Server {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/kafka/keytabs/zookeeper.keytab"
  storeKey=true
  principal="zookeeper/<HOST>@<DOMAIN>";
};

The following snippet shows the security-related configuration for Zookeeper. The Zookeeper instance is configured to require valid authentication from anything that interacts with it. This is important since Zookeeper stores the Kafka Access Control Lists (ACLs) that are used to enforce authorization on resources.

# Zookeeper SASL Configuration
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
kerberos.removeHostFromPrincipal=true
kerberos.removeRealmFromPrincipal=true

Brokers

Since we use the PLAINTEXT port for inter-broker communication, the ANONYMOUS user will be used between brokers (for replication, for example). To allow that, add the following ACL.

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:ANONYMOUS --allow-host <BROKER_HOST1> --allow-host <BROKER_HOST2> --allow-host <BROKER_HOSTn> --operation All --cluster

Add or edit the security-related parameters for the broker. Note that we set the zookeeper.set.acl parameter to true. This makes sure only privileged users can manipulate the Zookeeper internal structures, enhancing security.

#############################################
#               KERBEROS                    #
#############################################
sasl.enabled.mechanisms=GSSAPI,PLAIN
# Inter broker communication is done using PLAINTEXT protocol
security.inter.broker.protocol=PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
# PLAINTEXT and SASL_PLAINTEXT
listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://:9093
advertised.listeners=PLAINTEXT://<BROKER_HOST>:9092,SASL_PLAINTEXT://<BROKER_HOST>:9093
sasl.kerberos.service.name=kafka
# Enable ACLs on ZooKeeper znodes
zookeeper.set.acl=true
# https://community.hortonworks.com/articles/14463/auth-to-local-rules-syntax.html
# We just want UPN
sasl.kerberos.principal.to.local.rules=RULE:[1:$1],RULE:[2:$1],DEFAULT

Create an SPN for each broker and start each broker instance with the proper JAAS file.

// Broker
KafkaServer {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/kafka.keytab"
  storeKey=true
  principal="kafka/<HOST>@<DOMAIN>";
};

// Broker is Zookeeper Client
Client {
  com.sun.security.auth.module.Krb5LoginModule required 
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/kafka.keytab"
  storeKey=true
  principal="kafka/<HOST>@<DOMAIN>";
};

Run Zookeeper migration tool

If you’re already running a non-secure cluster and migrating to security, run the following tool. It will set the ACLs on the Zookeeper znodes recursively.

# cat /etc/kafka/zookeeper_client_jaas.properties 
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/kafka.keytab"
  storeKey=true
  principal="kafka/<ZK_HOST>@<DOMAIN>";
};

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties /bin/zookeeper-security-migration --zookeeper.acl secure --zookeeper.connect <ZK_HOST>:2181

Schema Registry

Schema Registry is a very important component to secure from a data governance point of view. By default, client applications automatically register new schemas: if they produce messages to a new topic, they will try to register the schema on the fly. With security enabled, we disable this behavior so that only cluster admins can register schemas. For applications that interact with Schema Registry to keep working, they need to turn off automatic schema registration by setting the configuration parameter auto.register.schemas=false. For example, in a Java application, this is how one would do it:

producerProperties.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, "false");
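
For context, the snippet below sketches how such a producer might be wired end to end in this Kerberized setup. This is a non-authoritative example: the host placeholders mirror the listings in this post, the serializer and config classes are from the Confluent Avro serializer, and the JAAS file with the client keytab is assumed to be supplied via -Djava.security.auth.login.config at JVM startup.

import java.util.Properties;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class SecureProducerConfig {
    public static Properties build() {
        Properties p = new Properties();
        p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<BROKER_HOST1>:9093,<BROKER_HOST2>:9093");
        p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
        p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());

        // Kerberos: authenticate against the SASL_PLAINTEXT listener on port 9093
        p.put("security.protocol", "SASL_PLAINTEXT");
        p.put("sasl.mechanism", "GSSAPI");
        p.put("sasl.kerberos.service.name", "kafka");

        // Schema Registry over HTTPS, with automatic schema registration disabled
        p.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "https://<SR_HOST>:<SR_PORT>");
        p.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, "false");
        return p;
    }
}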

Run each instance of the Schema Registry service using a dedicated SPN.

The following listing shows the relevant parameters for Schema Registry. Note that the registry talks to the secure Kafka endpoints. In addition, it carries the configuration that lets end clients talk to it securely over SSL (for authentication).

kafkastore.bootstrap.servers=SASL_PLAINTEXT://<BROKER_HOST1>:9093,SASL_PLAINTEXT://<BROKER_HOST2>:9093,SASL_PLAINTEXT://<BROKER_HOST3>:9093
kafkastore.security.protocol=SASL_PLAINTEXT
# Configure a service name that matches the primary name of the Kafka server configured in the broker JAAS file.
kafkastore.sasl.kerberos.service.name=kafka
kafkastore.sasl.mechanism=GSSAPI

# Schema Registry Security Plugin
schema.registry.resource.extension.class=io.confluent.kafka.schemaregistry.security.SchemaRegistrySecurityResourceExtension
confluent.schema.registry.auth.mechanism=SSL
ssl.client.auth=true
schema.registry.inter.instance.protocol=HTTPS
confluent.license=<LICENSE>
confluent.schema.registry.authorizer.class=io.confluent.kafka.schemaregistry.security.authorizer.schemaregistryacl.SchemaRegistryAclAuthorizer

# Additional configurations for HTTPS
ssl.truststore.location=<FILE>
ssl.truststore.password=<PASSWORD>
ssl.keystore.location=<FILE>
ssl.keystore.password=<PASSWORD>
ssl.key.password=<PASSWORD>

Next, we need to add Schema Registry ACLs so that only a designated admin account (let’s call it “sradmin”) can register, delete, or update schemas. In addition, since we use C3 in this setup, it needs an admin account too (let’s call this account “c3admin”).

# cat /etc/schema-registry/schemaregistry_client_jaas.properties
// Kafka Client
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/schemaregistry.keytab"
  storeKey=true
  principal="schemaregistry/<SR_HOST>@<DOMAIN>";
};


# Provide admin privileges to "sradmin" and "c3admin" users
SECURITY_PLUGINS_OPTS=-Djava.security.auth.login.config=/etc/schema-registry/schemaregistry_client_jaas.properties /bin/sr-acl-cli --add --config /etc/schema-registry/schema-registry.properties -o \* -s \* -p "CN=sradmin.mycompany.com,O=My Company,L=My City,ST=California,C=US" -p "CN=c3admin.mycompany.com,O=My Company,L=My City,ST=California,C=US"

# Now, provide read privileges to everyone
SECURITY_PLUGINS_OPTS=-Djava.security.auth.login.config=/etc/schema-registry/schemaregistry_client_jaas.properties /bin/sr-acl-cli --add --config /etc/schema-registry/schema-registry.properties -o GLOBAL_COMPATIBILITY_READ -o GLOBAL_SUBJECTS_READ -p \*

SECURITY_PLUGINS_OPTS=-Djava.security.auth.login.config=/etc/schema-registry/schemaregistry_client_jaas.properties /bin/sr-acl-cli --add --config /etc/schema-registry/schema-registry.properties -o SUBJECT_COMPATIBILITY_READ -o SUBJECT_READ -s \* -p \*

We also need to provide access to the “_schemas” topic to the “schemaregistry” principal, so it can create and destroy schemas as needed. In addition, we need to provide the same access to the “c3admin” user, as it’s the account used from Control Center.

# Provide the "schemaregistry" user the following ACLs on the "_schemas" topic. Create the topic (1 partition, compact cleanup policy) if it doesn't exist

cat /etc/kafka/zookeeper_client_jaas.properties
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/kafka.keytab"
  storeKey=true
  debug=true
  principal="kafka/<ZK_HOST>@<DOMAIN>";
};

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:schemaregistry --operation DescribeConfigs --topic _schemas --cluster
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:schemaregistry --producer --topic _schemas

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:schemaregistry --consumer --topic _schemas --group schema-registry

# For C3, add all permissions from C3 host
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:c3admin --allow-host '<C3_HOST>' --topic _schemas

Start the service with the appropriate JAAS file.

// Kafka Client
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/schemaregistry.keytab"
  storeKey=true
  principal="schemaregistry/<SR_HOST>@<DOMAIN>";
};

// Zookeeper Client
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/kafka.keytab"
  storeKey=true
  principal="kafka/<SR_HOST>@<DOMAIN>";
};

REST Proxy

REST Proxy security uses Confluent licensed plugins. The following listing shows the relevant parameters that need to be set up.

bootstrap.servers=SASL_PLAINTEXT://<BROKER_HOST1>:9093,SASL_PLAINTEXT://<BROKER_HOST2>:9093,SASL_PLAINTEXT://<BROKER_HOST3>:9093
schema.registry.url=https://<SR_HOST>:<SR_PORT>
producer.auto.register.schemas=false
# License for security plugins
confluent.license=<LICENSE>

#############################################
#                SSL/Kerberos               #
#############################################
kafka.rest.resource.extension.class=io.confluent.kafkarest.security.KafkaRestSecurityResourceExtension
# The authentication mechanism for the incoming requests
confluent.rest.auth.propagate.method=SSL
client.security.protocol=SASL_PLAINTEXT
client.sasl.mechanism=GSSAPI
client.sasl.kerberos.service.name=kafka
# A list of rules to map from the distinguished name (DN) in the client certificate to a
# short name principal for authentication with the Kafka broker. Rules are tested
# from left to right. The first rule that matches is applied.
confluent.rest.auth.ssl.principal.mapping.rules=RULE:^CN=(.*?).mycompany.com,.*$/$1/,RULE:^CN=(.*?),.*$/$1/
# Require REST clients to authenticate using SSL
ssl.client.auth=true

# SSL Stores used for REST clients for Authentication
ssl.keystore.location=<FILE>
ssl.keystore.password=<PASSWORD>
ssl.key.password=<PASSWORD>
ssl.truststore.location=<FILE>
ssl.truststore.password=<PASSWORD>

REST clients authenticate with the proxy using an SSL certificate. The security plugin then translates the DN (Distinguished Name) that comes with the certificate into a Kafka principal, which is then used to authenticate with the brokers. For this to work, we need to ensure all the REST client principals are configured in the JAAS file that the proxy uses at startup. Note that if you add a new client, you’ll have to reconfigure this.

// Make sure principal names don't have domain specified.
// Principal lookup for REST Clients fail, if that is set
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/appuser1.keytab"
  storeKey=true
  principal="appuser1";
  
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/appuser2.keytab"
  storeKey=true
  principal="appuser2";

  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/dimadmin.keytab"
  storeKey=true
  principal="sradmin";
};

Connect

The following steps describe how to set up a distributed Connect cluster to sink secure topics to HDFS. Let’s assume “connectuser” is the principal that runs the connector. This account will need the proper ACLs to access the Connect-related topics.

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:connectuser --operation Create --cluster
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:connectuser --operation Read --topic connect-status --group connect-cluster
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:connectuser --operation Read --topic connect-offsets --group connect-cluster
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:connectuser --operation Read --topic connect-configs --group connect-cluster
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:connectuser --operation Write --topic connect-status
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:connectuser --operation Write --topic connect-offsets
KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:connectuser --operation Write --topic connect-configs

Grant topic ACLs to the Schema Registry user as well.

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:schemaregistry --operation Describe --topic connect-status --topic connect-configs --topic connect-offsets --allow-host '<SR_HOST>'
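
We managed all of these ACLs with the kafka-acls CLI shown above. Purely as an illustration of an alternative (not something we ran at this site), the Kafka AdminClient API (Kafka 2.0+) can create equivalent bindings programmatically; a hedged sketch for two of the connectuser grants follows.

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class ConnectAclExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "<BROKER_HOST1>:9093");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "kafka");

        try (AdminClient admin = AdminClient.create(props)) {
            // Allow connectuser to read the connect-status topic and the connect-cluster group
            AccessControlEntry allowRead = new AccessControlEntry(
                    "User:connectuser", "*", AclOperation.READ, AclPermissionType.ALLOW);
            AclBinding topicRead = new AclBinding(
                    new ResourcePattern(ResourceType.TOPIC, "connect-status", PatternType.LITERAL), allowRead);
            AclBinding groupRead = new AclBinding(
                    new ResourcePattern(ResourceType.GROUP, "connect-cluster", PatternType.LITERAL), allowRead);
            admin.createAcls(Arrays.asList(topicRead, groupRead)).all().get();
        }
    }
}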

Set up the following properties for the distributed Connect worker.

#############################################
#          Security Settings                #
#############################################
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=kafka
security.protocol=SASL_PLAINTEXT
consumer.sasl.mechanism=GSSAPI
consumer.sasl.kerberos.service.name=kafka
consumer.security.protocol=SASL_PLAINTEXT
#############################################
#             Other Settings                #
#############################################
bootstrap.servers=<BROKER_HOST1>:9093,<BROKER_HOST2>:9093,<BROKER_HOST3>:9093
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
group.id=connect-cluster
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=https://<SR_HOST>:<SR_PORT>
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=https://<SR_HOST>:<SR_PORT>
value.converter.schemas.enable=true
plugin.path=/usr/share/java

Now, start the Connect worker with the right JAAS file.

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/connectuser.keytab"
  storeKey=true
  principal="connectuser@<DOMAIN>";
};

Control Center

Finally, let’s configure Confluent Control Center to talk to the Kerberos-enabled cluster. Remember that this is also a licensed component.

Set the initialization parameters (notice that C3 talks to the secure broker endpoints).

confluent.controlcenter.kafka.development.bootstrap.servers=<BROKER_HOST1>:9093,<BROKER_HOST2>:9093,<BROKER_HOST3>:9093
confluent.controlcenter.kafka.development.sasl.mechanism=GSSAPI
confluent.controlcenter.kafka.development.security.protocol=SASL_PLAINTEXT
confluent.controlcenter.kafka.development.sasl.kerberos.service.name=kafka

# License string for the Control Center
confluent.license=<LICENSE>

# Schema Registry cluster URL
confluent.controlcenter.schema.registry.url=https://<SR_HOST>:<SR_PORT>

#######################################   https  ############################################
confluent.controlcenter.rest.listeners=https://0.0.0.0:9021
confluent.controlcenter.rest.ssl.keystore.location=<FILE>
confluent.controlcenter.rest.ssl.keystore.password=<PASSWORD>
confluent.controlcenter.rest.ssl.key.password=<PASSWORD>
confluent.controlcenter.rest.ssl.truststore.location=<FILE>
confluent.controlcenter.rest.ssl.truststore.password=<PASSWORD>

Make sure the c3admin principal has ACLs on the cluster, the _schemas topic, the secure topics, the Connect topics, and the Connect consumer groups.

KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties" kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:c3admin --allow-host '<C3_HOST>' --operation Describe --operation DescribeConfigs --operation Create --cluster

KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties" kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:c3admin --allow-host '<C3_HOST>' --operation Read --operation Describe --topic _schemas

KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties" kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:c3admin --allow-host '<C3_HOST>' --operation Read --operation Describe --topic connect-configs --topic connect-offsets --topic connect-status

KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties" kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:c3admin --allow-host '<C3_HOST>' --operation Read --operation Describe --operation DescribeConfigs --topic canary_topic

KAFKA_OPTS=-Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties kafka-acls --authorizer-properties zookeeper.connect=<ZK_HOST>:2181 --add --allow-principal User:c3admin --consumer --allow-host '<C3_HOST>' --topic canary_topic --group connect-hdfs-sink-canary_topic

Finally, start C3 with the right JAAS file.

// Kafka Client
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/c3admin.keytab"
  storeKey=true
  principal="c3admin/<C3_HOST>@<DOMAIN>";
};

// ZK Client
Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  keyTab="/etc/security/keytabs/c3admin.keytab"
  storeKey=true
  principal="c3admin/<C3_HOST>@<DOMAIN>";
};

Kafka Security using SSL

Summary

 

There are a few posts on the internet that talk about Kafka security, such as this one. However, none of them cover the topic end to end. This article is an attempt to bridge that gap for folks who are interested in securing their clusters end to end. We will discuss securing all the components of Confluent’s open source distribution of Kafka: Zookeeper, brokers, Schema Registry, REST Proxy, and connectors. This came out of a PoC that we did for a customer recently.

Architecture

 

The following diagram shows the architecture of the PoC.

[Diagram: PoC architecture]

We are going to use SSL both for authenticating to the cluster and for inter-broker communication. SSL for the client-server interaction is necessitated by the fact that Confluent REST Proxy propagates the principal used to authorize a topic via SSL only (in the latest version available at the time, Confluent 3.30.11).

Details

 

Secure Zookeeper

 

Since we are going to implement authorization, it’s imperative that we secure Zookeeper first. This is where Kafka stores all ACLs (Access Control Lists), and we need to ensure only an administrator can create, modify, or delete them. If we skip this step, we defeat the purpose of securing the cluster as far as authorization is concerned.

Zookeeper provides a SASL-based authentication mechanism that we’re going to use to secure the ensemble. The Confluent distribution provides startup scripts for all the services, which we’ll modify to suit our security needs (such as passing a JAAS file as an argument).

# tail -2 /bin/zookeeper-server-start-secure 

exec $base_dir/kafka-run-class $EXTRA_ARGS -Djava.security.auth.login.config=/etc/kafka/zookeeper_server_jaas.properties org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"

The JAAS file contains the user credentials.

# cat /etc/kafka/zookeeper_server_jaas.properties
/*****
* user/password is used for connections between ZK instances. 
* user_zk_admin defines a user that client uses to authenticate with this ZK instance.
*****/

Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    user="zk_admin"
    password="zk_admin_secret"
    user_zk_admin="zk_admin_secret";
};

Finally, configure Zookeeper to use the SASL mechanism for authentication in its properties file.

# SASL configuration
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl

Start the Zookeeper ensemble with this configuration.

# /bin/zookeeper-server-start-secure -daemon /etc/kafka/zookeeper.properties 

 

Secure Brokers using TLS

 

The first thing we need to do is generate a key and certificate for each of the brokers and each client in the ecosystem. The common name (CN) of the broker certificate must match the fully qualified domain name (FQDN) of the server, since during authentication the client compares the CN with the DNS name to ensure that it is connecting to the desired broker. For the purposes of the PoC, we used self-signed certificates. The following picture shows the setup that we’ll use.

[Diagram: TLS configuration]

You may use scripts to automate most of the TLS setup.

Generate CA and Truststore

 

Generate a public-private key pair and certificate for the CA, and add that CA certificate to each broker’s truststore.

#!/bin/bash

PASSWORD=test1234
CLIENT_PASSWORD=test1234
VALIDITY=365

# Generate the CA (certificate authority) public-private key pair and certificate; the CA is used to sign other certificates.
openssl req -new -x509 -keyout ../ca/ca-key -out ../ca/ca-cert -days $VALIDITY -passin pass:$PASSWORD -passout pass:$PASSWORD -subj "/C=US/ST=CA/L=San Jose/O=Company/OU=Org/CN=FQDN" -nodes

# Add the CA to the servers’ truststore 
keytool -keystore /etc/kafka/ssl-server-steps/ca/kafka.server.truststore.jks -alias CARoot -import -file /etc/kafka/ssl-server-steps/ca/ca-cert -storepass $PASSWORD -keypass $PASSWORD

The keystore stores each application’s identity, while the truststore stores all the certificates that the application should trust. Importing a certificate into a truststore also means trusting all certificates that are signed by that certificate. This property is called the chain of trust, and it is particularly useful when deploying TLS on a large Kafka cluster: you can sign all certificates in the cluster with a single CA and have all machines share the same truststore that contains the CA certificate. That way, all machines can authenticate all other machines, as depicted in the picture above.

Generate Certificate/key pair for brokers

 

Next, generate a certificate/key pair for each of the brokers using this script.

#!/bin/bash

PASSWORD=test1234
VALIDITY=365

if [ $# -lt 1 ];
then
   echo "`basename $0` <host fqdn|user name|app name>"
   exit 1
fi

CNAME=$1
ALIAS=`echo $CNAME|cut -f1 -d"."`


# Generate keypair, ensure CN matches exactly with the FQDN of the server.
keytool -noprompt -keystore kafka.server.keystore.jks -alias $ALIAS -keyalg RSA -validity $VALIDITY -genkey -dname "CN=$CNAME,OU=BDP,O=Company,L=San Jose,S=CA,C=US" -storepass $PASSWORD -keypass $PASSWORD

# The next step is to sign all certificates in the keystore with the CA we generated in another step
# First, you need to export the certificate from the keystore
keytool -keystore kafka.server.keystore.jks -alias $ALIAS -certreq -file cert-file -storepass $PASSWORD

# Then, sign it with the CA that was generated earlier
openssl x509 -req -CA /etc/kafka/ssl-server-steps/ca/ca-cert -CAkey /etc/kafka/ssl-server-steps/ca/ca-key -in cert-file -out cert-signed -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD

# Finally, you need to import both the certificate of CA and the signed certificate into the keystore
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file /etc/kafka/ssl-server-steps/ca/ca-cert -storepass $PASSWORD
keytool -keystore kafka.server.keystore.jks -alias $ALIAS -import -file cert-signed -storepass $PASSWORD

Save the above commands in a file and run it with the broker FQDN as the argument to generate the key and certificate for that broker.

# ./generate_keystore.sh <FQDN>

Copy each broker’s keystore to that broker in the desired location. In addition, copy the truststore that holds the CA certificate to each broker as well.

Configure brokers

 

We will run the brokers so that they serve both SSL and non-SSL traffic, since not every client may support SSL (or want to use it). The broker configuration looks like this:

listeners=PLAINTEXT://<FQDN>:9092,SSL://<FQDN>:10092
security.inter.broker.protocol=SSL
zookeeper.set.acl=true
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

# By default, if a Resource R has no associated ACLs, no one other than super users is allowed to access R.
# We want the opposite, i.e. if ACL is not defined, allow all the access to the resource R
allow.everyone.if.no.acl.found=true

############### Kafka SSL #####################
ssl.keystore.location=/etc/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/etc/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password=test1234
ssl.client.auth=requested

# In order to enable hostname verification
ssl.endpoint.identification.algorithm=HTTPS

Finally, when a REST client authenticates with REST Proxy, the proxy passes the principal name on to the brokers; by default this is the DN (Distinguished Name) in the client’s certificate. We use a custom principal builder to translate that into something that looks more familiar. The following is the setting for it.

# Custom SSL principal builder
principal.builder.class=customPrincipalBuilderClass

This class breaks up the DN “CN=$CNAME,OU=BDP,O=Company,L=San Jose,S=CA,C=US” and translates the principal name to “User:$CNAME”. This is useful since Kafka ACLs follow this format, as we’ll show later. For this to work, just drop the jar file containing customPrincipalBuilderClass into the /usr/share/java/kafka directory.
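
For illustration only, here is a hedged sketch of what such a principal builder can look like when written against the KafkaPrincipalBuilder interface introduced in Kafka 1.0 (the Confluent 3.3-era PoC used the older, since-deprecated PrincipalBuilder interface, and its actual code is not shown here). The class name is a placeholder.

import javax.naming.InvalidNameException;
import javax.naming.ldap.LdapName;
import javax.naming.ldap.Rdn;
import javax.net.ssl.SSLPeerUnverifiedException;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;

public class CnOnlyPrincipalBuilder implements KafkaPrincipalBuilder {
    @Override
    public KafkaPrincipal build(AuthenticationContext context) {
        if (context instanceof SslAuthenticationContext) {
            try {
                // e.g. "CN=secure_user,OU=BDP,O=Company,..."
                String dn = ((SslAuthenticationContext) context).session().getPeerPrincipal().getName();
                for (Rdn rdn : new LdapName(dn).getRdns()) {
                    if ("CN".equalsIgnoreCase(rdn.getType())) {
                        // Becomes "User:secure_user", the form the Kafka ACLs use
                        return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, rdn.getValue().toString());
                    }
                }
            } catch (SSLPeerUnverifiedException e) {
                // Client did not present a certificate (ssl.client.auth=requested allows this)
                return KafkaPrincipal.ANONYMOUS;
            } catch (InvalidNameException e) {
                throw new RuntimeException("Could not parse peer DN", e);
            }
        }
        // PLAINTEXT connections also map to ANONYMOUS
        return KafkaPrincipal.ANONYMOUS;
    }
}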

Start brokers

 

Since we have secured Zookeeper, we need to start the brokers with a JAAS file that contains the credentials to interact with the Zookeeper ensemble. Again, we copy the startup script, adapt it to our needs, and use it.

# tail -2 /bin/kafka-server-start-secure 

exec $base_dir/kafka-run-class $EXTRA_ARGS -Djava.security.auth.login.config=/etc/kafka/zookeeper_client_jaas.properties io.confluent.support.metrics.SupportedKafka "$@"

Contents of the JAAS file are as follows:

# cat /etc/kafka/zookeeper_client_jaas.properties
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="zk_admin"
    password="zk_admin_secret";
};

Start the broker using this script and the configuration shown earlier.

# /bin/kafka-server-start-secure -daemon /etc/kafka/server.properties 

Configure Schema Registry and Start

 

Although it’s possible to use SSL for communication between the brokers and Schema Registry, we didn’t see a need for it. Schema Registry stores metadata about topic structures that HDFS connectors use to sink topic-level data to Hive tables. Since this data is not especially confidential, we decided to go with a simpler configuration for it. However, since this component interacts with the secured Zookeeper, we need to ensure we pass the right credentials to the daemon.

Key configuration file parameters:

kafkastore.bootstrap.servers=PLAINTEXT://<Broker FQDN>:9092
zookeeper.set.acl=true

Here’s what the startup script looks like:

# tail -2 /bin/schema-registry-start-secure 

exec $(dirname $0)/schema-registry-run-class ${EXTRA_ARGS} -Djava.security.auth.login.config=/etc/schema-registry/schema_registry_client_jaas.properties io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain "$@"

Contents of the JAAS file:

# cat /etc/schema-registry/schema_registry_client_jaas.properties
/* Zookeeper client */
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="zk_admin"
    password="zk_admin_secret";
};

Start the schema registry, like so:

# /bin/schema-registry-start-secure -daemon /etc/schema-registry/schema-registry.properties 

Configure REST

 

Typically, you would run two separate instances of REST Proxy: one to serve SSL traffic (which is a requirement for Kafka ACLs) and another for non-SSL traffic. We’ll discuss the SSL configuration here. Here’s what the configuration file looks like:

listeners=https://0.0.0.0:8083
schema.registry.url=http://localhost:8081
zookeeper.connect=FQDN1:2181,FQDN2:2181,FQDN3:2181
bootstrap.servers=SSL://FQDN1:10092

# ********************************
# Kafka security
# ********************************

kafka.rest.resource.extension.class=io.confluent.kafkarest.security.KafkaRestSecurityResourceExtension

# Principal propagation for the incoming requests is determined by following - Only SSL allowed and is mandatory
confluent.rest.auth.propagate.method=SSL

# Configuration Options for HTTPS
ssl.client.auth=true
ssl.keystore.location=/etc/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/etc/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password=test1234

# Configuration Options for SSL Encryption between REST Proxy and Apache Kafka Brokers
client.security.protocol=SSL
client.ssl.key.password=client1234
client.ssl.keystore.location=/etc/kafka/ssl/kafka.rest.keystore.jks
client.ssl.keystore.password=client1234
client.ssl.truststore.location=/etc/kafka/ssl/kafka.server.truststore.jks
client.ssl.truststore.password=test1234

The keystore file (kafka.rest.keystore.jks) needed to communicate with the brokers is generated the same way as the broker keystores (using the generate_keystore.sh script). One difference is that the alias and CN are a user name instead of a server name. Ensure that the certificate is signed by the same CA certificate that was generated earlier and used to sign the broker certificates. We generated the keystore with two certificates: one for a user called secure_user and another for a user called insecure_user. These user names are used as the CN of the certificates, and both are stored in the same keystore.

# ./client.sh secure_user
# ./client.sh insecure_user

Finally, notice that we use the same truststore as the brokers.

Create topic level ACLs

 

Create a topic called test_secure_topic and add ACLs to it.

User:secure_user has Allow permission for operations: Read from hosts: <REST Proxy host>
User:secure_user has Allow permission for operations: Describe from hosts:<REST Proxy host>
User:secure_user has Allow permission for operations: Write from hosts:<REST Proxy host>
User:* has Allow permission for operations: Describe from hosts: *

The last ACL (anyone can describe this topic from anywhere) is needed to get a meaningful error message when someone without access tries an operation on the topic. If this ACL is absent, they’ll get an UNKNOWN_TOPIC_OR_PARTITION error, which is cryptic.

Test Java client

 

We use the console producer and consumer programs to test the ACLs against the topic.

kafka-avro-console-producer --broker-list <Broker FQDN>:10092 \
                            --topic test_secure_topic \
                            --producer.config client_ssl.properties \
                            --property value.schema='{"type":"record","name":"test","fields":[{"name":"name","type":"string"},{"name":"salary","type":"int"}]}' <<EOF
{"name": "console", "salary": 2000}
EOF

Producer config file contents are as follows:

security.protocol=SSL
ssl.keystore.location=kafka.client.keystore.jks
ssl.keystore.password=client1234
ssl.key.password=client1234
ssl.truststore.location=kafka.server.truststore.jks
ssl.truststore.password=client1234

kafka.client.keystore.jks is generated just like the one for REST Proxy, except it holds the certificate for only the secure_user alias (generated by the ./client.sh secure_user command). We also use the same truststore that’s used by everything else.

Let’s consume the record that we just produced:

kafka-avro-console-consumer --bootstrap-server <Broker FQDN>:10092 --topic test_secure_topic --consumer.config client_ssl.properties --from-beginning --new-consumer

{"name":"console","salary":2000}

Let’s try the same with a certificate generated for the insecure_user alias (generated by the ./client.sh insecure_user command).

[2017-11-09 22:57:51,435] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$:105)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test_secure_topic]
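
The same test can of course be driven from application code. Below is a minimal, hedged sketch of an SSL-configured Java producer using the same keystore and truststore settings as client_ssl.properties above; it sends a plain string value rather than Avro to keep the example short.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SslProducerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<Broker FQDN>:10092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Same SSL settings as client_ssl.properties above
        props.put("security.protocol", "SSL");
        props.put("ssl.keystore.location", "kafka.client.keystore.jks");
        props.put("ssl.keystore.password", "client1234");
        props.put("ssl.key.password", "client1234");
        props.put("ssl.truststore.location", "kafka.server.truststore.jks");
        props.put("ssl.truststore.password", "client1234");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test_secure_topic", "{\"name\": \"console\", \"salary\": 2000}"));
            producer.flush();
        }
    }
}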

Test REST client

 

We’ll use curl to test the ACLs via REST. Since curl doesn’t understand Java keystores, we need to extract the client’s certificate and keys and store them in PEM (Privacy Enhanced Mail) format. We used a script to automate that portion.

#!/bin/bash

PASSWORD=client1234
if [ $# -lt 1 ];
then
   echo "`basename $0` <alias>"
   exit 1
fi

ALIAS=$1

rm -f /etc/kafka/ssl-client-steps/certs/kafka.client.keystore.p12

# Convert JKS keystore into PKCS#12 keystore, then into PEM file:
keytool -importkeystore -srckeystore kafka.client.keystore.jks \
   -destkeystore kafka.client.keystore.p12 \
   -srcalias $ALIAS \
   -destalias $ALIAS \
   -srcstoretype jks \
   -deststoretype pkcs12 \
   -srcstorepass $PASSWORD \
   -deststorepass $PASSWORD 

openssl pkcs12 -in kafka.client.keystore.p12 -chain -name $ALIAS -out ${ALIAS}.pem -passin pass:$PASSWORD -nodes 

For example, to extract the keys and certificate for secure_user, run the script with the secure_user alias as the argument. This creates a secure_user.pem file containing the keys and certificate for that user.

# ../convert.sh secure_user
MAC verified OK

Whenever we make a call to REST Proxy using curl, we need to pass the CA certificate and the PEM file as arguments.

# Produce a message with JSON data

curl --tlsv1.0 --cacert <Location of CA Cert> -E secure_user.pem \
     -X POST -H "Content-Type: application/vnd.kafka.avro.v2+json" \
     -H "Accept: application/vnd.kafka.v2+json" \
     --data @test.data \
      https://<FQDN>:8083/topics/test_secure_topic

The test.data file contains the following:

{"value_schema_id": 21,"records": [ {
       "value": {
         "name":"REST API",
         "salary":1000
       }
     }
   ]
}

Note that we registered that schema in Schema Registry earlier:

curl localhost:8081/schemas/ids/21

{"schema":"{\"type\":\"record\",\"name\":\"test\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"salary\",\"type\":\"int\"}]}"}

Consume the message that was created

# Create a consumer for AVRO data, starting at the beginning of the topic's
# log and subscribe to a topic. Then consume some data using the base URL in the first response.
# Finally, close the consumer with a DELETE to make it leave the group and clean up
# its resources.

curl --tlsv1.0 --cacert <Location of CA Cert> -E secure_user.pem \
     -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
     --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' \
      https://<REST Proxy FQDN>:8083/consumers/secure_topic_group

curl --tlsv1.0 --cacert <Location of CA Cert> -E secure_user.pem \
     -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
     --data '{"topics":["test_secure_topic"]}' \
     https://<REST Proxy FQDN>:8083/consumers/secure_topic_group/instances/my_consumer_instance/subscription

curl --tlsv1.0 --cacert <Location of CA Cert> -E secure_user.pem \
     -X GET -H "Accept: application/vnd.kafka.avro.v2+json" \
      https://<REST Proxy FQDN>:8083/consumers/secure_topic_group/instances/my_consumer_instance/records

curl --tlsv1.0 --cacert <Location of CA Cert> -E secure_user.pem \
     -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
     https://<REST Proxy FQDN>:8083/consumers/secure_topic_group/instances/my_consumer_instance

The read operation produces output like this:

[{"topic":"test_secure_topic","key":null,"value":{"name":"REST API","salary":1000},"partition":0,"offset":0}]

When we try the same operation with the insecure_user account, which doesn’t have privileges for this topic (you’ll have to pass the right PEM file in the curl call), the read operation produces output like this:

{"error_code":50002,"message":"Kafka error: Not authorized to access group: secure_topic_group"}

Configure Connector

 

A connector is just another producer or consumer of a topic. Connectors connect to the cluster as ANONYMOUS, so we need to grant producer/consumer access to that principal. We configure the connectors to talk to the brokers over PLAINTEXT, since there’s no need for SSL here.

name=hdfs_sink_test_secure_topic
bootstrap.servers=PLAINTEXT://<Broker FQDN>:9092

Source connectors must be given WRITE permission to any topics that they need to write to. Similarly, sink connectors need READ permission to any topics they will read from. They also need Group READ permission since sink tasks depend on consumer groups internally. Connect defines the consumer group.id conventionally for each sink connector as connect-{name} where {name} is substituted by the name of the connector.

As an example, when we run the HDFS connector, we need the following ACLs on the topic (for a connector named as above).

Current ACLs for resource `Group:connect-hdfs_sink_test_secure_topic`: 
  User:ANONYMOUS has Allow permission for operations: Read from hosts: <Connector Host>

Current ACLs for resource `Topic:test_secure_topic`:
  User:ANONYMOUS has Allow permission for operations: Read from hosts:<Connector Host>


Setting up Tez on CDH Cluster

Summary

It is well known that Cloudera has no official support for the Tez execution engine; they push their customers to use Impala instead (or, nowadays, Hive on Spark). This article describes how we set up the Tez engine on a CDH cluster, including the Tez UI.

Install Java/Maven

Follow the official instructions to install Java. Ensure the version is the same as the one your CDH cluster runs on; in our case it is Java 7 update 67.

Follow the instructions to install the latest version of Maven.

Download Tez source

Download the source from the Tez site. We built version 0.8.4 for use on a CDH 5.8.3 cluster.

Build Tez

As of this writing, Cloudera doesn’t support the timeline server that ships with its distribution, but a timeline server is required if you want to host the Tez UI. One option is to use the Apache version of the timeline server. The timeline server that ships with Hadoop 2.6 and later uses version 1.9 of the Jackson jars, while CDH 5.8.3 uses version 1.8.8. So when we build Tez against CDH, we need to ensure it is built with the right Jackson dependencies so that Tez can post events to the Apache timeline server.

Build protobuf 2.5.0

Tez requires protocol buffers 2.5.0. Download the source code from here. Build the binaries locally.

tar xvzf protobuf-2.5.0.tar.gz
cd protobuf-2.5.0
./configure --prefix $PWD
make
make check
make install

Verify version like so:

bin/protoc --version
libprotoc 2.5.0

Setup git protocol

Our firewall blocked the git protocol, so we configured git to use the https protocol instead.

git config --global url.https://github.com/.insteadOf git://github.com/

Build against CDH repo

Since we want to maintain compatibility with the existing CDH installation, it’s a good idea to build against the CDH 5.8.3 repo. Let’s create a new Maven build profile for that.

    <profile>
       <id>cdh5.8.3</id>
       <activation>
       <activeByDefault>false</activeByDefault>
       </activation>
       <properties>
         <hadoop.version>2.6.0-cdh5.8.3</hadoop.version>
         <pig.version>0.12.0-cdh5.8.3</pig.version>
       </properties>
       <pluginRepositories>
         <pluginRepository>
         <id>cloudera</id>
         <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
         </pluginRepository>
       </pluginRepositories>
       <repositories>
         <repository>
           <id>cloudera</id>
           <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
         </repository>
       </repositories>
    </profile>

However, this repo will by default pull in version 1.8.8 of the Jackson jars, which will not work when Tez tries to post events to the Apache timeline server, so let’s make sure we specify those dependencies explicitly.

     <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-mapper-asl</artifactId>
        <version>1.9.13</version>
      </dependency>
      <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-core-asl</artifactId>
        <version>1.9.13</version>
      </dependency>
      <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-jaxrs</artifactId>
        <version>1.9.13</version>
      </dependency>
      <dependency>
        <groupId>org.codehaus.jackson</groupId>
        <artifactId>jackson-xc</artifactId>
        <version>1.9.13</version>
      </dependency>

Here’s the complete pom.xml under the top level tez source directory for reference, in pdf format.

Finally, build Tez:

mvn -e clean package -DskipTests=true -Dmaven.javadoc.skip=true -Dprotoc.path=<location of protoc> -Pcdh5.8.3

Run Apache timeline server

We are going to run the timeline server that ships with Apache Hadoop 2.7.3. Install it on a separate server, copy core-site.xml from the CDH cluster, and change yarn-site.xml to have the right timeline server properties.

  <property>
    <name>yarn.timeline-service.store-class</name>
    <value>org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore</value>
  </property>
  <property>
    <name>yarn.timeline-service.leveldb-timeline-store.path</name>
    <value>/var/log/hadoop-yarn/timeline/leveldb</value>
  </property>
   <property>
     <name>yarn.timeline-service.bind-host</name>
     <value>0.0.0.0</value>
   </property>
   <property>
     <name>yarn.timeline-service.leveldb-timeline-store.ttl-interval-ms</name>
     <value>300000</value>
   </property>
   <property>
     <name>yarn.timeline-service.hostname</name>
     <value>ATS_HOSTNAME</value>
   </property>
   <property>
     <name>yarn.timeline-service.http-cross-origin.enabled</name>
     <value>true</value>
   </property>

Start the timeline server after setting the right environment variables.

yarn-daemon.sh start timelineserver

Install httpd

We are going to use the Apache web server to host the Tez UI.

yum install httpd

Extract the tez-ui.war file that was built as part of Tez into the directory pointed to by DocumentRoot (default /var/www/html).

cd /var/www/html
mkdir tez-ui
cd tez-ui
jar xf tez-ui.war
ln -s . ui   # needed because the Tez UI looks for <base_url>/ui from the YARN UI while the app is still running

Setup YARN on CDH

Add the following properties to the YARN configuration of the CDH cluster under “YARN Service Advanced Configuration Snippet (Safety Valve) for yarn-site.xml” and restart the service.

<property>
  <name>yarn.timeline-service.http-cross-origin.enabled</name>
   <value>true</value>
</property>
<property>
   <name>yarn.resourcemanager.system-metrics-publisher.enabled</name>
   <value>true</value>
</property>
<property>
   <name>yarn.timeline-service.generic-application-history.enabled</name>
   <value>true</value>
</property>
<property>
   <name>yarn.timeline-service.enabled</name>
   <value>true</value>
</property>
<property>
   <name>yarn.timeline-service.hostname</name>
   <value>ATS_HOSTNAME</value>
</property>

Setup Tez

Copy the tez-0.8.4.tar.gz that was created when you built Tez to HDFS; let’s say we copied it to /apps/tez. This archive has all the files needed by Tez.

hdfs dfs -copyFromLocal tez-0.8.4.tar.gz /apps/tez

On the client, extract tez-0.8.4.tar.gz into the installation directory (let’s use /usr/local/tez/client).

ssh root@<client>
mkdir -p /usr/local/tez/client 
cd /usr/local/tez/client
tar xvzf <location on FS>/tez-0.8.4.tar.gz

Set up /etc/tez/conf/tez-site.xml on the client. The following are some of the properties needed to make Tez work properly.

  <property>
    <name>tez.lib.uris</name>
    <value>/apps/tez/tez-0.8.4.tar.gz</value>
  </property>
  <property>
    <name>tez.use.cluster.hadoop-libs</name>
    <value>false</value>
  </property>
  <property>
    <name>tez.tez-ui.history-url.base</name>
    <value>UI_HOSTNAME/tez-ui</value>
  </property>
  <property>
    <name>yarn.timeline-service.hostname</name>
    <value>ATS_HOSTNAME</value>
  </property>
  <property>
    <name>yarn.timeline-service.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>tez.task.launch.env</name>
    <value>LD_LIBRARY_PATH=/opt/cloudera/parcels/CDH/lib/hadoop/lib/native</value>
  </property>
  <property>
    <name>tez.am.launch.env</name>
    <value>LD_LIBRARY_PATH=/opt/cloudera/parcels/CDH/lib/hadoop/lib/native</value>
  </property>
  <property>
    <name>tez.history.logging.service.class</name>
    <value>org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService</value>
  </property>

Ensure HADOOP_CLASSPATH is set properly on the Tez client.

export TEZ_CONF_DIR=/etc/tez/conf
export TEZ_JARS=/usr/local/tez/client
export HADOOP_CLASSPATH=${TEZ_CONF_DIR}:${TEZ_JARS}/*:${TEZ_JARS}/lib/*:`hadoop classpath`

If you are using HiveServer2 clients (Beeline, Hue), ensure you have extracted the tez-0.8.4.tar.gz contents into /usr/local/tez/client (as an example) and created /etc/tez/conf/tez-site.xml just as you did on the Tez client. In addition, set HADOOP_CLASSPATH with actual values in the Hive configuration in Cloudera Manager under the Hive Service Environment Advanced Configuration Snippet (Safety Valve). Once done, restart Hive.

HADOOP_CLASSPATH=/etc/tez/conf:/usr/local/tez/client/*:/usr/local/tez/client/lib/*:/etc/hadoop/conf:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop/.//*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-hdfs/./:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-hdfs/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-hdfs/.//*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-yarn/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop/libexec/../../hadoop-yarn/.//*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/lib/*:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/.//*

Run clients

-bash-4.1$ hive

Logging initialized using configuration in jar:file:/apps/cloudera/parcels/CDH-5.8.3-1.cdh5.8.3.p0.2/jars/hive-common-1.1.0-cdh5.8.3.jar!/hive-log4j.properties
WARNING: Hive CLI is deprecated and migration to Beeline is recommended.
hive> set hive.execution.engine=tez;
hive> select count(*) from warehouse.dummy;
Query ID = nex37045_20170305002020_18680c2c-0df1-4f5c-b0a5-5d511da78c63
Total jobs = 1
Launching Job 1 out of 1

Status: Running (Executing on YARN cluster with App id application_1488498341718_39067)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED      1          1        0        0       0       0
Reducer 2 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 02/02  [==========================>>] 100%  ELAPSED TIME: 6.91 s     
--------------------------------------------------------------------------------
OK
1
Time taken: 19.114 seconds, Fetched: 1 row(s)
hive>

You should now see the details for this job in the Tez UI.

[Screenshot: Tez UI showing the completed query]


Building rack aware mirroring scheme in Greenplum

Summary

Customers using software-only installations of Greenplum have the option to configure mirrors that fit their availability and performance needs. This post describes an approach that can be leveraged to maximize availability and performance when a Greenplum cluster uses servers on two or more racks.

Rack/Server setup

In a typical data center, customers usually have servers in more than one rack. To mitigate the risk of a platform service (such as GPDB) going down because a node or a rack goes down, it’s recommended to configure the cluster on servers spread across multiple racks. By default, GPDB comes with a standby node for the master services (in active-passive mode); this host should live on a rack different from the master node’s. In addition, GPDB provides mirroring for protection against data loss due to node failures. These mirrors can be configured to provide protection against failures at the rack level (power failures, TOR switch issues, etc.), which enhances cluster availability by a large degree. The following picture depicts how we did this at one of our customer sites:

Important things to notice:

  • Master and standby nodes are on separate racks. A VIP in front of them directs traffic to the ‘live’ master
  • Segments on a worker node in a given rack have mirrors spread across segment hosts on different racks, using the spread mirroring scheme (arrows depict the primary -> mirror relationship)
  • Every node in the cluster hosts the same number of mirrors, so the performance degradation is the same for any node failure (e.g., in a setup with 4 mirrors per segment host as depicted in the diagram, when a node fails, each of its mirror hosts picks up 1 mirror)

More details…

We used separate ports for the master (for incoming connection requests) and standby nodes (for replication traffic). In addition, an F5 hardware load balancer was used that detected which node was the ‘real’ master by probing the port that the master is supposed to be listening on. Finally, we had to manually move the mirrors using the ‘gpmovemirrors’ utility with a custom configuration file. We tested the setup in three separate ways:

  1. Remove rack 3, by rebooting all the servers on that rack in live cluster. Check if those segments fail over with minimal disruption.
  2. Remove rack 2, by rebooting all the servers on that rack in live cluster. Cluster should still be available.
  3. Remove rack 1, by rebooting all the servers on that rack in live cluster. Fail master over to rack 2 and bring the cluster up. It should work just fine.

Performing FULL VACUUM in Greenplum efficiently

Summary

Greenplum is an MPP warehouse platform based on the PostgreSQL database. Here we discuss one of the most important and most common maintenance tasks that needs to be executed on a periodic basis on the platform.

What causes bloat?

The Greenplum platform is ACID compliant. The isolation property ensures that the concurrent execution of transactions results in a system state that would be obtained if the transactions were executed serially, i.e., one after the other; providing isolation is the main goal of concurrency control. Greenplum (PostgreSQL, really) implements isolation with Multi-Version Concurrency Control (MVCC). In normal PostgreSQL operation, rows that are deleted or obsoleted by an update are not physically removed from their table; they remain present until a VACUUM is done. It is therefore necessary to VACUUM periodically, especially on frequently updated tables. Failing to do this table maintenance, which allows the space to be reused, causes the table data files to grow and scans of the table to take longer.

Options to cleanup the bloat

VACUUM FULL

This is the slowest method of the lot. When the VACUUM FULL command is executed, the rows in the table are reshuffled; if there are a large number of rows, the amount of data movement makes the run time very unpredictable.

A bloated catalog table (i.e., any table under the pg_catalog schema) can only use this method to remove the bloat, so cleaning up highly bloated catalog tables can be time-consuming.

Create Table As Select (CTAS)

CTAS is a quick method to remove bloat. It also avoids the table lock (EXCLUSIVE LOCK) that the VACUUM FULL method acquires to do the operation, allowing the table to be used by end users while maintenance is being performed on the main table.

The disadvantage is that it involves many steps to perform the activity:

  1. Obtain the DDL of the table using the pg_dump -s -t <bloated-table-name> <database> command. This creates a script with all the sub-objects (such as indexes) associated with the table, along with the list of grants on it.
  2. Once the DDL is obtained, replace the bloated table name with a new table name using the find/replace option in any editor of your choice, then execute the file through psql to create the new objects in the database (a shell sketch of steps 1 and 2 follows the SQL block below).
  3. Then follow the steps in the code below:
INSERT INTO <schema-name>.<new-table-name> SELECT * FROM <schema-name>.<bloated-table-name>;
ALTER TABLE <schema-name>.<bloated-table-name> RENAME TO <table-name-to-drop>;
ALTER TABLE <schema-name>.<new-table-name> RENAME TO <bloated-table-name>;
-- Once users confirm everything is good.
DROP TABLE <schema-name>.<table-name-to-drop>;
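
A hedged shell sketch of steps 1 and 2 (the schema, table, and database names are hypothetical, and the blind find/replace should be reviewed since it can also touch index or constraint names):

pg_dump -s -t myschema.bloated_table mydb > bloated_table_ddl.sql
# Crude rename of the table inside the DDL; review the output before running it
sed 's/bloated_table/bloated_table_new/g' bloated_table_ddl.sql > new_table_ddl.sql
psql -d mydb -f new_table_ddl.sql
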
Backup/Restore

This is another way to clear up the bloat: back up the tables and then restore them. Tools that can be used to achieve this are (a sketch of the COPY approach follows the list):

  1. gpcrondump / gpdbrestore
  2. gp_dump / gp_restore
  3. pg_dump / pg_restore
  4. COPY .. TO .. / COPY .. FROM ..
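
A minimal sketch of the COPY approach (option 4); the table, database, and file path are hypothetical, and the dump file should be kept until the reload has been verified:

psql -d mydb -c "\copy myschema.bloated_table TO '/data/stage/bloated_table.dat'"
psql -d mydb -c "TRUNCATE myschema.bloated_table;"
psql -d mydb -c "\copy myschema.bloated_table FROM '/data/stage/bloated_table.dat'"
psql -d mydb -c "ANALYZE myschema.bloated_table;"
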
Redistribute

This is the quickest method of the lot. When you execute a redistribute command, the database internally creates a new file and loads the existing data into it. Once the load is done, it removes the old file, effectively eliminating the bloat. The following script automates this process by building a redistribution script that preserves the distribution key order.

SELECT 'ALTER TABLE '||n.nspname||'.'||c.relname||' SET with (reorganize=false) DISTRIBUTED RANDOMLY;\n'||
       'ALTER TABLE '||n.nspname||'.'||c.relname||' SET with (reorganize=true) DISTRIBUTED'||
       CASE
         WHEN length(dist_col) > 0 THEN ' BY ('||dist_col||');\n'
         ELSE ' RANDOMLY;\n'
       END||
       'ANALYZE '||n.nspname||'.'||c.relname||';\n'
  FROM pg_class c,
       pg_namespace n,
       (SELECT pc.oid,
               string_agg(attname, ', ' ORDER BY colorder) AS dist_col
          FROM pg_class AS pc
          LEFT JOIN (SELECT localoid,
                            unnest(attrnums) AS colnum,
                            generate_series(1, array_upper(attrnums, 1)) AS colorder
                       FROM gp_distribution_policy) AS d
                 ON (d.localoid = pc.oid)
          LEFT JOIN pg_attribute AS a
                 ON (d.localoid = a.attrelid AND d.colnum = a.attnum)
         GROUP BY pc.oid) AS keys
 WHERE c.oid = keys.oid
   AND c.relnamespace = n.oid
   AND c.relkind = 'r'
   AND c.relstorage = 'h'
   -- The following filter takes out all partitions; we care only about the master (parent) table
   AND c.oid NOT IN (SELECT inhrelid FROM pg_inherits)
   AND n.nspname NOT IN (
         'gp_toolkit',
         'pg_toast',
         'pg_bitmapindex',
         'pg_aoseg',
         'pg_catalog',
         'information_schema')
   AND n.nspname NOT LIKE 'pg_temp%';

When we execute this script against a database, it produces SQL that, when run back against that database, will clean up the bloat.

psql -t gpadmin -f fullvacuum.sql |sed 's/\s*$//'

ALTER TABLE public.testtable SET with (reorganize=false) DISTRIBUTED RANDOMLY;
ALTER TABLE public.testtable SET with (reorganize=true) DISTRIBUTED BY (n);
ANALYZE public.testtable;
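
The generated statements can then be reviewed and run back against the database during a maintenance window, for example (the output file name is illustrative):

psql -t gpadmin -f fullvacuum.sql | sed 's/\s*$//' > run_fullvacuum.sql
# Review run_fullvacuum.sql, then execute it:
psql -e gpadmin -f run_fullvacuum.sql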

Design Pattern: Avoid HAWQ creating too many small files on HDFS for tables

Summary

This post deals with a rather little-known problem that HAWQ has on Hadoop clusters: it can create too many small files on HDFS even when there is a negligible amount of data in the tables. We describe our encounter with this problem and how we overcame it. Bear in mind that the solution may not work for your use case, so you may have to get a little creative to work around the problem.

Problem

I am sure you have heard that for larger, read-only tables, and especially ones with lots of columns, using column orientation is a good idea. Column-oriented tables can offer better query performance on wide tables (lots of columns) where you typically access only a small subset of columns in your queries; it is one of the strategies used to reduce disk IO. At our install site, HAWQ is used as an archival platform for ERP databases, which tend to have lots and lots of tables with negligible amounts of data. The data is rarely queried, and when it is, the queries tend to be simple (minimal joins). Going by the above recommendation, the architects had asked all internal customers to create tables with column orientation as the default storage format. After a few months, the name node started crashing because its JVM kept running out of memory. When we looked into the problem, the largest database, with 34145 tables (all column-oriented), had 5,323,424 files on HDFS (the cluster has 116 segments). Overall, the number of files on HDFS across all databases in the cluster had reached 120 million, at which point the name node JVM kept running out of memory, making the cluster highly unstable.

In a nutshell, the problem arises because HAWQ, like the Greenplum database, implements columnar storage with one file per column. As you may already know, Hadoop doesn’t do well with lots and lots of really small files. Let’s look at the technical details:

[gpadmin@hdm2 migration]$ psql manoj
psql (8.2.15)
Type "help" for help.
-- CREATE TABLE
manoj=# create table testhdfs (mynum1 integer, mynum2 integer, mytext1 char(2), mytext2 char(2)) 
manoj-# with (appendonly=true, orientation=column) distributed randomly;
CREATE TABLE

-- INSERT A ROW
manoj=# insert into testhdfs values (1, 2, 'v1', 'v2');
INSERT 0 1

Now, let’s look at how many files this table has created. HAWQ stores the files for this table on HDFS under /hawq_data/<segment>/<tablespace ID>/<database ID>.

[gpadmin@hdm2 migration]$ hadoop fs -ls /hawq_data/gpseg0/16385/5699341
Found 7 items
-rw------- 3 postgres gpadmin 0 2015-02-12 18:35 /hawq_data/gpseg0/16385/5699341/5699342
-rw------- 3 postgres gpadmin 0 2015-02-12 18:58 /hawq_data/gpseg0/16385/5699341/5699432
-rw------- 3 postgres gpadmin 0 2015-02-12 18:58 /hawq_data/gpseg0/16385/5699341/5699432.1
-rw------- 3 postgres gpadmin 0 2015-02-12 18:58 /hawq_data/gpseg0/16385/5699341/5699432.129
-rw------- 3 postgres gpadmin 0 2015-02-12 18:58 /hawq_data/gpseg0/16385/5699341/5699432.257
-rw------- 3 postgres gpadmin 0 2015-02-12 18:58 /hawq_data/gpseg0/16385/5699341/5699432.385
-rw------- 3 postgres gpadmin 4 2015-02-12 18:35 /hawq_data/gpseg0/16385/5699341/PG_VERSION

As you can see, it created 1 file per column per segment (even for segments that hold no data for this table). That is a HUGE problem.

Solution

There are a couple of ways to deal with this. One option is to use the Parquet format to store the data:

with (appendonly=true, orientation=parquet);
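
For example, the earlier test table could be recreated with Parquet storage along these lines (the table name is just for illustration):

create table testhdfs_parquet (mynum1 integer, mynum2 integer, mytext1 char(2), mytext2 char(2))
with (appendonly=true, orientation=parquet) distributed randomly;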

This creates one data file per segment for the table. You can use a nifty tool called “gpextract” to see where these files are stored on HDFS.

gpextract -o manoj.yaml -W testhdfs -dmanoj

[gpadmin@hdm2 migration]$ cat manoj.yaml 
DBVersion: PostgreSQL 8.2.15 (Greenplum Database 4.2.0 build 1) (HAWQ 1.2.0.1 build
 8119) on x86_64-unknown-linux-gnu, compiled by GCC gcc (GCC) 4.4.2 compiled on Apr
 23 2014 16:12:32
DFS_URL: hdfs://hdm1.gphd.local:8020
Encoding: UTF8
FileFormat: Parquet
Parquet_FileLocations:
 Checksum: false
 CompressionLevel: 0
 CompressionType: null
 EnableDictionary: false
 Files:
 - path: /hawq_data/gpseg0/16385/5699341/5699460.1
 size: 0
 - path: /hawq_data/gpseg1/16385/5699341/5699460.1
 size: 0
 - path: /hawq_data/gpseg2/16385/5699341/5699460.1
 size: 0
 - path: /hawq_data/gpseg3/16385/5699341/5699460.1
 size: 0
 - path: /hawq_data/gpseg4/16385/5699341/5699460.1
 size: 0
 - path: /hawq_data/gpseg5/16385/5699341/5699460.1
 size: 1371
 - path: /hawq_data/gpseg6/16385/5699341/5699460.1
 size: 0
 - path: /hawq_data/gpseg7/16385/5699341/5699460.1
 size: 0
 - path: /hawq_data/gpseg8/16385/5699341/5699460.1
 size: 0
 - path: /hawq_data/gpseg9/16385/5699341/5699460.1
 size: 0
 - path: /hawq_data/gpseg10/16385/5699341/5699460.1
 size: 0
 - path: /hawq_data/gpseg11/16385/5699341/5699460.1
 size: 0
 - path: /hawq_data/gpseg12/16385/5699341/5699460.1
 size: 0
 - path: /hawq_data/gpseg13/16385/5699341/5699460.1
 size: 0
 - path: /hawq_data/gpseg14/16385/5699341/5699460.1
 size: 0
 - path: /hawq_data/gpseg15/16385/5699341/5699460.1
 size: 0
 PageSize: 1048576
 RowGroupSize: 8388608
TableName: public.testhdfs
Version: 1.0.0

Another way is to just copy the data to HDFS in a flat-file format and create external tables on top of it. We used this approach since it creates just one file per table in the whole cluster. We did not need the read performance, as this is an archival database, so it was an easy choice.
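
As a rough sketch of that approach (the PXF host, port, profile name, schema, and file path below are assumptions that should be checked against your HAWQ/PXF version), a readable external table over a flat file on HDFS looks something like this:

CREATE EXTERNAL TABLE archive.testhdfs_ext
    (mynum1 integer, mynum2 integer, mytext1 char(2), mytext2 char(2))
LOCATION ('pxf://namenode-host:51200/archive/erp/testhdfs.csv?PROFILE=HdfsTextSimple')
FORMAT 'TEXT' (DELIMITER ',');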

Conclusion

HAWQ provides a nice SQL layer on HDFS; use the tool the right way to solve your business problem. In addition, the Parquet format is easy to use in HAWQ and doesn’t lock you into a Pivotal HD/HAWQ-only solution: other tools like Pig or MapReduce can easily read the Parquet files in your Hadoop cluster.

Authorization in Hadoop using Apache Ranger

Summary

Over the past couple of years, Apache Hadoop has made great progress in the area of security. Security for any computing system is divided into two categories:

  1. Authentication is the process of ascertaining that somebody really is who he claims to be. In Hadoop, this is achieved via Kerberization. This post will not cover any details about this as there’s ample material already available online.
  2. Authorization refers to rules that determine who is allowed to do what. E.g. Manoj may be authorized to create and delete databases, while Terence is only authorized to read the tables.

For folks coming from the traditional database world, authorization is a well-understood concept: it involves creating roles and grants at various levels on a variety of objects. The Apache Hadoop community is trying to emulate some of those concepts in the recently incubated Apache Ranger project. We recently implemented this at one of our customer sites and talk about that experience here.

What does Ranger do?

Ranger provides a centralized way to manage security across various components in a Hadoop cluster. Currently, it’s capable of providing authorization as well as auditing for the HDFS, Hive, HBase, Knox and Storm services. At its core, Ranger is a centralized web application, which consists of the policy administration, audit and reporting modules. Authorized users can manage their security policies using the web tool or using REST APIs. These security policies are enforced within the Hadoop ecosystem using lightweight Ranger Java plugins, which run as part of the same process as the NameNode (HDFS), HiveServer2 (Hive), HBase servers (HBase), Nimbus server (Storm) and Knox server (Knox). Thus there is no additional OS-level process to manage. In addition, this means there’s no single point of failure: for example, if the web application or the policy database goes down, security is not compromised; it just prevents security administrators from pushing new policies.
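
As an illustration of the REST route, an HDFS policy can be created with a single curl call. The endpoint and JSON field names below follow the public API bundled with HDP 2.2-era Ranger and should be verified against your version; the user, resource path, and repository name are hypothetical.

# Hedged sketch: create an HDFS policy via the Ranger public REST API (verify endpoint/fields for your version)
curl -u admin:admin -X POST -H "Content-Type: application/json" \
  http://<portal host>:6080/service/public/api/policy \
  -d '{
        "policyName":     "TestPolicy",
        "repositoryName": "<your HDFS repository name>",
        "repositoryType": "hdfs",
        "resourceName":   "/user/testuser",
        "isEnabled":      true,
        "isRecursive":    true,
        "isAuditEnabled": true,
        "permMapList": [
          { "userList": ["testuser"], "permList": ["Read", "Write", "Execute"] }
        ]
      }'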

Components of Ranger

There are three main components of Ranger:

  1. Portal/Policy Manager is the central UI for security administration. Users can create and update policies, which are then stored in a policy database. Plugins within each component poll these policies at regular intervals.
  2. Plugins are lightweight Java programs that embed within the processes of each cluster component. For example, the plugin for HDFS runs as part of the NameNode process. The plugins pull policies from the policy database at regular (configurable) intervals and store them locally in a file. Whenever a request is made for a resource, the plugins intercept the request and evaluate it against the security policies in effect. Plugins also collect data about all the requests and send it back to the audit server via a separate thread.
  3. User/Group Sync is a utility provided to synchronize users and groups from the OS/LDAP/AD. This information is used while defining policies (we’ll shortly see an example).

Setup

Ranger Admin

The Ranger software is already included in the HDP 2.2 repos. Find the Ranger policy admin package (assuming you have set up your yum repos correctly):

yum search ranger
====================================================== N/S Matched: ranger =========================================
ranger_2_2_0_0_1947-admin.x86_64 : Web Interface for Ranger
ranger_2_2_0_0_1947-debuginfo.x86_64 : Debug information for package ranger_2_2_0_0_1947
ranger_2_2_0_0_1947-hbase-plugin.x86_64 : ranger plugin for hbase
ranger_2_2_0_0_1947-hdfs-plugin.x86_64 : ranger plugin for hdfs
ranger_2_2_0_0_1947-hive-plugin.x86_64 : ranger plugin for hive
ranger_2_2_0_0_1947-knox-plugin.x86_64 : ranger plugin for knox
ranger_2_2_0_0_1947-storm-plugin.x86_64 : ranger plugin for storm
ranger_2_2_0_0_1947-usersync.x86_64 : Synchronize User/Group information from Corporate LD/AD or Unix

Install the admin module

yum install ranger_2_2_0_0_1947-admin

In the installation directory (/usr/hdp/current/ranger-admin/), edit the install.properties file:

SQL_COMMAND_INVOKER=mysql
SQL_CONNECTOR_JAR=/usr/share/java/mysql-connector-java.jar

# DB password for the DB admin user-id
db_root_user=root
db_root_password=<password>
db_host=<database host>

#
# DB UserId used for the XASecure schema
#
db_name=ranger
db_user=rangeradmin
db_password=<password>

# DB UserId for storing audit log information
#
# * audit_db can be the same as the XASecure schema db
# * audit_db must exist in the same ${db_host} as the xaserver database ${db_name}
# * audit_user must be a different user than db_user (as the audit user has access to only the audit tables)
#
audit_db_name=ranger_audit
audit_db_user=rangerlogger
audit_db_password=<password>

#
# ------- PolicyManager CONFIG ----------------
#

policymgr_external_url=http://<portal host>:6080
policymgr_http_enabled=true

#
# ------- UNIX User CONFIG ----------------
#
unix_user=ranger
unix_group=ranger

#
# ** The xasecure-unix-ugsync package can be installed after the policymanager installation is finished.
#
#LDAP|ACTIVE_DIRECTORY|UNIX|NONE
authentication_method=NONE

Run the setup as root:

export JAVA_HOME=<path of installed jdk version folder>
/usr/hdp/current/ranger-admin/setup.sh
service ranger-admin start
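
A quick sanity check (a sketch): the admin UI should answer on the port configured in policymgr_external_url.

curl -s -o /dev/null -w "%{http_code}\n" http://<portal host>:6080/
# A 200 (or a redirect to the login page) means ranger-admin is up; log in at the same
# URL, typically with the default admin/admin on a fresh install (change it right away).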

UserSync

Install the module

yum install ranger_2_2_0_0_1947-usersync

In the Ranger UserSync installation directory (/usr/hdp/current/ranger-usersync), update the install.properties file as appropriate for HDP 2.2:

POLICY_MGR_URL = http://<portal host>:6080

# sync source,  only unix and ldap are supported at present
# defaults to unix
SYNC_SOURCE =  ldap

# sync interval in minutes
# user, groups would be synced again at the end of each sync interval
# defaults to 5   if SYNC_SOURCE is unix
# defaults to 360 if SYNC_SOURCE is ldap
SYNC_INTERVAL=1

#User and group for the usersync process
unix_user=ranger
unix_group=ranger

# URL of source ldap
# a sample value would be:  ldap://ldap.example.com:389
# Must specify a value if SYNC_SOURCE is ldap
SYNC_LDAP_URL = ldap://<ldap host>:389

# ldap bind dn used to connect to ldap and query for users and groups
# a sample value would be cn=admin,ou=users,dc=hadoop,dc=apache,dc=org
# Must specify a value if SYNC_SOURCE is ldap
SYNC_LDAP_BIND_DN = <bind username>

# ldap bind password for the bind dn specified above
# please ensure read access to this file  is limited to root, to protect the password
# Must specify a value if SYNC_SOURCE is ldap
# unless anonymous search is allowed by the directory on users and group
SYNC_LDAP_BIND_PASSWORD = <password>
CRED_KEYSTORE_FILENAME=/usr/lib/xausersync/.jceks/xausersync.jceks

# search base for users
# sample value would be ou=users,dc=hadoop,dc=apache,dc=org
SYNC_LDAP_USER_SEARCH_BASE = <Value depends upon your LDAP setup>

# search scope for the users, only base, one and sub are supported values
# please customize the value to suit your deployment
# default value: sub
SYNC_LDAP_USER_SEARCH_SCOPE = sub

# objectclass to identify user entries
# please customize the value to suit your deployment
# default value: person
SYNC_LDAP_USER_OBJECT_CLASS = person

# optional additional filter constraining the users selected for syncing
# a sample value would be (dept=eng)
# please customize the value to suit your deployment
# default value is empty
SYNC_LDAP_USER_SEARCH_FILTER = <Value depends upon your LDAP setup>

# attribute from user entry that would be treated as user name
# please customize the value to suit your deployment
# default value: cn
SYNC_LDAP_USER_NAME_ATTRIBUTE=sAMAccountName

# attribute from user entry whose values would be treated as
# group values to be pushed into Policy Manager database
# You could provide multiple attribute names separated by comma
# default value: memberof, ismemberof
SYNC_LDAP_USER_GROUP_NAME_ATTRIBUTE=memberOf

#
# UserSync - Case Conversion Flags
# possible values:  none, lower, upper
SYNC_LDAP_GROUPNAME_CASE_CONVERSION=lower

NOTE: Customize SYNC_LDAP_USER_SEARCH_FILTER parameter to suit your needs.
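
For instance, to sync only the members of a specific AD group, the filter might look something like this (the group DN is hypothetical):

SYNC_LDAP_USER_SEARCH_FILTER = (memberOf=CN=hadoop-users,OU=Groups,DC=example,DC=com)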

Run the setup:

export JAVA_HOME=<path of installed jdk version folder>
/usr/hdp/current/ranger-usersync/setup.sh

service ranger-usersync start

Verify by visiting the Ranger portal and clicking the Users/Groups tab. You should see all the LDAP users. Furthermore, you can add an LDAP/AD user or group, and it should show up in the portal within SYNC_INTERVAL.

PlugIns

We will go over one of the plugins; a similar setup should be followed for every plugin you are interested in.

HDFS

On the NameNode (in case of an HA NameNode setup, on all the NameNodes), install the plugin:

yum install ranger_2_2_0_0_1947-hdfs-plugin

In the plugin installation directory (/usr/hdp/current/ranger-hdfs-plugin), edit install.properties.

POLICY_MGR_URL=http://<portal host>:6080
SQL_CONNECTOR_JAR=/usr/share/java/mysql-connector-java.jar
#
# Example:
# REPOSITORY_NAME=hadoopdev
#
REPOSITORY_NAME=<This is the repo that'll be looked up when plugin is loaded>
XAAUDIT.DB.IS_ENABLED=true
XAAUDIT.DB.FLAVOUR=MYSQL
XAAUDIT.DB.HOSTNAME=<database host>
XAAUDIT.DB.DATABASE_NAME=ranger_audit
XAAUDIT.DB.USER_NAME=rangerlogger
XAAUDIT.DB.PASSWORD=<password>
XAAUDIT.HDFS.IS_ENABLED=true
XAAUDIT.HDFS.DESTINATION_DIRECTORY=hdfs://<NameNode>:8020/ranger/audit/%app-type%/%time:yyyyMMdd%
XAAUDIT.HDFS.LOCAL_BUFFER_DIRECTORY=/var/log/hadoop/%app-type%/audit
XAAUDIT.HDFS.LOCAL_ARCHIVE_DIRECTORY=/var/log/hadoop/%app-type%/audit/archive

Run the script to enable the plugin:

export JAVA_HOME=<path of installed jdk version folder>
/usr/hdp/current/ranger-hdfs-plugin/enable-hdfs-plugin.sh

Restart namenode(s) from Ambari or manually.

Test the setup

  • On the Ranger portal, click “Policy Manager”. Click the “+” sign on the HDFS tab and create a repository. Ensure the name of this repository is EXACTLY the same as the REPOSITORY_NAME you specified during installation.
  • Let’s take a test user “svemuri” and check his permissions on a test directory:
[svemuri@sfdmgctmn005 ~]$ hadoop fs -ls /user/mmurumkar
ls: Permission denied: user=svemuri, access=READ_EXECUTE,
inode="/user/mmurumkar":mmurumkar:sfdmgct_admin:drwxr-x---:user:mmurumkar:r--,user:ranger:---,user:rbolla:---,user:svemuri:---,group::r-x
  • Now, let’s create a policy called “TestPolicy” that grants “svemuri” all privileges on “/user/mmurumkar”:

Policy

  • Now the earlier command should work:
[svemuri@sfdmgctmn005 ~]$ hadoop fs -ls /user/mmurumkar
Found 9 items
drwxr-xr-x   - mmurumkar sfdmgct_admin          0 2014-11-20 02:22 /user/mmurumkar/.hiveJars
drwxr-xr-x   - mmurumkar sfdmgct_admin          0 2014-11-18 20:00 /user/mmurumkar/test
drwxr-xr-x   - mmurumkar sfdmgct_admin          0 2014-11-18 20:01 /user/mmurumkar/test1
drwxr-xr-x   - mmurumkar sfdmgct_admin          0 2014-11-18 20:08 /user/mmurumkar/test2
drwxr-xr-x   - rbolla    sfdmgct_admin          0 2014-11-18 20:09 /user/mmurumkar/test3
drwxr-xr-x   - rbolla    sfdmgct_admin          0 2014-11-18 20:10 /user/mmurumkar/test4
drwxr-xr-x   - ranger    sfdmgct_admin          0 2014-11-18 20:18 /user/mmurumkar/test5
drwxr-xr-x   - mmurumkar sfdmgct_admin          0 2014-11-20 18:01 /user/mmurumkar/test7
drwxr-xr-x   - ranger    sfdmgct_admin          0 2014-11-19 14:21 /user/mmurumkar/test8
  • Audit records will now show up in the Audit UI on the portal.
Audit

Conclusion

Apache Ranger is starting to fill critical security needs in the Hadoop environment, marking big progress toward making Hadoop an enterprise data platform.