...

Code Block
languagebash
themeEmacs
scala> wordsRdd.take(3)
16/05/19 23:11:12 INFO DAGScheduler: Job 1 finished: take at <console>:30, took 0.044692 s
res3: Array[String] = Array(I, am, Daniel)

scala> wordsRdd.take(3)
16/05/19 23:11:14 INFO DAGScheduler: Job 2 finished: take at <console>:30, took 0.020183 s
res4: Array[String] = Array(I, am, Daniel)

scala> wordsRdd.take(3)
16/05/19 23:11:17 INFO DAGScheduler: Job 3 finished: take at <console>:30, took 0.033659 s
res5: Array[String] = Array(I, am, Daniel)

 

Now that we have an order of magnitude speed improvement and somewhat consistent response times, we are ready to stand up a test harness to prove that mapPartitions() is faster than map() whenever the function being called carries enough per-call overhead that invoking it once per record, instead of once per partition, produces measurably worse results.  The wrapSingleWord() function is just going to add ">>" before each word in the RDD and slap "<<" on the backside.  And to really make the point, I've snuck in a bogus method that burns 10 millis of clock time on every call; it is my simulation of the kind of housekeeping you may have stumbled across in real code, such as creating an expensive object or performing some other arbitrary expensive operation.  Just paste the following functions separately into the shell.

Code Block
languagescala
themeEmacs
def simulateExpensiveObjectCreation(): Unit = {
  // stand-in for per-call housekeeping such as expensive object creation
  Thread.sleep(10)
}

def wrapSingleWord(original: String): String = {
  simulateExpensiveObjectCreation()  // this cost is paid on every single call
  val sb = new StringBuilder()
  sb.append(">>").append(original).append("<<").toString()
}

Next up, let's run our RDD through a map() transformation to get a feel for how long it takes to exercise this model, where the wrapSingleWord() function gets called once for each word in the RDD.

Code Block
languagebash
themeEmacs
scala> wordsRdd.map(word => wrapSingleWord(word)).take(10)
16/05/20 00:18:46 INFO DAGScheduler: Job 7 finished: take at <console>:34, took 0.151721 s
res9: Array[String] = Array(>>I<<, >>am<<, >>Daniel<<, >><<, >>I<<, >>am<<, >>Sam<<, >>Sam<<, >>I<<, >>am<<)

scala> wordsRdd.map(word => wrapSingleWord(word)).take(10)
16/05/20 00:18:52 INFO DAGScheduler: Job 8 finished: take at <console>:34, took 0.146428 s
res10: Array[String] = Array(>>I<<, >>am<<, >>Daniel<<, >><<, >>I<<, >>am<<, >>Sam<<, >>Sam<<, >>I<<, >>am<<)

scala> wordsRdd.map(word => wrapSingleWord(word)).take(10)
16/05/20 00:18:54 INFO DAGScheduler: Job 9 finished: take at <console>:34, took 0.153416 s
res11: Array[String] = Array(>>I<<, >>am<<, >>Daniel<<, >><<, >>I<<, >>am<<, >>Sam<<, >>Sam<<, >>I<<, >>am<<)

In this tightly constrained Sandbox cluster, we are seeing about 0.15 secs to execute this completely.  Now, to get mapPartitions() to work we need another method that does the same thing, but it has to accept the partition's whole collection of elements as an Iterator and it needs to return an Iterator on the way back.  The following is just a jazzed-up version of the earlier method.

Code Block
languagescala
themeEmacs
import java.util._
import scala.collection.JavaConversions._  // implicitly converts the java.util iterator back into a Scala Iterator

def wrapMultiWords(words: Iterator[String]): Iterator[String] = {
  simulateExpensiveObjectCreation()  // this cost is now paid only once per partition
  val sb = new StringBuilder()
  val wList = new ArrayList[String]()
  while (words.hasNext) {
    sb.setLength(0)  // reuse one StringBuilder across all the words
    wList.add(sb.append(">>").append(words.next()).append("<<").toString())
  }
  wList.iterator()
}

Once you have that added into the shell, we can exercise it to see how it compares.

Code Block
languagebash
themeEmacs
scala> wordsRdd.mapPartitions(word => wrapMultiWords(word)).take(10)
16/05/20 00:25:51 INFO DAGScheduler: Job 10 finished: take at <console>:40, took 0.069876 s
res12: Array[String] = Array(>>I<<, >>am<<, >>Daniel<<, >><<, >>I<<, >>am<<, >>Sam<<, >>Sam<<, >>I<<, >>am<<)

scala> wordsRdd.mapPartitions(word => wrapMultiWords(word)).take(10)
16/05/20 00:26:04 INFO DAGScheduler: Job 11 finished: take at <console>:40, took 0.050593 s
res13: Array[String] = Array(>>I<<, >>am<<, >>Daniel<<, >><<, >>I<<, >>am<<, >>Sam<<, >>Sam<<, >>I<<, >>am<<)

scala> wordsRdd.mapPartitions(word => wrapMultiWords(word)).take(10)
16/05/20 00:26:07 INFO DAGScheduler: Job 12 finished: take at <console>:40, took 0.055201 s
res14: Array[String] = Array(>>I<<, >>am<<, >>Daniel<<, >><<, >>I<<, >>am<<, >>Sam<<, >>Sam<<, >>I<<, >>am<<)
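
If you want to convince yourself that the expensive setup really runs once per record with map() and once per partition with mapPartitions(), an accumulator makes a handy counter.  The following is a sketch using the Spark 1.x accumulator API (Spark 2.x renames this to sc.longAccumulator); the counters track how many times each wrapping closure is invoked.

Code Block
languagescala
themeEmacs
// sketch: use accumulators to count how often each closure is invoked
val mapCalls = sc.accumulator(0)
wordsRdd.map { word => mapCalls += 1; wrapSingleWord(word) }.count()  // count() touches every partition, unlike take(10)
println(s"map() closure ran ${mapCalls.value} times")  // once per word

val partitionCalls = sc.accumulator(0)
wordsRdd.mapPartitions { words => partitionCalls += 1; wrapMultiWords(words) }.count()
println(s"mapPartitions() closure ran ${partitionCalls.value} times")  // once per partition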

As you can see, the new RDD is the same, but the performance was dramatically better, finishing in about a third of the time in this testing scenario.  Obviously, you'll need to do some testing with your own data and the functions being used in your mapping transformations, but if there is any measurable overhead in calling the related function, it will surely surface when you have to call it over and over again, once per record.  While Spark is not necessarily Hadoop, the need to POC your particular problem and validate your hypothesis is just as important in this space.  With that said, you will likely see improvements by moving from map() to mapPartitions() when your related function can process all of a partition's elements at once.
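
One closing design note: wrapMultiWords() buffers the whole partition into an ArrayList before handing back an iterator, which is fine for a toy dataset but costs memory on large partitions.  If you adopt this pattern, a more idiomatic shape is to pay the expensive setup once and then map lazily over the incoming Iterator.  This is just a sketch of that alternative, not code from the timing runs above.

Code Block
languagescala
themeEmacs
// sketch: same once-per-partition setup, but streaming instead of buffering
def wrapMultiWordsLazily(words: Iterator[String]): Iterator[String] = {
  simulateExpensiveObjectCreation()       // still paid only once per partition
  words.map(word => ">>" + word + "<<")   // lazy; no intermediate collection
}

// drop-in replacement for the earlier mapPartitions() call
wordsRdd.mapPartitions(wrapMultiWordsLazily).take(10)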