A Practical guide to optimizing non-equi joins in Spark
Enriching network events with IP geolocation information is a crucial task, especially for organizations like the Canadian Centre for Cyber Security, the national CSIRT of Canada. In this article, we will demonstrate how to optimize Spark SQL joins, specifically focusing on scenarios involving non-equality conditions — a common challenge when working with IP geolocation data.
As cybersecurity practitioners, our reliance on enriching network events with IP geolocation databases necessitates efficient strategies for handling non-equi joins. While numerous articles shed light on various join strategies supported by Spark, the practical application of these strategies remains a prevalent concern for professionals in the field.
David Vrba’s insightful article, “About Joins in Spark 3.0”, published on Towards Data Science, serves as a valuable resource. It explains the conditions guiding Spark’s selection of specific join strategies. In his article, David briefly suggests that optimizing non-equi joins involves transforming them into equi-joins.
This write-up aims to provide a practical guide for optimizing the performance of a non-equi JOIN, with a specific focus on joining with IP ranges in a geolocation table.
To exemplify these optimizations, we will revisit the geolocation table introduced in our previous article.
+----------+--------+---------+-----------+-----------+
| start_ip | end_ip | country | city | owner |
+----------+--------+---------+-----------+-----------+
| 1 | 2 | ca | Toronto | Telus |
| 3 | 4 | ca | Quebec | Rogers |
| 5 | 8 | ca | Vancouver | Bell |
| 10 | 14 | ca | Montreal | Telus |
| 19 | 22 | ca | Ottawa | Rogers |
| 23 | 29 | ca | Calgary | Videotron |
+----------+--------+---------+-----------+-----------+
Equi-Join
To illustrate Spark’s execution of an equi-join, we’ll initiate our exploration by considering a hypothetical scenario. Suppose we have a table of events, each event being associated with a specific ownerdenoted by the event_owner column.
+------------+--------------+
| event_time | event_owner |
+------------+--------------+
| 2024-01-01 | Telus |
| 2024-01-02 | Bell |
| 2024-01-03 | Rogers |
| 2024-01-04 | Videotron |
| 2024-01-05 | Telus |
| 2024-01-06 | Videotron |
| 2024-01-07 | Rogers |
| 2024-01-08 | Bell |
+------------+--------------+
Let’s take a closer look at how Spark handles this equi-join:
SELECT
*
FROM
events
JOIN geolocation
ON (event_owner = owner)
In this example, the equi-join is established between the events table and the geolocation table. The linking criterion is based on the equality of the event_owner column in the events table and the owner column in the geolocation table.
As explained by David Vrba in his blog post:
Spark will plan the join with SMJ if there is an equi-condition and the joining keys are sortable
Spark will execute a Sort Merge Join, distributing the rows of the two tables by hashing the event_owner on the left side and the owner on the right side. Rows from both tables that hash to the same Spark partition will be processed by the same Spark task—a unit of work. For example, Task-1 might receive:
+----------+-------+---------+-----------+-----------+
| start_ip | end_ip| country | city | owner |
+----------+-------+---------+-----------+-----------+
| 1 | 2 | ca | Toronto | Telus |
| 10 | 14 | ca | Montreal | Telus |
+----------+-------+---------+-----------+-----------+
+------------+--------------+
| event_time | event_owner |
+------------+--------------+
| 2024-01-01 | Telus |
| 2024-01-05 | Telus |
+------------+--------------+
Notice how Task-1 handles only a subset of the data. The join problem is divided into multiple smaller tasks, where only a subset of the rows from both the left and right sides is required. Furthermore, the left and right side rows processed by Task-1 have to match. This is true because every occurrence of “Telus” will hash to the same partition, regardless of whether it comes from the events or geolocation tables. We can be certain that no other Task-X will have rows with an owner of “Telus”.
Once the data is divided as shown above, Spark will sort both sides, hence the name of the join strategy, Sort Merge Join. The merge is performed by taking the first row on the left and testing if it matches the right. Once the rows on the right no longer match, Spark will pull rows from the left. It will keep dequeuing each side until no rows are left on either side.
Non-equi Join
Now that we have a better understanding of how equi-joins are performed, let’s contrast it with a non-equi join. Suppose we have events with an event_ip, and we want to add geolocation information to this table.
+------------+----------+
| event_time | event_ip |
+------------+----------+
| 2024-01-01 | 6 |
| 2024-01-02 | 14 |
| 2024-01-03 | 18 |
| 2024-01-04 | 27 |
| 2024-01-05 | 9 |
| 2024-01-06 | 23 |
| 2024-01-07 | 15 |
| 2024-01-08 | 1 |
+------------+----------+
To execute this join, we need to determine the IP range within which the event_ip falls. We accomplish this with the following condition:
SELECT
*
FROM
events
JOIN geolocation
ON (event_ip >= start_ip and event_ip <= end_ip)
Now, let’s consider how Spark will execute this join. On the right side (the geolocation table), there is no key by which Spark can hash and distribute the rows. It is impossible to divide this problem into smaller tasks that can be distributed across the compute cluster and performed in parallel.
In a situation like this, Spark is forced to employ more resource-intensive join strategies. As stated by David Vrba:
If there is no equi-condition, Spark has to use BroadcastNestedLoopJoin (BNLJ) or cartesian product (CPJ).
Both of these strategies involve brute-forcing the problem; for every row on the left side, Spark will test the “between” condition on every single row of the right side. It has no other choice. If the table on the right is small enough, Spark can optimize by copying the right-side table to every task reading the left side, a scenario known as the BNLJ case. However, if the left side is too large, each task will need to read both the right and left sides of the table, referred to as the CPJ case. In either case, both strategies are highly costly.
So, how can we improve this situation? The trick is to introduce an equality in the join condition. For example, we could simply unroll all the IP ranges in the geolocation table, producing a row for every IP found in the IP ranges.
This is easily achievable in Spark; we can execute the following SQL to unroll all the IP ranges:
SELECT
country,
city,
owner,
explode(sequence(start_ip, end_ip)) AS ip
FROM
geolocation
The sequence function creates an array with the IP values from start_ip to end_ip. The explode function unrolls this array into individual rows.
+---------+---------+---------+-----------+
| country | city | owner | ip |
+---------+---------+---------+-----------+
| ca | Toronto | Telus | 1 |
| ca | Toronto | Telus | 2 |
| ca | Quebec | Rogers | 3 |
| ca | Quebec | Rogers | 4 |
| ca | Vancouver | Bell | 5 |
| ca | Vancouver | Bell | 6 |
| ca | Vancouver | Bell | 7 |
| ca | Vancouver | Bell | 8 |
| ca | Montreal | Telus | 10 |
| ca | Montreal | Telus | 11 |
| ca | Montreal | Telus | 12 |
| ca | Montreal | Telus | 13 |
| ca | Montreal | Telus | 14 |
| ca | Ottawa | Rogers | 19 |
| ca | Ottawa | Rogers | 20 |
| ca | Ottawa | Rogers | 21 |
| ca | Ottawa | Rogers | 22 |
| ca | Calgary | Videotron | 23 |
| ca | Calgary | Videotron | 24 |
| ca | Calgary | Videotron | 25 |
| ca | Calgary | Videotron | 26 |
| ca | Calgary | Videotron | 27 |
| ca | Calgary | Videotron | 28 |
| ca | Calgary | Videotron | 29 |
+---------+---------+---------+-----------+
With a key on both sides, we can now execute an equi-join, and Spark can efficiently distribute the problem, resulting in optimal performance. However, in practice, this scenario is not realistic, as a genuine geolocation table often contains billions of rows.
To address this, we can enhance the efficiency by increasing the coarseness of this mapping. Instead of mapping IP ranges to each individual IP, we can map the IP ranges to segments within the IP space. Let’s assume we divide the IP space into segments of 5. The segmented space would look something like this:
+---------------+-------------+-----------+
| segment_start | segment_end | bucket_id |
+---------------+-------------+-----------+
| 1 | 5 | 0 |
| 6 | 10 | 1 |
| 11 | 15 | 2 |
| 16 | 20 | 3 |
| 21 | 25 | 4 |
| 26 | 30 | 5 |
+---------------+-------------+-----------+
Now, our objective is to map the IP ranges to the segments they overlap with. Similar to what we did earlier, we can unroll the IP ranges, but this time, we’ll do it in segments of 5.
SELECT
country,
city,
owner,
explode(sequence(start_ip / 5, end_ip / 5)) AS bucket_id
FROM
geolocations
We observe that certain IP ranges share a bucket_id. Ranges 1–2 and 3–4 both fall within the segment 1–5.
+----------+--------+---------+-----------+-----------+-----------+
| start_ip | end_ip | country | city | owner | bucket_id |
+----------+--------+---------+-----------+-----------+-----------+
| 1 | 2 | ca | Toronto | Telus | 0 |
| 3 | 4 | ca | Quebec | Rogers | 0 |
| 5 | 8 | ca | Vancouver | Bell | 1 |
| 10 | 14 | ca | Montreal | Telus | 2 |
| 19 | 22 | ca | Ottawa | Rogers | 3 |
| 19 | 22 | ca | Ottawa | Rogers | 4 |
| 23 | 29 | ca | Calgary | Videotron | 4 |
| 23 | 29 | ca | Calgary | Videotron | 5 |
+----------+--------+---------+-----------+-----------+-----------+
Additionally, we notice that some IP ranges are duplicated. The last two rows for the IP range 23–29 overlap with segments 20–25 and 26–30. Similar to the scenario where we unrolled individual IPs, we are still duplicating rows, but to a much lesser extent.
Now, we can utilize this bucketed table to perform our join.
SELECT
*
FROM
events
JOIN geolocation
ON (
event_ip / 5 = bucket_id
AND event_ip >= start_ip
AND event_ip <= end_ip
)
The equality in the join enables Spark to perform a Sort Merge Join (SMJ) strategy. The “between” condition eliminates cases where IP ranges share the same bucket_id.
In this illustration, we used segments of 5; however, in reality, we would segment the IP space into segments of 256. This is because the global IP address space is overseen by the Internet Assigned Numbers Authority (IANA), and traditionally, IANA allocates address space in blocks of 256 IPs.
Analyzing the IP ranges in a genuine geolocation table using the Spark approx_percentile function reveals that most records have spans of less than 256, while very few are larger than 256.
SELECT
approx_percentile(
end_ip - start_ip,
array(0.800, 0.900, 0.950, 0.990, 0.999, 0.9999),
10000)
FROM
geolocation
This implies that most IP ranges are assigned a bucket_id, while the few larger ones are unrolled, resulting in the unrolled table containing approximately an extra 10% of rows.
A query executed with a genuine geolocation table might resemble the following:
WITH
b_geo AS (
SELECT
explode(
sequence(
CAST(start_ip / 256 AS INT),
CAST(end_ip / 256 AS INT))) AS bucket_id,
*
FROM
geolocation
),
b_events AS (
SELECT
CAST(event_ip / 256 AS INT) AS bucket_id,
*
FROM
events
)
SELECT
*
FROM
b_events
JOIN b_geo
ON (
b_events.bucket_id = b_geo.bucket_id
AND b_events.event_ip >= b_geo.start_ip
AND b_events.event_ip <= b_geo.end_ip
);
Conclusion
In conclusion, this article has presented a practical demonstration of converting a non-equi join into an equi-join through the implementation of a mapping technique that involves segmenting IP ranges. It’s crucial to note that this approach extends beyond IP addresses and can be applied to any dataset characterized by bands or ranges.
The ability to effectively map and segment data is a valuable tool in the arsenal of data engineers and analysts, providing a pragmatic solution to the challenges posed by non-equality conditions in Spark SQL joins.
Performant IPv4 Range Spark Joins was originally published in Towards Data Science on Medium, where people are continuing the conversation by highlighting and responding to this story.
Originally appeared here:
Performant IPv4 Range Spark Joins
Go Here to Read this Fast! Performant IPv4 Range Spark Joins