Complex Deduplication in BigQuery
This story covers the first step in the deduplication and survivorship process used in master data management. If you’re interested, check out the follow-up story Complex Survivorship in BigQuery.
Deduplication is challenging with tabular data. You may have customers signing up for promotions with different email addresses, enrichment data that only joins on phone number, multiple emails or addresses for a given customer, missing values, and a multitude of other challenges. On top of all of that, deduplication is inherently a graph problem — specifically connected-component extraction, also known as union-find. In this story I’ll show you how to do all of this in BigQuery³.
1. Preparation
First we need some data. Use the following query to create a customer table. These customers come from various source systems. business_key is the primary key of the record in the source system.
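Something like the following works as a stand-in (the dedupe dataset, the raw_data table name, and the sample rows are assumptions for illustration):

```sql
-- A minimal raw_data table with a handful of customers; dataset, table,
-- and sample values are assumptions for illustration.
create or replace table dedupe.raw_data as
select 'crm' as source_system, '1001' as business_key,
       'ann@example.com' as email, '555-0100' as phone_number,
       cast(null as string) as salesforce_id
union all select 'web', '2001', 'ann@example.com', null, 'SF-1'
union all select 'salesforce', 'SF-1', null, '555-0100', null
union all select 'crm', '1002', 'bob@example.com', '555-0199', null
union all select 'web', '2002', null, '555-0199', null;
```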
We’re going to be deduplicating on any non-key field that could uniquely identify an individual. In this example, those fields are email, phone_number¹, and salesforce_id. Each field will be a node, and the edges will be the complete graph connecting them, meaning all nodes are connected to all other nodes. The number of edges in this graph corresponds to the number of queries in the Create edge table section, so it’s best to keep the number of deduplication fields low: the number of edge queries grows as field_count(field_count − 1)/2.
Here’s the complete graph for our example (3 fields, so 3 edges):
Here’s what it would look like with 2 more fields (5 fields, so 10 edges):
2. Create node tables
The first table we need is a bridge between each data point and the raw_data row it came from. This will be necessary at the end when we want to query the raw_data table with group_id.
We need one query per field that will be connected by an edge. You’ll notice that we have 2 places for salesforce_id. We want the records that reference a salesforce record to have nodes for both their own data and the data from the salesforce row.
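Here’s a sketch of that bridge query, assuming the raw_data table above; note the two salesforce_id branches at the end:

```sql
-- A sketch of the node bridge table: one select per field, plus one for the
-- salesforce rows' own business_key. Table and column names are assumptions.
create or replace table dedupe.node_bridge as
select
  email          as node_value,
  'email'        as node_type,
  business_key,
  source_system,
  farm_fingerprint(concat('email', ':', email)) as node_id
from dedupe.raw_data
where email is not null

union all

select
  phone_number, 'phone_number', business_key, source_system,
  farm_fingerprint(concat('phone_number', ':', phone_number))
from dedupe.raw_data
where phone_number is not null

union all

-- records that reference a salesforce record
select
  salesforce_id, 'salesforce_id', business_key, source_system,
  farm_fingerprint(concat('salesforce_id', ':', salesforce_id))
from dedupe.raw_data
where salesforce_id is not null

union all

-- the salesforce records themselves: their business_key is the salesforce id
select
  business_key, 'salesforce_id', business_key, source_system,
  farm_fingerprint(concat('salesforce_id', ':', business_key))
from dedupe.raw_data
where source_system = 'salesforce';
```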
Schema
- node_value and node_type are used to build the edge table
- business_key and source_system are used to join back to raw_data
- node_id⁴ is a hash of node_value and node_type
Now we can create our node table from this bridge table:
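A minimal version, assuming the node_bridge table sketched above:

```sql
-- Distinct nodes derived from the bridge table (names are assumptions).
create or replace table dedupe.node as
select distinct
  node_id,
  node_value,
  node_type
from dedupe.node_bridge;
```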
3. Create edge table
For each combination of deduplication node types we’ll need an edge query. First I’ll show just the email->phone_number query. Note that we create edges in only one direction. We’ll add the other directions later.
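A sketch of that single directed edge query, assuming the node_bridge table from the previous step (the hash-based edge_id is an assumption):

```sql
-- email -> phone_number edges: pair up nodes that came from the same record.
select distinct
  farm_fingerprint(concat(cast(e.node_id as string), ':',
                          cast(p.node_id as string))) as edge_id,
  e.node_id             as node_id_left,
  p.node_id             as node_id_right,
  'email->phone_number' as source_query,
  cast(null as int64)   as group_id
from dedupe.node_bridge e
join dedupe.node_bridge p
  on  e.business_key  = p.business_key
  and e.source_system = p.source_system
where e.node_type = 'email'
  and p.node_type = 'phone_number';
```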
Schema
- edge_id is a hash of the left node + right node
- source_query is for debugging, to identify the source of the edge
- group_id is a nullable int64 that will be populated by a seed, then updated by the final deduplication query in a loop
Full Edge Query
The salesforce_id nodes require a more complex join since they’re either the business_key when source_system='salesforce' or the salesforce_id field.
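Here’s a compact sketch of the full edge table. It collapses the per-pair queries into one self-join, and because the bridge-table sketch above already emits salesforce_id nodes from both the foreign-key field and the salesforce business_key, the join stays uniform; a version built directly on raw_data would need the conditional join just described:

```sql
-- Full directed edge table: one self-join produces every node-type pair once.
create or replace table dedupe.edge as
select distinct
  farm_fingerprint(concat(cast(l.node_id as string), ':',
                          cast(r.node_id as string))) as edge_id,
  l.node_id                              as node_id_left,
  r.node_id                              as node_id_right,
  concat(l.node_type, '->', r.node_type) as source_query,
  cast(null as int64)                    as group_id
from dedupe.node_bridge l
join dedupe.node_bridge r
  on  l.business_key  = r.business_key
  and l.source_system = r.source_system
where l.node_type < r.node_type;  -- one direction only; reverse edges come later
```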
4. Seed group_id
Before we start the deduplication process we need to seed the group_id field with unique integers². Seeding must happen prior to converting to an undirected graph, since both directions of an edge must have the same group seed for deduplication to work correctly. You’ll see why once we get to deduplication. There is some complexity to this process I’ve left out for now (see the appendix on seeding groups between multiple runs).
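A minimal seed, assuming the edge table above (footnote 2 explains why row_number() rather than the edge_id itself):

```sql
-- Give every directed edge a small unique integer seed before reverse edges exist.
update dedupe.edge e
set group_id = s.seed
from (
  select edge_id, row_number() over (order by edge_id) as seed
  from dedupe.edge
) s
where e.edge_id = s.edge_id;
```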
5. Convert to undirected graph
To turn a directed graph into an undirected graph you only need to insert the opposite edges. You’ll see we simply swap node_id_left and node_id_right, create a swapped edge_id, and insert this into the original edge table.
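A sketch, using the names from the earlier steps:

```sql
-- Insert the reverse direction of every edge; the reverse edge keeps the same
-- group_id seed as the original direction.
insert into dedupe.edge (edge_id, node_id_left, node_id_right, source_query, group_id)
select
  farm_fingerprint(concat(cast(node_id_right as string), ':',
                          cast(node_id_left as string))) as edge_id,
  node_id_right as node_id_left,
  node_id_left  as node_id_right,
  source_query,
  group_id
from dedupe.edge;
```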
6. Deduplication
This is where the magic happens. Essentially we’re walking all subgraphs in parallel, squashing group_id to min(group_id), alternating our group by between node_id_left and node_id_right. The following query is a single round trip that we’ll be repeating in a while loop. If we instead replaced the update with a join_to_left CTE, we could duplicate all the CTEs and do multiple rounds of deduplication in a single long query. We could even create a massive query that does n rounds at once, and we wouldn’t have to loop at all if we know our largest group/subgraph won’t have a distance greater than n.
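Here’s a sketch of what one round trip can look like, using the names from the earlier sketches; it takes the min over both sides in a single pass rather than alternating CTEs, but it’s the same min-propagation idea:

```sql
-- One deduplication round: find the minimum group_id seen on either side of
-- every node, then write it back to that node's edges.
update dedupe.edge e
set group_id = m.min_group_id
from (
  select node_id, min(min_group_id) as min_group_id
  from (
    select node_id_left as node_id, min(group_id) as min_group_id
    from dedupe.edge
    group by node_id_left
    union all
    select node_id_right as node_id, min(group_id) as min_group_id
    from dedupe.edge
    group by node_id_right
  )
  group by node_id
) m
where e.node_id_left = m.node_id
  and e.group_id <> m.min_group_id;  -- only touch rows that actually change
```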
Note the group_id <> min_group_id predicate in the update statement. That’s how we’ll terminate the loop: the first time we run this and @@row_count comes back as 0, we’ll know we are done.
Here’s a step-by-step example of how it works…
Notice that Step 3 didn’t update any records so we know we’re done.
There are 2 groups in this example:
Walk in a loop
Using BigQuery scripting we can loop the query above. Note that in this example I nested the queries instead of using CTEs; seeing it both ways may help you understand the recursive nature of this process. I also keep an iteration log to monitor performance.
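A sketch of the scripted loop, assuming the names used above (the iteration_log table is an assumption):

```sql
-- Loop the walk query until a round updates zero rows.
declare updates int64 default -1;
declare i int64 default 0;

-- iteration log to monitor performance
create table if not exists dedupe.iteration_log (
  iteration    int64,
  rows_updated int64,
  logged_at    timestamp
);

while updates != 0 do
  -- one deduplication round (nested rather than CTE form)
  update dedupe.edge e
  set group_id = m.min_group_id
  from (
    select node_id, min(min_group_id) as min_group_id
    from (
      select node_id_left as node_id, min(group_id) as min_group_id
      from dedupe.edge
      group by node_id_left
      union all
      select node_id_right as node_id, min(group_id) as min_group_id
      from dedupe.edge
      group by node_id_right
    )
    group by node_id
  ) m
  where e.node_id_left = m.node_id
    and e.group_id <> m.min_group_id;

  -- rows touched by the update above
  set updates = @@row_count;
  set i = i + 1;
  insert into dedupe.iteration_log values (i, updates, current_timestamp());
end while;
```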
You can limit iterations per run by changing the while predicate to (updates > 0 and i < max_iterations) or i = 0. It’ll start where you left off if you end up stopping and restarting it.
I’ve run this on ~800 million records⁵ without issue. This method is able to handle some truly massive datasets.
Results
Use the following query to view the raw data in groups.
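For example, assuming the tables sketched above:

```sql
-- Join group_id back to the raw data through the bridge table.
select distinct
  g.group_id,
  r.*
from dedupe.raw_data r
join dedupe.node_bridge b
  on  b.business_key  = r.business_key
  and b.source_system = r.source_system
join (
  select node_id_left as node_id, min(group_id) as group_id
  from dedupe.edge
  group by node_id_left
) g
  on g.node_id = b.node_id
order by group_id;
```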
If you’d like to squash the group_id values, use the following query. If you are doing multiple runs and need stable group_id values, you must only squash group_ids greater than the max from the previous run.
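A whole-table squash might look like this, assuming the names above:

```sql
-- Replace sparse group_id values with small consecutive integers.
create or replace table dedupe.edge as
select
  * except (group_id),
  dense_rank() over (order by group_id) as group_id
from dedupe.edge;
```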
Next Steps
- Now that we’ve done deduplication, we’re ready to create golden records by squashing these multiple duplicate records into one, in a process called survivorship. BigQuery has some pretty excellent functionality for doing that too.
- We could perform fuzzy matching via the join criteria in our edge query. We could also add edge weights, and in our deduplication process sum the weights between 2 nodes to determine whether it’s a true match.
- The walk query can be optimized. You only have to update the group_id for a node_id that was updated in the previous round. If you add an iteration_id to the edge table, you can filter using this, or you can recreate a working table each round limited to the records you must consider. It’s more complex and error-prone (you must consider all node_ids with changes from the previous iteration), but worth it if you have a huge dataset.
- Build this into a DBT macro.
Appendix
Seeding groups between multiple runs
If you want a stable group_id between runs, you’ll just need to start with the group_id values from the previous run and seed new rows with values above the previous max. This works great if you never break a connection, but in real life you will. If you break groups you’ll need a separate process to handle this. In the past I’ve detected edge removal, then reseeded new group_ids for all nodes/edges in the affected group. If they were all in group 5, they’ll now be in two new groups — maybe 10 and 11, and group 5 will be gone. This is even more challenging if you use weighted edges.
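For the simple case, here’s a sketch of seeding only the new edges above the previous max, assuming new rows arrive with a null group_id:

```sql
-- Seed only edges without a group_id yet, starting above the previous max.
-- Run this before inserting the reverse direction of the new edges, so both
-- directions share a seed.
update dedupe.edge e
set group_id = s.seed
from (
  select
    edge_id,
    m.prev_max + row_number() over (order by edge_id) as seed
  from dedupe.edge
  cross join (select max(group_id) as prev_max from dedupe.edge) m
  where group_id is null
) s
where e.edge_id = s.edge_id;
```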
Using a graph database
The deduplication process would be much simpler in a database like Neo4j, but the data pipeline, training, and support would be significantly more complex. That said, if you’re doing a lot of group splitting, using weighted edges, or already have a Neo4j instance, it would make sense to consider this option.
Footnotes
[1]: Phone number is a bit sketchy. Normally you’d have some time threshold or additional piece of info you’d add to phone numbers to avoid spurious correlations.
[2]: We could just use the edge_id if we want, but I prefer to use row_number() to avoid massive negative numbers. We can always squash the results with a dense_rank() after we’re done.
[3]: You could adapt this process to work on any tabular SQL-based DBMS (like Snowflake). You could also use a graph database.
[4]: farm_fingerprint() is used here for terse output. Hash collision likelihood is an example of the birthday problem, so you’ll want to use a beefier hash function for large datasets. Check this out for examples of hash size and collision likelihoods.
[5]: For 800 million records FARM_FINGERPRINT would be insufficient. See [4] above.
Disclaimer
All code in my blog is meant for instructional purposes only. I make no guarantees of accuracy and if you choose to implement any of this, you do so at your own risk.