In today’s fast-paced world of software development, new changes (both in code and infrastructure) are released at a breakneck pace. At Uber, we roll out ~11,000 changes every week, and it’s important for us to quickly identify and resolve issues caused by these changes. A delay in detecting issues can impact user experience, our ability to facilitate transactions on the platform, company revenue, and the overall trust and confidence of our users. At Uber, we have built a system called “Healthline” to help reduce our Mean Time To Detect (MTTD) and Mean Time To Resolve (MTTR) issues and to avoid potential outages and large-scale user impacts. Because it detects issues in real time, Healthline has become the go-to tool for release managers to observe the impact of a canary release and decide whether to proceed further or to roll back.
In this article, we share how we leverage Apache Pinot™ to achieve this in real time at Uber scale.
About Healthline
Healthline is a crash/exception logging, monitoring, analysis, and alerting tool built for all Uber mobile applications across multiple platforms, as well as for more than 5,000 microservices written in various programming languages and owned by a number of tech teams within Uber.
At a high level, it processes all crash, error, and exception logs generated by internal systems and builds analytical insights on them by classifying similar exceptions/crashes into buckets called issues. It also identifies the potential changes causing these issues and notifies the system owners whenever an anomaly is observed for a specific issue.
For this article, we will limit our scope to mobile app crash analytics; the approach is quite similar for backend logs. We also will not cover the data collection pipelines or preprocessing steps such as deobfuscation of crash dumps, classification logic, and code change and ownership identification.
Terminology
Crash – An instance of any fatal or non-fatal (e.g., memory leak) report we receive from the SDK embedded in each Uber mobile app.
Issue – A cluster of crashes which are identified as having the same root cause.
App – An Uber-developed application. We consider app + platform (iOS/Android) as one unique app; hence Rider iOS and Rider Android are counted as two apps.
Data Volume and Retention
At peak, we classify more than 1.5 million error and warning logs from backend services, and more than 1,500 mobile app crashes per second across Uber apps for both iOS and Android. The size of a crash report varies from 10 KB to 200 KB, with an average daily data volume of 36 TB, and the crash QPS is ~1,500. We retain the data for 45 days to understand historical trends; most of our use cases access the last 30 days of data.
Query Patterns
To power dashboards and alerting, we perform the following types of queries, each with an expected response time:
Filtering Patterns on Data
- Exact match filters (e.g., city = ‘Bangalore’)
- Numerical range filters (e.g., report_time>=123 and report_time<=456)
- Partial text matching using regex
- Search use case: Healthline should allow users to search crash reports using fields like crash dump module name, exception name, exception class, issue ID, etc. (a hedged sketch of these filter patterns follows this list)
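To make these filter patterns concrete, below is a minimal sketch of how such filters could be expressed as Pinot SQL and issued through the Pinot Java client. The broker address and the column names (city, report_time, exception_class, issue_id, crash_uuid) are illustrative assumptions, not our exact production schema or queries; the table healthline_crash_event_prod is introduced later in this post.

```java
import org.apache.pinot.client.Connection;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.client.ResultSetGroup;

public class CrashFilterQueries {
  public static void main(String[] args) {
    // Illustrative broker address; in production this would come from service discovery.
    Connection pinot = ConnectionFactory.fromHostList("pinot-broker:8099");

    // Exact match filter (e.g., city = 'Bangalore').
    String exactMatch =
        "SELECT issue_id, crash_uuid FROM healthline_crash_event_prod "
            + "WHERE city = 'Bangalore' LIMIT 100";

    // Numerical range filter on report_time, served by a range index.
    String rangeFilter =
        "SELECT issue_id, crash_uuid FROM healthline_crash_event_prod "
            + "WHERE report_time >= 123 AND report_time <= 456 LIMIT 100";

    // Partial text matching using a regex, e.g., on the exception class name.
    String regexFilter =
        "SELECT issue_id, exception_class FROM healthline_crash_event_prod "
            + "WHERE REGEXP_LIKE(exception_class, '.*NullPointer.*') LIMIT 100";

    for (String sql : new String[] {exactMatch, rangeFilter, regexFilter}) {
      ResultSetGroup result = pinot.execute(sql);
      System.out.println(sql + " -> " + result.getResultSet(0).getRowCount() + " rows");
    }
  }
}
```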
Aggregation Patterns on Data
We have a large number of attributes on which aggregation can be performed, and they may occur together in the same query.
The following aggregation patterns were identified (with simplified examples; a hedged query sketch follows the list):
- Aggregate by issue ID
- Apply count distinct for each: crash ID, user, device
- Apply min/max on report_time
- Aggregate by an attribute to get the record distribution over attribute values
- Some fields are present as arrays; for such fields, create a record distribution over unique array elements
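As a hedged illustration, the patterns above roughly translate into Pinot SQL like the following. Column names (crash_uuid, user_uuid, device_uuid, app_version, value) and the placeholder issue ID are assumptions for this sketch; the denormalized table and its “type” column are described later in the post. The queries would be sent to the broker the same way as in the earlier client sketch.

```java
public class CrashAggregationQueries {
  // Per-issue rollup: distinct crashes/users/devices and first/last seen times.
  static final String PER_ISSUE_ROLLUP =
      "SELECT issue_id, "
          + "DISTINCTCOUNT(crash_uuid) AS crashes, "
          + "DISTINCTCOUNT(user_uuid) AS users, "
          + "DISTINCTCOUNT(device_uuid) AS devices, "
          + "MIN(report_time) AS first_seen, MAX(report_time) AS last_seen "
          + "FROM healthline_crash_event_prod "
          + "WHERE report_time >= 123 AND report_time <= 456 "
          + "GROUP BY issue_id ORDER BY DISTINCTCOUNT(crash_uuid) DESC LIMIT 100";

  // Record distribution over the values of an attribute (e.g., app_version).
  static final String DISTRIBUTION_BY_ATTRIBUTE =
      "SELECT app_version, COUNT(*) AS records "
          + "FROM healthline_crash_event_prod "
          + "WHERE issue_id = 'some-issue-id' "
          + "GROUP BY app_version ORDER BY COUNT(*) DESC LIMIT 50";

  // Array fields (e.g., experiments) are exploded into the denormalized table,
  // so the distribution over array elements becomes a plain GROUP BY on that table.
  static final String DISTRIBUTION_OVER_ARRAY_ELEMENTS =
      "SELECT value, COUNT(*) AS records "
          + "FROM healthline_crash_event_denormalized_prod "
          + "WHERE issue_id = 'some-issue-id' AND type = 'experiment' "
          + "GROUP BY value ORDER BY COUNT(*) DESC LIMIT 50";

  public static void main(String[] args) {
    System.out.println(PER_ISSUE_ROLLUP);
    System.out.println(DISTRIBUTION_BY_ATTRIBUTE);
    System.out.println(DISTRIBUTION_OVER_ARRAY_ELEMENTS);
  }
}
```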
Histogram Query Patterns on Data
We also display a variety of graphs/histograms on the UI. They can be generalized as “for a given time window, divide it into equal length bins, and for each bin, perform a particular aggregation.”
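A hedged sketch of such a histogram query, assuming report_time is an epoch-milliseconds column and using Pinot’s DATETIMECONVERT transform to bucket the time window into fixed-size bins (the bin size, table, and column names are illustrative, not our exact production query):

```java
public class CrashHistogramQuery {
  // Bucket a time window into 1-hour bins and count crashes per bin for one issue.
  static String hourlyHistogram(String issueId, long fromMillis, long toMillis) {
    String bucketExpr =
        "DATETIMECONVERT(report_time, '1:MILLISECONDS:EPOCH', '1:MILLISECONDS:EPOCH', '1:HOURS')";
    return "SELECT " + bucketExpr + " AS time_bucket, COUNT(*) AS crashes "
        + "FROM healthline_crash_event_prod "
        + "WHERE issue_id = '" + issueId + "' "
        + "AND report_time >= " + fromMillis + " AND report_time <= " + toMillis + " "
        + "GROUP BY " + bucketExpr + " ORDER BY " + bucketExpr + " LIMIT 1000";
  }

  public static void main(String[] args) {
    long to = System.currentTimeMillis();
    long from = to - 24L * 60 * 60 * 1000; // last 24 hours
    System.out.println(hourlyHistogram("some-issue-id", from, to));
  }
}
```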
Why Apache Pinot
It’s clear from the above query patterns that we have a very strong use case for aggregations and matches (including range matches). Some partial match use cases are present, but they are very low in QPS. More importantly, most of our queries have a time window specified. This makes Pinot a tailor-made fit for our use case, with its rich support for various types of indices. We also have a strong platform team providing a managed Pinot offering, and Pinot has a very strong open source community with some of the committers from Uber.
Intro to Pinot
Apache Pinot is a real-time, distributed, columnar OLAP datastore, which is used to deliver scalable real time analytics with low latency. It can ingest data from batch data sources (such as Apache Hadoop® Distributed File System, S3, Azure® Data Lake, Google Cloud Storage) as well as streaming sources (such as Apache Kafka®). Pinot is designed to scale horizontally, so that it can scale to larger data sets and higher query rates as needed.
A table is a logical abstraction that represents a collection of related data. It is composed of columns and rows (known as documents in Pinot). The columns, data types, and other metadata related to the table are defined using a schema.
Pinot breaks a table into multiple segments and stores these segments in a deep-store such as Hadoop Distributed File System (HDFS) as well as Pinot servers. Pinot supports the following types of tables:
Type | Description
--- | ---
Offline | Offline tables ingest pre-built Pinot segments from external data stores and are generally used for batch ingestion.
Real-time | Real-time tables ingest data from streams (such as Kafka) and build segments from the consumed data.
Hybrid | Hybrid Pinot tables have both real-time as well as offline tables under the hood. By default, all tables in Pinot are hybrid.
Excerpt taken from Apache Pinot Official Documentation. For more information, please visit the Apache Pinot Docs.
Architecture
Our ingestion pipeline processes raw crash data, enriches it with classification and other details, and publishes the enriched crash data to a Kafka topic. This topic is the source of the data we want to store and on which we want to perform the required aggregation and filtering operations to serve user dashboards and other integrations.
- Write Path
  - Apache Flink® job
    - Streams from the input Kafka topic
    - Flattens the data
    - Performs compression and compression sampling
    - Outputs the data to 2 Kafka topics, as shown in the diagram above
  - Output Kafka topics – carry all the flattened fields and have Apache Hive™ ingestion enabled
  - Kafka consumers – populate the real-time Pinot tables from the Kafka topics
  - Scheduled Apache Spark™ job – populates the offline Pinot tables from the Hive datasets
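A minimal sketch of what such a Flink job could look like using the Kafka source/sink connectors. The topic names, broker address, and the CrashFlattener/CrashCompressor helpers are hypothetical placeholders standing in for our internal annotation-based flattening and compression logic.

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CrashIngestionJob {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Enriched crash events published by the upstream ingestion pipeline (hypothetical topic).
    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("kafka:9092")
        .setTopics("healthline-enriched-crashes")
        .setGroupId("healthline-pinot-writer")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

    DataStream<String> crashes =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "crash-events");

    // Topic 1: flattened fields, consumed into the real-time Pinot table and Hive.
    crashes.map(CrashFlattener::flattenToJson)
        .sinkTo(jsonSink("healthline-crash-flattened"));

    // Topic 2: compressed (and sampled) full payloads for the compressed Pinot table.
    crashes.map(CrashCompressor::compressAndSample)
        .sinkTo(jsonSink("healthline-crash-compressed"));

    env.execute("healthline-crash-ingestion");
  }

  private static KafkaSink<String> jsonSink(String topic) {
    return KafkaSink.<String>builder()
        .setBootstrapServers("kafka:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic(topic)
            .setValueSerializationSchema(new SimpleStringSchema())
            .build())
        .build();
  }

  // Hypothetical stand-ins for the internal annotation-based flattener/compressor.
  static class CrashFlattener {
    static String flattenToJson(String rawEvent) { return rawEvent; } // placeholder
  }
  static class CrashCompressor {
    static String compressAndSample(String rawEvent) { return rawEvent; } // placeholder
  }
}
```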
- Read Path
  - Reading from the table storing compressed data (healthline_crash_event_compressed_prod): the data is stored in compressed form and needs to be decompressed using the generic decompressor and converted back to the original crash report.
  - Reading from the tables storing the main flattened data (healthline_crash_event_prod) and the exploded array data (healthline_crash_event_denormalized_prod): the data is stored flattened, i.e., the nested crash report fields need to be reconstructed using the generic unflattener.
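For the compressed table, the read path is essentially a primary-key lookup by crash_uuid followed by decompression (the decompression itself is covered later in this post). A hedged sketch of that lookup, reusing the Pinot Java client from the earlier example (broker address and the example UUID are illustrative):

```java
import org.apache.pinot.client.Connection;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.client.ResultSet;

public class CompressedCrashLookup {
  // Fetch the Base64-encoded, GZip-compressed payload for a single crash instance.
  static String fetchCompressedPayload(Connection pinot, String crashUuid) {
    String sql = "SELECT compressed_data_string FROM healthline_crash_event_compressed_prod "
        + "WHERE crash_uuid = '" + crashUuid + "' LIMIT 1";
    ResultSet rows = pinot.execute(sql).getResultSet(0);
    return rows.getRowCount() > 0 ? rows.getString(0, 0) : null;
  }

  public static void main(String[] args) {
    Connection pinot = ConnectionFactory.fromHostList("pinot-broker:8099"); // illustrative
    System.out.println(fetchCompressedPayload(pinot, "example-crash-uuid"));
  }
}
```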
Hybrid Table Setup for Healthline
Hybrid Pinot tables have both real-time as well as offline tables under the hood.
Offline tables ingest pre-built Pinot segments from external data stores and are generally used for batch ingestion. Real-time tables ingest data from streams (such as Kafka) and build segments from the consumed data.
We utilize both real-time and offline tables for maximum efficiency as well as cost savings. As shown in the diagram, there is a data overlap between the tables. This is intentional and needed: if the offline job experiences failures in the future, the overlapping data is still present in the real-time table and we avoid complete data loss.
When querying, in the case of overlap Pinot gives higher priority to the offline data and fetches it from there.
Implementation
Since our use case is primarily aggregation, Pinot is the star candidate. We use Pinot to store data in various schemas and perform aggregations to surface the metrics on the UI.
Pinot Entity Modelling
The crash events received by Healthline follow a highly nested structure, consisting of primitive and collection objects. Since Pinot doesn’t support storing nested objects, we flatten the fields before writing a crash event to Pinot. Also, since the bulk of a crash event consists of the crash dump, and we only need to perform aggregations on the metadata of the event, we store only a subset of fields by flattening them. The whole crash event is also compressed and stored in a separate Pinot table, which users can retrieve on demand to view the crash dump.
We have implemented annotation-based data flattening and compression.
Data Flattening
Since Pinot does not allow “.” in column names, we converted it to “_” and, to keep naming consistent, switched to snake case.
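As a small, hedged sketch of that naming convention (the field path is illustrative, not a real field in our schema), a nested path is flattened by converting camelCase to snake_case and “.” to “_”:

```java
public class ColumnNameFlattener {
  // "crashReport.deviceInfo.osVersion" -> "crash_report_device_info_os_version"
  static String toPinotColumnName(String nestedPath) {
    return nestedPath
        .replaceAll("([a-z0-9])([A-Z])", "$1_$2") // camelCase -> snake_case
        .replace('.', '_')                        // Pinot does not allow '.' in column names
        .toLowerCase();
  }

  public static void main(String[] args) {
    System.out.println(toPinotColumnName("crashReport.deviceInfo.osVersion"));
  }
}
```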
Exploding Collections for Which Aggregation is Required
Our entity contains the fields experiments and analyticsLogs, which are arrays. We perform aggregations on these arrays and surface analytics for individual array elements on Healthline. Since Pinot does not allow the querying/aggregation we need on individual array elements, we took the approach of exploding the data.
For each issue ID, we take an array field of size n and create n rows of data, each containing the issue ID and array element [i]. We store this denormalized data in healthline_crash_event_denormalized_prod. This table contains an additional column called “type” that indicates which field the record corresponds to.
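A hedged sketch of the explode step (the ExplodedRow shape and field names are illustrative; in the real pipeline this is produced by the Flink job and written to healthline_crash_event_denormalized_prod):

```java
import java.util.ArrayList;
import java.util.List;

public class ArrayExploder {
  // One output row per array element, tagged with the field ("type") it came from.
  record ExplodedRow(String issueId, String type, String value) {}

  static List<ExplodedRow> explode(String issueId, String type, List<String> elements) {
    List<ExplodedRow> rows = new ArrayList<>();
    for (String element : elements) {
      rows.add(new ExplodedRow(issueId, type, element));
    }
    return rows;
  }

  public static void main(String[] args) {
    // An issue with two active experiments becomes two rows in the denormalized table.
    explode("issue-123", "experiment", List.of("exp_a", "exp_b"))
        .forEach(System.out::println);
  }
}
```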
Data Compression and Sampling
Crash events processed by Healthline are huge payloads, varying from 10 KB to 200 KB per event. Even after compressing the raw crash events with the GZip compression algorithm, compressed events can still be up to 100 KB, depending on the compression ratio achieved. The entire raw crash event is primarily used for the below use cases:
- Providing the instance-specific view of the crash event on the dashboard
- Providing the breadcrumbs view (a trail of console/memory/network logs) of the crash event
Both of the above use cases are essentially a primary key (i.e., crash_uuid) based lookup, so both the write and read path access patterns can be served by a row-key-based lookup. The right database choice in such a scenario would have been a NoSQL/blob store, which could serve these use cases in a better way. However, to avoid the complexity of an additional DB integration and its maintenance (infrastructure and development) cost, it was decided to use Pinot as a blob store in this case, even though this is not a use case Pinot is designed for.
During the POCs, we identified that the segment size for a single table containing all the report types was getting too big. Some of the data points around this:
On a business-as-usual day, a single table containing all the crash types for 6 hours has a total table size of 40 GB across 30 segments, with an average size of 1.1 GB and roughly 100K records per segment. These numbers are not ideal for a regular Pinot table and can lead to reliability issues.
To solve this data size problem, there were multiple solutions to explore:
Solution 1 – Reduce the retention time for both tables (offline and real-time). Reducing the TTL might help reduce the number of segments for offline tables, but not for real-time tables; at the current throughput, real-time tables will still have a high size per segment.
Solution 2 – The GZIP compression algorithm provides compression ratios of 50-60%. Even if an alternative compression algorithm achieved 20% more compression, it would still not make the Pinot table reliable, as the segment size would remain on the higher side.
Solution 3 – Split the data into multiple tables:
- Use a separate table for each crash type. There are 4 types of crashes processed by Healthline: ANRs (Application Not Responding), memory leaks, non-fatals, and crashes. Of these, memory leaks and non-fatals have a higher throughput of events, contributing nearly 80% of the overall data size, so the data size/reliability issue can occur for these tables as well.
- Uniformly distribute the data across multiple tables using a logical sharding approach. But this increases complexity and reduces ease of maintenance.
Solution 4 – Apply random sampling to the data that is driving the high data growth (discussed in the next section).
Sampling Logic on Compressed Data
*Sampling – In this document, sampling refers to sampling the compressed data only. We still record and store the indexed fields for ALL crashes, so the aggregation numbers are not affected.
We display sample crash instances to users on the UI. Since a human user can only sift through a limited number of instances, it makes sense to store only a subset of the compressed data. Currently the system shows up to 1,000 instances, which can safely be brought down to a couple hundred.
We bucket crashes by issue ID and a time window, and sample within each bucket. A point to note is that we do not want a very small time window, as it would create a lot of buckets and Flink would spend too many resources on managing and distributing them, making the job less efficient. Hence there is a tradeoff between resource management overhead and job stability. Through POCs, a 10-second time window was found to suit our use case.
Advantages of Sampling:
- With sampling, we are able to keep the Pinot segment size and the number of segments in check
- Increased Flink throughput, as only a subset of crashes need to be compressed now, and less time is spent in compute
- Increased stability of Flink jobs
  - Flink distributes the buckets randomly across task managers (TMs), which are worker nodes. If the time window were not included in the bucket key, then, since some issues can see spikes, those buckets could experience hotspotting: the TM holding the spiky issue ID bucket may choke under the high input volume, resulting in OOM and TM failure. Due to the nature of Flink jobs, the failure of one TM results in the failure of the whole job.
  - Hence we included the time window in the bucket key. Even during spikes, a single bucket will not receive a very high traffic volume, which reduces hotspotting to some extent (similar to how sharding is used in databases). Since the Flink job manager distributes tasks almost uniformly across workers, the traffic of a single issue ID is split across multiple TMs. This reduces the load on the assigned TMs and makes the job less prone to failures (see the sketch below).
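A simplified, hedged sketch of the sampling idea: the bucket key combines the issue ID with a 10-second time window, and only the first few crashes per bucket keep their compressed payload. The per-bucket limit and the in-memory counter here are illustrative; in production this state lives inside the Flink job, and index fields are always stored for every crash.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CompressedPayloadSampler {
  private static final long WINDOW_MILLIS = 10_000;    // 10-second window, chosen via POCs
  private static final int MAX_SAMPLES_PER_BUCKET = 5; // illustrative per-bucket limit

  private final Map<String, Integer> bucketCounts = new ConcurrentHashMap<>();

  // Index fields are always stored; this only decides whether to keep the compressed blob.
  boolean keepCompressedPayload(String issueId, long eventTimeMillis) {
    String bucketKey = issueId + "|" + (eventTimeMillis / WINDOW_MILLIS);
    int count = bucketCounts.merge(bucketKey, 1, Integer::sum);
    return count <= MAX_SAMPLES_PER_BUCKET;
  }

  public static void main(String[] args) {
    CompressedPayloadSampler sampler = new CompressedPayloadSampler();
    long now = System.currentTimeMillis();
    for (int i = 0; i < 8; i++) {
      System.out.println("crash " + i + " keep=" + sampler.keepCompressedPayload("issue-123", now));
    }
  }
}
```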
Data Decompression and Unflattening Guideline
Unflattening
A config is created that holds the mapping between flattened keys and JSON keys, along with the class type. For example, for the nested JSON integer node A.B.C, the config will hold:
A_B_C -> A.B.C, Integer
This is a one-time operation which uses reflection to construct the config map. Using this config, we can quickly determine the JSON node of a given class attribute.
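A hedged sketch of the unflattening step; the config map below is hand-written for illustration (in our implementation it is built once via reflection), and the second field name is hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class Unflattener {
  // flattened column -> (JSON path, value type); built once via reflection in the real system.
  record FieldMapping(String jsonPath, Class<?> type) {}

  static final Map<String, FieldMapping> CONFIG = Map.of(
      "a_b_c", new FieldMapping("A.B.C", Integer.class),
      "crash_report_app_version", new FieldMapping("crashReport.appVersion", String.class));

  // Rebuild the nested structure from a flat row of Pinot column values.
  @SuppressWarnings("unchecked")
  static Map<String, Object> unflatten(Map<String, Object> flatRow) {
    Map<String, Object> root = new LinkedHashMap<>();
    for (Map.Entry<String, Object> entry : flatRow.entrySet()) {
      FieldMapping mapping = CONFIG.get(entry.getKey());
      if (mapping == null) continue;
      String[] path = mapping.jsonPath().split("\\.");
      Map<String, Object> node = root;
      for (int i = 0; i < path.length - 1; i++) {
        node = (Map<String, Object>)
            node.computeIfAbsent(path[i], k -> new LinkedHashMap<String, Object>());
      }
      node.put(path[path.length - 1], entry.getValue());
    }
    return root;
  }

  public static void main(String[] args) {
    System.out.println(unflatten(Map.of("a_b_c", 42, "crash_report_app_version", "1.2.3")));
  }
}
```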
Decompression
Since there is a bug in Neutrino that converts byte arrays to strings inconsistently, we decided to do the string conversion at the application level and store compressed_data_string in Pinot. The bytes-to-string conversion uses a Base64 encoder. To decompress compressed_data_string:
- Convert the string to a byte array using a Base64 decoder.
- Use the shared compression/decompression library (shared so that results stay consistent across services) to decompress the bytes and get back the original object.
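A hedged sketch of the decompression path, using standard java.util.zip and Base64 in place of the internal shared library and assuming GZip under the hood (as described earlier in this post); the main method simply round-trips a sample payload for illustration:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CrashDecompressor {
  // compressed_data_string (Base64 of GZip bytes) -> original crash report JSON.
  static String decompress(String compressedDataString) throws Exception {
    byte[] compressedBytes = Base64.getDecoder().decode(compressedDataString); // step 1
    try (GZIPInputStream gzip =
        new GZIPInputStream(new ByteArrayInputStream(compressedBytes))) {      // step 2
      return new String(gzip.readAllBytes(), StandardCharsets.UTF_8);
    }
  }

  public static void main(String[] args) throws Exception {
    // Round trip for illustration: compress a sample report, then decompress it.
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(buffer)) {
      gzip.write("{\"crash_uuid\":\"example\"}".getBytes(StandardCharsets.UTF_8));
    }
    String compressedDataString = Base64.getEncoder().encodeToString(buffer.toByteArray());
    System.out.println(decompress(compressedDataString));
  }
}
```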
Indexes Created for Healthline
Pinot supports a variety of indexes. However, for our use case (the filter patterns on data described above), we required 3 types of indexes:
- Inverted index – supports exact match
- Text index – supports partial text and regex matching
- Range index – supports numerical range match
Performance Analysis
We have migrated from Elasticsearch to Pinot. The following table shows an Elasticsearch vs. Pinot performance comparison when data is queried over a range of time periods. Time durations are in seconds.
The performance degradation in Elasticsearch is much higher than that of Pinot as the time period increases.
Impact
Having migrated from Elasticsearch to Pinot, the following numbers represent the impact of that migration:
Challenges
- Multiple queries (fan-out) vs. a single ELK query – Elasticsearch allows querying multiple attributes across multiple dimensions in a single query, but Pinot does not. To get around this, we fire multiple queries against Pinot in parallel, which reduces the overall page load time.
- Histogram function – Pinot provides no built-in support for histogram aggregation across multiple dimensions, so we had to write our own complex custom queries to achieve this.
- Number of segments fixed for the offline job – Since the number of segments is statically defined for the offline job, a spike in traffic causes very large/bloated segments to be created, which reduces segment reliability.
- No built-in cross-DC replication – Pinot clusters at Uber do not support cross-DC replication out of the box. To get around this, we leverage the fact that our Kafka infrastructure provides aggregated clusters containing data from all regions. We use these aggregated clusters to populate the Pinot cluster in each region, so each Pinot cluster has all the data.
- Offline and Real-Time tables are not transparent to customers – Since Pinot users need to write their own offline jobs, the hybrid table architecture is not transparent to users and they need to understand some of the internal workings.
Future Scope
- Evaluating the various text indices supported by Pinot for search use cases. Pinot supports several text index types, such as Lucene-based and native text search. We need to evaluate these indices from both functional and non-functional perspectives and proceed with the best fit for our use case.
- Since there are multiple dimensions of aggregation, we need to understand the performance implications when aggregation is done on multiple attributes within the same query. This will also give us a view of how aggregation across multiple dimensions behaves under stress.
- Aggregation on JSON data values. This feature can be super useful and could be leveraged to avoid flattening the data.
- Correlation of data between multiple Pinot tables (e.g., correlation of Mobile crashes with service error logs).
Conclusion
Using Pinot for crash analytics has proven faster, cheaper, and easier than the alternatives. This has enhanced the user experience while bringing down operational costs for the Healthline team. We have also managed to unlock better analytical capabilities that were previously not possible or readily available. We are already working on next-gen Healthline features like breadcrumbs (a trail of console/memory/network logs for a given issue) and mobile-crash-to-backend-logs correlation.
Apache®, Apache Pinot™, Apache Hadoop®, Apache Kafka®, Apache Flink®, Apache Hive™, Pinot™, Hadoop®, Kafka®, Flink®, and Hive™ are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. No endorsement by The Apache Software Foundation is implied by the use of these marks.
Microsoft and Azure are trademarks of the Microsoft group of companies.
Amazon Web Services, AWS, the Powered by AWS logo, and S3 are trademarks of Amazon.com, Inc. or its affiliates.
Main Image Attribution: NASA Goddard Space Flight Center. Image License.