hadoop streaming with .net map reduce api (executing on hdp for windows)
This blog post is for anyone who would like some help with creating/executing a simple MapReduce job with C# – specifically for use with HDP for Windows. For my Hadoop instance, I'm using the virtual machine I had fun during my installing hdp on windows (and then running something on it) effort. As this non-JVM language will ultimately require the use of Hadoop Streaming, the ultimate goal is to use the Microsoft .NET SDK for Hadoop (specifically the Map/Reduce project) to abstract away from some of the lower-level activities. So, let's do it!
Development Environment
First up I needed an IDE; remember, I live on my Mac and I prefer to use IntelliJ. I downloaded Visual Studio Express; specifically I installed Express 2013 for Windows Desktop. After creating a new project, I needed to pull in the Hadoop SDK and kudos to NuGet as I was impressed how smooth this has become since the last time I was programming in VS. The following YouTube video shows you how to do that if it is new to you.
That video is pretty helpful and if you watched it you should already be feeling pretty good about what you are about to do. Interestingly enough, it was a MapReduce tutorial and the presenter decided to only show a Map-only job. Not wrong, but surely interesting. Don't worry... we're going to do BOTH Map AND Reduce phases!
Actually, we'll need at least three classes since we'll need a class to be the job that kicks off the MapReduce job. For our use case, we'll go back to Square-1 and implement the old "Word Count" quintessential example. For anyone not familiar with it, we want to count words in a body of text such as the Shakespeare's Sonnets. Now that we have some test data, let's get into the code.
Writing The Code
These source files can be retrieve from GitHub at https://github.com/lestermartin/hadoop-exploration/tree/master/src/main/csharp/wordcount
Mapper
The Hadoop SDK gives us a class called MapperBase
that delivers to us a line at a time from the input data along with a MapperContext
object that we can use to emit back our 0..n KVPs for the Mapper contract. The code below show a simple implementation of this activity.
using Microsoft.Hadoop.MapReduce; using System; using System.Collections.Generic; namespace HadoopExploration { public class WordMapper : MapperBase { public override void Map(string inputLine, MapperContext context) { char[] delimiterChars = { ' ', ',', '.', ':', '\t' }; //split up the passed in line string[] individualWords = inputLine.Trim().Split(delimiterChars); //write output KVP for each one of these foreach (string word in individualWords) { context.EmitKeyValue(word.ToLower(), "1"); } } } }
Reducer
Like with the Mapper, we get a helper class with ReducerCombinerBase
(for those who already know what a Combiner is, the name tells you all you need to know). Just like in a Java Reducer class, we are guaranteed to get the full list of values for a particular key coming from all the Mapper instances.
using Microsoft.Hadoop.MapReduce; using System; using System.Collections.Generic; namespace HadoopExploration { public class SumReducer : ReducerCombinerBase { public override void Reduce(string key, IEnumerable<string> values, ReducerCombinerContext context) { int wordCount = 0; //iterate through the count values passed to us by the mapper foreach (string countAsString in values) { wordCount += Int32.Parse(countAsString); } //write output "answer" as a KVP context.EmitKeyValue(key, Convert.ToString(wordCount)); } } }
You might have noticed that the helper Mapper and Reducer helper classes were pegged to just using string and do not offer the richer choices you get with Java. That makes sense considering these are still leveraging Streaming under the covers which is limited to this since it uses stdin and stdout to execute this code.
Job
As you can see with earlier video, content from SDK project, and other search results such as Bryan Smith's tutorial, there is some information out there, but it is all tailored to using HD Insights. As I said at the top of this post, I wanted to run this locally with HDP for Windows and despite the fact that HD Insights utilizes HDP I had a lot of trouble with this last step. Here is a mechanism of stitching together the Mapper and Reducer in the context of a Hadoop Job and is the biggest driver for me to write all of this down.
using Microsoft.Hadoop.MapReduce; using System; using System.Collections.Generic; namespace HadoopExploration { class WordCount { static void Main(string[] args) { if (args.Length != 2) { throw new ArgumentException("Usage: WordCount <input path> <output folder>"); } HadoopJobConfiguration jobConfig = new HadoopJobConfiguration() { InputPath = args[0], OutputFolder = args[1], DeleteOutputFolder = true }; Hadoop.Connect().MapReduceJob.Execute<WordMapper, SumReducer>(jobConfig); } } }
If you forgot how to add command-line parameters here is a little help.
- Right-click on the project name in the Solution Explorer
- Under Application menu item, set
HadoopExploration.WordCount
as the Startup object - Under Debug menu item, add appropriate Command line arguments in the text box provided
Test It All Out
Save TypingTest.txt (is anyone old enough to remember this little line?) to your computer as a simple test file. It is just a 1000 copies of this line.
Now is the time for all good men to come to the aid of their country.
Put it somewhere in your HDFS home directory.
PS C:\HDP-Demo\WordCount> ls Directory: C:\HDP-Demo\WordCount Mode LastWriteTime Length Name ---- ------------- ------ ---- -a--- 7/27/2014 11:39 AM 70000 TypingTest.txt PS C:\HDP-Demo\WordCount> hadoop fs -put TypingTest.txt wordcount/TypingTest.txt PS C:\HDP-Demo\WordCount> hadoop fs -ls wordcount Found 1 items -rw-r--r-- 1 lester supergroup 70000 2014-07-27 21:32 /user/lester/wordcount/TypingTest.txt
Run the WordCount
class VSE; don't forget to add some command-line parameters such as "wordcount/TypingTest.txt wordcount/mroutput
" to get the following output.
File dependencies to include with job: [Auto-detected] C:\Users\lester\Documents\Visual Studio 2013\Projects\HadoopExploration\HadoopExploration\bin\Debug\HadoopExploration.vshost.exe [Auto-detected] C:\Users\lester\Documents\Visual Studio 2013\Projects\HadoopExploration\HadoopExploration\bin\Debug\HadoopExploration.exe [Auto-detected] C:\Users\lester\Documents\Visual Studio 2013\Projects\HadoopExploration\HadoopExploration\bin\Debug\Microsoft.Hadoop.MapReduce.dll [Auto-detected] C:\Users\lester\Documents\Visual Studio 2013\Projects\HadoopExploration\HadoopExploration\bin\Debug\Microsoft.Hadoop.WebClient.dll [Auto-detected] C:\Users\lester\Documents\Visual Studio 2013\Projects\HadoopExploration\HadoopExploration\bin\Debug\Newtonsoft.Json.dll packageJobJar: [] [/C:/hdp/hadoop/hadoop-1.2.0.1.3.0.0-0380/lib/hadoop-streaming .jar] C:\Users\lester\AppData\Local\Temp\streamjob5034462588780810117.jar tmpDir=null 14/07/27 22:43:29 INFO util.NativeCodeLoader: Loaded the native-hadoop library 14/07/27 22:43:29 WARN snappy.LoadSnappy: Snappy native library not loaded 14/07/27 22:43:29 INFO mapred.FileInputFormat: Total input paths to process : 1 14/07/27 22:43:29 INFO streaming.StreamJob: getLocalDirs(): [c:\hdp\data\hdfs\mapred\local] 14/07/27 22:43:29 INFO streaming.StreamJob: Running job: job_201407271215_0012 14/07/27 22:43:29 INFO streaming.StreamJob: To kill this job, run: 14/07/27 22:43:29 INFO streaming.StreamJob: C:\hdp\hadoop\hadoop-1.2.0.1.3.0.0-0 380/bin/hadoop job -Dmapred.job.tracker=HDP13-1NODE:50300 -kill job_201407271215_0012 14/07/27 22:43:29 INFO streaming.StreamJob: Tracking URL: http://10.0.2.15:50030/jobdetails.jsp?jobid=job_201407271215_0012 14/07/27 22:43:30 INFO streaming.StreamJob: map 0% reduce 0% 14/07/27 22:43:36 INFO streaming.StreamJob: map 100% reduce 0% 14/07/27 22:43:43 INFO streaming.StreamJob: map 100% reduce 33% 14/07/27 22:43:44 INFO streaming.StreamJob: map 100% reduce 100% 14/07/27 22:43:46 INFO streaming.StreamJob: Job complete: job_201407271215_0012 14/07/27 22:43:46 INFO streaming.StreamJob: Output: hdfs:///user/lester/wordcount/mroutput
Then you can validate the MapReduce final answers.
PS C:\HDP-Demo\WordCount> hadoop fs -ls wordcount Found 2 items -rw-r--r-- 1 lester supergroup 70000 2014-07-27 21:32 /user/lester/wordcount/TypingTest.txt drwxr-xr-x - lester supergroup 0 2014-07-27 22:43 /user/lester/wordcount/mroutput PS C:\HDP-Demo\WordCount> hadoop fs -ls wordcount/mroutput Found 2 items drwxr-xr-x - lester supergroup 0 2014-07-27 22:43 /user/lester/wordcount/mroutput/_logs -rw-r--r-- 1 lester supergroup 132 2014-07-27 22:43 /user/lester/wordcount/mroutput/part-00000 PS C:\HDP-Demo\WordCount> hadoop fs -cat wordcount/mroutput/part-00000 aid 1000 all 1000 come 1000 country 1000 for 1000 good 1000 is 1000 men 1000 now 1000 of 1000 the 2000 their 1000 time 1000 to 2000
That's it! It all works!! Now (and just for fun), run it again using shakepoems.txt as the input and add a comment to the page letting me know how many times the word "ceremony" shows up. Also, don't hesitate to share any successes on your own or any questions that may come up from this post.