Sunday 20 November 2016

Running Zookeeper and Kafka in an AWS auto-scaling group

Background


I've been using Apache Kafka and Zookeeper on AWS as the entry point into a data capture and onward processing pipeline and it's proved to be a reliable deployment.

The challenge within AWS is of course to correctly assign and publish the IP addresses, Zookeeper node ids and other service specific values in an auto-scaling environment where servers are brought up without direct administrator oversight and where IP addresses can change.

The details of this deployment form the basis of this post.


Solution summary


The minimum recommended size of a resilient Zookeeper (ZK) ensemble is 3 nodes. This allows one node to go down whilst the ensemble retains quorum. To make good use of the EC2 instances required to run the infrastructure and to simplify their management, each EC2 instance in this example hosts one ZK and one Kafka server instance, although nothing in the solution outlined here mandates this dual-hosting approach. In an environment with significantly higher data volumes, this may not be an appropriate starting point.

For increased reliability, the 3 EC2 instances hosting the ZK and Kafka services run in an auto-scaling group. This is configured to maintain a constant size (for reliability, rather than for scaling up in response to demand spikes) and the instances are distributed across 3 availability zones (AZ). The auto-scaling group's launch configuration defines 'User Data' (a shell script) for the EC2 instances, and this user data dynamically configures the ZK and Kafka service instances to act as a cluster as the host EC2 instances are brought online. This allows the auto-scaling group to manage instance lifecycle without user intervention.

Key to this are 3 pre-created Elastic Network Interfaces (ENI), one in each AZ, each with a known IP address.

When an EC2 instance's user data runs, it uses standard AWS metadata services to examine its parent subnet, select the ENI assigned to that subnet and associate that ENI with itself. This means that each EC2 instance in the auto-scaling group always has one known IP address and the shared Kafka and ZK config files can refer to those known addresses, with the user data script further tailoring the config on a per instance basis.


Solution detail


Environment prerequisites


Depending on your IP addressing strategy and your network (VPC) and subnet sizes, your actual IP addresses and CIDR blocks may vary from those in the example given below.

Create a security group 'event-servers-sg' with inbound rules which allow access to ZK's quorum, election and client ports (as specified in the sample config, below) and to the Kafka server's listen port (9095 in this example) from anywhere in the VPC. The SSH port is also opened to the named security group associated with the bastion host.

Custom TCP Rule     TCP     2888     10.0.0.0/26
Custom TCP Rule     TCP     3888     10.0.0.0/26
Custom TCP Rule     TCP     2181     10.0.0.0/26
Custom TCP Rule     TCP     9095     10.0.0.0/26
SSH                 TCP     22       sg-12345678


Let's say that the 3 target AZs for the auto-scaling group are eu-west-1a, eu-west-1b and eu-west-1c. Create an ENI in a VPC subnet associated with each AZ and set the private IP addresses to be e.g. 10.0.0.53, 10.0.0.21 and 10.0.0.37 respectively. No public IP addresses are required. Name all three ENIs 'event-servers-eni'. Associate the 'event-servers-sg' security group with all three ENIs.

Ideally, ZK config should be identical across all nodes in an ensemble. Here's a sensible starting point which references the IP addresses associated with the ENIs described above.

tickTime=2000
dataDir=/var/lib/zookeeper/data
dataLogDir=/var/lib/zookeeper/transaction-log
clientPort=2181
initLimit=15
syncLimit=2
autopurge.purgeInterval=1

server.1=10.0.0.53:2888:3888
server.2=10.0.0.21:2888:3888
server.3=10.0.0.37:2888:3888

Similarly, the starting point for the Kafka config might look like:

log.dir=/var/lib/kafka/event-log
num.partitions=3
zookeeper.connect=10.0.0.21:2181,10.0.0.37:2181,10.0.0.53:2181

# other settings:
num.recovery.threads.per.data.dir=1
log.flush.interval.ms=2000
..

Note that some key properties such as 'broker.id', 'listeners' and 'advertised.listeners' are not present; they are set dynamically by the user data script.
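To make the per-instance tailoring concrete, here is a minimal sketch of the values that have to differ on each node. It is written in Scala purely for illustration (the real user data script is a shell script) and every name in it is hypothetical. It assumes that a node's ZK id is found by looking up its ENI address in the server.N lines of the shared config (ZK expects that id in a 'myid' file inside dataDir), that the Kafka broker id reuses the same number, and that the broker listens using the PLAINTEXT protocol on the ENI address.

```scala
object PerInstanceConfig {
  // Matches lines such as "server.1=10.0.0.53:2888:3888", capturing the id and the host.
  private val ServerLine = """server\.(\d+)=([^:]+):\d+:\d+""".r

  /** The ZK server id whose host matches `eniIp`, if the shared config lists one. */
  def zkIdFor(eniIp: String, zkConfig: String): Option[Int] =
    zkConfig.linesIterator.map(_.trim).collectFirst {
      case ServerLine(id, host) if host == eniIp => id.toInt
    }

  /** Properties to append to the shared kafka.config for one instance (assumed format). */
  def kafkaOverrides(brokerId: Int, eniIp: String, port: Int = 9095): Seq[String] = Seq(
    s"broker.id=$brokerId",
    s"listeners=PLAINTEXT://$eniIp:$port",
    s"advertised.listeners=PLAINTEXT://$eniIp:$port"
  )

  def main(args: Array[String]): Unit = {
    val zkConfig =
      """clientPort=2181
        |server.1=10.0.0.53:2888:3888
        |server.2=10.0.0.21:2888:3888
        |server.3=10.0.0.37:2888:3888""".stripMargin

    // Pretend this instance attached the ENI with address 10.0.0.21.
    zkIdFor("10.0.0.21", zkConfig).foreach { id =>
      println(s"myid=$id")
      kafkaOverrides(id, "10.0.0.21").foreach(println)
    }
  }
}
```

Because the ENI addresses are fixed, a lookup like this gives the same id to whichever instance the auto-scaling group brings up in a given AZ.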

These base Kafka and ZK configurations are read from S3 by the EC2 instance user data, so create an S3 bucket called e.g. com.example.event-servers and put the sample config (above) in there, as zookeeper.config and kafka.config respectively. Similarly, the user data script downloads and extracts Kafka and Zookeeper installation zips, so put those in the same S3 bucket. This process could be streamlined by creating an Amazon Machine Image (AMI) with Kafka and ZK pre-installed but, for this example, the process is self-contained.

The EC2 instances will need permissions to read from the com.example.event-servers bucket, so create an Identity and Access Management (IAM) role called 'event-servers-role' which has the standard 'AmazonEC2FullAccess' permission and which also has the following policy associated with it, which allows it to read from the S3 bucket.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": [
                "arn:aws:s3:::com.example.event-servers"
            ]
        },
        {            
            "Effect": "Allow",
            "Action": [
                "s3:GetObject"
            ],
            "Resource": [
                "arn:aws:s3:::com.example.event-servers/*"
            ]
        }
    ]
}

With all of the prerequisites in place, it's time to create...

The auto-scaling group


The choice of instance type to select for use in the auto-scaling group's launch configuration is highly workload dependent and the documentation for both projects covers the points to consider. Generally though, both Kafka and ZK perform best with lots of RAM and dedicated disk storage, that is, where each of:

  • the ZK data directory (dataDir)
  • the ZK transaction log directory (dataLogDir)
  • the Kafka data directory (log.dir)

is mounted on a dedicated disk device with its own controller.

A 'D2 – Dense-storage Instance' with local HDD storage seems well suited to this kind of workload but this example demonstrates the use of a more cost-effective option and works well with any 'General purpose' instance type.

Download the user data shell script. This is the core of how this all works together, so it's worth stepping through the inlined comments. The script assumes a Linux image with the AWS CLI tools already installed.

Create an auto-scaling launch configuration called 'event-servers-launch-config'. The key points during the definition are:

  • Select the previously created 'event-servers-role' as the associated IAM role.
  • For certain instance types, checking 'EBS-optimized instance' (to maximise EBS I/O) doesn't add to the hourly cost of the instance.
  • Paste all of the user data script in the 'User data' ('As Text') section under 'Advanced Details'.
  • Associate three Elastic Block Store (EBS) volumes (of type General Purpose SSD) of a size appropriate for your data volumes. The user data script formats and mounts these volumes at the appropriate points for use by the Kafka and ZK directories shown above. The user data script expects the device names /dev/sdb, /dev/sdc and /dev/sdd. Amazon Linux will retain these names if you select them while associating the storage but other Linux flavours may change the names. If there are any problems, you can ssh into an instance and use lsblk to see which names were assigned.
  • Select the 'event-servers-sg' security group as the pre-defined security group.

Now create an actual auto-scaling group based on 'event-servers-launch-config' with an initial group size of 3. As previously discussed, use the target VPC which has subnets in the required AZs (eu-west-1a, eu-west-1b and eu-west-1c) and select those three subnets. Select the 'Keep this group at its initial size' option and for an instance count of 3, set the 'Min' count to 2 and 'Max' count to 4.

When you create your group, three EC2 instances should come up and you can verify that each has dual network interfaces: eth0 (the default) and eth1 (the ENI assigned by the 'User Data'). If there are any problems, ssh into the VPC's bastion host and, from there, ssh into the problematic EC2 instance. Have a look in /var/log/cloud-init-output.log to examine start-up errors.

Once the AWS environment and the user data config are all working together as expected, the next step is to review the logs across the 3 nodes and/or, in the case of ZK, ssh into the VPC and execute the following to get the server to report on its status:

echo srvr | nc 10.0.0.21 2181 (for all three IP addresses)
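If you would rather run the same check from code, the following is a minimal Scala sketch under a few assumptions: it is run from a host inside the VPC, the nodes accept the 'srvr' four-letter command on the client port, and the reply contains a 'Mode:' line naming the node's role. The object and method names are hypothetical.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.io.Source
import scala.util.Try

object ZkStatus {
  /** Sends a four-letter command (e.g. "srvr") to a ZK node and returns the raw reply. */
  def fourLetterWord(host: String, port: Int, cmd: String = "srvr", timeoutMs: Int = 3000): String = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      socket.setSoTimeout(timeoutMs)
      socket.getOutputStream.write(cmd.getBytes("US-ASCII"))
      socket.getOutputStream.flush()
      // ZK closes the connection after replying, so read until end of stream.
      Source.fromInputStream(socket.getInputStream, "UTF-8").mkString
    } finally socket.close()
  }

  /** Extracts the value of the "Mode:" line from a srvr reply, if there is one. */
  def mode(reply: String): Option[String] =
    reply.linesIterator.map(_.trim).collectFirst {
      case line if line.startsWith("Mode:") => line.stripPrefix("Mode:").trim
    }

  def main(args: Array[String]): Unit =
    Seq("10.0.0.53", "10.0.0.21", "10.0.0.37").foreach { ip =>
      // Unreachable nodes are reported as "unknown" rather than failing the whole check.
      val role = Try(fourLetterWord(ip, 2181)).toOption.flatMap(mode).getOrElse("unknown")
      println(s"$ip -> $role")
    }
}
```

In a healthy three node ensemble you would expect one node to report itself as the leader and the other two as followers.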

The final step is to create some Kafka topics and start pumping data through!


Summary


Firstly, if you're running a reliable event streaming infrastructure in an AWS hosted environment, why not simply use Kinesis and take the hosted option? Kinesis is architecturally very similar to Kafka and will feel very familiar to anyone with experience of Kafka so it's a perfectly viable alternative. 

However, for the use case under which I developed this approach, two factors promoted Kafka.

  1. While working locally, it's very easy to run ZK and Kafka in a VirtualBox instance or a Docker container and this complete, packaged infrastructure can then easily be shared with a team. There is no official version of Kinesis which runs locally. 
  2. I write Scala applications which consume the Kafka messages using Akka streams and Reactive Kafka provides an elegant integration with native Kafka from this programming environment.

The combination of these factors meant that it was worth the extra effort to run Kafka in AWS.

AWS recently added the option to enable host networking in Elastic Container Service (ECS) deployments and this opens up the possibility of creating a reliable infrastructure using Dockerised Kafka and ZK rather than using a 'native' auto-scaling group. This option wasn't available a few months ago and, having had good experiences with ECS, it's one that I might explore at some stage in the future.




Sunday 28 September 2014

Labelling a Play application with Git version information

Background


When you have an active release pipeline and multiple deployment environments, it's often useful to be able to do both of the following:
  1. easily identify the version of a release artifact by name, prior to deployment.
  2. at any time post deployment, ascertain which version of a release artifact is deployed on a given server without having to access the server's filesystem.
The first point may mean locating a historical build amongst a build archive or simply being confident that the most recent deployment artifact was built from the correct source revision - without reference to any build metadata contained within e.g. the zip file.

Obviously giving build artifacts unique file names which reflect the revision of the source code from which they were built would solve this problem.

The second point means being able to examine a deployment's build metadata, remotely and without any access to the deployed file name. This might be to verify which features or bug-fixes are expected to be present on a certain server in a situation where remote system access is denied to the development and/or test teams. Whatever the motivation, it's always easier and faster to access this metadata through a simple browser request to the server.

A clean solution to this second problem is to create an externally accessible description of the build information by generating and writing a text file to a specific location as a part of the build process. For this post, this location will be somewhere accessible under the Play web context.

You can access the full code discussed in this post by cloning https://github.com/dextaa/blog.git. This repo contains multiple projects so go to the build/git-labelling directory to access the root of this project. This is referred to as <BASE> from now on.

The application used for this example is a Play application which provides JSON responses to stock quote GET requests. The application itself is irrelevant, however; the focus of this post is the application's build process and the application exists solely to host this process.

Creating the build descriptor


For this example, the build descriptor will contain information something like this:
Git branch: master
Git head revision: 6460054f0287c57d7cad31f0e0dd0c779d016f1c
Git head tag: Not tagged
Git branch had uncommitted changes: true
Built by: user1
Built at: 28-09-2014 09:22:33
Built using Java version: 1.7.0_51
Built using Scala version: 2.10.4
Built using Play version: 2.3.4 
and it will be written to Play's standard public directory. There's a standard mapping for that directory in conf/routes, which causes the built-in Assets controller to serve content from public under the assets context path:

GET     /assets/*file               controllers.Assets.at(path="/public", file)

such that the build descriptor is available via http e.g. http://localhost:9000/assets/build-info.txt.

It could quite reasonably be argued that build and environment information should be protected from casual inspection, in which case you may prefer to write the build descriptor to a directory which is not directly served and provide a secured custom route to access it.

If you run 'sbt test' in <BASE>, the existence and content of this file will be verified by the test BuildMetadataSpec. Because a pre-generated build-info.txt was committed along with the source code, this test will pass immediately but it will also continue to pass after you have run 'gen-build-desc' at the sbt prompt.


Here's the build descriptor data generator:






The JGit library is at the core of this class. It's a useful library for interacting with Git from within JVM languages and it's used here to get the Git branch name, its head revision's hash and optionally the name of the tag associated with that revision. It's also used to check if the source tree contained any uncommitted changes when the build took place.

The treatment of tags is worth a closer look.

The allTags val contains a collection of references to all of the tags in the subject Git branch. That collection is then searched (using find) for a tag whose associated commit id matches the id of the head revision. If a match is found, its shortened name (with the Git path removed) is returned (in an Option) as the head revision's tag name.

Matching a tag to a revision requires a couple of paths because Git has a couple of tag types: lightweight and annotated.

Because a lightweight tag is just a pointer to a commit, simply calling tag.getObjectId returns the id of the tagged commit. This is the first case in the matchesHead method.

In contrast, an annotated tag (where -a is added to the 'git tag' command) is a first class object with its own id within the git database and so the extra level of indirection - rt.getObject.getId - shown in the second case, is required to get the id of the associated commit. Calling rt.getObjectId returns the object id of the tag itself and would therefore never equal the id of the revision's head commit.

Where both a lightweight and an annotated tag are present on the same revision, the one which is selected depends on the ordering of allTags, so if that situation could arise and you always wanted to give priority to annotated tags, you could filter allTags by type RevTag and check the resulting collection for matches first.
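The matching logic can be illustrated without JGit. The sketch below is a plain Scala model, not the code from the repo: the Lightweight and Annotated case classes are hypothetical stand-ins for JGit's tag references, and headTag shows the "annotated tags first" variation just described.

```scala
object TagMatching {
  sealed trait Tag { def refName: String }
  // A lightweight tag points straight at a commit.
  final case class Lightweight(refName: String, commitId: String) extends Tag
  // An annotated tag is an object with its own id, which in turn points at a commit.
  final case class Annotated(refName: String, tagObjectId: String, commitId: String) extends Tag

  /** True when the tag, after any indirection, resolves to the head commit. */
  def matchesHead(tag: Tag, headId: String): Boolean = tag match {
    case Lightweight(_, commitId)  => commitId == headId
    case Annotated(_, _, commitId) => commitId == headId // never compare tagObjectId
  }

  /** Short name of the tag on the head revision, giving annotated tags priority. */
  def headTag(allTags: Seq[Tag], headId: String): Option[String] = {
    val (annotated, lightweight) = allTags.partition(_.isInstanceOf[Annotated])
    (annotated ++ lightweight)
      .find(matchesHead(_, headId))
      .map(_.refName.stripPrefix("refs/tags/"))
  }

  def main(args: Array[String]): Unit = {
    val tags = Seq(
      Lightweight("refs/tags/nightly", "6460054"),
      Annotated("refs/tags/release-v1", "a1b2c3d", "6460054")
    )
    println(headTag(tags, "6460054")) // Some(release-v1)
  }
}
```

Comparing the annotated tag's own object id (a1b2c3d here) with the head commit would never match, which is the reason for the extra level of indirection.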

When buildData is called, it generates a list of tuples, each containing a descriptor entry's key/value. This list is utilised in the sbt task itself.































This doesn't require much comment. A task is created which creates a path to public, relative to the application's base directory in Compile scope (this is Play's standard public directory served by the default Assets controller), it then takes the output of BuildDescriptor.buildData, as described above, formats it as a string and writes it to a file in that directory.
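Stripped of the sbt wiring, the formatting and writing steps might look something like the sketch below. It is an assumption-laden illustration rather than the task from the repo: BuildInfoWriter, render and write are hypothetical names, and a temporary directory stands in for the application's public directory.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

object BuildInfoWriter {
  /** Formats key/value entries as one "key: value" line each. */
  def render(entries: Seq[(String, String)]): String =
    entries.map { case (key, value) => s"$key: $value" }.mkString("", "\n", "\n")

  /** Writes the rendered descriptor to <publicDir>/build-info.txt and returns the file's path. */
  def write(publicDir: Path, entries: Seq[(String, String)]): Path = {
    Files.createDirectories(publicDir)
    Files.write(publicDir.resolve("build-info.txt"), render(entries).getBytes(StandardCharsets.UTF_8))
  }

  def main(args: Array[String]): Unit = {
    val entries = Seq("Git branch" -> "master", "Git head tag" -> "Not tagged")
    // A temp directory is used here so that running the sketch has no effect on a real project.
    val target = write(Files.createTempDirectory("public"), entries)
    println(s"Wrote $target")
  }
}
```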

A task key named 'gen-build-desc' is also declared and is associated with the task in the build.sbt with the statement: buildInfo <<= buildInfoTask.

Because generating this descriptor seems like a build server only action, having it run with every local build is unnecessary. Calling the custom task (gen-build-desc) is therefore left as a manual step which could, for example, be added to a Jenkins instance's list of build actions. Alternatively, running 'gen-build-desc' locally at the Play prompt will cause build-info.txt to be re-generated.

Naming the release artifact


For a Play application built using the 'dist' command, there are in fact two artifacts to be named. The first is the actual jar containing the Play application and the second is the zip containing the application jar and all of its dependencies.

It's possible to use sbt's artifactName and a modified version of Play's dist command to control this naming but as of Play 2.3.x (sbt 0.13.5), simply setting the version attribute of the build in build.sbt achieves a nice result:






Here the artifacts would be renamed 1.0-<tag name> or 1.0-<revision hash> in that order of precedence. For example, if the head revision had been tagged 'release-v1', the resulting file name would be 'git-labelling-1.0-release-v1.zip'.
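The order of precedence amounts to a one-line fallback. A minimal sketch, using a hypothetical helper name and taking the tag as an Option as described earlier:

```scala
object ArtifactVersion {
  /** "<base>-<tag name>" when the head revision is tagged, otherwise "<base>-<revision hash>". */
  def version(base: String, headTag: Option[String], headRevision: String): String =
    s"$base-${headTag.getOrElse(headRevision)}"

  def main(args: Array[String]): Unit = {
    println(version("1.0", Some("release-v1"), "6460054f")) // 1.0-release-v1
    println(version("1.0", None, "6460054f"))               // 1.0-6460054f
  }
}
```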

Note however that version is an sbt setting (a SettingKey rather than a TaskKey) and is therefore evaluated once, on project load. Even if further commits were made to the branch, the revision used in the build descriptor and artifact names would remain unchanged until either sbt was re-started or the 'reload' command was issued at the sbt prompt.

Is it worth doing?

Incremental improvements of this type greatly improve a project's build and deployment process and help it progress towards a point of maturity at which it becomes a stress free and almost invisible part of the process of getting reliable software in front of your customers. That sounds worth doing!

Sunday 13 July 2014

Using expressive database id types with Scala and Slick

Background


Assume a many-to-one relationship between Traders and the Company on whose behalf they are trading. This means that both the Trader id and the Company id must be recorded with each transaction.

Consider this common class of error:
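The following is a minimal sketch of such an error. Only debitAccount and the two ids are named in this post; creditAccount, the amounts and the String results are hypothetical and exist just to make the example runnable.

```scala
object UntypedIds {
  // Both ids are plain Longs, so the compiler cannot tell them apart.
  def creditAccount(traderId: Long, companyId: Long, amount: BigDecimal): String =
    s"credit trader=$traderId company=$companyId amount=$amount"

  // Note the different parameter order.
  def debitAccount(companyId: Long, traderId: Long, amount: BigDecimal): String =
    s"debit trader=$traderId company=$companyId amount=$amount"

  def main(args: Array[String]): Unit = {
    val traderId = 7L
    val companyId = 42L
    println(creditAccount(traderId, companyId, BigDecimal(100)))
    // Compiles without complaint, but the ids are the wrong way round for debitAccount.
    println(debitAccount(traderId, companyId, BigDecimal(100)))
  }
}
```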












You probably spotted that traderId and companyId were switched on the call to debitAccount but it might not be so easy to see in a big codebase and this is an insidious little error which could leave your data in an inconsistent state.

Because this example uses two locally declared methods, having the parameters in a different order on each is poor coding style and should (anyway) be fixed by correcting the method signatures. However, if these methods were on different interfaces developed by different teams and the Market implementation was orchestrating calls across these service interfaces, a parameter ordering policy would be less easily mandated (and is inherently unreliable anyway).

Enter the compiler. It lives to catch problems like this - but it needs type information to do its job.

That's easily provided
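As a minimal sketch (the declarations in the repo may differ in detail), giving each id its own type means that the swapped call

```scala
object TypedIds {
  // Distinct types at compile time; each wraps a single Long.
  final case class TraderId(value: Long) extends AnyVal
  final case class CompanyId(value: Long) extends AnyVal

  def debitAccount(companyId: CompanyId, traderId: TraderId, amount: BigDecimal): String =
    s"debit trader=${traderId.value} company=${companyId.value} amount=$amount"

  def main(args: Array[String]): Unit = {
    val traderId = TraderId(7L)
    val companyId = CompanyId(42L)
    // Correct argument order: compiles and runs.
    println(debitAccount(companyId, traderId, BigDecimal(100)))
    // Swapped argument order: a type mismatch, so it is left commented out here.
    // debitAccount(traderId, companyId, BigDecimal(100))
  }
}
```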












and of course, results in a compilation error.

The rest of this post proposes (continuing with the contrived example) a way to introduce this type safety while using Slick for persistence with Scala, where TraderId and CompanyId represent a database identity (primary key) - and the best bit is that by using Value classes, this can be achieved without additional overhead when compared to using the underlying raw type - Long in this case.

You can access the full code discussed in this post by cloning https://github.com/dextaa/blog.git. This repo contains multiple projects so go to the slick/typesafe-ids directory to access the root of this project. This is referred to as <BASE> from now on.

Generating custom Slick metadata from SBT


This is sufficient schema to illustrate the proposition.












This example uses MySQL throughout, although there's nothing that ties anything here to a certain DBMS.

Slick supports both the manual and auto-generation of the Scala schema metadata and it's possible to hand-craft the code using the more expressive types but as the metadata code can get complex, it's preferable to create an SBT task to create this boilerplate using the Slick source code generator.

Let's first examine the basic generator and the un-customised code.

























This creates an SBT build task which uses an un-customised Slick SourceCodeGenerator to generate the schema metadata. The only thing worth noting here is that the list of all of the tables in the database is filtered so that metadata is generated only for those tables listed in the IncludedTables sequence.

This SBT task is made available to the SBT build by adding 'slick <<= slickCodeGenTask' to the top level build.sbt which defines the project. The generation process can then be triggered by typing 'slick-gen' at the SBT prompt.

The generated code includes these artefacts, each of which represents the structure of a row in the associated table. These classes are used when reading or writing that table's data and use the Long datatype for ids.







To keep the ids expressively typed throughout the code, the typing would ideally start at this lowest level and permeate outwards. Artefacts which look like the following are what's required, so the SourceCodeGenerator in the SBT build task should be tweaked accordingly.







This tweaking takes the form of replacing the uncustomisedGenerator method shown above with the customisedGenerator method shown below and introducing the ColumnMappings Map which it references.















ColumnMappings tells the customised generator which columns (identified by their table-qualified column names) should have their default Scala type replaced by the type given as the value of the mapping. The new type could be a fully qualified type name but the Slick SourceCodeGenerator allows subtypes to supply import statements for the generated code by overriding the code method.

In the code method shown above, com.github.dextaa.ids.model.api.types contains the CompanyId and TraderId types and com.github.dextaa.ids.model.slick.mapping.MySqlIdentity provides the type mappers which Slick requires. Both of these elements are examined below.

The SourceCodeGenerator provides hooks to override most aspects of the schema metadata using a hierarchical nesting of a table generator and column generator. There are many methods which could be customised, but here it's only necessary to override rawType, as it is used to define a column's Scala datatype in the associated Scala metadata.

The qualifiedTableName val is assigned the database table qualified name of the column currently being processed and is used to perform a look-up in ColumnMappings. If qualifiedTableName exists as a key in ColumnMappings, the matching value replaces the default type returned by the rawType method.

For example, where qualifiedTableName is buy.trader_id, rawType returns TraderId.
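The look-up itself is simple enough to model in isolation. The sketch below is plain Scala rather than the Slick generator API: only the buy.trader_id key comes from this post, and the other entry, the object name and the shape of the rawType helper are hypothetical.

```scala
object ColumnTypeLookup {
  // Keys are "<table>.<column>"; values name the Scala type to use instead of the default.
  val ColumnMappings: Map[String, String] = Map(
    "buy.trader_id"  -> "TraderId",
    "buy.company_id" -> "CompanyId" // hypothetical entry
  )

  /** The mapped type if the column is listed, otherwise the generator's default type. */
  def rawType(table: String, column: String, defaultType: String): String =
    ColumnMappings.getOrElse(s"$table.$column", defaultType)

  def main(args: Array[String]): Unit = {
    println(rawType("buy", "trader_id", "Long")) // TraderId
    println(rawType("buy", "quantity", "Long"))  // Long
  }
}
```

Columns which are not listed keep whatever type the generator would have produced anyway, so the customisation only touches the id columns.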

Assuming you pulled the code from GitHub, the full source for the SBT task is at <BASE>/project/SlickMetadataBuild.scala and it writes to <BASE>/src/main/scala/com/github/dextaa/ids/model/slick/generated/SchemaMetadata.scala (there is an SBT 'generated' directory but it's not used as the write target in this example).

The required types are now referenced in the code and the generated artefacts look as they should but the types themselves and the associated mappers are yet to be examined. Read on.

The Type Mappers


The kind of type safety described by TraderId and CompanyId, a named type with zero additional overhead, is exactly the scenario for which Scala Value classes were added to the language in version 2.10, so TraderId and CompanyId are declared as Value classes.

The declarations are as simple as this




which means that the compiler simply uses the named type for compile time type checking and the underlying type (that is, the type of constructor parameter) at runtime.

Despite the underlying types being used at runtime, the Value class type is present at compile time so Slick still requires type mappers for the id types. They look like this.





















The MySqlIdentity object provides the MySQL specific versions of these mappers by defining a concrete Slick JDBC driver.

These mappers must be in implicit scope when using the new id types with Slick, so that Slick operators such as === will function correctly.

The Joda type mappers project was a useful reference when writing these mappers.

For this example, creating a new mapper for each new id type is a simple mechanical exercise which involves copying the structure of one of the mappers shown above and replacing the type (e.g. TraderId) with the new type (e.g. CompanyId).

Conclusion


So that's it.

The application code is using the more expressive types and the Slick metadata is being manipulated to ensure that the use of the expressive types begins down at the level of the database metadata.

Refer to the code pulled from GitHub to see the complete example and to see some database reads and writes which use these id types. See the project README for the steps to actually run the code.