Apache Storm: What We Learned About Scaling & Pushing the Performance Envelope

 

Log management isn’t easy to do at scale. We designed Loggly Gen2 using the latest social-media-scale technologies—including ElasticSearch, Kafka from LinkedIn, and Apache Storm—as the backbone of ingestion processing for our multi-tenant, geo-distributed, and real-time log management system. (If you want to learn how we did it and why, watch our presentation at last year’s AWS re:Invent or check out the slides. There’s a reason that our talk has become the most watched non-keynote talk by far.)

Since we launched Gen2, we’ve learned a lot more about these technologies. We regularly contribute back to the open source community, so I decided that it’s time to give an update on our experience with Apache Storm and explain why we have dropped it from our platform, at least for now.

Log Management Presents Unique Performance Challenges

It turns out that we push more data—and more complex data—to Storm than almost anybody out there. We have logged more than 750 billion events to date, and we measure our data space in petabytes. We cannot afford to lose a single log, even during sustained spikes lasting several hours in which our system ingests 100,000+ events per second, with log events ranging from 100 bytes to several megabytes in size. As such, we represent an ideal “real-world” environment for understanding how complex systems behave at scale.

Apache Storm Offered An Attractive Framework for Stream Processing

When we designed our Gen2 architecture, we were attracted to what Storm could bring us: a framework for stream processing that was highly distributed and fault tolerant. It looked great architecturally, with its spout-and-bolt approach, guaranteed data processing, and dynamic deployment.

The spout-and-bolt principle was perfectly suited to our processing network: sometimes logs flow from bolt to bolt sequentially, and sometimes they must be consumed by several bolts in parallel. Storm makes this possible with a simple definition of the data stream paths.
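
Here is a minimal sketch of what such a topology definition can look like in the Storm 0.8.x API we were using. The spout and bolt classes (LogSpout, CustomerMapperBolt, ParserBolt, IndexerBolt, AlerterBolt) are hypothetical placeholders standing in for our real components, and the parallelism numbers are illustrative:

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;

    public class LogTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();

            // Spout instances pull raw log events into the topology
            // (LogSpout is a stand-in for the Kafka-fed spout).
            builder.setSpout("log-spout", new LogSpout(), 8);

            // Sequential path: map each event to a customer, then parse it.
            builder.setBolt("customer-mapper", new CustomerMapperBolt(), 20)
                   .shuffleGrouping("log-spout");
            builder.setBolt("parser", new ParserBolt(), 20)
                   .fieldsGrouping("customer-mapper", new Fields("customerId"));

            // Parallel fan-out: the same parsed event is consumed by two bolts.
            builder.setBolt("indexer", new IndexerBolt(), 16).shuffleGrouping("parser");
            builder.setBolt("alerter", new AlerterBolt(), 4).shuffleGrouping("parser");

            Config conf = new Config();
            conf.setNumWorkers(8);
            StormSubmitter.submitTopology("log-pipeline", conf, builder.createTopology());
        }
    }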

Guaranteed data processing of the stream meant that we didn’t need to worry as much about the inner workings of the framework, and this allowed us to focus on writing the best possible code for the different bolts that make our log management so special and support our true multi-tenant approach.

Dynamic deployment meant that we could deploy once and then easily add or remove nodes later to adjust for actual loads and requirements. This fit well with our business because the load on our system is highly dynamic: we need real-time elasticity to absorb massive peaks that last several hours and then release workers when facing valleys of low traffic. Need more parsers? No problem! Just add a few nodes and ask Storm to redeploy the topology. Indexers running almost idle? No problem! Release them and save resources.

So we built Loggly Gen2 inside the Storm container. A collector—think of it as an ingest engine—persists logs to Kafka and immediately streams them to Storm. At that point, Storm takes care of all the synchronization and orchestration for us, so we just had to code our bolts for log indexing, parsing, storage, etc.

Post-launch Performance Optimization Efforts

After extensive testing, we went live with this architecture in production. In parallel, we launched a major optimization effort, looking at every opportunity to streamline our solution, reduce overhead, and increase its performance – while applying Loggly’s “no log left behind” and absolute resilience principles. We haven’t always been perfect, but as the service matures, our uptime stats are something we’re really proud of.

To ensure that we were not losing logs, we decided to switch on Apache Storm’s guaranteed delivery – truly guaranteeing processing of the end-to-end workflow by ack’ing messages as they flow from bolt to bolt. We knew that we’d see some performance degradation, but we were instantly hit with a 2.5X drop in throughput.
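
To make this concrete, here is a minimal sketch (an assumed example, not our production code) of an anchored, ack’ing bolt in the Storm 0.8.x API; the customer-mapping logic is a placeholder:

    import java.util.Map;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class CustomerMapperBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String rawLog = input.getString(0);
            String customerId = lookupCustomer(rawLog); // hypothetical helper

            // Anchor the output tuple to its input so a failure anywhere
            // downstream replays the event from the spout...
            collector.emit(input, new Values(customerId, rawLog));
            // ...and ack the input so Storm knows this hop succeeded.
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("customerId", "rawLog"));
        }

        private String lookupCustomer(String rawLog) {
            return "unknown"; // placeholder for the real mapping logic
        }
    }

Every anchored emit and ack generates additional messages to Storm’s acker tasks, and that bookkeeping is a large part of the extra overhead.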

The test protocol was simple enough:

  • Per cluster environment:

    1. Preload the Kafka broker with 50 GB of raw log data from a production cluster.
    2. Deploy a Storm topology with a Kafka spout to consume events and an anchored bolt to map each event to a customer. The Kafka topic was partitioned, with 8 spouts draining it and 20 mapper bolts downstream. The Kafka disk was on an AWS instance backed by 4K Provisioned IOPS storage, so disk performance was not a bottleneck.
  • Ack’ing per tuple turned off (Config.TOPOLOGY_ACKERS set to 0; see the config sketch after this list)

    1. Performance was 200,000 events per second on average.
    2. Kafka disks were red hot.
  • Ack’ing per tuple enabled

    1. Performance was 80,000 events per second on average.
    2. Kafka disks were not saturated, and bolts were not running at high capacity.
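
For reference, here is a sketch of how the two runs differ in configuration. It uses Config.setNumAckers, which has the same effect as setting Config.TOPOLOGY_ACKERS directly; this is illustrative, not our actual test harness:

    import backtype.storm.Config;

    public class AckingConfig {
        // Run 1: acking disabled. No acker tasks, so tuple trees are never tracked.
        static Config withoutAcking() {
            Config conf = new Config();
            conf.setNumAckers(0); // same effect as setting Config.TOPOLOGY_ACKERS to 0
            return conf;
        }

        // Run 2: acking enabled. Keep the default, or size the acker pool explicitly.
        static Config withAcking(int ackerTasks) {
            Config conf = new Config();
            conf.setNumAckers(ackerTasks);
            return conf;
        }
    }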

In our quest to optimize performance everywhere, we thought that instead of ack’ing individual logs, we should batch a bunch of logs together and ack the whole set. However, this meant changing the semantics of Storm’s “message.” For us, a Storm message was a log, from the collector to the Kafka spout to bolt to bolt. Treating a Storm message as a bunch of logs meant changing the Kafka spout and also changing each bolt to re-interpret a single message as a batch of logs. Not trivial…
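
A hypothetical sketch of what that change implies for every bolt: one tuple now carries a whole list of logs, and a single ack covers the entire batch. Again, this is illustrative only, not our actual code:

    import java.util.List;
    import java.util.Map;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class BatchAwareBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            // The tuple's single field is now a batch of raw logs, so every
            // bolt has to unpack it instead of handling one log per tuple.
            @SuppressWarnings("unchecked")
            List<String> batch = (List<String>) input.getValue(0);
            for (String rawLog : batch) {
                process(rawLog); // per-log work, same as before
            }
            collector.emit(input, new Values(batch)); // anchored: one tuple tree per batch
            collector.ack(input);                     // a single ack covers the whole batch
        }

        private void process(String rawLog) {
            // placeholder for the bolt's real per-log logic
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("batch"));
        }
    }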

Why We Decided to Move Away from Storm

After we thought really hard about what we needed from a high performance stream processing framework, we decided to go custom. We designed our own fast, simple, very low overhead, reliable, and committed queue for module-to-module communication.  And because we knew and appreciated Kafka, it was easy to use it as the main component in our framework.

Porting our code to this new framework was actually very easy. We were able to reuse the code from each of our bolts to write all the required modules. Of course, we had to slightly modify each piece of code to include communication to and from Kafka.
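
To give a flavor of the pattern, here is a minimal, hypothetical sketch of one such module: each stage consumes from one Kafka topic, does its work, and produces to the next topic. It is written against the current Kafka client API for readability, not against the client we actually used, and the topic names and ParseModule class are made up:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ParseModule {
        public static void main(String[] args) {
            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", "kafka:9092");
            consumerProps.put("group.id", "parse-module");
            consumerProps.put("enable.auto.commit", "false"); // commit only after handing off downstream
            consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            Properties producerProps = new Properties();
            producerProps.put("bootstrap.servers", "kafka:9092");
            producerProps.put("acks", "all"); // wait for replication: "no log left behind"
            producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
                 KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                consumer.subscribe(Collections.singletonList("mapped-logs"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        String parsed = parse(record.value()); // this module's actual work
                        producer.send(new ProducerRecord<>("parsed-logs", record.key(), parsed));
                    }
                    producer.flush();      // make sure the next stage's topic has the events...
                    consumer.commitSync(); // ...before marking them consumed here
                }
            }
        }

        private static String parse(String rawLog) {
            return rawLog; // placeholder for real parsing
        }
    }

Because every stage reads from and writes to its own topic, a failed or slow stage simply lets its input topic back up while everything upstream keeps running.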

The end result is that we now have a very tight, high-performance, highly reliable, and highly resilient solution that perfectly implements our workflow – at sustained rates of 100,000+ events per second, with an average message size of 300 bytes. We can process more than 2 terabytes per day per cluster. That means we collect, store, index, parse, alert, report, dashboard, and more across thousands of customers of all sizes… in real time!

Conclusions: What We Learned About Storm

We thought it was important to give you an update on this topic since we’ve been such a strong advocate for Apache Storm. We still believe that Storm is a great solution with great potential (after all, we were only using version 0.8.2). But in the end, as we pushed on the edges, we realized that we needed a custom framework built for the type of performance and requirements that are unique to our high-performance, massively scalable, cloud-based log management solution… our secret sauce.


19 comments

  • Scott

    6 months ago

    Did you consider Storm Trident’s micro-batching?

    • Jon Gifford

      3 months ago

      Scott, honestly I’m not sure at this point! I will ask my team.

  • Hariprasad Taduru

    1 year ago

    Currently I am using Storm version 0.9.5

  • Hariprasad Taduru

    1 year ago

    I have been using Storm for the last year, currently on version 0.9.5. My worry with Storm is the scaling part: I need to decide on the number of tasks before deploying the topology, because it cannot scale beyond the number of tasks assigned. If I have 5 executors, each with 5 tasks, then there will be a total of 25 tasks. In my previous experience, I can have at most 25 nodes in my cluster.

    Correct me if I am wrong.

  • Kishore Senji

    1 year ago

    Wouldn’t you have to get some kind of acking even in your Kafka->Module->Kafka system? Unless you are using an AsyncProducer without acks, in which case it is not really reliable (and is similar to no acking in Storm).

    Plus, at every stage you have Kafka storage (with replication), meaning you need more data nodes. Wouldn’t Storm get similar performance with acks if we added more nodes and increased parallelism?

    • Jon Gifford

      3 months ago

      Yes, we ACK everything, for reliability.

      The primary benefit we get from the change is that every stage of the pipeline is decoupled from its predecessors and successors, so if there is an issue with any individual stage, it is limited to that stage. Previous stages can continue to run, simply building up the queue for the affected stage. With Storm, we’d have to reprocess all of the data through all previous bolts once we fix the issue. If the issue is in a bolt that follows a resource heavy bolt, then reprocessing can be very expensive. Decoupling, as we’ve done, removes this potential cost. We’re happy to trade off some of the power of Storm in favor of an operationally simpler model.

  • anonymous_support_feedback

    2 years ago

    Full disclosure, I do biz dev with SQLstream.

    I’m wondering if anyone has considered SQLstream as an option for high-throughput processing. We took part in a third-party benchmark that showed 15x the performance of a comparable Storm deployment at a lower TCO.

    2M EPS using just 4 cores… event size 1KB.

    Just curious if anyone has feedback on our technology?

  • P. Taylor Goetz

    2 years ago

    I can’t tell without more information, but my guess is that the performance degradation was caused by not having enough acker tasks configured. Back in version 0.8.2, the default behavior (unless overridden) was to have 1 acker task per topology. In 0.9.x we changed that to one acker task per worker. We also replaced the 0mq transport with a Netty implementation that added significant performance improvements.

    While there is a performance penalty inherent in turning on guaranteed processing, that penalty is nowhere near as dramatic as what you experienced if the number of ackers is set properly. (Again, I can’t tell if you tried tuning the number of acker tasks.)

    Disclaimer: I’m an Apache Storm committer.

  • Roger Rea

    3 years ago

    We’re seeing the same thing at many customers and prospects. Storm doesn’t scale to high volumes – with a customer, we published an email analysis performance comparison of IBM InfoSphere Streams vs. Apache Storm: ~10x better performance and also more scalability. Also, 2 years ago we published a log analysis benchmark and code for 39 functional and performance tests on Streams – for simple parsing, 14 million messages per second on 4 octo-core nodes, and 3.5 million for pattern detection.

  • nxspeed

    3 years ago

    What hardware were you running Storm on to get ‘200,000 events per second on average’?

  • Robert Mckeown

    3 years ago

    Thx for sharing.
    Is this another case of… ‘general solutions’ being good, and some being great, but if you want to wring that last bit of performance out, you have to go custom?

  • EJ

    3 years ago

    Were you able to find the source for the 200k -> 80k decrease in throughput? Which aspect of the system started running hot? Did the Storm community offer any tuning advice?

    Thanks for the post!

  • neelesh

    3 years ago

    This is interesting. We are seriously looking at Storm for our event stream needs. The difference does not look too big (80K vs. 100K). Does the custom solution run on less hardware while providing better throughput? Is the custom solution built on something like Akka?

  • Michael E. Driscoll

    3 years ago

    Manoj –

    Nice piece and love the “no log left behind” philosophy.

    Recognizing that every company’s use cases are different, we’ve found Storm has worked quite well for us. We run an always-on ingestion service with sustained peaks north of 1 million EPS.

    We presented a talk at Strata two months ago on the Kafka –> Storm –> Druid architecture that we rely on: http://youtu.be/kJMYVpnW_AQ .

  • Amol Kekre

    3 years ago

    Manoj, datatorrent may be able to help

  • Bert

    3 years ago

    Thanks for sharing. When you tested with ACKs, what did you have configured for TOPOLOGY_MAX_SPOUT_PENDING? We have found that if you have ACKs turned on and this is set too low, it will significantly impact performance.

  • Jonathan

    3 years ago

    As a Storm and Kafka user, I’d love to see what you guys came up with. Any plans to open it up?

  • Michael Osofsky

    3 years ago

    Thanks for sharing, I hadn’t heard of Apache Storm before so I was glad to know how it performed.
