🌊 A Realworld Introduction to Elixir Flow

“Flow allows developers to express computations on collections, similar to the Enum and Stream modules, although computations will be executed in parallel using multiple GenStages” [Flow HexDocs]. This lets you express common transformations (like filter/map/reduce) in nearly the same way as you would with the Enum module. However, because Flow is parallelized, it can process much larger data sets with better resource utilization, without requiring you to write complex, case-specific OTP code. Flow can also work with both bounded and unbounded data sets, allowing you to collect data and pipe it through a Flow pipeline.

🥚 An Anecdotal Experience with Flow

Last summer, about 3 months into learning Elixir, I found myself in the middle of a project where I needed to collect product data from 5 different tables in an MSSQL database and build a NoSQL database of products. I needed to process over 400,000 products daily. In addition to flattening the data, we needed to apply business rules for pricing, shipping restrictions, swatch options, etc. We then shipped the data to our search provider, Algolia (amazing product btw). So let me use this story to make a case for Flow.

A co-worker of mine took an initial attempt at this project using NodeJS, but after getting about halfway through, we were already certain that it would not be performant enough. At this point we decided to develop the data processing pipeline in Elixir. Because we were both new to the language, we used Elixir’s Enum.map/2 and Enum.reduce/3 liberally, without any application of OTP. Surprisingly, we found that even with our extremely basic setup we were able to cut the time of the job down from over three hours to about 15 minutes.

Flow to the rescue!

However, we still weren’t completely satisfied with the performance of the tool, and we wanted to apply OTP to make the processing of our products concurrent. After reading Concurrent Data Processing with Elixir, we decided to use Elixir’s Flow package combined with a series of GenStages. Our GenStages were fairly simple: a producer responsible for pulling product IDs from a service bus, a producer/consumer responsible for collecting the product data from the database tables, another producer/consumer responsible for applying business rules and flattening the product data, and a consumer responsible for saving the data to the NoSQL database.

We did most of the data transformations in the second producer/consumer. This is where we applied our business rules for pricing, shipping, etc. and flattened our data. Inside this producer/consumer, we used Flow to add more concurrency to the processing. After massaging some of the parameters for how many concurrent processes Flow uses, we were able to process all of our product data in about 30 seconds, down from over 3 hours with NodeJS (which isn’t really a fair comparison, but big numbers are fun).

🐣 Your First Flow Pipeline

Let’s take a look at the power that Flow can provide you without much overhead. In this example, we’re going to be looking at a dataset of information on every DC comic book release. You can download it for yourself over here to follow along. Let’s create an application to go through the dataset and get the list of comics written by each specific author. For simplicity’s sake, I’m going to treat each combination of co-authors on a comic as its own unique author. Start by creating a new mix project:

mix new comic_flow --module ComicFlow

Let’s navigate into the root directory and run iex:

cd comic_flow
iex -S mix

Leave that for now and let’s open up lib/comic_flow.ex. In here, you can remove all of the code within the module and replace it with a new function called get_writers.

defmodule ComicFlow do
  def get_writers do
    # TODO
  end
end

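The file we’re going to read is in CSV format, and that’s going to need decoding. Rather than try to figure that out ourselves, let’s add the CSV package from hex.pm to the deps in our mix.exs file. We’ll also add Flow now, since we’ll need it later:

 {:csv, "~> 2.4"},
 {:flow, "~> 1.0"}

Run mix deps.get to fetch them, then restart iex -S mix so they’re loaded. Now, back in get_writers, we can read the data as a stream: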

File.stream!("dc_comics.csv")

And then we can call CSV.decode!/2:

File.stream!("dc_comics.csv")
|> CSV.decode!()

This next step is a little bit nasty, but it’ll work for the sake of example. The 2nd column of our CSV is the name of the comic and the 7th is the writer. We’re going to reduce over the list of comics and create a map of authors with their list of comics. This gives us a final get_writers function that looks like this:

  def get_writers do
    File.stream!("dc_comics.csv")
    |> CSV.decode!()
    |> Enum.reduce(
      %{},
      fn [_cat, name, _link, _pencilers, _artists, _inkers, writer | _tail], acc ->
        Map.update(acc, writer, [name], &[name | &1])
      end
    )
  end
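If you want to check the reducer before pointing it at the real file, here’s a minimal sketch that runs the same Map.update/4 logic over a few hand-written rows. The module name ComicFlow.Sketch, the function group_by_writer/1, and the sample rows are all hypothetical; the rows only assume the column layout described above (name in the 2nd column, writer in the 7th).

```elixir
defmodule ComicFlow.Sketch do
  # Hypothetical helper: the same reducer as get_writers/0, but applied to
  # an in-memory list of already-decoded CSV rows instead of a file.
  def group_by_writer(rows) do
    Enum.reduce(rows, %{}, fn [_cat, name, _link, _pencilers, _artists, _inkers, writer | _tail], acc ->
      # The first comic for a writer starts a new list; later ones are prepended.
      Map.update(acc, writer, [name], &[name | &1])
    end)
  end
end

# Made-up sample rows, each with at least 7 columns.
rows = [
  ["Cat", "Comic A #1", "link", "pencils", "artist", "inks", "Writer X", "extra"],
  ["Cat", "Comic B #1", "link", "pencils", "artist", "inks", "Writer Y"],
  ["Cat", "Comic A #2", "link", "pencils", "artist", "inks", "Writer X"]
]

IO.inspect(ComicFlow.Sketch.group_by_writer(rows))
# => %{"Writer X" => ["Comic A #2", "Comic A #1"], "Writer Y" => ["Comic B #1"]}
```

Because each new name is prepended, a writer’s comics come out in reverse file order. If order matters, you could finish with Map.new(result, fn {writer, names} -> {writer, Enum.reverse(names)} end).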

Let’s test

Perfect! Let’s go back to our iex terminal (run recompile() first so it picks up the changes) and run this code. I’m going to time it using Erlang’s :timer.tc/1 function. To time function calls, I’m using this method; if you want to learn more, it’s about a 2-minute read. I got an average time of ~171.25 seconds, with a minimum of 65 seconds and a maximum of 244 seconds.
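If you’d like to reproduce min/max/average numbers like these, here’s one way to do it: a minimal sketch, assuming you just want wall-clock time in seconds over a handful of runs. :timer.tc/1 is a real Erlang function that returns {microseconds, result}; the TimeIt module and its measure/2 function are hypothetical names for this post.

```elixir
defmodule TimeIt do
  # Hypothetical helper: calls `fun` `runs` times and returns the
  # min, max and mean wall-clock time in seconds.
  def measure(fun, runs \\ 5) when is_function(fun, 0) and is_integer(runs) and runs > 0 do
    times =
      for _ <- 1..runs do
        # :timer.tc/1 returns {elapsed_microseconds, return_value}.
        {micros, _result} = :timer.tc(fun)
        micros / 1_000_000
      end

    %{min: Enum.min(times), max: Enum.max(times), mean: Enum.sum(times) / runs}
  end
end

# Stand-in workload so the sketch runs on its own; in iex you would
# call TimeIt.measure(&ComicFlow.get_writers/0) instead.
IO.inspect(TimeIt.measure(fn -> Enum.reduce(1..100_000, 0, &+/2) end, 3))
```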

Ouch. I’m not happy with that time, especially if the dataset gets larger. And if DC keeps releasing comics every year, it’s just going to get progressively worse. Enum.reduce/3 works through the rows one at a time in a single process, so the reducing work can only ever use one core while the rest of the machine sits mostly idle. But what if there was a way to spread that work across several processes and reduce over our collection while the stream is still being read in? That’s where Flow comes in. Let’s start by converting our stream into a Flow.

To Flow we Go

def get_writers do
  File.stream!("dc_comics.csv")
  |> CSV.decode!()
  |> Flow.from_enumerable()
  |> Flow.partition(
    key: fn [_cat, _name, _link, _pencilers, _artists, _inkers, writer | _tail] ->
      writer
    end
  )
end

Flow.from_enumerable/1 does the work of converting our data into a Flow, using the collection as the producer for the pipeline. There are options to configure the parallelization of the pipeline, but for now we can leave it without arguments. The next step is Flow.partition/2. Partition is probably the trickiest function to use in Flow. It routes each event to one of several stages based on a hash of a key, so events with the same key always end up in the same partition. If you don’t pass a :key, the whole event is hashed. In our case, we want every comic by the same writer to land in the same partition, so we specify the key that the partitioning will happen on. I like to think of this as “creating a path to the key.” In our example, the data is just a list, so we’ll destructure writer and return it as the key.
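To build some intuition for the :key option, here’s a rough, stdlib-only approximation of key-based partitioning. This is not Flow’s implementation; it just hashes each row’s key with :erlang.phash2/2 into one of n buckets, which captures the same idea: every row with the same writer lands in the same bucket. The PartitionSketch module, its partition/3 function, and the sample rows are hypothetical.

```elixir
defmodule PartitionSketch do
  # Hypothetical approximation of key-based partitioning: each event goes
  # to bucket :erlang.phash2(key, n), which is always in 0..n-1, so equal
  # keys always map to the same bucket.
  def partition(events, key_fun, n) when is_integer(n) and n > 0 do
    Enum.group_by(events, fn event -> :erlang.phash2(key_fun.(event), n) end)
  end
end

# Same destructuring as the :key function in get_writers/0.
writer_key = fn [_cat, _name, _link, _pencilers, _artists, _inkers, writer | _tail] -> writer end

rows = [
  ["Cat", "Comic A #1", "link", "pencils", "artist", "inks", "Writer X"],
  ["Cat", "Comic B #1", "link", "pencils", "artist", "inks", "Writer Y"],
  ["Cat", "Comic A #2", "link", "pencils", "artist", "inks", "Writer X"],
  ["Cat", "Comic C #1", "link", "pencils", "artist", "inks", "Writer Z"]
]

# Map of bucket index => rows routed to that bucket.
IO.inspect(PartitionSketch.partition(rows, writer_key, 4))
```

Because all of a writer’s rows share a bucket, each bucket can be reduced independently without two partitions ever building separate lists for the same writer.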

Enum.reduce -> Flow.reduce

Finally, we’ll convert our Enum.reduce into Flow.reduce. The functions are largely the same, the main difference being that Flow.reduce/3 takes a function that returns the initial accumulator, rather than the initial accumulator value itself as Enum.reduce/3 does. That’s because each partition runs its own reducer and needs its own fresh accumulator.

def get_writers do
  File.stream!("dc_comics.csv")
  |> CSV.decode!()
  |> Flow.from_enumerable()
  |> Flow.partition(
    key: fn [_cat, _name, _link, _pencilers, _artists, _inkers, writer | _tail] ->
      writer
    end
  )
  |> Flow.reduce(
    fn -> %{} end,
    fn [_cat, name, _link, _pencilers, _artists, _inkers, writer | _tail], acc ->
      Map.update(acc, writer, [name], &[name | &1])
    end
  )
  |> Enum.to_list()
end

And lastly, you’ll notice that we called Enum.to_list/1. Calling this function executes the Flow in parallel and returns the final result. You can also use Flow.run/1, but it only returns :ok rather than the data from the flow.
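If you’re curious what that pipeline is doing conceptually, here’s a rough stdlib-only approximation, not Flow’s actual implementation: Task.async_stream/2 stands in for Flow’s partition stages, and the ParallelReduceSketch module, its run/5 function, and the sample rows are hypothetical. It hashes rows into partitions by key, reduces each partition in its own process starting from a fresh acc_fun.() accumulator, and concatenates the results. Because the accumulator is a map, each partition contributes {writer, comics} tuples, which is also the shape you get back from Enum.to_list/1 in the Flow version.

```elixir
defmodule ParallelReduceSketch do
  # Hypothetical approximation of Flow.partition + Flow.reduce + Enum.to_list.
  # Unlike Flow, it groups all events eagerly before reducing.
  def run(events, key_fun, acc_fun, reducer, partitions \\ System.schedulers_online()) do
    events
    # Route every event with the same key to the same partition.
    |> Enum.group_by(fn event -> :erlang.phash2(key_fun.(event), partitions) end)
    |> Map.values()
    # Reduce each partition in its own process, each with a fresh accumulator.
    |> Task.async_stream(fn partition_events ->
      Enum.reduce(partition_events, acc_fun.(), reducer)
    end)
    # A key never spans two partitions, so results can simply be concatenated.
    |> Enum.flat_map(fn {:ok, acc} -> Enum.to_list(acc) end)
  end
end

writer_key = fn [_cat, _name, _link, _pencilers, _artists, _inkers, writer | _tail] -> writer end

reducer = fn [_cat, name, _link, _pencilers, _artists, _inkers, writer | _tail], acc ->
  Map.update(acc, writer, [name], &[name | &1])
end

rows = [
  ["Cat", "Comic A #1", "link", "pencils", "artist", "inks", "Writer X"],
  ["Cat", "Comic B #1", "link", "pencils", "artist", "inks", "Writer Y"],
  ["Cat", "Comic A #2", "link", "pencils", "artist", "inks", "Writer X"]
]

rows
|> ParallelReduceSketch.run(writer_key, fn -> %{} end, reducer, 2)
# Partition output order isn't guaranteed, so sort before inspecting.
|> Enum.sort()
|> IO.inspect()
# => [{"Writer X", ["Comic A #2", "Comic A #1"]}, {"Writer Y", ["Comic B #1"]}]
```

The eager Enum.group_by/2 is the big simplification here: Flow streams events into its partitions as they’re read, which is what lets it start reducing before the whole file has been consumed.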

After changing to Flow, the time to run through all of DC comics is now averaging 2.902 seconds. That’s about 1.7% of our initial time! Wow! That’s pretty cool.
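For the curious, here’s the arithmetic behind that figure, using the two averages measured above:

```latex
\frac{2.902\ \text{s}}{171.25\ \text{s}} \approx 0.0169 \approx 1.7\%,
\qquad
\frac{171.25\ \text{s}}{2.902\ \text{s}} \approx 59\times \text{ faster}
```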

🐥 Unleash the Flow

On one hand, there is a lot more to learn with Flow, and I’m sure you’ll run into some odd situations along the way where you’ll have to duke it out with Flow.partition. On the other hand, you’re now mostly equipped to start implementing reliable, concurrent code with Flow. And the best part is, you didn’t have to wrap your head around the systems required for this concurrency: you were able to use functions that are nearly identical to the ones you already use on collections every day.

I’ve personally had a ton of fun using Flow for work and for sport. Like all things, Flow isn’t a silver bullet, and there are still many times I need to write concurrent code by hand. However, for many applications, using Flow would provide a path to running highly-concurrent code without having to juggle a lot of complexity and without having to pay the price to build those solutions from scratch. Hopefully you can find a place for Flow in your own projects!