
Monitoring Elasticsearch with structured JSON: Logfooding, part 2

By Jon Gifford 20 Sep 2017

In this post, I’ll dig into the data that we capture on the Loggly service and show how we structure our data in JSON to make it most useful in the Loggly system. All of the data we send is in JSON, because this gives us complete control over how that data is handled within Loggly.

This post builds on part 1 of my series, in which we looked at what data we should be gathering to solve the problem of monitoring Elasticsearch clusters for indexing hotspots. To recap, we decided to gather:

  • Kafka deltas for the queues feeding our writers
  • Writer instrumentation of indexing write rates
  • Elasticsearch _cat/shards data for shard level detail on writes

The state of Kafka deltas

When we were building the Loggly system, we knew that we would need to be able to quickly determine how much data was queued up for processing in our Kafka topics. To satisfy that need, we built a command line tool that lets us get the current state of any topic in Kafka. We called this tool kafkamon, and it can be run as either a one-time request or in auto-update mode. This tool made it easy for us to see what was happening with Kafka in real time in a plain old terminal. Our old-school terminal junkies loved it.

Here is some (obfuscated, and severely truncated) example output:

Number of brokers:       6
Number of partitions:    8
Total broker depth:      38.3GB
Total delta:             1.0MB

+------+------+---+-------------+-------------+-------------------------+-------------+--------+
|Broker| Topic| P |   Earliest  |    Latest   | Consumer                |   Current   | Delta  |
+------+------+---+-------------+-------------+-------------------------+-------------+--------+
| k01  | x.NN | 0 | 26241584163 | 26872059108 | writer.cNN-consumer00-0 | 26872038123 | 20.5KB |
| k01  | x.NN | 1 | 26239479295 | 26863422257 | writer.cNN-consumer01-0 | 26863394451 | 27.2KB |
| k01  | x.NN | 2 | 26241109138 | 26881494765 | writer.cNN-consumer10-0 | 26881472830 | 21.4KB |
| k01  | x.NN | 3 | 26239059715 | 26864718394 | writer.cNN-consumer11-0 | 26864679139 | 38.3KB |
…
+------+------+---+-------------+-------------+-------------------------+-------------+--------+

As you can see, the script output is designed to be easy for a human to read but has the nice property that it’s also easy to parse programmatically. As a trivial example, consider how easy it is to find the total delta…

$ cat kafkamon.out | awk '{d=$10-$14;t+=d}END{print t}'
109981

With this as a base, you can see that it is relatively easy to set up a cron job to gather this data on a periodic basis. To “publish” it, we chose to collect all of the deltas for all of the Kafka topics and partitions simultaneously and publish them every two minutes as a single event. Here is a snippet from one of those events:

{..."delta.c02":176462124,"delta.c03":157929128,"delta.c04":169623072,"delta.c05":186136075,"delta.c06":211999823,...,"delta.cNN":72763405,...}

As you can see, we have one field per cluster. There are pros and cons to this approach, and we’ll talk about these a little later.
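The publishing side of that cron job can be sketched in a few lines. This is illustrative Python rather than our actual code: the `build_delta_event` helper and its `timestamp` and `what` fields are assumptions, but the `delta.<cluster>` field naming mirrors the event above.

```python
import json
import time

def build_delta_event(cluster_deltas):
    """Collapse per-cluster Kafka deltas into a single JSON event.

    cluster_deltas maps a cluster name (e.g. "c02") to its total delta,
    summed across all topics and partitions for that cluster.
    """
    event = {"timestamp": int(time.time()), "what": "kafkaDelta"}
    for cluster, delta in sorted(cluster_deltas.items()):
        # One field per cluster, matching the "delta.cNN" keys above.
        event["delta.%s" % cluster] = delta
    return json.dumps(event)

# Published once every two minutes as a single event:
print(build_delta_event({"c02": 176462124, "c03": 157929128}))
```

Because everything lands in one event, a single search returns the whole fleet's backlog at once.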

Writer instrumentation

Our writer (aka indexer) is written in Java, and we use a class named Metrics to gather counts and sums as events are processed. This class was designed to be as lightweight as possible and can emit the data it collects as JSON via syslog or an internal HTTP API. Our writers are multi-threaded, and we have one Metrics object per thread.

Here is a snippet of a sample event:

{
  "pid":"4985",
  "host":"xxxx",
  "name":"EventIndexer@1b7554d4",
  ...
  "Interval":{
    "tSecs":10,
    "BulkExecOK":{"cnt":1,"rate":0},
    "EventReceived":{"cnt":1266,"rate":126},
    "EventIndexed":{"cnt":1696,"rate":169},
    "EventIndexedBytes":{"cnt":3499922,"rate":349992},
    ...
  },
  "MultiInterval":{
    "tSecs":60,
    "BulkExecOK":{"cnt":5,"rate":0},
    "EventReceived":{"cnt":8428,"rate":140},
    "EventIndexed":{"cnt":8394,"rate":139},
    "EventIndexedBytes":{"cnt":16998778,"rate":283312},
    ...
  },
  "LifeTime":{
    "tSecs":2327912,
    "BulkExecOK":{"cnt":202176,"rate":0},
    "EventReceived":{"cnt":287482907,"rate":123},
    "EventIndexed":{"cnt":281807867,"rate":121},
    "EventIndexedBytes":{"cnt":699458042590,"rate":300465},
    ...
  }
}

Each event has some header information (e.g., process ID, host name, thread ID), then three sections containing the values being gathered. The first section (“Interval”) is the primary window, and spans 10 seconds. The second section (“MultiInterval”) is an aggregation of the last six windows, and so spans one minute. The final section is for the lifetime of the writer process. In the example above, the writer has been running for three weeks, five days, 22 hours.

There is a high degree of commonality in the structure of this event. Each of the three main sections contains exactly the same metrics (BulkExecOK, EventReceived, …), and each of those contains the same two fields: cnt and rate. This is a deliberate choice on our part to make each Metrics event as similar to every other Metrics event as possible. Consistent structure reduces the time it takes to understand what the data actually is.
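The Metrics class itself isn't public, but its windowing behavior can be sketched (in Python rather than Java; the class shape and method names here are our illustration, not the actual implementation): counts accumulate in a 10-second primary window, the last six closed windows back the one-minute aggregate, and lifetime totals grow forever.

```python
import collections

class Metrics:
    """Sketch of a per-thread metrics object with rolling windows."""
    WINDOW_SECS = 10    # primary ("Interval") window
    MULTI_WINDOWS = 6   # six windows -> one-minute "MultiInterval"

    def __init__(self):
        self.current = collections.Counter()
        self.windows = collections.deque(maxlen=self.MULTI_WINDOWS)
        self.lifetime = collections.Counter()
        self.lifetime_secs = 0

    def add(self, name, count=1):
        self.current[name] += count

    def roll(self):
        """Close the current 10s window and fold it into the aggregates."""
        self.windows.append(self.current)
        self.lifetime.update(self.current)
        self.lifetime_secs += self.WINDOW_SECS
        self.current = collections.Counter()

    def snapshot(self):
        """Emit the same cnt/rate structure for all three sections."""
        multi = collections.Counter()
        for w in self.windows:
            multi.update(w)
        multi_secs = len(self.windows) * self.WINDOW_SECS

        def section(counts, secs):
            return {k: {"cnt": v, "rate": v // secs if secs else 0}
                    for k, v in counts.items()}

        return {
            "Interval": section(self.windows[-1] if self.windows else {},
                                self.WINDOW_SECS),
            "MultiInterval": section(multi, multi_secs),
            "LifeTime": section(self.lifetime, self.lifetime_secs),
        }
```

With 1,266 events added in one 10-second window, `snapshot()` reports `{"cnt": 1266, "rate": 126}`, matching the EventReceived line in the sample event.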

Just like the Kafka delta data, this JSON has pros and cons. Again, we’ll delay discussion of those till we’ve seen the _cat/shard data.

Elasticsearch cat data

The Elasticsearch _cat request returns plain old text with spaces between fields and newlines separating the records. It also lets you specify which fields to return. We gather most of the fields available for shards, but here we’ll focus on the indexing.index_total and indexing.index_time fields we described in the first part of this series.

As we said there, we’re primarily interested in the rate of change of these values. In the same way that we can calculate the rate value for the Metrics data above, we can calculate a rate of change for the two index values for each shard.

Here is a snippet of a sample event:

{
  "loggly":{
    "cluster":"cNN",
    "deployment":"PROD",
    "what":"shardDelta",
    ...
  },
  "shard":{
    "name":"xxxx 16",
    "node":"cNN-xxxx",
    "ips":785.55,
    "iMSps":315.56,
    ...
  }
}

The first section is metadata that lets us filter down to a specific cluster in a specific deployment.

The second section gives the shard name (index name plus shard id), the assigned node for that shard, and then the data values. The ips field contains the number of indexing requests per second being made to this shard. The iMSps field contains the milliseconds of CPU time per second spent on those index requests. Both rates are calculated as the delta between the values from the previous and current request, divided by the time between those requests.

In this example, we’ve added about 786 events per second, and used about 315ms of CPU time to do so, since the last time we gathered data for this shard.
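The calculation itself is just delta-over-time. A minimal sketch (Python; the function name and sample values are made up, but the two inputs are the indexing.index_total and indexing.index_time columns from successive _cat/shards requests):

```python
def shard_rates(prev, curr, elapsed_secs):
    """Rates of change for one shard between two _cat/shards samples.

    prev/curr are (indexing.index_total, indexing.index_time_millis)
    pairs; elapsed_secs is the wall-clock time between the two requests.
    """
    ips = (curr[0] - prev[0]) / elapsed_secs     # index requests per second
    ims_ps = (curr[1] - prev[1]) / elapsed_secs  # indexing ms per second
    return {"ips": round(ips, 2), "iMSps": round(ims_ps, 2)}

# Two samples taken 120 seconds apart (illustrative numbers):
print(shard_rates((1000000, 400000), (1094266, 437867), 120))
# -> {'ips': 785.55, 'iMSps': 315.56}
```

Those outputs match the ips and iMSps values in the sample event above.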

Our log event structure

In the above examples, we’ve seen three different approaches to structuring data. Each has strengths and weaknesses, and each reflects how we want to use the data.

Single event containing all data (Kafka delta)

  • Strengths: high-level view; easy to understand; small number of events needed
  • Weaknesses: can’t deep-dive; natural limit to how much data each event can contain; collection of the data is more complex

Event per “worker” with repeated structure (writer data)

  • Strengths: mid-/low-level view; quickly identifies which worker has an issue; natural place to collect data in the code
  • Weaknesses: more events needed; an aggregate view across all workers may be difficult or impossible to see

Event per “worker” with repeated structure plus metadata (_cat/shards data)

  • Strengths: as above, plus a common view of the data
  • Weaknesses: as above, plus larger events

The way you structure your data should be based on how you intend to use it and how you gather it. If all you need is a high-level view, then you can use a simple structure. If you need to dive into the details, your events will be smaller, but you’ll have more of them. Finally, if certain fields are common across many events, you can create a metadata section that lets you quickly slice and dice your data based on those fields.

Logging should evolve as your system does

We haven’t talked about the relative age of each of these implementations. The Metrics class has been used since the earliest days of Loggly, and has remained largely unchanged. The Kafka monitoring data is more recent, and the _cat/shards data is most recent. These last two replaced earlier versions and reflect the lessons we’ve learned. Implementing them required us to redo various searches, alerts, and dashboards, but the end result is that we have better visibility, which we’ll discuss in the final part of this series.

Your systems will evolve, as will your understanding of what you should be tracking. If we look at what we track within our system, some areas have remained stable, while others have evolved (or died) over time. As you evolve, you will find structures that work for you, in the same way that the “loggly” section works for us.

Tough data structure decisions

Whatever structure you use, you’re likely to find that there are some edge cases that need to be solved. If we want to see the overall rate at which an individual writer is receiving and indexing data, there is no easy way to do that when all we have is per-thread logging. So, we aggregate the metrics from each thread up into an application-level Metrics object, and log this along with the per-thread data. Our problem is solved, the data structure is identical, and we can filter on the name field to switch between per-thread and per-writer data. When you can reuse a structure you’re already familiar with, there are definite benefits to doing so.
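The roll-up itself is trivial precisely because every thread logs the same structure. A sketch (Python; the thread names follow the "EventIndexer@…" pattern from the sample event, but the aggregate name and function are our illustration):

```python
import collections

def aggregate_threads(thread_counts):
    """Roll per-thread counters up into one application-level view.

    thread_counts maps a thread name to its {metric: count} dict. The
    aggregate is logged under a distinguishing name, so filtering on the
    name field switches between per-thread and per-writer data.
    """
    total = collections.Counter()
    for counts in thread_counts.values():
        total.update(counts)
    return {"name": "EventIndexer@aggregate", "counts": dict(total)}

threads = {
    "EventIndexer@1b7554d4": {"EventIndexed": 1696},
    "EventIndexer@3c9a01ff": {"EventIndexed": 2210},
}
print(aggregate_threads(threads))
```

Because the aggregate uses the same metric names and cnt/rate layout as the per-thread events, every existing search and dashboard works on it unchanged.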

Similarly, when all we’re measuring is the total Kafka delta, we can’t determine whether a growing Kafka delta is caused by a single partition, or whether it is a global slowdown and all partitions are affected. We could expand our Kafka data to include the delta for every partition, but the resulting event could grow so large that it becomes essentially unusable. Alternatively, we could emit an event per topic containing the delta for each partition in that topic. This would let us quickly identify which partitions were stuck. However, we would still need to map that partition to a writer, and then to a thread within that writer, to be able to identify why the partition is stalled. It is much quicker to go straight to the source of the problem via the writer Metrics. Just because you can do something doesn’t mean it’s the best approach.

Takeaways

  1. You aren’t always going to need it!
    • If the data is never going to be used for deep-dive analytics, you can use a simple structure, and cram a lot of related data into it (e.g., Kafka delta)
  2. Use the source, Luke…
    • If you have access to the source code, and thus direct access to the data you need, log it. This is always better than trying to correlate outside data to identify the root cause of a problem (e.g., writer instrumentation)
  3. Don’t reinvent the wheel.
    • If someone has already done the work for you, try using it before trying to do it yourself. (e.g., ES _cat). Even if it is not exactly what you think you need, it may still be useful, and will help you decide what you actually need.
  4. Don’t let perfect be the enemy of good.
    • No one can see the future, and analysis paralysis is real, so do something, even if you think it’s going to have to change “when we release X.” Real live data today is going to help you more than perfect data in a month.
  5. The only constant is change.
    • Your system (and your understanding of it) changes, and your logging should reflect that. You’re going to have too much logging from some components, and not enough for others. As you learn which is which, fix it!
