Apache Storm: What We Learned About Scaling & Pushing the Performance Envelope
Log management isn’t easy to do at scale. We designed Loggly Gen2 using the latest social-media-scale technologies (including ElasticSearch, Kafka from LinkedIn, and Apache Storm) as the backbone of ingestion processing for our multi-tenant, geo-distributed, real-time log management system. (If you want to learn how we did it and why, watch our presentation at last year’s AWS re:Invent or check out the slides. There’s a reason that our talk has become the most watched non-keynote talk by far.)
Since we launched Gen2, we’ve learned a lot more about these technologies. We regularly contribute back to the open source community, so I decided that it’s time to give an update on our experience with Apache Storm and explain why we have dropped it from our platform, at least for now.
Log Management Presents Unique Performance Challenges
It turns out that we push more data, and more complex data, to Storm than almost anybody out there. We have logged more than 750 billion events to date, and we measure our data space in petabytes. We cannot afford to lose a single log, even during spikes that last several hours, when our system ingests 100,000+ events per second and log events vary from 100 bytes to several megabytes in size. As such, we represent an ideal “real-world” environment to understand how complex systems behave at scale.
Apache Storm Offered An Attractive Framework for Stream Processing
When we designed our Gen2 architecture, we were attracted to what Storm could bring us: a framework for stream processing that was highly distributed and fault tolerant. It looked great architecturally with its spout and bolt approach, guaranteed data processing, and dynamic deployment.
The spouts and bolts principle was perfectly suited for the network approach that we had because sometimes logs go from bolt to bolt sequentially, and sometimes they have to be consumed by several bolts in parallel. This behavior is possible in Storm with a simple definition of the data stream paths.
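To make the sequential and parallel paths concrete, here is a minimal Python sketch of a stream path definition. It is a toy model, not Storm's API: the Topology class, its add_bolt and emit methods, and the sample event are all assumptions made for illustration. The step names borrow from the kinds of bolts mentioned in this post (mapper, parser, indexer, storage).

```python
from collections import defaultdict


class Topology:
    """Toy stand-in for a stream topology: named steps plus the paths between them."""

    def __init__(self):
        self.bolts = {}                      # bolt name -> function(event) -> event
        self.downstream = defaultdict(list)  # source name -> names of subscribed bolts

    def add_bolt(self, name, func, sources=()):
        self.bolts[name] = func
        for source in sources:
            self.downstream[source].append(name)

    def emit(self, source, event, trace):
        # Every bolt subscribed to `source` receives its own copy of the event.
        for name in self.downstream[source]:
            result = self.bolts[name](dict(event))
            trace.append(name)
            self.emit(name, result, trace)


def build_topology():
    topo = Topology()
    # Sequential path: spout -> mapper -> parser
    topo.add_bolt("mapper", lambda e: {**e, "customer": e["token"][:4]}, sources=["spout"])
    topo.add_bolt("parser", lambda e: {**e, "parsed": True}, sources=["mapper"])
    # Parallel paths: indexer and storage both consume the parser's output
    topo.add_bolt("indexer", lambda e: e, sources=["parser"])
    topo.add_bolt("storage", lambda e: e, sources=["parser"])
    return topo


trace = []
build_topology().emit("spout", {"token": "abcd-1234", "line": "GET /"}, trace)
print(trace)  # ['mapper', 'parser', 'indexer', 'storage']
```

The point of the sketch is that routing lives entirely in the sources argument: adding a second consumer of the parser's output is one more add_bolt call, with no change to the parser itself.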
Guaranteed data processing of the data stream meant that we didn’t need to worry as much about the inner workings of the framework, and this allowed us to focus on writing the best possible code for the different bolts that make our log management so special and support our true multi-tenant approach.
Dynamic deployment meant that we could deploy once and then, at a later stage, easily add or remove nodes to adjust for actual loads and requirements. This fit well with our business because the load on our system is highly dynamic. We need the real-time elasticity to absorb massive peaks that last several hours and then release workers when facing valleys of low traffic. Need more parsers? No problem! Just add a few nodes and ask Storm to redeploy with the new topology. Indexers running almost idle? No problem! Release them and save resources.
So we built Loggly Gen2 inside the Storm container. A collector (think of it as an ingest engine) persisted logs to Kafka and immediately streamed them to Storm. At that point, Storm took care of all the synchronization and orchestration for us, so we just had to code our bolts for log indexing, parsing, storage, and so on.
Post-launch Performance Optimization Efforts
After extensive testing, we went live with this architecture in production. In parallel, we launched a major optimization effort, looking at every opportunity to streamline our solution, reduce overhead, and increase its performance – while applying Loggly’s “no log left behind” and absolute resilience principles. We haven’t always been perfect. But as the service is maturing, our uptime stats are something we’re really proud of.
To ensure that we were not losing logs, we decided to switch on Apache Storm’s guaranteed delivery, which guarantees processing of the end-to-end workflow by ack’ing messages as they flow from bolt to bolt. We knew that we’d see some performance degradation, but we did not expect throughput to drop by a factor of 2.5 right away.
The test protocol was simple enough:
Per cluster environment:
- Preload Kafka broker with 50 GB of raw log data from production cluster.
- Deploy Storm topology with a Kafka Spout to consume and an anchored bolt to map events to a customer. Kafka had partitions with 8 spouts to drain from and 20 mapper bolts. The Kafka disk was on a 4K Provisioned IOPS backed AWS instance, so disk performance was not a problem.
Ack’ing per tuple turned off (Config.TOPOLOGY_ACKERS set to 0):
- Performance was 200,000 events per second on average.
- Kafka disks were red hot.
Ack’ing per tuple enabled:
- Performance was 80,000 events per second on average.
- Kafka disks were not saturated and bolts were not running at high capacity.
In our quest to optimize performance everywhere, we thought that instead of ack’ing individual logs, we should batch a bunch of logs together and ack the whole set. However, this meant changing the semantics of Storm’s “message.” For us, a Storm message was a log, from the collector to the Kafka spout to bolt to bolt. Treating a Storm message as a bunch of logs meant changing the Kafka spout and also changing each bolt to re-interpret a single message as a bunch of logs. Not trivial…
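The batching idea can be sketched in a few lines of Python. This is a toy model rather than Storm code: the batched helper, the CountingBolt class, and the batch size of 100 are assumptions chosen for illustration. It shows how treating a message as a list of logs cuts the number of acks while the number of logs processed stays the same.

```python
def batched(logs, batch_size):
    """Group individual logs into lists so that one message carries many logs."""
    for start in range(0, len(logs), batch_size):
        yield logs[start:start + batch_size]


class CountingBolt:
    """Hypothetical bolt that counts the logs it handles and the acks it sends."""

    def __init__(self):
        self.processed = 0
        self.acks = 0

    def execute(self, message):
        # After the change in semantics, a message is a list of logs, not one log.
        for _log in message:
            self.processed += 1
        self.acks += 1  # one ack covers the whole message


logs = [f"log line {i}" for i in range(1000)]

per_log = CountingBolt()
for message in batched(logs, 1):      # original semantics: one log per message
    per_log.execute(message)

per_batch = CountingBolt()
for message in batched(logs, 100):    # batched semantics: 100 logs per message
    per_batch.execute(message)

print(per_log.processed, per_log.acks)      # 1000 1000
print(per_batch.processed, per_batch.acks)  # 1000 10
```

The sketch also hints at why the change is invasive: every bolt's execute has to loop over a list, and a failure anywhere in a batch means the whole batch is the unit of replay.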
Why We Decided to Move Away from Storm
After we thought really hard about what we needed from a high performance stream processing framework, we decided to go custom. We designed our own fast, simple, very low overhead, reliable, and committed queue for module-to-module communication. And because we knew and appreciated Kafka, it was easy to use it as the main component in our framework.
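As a rough illustration of what a committed queue between modules can look like, here is a minimal in-memory Python sketch. It is not our production code and it does not use a real Kafka client: LogQueue, run_module, the consumer group name, and the flaky_parse handler are all hypothetical. The one idea it demonstrates is that a module commits its read position only after its output has been written downstream, so a crash causes a re-read instead of a lost log.

```python
class LogQueue:
    """In-memory stand-in for one Kafka partition: an append-only list plus committed offsets."""

    def __init__(self):
        self.records = []
        self.committed = {}  # consumer group -> next offset to read

    def append(self, record):
        self.records.append(record)

    def poll(self, group, max_records):
        start = self.committed.get(group, 0)
        return start, self.records[start:start + max_records]

    def commit(self, group, offset):
        self.committed[group] = offset


def run_module(source, sink, group, handler, max_records=2):
    """Read a batch, process it, write the results downstream, and only then commit."""
    start, batch = source.poll(group, max_records)
    if not batch:
        return 0
    results = [handler(record) for record in batch]  # if this raises, nothing is committed
    for result in results:
        sink.append(result)
    source.commit(group, start + len(batch))
    return len(batch)


raw, parsed = LogQueue(), LogQueue()
for line in ["a=1", "b=2", "c=3"]:
    raw.append(line)

calls = {"count": 0}


def flaky_parse(line):
    calls["count"] += 1
    if calls["count"] == 2:  # simulate a crash on the second log of the first batch
        raise RuntimeError("simulated crash")
    key, value = line.split("=")
    return {key: value}


try:
    run_module(raw, parsed, "parser", flaky_parse)
except RuntimeError:
    pass  # the offset was never committed, so the batch will be read again

while run_module(raw, parsed, "parser", flaky_parse):
    pass

print(parsed.records)  # [{'a': '1'}, {'b': '2'}, {'c': '3'}]
```

Note that this ordering gives at-least-once delivery: a crash after the downstream write but before the commit would produce duplicates, so downstream modules need to tolerate them.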
Porting our code to this new framework was actually very easy. We were able to reuse the code from each of our bolts to write all the required modules. Of course, we had to slightly modify each piece of code to include communication to and from Kafka.
The end result is that we now have a very tight, high performance, high reliability, high resilience solution that perfectly implements our workflow – at sustained rates of 100,000+ events per second, with messages averaging 300 bytes each. We can process more than 2 terabytes per day per cluster. That means we collect, store, index, parse, alert, report, dashboard, and more across thousands of customers of all sizes… in real time!
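The daily volume follows directly from the rate and the average message size. A quick back-of-the-envelope check in Python, assuming a full day at exactly 100,000 events per second and decimal terabytes (10^12 bytes):

```python
events_per_second = 100_000
avg_bytes_per_event = 300
seconds_per_day = 24 * 60 * 60  # 86,400

bytes_per_second = events_per_second * avg_bytes_per_event  # 30,000,000 bytes = 30 MB/s
bytes_per_day = bytes_per_second * seconds_per_day          # 2,592,000,000,000 bytes
terabytes_per_day = bytes_per_day / 1e12

print(f"{bytes_per_second / 1e6:.0f} MB/s, {terabytes_per_day:.3f} TB/day")
# 30 MB/s, 2.592 TB/day
```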
Conclusions: What We Learned About Storm
We thought it was important to give you an update on this topic since we’ve been such strong advocates for Apache Storm. We still believe that Storm is a great solution with great potential (after all, we were only using version 0.8.2). But in the end, as we pushed on the edges, we realized that we needed a custom framework built for the type of performance and requirements that are unique to our high-performance, massively scalable, cloud-based log management solution… our secret sauce.