Sunday 20 November 2016

Running Zookeeper and Kafka in an AWS auto-scaling group

Background


I've been using Apache Kafka and Zookeeper on AWS as the entry point into a data capture and onward processing pipeline, and it has proved to be a reliable deployment.

The challenge within AWS is, of course, to correctly assign and publish the IP addresses, Zookeeper node ids and other service-specific values in an auto-scaling environment, where servers are brought up without direct administrator oversight and where IP addresses can change.

The details of this deployment form the basis of this post.


Solution summary


The minimum recommended size of a resilient Zookeeper (ZK) ensemble is 3 nodes. This allows for one node to go down whilst retaining quorum. To make optimal use of the EC2 instances required to run the infrastructure, and to simplify their management, in this example each EC2 instance hosts one ZK and one Kafka server instance - although nothing in the solution outlined here mandates this dual-hosting approach. In an environment with significantly higher data volumes, this may not be an appropriate starting point.

For increased reliability, the 3 EC2 instances hosting the ZK and Kafka services run in an auto-scaling group. This is configured to maintain a constant size (for reliability, rather than for scaling up in response to demand spikes) and the instances are distributed across 3 availability zones (AZ). The auto-scaling group's launch configuration defines 'User Data' (a shell script) for the EC2 instances, and this user data dynamically configures the ZK and Kafka service instances to act as a cluster as the host EC2 instances are brought online. This allows the auto-scaling group to manage instance lifecycle without user intervention.

Key to this are 3 pre-created Elastic Network Interfaces (ENI), one in each AZ, each with a known IP address.

When an EC2 instance's user data runs, it uses the standard AWS metadata services to examine its parent subnet, select the ENI assigned to that subnet and associate that ENI with itself. This means that each EC2 instance in the auto-scaling group always has one known IP address, and the shared Kafka and ZK config files can refer to those known addresses, with the user data script further tailoring the config on a per-instance basis.
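
As a rough illustration, the ENI discovery and attachment step in the user data looks something like the following. This is a simplified sketch rather than the full script; it assumes the AWS CLI is available and that the ENIs carry a 'Name' tag of 'event-servers-eni'.

#!/bin/bash
# Simplified sketch of the ENI self-attachment step performed by the user data.
# Assumes the instance's IAM role permits ec2:DescribeNetworkInterfaces and
# ec2:AttachNetworkInterface (covered by 'AmazonEC2FullAccess' below).
REGION=eu-west-1
MD=http://169.254.169.254/latest/meta-data

INSTANCE_ID=$(curl -s $MD/instance-id)
MAC=$(curl -s $MD/network/interfaces/macs/ | head -1 | tr -d '/')
SUBNET_ID=$(curl -s $MD/network/interfaces/macs/$MAC/subnet-id)

# Find the pre-created, unattached 'event-servers-eni' ENI in this instance's subnet
ENI_ID=$(aws ec2 describe-network-interfaces --region $REGION \
  --filters "Name=subnet-id,Values=$SUBNET_ID" \
            "Name=tag:Name,Values=event-servers-eni" \
            "Name=status,Values=available" \
  --query 'NetworkInterfaces[0].NetworkInterfaceId' --output text)

# Attach it as eth1 (device index 1); eth0 remains the instance's default interface
aws ec2 attach-network-interface --region $REGION \
  --network-interface-id $ENI_ID --instance-id $INSTANCE_ID --device-index 1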


Solution detail


Environment prerequisites


Depending on your IP addressing strategy and your network (VPC) and subnet sizes, your actual IP addresses and CIDR blocks may vary from those in the example given below.

Create a security group 'event-servers-sg' with inbound rules which allow access to ZK's quorum, election and client ports (as specified in the sample config, below) and to the Kafka server's listener port (9095 in this example) from anywhere in the VPC. The SSH port is also opened to the named security group associated with the bastion host.

Type                Protocol    Port    Source
Custom TCP Rule     TCP         2888    10.0.0.0/26
Custom TCP Rule     TCP         3888    10.0.0.0/26
Custom TCP Rule     TCP         2181    10.0.0.0/26
Custom TCP Rule     TCP         9095    10.0.0.0/26
SSH                 TCP         22      sg-12345678
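
If you prefer to script this part, the equivalent rules can be added with the AWS CLI along the following lines. The group and bastion IDs here are placeholders.

# Hypothetical IDs; substitute your own
SG_ID=sg-0123abcd       # event-servers-sg
BASTION_SG=sg-12345678  # the bastion host's security group

# ZK quorum, election and client ports, plus the Kafka listener port, open to the VPC
for port in 2888 3888 2181 9095; do
  aws ec2 authorize-security-group-ingress --group-id $SG_ID \
    --protocol tcp --port $port --cidr 10.0.0.0/26
done

# SSH only from the bastion host's security group
aws ec2 authorize-security-group-ingress --group-id $SG_ID \
  --protocol tcp --port 22 --source-group $BASTION_SG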


Let's say that the 3 target AZs for the auto-scaling group are eu-west-1a, eu-west-1b and eu-west-1c. Create an ENI in a VPC subnet associated with each AZ and set the private IP addresses to be e.g. 10.0.0.53, 10.0.0.21 and 10.0.0.37 respectively. No public IP addresses are required. Name all three ENIs 'event-servers-eni'. Associate the 'event-servers-sg' security group with all three ENIs.
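
For reference, each ENI could also be created from the command line, roughly as follows. The subnet ID and security group ID are placeholders, and since the 'Name' shown in the console is just a tag, it is set explicitly.

SG_ID=sg-0123abcd   # event-servers-sg (placeholder)

# Repeat for each subnet/IP pair: eu-west-1a/10.0.0.53, eu-west-1b/10.0.0.21, eu-west-1c/10.0.0.37
ENI_ID=$(aws ec2 create-network-interface --subnet-id subnet-aaaa1111 \
  --private-ip-address 10.0.0.53 --groups $SG_ID \
  --description event-servers-eni \
  --query 'NetworkInterface.NetworkInterfaceId' --output text)

aws ec2 create-tags --resources $ENI_ID --tags Key=Name,Value=event-servers-eni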

Ideally, ZK config should be identical across all nodes in an ensemble. Here's a sensible starting point which references the IP addresses associated with the ENIs described above.

tickTime=2000
dataDir=/var/lib/zookeeper/data
dataLogDir=/var/lib/zookeeper/transaction-log
clientPort=2181
initLimit=15
syncLimit=2
autopurge.purgeInterval=1

server.1=10.0.0.53:2888:3888
server.2=10.0.0.21:2888:3888
server.3=10.0.0.37:2888:3888

Similarly, the starting point for the Kafka config might look like:

log.dir=/var/lib/kafka/event-log
num.partitions=3
zookeeper.connect=10.0.0.21:2181,10.0.0.37:2181,10.0.0.53:2181

# other settings:
num.recovery.threads.per.data.dir=1
log.flush.interval.ms=2000
..

Note that some key properties such as 'broker.id', 'listeners' and 'advertised.listeners' are not present; they are set dynamically by the user data script.
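
To illustrate the idea, the user data might derive those per-instance values from the ENI's known IP address along these lines. This is a sketch only; the Kafka config file location is an assumption for illustration.

# ENI_IP is determined earlier in the user data (e.g. during the ENI lookup)
ENI_IP=10.0.0.53

# Map each known ENI address to a stable node/broker id, matching the server.N entries above
case $ENI_IP in
  10.0.0.53) NODE_ID=1 ;;
  10.0.0.21) NODE_ID=2 ;;
  10.0.0.37) NODE_ID=3 ;;
esac

# ZK reads its id from the myid file in dataDir
echo $NODE_ID > /var/lib/zookeeper/data/myid

# Append the per-instance Kafka properties to the shared base config (path assumed)
cat >> /etc/kafka/server.properties <<EOF
broker.id=$NODE_ID
listeners=PLAINTEXT://$ENI_IP:9095
advertised.listeners=PLAINTEXT://$ENI_IP:9095
EOF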

These base Kafka and ZK configurations are read from S3 by the EC2 instance user data, so create an S3 bucket called e.g. com.example.event-servers and put the sample config (above) in there as zookeeper.config and kafka.config respectively. Similarly, the user data script downloads and extracts the Kafka and Zookeeper installation zips, so put those in the same S3 bucket. This process could be streamlined by creating an Amazon Machine Image (AMI) with Kafka and ZK pre-installed, but for this example the process is self-contained.
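
In the user data, the download step then amounts to a handful of 'aws s3 cp' calls, roughly as below. The archive names and target paths are illustrative assumptions.

# Pull the shared base config from the bucket (target paths are assumptions)
aws s3 cp s3://com.example.event-servers/zookeeper.config /etc/zookeeper/zoo.cfg
aws s3 cp s3://com.example.event-servers/kafka.config /etc/kafka/server.properties

# Pull and extract the installation archives (names are placeholders)
aws s3 cp s3://com.example.event-servers/zookeeper.zip /opt/
aws s3 cp s3://com.example.event-servers/kafka.zip /opt/
unzip -q /opt/zookeeper.zip -d /opt/
unzip -q /opt/kafka.zip -d /opt/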

The EC2 instances will need permission to read from the com.example.event-servers bucket, so create an Identity and Access Management (IAM) role called 'event-servers-role' which has the standard 'AmazonEC2FullAccess' permission attached, plus the following policy, which allows it to read from the S3 bucket.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": [
                "arn:aws:s3:::com.example.event-servers"
            ]
        },
        {            
            "Effect": "Allow",
            "Action": [
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::com.example.event-servers/*"
            ]
        }
    ]
}
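
If you're scripting the setup, the policy above (saved locally as, say, event-servers-s3-policy.json) can be attached to the role and exposed to EC2 via an instance profile along these lines, assuming the role itself already exists.

# Attach the S3 read policy to the existing role
aws iam put-role-policy --role-name event-servers-role \
  --policy-name event-servers-s3-read \
  --policy-document file://event-servers-s3-policy.json

# EC2 instances pick up the role via an instance profile
aws iam create-instance-profile --instance-profile-name event-servers-role
aws iam add-role-to-instance-profile --instance-profile-name event-servers-role \
  --role-name event-servers-role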

With all of the pre-requisites in place, it's time to create...

The auto-scaling group


The choice of AMI and instance type for use in the auto-scaling group's launch configuration is highly workload-dependent and the documentation for both projects covers the points to consider. Generally though, both Kafka and ZK perform best with lots of RAM and dedicated disk storage, that is, where each of: 

  • the ZK data directory (dataDir)
  • the ZK transaction log directory (dataLogDir)
  • the Kafka log directory (log.dir)

is mounted on a dedicated disk device with its own controller.

A 'D2 – Dense-storage Instance' with its large local disks seems well suited to this kind of workload, but this example demonstrates a more cost-effective option and works well with any 'General purpose' instance type.

Download the user data shell script. This is the core of how it all works together, so it's worth stepping through the inlined comments. The script assumes a Linux image with the AWS CLI tools already installed.

Create an auto-scaling launch configuration called 'event-servers-launch-config'. The key points during the definition are:

  • Select the previously created 'event-servers-role' as the associated IAM role
  • For certain instance types, checking 'EBS-optimized instance' to maximise EBS I/O doesn't add to the hourly cost of the instance.
  • Paste all of the user data script in the 'User data' ('As Text') section under 'Advanced Details'
  • Associate three Elastic Block Store (EBS) volumes (of type General Purpose SSD) of a size appropriate for your data volumes. The user data script formats and mounts these volumes at the appropriate points for use by the Kafka and ZK directories shown above (see the sketch after this list). The user data script expects the device names /dev/sdb, /dev/sdc and /dev/sdd. Amazon Linux will retain these names if you select them while associating the storage, but other Linux flavours may change the names. If you hit problems, you can ssh into an instance and use lsblk to see which names were assigned.
  • Select the 'event-servers-sg' security group as the pre-defined security group
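
For reference, the format-and-mount step performed by the user data amounts to something like the following. The choice of ext4 and the device-to-directory mapping shown here are assumptions for illustration.

# Format the three EBS volumes and mount them at the directories used in the configs above
for dev in /dev/sdb /dev/sdc /dev/sdd; do
  mkfs -t ext4 $dev
done

mkdir -p /var/lib/zookeeper/data /var/lib/zookeeper/transaction-log /var/lib/kafka/event-log

mount /dev/sdb /var/lib/zookeeper/data
mount /dev/sdc /var/lib/zookeeper/transaction-log
mount /dev/sdd /var/lib/kafka/event-log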

Now create the auto-scaling group itself, based on 'event-servers-launch-config', with an initial group size of 3. As previously discussed, use the target VPC which has subnets in the required AZs (eu-west-1a, eu-west-1b and eu-west-1c) and select those three subnets. Select the 'Keep this group at its initial size' option and, for an instance count of 3, set the 'Min' count to 2 and the 'Max' count to 4.
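
The equivalent CLI call would look something like this; the group name and subnet IDs are placeholders.

aws autoscaling create-auto-scaling-group \
  --auto-scaling-group-name event-servers-asg \
  --launch-configuration-name event-servers-launch-config \
  --min-size 2 --max-size 4 --desired-capacity 3 \
  --vpc-zone-identifier "subnet-aaaa1111,subnet-bbbb2222,subnet-cccc3333"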

When you create your group, three EC2 instances should come up and you can verify that each has dual network interfaces: eth0 (the default) and eth1 (the ENI assigned by the 'User Data'). If there are any problems, ssh into the VPC's bastion host and, from there, ssh into the problematic EC2 instance. Have a look in /var/log/cloud-init-output.log to examine start-up errors.

Once the AWS environment and the user data config are all working together as expected, the last verification step is to review the logs across the 3 nodes and/or, in the case of ZK, ssh into the VPC and execute the following to get each server to report on its status:

echo srvr | nc 10.0.0.21 2181 (for all three IP addresses)
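
Or, to query all three nodes in one go:

for ip in 10.0.0.53 10.0.0.21 10.0.0.37; do
  echo "--- $ip ---"
  echo srvr | nc $ip 2181
done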

The final step is to create some Kafka topics and start pumping data through!
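
For example, assuming Kafka is installed under /opt/kafka, a test topic can be created and exercised with the standard command line tools; the topic name and partition count here are purely illustrative.

# Create a replicated test topic
/opt/kafka/bin/kafka-topics.sh --create \
  --zookeeper 10.0.0.21:2181,10.0.0.37:2181,10.0.0.53:2181 \
  --replication-factor 3 --partitions 3 --topic events

# Quick smoke test with the console producer and consumer
echo 'hello' | /opt/kafka/bin/kafka-console-producer.sh \
  --broker-list 10.0.0.53:9095 --topic events
/opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server 10.0.0.53:9095 --topic events --from-beginning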


Summary


First, an obvious question: if you're running a reliable event streaming infrastructure in an AWS-hosted environment, why not simply use Kinesis and take the managed option? Kinesis is architecturally very similar to Kafka and will feel familiar to anyone with Kafka experience, so it's a perfectly viable alternative.

However, for the use case under which I developed this approach, two factors favoured Kafka.

  1. While working locally, it's very easy to run ZK and Kafka in a VirtualBox instance or a Docker container, and this complete, packaged infrastructure can then easily be shared with a team. There is no official version of Kinesis which runs locally.
  2. I write Scala applications which consume the Kafka messages using Akka Streams, and Reactive Kafka provides an elegant integration with native Kafka from this programming environment.

The combination of these factors meant that it was worth the extra effort to run Kafka in AWS.

AWS recently added the option to enable host networking in Elastic Container Service (ECS) deployments, and this opens up the possibility of creating a reliable infrastructure using Dockerised Kafka and ZK rather than a 'native' auto-scaling group. This option wasn't available a few months ago and, having had good experiences with ECS, it's one that I might explore at some stage in the future.




4 comments:

  1. Derek, trying to follow your example but I'm confused about how you set up your VPC subnets. VPC is 10.0.0.0/26 and .21 is in the az1 subnet, .37 in the az2 subnet etc.? I would have thought you would use 10.0.0.0/16 and then 10.0.1.0/24 etc. for subnets, with the ENIs assigned accordingly.. any clarification would really be appreciated!

     Reply: The ranges I used are completely non-prescriptive ("your actual IP addresses and CIDR blocks may vary"). Thanks.

  2. Derek, thanks for writing this post. I'm in the process of designing and deploying a Kafka cluster in AWS, and this post addressed some of my questions regarding Kafka, ZK, ECS and ASGs, and how all those concepts would play together.

     Do you have any further insights or experience you can share regarding your AWS ECS Kafka ZK scaling?

     Reply: Hello. Glad it was useful. Unfortunately, at the time of writing, an ECS deployment hasn't progressed beyond my TODO list. Thanks.