When working with data, you’re almost always going to hit a situation where you need to combine and transform multiple sets of data into a singular entity.
Doing this sort of transformation is simpler in analytics frameworks where whole-column operations are made easy, as the SQL and Spark examples below show.
SQL Join Example
select order_date, order_amount
from customers
join orders
on customers.customer_id = orders.customer_id
PySpark Join Example
joined_df = orders_df.join(
    customers_df,
    orders_df.customer_id == customers_df.customer_id,
    how='left',
)
What happens if we want to do this in a framework which emphasizes row-level transformations? Enter Apache Beam.
Apache Beam is a relatively new framework based on the MapReduce and Java Streams paradigms. The goal is to take a wide set of data, apply successive transformations that filter or extract meaningful computations, and finally aggregate or reduce the result into something you can use further down the pipeline.
The problem with this design is that it becomes hard to apply transforms across multiple streams at a columnar level, a.k.a. joining.
Apache Beam, as of today, has no stable way of joining by keys, so how do we do this?
First, note that joins are very similar to grouping. For example, if I wanted to join by an id column, I would need my ids to be grouped together: if a particular row had an id of 1, I'd expect all other rows with an id of 1 to be clumped together with it. That way, it's easy to extract every field that belongs to id 1.
Now, the problem is that grouping doesn't necessarily mean all our rows get combined into a single row with multiple fields. Most of the time, the grouped data just has the following format:
[
{'id': 1, 'data': ['dat_1', 'dat_2', 'dat_3']},
{'id': 2, 'data': ['dat_4', 'dat_5']},
]
Beam's grouping works the same way, so we must unpack these grouped values and combine them into a single record.
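To make the distinction concrete, here is a plain-Python sketch (no Beam involved, purely illustrative) of grouping rows by id. Grouping only gives us a list of partial rows per id; an extra merge step is still needed to produce one flat row per id.

rows = [
    {'id': 1, 'val_1': 4.5},
    {'id': 1, 'val_2': 351.0},
    {'id': 2, 'val_1': 23.7},
]

# group rows by id: each id maps to a list of partial rows
grouped = {}
for row in rows:
    grouped.setdefault(row['id'], []).append(row)

print(grouped)
# {1: [{'id': 1, 'val_1': 4.5}, {'id': 1, 'val_2': 351.0}], 2: [{'id': 2, 'val_1': 23.7}]}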
Let’s go over an example with two sub-collections.
First, we create two different collections. Both collections have an id_col which we will be using as a join key.
Sub-Collection 1
{'id_col': 1, 'val_1': 4.5}
{'id_col': 2, 'val_1': 23.7}
{'id_col': 3, 'val_1': 684.34}
{'id_col': 4, 'val_1': 896.0}
Sub-Collection 2
{'id_col': 1, 'val_2': 351.0}
{'id_col': 2, 'val_2': 64.23}
{'id_col': 3, 'val_2': -5489.0}
{'id_col': 4, 'val_2': 894.2}
What we want is a combined collection structure that looks like this:
{'id_col': 1, 'val_1': 4.5, 'val_2': 351.0}
{'id_col': 2, 'val_1': 23.7, 'val_2': 64.23}
{'id_col': 3, 'val_1': 684.34, 'val_2': -5489.0}
{'id_col': 4, 'val_1': 896.0, 'val_2': 894.2}
Now that we know our goal, we need to complete these steps:
- Create and set the joinable key
- Group the data based on the key
- Unpack the grouped data into a singular set
Creating and Setting Keys
Creating a key that both collections can join on is as simple as separating the key value from the rest of the row. For example, instead of representing a row as itself, we now have:
row = (row['key'], row)
Separating out the key like this makes the rows easy to group via Beam's GroupByKey.
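In Beam, attaching the key is just a Map over each collection. A minimal sketch for our id_col key, assuming sub_col_1 is a PCollection of the dicts from Sub-Collection 1:

keyed_1 = sub_col_1 | 'Create joinable key' >> beam.Map(lambda row: (row['id_col'], row))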
Grouping By Key
Now that we have our key set, we can directly call Beam’s GroupByKey.
merge = [sub_col_1, sub_col_2] | beam.CoGroupByKey()
We use CoGroupByKey instead of GroupByKey in this example because we are joining multiple input collections instead of a single input collection.
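As a side note, CoGroupByKey also accepts a dict of tagged collections instead of a list, in which case the grouped values come back under those tags rather than by position. A small sketch of that variant (the tag names are arbitrary):

merge = {'col_1': sub_col_1, 'col_2': sub_col_2} | beam.CoGroupByKey()
# each element now looks like:
# (1, {'col_1': [{'id_col': 1, 'val_1': 4.5}], 'col_2': [{'id_col': 1, 'val_2': 351.0}]})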
Unpacking Grouped Values
Now that we have finished grouping, our dataset looks somewhat like the following:
(1, ([{'id_col': 1, 'val_1': 4.5}], [{'id_col': 1, 'val_2': 351.0}]))
(2, ([{'id_col': 2, 'val_1': 23.7}], [{'id_col': 2, 'val_2': 64.23}]))
We can break this tuple apart using a standard Python assignment.
id_col, grouped = row
id_col | grouped
---|---
1 | ([{'id_col': 1, 'val_1': 4.5}], [{'id_col': 1, 'val_2': 351.0}])
2 | ([{'id_col': 2, 'val_1': 23.7}], [{'id_col': 2, 'val_2': 64.23}])
This structure lets us split the grouped sub-dictionaries (one per input collection) and combine them into a unified dictionary using standard Python dictionary operations.
def unpack(row):
    id_col, group = row
    res = {'id_col': id_col}
    # group holds one list of rows per input collection
    for gp in group:
        # each key appears exactly once per collection here, so gp[0] is the
        # single matching row; merge its fields, skipping the join key itself
        res.update({k: v for k, v in gp[0].items() if k != 'id_col'})
    return res
row[0] is the id_col, since that is how we set the joinable key at the beginning. Keys can also be made up of multiple values, as long as both collections construct the key the same way so that the rows can actually be grouped together.
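For example, a composite key could be a tuple of several columns. A minimal sketch, assuming both collections also carried a hypothetical region field:

# hypothetical composite key: group on (id_col, region) instead of id_col alone
keyed_1 = sub_col_1 | beam.Map(lambda row: ((row['id_col'], row['region']), row))
keyed_2 = sub_col_2 | beam.Map(lambda row: ((row['id_col'], row['region']), row))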
The unpack function (or something similar to it) can be used to flatten the nested structures into a single dictionary containing the unique values from the different Beam collections.
This step will finally give us the format that we wanted.
{'id_col': 1, 'val_1': 4.5, 'val_2': 351.0}
{'id_col': 2, 'val_1': 23.7, 'val_2': 64.23}
{'id_col': 3, 'val_1': 684.34, 'val_2': -5489.0}
{'id_col': 4, 'val_1': 896.0, 'val_2': 894.2}
Ready To Try It?
Try running and analyzing the code structure below.
Full Code
import apache_beam as beam

with beam.Pipeline() as p:
    sub_col_1 = (
        p
        | 'Create collection 1' >> beam.Create([
            {'id_col': 1, 'val_1': 4.5},
            {'id_col': 2, 'val_1': 23.7},
            {'id_col': 3, 'val_1': 684.34},
            {'id_col': 4, 'val_1': 896.0},
        ])
        | 'Create sub collection 1 joinable key' >> beam.Map(lambda row: (row['id_col'], row))
    )

    sub_col_2 = (
        p
        | 'Create collection 2' >> beam.Create([
            {'id_col': 1, 'val_2': 351.0},
            {'id_col': 2, 'val_2': 64.23},
            {'id_col': 3, 'val_2': -5489.0},
            {'id_col': 4, 'val_2': 894.2},
        ])
        | 'Create sub collection 2 joinable key' >> beam.Map(lambda row: (row['id_col'], row))
    )

    # group these collections by id
    merge = [sub_col_1, sub_col_2] | beam.CoGroupByKey()

    # unpack merge contents to a flattened dict
    def unpack(row):
        id_col, group = row
        res = {'id_col': id_col}
        for gp in group:
            # update our resulting dict with all other vals
            res.update({k: v for k, v in gp[0].items() if k != 'id_col'})
        return res

    (
        merge
        | 'unpack values' >> beam.Map(unpack)
        | 'print final result' >> beam.Map(print)
    )
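One caveat: unpack as written assumes every key appears exactly once in each collection; if a key is missing from one side, gp[0] raises an IndexError. A minimal sketch of a more forgiving variant (closer to an outer join), under the same data layout as above:

def unpack_outer(row):
    id_col, group = row
    res = {'id_col': id_col}
    for gp in group:
        # gp is the list of rows from one collection; it may be empty
        # if this key never appeared in that collection
        for sub_row in gp:
            res.update({k: v for k, v in sub_row.items() if k != 'id_col'})
    return res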