Well… it looks like I have not published a single professional blog post all year, so hopefully this will jumpstart my efforts. And, of course, I'm behind the eight ball on this one! It is the Saturday before the Monday night user group presentation I committed to a long time ago, https://www.meetup.com/Atlanta-Net-User-Group/events/244527107/, and I'm FINALLY getting around to fully testing this all out. Guess I need to read Getting Things Done again!

Yes, you read that right! I’m going to talk about and demonstrate developing, building, and deploying Storm topologies written in C#, not in Java.

If you are interested in that more mainstream approach, take a peek at my YouTube video on the topic, https://www.youtube.com/watch?v=qePLLergwVM, as well as the Microsoft post at https://docs.microsoft.com/en-us/azure/hdinsight/storm/apache-storm-develop-java-topology for how to do this with Visual Studio (VS).

That aside, let’s get some C# topologies running on Storm!!

Provision Storm Cluster

Since I'm talking Microsoft, I decided it was best to run an HDInsight-based Storm cluster. I used the instructions documented at https://docs.microsoft.com/en-us/azure/hdinsight/storm/apache-storm-tutorial-get-started-linux to spin up the cluster. Thankfully, the estimate came in a bit cheaper than on that web page, but it is still expensive.

Regarding this cost, and as clearly documented in the setup instructions, the problem with the HDInsight cluster for testing like this is that you cannot stop the meter.  Yes, you can stop the machines that the cluster is running on, but that doesn't stop the charge clock, so you have to tear it all down when you want to stop being charged.

Fortunately, it is easy to spin up a new one. Creating my second cluster took 10 mouse clicks, filling out two textboxes (cluster name and ssh user's password), and less than four minutes of my time. The actual provisioning of the cluster took a bit less than 20 minutes, and for those familiar with Hortonworks, you'll feel right at home once it is all created.

Ambari UI

Storm UI

The HDInsight setup instructions show you how to deploy a pre-built WordCount topology and how to stop it once you have finished exploring a bit, as highlighted below.

HW13005:~ lmartin$ ssh sshuser@myhdinsight-ssh.azurehdinsight.net
sshuser@hn0-myhdin:~$ storm jar /usr/hdp/current/storm-client/contrib/storm-starter/storm-starter-topologies-*.jar org.apache.storm.starter.WordCountTopology wordcount
32492 [main] INFO  o.a.s.StormSubmitter - Finished submitting topology: wordcount
sshuser@hn0-myhdin:~$ storm list
Topology_name        Status     Num_tasks  Num_workers  Uptime_secs
-------------------------------------------------------------------
wordcount            ACTIVE     29         3            634       
sshuser@hn0-myhdin:~$ storm kill wordcount
6116 [main] INFO  o.a.s.c.kill-topology - Killed topology: wordcount
sshuser@hn0-myhdin:~$ 

Create Development Environment

For a dev environment, I used https://buildazure.com/2017/03/07/visual-studio-2017-development-using-a-vm-in-azure/ as my setup guide and Microsoft Remote Desktop to access the VM from my Mac. I was reminded to fully stop the VM (from the Azure portal) as described in https://buildazure.com/2017/03/16/properly-shutdown-azure-vm-to-save-money/. Fortunately, and unlike the cluster, I do not need to tear it all down while it is not running to limit my cloud expenses.

Deploy Topology from VS

For this initial deploy effort, I used the instructions at https://docs.microsoft.com/en-us/azure/hdinsight/storm/apache-storm-deploy-monitor-topology#hdinsight-tools-for-visual-studio to set up the bundled StormSample (a WordCount) topology.

If you are not familiar with WordCount, please take a look at Hadoop Streaming with the .NET MapReduce API (executing on HDP for Windows), which coincidentally was one of the demos when I delivered my Hadoop Demystified presentation (with Atlanta's .NET User Group) four years ago.

The deployment straight from VS was incredibly easy; just a right-click on the project and I was off to the races.

That prompted me to select the Storm cluster I had already deployed on Azure.

Within a minute or so, I noticed down in the Output window that it was deployed.

I also noticed a new Storm Topologies View tab surfaced in VS where I could see the running topology along with the details I usually find in the native Storm UI including component statistics as well as topology level lifecycle events.

This was very nice!

While the direct-from-IDE option is nice, it is probably not what would happen in a real-world deployment model. So, I dabbled around a while to see if the zip generated as part of this visual deploy process could be used with the CLI storm jar command, but no joy! This is an area that will need additional research on my part and, of course, if someone already knows how to do this please let me know in the comments section.

Code Walkthrough

While that bundled example does present WordCount, it also pulls in a number of additional concerns that keep it from being the cleanest example it could be. To help with that, I posted the simplified example below to https://github.com/lestermartin/streaming-exploration/tree/master/dotnet-projects/SimplestWordCount. I found https://docs.microsoft.com/en-us/azure/hdinsight/storm/apache-storm-develop-csharp-visual-studio-topology to be a good resource in this exercise as well.

Sentence Spout

This spout randomly generates a sentence each time its NextTuple() method is called.

    public class SentenceSpout : ISCPSpout
    {
        private Context ctx;
        private Random rand = new Random();
        string[] sentences = new string[] {
                                          "the cow jumped over the moon",
                                          "an apple a day keeps the doctor away",
                                          "four score and seven years ago",
                                          "snow white and the seven dwarfs",
                                          "i am at two with nature"};

        public SentenceSpout(Context ctx)
        {
            this.ctx = ctx;
            
            // identify that a 1-tuple of type string (a sentence) will be emitted
            Dictionary<string, List<Type>> outputSchema = new Dictionary<string, List<Type>>();
            outputSchema.Add(Constants.DEFAULT_STREAM_ID, new List<Type>() { typeof(string) });
            this.ctx.DeclareComponentSchema(new ComponentStreamSchema(null, outputSchema));
        }

        // factory method used by the topology builder to create spout instances
        public static SentenceSpout Get(Context ctx, Dictionary<string, Object> parms)
        {
            return new SentenceSpout(ctx);
        }

        public void NextTuple(Dictionary<string, Object> parms)
        {
            Context.Logger.Info("NextTuple enter");

            // select one of the random sentences
            string sentence = sentences[rand.Next(sentences.Length)]; // upper bound is exclusive, so every sentence is reachable
            // log info message of selected sentence
            Context.Logger.Info("Emit: {0}", sentence);

            // raise this 1-tuple into the stream
            ctx.Emit(Constants.DEFAULT_STREAM_ID, new Values(sentence));

            Context.Logger.Info("NextTuple exit");
        }
        // required by ISCPSpout; no-ops here since acking is not enabled in this topology
        public void Ack(long seqId, Dictionary<string, Object> parms)
        {
        }

        public void Fail(long seqId, Dictionary<string, Object> parms)
        {
        }
    }

Here is the output from the running topology.

SimplestWordCount_SentenceSpout - NextTuple enter
SimplestWordCount_SentenceSpout - Emit: snow white and the seven dwarfs
SimplestWordCount_SentenceSpout - NextTuple exit
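One subtlety worth calling out in the spout: Random.Next(minValue, maxValue) treats maxValue as exclusive, so an upper bound of sentences.Length - 1 would silently make the last sentence unreachable. A quick standalone check (outside the topology, with a throwaway five-element array) illustrates the behavior:

```csharp
using System;

class RandomBoundsDemo
{
    static void Main()
    {
        var rand = new Random();
        string[] sentences = { "a", "b", "c", "d", "e" };

        // Random.Next(min, max) excludes max, so rand.Next(0, sentences.Length - 1)
        // can only ever return indexes 0..3; rand.Next(sentences.Length) covers 0..4.
        bool sawLast = false;
        for (int i = 0; i < 10000; i++)
        {
            int idx = rand.Next(0, sentences.Length - 1); // never the last index
            if (idx == sentences.Length - 1) sawLast = true;
        }
        Console.WriteLine(sawLast ? "last index reachable" : "last index never chosen");
        // prints: last index never chosen
    }
}
```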

Splitter Bolt

This bolt breaks the sentences apart into words and emits those via its output stream.

    public class SplitterBolt : ISCPBolt
    {
        private Context ctx;

        public SplitterBolt(Context ctx)
        {
            this.ctx = ctx;

            // declare that a 1-tuple (a sentence) will be processed
            Dictionary<string, List<Type>> inputSchema = new Dictionary<string, List<Type>>();
            inputSchema.Add(Constants.DEFAULT_STREAM_ID, new List<Type>() { typeof(string) });

            // identify that a 1-tuple (a word) will be emitted
            Dictionary<string, List<Type>> outputSchema = new Dictionary<string, List<Type>>();
            outputSchema.Add(Constants.DEFAULT_STREAM_ID, new List<Type>() { typeof(string) });

            this.ctx.DeclareComponentSchema(new ComponentStreamSchema(inputSchema, outputSchema));
        }

        // factory method used by the topology builder to create bolt instances
        public static SplitterBolt Get(Context ctx, Dictionary<string, Object> parms)
        {
            return new SplitterBolt(ctx);
        }

        public void Execute(SCPTuple tuple)
        {
            Context.Logger.Info("Execute enter");

            string sentence = tuple.GetString(0); // grab the first (and only) field from the tuple
            // tokenize it into words
            foreach (string word in sentence.Split(' '))
            {
                // log each word and emit it into the output stream
                Context.Logger.Info("Emit: {0}", word);
                ctx.Emit(new Values(word));
            }

            Context.Logger.Info("Execute exit");
        }
    }

Here is the output from the running topology. As you can see, each sentence being consumed results in many words being emitted.

SimplestWordCount_SplitterBolt - Execute enter
SimplestWordCount_SplitterBolt - Emit: snow
SimplestWordCount_SplitterBolt - Emit: white
SimplestWordCount_SplitterBolt - Emit: and
SimplestWordCount_SplitterBolt - Emit: the
SimplestWordCount_SplitterBolt - Emit: seven
SimplestWordCount_SplitterBolt - Emit: dwarfs
SimplestWordCount_SplitterBolt - Execute exit
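As an aside, Split(' ') produces empty strings if a sentence ever contains consecutive spaces, and those would flow downstream as empty "words". The fixed sentences above cannot trigger this, but a more defensive tokenizer (my suggestion, not part of the original sample) could use StringSplitOptions.RemoveEmptyEntries:

```csharp
using System;

class TokenizeDemo
{
    static void Main()
    {
        string sentence = "snow  white and the seven dwarfs"; // note the double space

        // naive split yields an empty token between the two spaces
        string[] naive = sentence.Split(' ');

        // RemoveEmptyEntries drops the empty tokens
        string[] safe = sentence.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);

        Console.WriteLine(naive.Length); // 7 (includes one empty string)
        Console.WriteLine(safe.Length);  // 6
    }
}
```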

Counter Bolt

The final bolt maintains state of running total counts for each word.

    public class CounterBolt : ISCPBolt
    {
        private Context ctx;

        // a map to hold the words and their running totals
        private Dictionary<string, int> counts = new Dictionary<string, int>();

        public CounterBolt(Context ctx)
        {
            this.ctx = ctx;

            // declare that a 1-tuple (a word) will be processed
            Dictionary<string, List<Type>> inputSchema = new Dictionary<string, List<Type>>();
            inputSchema.Add(Constants.DEFAULT_STREAM_ID, new List<Type>() { typeof(string) });

            // identify that a 2-tuple (a specific word and its running total) will be emitted
            Dictionary<string, List<Type>> outputSchema = new Dictionary<string, List<Type>>();
            outputSchema.Add(Constants.DEFAULT_STREAM_ID, new List<Type>() { typeof(string), typeof(int) });

            this.ctx.DeclareComponentSchema(new ComponentStreamSchema(inputSchema, outputSchema));
        }

        // factory method used by the topology builder to create bolt instances
        public static CounterBolt Get(Context ctx, Dictionary<string, Object> parms)
        {
            return new CounterBolt(ctx);
        }

        public void Execute(SCPTuple tuple)
        {
            Context.Logger.Info("Execute enter");

            string word = tuple.GetString(0); // grab the first (and only) field from the tuple

            // figure out if the word has already been encountered or not
            int count = counts.ContainsKey(word) ? counts[word] : 0;
            count++;                // update counter
            counts[word] = count;   // update the map

            // log the running count for the word and emit those two fields into the output stream
            Context.Logger.Info("Emit: {0}, count: {1}", word, count);
            this.ctx.Emit(Constants.DEFAULT_STREAM_ID, new List<SCPTuple> { tuple }, new Values(word, count));

            Context.Logger.Info("Execute exit");
        }
    }

Here is the output from the running topology. I am just focusing on the output from a single method invocation.

SimplestWordCount_CounterBolt - Execute enter
SimplestWordCount_CounterBolt - Emit: snow, count: 69
SimplestWordCount_CounterBolt - Execute exit
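The counting logic itself is plain C# and easy to exercise outside of Storm. Here is the same pattern in isolation, using the TryGetValue idiom (equivalent to the ContainsKey check above, with one lookup instead of two):

```csharp
using System;
using System.Collections.Generic;

class CountDemo
{
    static void Main()
    {
        var counts = new Dictionary<string, int>();
        string[] words = { "snow", "white", "and", "the", "seven", "dwarfs", "and", "the" };

        foreach (string word in words)
        {
            // TryGetValue leaves count at 0 when the key is absent
            counts.TryGetValue(word, out int count);
            counts[word] = count + 1;
        }

        Console.WriteLine(counts["and"]);  // 2
        Console.WriteLine(counts["snow"]); // 1
    }
}
```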

The Driver

This class assembles the topology by declaring the components and how they are wired together.

    class Program : TopologyDescriptor
    {
        public ITopologyBuilder GetTopologyBuilder()
        {
            // use TopologyBuilder to define topology and define each spout/bolt one by one
            TopologyBuilder topologyBuilder = new TopologyBuilder("SimplestWordCount" + DateTime.Now.ToString("yyyyMMddHHmmss"));

            topologyBuilder.SetSpout(
                "sentenceSpout",
                SentenceSpout.Get,
                new Dictionary<string, List<string>>()
                {
                    // name the 1-tuple's single field as 'sentence'
                    {Constants.DEFAULT_STREAM_ID, new List<string>(){"sentence"}}
                },
                1);  // initial nbr of instances of this spout (same for bolts below)

            topologyBuilder.SetBolt(
                "splitterBolt",
                SplitterBolt.Get,
                new Dictionary<string, List<string>>()
                {
                    // name the 1-tuple's single field as 'word'
                    {Constants.DEFAULT_STREAM_ID, new List<string>(){"word"}}
                },
                // wire it up to the stream from the sentence generator
                2).shuffleGrouping("sentenceSpout");

            topologyBuilder.SetBolt(
                "counterBolt",
                CounterBolt.Get,
                new Dictionary<string, List<string>>()
                {
                    // name the 2-tuple's fields as 'word' and 'count'
                    {Constants.DEFAULT_STREAM_ID, new List<string>(){"word", "count"}}
                },
                // wire it up to the stream from the sentence splitting bolt
                3).fieldsGrouping("splitterBolt", new List<int>() { 0 }); 
                   // ^^^^^ make sure every occurrence of a specific word goes to the same bolt instance

            return topologyBuilder;
        }
    }
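To build some intuition for why fieldsGrouping matters here: conceptually, Storm picks the target task by hashing the grouping field, so every occurrence of a given word lands on the same CounterBolt instance and its running total lives in exactly one place. A rough sketch of the idea (illustration only, not Storm's actual routing code):

```csharp
using System;

class FieldsGroupingSketch
{
    static int Route(string word, int numTasks)
    {
        // hash the grouping field to pick a task; same word => same task
        return Math.Abs(word.GetHashCode() % numTasks);
    }

    static void Main()
    {
        // every occurrence of "snow" routes to the same task index
        Console.WriteLine(Route("snow", 3) == Route("snow", 3)); // True

        // with shuffleGrouping instead, occurrences would be spread across
        // tasks and each CounterBolt would hold only a partial count
    }
}
```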

As a reminder, this simple WordCount example can be retrieved from https://github.com/lestermartin/streaming-exploration/tree/master/dotnet-projects/SimplestWordCount.

Meetup Artifacts

In addition to the GitHub resources mentioned above, the presentation I will be using at Monday’s https://www.meetup.com/Atlanta-Net-User-Group/events/244527107/ Meetup is as follows.

Next Steps

If you are interested in learning more about Apache Storm, then I highly recommend the project's web site at http://storm.apache.org as a great place to visit. For Microsoft-specific development, the Microsoft documentation pages linked throughout this post seem to be a great next step for learning more and going beyond the simple example presented above.