Blog from March, 2019

This blog post introduces the three streaming frameworks that are bundled in the Hortonworks Data Platform (HDP) – Apache Storm, Spark Streaming, and Kafka Streams – and focuses on the supervision features offered to the topologies (aka workflows) running with, or within, these particular frameworks. This post does not attempt to fully describe each framework, nor does it provide examples of their usage via working code. The goal is to develop an understanding of what, if any, services are available to help with lifecycle events, scalability, management, and monitoring.

The Frameworks

Kafka Streams

Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in Kafka clusters. It combines the simplicity of writing and deploying standard Java and Scala applications on the client side with the benefits of Kafka's server-side cluster technology. –http://kafka.apache.org/documentation/streams/

Kafka Streams is tightly coupled with Kafka’s messaging platform; in particular, the streaming input data must come from Kafka topics. Kafka Streams is intentionally designed to fit into any Java or Scala application, which gives it plenty of flexibility, but it offers no inherent lifecycle, scaling, management, or monitoring features.

Spark Streaming

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. –http://spark.apache.org/docs/latest/streaming-programming-guide.html

Apache Spark’s streaming framework allows for a variety of input and output data technologies. Spark Streaming apps are themselves Spark applications which, in a Hadoop cluster at least, run under YARN, which provides coverage for many of the lifecycle and management features. The Spark framework itself addresses a number of the scaling and monitoring needs.
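To make that lifecycle story concrete, here is a minimal pyspark sketch of a Spark Streaming app. The socket source, app name, and timeout are illustrative assumptions, not anything HDP-specific; the submitter (e.g., spark-submit under YARN) handles the start, and the graceful stop at the end is one of the stop patterns referenced in the Feature Analysis table below.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# A minimal sketch, not a production app; the source and timeout are illustrative.
sc = SparkContext(appName="StreamingLifecycleSketch")
ssc = StreamingContext(sc, batchDuration=5)           # 5-second micro-batches

lines = ssc.socketTextStream("localhost", 9999)       # hypothetical input source
lines.count().pprint()                                # trivial per-batch action

ssc.start()                                           # "start" is handled by the submitter
ssc.awaitTerminationOrTimeout(60)                     # let it run for a bit...
ssc.stop(stopSparkContext=True, stopGraceFully=True)  # "stop" pattern: drain in-flight batches first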

Apache Storm

Apache Storm is a … distributed realtime computation system. Storm makes it easy to reliably process unbounded streams of data, ... Storm is simple, can be used with any programming language, … Storm is fast: a benchmark clocked it at over a million tuples processed per second per node. It is scalable, fault-tolerant, guarantees your data will be processed, and is easy to set up and operate. –http://storm.apache.org/

Storm topologies are assembled from components that can be built with any language, though Java is primarily used. Storm includes ready-to-use components for many queueing and database technologies. It is a comprehensive framework that solely focuses on streaming applications and has rich solutions addressing lifecycle events, scalability, management, and topology monitoring.

Feature Analysis

| Category | Feature | Kafka Streams | Spark Streaming | Apache Storm |
| --- | --- | --- | --- | --- |
| Lifecycle Events | Start | (blue star) RYO | (tick) Submitter | (tick) Submitter |
| Lifecycle Events | Stop | (blue star) RYO | (warning) Patterns available | (tick) Available |
| Lifecycle Events | Pause/Restart | (blue star) RYO | (blue star) Not available | (tick) Available |
| Scalability | Initial Parallelization | (blue star) RYO | (tick) Parameterized | (tick) Parameterized |
| Scalability | Runtime Elasticity | (blue star) RYO | (warning) Auto-scaling based on properties for min/max number of executors | (warning) No auto-scaling, but each component can be scaled +/- individually |
| Management | Resource Availability | (blue star) No inherent resources | (tick) Managed | (tick) Managed |
| Management | Failure Restart | (blue star) RYO | (tick) Automatic | (tick) Automatic |
| Monitoring | Topology UI | (blue star) RYO | (warning) Combined with all Spark jobs | (tick) Centralized |
| Monitoring | Integration | (blue star) RYO | (tick) JMX | (tick) JMX |

Summary & Recommendations

Let me start by pointing out that while it looks like Kafka Streams is “all bad”, that’s not the case. It is built around the concept of writing and deploying standard applications and consciously does not want to be part of a runtime framework. Given that, and the focus of this blog post, it should be obvious why it scored so low on these features. The RYO (Roll Your Own – aka “custom”) callouts I gave are likely a badge of honor to the folks who are bringing us this framework.

Kafka Streams also has a lot of early interest and I surely would not discount it for a second. The biggest issue for teams who stand up a decent-sized Hadoop/Spark cluster is that you don’t get to take advantage of all those nodes to run your Kafka Streams apps. You’ll need to size out what each application needs and ensure those resources are available to run your apps on.

On the other end of the spectrum, one would think that with an almost perfect green-checkmark score on the features identified, Storm would be a no-brainer. Storm is the grandpa of the streaming engines, and its event-level isolation provides something the other, microbatch-based, frameworks can’t do. This maturity shines through in all of these supervision features, but on the other hand it is the least “exciting” of the frameworks for folks starting their streaming journey in 2019. If you need to get something into production asap and you just need to know it works – all day long and every day… then go with Storm!

This brings me to my personal recommendation of Spark Streaming. Note that this comes from a guy who really does love Apache Storm and values the simplicity & flexibility of Kafka Streams. There is simply too much excitement & focus around Spark in general, and the ability to transition applications between batch and streaming paradigms with minimal coding closes the case. It is still maturing, but its alignment with YARN helps it score high on many of these supervision-oriented features.

Today is one of those days when I thought I knew something, stood firm with my assumption, and then found out that I wasn’t as right as I thought. Yes, a more humble person might even say they were wrong. But… while I’m not totally wrong, I’m surely not totally right! (wink)

Much like my discoveries in learning something new every day (seems hdfs is not as immutable as i thought), this one was also about Hadoop’s HDFS. I stood firm on my understanding that clients were prevented from “seeing” an in-flight, partially written file because the HDFS NameNode (NN) would not make them aware of the not-yet-completed file. Much of that assumption came from repeatedly teaching the bit visualized below.

It seems I was convinced that the “lease” the NN grants (step 1) told the NN not to provide any details on the file until the client indicates that it is complete (step 12). From some quick Googling, it seems this is a contentious question, with multiple folks weighing in on locations such as Stack Overflow Question # 26634057 (visualized below).

As with most things in technology, if you aren’t sure of the answer then go find out with a simple test. I prepared a 7.5 GB file to be loaded into HDFS.

training@cmhost:~$ ls -l /ngrams/unzipped/aTHRUe.txt 
-rw-rw-r-- 1 training training 7498258729 Mar 20 09:16 /ngrams/unzipped/aTHRUe.txt

I then created a little script that would push it to Hadoop and print out the before and after timestamps so we could see how long it took to get loaded.

training@cmhost:~$ cat putFile.sh 
date
hdfs dfs -put /ngrams/unzipped/aTHRUe.txt /tmp/ngramsAthruE/
date

Lastly, I kicked it off in the background and started polling the directory on HDFS.

training@cmhost:~$ ./putFile.sh & 
[1] 10138
Wed Mar 20 09:32:51 PDT 2019

training@cmhost:~$ hdfs dfs -ls /tmp/ngramsAthruE
Found 1 items
-rw-r--r--   3 training supergroup 2818572288 2019-03-20 09:32 /tmp/ngramsAthruE/aTHRUe.txt._COPYING_

training@cmhost:~$ hdfs dfs -ls /tmp/ngramsAthruE
Found 1 items
-rw-r--r--   3 training supergroup 4831838208 2019-03-20 09:32 /tmp/ngramsAthruE/aTHRUe.txt._COPYING_

training@cmhost:~$ hdfs dfs -ls /tmp/ngramsAthruE
Found 1 items
-rw-r--r--   3 training supergroup 7113539584 2019-03-20 09:32 /tmp/ngramsAthruE/aTHRUe.txt._COPYING_
training@cmhost:~$ Wed Mar 20 09:33:59 PDT 2019

[1]+  Done                    ./putFile.sh

training@cmhost:~$ hdfs dfs -ls /tmp/ngramsAthruE
Found 1 items
-rw-r--r--   3 training supergroup 7498258729 2019-03-20 09:33 /tmp/ngramsAthruE/aTHRUe.txt
training@cmhost:~$ 

If you take a good look you’ll see I was right, but again, far less correct than I was hoping to be.

Yes, the file in question, aTHRUe.txt, is NOT accessible, but one with the same name and a ._COPYING_ suffix is available during the write operation, representing all blocks completed up to that point.

If a client was looking for a specific file then this would be perfectly fine, but more than likely clients would be reading the contents of all files in a directory at once, and this half-baked file will surely cause issues.

This is worth considering when you are building your data engineering pipelines, and it should be addressed in a way that does not cause problems downstream.

For a simple batch ingestion workflow, this could be as simple as writing to a working directory until all data is finalized, then executing a simple hdfs dfs -mv command so that any client triggered on the appropriate data availability only ever sees complete files, as sketched below.
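A minimal sketch of that pattern, assuming hypothetical incoming and ready directories (the names are illustrative, and ready is the only path downstream clients watch):

hdfs dfs -mkdir -p /tmp/ngramsAthruE/incoming /tmp/ngramsAthruE/ready
# write (and let the ._COPYING_ file live) where nobody is looking
hdfs dfs -put /ngrams/unzipped/aTHRUe.txt /tmp/ngramsAthruE/incoming/
# only once the put completes, rename into the watched directory;
# the rename is a NameNode metadata operation, so clients never see a partial file
hdfs dfs -mv /tmp/ngramsAthruE/incoming/aTHRUe.txt /tmp/ngramsAthruE/ready/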

As is always the case, never be so sure of yourself that you won’t listen to others or give yourself a few minutes to validate your understanding with a simple test. And yes, enjoy the big piece of humble pie when it is served to you. (smile)

These findings only raised more questions as I thought about it. What happens with the cat, cp, mv, and rm commands? I tested them out and found less-than-desirable answers, but ones that did fit in line with the findings above.

I was going to publish a second blog post as a follow-up, as I’m a big believer that blog postings (not wiki pages!) are immutable. But since I was wrong about learning something new every day (seems hdfs is not as immutable as i thought), and since these additional research findings are additive and do not alter the content above, I decided to just include them below.

If you don’t know why blog posts shouldn’t be edited, but wiki pages should be, then I would suggest you check out my enterprise 2.0 book review (using web 2.0 technologies within organizations). Yes, I am a “wiki gnome” at heart!!

Can the COPYING File be Read? YES.

The file cannot be read by the final filename it is being created under, but the ._COPYING_ file can be.

training@cmhost:~$ ./putFile.sh & 
[1] 10138
Fri Mar 22 11:22:21 PDT 2019

training@cmhost:~$ hdfs dfs -ls /tmp/ngramsAthruE
Found 1 items
-rw-r--r--   3 training supergroup 1818572288 2019-03-22 11:22 /tmp/ngramsAthruE/aTHRUe.txt._COPYING_

training@cmhost:~$ hdfs dfs -cat /tmp/ngramsAthruE/aTHRUe.txt
cat: `/tmp/ngramsAthruE/aTHRUe.txt': No such file or directory

training@cmhost:~$ hdfs dfs -cat /tmp/ngramsAthruE/aTHRUe.txt._COPYING_

aflually_ADV	2004	1	1
aflually_ADV	2006	2	2
aflually_ADV	2008	1	1
afluente_.	1923	2	2
afluente_.	1924	5	1
afluente_.	1926	1	1
aflcat: Filesystem closed

training@cmhost:~$ hdfs dfs -ls /tmp/ngramsAthruE
Found 1 items
-rw-r--r--   3 training supergroup 2415919104 2019-03-22 11:22 /tmp/ngramsAthruE/aTHRUe.txt._COPYING_
training@cmhost:~$ Fri Mar 22 11:23:34 PDT 2019

[1]+  Done                    ./putFile.sh

training@cmhost:~$ hdfs dfs -ls /tmp/ngramsAthruE
Found 1 items
-rw-r--r--   3 training supergroup 7498258729 2019-03-22 11:23 /tmp/ngramsAthruE/aTHRUe.txt
training@cmhost:~$ 

Can the COPYING File be Copied? YES.

While I was able to trigger an exception on one test, the results below do validate that the in-flight COPYING file can be copied based on its size at the time of the operation.

training@cmhost:~$ ./putFile.sh &
[1] 18298
Fri Mar 22 11:53:02 PDT 2019

training@cmhost:~$ hdfs dfs -ls /tmp/ngramsAthruE
Found 1 items
-rw-r--r--   3 training supergroup  402653184 2019-03-22 11:53 /tmp/ngramsAthruE/aTHRUe.txt._COPYING_

training@cmhost:~$ hdfs dfs -cp /tmp/ngramsAthruE/aTHRUe.txt._COPYING_ /tmp/ngramsAthruE/inflight-copy.txt

training@cmhost:~$ Fri Mar 22 11:54:37 PDT 2019

[1]+  Done                    ./putFile.sh

training@cmhost:~$ hdfs dfs -ls /tmp/ngramsAthruE
Found 2 items
-rw-r--r--   3 training supergroup 7498258729 2019-03-22 11:54 /tmp/ngramsAthruE/aTHRUe.txt
-rw-r--r--   3 training supergroup 1225386496 2019-03-22 11:53 /tmp/ngramsAthruE/inflight-copy.txt
training@cmhost:~$

Can the COPYING File be Moved/Renamed? YES.

Much to my surprise, this actually caused no problems at all, and the completed, full-sized file retained the name it was renamed to.

training@cmhost:~$ ./putFile.sh &
[1] 24698
Fri Mar 22 12:02:37 PDT 2019

training@cmhost:~$ hdfs dfs -ls /tmp/ngramsAthruE
Found 1 items
-rw-r--r--   3 training supergroup  536870912 2019-03-22 12:02 /tmp/ngramsAthruE/aTHRUe.txt._COPYING_

training@cmhost:~$ hdfs dfs -mv /tmp/ngramsAthruE/aTHRUe.txt._COPYING_ /tmp/ngramsAthruE/inflight-move.txt
training@cmhost:~$ hdfs dfs -ls /tmp/ngramsAthruE
Found 1 items
-rw-r--r--   3 training supergroup 2013265920 2019-03-22 12:02 /tmp/ngramsAthruE/inflight-move.txt

training@cmhost:~$ hdfs dfs -ls /tmp/ngramsAthruE
Found 1 items
-rw-r--r--   3 training supergroup 4026531840 2019-03-22 12:02 /tmp/ngramsAthruE/inflight-move.txt

Fri Mar 22 12:03:54 PDT 2019

[1]+  Done                    ./putFile.sh

training@cmhost:~$ hdfs dfs -ls /tmp/ngramsAthruE
Found 1 items
-rw-r--r--   3 training supergroup 7498258729 2019-03-22 12:03 /tmp/ngramsAthruE/inflight-move.txt
training@cmhost:~$

Can the COPYING File be Deleted? YES.

Sadly, it can. Additionally, it causes havoc for the client writing the file.

training@cmhost:~$ ./putFile.sh &
[1] 11965
Fri Mar 22 12:15:20 PDT 2019

training@cmhost:~$ hdfs dfs -ls /tmp/ngramsAthruE
Found 1 items
-rw-r--r--   3 training supergroup  536870912 2019-03-22 12:15 /tmp/ngramsAthruE/aTHRUe.txt._COPYING_

training@cmhost:~$ hdfs dfs -rm -skipTrash /tmp/ngramsAthruE/aTHRUe.txt._COPYING_
Deleted /tmp/ngramsAthruE/aTHRUe.txt._COPYING_

training@cmhost:~$ 19/03/22 12:15:35 WARN hdfs.DFSClient: DataStreamer Exception
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /tmp/ngramsAthruE/aTHRUe.txt._COPYING_ (inode 56597): File does not exist. Holder DFSClient_NONMAPREDUCE_-1649048187_1 does not have any open files.
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3820)
  ...  ...  ...  STACK TRACE LINES RM'D  ...  ...  ... 
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:790)
put: No lease on /tmp/ngramsAthruE/aTHRUe.txt._COPYING_ (inode 56597): File does not exist. Holder DFSClient_NONMAPREDUCE_-1649048187_1 does not have any open files.

Fri Mar 22 12:15:36 PDT 2019

[1]+  Done                    ./putFile.sh

training@cmhost:~$ hdfs dfs -ls /tmp/ngramsAthruE
training@cmhost:~$ 

How Do I Feel About All of This?

I guess it doesn’t matter that I don’t like it… It is what it is!! Glad I know NOW!

Good luck and happy Hadooping.

As the title suggests, this posting was something I came up with AFTER I published the first three installments of my Open Georgia Analysis way back in 2014. And yes, you might have also noticed I took a long break from blogging about Big Data technologies in 2018 and I’m hoping to change that for 2019. On the other hand, my personal blog had a LOT of fun entries due to a TON of international travel in 2018.

Dataset & Use Case

I decided to clean up the dataset used and just focus on the Simple Open Georgia Use Case, so I ran the cleanup code from use pig to calculate salary statistics for georgia educators (second of a three-part series) to prepare a transformed version of the file; you can find a download link for it at the bottom of Preparing Open Georgia Test Data. This will make the Spark code much simpler and more germane to the analysis code I want to present.

On an HDP 3.1.0 cluster I have available, I loaded this file as shown below.

[dev1@ip-172-30-10-1 ~]$ hdfs dfs -ls
Found 1 items
-rw-r--r--   3 dev1 hdfs    7133542 2019-03-09 22:33 cleanSalaryTravelReport.tsv
[dev1@ip-172-30-10-1 ~]$ hdfs dfs -tail cleanSalaryTravelReport.tsv
ZUCKER,STACEY E	PARAPROFESSIONAL/TEACHER AIDE	23387.87	0.0	LBOE	FULTON COUNTY BOARD OF EDUCATION	2012
ZURAS,LINDA D	SPECIAL ED PARAPRO/AIDE	29046.0	0.0	LBOE	FULTON COUNTY BOARD OF EDUCATION	2012
ZVONAR,JESSICA L	GIFTED	41672.9	44.37	LBOE	FULTON COUNTY BOARD OF EDUCATION	2012
ZWEIGEL,RENEE E	SCHOOL SECRETARY/CLERK	42681.23	0.0	LBOE	FULTON COUNTY BOARD OF EDUCATION	2012
[dev1@ip-172-30-10-1 ~]$ 

RDD Implementation

The cluster I am using has Spark 2.3.2 available, as shown from the pyspark shell I will be using for my Resilient Distributed Dataset (RDD) API example.

[dev1@ip-172-30-10-1 ~]$ pyspark
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.2.3.1.0.0-78
      /_/

Using Python version 2.7.5 (default, Oct 30 2018 23:45:53)
SparkSession available as 'spark'.
>>> 

NOTE: The RDD Programming Guide is a great reference.

Load and Filter

Load up the HDFS file and verify it looks good.

>>> inputRDD = sc.textFile("/user/dev1/cleanSalaryTravelReport.tsv")
>>> inputRDD.take(2)
[u'ABBOTT,DEEDEE W\tGRADES 9-12 TEACHER\t52122.1\t0.0\tLBOE\tATLANTA INDEPENDENT SCHOOL SYSTEM\t2010', u'ABBOTT,RYAN V\tGRADE 4 TEACHER\t56567.24\t0.0\tLBOE\tATLANTA INDEPENDENT SCHOOL SYSTEM\t2010']
>>> inputRDD.count()
76943

Tokenize the tab-separated string to create an array.

>>> arrayRDD = inputRDD.map(lambda val: val.split("\t"))
>>> arrayRDD.take(2)
[[u'ABBOTT,DEEDEE W', u'GRADES 9-12 TEACHER', u'52122.1', u'0.0', u'LBOE', u'ATLANTA INDEPENDENT SCHOOL SYSTEM', u'2010'], [u'ABBOTT,RYAN V', u'GRADE 4 TEACHER', u'56567.24', u'0.0', u'LBOE', u'ATLANTA INDEPENDENT SCHOOL SYSTEM', u'2010']]
>>> arrayRDD.count()
76943

Trim down to just the Local Boards of Education.

>>> justLBOE = arrayRDD.filter(lambda empRec: empRec[4] == 'LBOE')
>>> justLBOE.take(2)
[[u'ABBOTT,DEEDEE W', u'GRADES 9-12 TEACHER', u'52122.1', u'0.0', u'LBOE', u'ATLANTA INDEPENDENT SCHOOL SYSTEM', u'2010'], [u'ABBOTT,RYAN V', u'GRADE 4 TEACHER', u'56567.24', u'0.0', u'LBOE', u'ATLANTA INDEPENDENT SCHOOL SYSTEM', u'2010']]
>>> justLBOE.count()
75029

We just want the 2010 records.

>>> just2010 = justLBOE.filter(lambda empRec: empRec[6] == '2010')
>>> just2010.take(2)
[[u'ABBOTT,DEEDEE W', u'GRADES 9-12 TEACHER', u'52122.1', u'0.0', u'LBOE', u'ATLANTA INDEPENDENT SCHOOL SYSTEM', u'2010'], [u'ABBOTT,RYAN V', u'GRADE 4 TEACHER', u'56567.24', u'0.0', u'LBOE', u'ATLANTA INDEPENDENT SCHOOL SYSTEM', u'2010']]
>>> just2010.count()
44986

Put It All Together

We showed this with a bunch of variables, but normally we would just chain all of these together into one long statement. Additionally, I’m chaining the cache() method to cache these results, since we will use them a few times below.

>>> filteredRecs = sc.textFile("/user/dev1/cleanSalaryTravelReport.tsv").map(lambda val: val.split("\t")).filter(lambda empRec: empRec[4] == 'LBOE').filter(lambda empRec: empRec[6] == '2010').cache()
>>> filteredRecs.count()
44986

Number of Employees by Title

To figure out how many people are in each job title we need to treat this a bit like the canonical Word Count problem. We create KVPs of the titles as the keys and hard-code a 1 as the value.

>>> titleKVPs = filteredRecs.map(lambda eR: (eR[1],1))
>>> titleKVPs.take(2)
[(u'GRADES 9-12 TEACHER', 1), (u'GRADE 4 TEACHER', 1)]

Then we can reduce these by key to produce the totals for each of the 181 distinct job titles.

>>> titleTotals = titleKVPs.reduceByKey(lambda a,b: a+b)
>>> titleTotals.take(2)
[(u'SPEECH-LANGUAGE PATHOLOGIST', 1268), (u'RVI TEACHER', 58)]
>>> titleTotals.count()
181

Sort and show all results. As with the other analysis efforts, we see there are 9 Audiologists.

>>> titleTotals.sortByKey().collect()
[(u'ADAPTED PHYS ED TEACHER', 35), (u'ADULT EDUCATION DIRECTOR/COORD', 19), (u'ADULT EDUCATION TEACHER', 63), (u'AFTER-SCHOOL PROGRAM WORKER', 2), (u'ALTERNATIVE SCHOOL DIRECTOR', 2), (u'ASSISTANT PRINCIPAL', 436), (u'ATHLETICS DIRECTOR', 1), (u'ATTENDANCE WORKER', 12), (u'AUDIOLOGIST', 9), (u'AUDITOR', 11), (u'BOOKKEEPER', 118), (u'BUS DRIVER', 1321), (u'BUSINESS SERV SECRETARY/CLERK', 224), (u'CENTRAL SUPPORT CLERK', 4), (u'CONSTRUCTION MANAGER', 8), (u'CROSSING GUARD', 100), (u'CROSSROADS', 17), (u'CROSSROADS ALT SCHOOL TEACHER', 16), (u'CUSTODIAL PERSONNEL', 1393), (u'DATA CLERK', 98), (u'DEPUTY/ASSOC/ASSISTANT SUPT', 36), (u'DIAGNOSTICIAN', 23), (u'DIRECTOR OF CHILD SERVE', 2), (u'DIRECTOR OF CURRICULUM/INSTR', 31), (u'DIRECTOR OF MEDIA SERVICES', 2), (u'DIRECTOR OF PSYCHO-ED PROG', 1), (u'DIRECTOR OF SCHOOL SAFETY', 3), (u'DIRECTOR OF STUDENT SERVICES', 3), (u'EARLY INTERVENTION PRIMARY TEACHER', 605), (u'EARLY INTERVENTION TEACHER ', 147), (u'EIP 4TH AND 5TH GRADE TEACHER', 185), (u'ELEMENTARY COUNSELOR', 265), (u'ENTERPRISE ASP WORKER', 588), (u'ENTERPRISE TECH COORDINATOR', 2), (u'ENTERPRISE TECHNICIAN', 4), (u'ESOL TEACHER', 450), (u'EXTENDED YEAR TEACHER', 59), (u'EXTRA-CURRICULAR ACTIVITIES', 9), (u'FAMILY SERVICES COORDINATOR', 26), (u'FEDERAL PROGRAMS DIRECTOR', 3), (u'FINANCE/BUSINESS PERSONNEL', 101), (u'FINANCE/BUSINESS SERVICE MGR', 50), (u'FOOD SERVICE ADMINISTRATOR', 19), (u'FOOD SERVICE SEC/CLERK/BKKPR', 3), (u'GENERAL ADMIN SECRETARY/CLERK', 175), (u'GIFTED', 295), (u'GIFTED ELEMENTARY TEACHER P-5', 247), (u'GIFTED HIGH ', 102), (u'GRADE 1 TEACHER', 1041), (u'GRADE 10 TEACHER', 59), (u'GRADE 11 TEACHER', 27), (u'GRADE 12 TEACHER', 13), (u'GRADE 2 TEACHER', 1010), (u'GRADE 3 TEACHER', 1036), (u'GRADE 4 TEACHER', 873), (u'GRADE 5 TEACHER', 872), (u'GRADE 6 TEACHER', 239), (u'GRADE 7 TEACHER', 257), (u'GRADE 8 TEACHER', 240), (u'GRADE 9 TEACHER', 35), (u'GRADES 6-8 TEACHER', 1607), (u'GRADES 9-12 TEACHER', 3171), (u'GRADES K-5 TEACHER', 165), (u'GRADUATION SPECIALIST', 63), (u'HEAD START WORKER', 2), (u'HEARING OFFICER', 2), (u'HIGH SCHOOL COUNSELOR', 225), (u'HOSPITAL/HOMEBOUND INSTRUCTOR', 11), (u'HUMAN RESOURCES PERSONNEL', 107), (u'IN-SCHOOL SUSP TEACHER', 69), (u'INFORMATION SERV PERSONNEL', 36), (u'INFORMATION SERVICES CLERK', 3), (u'INSTRUCTIONAL SPECIALIST P-8', 1164), (u'INSTRUCTIONAL SUPERVISOR', 375), (u'INTERPRETER', 26), (u'IS PERSONNEL - FINANCE AND BUSINESS', 23), (u'IS PERSONNEL - FOOD SERVICE', 14), (u'IS PERSONNEL - GENERAL ADMIN', 86), (u'IS PERSONNEL - INSTRUCTION SERV    ', 32), (u'IS PERSONNEL - MAINTENANCE', 3), (u'IS PERSONNEL - OTHER SUPPORT', 16), (u'IS PERSONNEL - SCHOOL ADMIN', 16), (u'IS PERSONNEL - SUPPORT SERV', 75), (u'KINDERGARTEN TEACHER', 1054), (u'LEGAL PERSONNEL', 14), (u'LIBRARIAN/MEDIA SPECIALIST', 342), (u'LIBRARY/MEDIA SECRETARY/CLERK', 1), (u'LIBRARY/MEDIA SUPPORT PARAPRO', 199), (u'LITERACY COACH', 90), (u'LOTTERY PRE-SCHOOL TEACHER', 83), (u'LUNCHROOM MONITOR', 50), (u'MAINTENANCE PERSONNEL', 442), (u'MEMBER, BOARD OF EDUCATION', 26), (u'MIDDLE SCHOOL CAREER, TECHNICAL AND AGRICULTURAL TEACHER', 41), (u'MIDDLE SCHOOL COUNSELOR', 131), (u'MIDDLE SCHOOL EXPLOR TEACHER', 237), (u'MIGRANT EDUCATION RECRUITER', 3), (u'MILITARY SCIENCE TEACHER', 98), (u'MISCELLANEOUS ACTIVITIES', 3013), (u'NURSING ASSISTANT / HEALTH TECH', 104), (u'OCCUPATIONAL THERAPIST ', 84), (u'ORIENT/MOBILITY SPECIALIST', 1), (u'OTHER INSTRUCTIONAL PROVIDER', 82), (u'OTHER TRANSPORTATION', 209), (u'PARAPRO PERSONNEL - PRE-K', 165), (u'PARAPROFESSIONAL/TEACHER 
AIDE', 1947), (u'PARENT COORDINATOR', 146), (u'PERSONNEL/HUMAN RESOURCES DIR', 17), (u'PHYSICAL THERAPIST ', 24), (u'PLANNING/EVALUATION PERSONNEL', 23), (u'PLANT OPERATIONS DIRECTOR/MGR', 40), (u'PLANT OPERATIONS SEC/CLERK', 2), (u'PRE-K DIRECTOR', 2), (u'PRESCHOOL SPECIAL ED TEACHER', 178), (u'PRINCIPAL', 318), (u'PSYCH-ED PARAPRO/TEACHER AIDE', 51), (u'PSYCHO-ED SCHOOL PSYCHOLOGIST', 2), (u'PSYCHO-ED SOCIAL WORKER', 7), (u'PSYCHO-ED SPEC ED SPECIALIST', 7), (u'PSYCHO-EDUCATIONAL TEACHER', 77), (u'PUBLIC RELATIONS PERSONNEL', 15), (u'REHABILITATION COUNSELOR', 6), (u'RESEARCH PERSONNEL', 24), (u'RVI TEACHER', 58), (u'SCHOOL FOOD SERVICE MANAGER', 143), (u'SCHOOL FOOD SERVICE WORKER', 1849), (u'SCHOOL IMPROVEMENT SPECIALIST', 179), (u'SCHOOL NURSE', 171), (u'SCHOOL NUTRITION MAINTENANCE', 2), (u'SCHOOL PSYCHOLOGIST', 130), (u'SCHOOL PSYCHOMETRIST', 1), (u'SCHOOL SECRETARY/CLERK', 696), (u'SCHOOL SOCIAL WORKER', 124), (u'SECRETARY', 318), (u'SECURITY PERSONNEL/SECURITY OFFICER', 350), (u'SOCIAL SERVICES CASE MANAGER', 2), (u'SOCIAL WORKER ASSISTANT', 13), (u'SPECIAL ED PARAPRO/AIDE', 1674), (u'SPECIAL EDUCATION BUS AIDE', 61), (u'SPECIAL EDUCATION BUS DRIVER', 84), (u'SPECIAL EDUCATION COUNSELOR', 1), (u'SPECIAL EDUCATION DIRECTOR', 6), (u'SPECIAL EDUCATION INTERRELATED', 2261), (u'SPECIAL EDUCATION NURSE', 34), (u'SPECIAL EDUCATION PARAPROFESSIONAL - AGES 3 TO 5', 1), (u'SPECIAL EDUCATION SECRETARY/CLERK', 7), (u'SPECIAL EDUCATION SOCIAL WORKER', 13), (u'SPECIAL EDUCATION SPECIALIST', 95), (u'SPEECH-LANGUAGE PATHOLOGIST', 1268), (u'STAFF DEVELOPMENT SPECIALIST', 17), (u'STUDENT CLERK/AIDE', 20), (u'SUBSTITUTE TEACHER', 3816), (u'SUPERINTENDENT', 3), (u'SUPERINTENDENT SECRETARY', 4), (u'SUPPORT SERV SECRETARY/CLERK', 4), (u'TEACHER FOR DEAF/BLIND STUDENTS', 3), (u'TEACHER OF AUTISTIC STUDENTS', 23), (u'TEACHER OF EMOTIONAL/BEHAVIORAL', 152), (u'TEACHER OF HEARING IMPAIRED STUDENT', 63), (u'TEACHER OF MILD INTELLECTUAL', 146), (u'TEACHER OF MODERATE INTELLECTUAL', 120), (u'TEACHER OF ORTHOPEDIC IMPAIRED', 47), (u'TEACHER OF OTHER HEALTH IMPAIRED', 10), (u'TEACHER OF PROFOUND INTELLECTUAL', 5), (u'TEACHER OF SEVERE INTELLECTUAL', 66), (u'TEACHER OF SPECIFIC LEARNING', 103), (u'TEACHER OF TRAUMATIC BRAIN INJURY', 1), (u'TEACHER OF VISUALLY IMPAIRED', 23), (u'TEACHER SUPPORT SPECIALIST', 204), (u'TECHNICAL INSTITUTE PRESIDENT', 1), (u'TECHNOLOGY DIRECTOR', 18), (u'TECHNOLOGY SPECIALIST', 298), (u'TITLE I DIRECTOR', 4), (u'TRANSPORTATION DIRECTOR/MGR', 19), (u'TRANSPORTATION MECHANIC', 124), (u'TRANSPORTATION SEC/CLERK', 3), (u'VOCATIONAL ', 407), (u'VOCATIONAL DIRECTOR ', 3), (u'VOCATIONAL SUPERVISOR - SCHOOL', 18), (u'WAREHOUSEMAN', 45), (u'YOUTH APPRENTICESHIP DIRECTOR', 2)]

Put It All Together

This can more simply be expressed by chaining these all together.

>>> filteredRecs.map(lambda eR: (eR[1],1)).reduceByKey(lambda a,b: a+b).sortByKey().collect()
[(u'ADAPTED PHYS ED TEACHER', 35), (u'ADULT EDUCATION DIRECTOR/COORD', 19), (u'ADULT EDUCATION TEACHER', 63), (u'AFTER-SCHOOL PROGRAM WORKER', 2), (u'ALTERNATIVE SCHOOL DIRECTOR', 2), (u'ASSISTANT PRINCIPAL', 436), (u'ATHLETICS DIRECTOR', 1), (u'ATTENDANCE WORKER', 12), (u'AUDIOLOGIST', 9), (u'AUDITOR', 11), (u'BOOKKEEPER', 118), (u'BUS DRIVER', 1321), (u'BUSINESS SERV SECRETARY/CLERK', 224), (u'CENTRAL SUPPORT CLERK', 4), (u'CONSTRUCTION MANAGER', 8), (u'CROSSING GUARD', 100), (u'CROSSROADS', 17), (u'CROSSROADS ALT SCHOOL TEACHER', 16), (u'CUSTODIAL PERSONNEL', 1393), (u'DATA CLERK', 98), (u'DEPUTY/ASSOC/ASSISTANT SUPT', 36), (u'DIAGNOSTICIAN', 23), (u'DIRECTOR OF CHILD SERVE', 2), (u'DIRECTOR OF CURRICULUM/INSTR', 31), (u'DIRECTOR OF MEDIA SERVICES', 2), (u'DIRECTOR OF PSYCHO-ED PROG', 1), (u'DIRECTOR OF SCHOOL SAFETY', 3), (u'DIRECTOR OF STUDENT SERVICES', 3), (u'EARLY INTERVENTION PRIMARY TEACHER', 605), (u'EARLY INTERVENTION TEACHER ', 147), (u'EIP 4TH AND 5TH GRADE TEACHER', 185), (u'ELEMENTARY COUNSELOR', 265), (u'ENTERPRISE ASP WORKER', 588), (u'ENTERPRISE TECH COORDINATOR', 2), (u'ENTERPRISE TECHNICIAN', 4), (u'ESOL TEACHER', 450), (u'EXTENDED YEAR TEACHER', 59), (u'EXTRA-CURRICULAR ACTIVITIES', 9), (u'FAMILY SERVICES COORDINATOR', 26), (u'FEDERAL PROGRAMS DIRECTOR', 3), (u'FINANCE/BUSINESS PERSONNEL', 101), (u'FINANCE/BUSINESS SERVICE MGR', 50), (u'FOOD SERVICE ADMINISTRATOR', 19), (u'FOOD SERVICE SEC/CLERK/BKKPR', 3), (u'GENERAL ADMIN SECRETARY/CLERK', 175), (u'GIFTED', 295), (u'GIFTED ELEMENTARY TEACHER P-5', 247), (u'GIFTED HIGH ', 102), (u'GRADE 1 TEACHER', 1041), (u'GRADE 10 TEACHER', 59), (u'GRADE 11 TEACHER', 27), (u'GRADE 12 TEACHER', 13), (u'GRADE 2 TEACHER', 1010), (u'GRADE 3 TEACHER', 1036), (u'GRADE 4 TEACHER', 873), (u'GRADE 5 TEACHER', 872), (u'GRADE 6 TEACHER', 239), (u'GRADE 7 TEACHER', 257), (u'GRADE 8 TEACHER', 240), (u'GRADE 9 TEACHER', 35), (u'GRADES 6-8 TEACHER', 1607), (u'GRADES 9-12 TEACHER', 3171), (u'GRADES K-5 TEACHER', 165), (u'GRADUATION SPECIALIST', 63), (u'HEAD START WORKER', 2), (u'HEARING OFFICER', 2), (u'HIGH SCHOOL COUNSELOR', 225), (u'HOSPITAL/HOMEBOUND INSTRUCTOR', 11), (u'HUMAN RESOURCES PERSONNEL', 107), (u'IN-SCHOOL SUSP TEACHER', 69), (u'INFORMATION SERV PERSONNEL', 36), (u'INFORMATION SERVICES CLERK', 3), (u'INSTRUCTIONAL SPECIALIST P-8', 1164), (u'INSTRUCTIONAL SUPERVISOR', 375), (u'INTERPRETER', 26), (u'IS PERSONNEL - FINANCE AND BUSINESS', 23), (u'IS PERSONNEL - FOOD SERVICE', 14), (u'IS PERSONNEL - GENERAL ADMIN', 86), (u'IS PERSONNEL - INSTRUCTION SERV    ', 32), (u'IS PERSONNEL - MAINTENANCE', 3), (u'IS PERSONNEL - OTHER SUPPORT', 16), (u'IS PERSONNEL - SCHOOL ADMIN', 16), (u'IS PERSONNEL - SUPPORT SERV', 75), (u'KINDERGARTEN TEACHER', 1054), (u'LEGAL PERSONNEL', 14), (u'LIBRARIAN/MEDIA SPECIALIST', 342), (u'LIBRARY/MEDIA SECRETARY/CLERK', 1), (u'LIBRARY/MEDIA SUPPORT PARAPRO', 199), (u'LITERACY COACH', 90), (u'LOTTERY PRE-SCHOOL TEACHER', 83), (u'LUNCHROOM MONITOR', 50), (u'MAINTENANCE PERSONNEL', 442), (u'MEMBER, BOARD OF EDUCATION', 26), (u'MIDDLE SCHOOL CAREER, TECHNICAL AND AGRICULTURAL TEACHER', 41), (u'MIDDLE SCHOOL COUNSELOR', 131), (u'MIDDLE SCHOOL EXPLOR TEACHER', 237), (u'MIGRANT EDUCATION RECRUITER', 3), (u'MILITARY SCIENCE TEACHER', 98), (u'MISCELLANEOUS ACTIVITIES', 3013), (u'NURSING ASSISTANT / HEALTH TECH', 104), (u'OCCUPATIONAL THERAPIST ', 84), (u'ORIENT/MOBILITY SPECIALIST', 1), (u'OTHER INSTRUCTIONAL PROVIDER', 82), (u'OTHER TRANSPORTATION', 209), (u'PARAPRO PERSONNEL - PRE-K', 165), (u'PARAPROFESSIONAL/TEACHER 
AIDE', 1947), (u'PARENT COORDINATOR', 146), (u'PERSONNEL/HUMAN RESOURCES DIR', 17), (u'PHYSICAL THERAPIST ', 24), (u'PLANNING/EVALUATION PERSONNEL', 23), (u'PLANT OPERATIONS DIRECTOR/MGR', 40), (u'PLANT OPERATIONS SEC/CLERK', 2), (u'PRE-K DIRECTOR', 2), (u'PRESCHOOL SPECIAL ED TEACHER', 178), (u'PRINCIPAL', 318), (u'PSYCH-ED PARAPRO/TEACHER AIDE', 51), (u'PSYCHO-ED SCHOOL PSYCHOLOGIST', 2), (u'PSYCHO-ED SOCIAL WORKER', 7), (u'PSYCHO-ED SPEC ED SPECIALIST', 7), (u'PSYCHO-EDUCATIONAL TEACHER', 77), (u'PUBLIC RELATIONS PERSONNEL', 15), (u'REHABILITATION COUNSELOR', 6), (u'RESEARCH PERSONNEL', 24), (u'RVI TEACHER', 58), (u'SCHOOL FOOD SERVICE MANAGER', 143), (u'SCHOOL FOOD SERVICE WORKER', 1849), (u'SCHOOL IMPROVEMENT SPECIALIST', 179), (u'SCHOOL NURSE', 171), (u'SCHOOL NUTRITION MAINTENANCE', 2), (u'SCHOOL PSYCHOLOGIST', 130), (u'SCHOOL PSYCHOMETRIST', 1), (u'SCHOOL SECRETARY/CLERK', 696), (u'SCHOOL SOCIAL WORKER', 124), (u'SECRETARY', 318), (u'SECURITY PERSONNEL/SECURITY OFFICER', 350), (u'SOCIAL SERVICES CASE MANAGER', 2), (u'SOCIAL WORKER ASSISTANT', 13), (u'SPECIAL ED PARAPRO/AIDE', 1674), (u'SPECIAL EDUCATION BUS AIDE', 61), (u'SPECIAL EDUCATION BUS DRIVER', 84), (u'SPECIAL EDUCATION COUNSELOR', 1), (u'SPECIAL EDUCATION DIRECTOR', 6), (u'SPECIAL EDUCATION INTERRELATED', 2261), (u'SPECIAL EDUCATION NURSE', 34), (u'SPECIAL EDUCATION PARAPROFESSIONAL - AGES 3 TO 5', 1), (u'SPECIAL EDUCATION SECRETARY/CLERK', 7), (u'SPECIAL EDUCATION SOCIAL WORKER', 13), (u'SPECIAL EDUCATION SPECIALIST', 95), (u'SPEECH-LANGUAGE PATHOLOGIST', 1268), (u'STAFF DEVELOPMENT SPECIALIST', 17), (u'STUDENT CLERK/AIDE', 20), (u'SUBSTITUTE TEACHER', 3816), (u'SUPERINTENDENT', 3), (u'SUPERINTENDENT SECRETARY', 4), (u'SUPPORT SERV SECRETARY/CLERK', 4), (u'TEACHER FOR DEAF/BLIND STUDENTS', 3), (u'TEACHER OF AUTISTIC STUDENTS', 23), (u'TEACHER OF EMOTIONAL/BEHAVIORAL', 152), (u'TEACHER OF HEARING IMPAIRED STUDENT', 63), (u'TEACHER OF MILD INTELLECTUAL', 146), (u'TEACHER OF MODERATE INTELLECTUAL', 120), (u'TEACHER OF ORTHOPEDIC IMPAIRED', 47), (u'TEACHER OF OTHER HEALTH IMPAIRED', 10), (u'TEACHER OF PROFOUND INTELLECTUAL', 5), (u'TEACHER OF SEVERE INTELLECTUAL', 66), (u'TEACHER OF SPECIFIC LEARNING', 103), (u'TEACHER OF TRAUMATIC BRAIN INJURY', 1), (u'TEACHER OF VISUALLY IMPAIRED', 23), (u'TEACHER SUPPORT SPECIALIST', 204), (u'TECHNICAL INSTITUTE PRESIDENT', 1), (u'TECHNOLOGY DIRECTOR', 18), (u'TECHNOLOGY SPECIALIST', 298), (u'TITLE I DIRECTOR', 4), (u'TRANSPORTATION DIRECTOR/MGR', 19), (u'TRANSPORTATION MECHANIC', 124), (u'TRANSPORTATION SEC/CLERK', 3), (u'VOCATIONAL ', 407), (u'VOCATIONAL DIRECTOR ', 3), (u'VOCATIONAL SUPERVISOR - SCHOOL', 18), (u'WAREHOUSEMAN', 45), (u'YOUTH APPRENTICESHIP DIRECTOR', 2)]

Minimum Salary by Title

Create a PairRDD of title and salary.

>>> salaryKVPs = filteredRecs.map(lambda eR: (eR[1], float(eR[2])))
>>> salaryKVPs.take(2)
[(u'GRADES 9-12 TEACHER', 52122.1), (u'GRADE 4 TEACHER', 56567.24)]

Incur the shuffle to reduce them by key and figure out the smallest number.

>>> minSalsHandCranked = salaryKVPs.reduceByKey(lambda sal,otherSal: sal if (sal<otherSal) else otherSal)
>>> minSalsHandCranked.take(2)
[(u'SPEECH-LANGUAGE PATHOLOGIST', 2917.23), (u'RVI TEACHER', 27918.14)]

Fortunately, Python has us covered already with the built-in min() function, which we can leverage instead of that silly anonymous function.

>>> minSals = salaryKVPs.reduceByKey(min)
>>> minSals.take(2)
[(u'SPEECH-LANGUAGE PATHOLOGIST', 2917.23), (u'RVI TEACHER', 27918.14)]

Sort and show all results. As with the other analysis efforts, we see the minimum salary for Audiologists is $36,329.59.

>>> minSals.sortByKey().take(10)
[(u'ADAPTED PHYS ED TEACHER', 19384.24), (u'ADULT EDUCATION DIRECTOR/COORD', 182.4), (u'ADULT EDUCATION TEACHER', 775.2), (u'AFTER-SCHOOL PROGRAM WORKER', 624.04), (u'ALTERNATIVE SCHOOL DIRECTOR', 111199.8), (u'ASSISTANT PRINCIPAL', 3418.53), (u'ATHLETICS DIRECTOR', 122789.04), (u'ATTENDANCE WORKER', 7553.42), (u'AUDIOLOGIST', 36329.59), (u'AUDITOR', 5380.63)]

Put It All Together

>>> filteredRecs.map(lambda eR: (eR[1], float(eR[2]))).reduceByKey(min).sortByKey().take(10)
[(u'ADAPTED PHYS ED TEACHER', 19384.24), (u'ADULT EDUCATION DIRECTOR/COORD', 182.4), (u'ADULT EDUCATION TEACHER', 775.2), (u'AFTER-SCHOOL PROGRAM WORKER', 624.04), (u'ALTERNATIVE SCHOOL DIRECTOR', 111199.8), (u'ASSISTANT PRINCIPAL', 3418.53), (u'ATHLETICS DIRECTOR', 122789.04), (u'ATTENDANCE WORKER', 7553.42), (u'AUDIOLOGIST', 36329.59), (u'AUDITOR', 5380.63)]

Maximum Salary by Title

We already did most of this above; here it is “all together”, switching min for max. We can see that, like before, the max salary for Audiologists is $102,240.46.

>>> filteredRecs.map(lambda eR: (eR[1], float(eR[2]))).reduceByKey(max).sortByKey().take(10)
[(u'ADAPTED PHYS ED TEACHER', 96320.19), (u'ADULT EDUCATION DIRECTOR/COORD', 179041.16), (u'ADULT EDUCATION TEACHER', 60668.84), (u'AFTER-SCHOOL PROGRAM WORKER', 78493.98), (u'ALTERNATIVE SCHOOL DIRECTOR', 127149.12), (u'ASSISTANT PRINCIPAL', 119646.31), (u'ATHLETICS DIRECTOR', 122789.04), (u'ATTENDANCE WORKER', 23392.9), (u'AUDIOLOGIST', 102240.46), (u'AUDITOR', 83811.88)]

Average Salary by Title

This one is a bit more “interesting”… First, we need to roll up the total salary values for each title along with the count for each.

>>> totalSalaryAndCountByTitle = salaryKVPs.aggregateByKey((0,0), lambda a,b: (a[0] + b, a[1] + 1), lambda a,b: (a[0] + b[0], a[1] + b[1]))
>>> totalSalaryAndCountByTitle.take(2)
[(u'SPEECH-LANGUAGE PATHOLOGIST', (40461424.22000003, 1268)), (u'RVI TEACHER', (3884091.3400000003, 58))]

Yes, that does look “interesting”… It took me a bit to figure it all out, as the API docs for this function were a bit tough for me to grasp. Thankfully, I found https://stackoverflow.com/questions/29930110/calculating-the-averages-for-each-key-in-a-pairwise-k-v-rdd-in-spark-with-pyth, which explains what the a’s and b’s above all mean. I’ll let you read it from that Q&A as I did, and we can discuss it in the comments section of this post if there are still questions.
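For those who would rather not click through, here is a hypothetical restating of the exact same call with the zero value and the two functions pulled out and named (the names are mine, not part of the API):

>>> zeroValue = (0, 0)                   # per-key starting accumulator: (salary total, count)
>>> def seqOp(acc, salary):              # within a partition, folds one salary into the accumulator
...     return (acc[0] + salary, acc[1] + 1)
...
>>> def combOp(acc1, acc2):              # across partitions, merges two accumulators for the same key
...     return (acc1[0] + acc2[0], acc1[1] + acc2[1])
...
>>> salaryKVPs.aggregateByKey(zeroValue, seqOp, combOp)  # builds the same RDD as totalSalaryAndCountByTitle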

Then, we need to calculate the average from the total salary values and the count of salaries. This is also described in that last link, but is really just total salaries divided by total number of educators with that title.

>>> avgSals = totalSalaryAndCountByTitle.mapValues(lambda v: v[0]/v[1])
>>> avgSals.take(10)
[(u'SPEECH-LANGUAGE PATHOLOGIST', 31909.640552050496), (u'RVI TEACHER', 66967.09206896552), (u'SPECIAL EDUCATION BUS AIDE', 13871.381803278691), (u'TECHNOLOGY SPECIALIST', 54501.812348993306), (u'SCHOOL IMPROVEMENT SPECIALIST', 79359.73837988825), (u'SPECIAL EDUCATION SECRETARY/CLERK', 32192.98714285714), (u'TEACHER OF EMOTIONAL/BEHAVIORAL', 53380.03302631576), (u'OTHER INSTRUCTIONAL PROVIDER', 18972.18256097561), (u'BUS DRIVER', 21016.957100681313), (u'HOSPITAL/HOMEBOUND INSTRUCTOR', 41357.86272727273)]

The answer at https://stackoverflow.com/questions/46171294/how-can-i-count-the-average-from-spark-rdd shows how this all could look with Scala.

Sort it and we can see an average salary of $73,038.71 for Audiologists, just like the other frameworks also calculated.

>>> avgSals.sortByKey().take(10)
[(u'ADAPTED PHYS ED TEACHER', 56632.125714285714), (u'ADULT EDUCATION DIRECTOR/COORD', 39572.89157894737), (u'ADULT EDUCATION TEACHER', 19230.814603174604), (u'AFTER-SCHOOL PROGRAM WORKER', 39559.009999999995), (u'ALTERNATIVE SCHOOL DIRECTOR', 119174.45999999999), (u'ASSISTANT PRINCIPAL', 76514.0633944955), (u'ATHLETICS DIRECTOR', 122789.04), (u'ATTENDANCE WORKER', 12826.486666666666), (u'AUDIOLOGIST', 73038.71333333333), (u'AUDITOR', 66145.27090909092)]

Put It All Together

Like before, feel free to pull it all together into one line.

>>> salaryKVPs.aggregateByKey((0,0), lambda a,b: (a[0] + b, a[1] + 1), lambda a,b: (a[0] + b[0], a[1] + b[1])).mapValues(lambda v: v[0]/v[1]).sortByKey().take(10)
[(u'ADAPTED PHYS ED TEACHER', 56632.125714285714), (u'ADULT EDUCATION DIRECTOR/COORD', 39572.89157894737), (u'ADULT EDUCATION TEACHER', 19230.814603174604), (u'AFTER-SCHOOL PROGRAM WORKER', 39559.009999999995), (u'ALTERNATIVE SCHOOL DIRECTOR', 119174.45999999999), (u'ASSISTANT PRINCIPAL', 76514.0633944955), (u'ATHLETICS DIRECTOR', 122789.04), (u'ATTENDANCE WORKER', 12826.486666666666), (u'AUDIOLOGIST', 73038.71333333333), (u'AUDITOR', 66145.27090909092)]

Spark SQL Implementation

When I was first learning Spark, I remember the RDD API being described as an “elegant” API. I’m not sure if that’s what I called it back then (or now). (wink) The good news is that for structured datasets like the Format & Sample Data for Open Georgia, we have Spark SQL available for our use. More great news: as with RDDs, the Spark website has a good Programming Guide for Spark SQL that you can use as a place to start.

Getting a DataFrame

There are multiple ways to create a DataFrame, but I’ll use a common practice with pyspark, which infers the schema using reflection to promote an RDD to a DataFrame. To get started, let’s reread the original file and convert the TSV records into arrays.

>>> allColsAsRDD = sc.textFile("/user/dev1/cleanSalaryTravelReport.tsv").map(lambda val: val.split("\t"))
>>> allColsAsRDD.take(2)
[[u'ABBOTT,DEEDEE W', u'GRADES 9-12 TEACHER', u'52122.1', u'0.0', u'LBOE', u'ATLANTA INDEPENDENT SCHOOL SYSTEM', u'2010'], [u'ABBOTT,RYAN V', u'GRADE 4 TEACHER', u'56567.24', u'0.0', u'LBOE', u'ATLANTA INDEPENDENT SCHOOL SYSTEM', u'2010']]

From here we can create an RDD full of Row objects and then have the Spark SQL API convert it into a DataFrame.

>>> from pyspark.sql import Row
>>> teachersRDD = allColsAsRDD.map(lambda t: Row(name=t[0], title=t[1], salary=float(t[2]), travel=float(t[3]), orgType=t[4], org=t[5], year=int(t[6])))
>>> teachers = spark.createDataFrame(teachersRDD)
>>> teachers.show(10)
+--------------------+--------------------+-------+--------+--------------------+------+----+
|                name|                 org|orgType|  salary|               title|travel|year|
+--------------------+--------------------+-------+--------+--------------------+------+----+
|     ABBOTT,DEEDEE W|ATLANTA INDEPENDE...|   LBOE| 52122.1| GRADES 9-12 TEACHER|   0.0|2010|
|       ABBOTT,RYAN V|ATLANTA INDEPENDE...|   LBOE|56567.24|     GRADE 4 TEACHER|   0.0|2010|
| ABBOUD,CLAUDIA MORA|ATLANTA INDEPENDE...|   LBOE| 63957.5|  GRADES K-5 TEACHER|   0.0|2010|
|ABDUL-JABBAR,KHAD...|ATLANTA INDEPENDE...|   LBOE|16791.73| GRADES 9-12 TEACHER|   0.0|2010|
|ABDUL-RAZACQ,SALA...|ATLANTA INDEPENDE...|   LBOE|45832.92|INSTRUCTIONAL SPE...|   0.0|2010|
|     ABDULLAH,DIANA |ATLANTA INDEPENDE...|   LBOE|10934.94|SPECIAL ED PARAPR...|   0.0|2010|
|  ABDULLAH,NADIYAH W|ATLANTA INDEPENDE...|   LBOE|75109.92|  GRADES 6-8 TEACHER|   0.0|2010|
|ABDULLAH,RHONDALYN Y|ATLANTA INDEPENDE...|   LBOE|28649.34|SPECIAL ED PARAPR...|   0.0|2010|
| ABDULLAH,VALANCIA F|ATLANTA INDEPENDE...|   LBOE|28735.02|SPECIAL ED PARAPR...|   0.0|2010|
|ABDULLAH-MUSA,KWELI |ATLANTA INDEPENDE...|   LBOE|83040.78| SCHOOL PSYCHOLOGIST| 505.2|2010|
+--------------------+--------------------+-------+--------+--------------------+------+----+

Now that looks a bit better than the output of the RDD’s take() method!!
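As an aside on the “multiple ways” mentioned earlier, the same DataFrame could also be built without the intermediate RDD by handing the CSV reader an explicit schema. Here is a quick sketch, assuming the same file and column order (teachers2 is just an illustrative name):

>>> from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
>>> schema = StructType([StructField("name", StringType()), StructField("title", StringType()),
...                      StructField("salary", DoubleType()), StructField("travel", DoubleType()),
...                      StructField("orgType", StringType()), StructField("org", StringType()),
...                      StructField("year", IntegerType())])
>>> teachers2 = spark.read.csv("/user/dev1/cleanSalaryTravelReport.tsv", sep="\t", schema=schema)

One difference worth noting: the reflection approach sorts the Row fields alphabetically (which is why the columns above show up as name, org, orgType, and so on), while an explicit schema keeps the declared column order.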

Calculate Statistics via API

Trim down to just educator records for 2010.

>>> filtered = teachers.filter(teachers['orgType'] == 'LBOE').filter(teachers['year'] == 2010)
>>> filtered.show(2)
+---------------+--------------------+-------+--------+-------------------+------+----+
|           name|                 org|orgType|  salary|              title|travel|year|
+---------------+--------------------+-------+--------+-------------------+------+----+
|ABBOTT,DEEDEE W|ATLANTA INDEPENDE...|   LBOE| 52122.1|GRADES 9-12 TEACHER|   0.0|2010|
|  ABBOTT,RYAN V|ATLANTA INDEPENDE...|   LBOE|56567.24|    GRADE 4 TEACHER|   0.0|2010|
+---------------+--------------------+-------+--------+-------------------+------+----+

Now we can just do the grouping and apply some aggregate functions.

>>> from pyspark.sql.functions import min, max, avg, count, col
>>> expr = [count(col("title")),min(col("salary")),max(col("salary")),avg(col("salary"))]
>>> filtered.groupBy("title").agg(*expr).sort("title").show(10)
+--------------------+------------+-----------+-----------+------------------+  
|               title|count(title)|min(salary)|max(salary)|       avg(salary)|
+--------------------+------------+-----------+-----------+------------------+
|ADAPTED PHYS ED T...|          35|   19384.24|   96320.19|56632.125714285714|
|ADULT EDUCATION D...|          19|      182.4|  179041.16| 39572.89157894737|
|ADULT EDUCATION T...|          63|      775.2|   60668.84|19230.814603174604|
|AFTER-SCHOOL PROG...|           2|     624.04|   78493.98|39559.009999999995|
|ALTERNATIVE SCHOO...|           2|   111199.8|  127149.12|119174.45999999999|
| ASSISTANT PRINCIPAL|         436|    3418.53|  119646.31|  76514.0633944955|
|  ATHLETICS DIRECTOR|           1|  122789.04|  122789.04|         122789.04|
|   ATTENDANCE WORKER|          12|    7553.42|    23392.9|12826.486666666666|
|         AUDIOLOGIST|           9|   36329.59|  102240.46| 73038.71333333333|
|             AUDITOR|          11|    5380.63|   83811.88| 66145.27090909092|
+--------------------+------------+-----------+-----------+------------------+

Like before, we can “put it all together” on one line with method chaining, so I won’t bore you with that.

Calculate Statistics via SQL

While the API looks fun for us programmers, we probably want to leave code behind that MANY people can read. You know… so we can get promoted to a better job and not be held back by being the only person who can understand our code! (wink)

So, instead… let’s just write some SQL.

>>> teachers.createOrReplaceTempView("teachers")
>>> spark.sql("SELECT title, count(title), min(salary), max(salary), avg(salary) FROM teachers WHERE orgType = 'LBOE' AND year = 2010 GROUP BY title ORDER BY title").show(10)
+--------------------+------------+-----------+-----------+------------------+  
|               title|count(title)|min(salary)|max(salary)|       avg(salary)|
+--------------------+------------+-----------+-----------+------------------+
|ADAPTED PHYS ED T...|          35|   19384.24|   96320.19|56632.125714285714|
|ADULT EDUCATION D...|          19|      182.4|  179041.16| 39572.89157894737|
|ADULT EDUCATION T...|          63|      775.2|   60668.84|19230.814603174604|
|AFTER-SCHOOL PROG...|           2|     624.04|   78493.98|39559.009999999995|
|ALTERNATIVE SCHOO...|           2|   111199.8|  127149.12|119174.45999999999|
| ASSISTANT PRINCIPAL|         436|    3418.53|  119646.31|  76514.0633944955|
|  ATHLETICS DIRECTOR|           1|  122789.04|  122789.04|         122789.04|
|   ATTENDANCE WORKER|          12|    7553.42|    23392.9|12826.486666666666|
|         AUDIOLOGIST|           9|   36329.59|  102240.46| 73038.71333333333|
|             AUDITOR|          11|    5380.63|   83811.88| 66145.27090909092|
+--------------------+------------+-----------+-----------+------------------+

Now that wasn’t so bad, was it?