This blog post tells the story of how we leveraged a textbook data structure in our core platform and achieved a 15x speed up for a key data processing step. After a brief explanation of how union-find works, we’ll go into how Heap’s Identity uses it to provide a complete view of user behavior.
We’ll also describe union by hash, a novel heuristic that is more fitting in a distributed environment than traditional methods. Finally, we’ll cover how we used this knowledge to create a drop-in replacement in our product to reduce a data export process from ~4 hours to ~10 minutes.
Interested in learning more about Heap Engineering? Meet our team to get a feel for what it’s like to work at Heap!
Find(user_id), which returns a canonical ID corresponding to the set of IDs that user_id is a member of.
Identity in Heap
When a webpage or mobile app loads the Heap SDK, the SDK generates a semi-permanent random identifier unique to that cookie or mobile device. All event and user data is associated with that user ID. In order to offer product managers a complete view of a user’s journey through their product, we also offer an API to connect behavior across different devices. This API is called Identity.
If an end-user logs in, Heap customers can tag the user with a canonical identity. Now “identified”, the tracker sets its ID to a hash of this canonical identity and records the identification in our backend. All future data from that user is associated with the canonical identity for analysis. Additionally, the previously tracked data attached to the anonymous user ID is mapped to the canonical identifier, creating a unified view of the user’s interactions.
A single identified user can easily consist of multiple tracking instances. And to make matters more complex, identities can change! Two identified users, each consisting of a set of 1 or more tracking ids, can be merged into a single combined user for analysis. So we need to maintain a complex, dynamically changing user mapping. Enter union-find.
“[Union-Find] is a data structure that stores a collection of non-overlapping sets… [and] provides operations for adding new sets, merging sets, and finding a representative member of a set.”
We use union-find to store and retrieve sets of tracking IDs belonging to the same canonical user, functionality which is critical to support Identity in Heap. A database called “UsersDB” stores the union-find data structure, and allows for the two specified operations:
Find(user_id), which returns a canonical ID corresponding to the set of IDs that user_id is a member of.
Union(old_ID, new_ID), which merges the two sets of IDs that contain these two users.
When users are identified, UsersDB adds the new tracking ID into the data structure and unions the old ID with the new ID to persist the association. union-find ensures that future lookups with Find return the same value for both IDs. That is,
Find(old_id) == Find(new_id) should always be true, indicating that both the
old_id and the
new_id now refer to the same canonical user.
For performance reasons, our main analytics database shards all data based on the canonical user rather than the original user. We persist incoming events under their canonical user, found with Find. As a result, analytical queries act on identified users rather than the original users – they don’t need to do any work at query time to resolve identities and shuffle data accordingly.
Performance optimizations: theory and practice
For all practical purposes, union-find has a runtime of O(N) for a sequence of N union and find operations . This means the amortized runtime for a single operation is effectively constant! How is that possible?
Let’s take a closer look at union-find’s internals, explore two key optimizations (union by rank and path compression), and cover a novel heuristic we use in place of traditional methods.
Union-Find is internally represented by a set of directed trees, in which edges point up to the root of each tree. Each user ID is a node in a tree, and each tree of nodes represents a set of IDs mapped to one canonical user. A tree’s root node is the set’s canonical user. Union operations merge two trees and Find returns a canonical identifier.
An individual Union operation must perform two Find operations, one for each parameter. These Find results are used to determine if the two nodes are already in the same tree. If they are not, Union will merge the trees by connecting one tree’s root node to the other tree. Since adding an edge to the graph is a constant-time operation, a union runs in O(Find).
The Find operation’s runtime is determined by the number of edges between a node and its root, known as a path length.
Without optimizations, the worst case graph could contain a path of length N/2 and result in an O(N^2) runtime, which is undesirable here. The key factor in improving runtime is shortening the path length between any given node and its tree’s root.
Union by Rank and Path Compression
The first optimization, union by rank, is a heuristic for choosing the root of new trees. The goal is to limit growth of paths by consistently attaching smaller trees to larger ones. To do this, a score is calculated for each node — known as its “rank”. A root node’s rank approximates the length of the longest path in the tree.
Each node starts with a rank of 0. When two nodes’ are connected, the higher ranked root becomes the root of the new tree. If the two roots’ ranks are equal, the new tree’s root node is arbitrarily selected and its rank is increased by 1. This simple lightweight heuristic efficiently tracks tree size and limits growth.
Union by rank alone does not ensure short enough path lengths for an O(N) runtime. We must also take advantage of Find operations to shorten path lengths via path compression. Each Find traverses the entire path from lookup element (
Find(x)) to the root (
Find(x) = y). Since we know that all the elements in the path from
x to its root also belong in the same set, we can connect these elements directly to the root once it is found. Afterwards, any subsequent Finds for those elements will only encounter a path of length 1. You can think of this as caching the result of a Find operation by modifying the tree structure.
Union by rank slows down tree growth while path compression proactively shortens each path traversed. Path lengths, and therefore O(Find), become constant and result in a runtime of O(N) for N union-find operations.
Union by Hash
Practical considerations make it complicated to implement a union by rank scheme for our Identity API. Tracking rank requires additional metadata: a rank for each node. Instead, we’ve chosen a stateless heuristic which approximates the same effect of reducing tree growth from unions.
Rather than using rank, we compare hashed values. That is, in our implementation of
Union(A, B) we select the new root based on whichever is lower of
Hash(Find(B)). Each union can only reduce a given tree’s root’s hash. A tree with a small hash is likely the result of many unions, each consistently reducing the hash. In turn, this tree is increasingly likely to be the root of future unions, minimizing growth of path lengths. This method probabilistically approximates union by rank without any additional state.
Moving Complex Processing Out of the Database
Now that we’ve discussed union-find and its performance optimizations, let’s return to the main story of this blog post: how we delivered a 15x performance win by deploying a more efficient union-find implementation.
Recall that we store Identity information in a postgres instance called UsersDB. Identity’s union-find graph is stored as a table of edges, where each row maps one user ID to another. Union and Find operations are implemented as SQL queries on this table.
When we receive a new user mapping, these queries insert new rows into the table to update the data structure. As part of ingesting data for analysis, we find the canonical user for each incoming event from UsersDB and remap the event to that user. With some postgres tuning and a bit of smart caching, this union-find implementation has served Heap well for several years.
The Problem: Slow Queries and Frustrated Customers
The problems came about as we needed to scale our data out feature, Heap Connect. This feature lets a customer sync their Heap dataset to their data warehouse, typically Snowflake or Redshift. As part of each sync, Connect requires exporting a customer’s full set of user mappings from UsersDB.
With more and larger customers, we ran into scaling limits on UsersDB. The bulk export was written as a complicated but finely tuned SQL query, which ran Find on every ID in the customer’s dataset. In the face of long processing times, timeouts, and higher failure rates, we tried a number of tactics to improve the status quo:
- We profiled and tuned the query.
- We vertically scaled the RDS instance powering UsersDB, increasing memory, disk, and provisioned IOPS.
This only delayed the inevitable. The exports were still slowing down and consuming an ever increasing portion of the database’s shared resources. This started degrading the reliability of the export as well as UsersDB, causing performance regressions for other clients.
Our investigation highlighted both algorithmic and system limitations. Our implementation of Find did not implement path compression, and queries consistently ran out of memory and spilled to disk, dramatically slowing down and consuming precious disk I/O. Heap Connect’s resource utilization patterns were fundamentally at odds with ingestion for the analytics database. This made it hard to find a cost-effective and performant middle ground.
The Solution: From SQL to Application Logic
In response, we built a dedicated service in Go with a performant union-find implementation and workload-appropriate resources.
The Identity Export Service directly addressed two main bottlenecks from the slow query: lack of path compression and inadequate memory allocation. We implemented path compression and allocated enough memory to perform the entire export without spilling to disk, far outperforming the old query. Now, instead of a slow, flaky, and opaque SQL query, we have a fast service with detailed logging and metrics.
We narrowed our scope to only re-implement the export’s bottleneck: repeated Finds for evaluating user mappings. For each batch export, the new service fetches the full edge list from UsersDB, which avoids complicating Identity’s source of truth and dramatically simplifies implementation. By removing the Find operations from SQL, we reduced a complex aggregation query to a simple table scan with a filter.
Ultimately, the code itself was short and simple. In under a week, we successfully deployed the service and showed large performance improvements over the previous postgres query. The whole project shipped to Connect customers 2 weeks later, speeding up our slowest daily export from 4 hours to 15 minutes.
The project shipped quickly because we leveraged our internal service platform built on top of AWS’s Elastic Container Service (ECS). Once we completed the business logic and wired together the service, deploying to production was a breeze. CI/CD built and deployed the container without provisioning dedicated infrastructure.
We shipped the Identity Export Service during Hack Week. Hack Week is a time Heap engineers self-organize to work on projects that aren’t necessarily on the roadmap. In the past, projects focused on proofs of concept for speculative features. This year’s event was oriented towards projects that could ship and deliver value in under a week, and the Identity Export Service clearly fit the bill!
[^1] Technically, union-find has a runtime of O(α(N)N) for a sequence of N union and find operations. Here, α represents the inverse Ackermann function which is extremely slow growing and effectively constant for all practical purposes. For simplicity, we will treat α as a constant factor and consider union find to be an algorithm with an amortized runtime which is linear with respect to the number of union and find operations.
[^2] We create the worst case tree by running N/2 unions to create a single chain of N/2 edges. Then we run N/2 Find’s on the leaf node, each Find traversing N/2 edges. This results in an O(N^2/4) runtime, or, O(N^2)