I’d like to introduce Hazeltask, an open source library I wrote, by detailing its genesis. I will post a more concise, technically focused Hazeltask article later, but first I want to document why Hazeltask was created, because I always enjoy hearing the behind-the-scenes story on things like this.
Hazeltask is a general purpose work distribution library. It runs on top of the Hazelcast in-memory data grid, and it has been in use at Qualys for over a year now, handling several million tasks a day. There are lots of task execution libraries out there; Hazelcast itself even implements a distributed ExecutorService. So, why another one? Good question!
At Qualys, we have something called dynamic tagging which is used to help automatically organize objects based on properties or relationships of those objects. Essentially, you can create a tag with a rule which will cause that tag to attach itself to objects that match. It’s a little like a Gmail filter that applies a label to your email, except dynamic tag rules can explore object relationships, pull in data from external systems, and have nearly infinite rule flexibility because they can be written in Groovy. These capabilities are what really make scaling this system difficult. Tag rules are reevaluated for a few different reasons, but the most common are: a new dynamic tag was created, a rule was changed, or properties on taggable objects changed. Things are constantly changing in the system, and often many things change at the same time. Workloads tend to be “bursty” and IO bound.
The first prototype of the tagging system (which was never in production) did all the work on the same machine. It was just a simple thread pool that pulled from a queue; I don’t even think it used the ExecutorService library. The downside was that one machine could quickly become overloaded and run out of memory, and if you restarted the machine, all queued tasks were lost. We needed a way to distribute tasks to multiple systems without losing work when a machine had to be restarted.
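The prototype amounted to this pattern: worker threads draining a shared in-memory queue. Here is a minimal stdlib sketch of that idea (class and method names are mine, not the original code):

```java
import java.util.concurrent.*;

// Minimal single-machine prototype (names are mine, not the original code):
// worker threads pull tasks from a shared in-memory queue. If the process
// dies, everything still in the queue is lost, which is the weakness that
// pushed us toward distributing the work.
public class LocalWorkQueue {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread[] workers;
    private volatile boolean running = true;

    public LocalWorkQueue(int threads) {
        workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                // Keep draining until told to stop AND the queue is empty.
                while (running || !queue.isEmpty()) {
                    try {
                        Runnable task = queue.poll(100, TimeUnit.MILLISECONDS);
                        if (task != null) task.run();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            });
            workers[i].start();
        }
    }

    public void submit(Runnable task) {
        queue.add(task);
    }

    // Lets queued tasks finish, then stops the workers.
    public void shutdown() throws InterruptedException {
        running = false;
        for (Thread t : workers) t.join();
    }
}
```

Nothing here survives a crash or restart, which is exactly the problem.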
The Terracotta Solution… and Problem
Our application was already using Terracotta in DSO mode for distributed ehcache and session distribution, so I decided to use it to distribute the tagging work. Back then, Terracotta had a Terracotta Integration Module (TIM) called Master-Worker which was meant to help distribute work. I would soon find out that it was a terrible library, and come to the realization that Terracotta itself was also a terrible product :-). Perhaps they have fixed their issues now, but it was plagued with bugs at the time we used it. The first problem was with bugs in the Master-Worker library: it was possible to lose work, nodes didn’t recover work correctly, and deadlocks were possible. Over the course of a few weeks I actually fixed all of those bugs, until I was left with a system that barely resembled the original library. I also don’t think the design of the Master-Worker architecture is very good, but I will go into that later when I talk about Hazeltask’s design.
Still, we had problems. Terracotta would often deadlock on ehcache (they eventually released patches to fix those bugs). And yet still there were bugs. We noticed some operations going very slowly in parts of the system that had nothing to do with Terracotta. It turned out Terracotta instruments a lot of common methods in DSO mode, including methods like Object.getClass. I don’t quite remember the exact issue, but it was something like this: calling getClass (on any object) would pass through the Terracotta library, which would end up crawling some tree to see if that object had some kind of Terracotta expression applied to it. As a test I patched the Terracotta DSO library to cache that lookup, and performance increased by something like 500x on this one process. At this point we decided to dump Terracotta; we had had too many problems with it over many months.
I really wanted to rearchitect the system. I had learned that the master-worker architecture just wasn’t a good fit for this use case, at least the way it was written in the Terracotta TIM: it was too chatty, used too many distributed locks, recovered tasks inefficiently, and tracked too much state.
We considered several new architectures:
Local Execution – we considered going back to the first iteration, but it retained the significant disadvantages of running a single server out of memory and losing work on restart.
Message Queue – customer starvation. A large customer could enqueue millions of tasks, then a small customer could enqueue one task; the small customer would have to wait behind the large one, and their experience of the system would appear slow. We considered creating a queue per customer, or creating a hundred queues and hashing customers to them, but the client libraries aren’t designed to pull messages from that many queues.
Hazelcast ExecutorService – Capable of distributing work, but loses work when a node goes down. Also susceptible to customer starvation.
Database Queue – this was a very popular consideration, but I just wasn’t happy with it. I don’t think a database table makes a good queue under load; there would be too much contention, and I don’t think it would scale.
Storm – requires installing a lot of infrastructure. Other than that, I think it could be a good solution; it’s simply not easy enough to deploy and use.
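For context on the “hundred queues” idea above: it amounts to hashing each customer onto a fixed set of queues. A sketch of the routing side (names are mine, and this is only the enqueue half; the hard part is that consumers must poll all N queues):

```java
import java.util.*;

// Illustrative only: map customers to a fixed set of queues by hashing
// the customer id. This bounds the number of queues, but a consumer
// still has to poll all N of them, which is where typical message-queue
// client libraries fall down.
public class CustomerQueueRouter {
    private final List<Deque<String>> queues;

    public CustomerQueueRouter(int numQueues) {
        queues = new ArrayList<>();
        for (int i = 0; i < numQueues; i++) queues.add(new ArrayDeque<>());
    }

    // The same customer always maps to the same queue index.
    public int queueFor(String customerId) {
        return Math.floorMod(customerId.hashCode(), queues.size());
    }

    public void enqueue(String customerId, String task) {
        queues.get(queueFor(customerId)).add(task);
    }
}
```

Note that hashing also only softens starvation: two large customers can still land on the same queue as a small one.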
In my spare time I decided to write my own library based on Hazelcast. I like Hazelcast because it’s so easy to use: it runs within your application, so you don’t need more servers. I set out to develop a work distribution library with the following requirements:
- Doesn’t lose work if servers go down
- Solves the customer starvation issue
- Maximize hardware usage to execute tasks efficiently
- Minimize overhead
- Easy to use
- Little state tracking
The primary components of Hazeltask are:
Write ahead log – a Hazelcast distributed map that is responsible for ensuring no work is lost if a member goes down.
Task completion topic – whenever a task errors, completes, or is cancelled, the event is announced on this Hazelcast topic. This is the mechanism by which waiting Futures on other members are notified.
ExecutorService – a Hazelcast ExecutorService is used for cluster-wide communication.
GroupingQueue – a local data structure that is a priority queue of queues. This gives each customer its own queue, which we can round-robin over. This is part of the magic that prevents customer starvation.
Worker thread pool – these worker threads poll the GroupingQueue for their next task.
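The GroupingQueue is the interesting piece, so here is a simplified sketch of the idea (class and method names are mine; Hazeltask’s real implementation also supports customizable prioritization rather than plain round-robin):

```java
import java.util.*;

// Simplified sketch of the GroupingQueue idea (names are mine): one FIFO
// queue per group (customer), drained round-robin so a group with one
// task is served as promptly as a group with a million.
public class GroupingQueue<T> {
    private final Map<String, Deque<T>> groups = new HashMap<>();
    private final Deque<String> rotation = new ArrayDeque<>();

    public synchronized void add(String group, T task) {
        Deque<T> q = groups.get(group);
        if (q == null) {
            q = new ArrayDeque<>();
            groups.put(group, q);
            rotation.add(group);                 // new group joins the rotation
        }
        q.add(task);
    }

    // Returns the next task in round-robin group order, or null if empty.
    public synchronized T poll() {
        String group = rotation.poll();
        if (group == null) return null;          // nothing queued anywhere
        Deque<T> q = groups.get(group);
        T task = q.poll();
        if (q.isEmpty()) groups.remove(group);   // drop drained groups
        else rotation.add(group);                // rotate group to the back
        return task;
    }
}
```

With this structure, a customer who enqueues one task is served after at most one task from each other active customer, regardless of how deep the other queues are.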
The basics of Hazeltask are that tasks are added to the write-ahead log, then pushed to worker nodes where they are tracked and executed locally. This allows us to do the fancy group routing without incurring extra overhead. It works really well, and reliably. In a later blog post I would like to go into more detail on the customizable queue group prioritization, task stealing, efficient task recovery, and ideas for improving some limitations this architecture has.
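The write-ahead-log pattern itself can be illustrated with a plain ConcurrentMap standing in for the Hazelcast distributed map. This is a sketch of the pattern, not Hazeltask’s actual code; in the real system the distributed map survives the death of any single member, so pending entries can be resubmitted during recovery:

```java
import java.util.*;
import java.util.concurrent.*;

// Sketch of the write-ahead-log pattern (names are mine): record the
// task in a map before executing it, and remove it on completion. A
// ConcurrentMap stands in here for the Hazelcast distributed map; the
// distributed version lets surviving members resubmit anything still
// logged when a node dies.
public class WriteAheadExecutor {
    private final ConcurrentMap<String, Runnable> pending = new ConcurrentHashMap<>();
    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    public Future<?> submit(String taskId, Runnable task) {
        pending.put(taskId, task);               // 1. log before executing
        return workers.submit(() -> {
            try {
                task.run();                      // 2. execute
            } finally {
                pending.remove(taskId);          // 3. ack: clear the log entry
            }
        });
    }

    // During recovery, any task still in the log would be resubmitted.
    public Set<String> pendingTaskIds() {
        return new HashSet<>(pending.keySet());
    }

    public void shutdown() {
        workers.shutdown();
    }
}
```

The key property is ordering: the log entry exists before execution starts, so a crash anywhere between steps 1 and 3 leaves a recoverable record (at the cost of possible duplicate execution).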
Hazeltask Pros and Cons
After running it for about a year, there are a number of pros and cons. The pros are that it solves all the problems I set out to solve, it’s fast, and it scales linearly. The best part is that it’s really easy to use, since its API is a Java Executor. The one real con is that it runs on top of Hazelcast, which means it lives inside the JVM of your app, so its performance can be affected by other parts of the app. We had problems with servers running out of memory due to bugs in other areas, which would degrade the performance of Hazelcast and cause ripple effects across the cluster. If you don’t have a healthy application with plenty of monitoring in place, an in-memory data grid like Hazelcast is not a good choice for you. We have since fixed all of those other problems and have had no issues. Hazeltask is also only relatively ordered: tasks will be executed close to enqueue order, but out-of-order execution is very possible and likely. In-order processing is slow unless you can partition on a small enough grouping (this will be implemented in the future).
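Because the API is the standard java.util.concurrent Executor interface, application code looks like ordinary executor usage. Here is a sketch using a plain thread pool as a stand-in (construction of the real distributed executor is omitted; the point is that code written against the interface doesn’t care which implementation is behind it):

```java
import java.util.concurrent.*;

public class ExecutorUsage {
    public static int runExample() throws Exception {
        // A plain thread pool stands in here; code written against the
        // ExecutorService interface works the same way against a
        // distributed executor.
        ExecutorService svc = Executors.newFixedThreadPool(2);
        Future<Integer> f = svc.submit(() -> 21 * 2);  // any Callable
        int result = f.get();                          // blocks until done
        svc.shutdown();
        return result;
    }
}
```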
Future improvement ideas revolve around isolating the limitations of Hazelcast, building separate infrastructure like Storm, or perhaps building on top of Storm. I think it would be a fun project to turn Hazeltask, or at least the ideas within it, into a message queue like Kafka. I like Hazeltask’s ability to group messages, prioritize, and customize the decision points; I think it could make a cool general purpose message queue. I also want to add the option of guaranteed in-order processing of messages for situations where that is required.