Build a MapReduce flow in Elixir

Written by jcieslik | Published 2018/06/07
Tech Story Tags: elixir | mapreduce | functional-programming | concurrency

TLDRvia the TL;DR App

Giving the Elephant Some Elixir

MapReduce is a common Big Data pattern for analyzing a data set concurrently. This tutorial will introduce you to Elixir and the principals behind Hadoop. We will be building the equivalent of Hello World in MapReduce which is a word count program. Map and Reduce are also common higher order functions in the world of functional programming. Map is a function that takes a list and an anonymous function or lambda as arguments, applies the function to each element in the list, and returns a new list with the output of the lambda on each element. Reduce is a similar function in that it takes the same arguments with one additional argument in Elixir, an accumulator, but returns an accumulated value instead of a list. Elixir is a great language to learn concurrency and MapReduce is both a useful example and shows off many of Elixir’s features.

MapReduce Flow

MapReduce is a pipeline through which data flows and is processed. It can be broken down into roughly 5 steps which correspond to 5 modules we will write in Elixir. Our first step is the Input Reader. This takes in data, splits it into a form that our Map process can read, and concurrently launches Map processes. Our Map process reads the data given to it, runs a function on each piece of data, and outputs a key value pair to a Partition/Compare process. The Partition process accumulates key value pairs from all Map processes, compares the pairs, and spawns Reduce processes for each unique key. Each Reduce process runs a function on each value that adds up all the values for the given key, and emits these values to the Output Writer. Finally, the Output Writer yields your data in a format of your choice.

Step 0: Setup

First make sure you have Elixir installed for your current system. Instructions can be found here. An excellent introduction to the language is available from the main site here and hexdocs.pm provides Elixir module documentation here. Learnxiny also provides an excellent Elixir syntax primer. This tutorial assumes little to no experience with Elixir but familiarity with basic programming concepts and the command line of your operating system. All code for this project is available on Github.

Once you have Elixir installed create a new project using mix new.

mix new mapreduce --module MapReduce

Next move into your mapreduce directory and edit mix.ex with this line inside def project do [].

escript: [main_module: MapReduce]

Step 1: The Parent Process

Once you have a new project navigate into lib/ in your project directory and open mapreduce.ex which should have been autogenerated by mix . First remove everything from the file that is not defmodule MapReduce do and end. Let’s add a couple of imports for modules we will be writing later to the top of the module.

defmodule MapReduce dorequire InputReaderrequire Partition

Next let’s create a main function and pipe its argument into several functions. Pipe, |>, is an operator that acts very similarly to bash’s pipe.

def main(args) doargs |> parse_args |> pipelineend

After that let’s write a private function (denoted by defp instead of def) to parse our command line arguments. Here we are creating a variable with type tuple which contains the result of OptionParser.parse . Our command line argument is --file=example.txt and so we set our switches in the arguments to the parser function accordingly. We only need the first output from the parser which we will be returning from parse_args the other outputs are represented with an underscore to indicate we won’t need them.

defp parse_args(args) do{options, _, _} = OptionParser.parse(args,switches: [file: :string])optionsendend

The final part of our parent process is the pipeline. The first pipeline function is a pattern match case that checks for an empty file argument. The next pipeline function launches a Partition process but only stores the process id in a variable. We are using elem to give us the second element of the tuple returned by starting our process because we don’t need the atom, :ok, first element. Then the Partition process id and the file name are passed to an Input Reader that we will write in the next step. Finally we use a recursive function, forever, to keep the parent process alive while the rest of our MapReduce flow executes.

defp pipeline([]) doIO.puts "No file given"end

defp pipeline(options) dopartition = elem(Partition.start_link, 1)InputReader.reader("#{options[:file]}", partition)forever()end

defp forever doforever()end

Step 2: The Input Reader

This module is fairly simple, it contains one function that takes a filename and process id. This function will attempt to open a file, sending a message to STDERR if this fails. If our file is successfully opened we will execute a function on every line of the file. We use a regular expression to parse lines from our file and return them as a list. Enum.each is identical to map except that it returns an atom if it completes successfully instead of a list. For every line in our list we will spawn a Map process with the line and the Partition process id as arguments.

defmodule InputReader dorequire Mapper

def reader(file, partition) docase File.read(file) do{:ok, body} -> Enum.each(Regex.split( ~r/\r|\n|\r\n/, String.trim(body)), fn line -> spawn(fn -> Mapper.map(line, partition) end) end){:error, reason} -> IO.puts :stderr, "File Error: #{reason}"endendend

Step 3: The Mapper

The Mapper module is quite short, first we send the process id of the current Map process to the Partition process. This will enable us to check whether our Map processes are still running later on in Partition. After that we will again use Enum.each to send individual words from a list generated by splitting each line based on the space character to the Partition process.

defmodule Mapper dodef map(line, partition) dosend(partition, {:process_put, self()})Enum.each(String.split(line, " "), fn key -> send(partition, {:value_put, key}) end)endend

Step 4: The Partition

The Partition module is the most complex module we will be creating. Here we will be using the Task module instead of just spawn . We are using start_link instead of start because we want the parent process to be killed when this process is killed. Our linked processes are MapReduce, Partition, and OutputWriter in that order. OutputWriter will check if all Reduce processes have finished before exiting its own process. This will exit all linked processes all the way back to the parent. We use a lambda in start_link to start a recursive function that takes 2 lists as arguments. Note that end is necessary to close all lambdas.

defmodule Partition dorequire Reducerrequire OutputWriter

def start_link doTask.start_link(fn -> loop([], []) end)endend

Next we will write our recursive loop function. This will first check the length of the mailbox of messages sent by our Map processes. If it has processed all messages it will launch a check to see if we should launch our Reduce processes yet. We use Keyword.delete to remove all null or whitespace characters that have snuck into our key value pairs. Note the use of a sigil, ~s(\s), to represent a whitespace character. Next we have some pattern matching code that checks all of our received messages for specific tuples. If we receive the atom :processor_put we will append the process id of the caller Map process to our process list inside of a recursive loop call. If we instead receive the atom :value_put we will append a Keyword containing the key sent to us by Map and a value of 1 for the word count. Any other message will produce an error.

defp loop(processes, values) domailbox_length = elem(Process.info(self(), :message_queue_len), 1)if (mailbox_length == 0), do: (mapper_check(processes, Keyword.delete(Keyword.delete(values, String.to_atom(~s(\s))), String.to_atom(""))))receive do{:process_put, caller} ->loop([caller | processes], values){:value_put, key} ->loop(processes, [{String.to_atom(key), 1} | values])error -> IO.puts :stderr, "Partition Error: #{error}"endend

The final piece of Partition is mapper_check . This function checks if all of our Map processes are dead and launches Reduce processes for each unique word if they are. First we use Enum.filter to return a list, check,of any process that is still alive. Then we create a list, unique, of every unique key/word. If we have a non-zero number of keys and no Map process is alive then we start_link OutputWriter and pass its process id to every Reduce process we spawn. After this we use Enum.each on uniques and use Keyword.take to pull out every instance of each unique and then spawn a Reduce process with a list of all of those instances.

defp mapper_check(processes, values) docheck = Enum.filter(processes, fn process -> Process.alive?(process) == true end)uniques = Enum.uniq(Keyword.keys(values))if (length(check) == 0 && length(uniques) != 0), do: (output_writer = elem(OutputWriter.start_link, 1)Enum.each(uniques, fn unique -> spawn(fn -> Reducer.reduce(Keyword.to_list(Keyword.take(values, [unique])), output_writer) end) end))endend

Step 5: The Reducer

The second to last module is our Reduce process. This takes a list of tuples (key value pairs) and the process id of Output Writer. Similar to Map we send Output Writer the process id of Reduce to keep track of its status. Then we check to make sure tuples is not empty with a case pattern match. If tuples is not empty we send a string to Output Writer. This string might look a little strange but that’s because we are using Elixir’s string interpolation syntax, "#{}", to place 2 expressions separated by a space. Let’s break down the two expressions in the string we send to Output Writer. First we use elem, which we’ve seen before, to get the key from the first tuple in the list using hd or head. All of the keys in the list should be the same so it doesn’t matter which one we use. Second we use Enum.reduce to add up all of the values from our key values tuples using the accumulator given as the second argument in both reduce and the lambda in reduce.

defmodule Reducer dodef reduce(tuples, output_writer) dosend(output_writer, {:process_put, self()})case tuples do[] -> IO.puts :stderr, "Empty List"tuples ->send(output_writer, {:value_put, "#{elem(hd(tuples), 0)} #{Enum.reduce(tuples, 0, fn ({_, v}, total) -> v + total end)}"})endendend

Step 6: The Output Writer

This module should look very familiar since it’s a slightly modified version of Partition. The only difference is in reducer_check where we open a file, write each word and its count to STDOUT and the file we just opened, and then close the file and the whole process chain all the way back to our parent process. One important detail is the use of Path.join to give us consistent file paths across various operating systems. Another is the use of <> which is Elixir’s string concatenation operator.

defmodule OutputWriter dodef start_link doTask.start_link(fn -> loop([], []) end)end

defp loop(processes, values) domailbox_length = elem(Process.info(self(), :message_queue_len), 1)if (mailbox_length == 0), do: (reducer_check(processes, values))receive do{:process_put, caller} ->loop([caller | processes], values){:value_put, value} ->loop(processes, [value | values])endend

defp reducer_check(processes, values) docheck = Enum.filter(processes, fn process -> Process.alive?(process) == true end)if (length(check) == 0 && length(processes) != 0), do: ({:ok, file} = File.open(Path.join("test", "output.txt"), [:write])for value <- values doIO.puts valueIO.write(file, value <> ~s(\n))endFile.close(file)Process.exit(self(), :kill))endend

Testing and Wrap-up

Congratulations! You’ve successfully written a small but non-trivial Elixir program that does something useful. Let’s do some testing and then we’re finished. First create a directory called test and a file inside it filled with random text. I used randomtextgenerator.com to for my file. Next compile your project with mix escript.build. Finally run your code using ./mapreduce --file=test/input.txt. You should see several hundred lines of text on your command line like this.

Notice the exit message at the bottom.

Finally open your test directory and there should be a new file name output.txt with output identical to your command line except that the exit message won’t be at the end.

Thanks for reading, please leave a clap or several if this tutorial was helpful to you!

Joe Cieslik is the CEO of Whiteboard Dynamics, a full stack development team specializing in functional programming and Android. You can find out more about us, how we can help you, and our past projects at whiteboarddynamics.co.


Published by HackerNoon on 2018/06/07