Hadoop Streaming with .NET MapReduce API (Executing on HDP for Windows)

This blog post is for anyone who would like some help creating and executing a simple MapReduce job in C# – specifically for use with HDP for Windows.  For my Hadoop instance, I'm using the virtual machine I built during my installing hdp on windows (and then running something on it) effort.  Since a non-JVM language ultimately requires the use of Hadoop Streaming, the goal is to use the Microsoft .NET SDK for Hadoop (specifically the Map/Reduce project) to abstract away some of the lower-level activities.  So, let's do it!

Development Environment

First up, I needed an IDE; remember, I live on my Mac and I prefer IntelliJ.  I downloaded Visual Studio Express; specifically, I installed Express 2013 for Windows Desktop.  After creating a new project, I needed to pull in the Hadoop SDK, and kudos to NuGet – I was impressed by how smooth this has become since the last time I was programming in VS.  The following YouTube video shows you how to do that if it is new to you.
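If video isn't your thing, the Package Manager Console equivalent should be a one-liner along these lines (the package ID is Microsoft.Hadoop.MapReduce; the version NuGet resolves for you may differ from what the video shows).

PM> Install-Package Microsoft.Hadoop.MapReduce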

That video is pretty helpful, and if you watched it you should already be feeling pretty good about what you are about to do.  Interestingly enough, even though it was a MapReduce tutorial, the presenter decided to show only a Map-only job.  Not wrong, but surely interesting.  Don't worry... we're going to do BOTH the Map AND Reduce phases!  (wink)

Actually, we'll need at least three classes, since in addition to the Mapper and Reducer we'll need a class to act as the job that kicks everything off.  For our use case, we'll go back to Square-1 and implement the quintessential "Word Count" example.  For anyone not familiar with it, we want to count words in a body of text such as Shakespeare's Sonnets.  Now that we have some test data, let's get into the code.

Writing The Code

Mapper

The Hadoop SDK gives us a class called MapperBase that delivers a line at a time from the input data, along with a MapperContext object that we can use to emit back our 0..n KVPs to satisfy the Mapper contract.  The code below shows a simple implementation of this activity.

WordMapper.cs
using Microsoft.Hadoop.MapReduce;
using System;
using System.Collections.Generic;

namespace HadoopExploration
{
    public class WordMapper : MapperBase
    {
        public override void Map(string inputLine, MapperContext context)
        {
            char[] delimiterChars = { ' ', ',', '.', ':', '\t' };
            //split up the passed-in line, dropping the empty strings that
            //consecutive delimiters (e.g. a trailing period) would leave behind
            string[] individualWords = inputLine.Trim().Split(delimiterChars, StringSplitOptions.RemoveEmptyEntries);
            //write output KVP for each one of these
            foreach (string word in individualWords)
            {
                context.EmitKeyValue(word.ToLower(), "1");
            }
        }
    }
}
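By the way, if you want to eyeball what the Mapper will hand to the shuffle without spinning up Hadoop at all, a throwaway console program like this (my own scratch sketch, not part of the SDK or the project) mimics the same split-and-emit logic; each KVP ultimately travels between the phases as a tab-separated line, per the usual Streaming convention.

using System;

class MapperSanityCheck
{
    static void Main()
    {
        //mimic WordMapper's split logic for one sample line
        string inputLine = "Now is the time for all good men to come to the aid of their country.";
        char[] delimiterChars = { ' ', ',', '.', ':', '\t' };
        foreach (string word in inputLine.Trim().Split(delimiterChars, StringSplitOptions.RemoveEmptyEntries))
        {
            //print the KVP the way Streaming would see it: key, tab, value
            Console.WriteLine("{0}\t{1}", word.ToLower(), "1");
        }
    }
}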

Reducer

As with the Mapper, we get a helper class in ReducerCombinerBase (for those who already know what a Combiner is, the name tells you all you need to know).  Just like in a Java Reducer class, we are guaranteed to get the full list of values for a particular key, gathered from all of the Mapper instances.

SumReducer.cs
using Microsoft.Hadoop.MapReduce;
using System;
using System.Collections.Generic;

namespace HadoopExploration
{
    public class SumReducer : ReducerCombinerBase
    {
        public override void Reduce(string key, IEnumerable<string> values, ReducerCombinerContext context)
        {
            int wordCount = 0;
            //iterate through the count values passed to us by the mapper
            foreach (string countAsString in values)
            {
                wordCount += Int32.Parse(countAsString);
            }
            //write output "answer" as a KVP
            context.EmitKeyValue(key, Convert.ToString(wordCount));
        }
    }
}

You might have noticed that the Mapper and Reducer helper classes are pegged to string and do not offer the richer type choices you get with Java.  That makes sense considering they still leverage Streaming under the covers, which passes everything as plain text across stdin and stdout.
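To make that concrete, here is a hypothetical variation (these WordStats classes are purely illustrative and not part of our word count job) showing the workaround when you want a richer value than a bare count: serialize the extra fields into the string yourself on the way out of the Mapper and pick them back apart in the Reducer.

using Microsoft.Hadoop.MapReduce;
using System;
using System.Collections.Generic;

namespace HadoopExploration
{
    public class WordStatsMapper : MapperBase
    {
        public override void Map(string inputLine, MapperContext context)
        {
            char[] delimiterChars = { ' ', ',', '.', ':', '\t' };
            foreach (string word in inputLine.Trim().Split(delimiterChars, StringSplitOptions.RemoveEmptyEntries))
            {
                //pack two fields (a count and the word's length) into one delimited string value
                context.EmitKeyValue(word.ToLower(), "1|" + word.Length);
            }
        }
    }

    public class WordStatsReducer : ReducerCombinerBase
    {
        public override void Reduce(string key, IEnumerable<string> values, ReducerCombinerContext context)
        {
            int wordCount = 0;
            foreach (string value in values)
            {
                //unpack the delimited value by hand before doing any math
                string[] fields = value.Split('|');
                wordCount += Int32.Parse(fields[0]);
            }
            context.EmitKeyValue(key, Convert.ToString(wordCount));
        }
    }
}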

Job

As you can see from the earlier video, the content from the SDK project, and other search results such as Bryan Smith's tutorial, there is some information out there, but it is all tailored to HDInsight.  As I said at the top of this post, I wanted to run this locally with HDP for Windows, and despite the fact that HDInsight utilizes HDP, I had a lot of trouble with this last step.  The class below stitches together the Mapper and Reducer in the context of a Hadoop job, and it is the biggest driver for me to write all of this down.

WordCount.cs
using Microsoft.Hadoop.MapReduce;
using System;
using System.Collections.Generic;

namespace HadoopExploration
{
    class WordCount
    {
        static void Main(string[] args)
        {
            if (args.Length != 2)
            {
                throw new ArgumentException("Usage: WordCount <input path> <output folder>");
            }
            HadoopJobConfiguration jobConfig = new HadoopJobConfiguration()
            {
                InputPath = args[0],
                OutputFolder = args[1],
                DeleteOutputFolder = true
            };
            Hadoop.Connect().MapReduceJob.Execute<WordMapper, SumReducer>(jobConfig);
        }
    }
}
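One thing worth calling out in that last line: as best I can tell, the parameterless Hadoop.Connect() is what targets the local cluster (it drives the Hadoop installation on the box itself), while the HDInsight-flavored tutorials use the overloads that take a cluster URI and credentials.  That difference is exactly why those walkthroughs never quite lined up with my HDP for Windows setup.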

If you forgot how to add command-line parameters, here is a little help.

  • Right-click on the project name in the Solution Explorer and select Properties
  • On the Application tab, set HadoopExploration.WordCount as the Startup object
  • On the Debug tab, add appropriate Command line arguments in the text box provided

Test It All Out

Save TypingTest.txt (is anyone old enough to remember this little line?) to your computer as a simple test file.  It is just 1,000 copies of this line.

Now is the time for all good men to come to the aid of their country.
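If you don't feel like copying and pasting that line 1,000 times, a throwaway C# snippet like this one (my own scratch helper, not part of the HadoopExploration project) will generate the file; the exact byte count will vary a bit with your line endings.

using System.IO;
using System.Linq;

class MakeTypingTest
{
    static void Main()
    {
        //write 1,000 copies of the classic typing drill line
        const string line = "Now is the time for all good men to come to the aid of their country.";
        File.WriteAllLines("TypingTest.txt", Enumerable.Repeat(line, 1000));
    }
}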

Put it somewhere in your HDFS home directory. 

PS C:\HDP-Demo\WordCount> ls

     Directory: C:\HDP-Demo\WordCount

Mode                LastWriteTime     Length Name
----                -------------     ------ ----
-a---         7/27/2014  11:39 AM      70000 TypingTest.txt

PS C:\HDP-Demo\WordCount> hadoop fs -put TypingTest.txt wordcount/TypingTest.txt
PS C:\HDP-Demo\WordCount> hadoop fs -ls wordcount
Found 1 items
-rw-r--r--   1 lester supergroup      70000 2014-07-27 21:32 /user/lester/wordcount/TypingTest.txt

Run the WordCount class from VSE; don't forget to add some command-line parameters such as "wordcount/TypingTest.txt wordcount/mroutput" to get the following output.

File dependencies to include with job:
[Auto-detected] C:\Users\lester\Documents\Visual Studio 2013\Projects\HadoopExploration\HadoopExploration\bin\Debug\HadoopExploration.vshost.exe
[Auto-detected] C:\Users\lester\Documents\Visual Studio 2013\Projects\HadoopExploration\HadoopExploration\bin\Debug\HadoopExploration.exe
[Auto-detected] C:\Users\lester\Documents\Visual Studio 2013\Projects\HadoopExploration\HadoopExploration\bin\Debug\Microsoft.Hadoop.MapReduce.dll
[Auto-detected] C:\Users\lester\Documents\Visual Studio 2013\Projects\HadoopExploration\HadoopExploration\bin\Debug\Microsoft.Hadoop.WebClient.dll
[Auto-detected] C:\Users\lester\Documents\Visual Studio 2013\Projects\HadoopExploration\HadoopExploration\bin\Debug\Newtonsoft.Json.dll
packageJobJar: [] [/C:/hdp/hadoop/hadoop-1.2.0.1.3.0.0-0380/lib/hadoop-streaming.jar] C:\Users\lester\AppData\Local\Temp\streamjob5034462588780810117.jar tmpDir=null
14/07/27 22:43:29 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/07/27 22:43:29 WARN snappy.LoadSnappy: Snappy native library not loaded
14/07/27 22:43:29 INFO mapred.FileInputFormat: Total input paths to process : 1
14/07/27 22:43:29 INFO streaming.StreamJob: getLocalDirs(): [c:\hdp\data\hdfs\mapred\local]
14/07/27 22:43:29 INFO streaming.StreamJob: Running job: job_201407271215_0012
14/07/27 22:43:29 INFO streaming.StreamJob: To kill this job, run:
14/07/27 22:43:29 INFO streaming.StreamJob: C:\hdp\hadoop\hadoop-1.2.0.1.3.0.0-0380/bin/hadoop job  -Dmapred.job.tracker=HDP13-1NODE:50300 -kill job_201407271215_0012
14/07/27 22:43:29 INFO streaming.StreamJob: Tracking URL: http://10.0.2.15:50030/jobdetails.jsp?jobid=job_201407271215_0012
14/07/27 22:43:30 INFO streaming.StreamJob:  map 0%  reduce 0%
14/07/27 22:43:36 INFO streaming.StreamJob:  map 100%  reduce 0%
14/07/27 22:43:43 INFO streaming.StreamJob:  map 100%  reduce 33%
14/07/27 22:43:44 INFO streaming.StreamJob:  map 100%  reduce 100%
14/07/27 22:43:46 INFO streaming.StreamJob: Job complete: job_201407271215_0012
14/07/27 22:43:46 INFO streaming.StreamJob: Output: hdfs:///user/lester/wordcount/mroutput

Then you can validate the final answers from the MapReduce job.

PS C:\HDP-Demo\WordCount> hadoop fs -ls wordcount
Found 2 items
-rw-r--r--   1 lester supergroup      70000 2014-07-27 21:32 /user/lester/wordcount/TypingTest.txt
drwxr-xr-x   - lester supergroup          0 2014-07-27 22:43 /user/lester/wordcount/mroutput
PS C:\HDP-Demo\WordCount> hadoop fs -ls wordcount/mroutput
Found 2 items
drwxr-xr-x   - lester supergroup          0 2014-07-27 22:43 /user/lester/wordcount/mroutput/_logs
-rw-r--r--   1 lester supergroup        132 2014-07-27 22:43 /user/lester/wordcount/mroutput/part-00000
PS C:\HDP-Demo\WordCount> hadoop fs -cat wordcount/mroutput/part-00000
aid     1000
all     1000
come    1000
country 1000
for     1000
good    1000
is      1000
men     1000
now     1000
of      1000
the     2000
their   1000
time    1000
to      2000

That's it!  It all works!!  Now (and just for fun), run it again using shakepoems.txt as the input and add a comment to the page letting me know how many times the word "ceremony" shows up.  Also, don't hesitate to share any successes of your own or any questions that come up from this post.