As the title suggests, I came up with this post AFTER publishing the first three installments of my Open Georgia Analysis way back in 2014. You might also have noticed that I took a long break from blogging about Big Data technologies in 2018, and I’m hoping to change that in 2019. On the other hand, my personal blog had a LOT of fun entries thanks to a TON of international travel in 2018.

Dataset & Use Case

I decided to clean up the dataset and focus on just the Simple Open Georgia Use Case. I ran the cleanup code from use pig to calculate salary statistics for georgia educators (second of a three-part series) to prepare a transformed version of the file; you can find a download link at the bottom of Preparing Open Georgia Test Data. This makes the Spark code much simpler and more germane to the analysis I want to present.

On an HDP 3.1.0 cluster I have available, I loaded this file as shown below.

[dev1@ip-172-30-10-1 ~]$ hdfs dfs -ls
Found 1 items
-rw-r--r--   3 dev1 hdfs    7133542 2019-03-09 22:33 cleanSalaryTravelReport.tsv
[dev1@ip-172-30-10-1 ~]$ hdfs dfs -tail cleanSalaryTravelReport.tsv
ZUCKER,STACEY E	PARAPROFESSIONAL/TEACHER AIDE	23387.87	0.0	LBOE	FULTON COUNTY BOARD OF EDUCATION	2012
ZURAS,LINDA D	SPECIAL ED PARAPRO/AIDE	29046.0	0.0	LBOE	FULTON COUNTY BOARD OF EDUCATION	2012
ZVONAR,JESSICA L	GIFTED	41672.9	44.37	LBOE	FULTON COUNTY BOARD OF EDUCATION	2012
ZWEIGEL,RENEE E	SCHOOL SECRETARY/CLERK	42681.23	0.0	LBOE	FULTON COUNTY BOARD OF EDUCATION	2012
[dev1@ip-172-30-10-1 ~]$ 

RDD Implementation

The cluster has Spark 2.3.2 available, as shown in the banner of the pyspark shell I will be using for my Resilient Distributed Dataset (RDD) API example.

[dev1@ip-172-30-10-1 ~]$ pyspark
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.2.3.1.0.0-78
      /_/

Using Python version 2.7.5 (default, Oct 30 2018 23:45:53)
SparkSession available as 'spark'.
>>> 

NOTE: The RDD Programming Guide is a great reference.

Load and Filter

Load up the HDFS file and verify it looks good.

>>> inputRDD = sc.textFile("/user/dev1/cleanSalaryTravelReport.tsv")
>>> inputRDD.take(2)
[u'ABBOTT,DEEDEE W\tGRADES 9-12 TEACHER\t52122.1\t0.0\tLBOE\tATLANTA INDEPENDENT SCHOOL SYSTEM\t2010', u'ABBOTT,RYAN V\tGRADE 4 TEACHER\t56567.24\t0.0\tLBOE\tATLANTA INDEPENDENT SCHOOL SYSTEM\t2010']
>>> inputRDD.count()
76943

Tokenize the tab-separated string to create an array.

>>> arrayRDD = inputRDD.map(lambda val: val.split("\t"))
>>> arrayRDD.take(2)
[[u'ABBOTT,DEEDEE W', u'GRADES 9-12 TEACHER', u'52122.1', u'0.0', u'LBOE', u'ATLANTA INDEPENDENT SCHOOL SYSTEM', u'2010'], [u'ABBOTT,RYAN V', u'GRADE 4 TEACHER', u'56567.24', u'0.0', u'LBOE', u'ATLANTA INDEPENDENT SCHOOL SYSTEM', u'2010']]
>>> arrayRDD.count()
76943
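The lambda above leaves every field as a string. As a minimal sketch (not what I ran on the cluster), a named parser could convert the numeric columns up front. The names parse_line and EmployeeRecord are my own inventions for illustration; the field names are borrowed from the Row definition used later in this post.

```python
from collections import namedtuple

# Hypothetical record type; field names mirror the Row used later in the post.
EmployeeRecord = namedtuple(
    "EmployeeRecord",
    ["name", "title", "salary", "travel", "orgType", "org", "year"],
)

def parse_line(line):
    """Split one tab-separated line and convert the numeric columns."""
    name, title, salary, travel, org_type, org, year = line.split("\t")
    return EmployeeRecord(name, title, float(salary), float(travel),
                          org_type, org, int(year))

# One of the sample lines shown by take(2) above.
sample = ("ABBOTT,RYAN V\tGRADE 4 TEACHER\t56567.24\t0.0\tLBOE\t"
          "ATLANTA INDEPENDENT SCHOOL SYSTEM\t2010")
record = parse_line(sample)
```

In the shell this should be usable as inputRDD.map(parse_line), after which fields can be read by name (record.salary) instead of by position.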

Trim down to just the Local Boards of Education.

>>> justLBOE = arrayRDD.filter(lambda empRec: empRec[4] == 'LBOE')
>>> justLBOE.take(2)
[[u'ABBOTT,DEEDEE W', u'GRADES 9-12 TEACHER', u'52122.1', u'0.0', u'LBOE', u'ATLANTA INDEPENDENT SCHOOL SYSTEM', u'2010'], [u'ABBOTT,RYAN V', u'GRADE 4 TEACHER', u'56567.24', u'0.0', u'LBOE', u'ATLANTA INDEPENDENT SCHOOL SYSTEM', u'2010']]
>>> justLBOE.count()
75029

We just want the 2010 records.

>>> just2010 = justLBOE.filter(lambda empRec: empRec[6] == '2010')
>>> just2010.take(2)
[[u'ABBOTT,DEEDEE W', u'GRADES 9-12 TEACHER', u'52122.1', u'0.0', u'LBOE', u'ATLANTA INDEPENDENT SCHOOL SYSTEM', u'2010'], [u'ABBOTT,RYAN V', u'GRADE 4 TEACHER', u'56567.24', u'0.0', u'LBOE', u'ATLANTA INDEPENDENT SCHOOL SYSTEM', u'2010']]
>>> just2010.count()
44986

Put It All Together

We showed this with a bunch of variables, but normally we would just chain all of these together into one long statement. Additionally, I’m chaining a method to cache these results since we will use them a few times below.

>>> filteredRecs = sc.textFile("/user/dev1/cleanSalaryTravelReport.tsv").map(lambda val: val.split("\t")).filter(lambda empRec: empRec[4] == 'LBOE').filter(lambda empRec: empRec[6] == '2010').cache()
>>> filteredRecs.count()
44986
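The two chained filters could also be folded into one named predicate, which some readers find easier to scan than back-to-back lambdas. This is only a sketch; is_lboe_2010 is a hypothetical helper name, and the two rows are copied from the HDFS output shown earlier.

```python
def is_lboe_2010(emp_rec):
    """True for a tokenized record from a Local Board of Education in 2010."""
    return emp_rec[4] == "LBOE" and emp_rec[6] == "2010"

# Two tokenized rows taken from the output earlier in this post.
rows = [
    ["ABBOTT,DEEDEE W", "GRADES 9-12 TEACHER", "52122.1", "0.0", "LBOE",
     "ATLANTA INDEPENDENT SCHOOL SYSTEM", "2010"],
    ["ZUCKER,STACEY E", "PARAPROFESSIONAL/TEACHER AIDE", "23387.87", "0.0", "LBOE",
     "FULTON COUNTY BOARD OF EDUCATION", "2012"],
]

# Plain-Python equivalent of rdd.filter(is_lboe_2010)
kept = [r for r in rows if is_lboe_2010(r)]
```

On the cluster the same function should drop straight into the chain as .filter(is_lboe_2010) in place of the two lambda filters.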

Number of Employees by Title

To figure out how many people are in each job title, we need to treat this a bit like the canonical Word Count problem. We create key-value pairs (KVPs) with the title as the key and a hard-coded 1 as the value.

>>> titleKVPs = filteredRecs.map(lambda eR: (eR[1],1))
>>> titleKVPs.take(2)
[(u'GRADES 9-12 TEACHER', 1), (u'GRADE 4 TEACHER', 1)]

Then we can reduce them by key to get the totals for each of the 181 distinct job titles.

>>> titleTotals = titleKVPs.reduceByKey(lambda a,b: a+b)
>>> titleTotals.take(2)
[(u'SPEECH-LANGUAGE PATHOLOGIST', 1268), (u'RVI TEACHER', 58)]
>>> titleTotals.count()
181
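If reduceByKey feels like magic, here is a plain-Python stand-in that does the same thing to a small list. It is a sketch for intuition only: Spark does this per partition and then shuffles, while this version simply sorts and groups in memory. The function name reduce_by_key and the toy pairs are made up for illustration.

```python
from functools import reduce
from itertools import groupby
from operator import itemgetter

def reduce_by_key(pairs, func):
    """Plain-Python stand-in for RDD.reduceByKey: fold each key's values with func."""
    ordered = sorted(pairs, key=itemgetter(0))
    return [(key, reduce(func, [value for _, value in group]))
            for key, group in groupby(ordered, key=itemgetter(0))]

# Toy (title, 1) pairs, not real rows from the dataset.
title_kvps = [("AUDIOLOGIST", 1), ("BUS DRIVER", 1), ("AUDIOLOGIST", 1),
              ("AUDITOR", 1), ("AUDIOLOGIST", 1)]

title_totals = reduce_by_key(title_kvps, lambda a, b: a + b)
```

The lambda only ever sees two values for the same key at a time, which is why a simple a+b is enough to produce a count.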

Sort and show all results. As with the other analysis efforts, we see there are 9 Audiologists.

>>> titleTotals.sortByKey().collect()
[(u'ADAPTED PHYS ED TEACHER', 35), (u'ADULT EDUCATION DIRECTOR/COORD', 19), (u'ADULT EDUCATION TEACHER', 63), (u'AFTER-SCHOOL PROGRAM WORKER', 2), (u'ALTERNATIVE SCHOOL DIRECTOR', 2), (u'ASSISTANT PRINCIPAL', 436), (u'ATHLETICS DIRECTOR', 1), (u'ATTENDANCE WORKER', 12), (u'AUDIOLOGIST', 9), (u'AUDITOR', 11), (u'BOOKKEEPER', 118), (u'BUS DRIVER', 1321), (u'BUSINESS SERV SECRETARY/CLERK', 224), (u'CENTRAL SUPPORT CLERK', 4), (u'CONSTRUCTION MANAGER', 8), (u'CROSSING GUARD', 100), (u'CROSSROADS', 17), (u'CROSSROADS ALT SCHOOL TEACHER', 16), (u'CUSTODIAL PERSONNEL', 1393), (u'DATA CLERK', 98), (u'DEPUTY/ASSOC/ASSISTANT SUPT', 36), (u'DIAGNOSTICIAN', 23), (u'DIRECTOR OF CHILD SERVE', 2), (u'DIRECTOR OF CURRICULUM/INSTR', 31), (u'DIRECTOR OF MEDIA SERVICES', 2), (u'DIRECTOR OF PSYCHO-ED PROG', 1), (u'DIRECTOR OF SCHOOL SAFETY', 3), (u'DIRECTOR OF STUDENT SERVICES', 3), (u'EARLY INTERVENTION PRIMARY TEACHER', 605), (u'EARLY INTERVENTION TEACHER ', 147), (u'EIP 4TH AND 5TH GRADE TEACHER', 185), (u'ELEMENTARY COUNSELOR', 265), (u'ENTERPRISE ASP WORKER', 588), (u'ENTERPRISE TECH COORDINATOR', 2), (u'ENTERPRISE TECHNICIAN', 4), (u'ESOL TEACHER', 450), (u'EXTENDED YEAR TEACHER', 59), (u'EXTRA-CURRICULAR ACTIVITIES', 9), (u'FAMILY SERVICES COORDINATOR', 26), (u'FEDERAL PROGRAMS DIRECTOR', 3), (u'FINANCE/BUSINESS PERSONNEL', 101), (u'FINANCE/BUSINESS SERVICE MGR', 50), (u'FOOD SERVICE ADMINISTRATOR', 19), (u'FOOD SERVICE SEC/CLERK/BKKPR', 3), (u'GENERAL ADMIN SECRETARY/CLERK', 175), (u'GIFTED', 295), (u'GIFTED ELEMENTARY TEACHER P-5', 247), (u'GIFTED HIGH ', 102), (u'GRADE 1 TEACHER', 1041), (u'GRADE 10 TEACHER', 59), (u'GRADE 11 TEACHER', 27), (u'GRADE 12 TEACHER', 13), (u'GRADE 2 TEACHER', 1010), (u'GRADE 3 TEACHER', 1036), (u'GRADE 4 TEACHER', 873), (u'GRADE 5 TEACHER', 872), (u'GRADE 6 TEACHER', 239), (u'GRADE 7 TEACHER', 257), (u'GRADE 8 TEACHER', 240), (u'GRADE 9 TEACHER', 35), (u'GRADES 6-8 TEACHER', 1607), (u'GRADES 9-12 TEACHER', 3171), (u'GRADES K-5 TEACHER', 165), 
(u'GRADUATION SPECIALIST', 63), (u'HEAD START WORKER', 2), (u'HEARING OFFICER', 2), (u'HIGH SCHOOL COUNSELOR', 225), (u'HOSPITAL/HOMEBOUND INSTRUCTOR', 11), (u'HUMAN RESOURCES PERSONNEL', 107), (u'IN-SCHOOL SUSP TEACHER', 69), (u'INFORMATION SERV PERSONNEL', 36), (u'INFORMATION SERVICES CLERK', 3), (u'INSTRUCTIONAL SPECIALIST P-8', 1164), (u'INSTRUCTIONAL SUPERVISOR', 375), (u'INTERPRETER', 26), (u'IS PERSONNEL - FINANCE AND BUSINESS', 23), (u'IS PERSONNEL - FOOD SERVICE', 14), (u'IS PERSONNEL - GENERAL ADMIN', 86), (u'IS PERSONNEL - INSTRUCTION SERV    ', 32), (u'IS PERSONNEL - MAINTENANCE', 3), (u'IS PERSONNEL - OTHER SUPPORT', 16), (u'IS PERSONNEL - SCHOOL ADMIN', 16), (u'IS PERSONNEL - SUPPORT SERV', 75), (u'KINDERGARTEN TEACHER', 1054), (u'LEGAL PERSONNEL', 14), (u'LIBRARIAN/MEDIA SPECIALIST', 342), (u'LIBRARY/MEDIA SECRETARY/CLERK', 1), (u'LIBRARY/MEDIA SUPPORT PARAPRO', 199), (u'LITERACY COACH', 90), (u'LOTTERY PRE-SCHOOL TEACHER', 83), (u'LUNCHROOM MONITOR', 50), (u'MAINTENANCE PERSONNEL', 442), (u'MEMBER, BOARD OF EDUCATION', 26), (u'MIDDLE SCHOOL CAREER, TECHNICAL AND AGRICULTURAL TEACHER', 41), (u'MIDDLE SCHOOL COUNSELOR', 131), (u'MIDDLE SCHOOL EXPLOR TEACHER', 237), (u'MIGRANT EDUCATION RECRUITER', 3), (u'MILITARY SCIENCE TEACHER', 98), (u'MISCELLANEOUS ACTIVITIES', 3013), (u'NURSING ASSISTANT / HEALTH TECH', 104), (u'OCCUPATIONAL THERAPIST ', 84), (u'ORIENT/MOBILITY SPECIALIST', 1), (u'OTHER INSTRUCTIONAL PROVIDER', 82), (u'OTHER TRANSPORTATION', 209), (u'PARAPRO PERSONNEL - PRE-K', 165), (u'PARAPROFESSIONAL/TEACHER AIDE', 1947), (u'PARENT COORDINATOR', 146), (u'PERSONNEL/HUMAN RESOURCES DIR', 17), (u'PHYSICAL THERAPIST ', 24), (u'PLANNING/EVALUATION PERSONNEL', 23), (u'PLANT OPERATIONS DIRECTOR/MGR', 40), (u'PLANT OPERATIONS SEC/CLERK', 2), (u'PRE-K DIRECTOR', 2), (u'PRESCHOOL SPECIAL ED TEACHER', 178), (u'PRINCIPAL', 318), (u'PSYCH-ED PARAPRO/TEACHER AIDE', 51), (u'PSYCHO-ED SCHOOL PSYCHOLOGIST', 2), (u'PSYCHO-ED SOCIAL WORKER', 7), (u'PSYCHO-ED 
SPEC ED SPECIALIST', 7), (u'PSYCHO-EDUCATIONAL TEACHER', 77), (u'PUBLIC RELATIONS PERSONNEL', 15), (u'REHABILITATION COUNSELOR', 6), (u'RESEARCH PERSONNEL', 24), (u'RVI TEACHER', 58), (u'SCHOOL FOOD SERVICE MANAGER', 143), (u'SCHOOL FOOD SERVICE WORKER', 1849), (u'SCHOOL IMPROVEMENT SPECIALIST', 179), (u'SCHOOL NURSE', 171), (u'SCHOOL NUTRITION MAINTENANCE', 2), (u'SCHOOL PSYCHOLOGIST', 130), (u'SCHOOL PSYCHOMETRIST', 1), (u'SCHOOL SECRETARY/CLERK', 696), (u'SCHOOL SOCIAL WORKER', 124), (u'SECRETARY', 318), (u'SECURITY PERSONNEL/SECURITY OFFICER', 350), (u'SOCIAL SERVICES CASE MANAGER', 2), (u'SOCIAL WORKER ASSISTANT', 13), (u'SPECIAL ED PARAPRO/AIDE', 1674), (u'SPECIAL EDUCATION BUS AIDE', 61), (u'SPECIAL EDUCATION BUS DRIVER', 84), (u'SPECIAL EDUCATION COUNSELOR', 1), (u'SPECIAL EDUCATION DIRECTOR', 6), (u'SPECIAL EDUCATION INTERRELATED', 2261), (u'SPECIAL EDUCATION NURSE', 34), (u'SPECIAL EDUCATION PARAPROFESSIONAL - AGES 3 TO 5', 1), (u'SPECIAL EDUCATION SECRETARY/CLERK', 7), (u'SPECIAL EDUCATION SOCIAL WORKER', 13), (u'SPECIAL EDUCATION SPECIALIST', 95), (u'SPEECH-LANGUAGE PATHOLOGIST', 1268), (u'STAFF DEVELOPMENT SPECIALIST', 17), (u'STUDENT CLERK/AIDE', 20), (u'SUBSTITUTE TEACHER', 3816), (u'SUPERINTENDENT', 3), (u'SUPERINTENDENT SECRETARY', 4), (u'SUPPORT SERV SECRETARY/CLERK', 4), (u'TEACHER FOR DEAF/BLIND STUDENTS', 3), (u'TEACHER OF AUTISTIC STUDENTS', 23), (u'TEACHER OF EMOTIONAL/BEHAVIORAL', 152), (u'TEACHER OF HEARING IMPAIRED STUDENT', 63), (u'TEACHER OF MILD INTELLECTUAL', 146), (u'TEACHER OF MODERATE INTELLECTUAL', 120), (u'TEACHER OF ORTHOPEDIC IMPAIRED', 47), (u'TEACHER OF OTHER HEALTH IMPAIRED', 10), (u'TEACHER OF PROFOUND INTELLECTUAL', 5), (u'TEACHER OF SEVERE INTELLECTUAL', 66), (u'TEACHER OF SPECIFIC LEARNING', 103), (u'TEACHER OF TRAUMATIC BRAIN INJURY', 1), (u'TEACHER OF VISUALLY IMPAIRED', 23), (u'TEACHER SUPPORT SPECIALIST', 204), (u'TECHNICAL INSTITUTE PRESIDENT', 1), (u'TECHNOLOGY DIRECTOR', 18), (u'TECHNOLOGY SPECIALIST', 298), 
(u'TITLE I DIRECTOR', 4), (u'TRANSPORTATION DIRECTOR/MGR', 19), (u'TRANSPORTATION MECHANIC', 124), (u'TRANSPORTATION SEC/CLERK', 3), (u'VOCATIONAL ', 407), (u'VOCATIONAL DIRECTOR ', 3), (u'VOCATIONAL SUPERVISOR - SCHOOL', 18), (u'WAREHOUSEMAN', 45), (u'YOUTH APPRENTICESHIP DIRECTOR', 2)]

Put It All Together

This can more simply be expressed by chaining these all together.

>>> filteredRecs.map(lambda eR: (eR[1],1)).reduceByKey(lambda a,b: a+b).sortByKey().collect()
[(u'ADAPTED PHYS ED TEACHER', 35), (u'ADULT EDUCATION DIRECTOR/COORD', 19), (u'ADULT EDUCATION TEACHER', 63), (u'AFTER-SCHOOL PROGRAM WORKER', 2), (u'ALTERNATIVE SCHOOL DIRECTOR', 2), (u'ASSISTANT PRINCIPAL', 436), (u'ATHLETICS DIRECTOR', 1), (u'ATTENDANCE WORKER', 12), (u'AUDIOLOGIST', 9), (u'AUDITOR', 11), (u'BOOKKEEPER', 118), (u'BUS DRIVER', 1321), (u'BUSINESS SERV SECRETARY/CLERK', 224), (u'CENTRAL SUPPORT CLERK', 4), (u'CONSTRUCTION MANAGER', 8), (u'CROSSING GUARD', 100), (u'CROSSROADS', 17), (u'CROSSROADS ALT SCHOOL TEACHER', 16), (u'CUSTODIAL PERSONNEL', 1393), (u'DATA CLERK', 98), (u'DEPUTY/ASSOC/ASSISTANT SUPT', 36), (u'DIAGNOSTICIAN', 23), (u'DIRECTOR OF CHILD SERVE', 2), (u'DIRECTOR OF CURRICULUM/INSTR', 31), (u'DIRECTOR OF MEDIA SERVICES', 2), (u'DIRECTOR OF PSYCHO-ED PROG', 1), (u'DIRECTOR OF SCHOOL SAFETY', 3), (u'DIRECTOR OF STUDENT SERVICES', 3), (u'EARLY INTERVENTION PRIMARY TEACHER', 605), (u'EARLY INTERVENTION TEACHER ', 147), (u'EIP 4TH AND 5TH GRADE TEACHER', 185), (u'ELEMENTARY COUNSELOR', 265), (u'ENTERPRISE ASP WORKER', 588), (u'ENTERPRISE TECH COORDINATOR', 2), (u'ENTERPRISE TECHNICIAN', 4), (u'ESOL TEACHER', 450), (u'EXTENDED YEAR TEACHER', 59), (u'EXTRA-CURRICULAR ACTIVITIES', 9), (u'FAMILY SERVICES COORDINATOR', 26), (u'FEDERAL PROGRAMS DIRECTOR', 3), (u'FINANCE/BUSINESS PERSONNEL', 101), (u'FINANCE/BUSINESS SERVICE MGR', 50), (u'FOOD SERVICE ADMINISTRATOR', 19), (u'FOOD SERVICE SEC/CLERK/BKKPR', 3), (u'GENERAL ADMIN SECRETARY/CLERK', 175), (u'GIFTED', 295), (u'GIFTED ELEMENTARY TEACHER P-5', 247), (u'GIFTED HIGH ', 102), (u'GRADE 1 TEACHER', 1041), (u'GRADE 10 TEACHER', 59), (u'GRADE 11 TEACHER', 27), (u'GRADE 12 TEACHER', 13), (u'GRADE 2 TEACHER', 1010), (u'GRADE 3 TEACHER', 1036), (u'GRADE 4 TEACHER', 873), (u'GRADE 5 TEACHER', 872), (u'GRADE 6 TEACHER', 239), (u'GRADE 7 TEACHER', 257), (u'GRADE 8 TEACHER', 240), (u'GRADE 9 TEACHER', 35), (u'GRADES 6-8 TEACHER', 1607), (u'GRADES 9-12 TEACHER', 3171), (u'GRADES K-5 TEACHER', 165), 
(u'GRADUATION SPECIALIST', 63), (u'HEAD START WORKER', 2), (u'HEARING OFFICER', 2), (u'HIGH SCHOOL COUNSELOR', 225), (u'HOSPITAL/HOMEBOUND INSTRUCTOR', 11), (u'HUMAN RESOURCES PERSONNEL', 107), (u'IN-SCHOOL SUSP TEACHER', 69), (u'INFORMATION SERV PERSONNEL', 36), (u'INFORMATION SERVICES CLERK', 3), (u'INSTRUCTIONAL SPECIALIST P-8', 1164), (u'INSTRUCTIONAL SUPERVISOR', 375), (u'INTERPRETER', 26), (u'IS PERSONNEL - FINANCE AND BUSINESS', 23), (u'IS PERSONNEL - FOOD SERVICE', 14), (u'IS PERSONNEL - GENERAL ADMIN', 86), (u'IS PERSONNEL - INSTRUCTION SERV    ', 32), (u'IS PERSONNEL - MAINTENANCE', 3), (u'IS PERSONNEL - OTHER SUPPORT', 16), (u'IS PERSONNEL - SCHOOL ADMIN', 16), (u'IS PERSONNEL - SUPPORT SERV', 75), (u'KINDERGARTEN TEACHER', 1054), (u'LEGAL PERSONNEL', 14), (u'LIBRARIAN/MEDIA SPECIALIST', 342), (u'LIBRARY/MEDIA SECRETARY/CLERK', 1), (u'LIBRARY/MEDIA SUPPORT PARAPRO', 199), (u'LITERACY COACH', 90), (u'LOTTERY PRE-SCHOOL TEACHER', 83), (u'LUNCHROOM MONITOR', 50), (u'MAINTENANCE PERSONNEL', 442), (u'MEMBER, BOARD OF EDUCATION', 26), (u'MIDDLE SCHOOL CAREER, TECHNICAL AND AGRICULTURAL TEACHER', 41), (u'MIDDLE SCHOOL COUNSELOR', 131), (u'MIDDLE SCHOOL EXPLOR TEACHER', 237), (u'MIGRANT EDUCATION RECRUITER', 3), (u'MILITARY SCIENCE TEACHER', 98), (u'MISCELLANEOUS ACTIVITIES', 3013), (u'NURSING ASSISTANT / HEALTH TECH', 104), (u'OCCUPATIONAL THERAPIST ', 84), (u'ORIENT/MOBILITY SPECIALIST', 1), (u'OTHER INSTRUCTIONAL PROVIDER', 82), (u'OTHER TRANSPORTATION', 209), (u'PARAPRO PERSONNEL - PRE-K', 165), (u'PARAPROFESSIONAL/TEACHER AIDE', 1947), (u'PARENT COORDINATOR', 146), (u'PERSONNEL/HUMAN RESOURCES DIR', 17), (u'PHYSICAL THERAPIST ', 24), (u'PLANNING/EVALUATION PERSONNEL', 23), (u'PLANT OPERATIONS DIRECTOR/MGR', 40), (u'PLANT OPERATIONS SEC/CLERK', 2), (u'PRE-K DIRECTOR', 2), (u'PRESCHOOL SPECIAL ED TEACHER', 178), (u'PRINCIPAL', 318), (u'PSYCH-ED PARAPRO/TEACHER AIDE', 51), (u'PSYCHO-ED SCHOOL PSYCHOLOGIST', 2), (u'PSYCHO-ED SOCIAL WORKER', 7), (u'PSYCHO-ED 
SPEC ED SPECIALIST', 7), (u'PSYCHO-EDUCATIONAL TEACHER', 77), (u'PUBLIC RELATIONS PERSONNEL', 15), (u'REHABILITATION COUNSELOR', 6), (u'RESEARCH PERSONNEL', 24), (u'RVI TEACHER', 58), (u'SCHOOL FOOD SERVICE MANAGER', 143), (u'SCHOOL FOOD SERVICE WORKER', 1849), (u'SCHOOL IMPROVEMENT SPECIALIST', 179), (u'SCHOOL NURSE', 171), (u'SCHOOL NUTRITION MAINTENANCE', 2), (u'SCHOOL PSYCHOLOGIST', 130), (u'SCHOOL PSYCHOMETRIST', 1), (u'SCHOOL SECRETARY/CLERK', 696), (u'SCHOOL SOCIAL WORKER', 124), (u'SECRETARY', 318), (u'SECURITY PERSONNEL/SECURITY OFFICER', 350), (u'SOCIAL SERVICES CASE MANAGER', 2), (u'SOCIAL WORKER ASSISTANT', 13), (u'SPECIAL ED PARAPRO/AIDE', 1674), (u'SPECIAL EDUCATION BUS AIDE', 61), (u'SPECIAL EDUCATION BUS DRIVER', 84), (u'SPECIAL EDUCATION COUNSELOR', 1), (u'SPECIAL EDUCATION DIRECTOR', 6), (u'SPECIAL EDUCATION INTERRELATED', 2261), (u'SPECIAL EDUCATION NURSE', 34), (u'SPECIAL EDUCATION PARAPROFESSIONAL - AGES 3 TO 5', 1), (u'SPECIAL EDUCATION SECRETARY/CLERK', 7), (u'SPECIAL EDUCATION SOCIAL WORKER', 13), (u'SPECIAL EDUCATION SPECIALIST', 95), (u'SPEECH-LANGUAGE PATHOLOGIST', 1268), (u'STAFF DEVELOPMENT SPECIALIST', 17), (u'STUDENT CLERK/AIDE', 20), (u'SUBSTITUTE TEACHER', 3816), (u'SUPERINTENDENT', 3), (u'SUPERINTENDENT SECRETARY', 4), (u'SUPPORT SERV SECRETARY/CLERK', 4), (u'TEACHER FOR DEAF/BLIND STUDENTS', 3), (u'TEACHER OF AUTISTIC STUDENTS', 23), (u'TEACHER OF EMOTIONAL/BEHAVIORAL', 152), (u'TEACHER OF HEARING IMPAIRED STUDENT', 63), (u'TEACHER OF MILD INTELLECTUAL', 146), (u'TEACHER OF MODERATE INTELLECTUAL', 120), (u'TEACHER OF ORTHOPEDIC IMPAIRED', 47), (u'TEACHER OF OTHER HEALTH IMPAIRED', 10), (u'TEACHER OF PROFOUND INTELLECTUAL', 5), (u'TEACHER OF SEVERE INTELLECTUAL', 66), (u'TEACHER OF SPECIFIC LEARNING', 103), (u'TEACHER OF TRAUMATIC BRAIN INJURY', 1), (u'TEACHER OF VISUALLY IMPAIRED', 23), (u'TEACHER SUPPORT SPECIALIST', 204), (u'TECHNICAL INSTITUTE PRESIDENT', 1), (u'TECHNOLOGY DIRECTOR', 18), (u'TECHNOLOGY SPECIALIST', 298), 
(u'TITLE I DIRECTOR', 4), (u'TRANSPORTATION DIRECTOR/MGR', 19), (u'TRANSPORTATION MECHANIC', 124), (u'TRANSPORTATION SEC/CLERK', 3), (u'VOCATIONAL ', 407), (u'VOCATIONAL DIRECTOR ', 3), (u'VOCATIONAL SUPERVISOR - SCHOOL', 18), (u'WAREHOUSEMAN', 45), (u'YOUTH APPRENTICESHIP DIRECTOR', 2)]

Minimum Salary by Title

Create a PairRDD of title and salary.

>>> salaryKVPs = filteredRecs.map(lambda eR: (eR[1], float(eR[2])))
>>> salaryKVPs.take(2)
[(u'GRADES 9-12 TEACHER', 52122.1), (u'GRADE 4 TEACHER', 56567.24)]

Incur the shuffle to reduce them by key and figure out the smallest number.

>>> minSalsHandCranked = salaryKVPs.reduceByKey(lambda sal,otherSal: sal if (sal<otherSal) else otherSal)
>>> minSalsHandCranked.take(2)
[(u'SPEECH-LANGUAGE PATHOLOGIST', 2917.23), (u'RVI TEACHER', 27918.14)]

Fortunately, Python already has us covered with the built-in min() function, which we can pass in directly instead of that silly anonymous function.

>>> minSals = salaryKVPs.reduceByKey(min)
>>> minSals.take(2)
[(u'SPEECH-LANGUAGE PATHOLOGIST', 2917.23), (u'RVI TEACHER', 27918.14)]
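The reason min can be handed to reduceByKey directly is that, when called with two arguments, it behaves just like the hand-cranked lambda. A quick plain-Python check of that claim, using a few salaries from the HDFS sample earlier (the helper name smaller is mine):

```python
from functools import reduce

def smaller(sal, other_sal):
    """The hand-cranked reducer: keep whichever salary is lower."""
    return sal if sal < other_sal else other_sal

# A few salaries from the sample rows shown earlier in this post.
salaries = [41672.9, 23387.87, 29046.0]

hand_cranked = reduce(smaller, salaries)
built_in = reduce(min, salaries)
```

Both reductions land on the same value, so the shorter form loses nothing.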

Sort and show all results. As with the other analysis efforts, we see the minimum salary for Audiologists is $36,329.59.

>>> minSals.sortByKey().take(10)
[(u'ADAPTED PHYS ED TEACHER', 19384.24), (u'ADULT EDUCATION DIRECTOR/COORD', 182.4), (u'ADULT EDUCATION TEACHER', 775.2), (u'AFTER-SCHOOL PROGRAM WORKER', 624.04), (u'ALTERNATIVE SCHOOL DIRECTOR', 111199.8), (u'ASSISTANT PRINCIPAL', 3418.53), (u'ATHLETICS DIRECTOR', 122789.04), (u'ATTENDANCE WORKER', 7553.42), (u'AUDIOLOGIST', 36329.59), (u'AUDITOR', 5380.63)]

Put It All Together

>>> filteredRecs.map(lambda eR: (eR[1], float(eR[2]))).reduceByKey(min).sortByKey().take(10)
[(u'ADAPTED PHYS ED TEACHER', 19384.24), (u'ADULT EDUCATION DIRECTOR/COORD', 182.4), (u'ADULT EDUCATION TEACHER', 775.2), (u'AFTER-SCHOOL PROGRAM WORKER', 624.04), (u'ALTERNATIVE SCHOOL DIRECTOR', 111199.8), (u'ASSISTANT PRINCIPAL', 3418.53), (u'ATHLETICS DIRECTOR', 122789.04), (u'ATTENDANCE WORKER', 7553.42), (u'AUDIOLOGIST', 36329.59), (u'AUDITOR', 5380.63)]

Maximum Salary by Title

We already did most of this above, so here it is “all together” with max swapped in for min. We can see that, like before, the max salary for Audiologists is $102,240.46.

>>> filteredRecs.map(lambda eR: (eR[1], float(eR[2]))).reduceByKey(max).sortByKey().take(10)
[(u'ADAPTED PHYS ED TEACHER', 96320.19), (u'ADULT EDUCATION DIRECTOR/COORD', 179041.16), (u'ADULT EDUCATION TEACHER', 60668.84), (u'AFTER-SCHOOL PROGRAM WORKER', 78493.98), (u'ALTERNATIVE SCHOOL DIRECTOR', 127149.12), (u'ASSISTANT PRINCIPAL', 119646.31), (u'ATHLETICS DIRECTOR', 122789.04), (u'ATTENDANCE WORKER', 23392.9), (u'AUDIOLOGIST', 102240.46), (u'AUDITOR', 83811.88)]

Average Salary by Title

This one is a bit more “interesting”… First, we need to roll up the total salary values for each title along with the count for each.

>>> totalSalaryAndCountByTitle = salaryKVPs.aggregateByKey((0,0), lambda a,b: (a[0] + b, a[1] + 1), lambda a,b: (a[0] + b[0], a[1] + b[1]))
>>> totalSalaryAndCountByTitle.take(2)
[(u'SPEECH-LANGUAGE PATHOLOGIST', (40461424.22000003, 1268)), (u'RVI TEACHER', (3884091.3400000003, 58))]

Yes, that does look “interesting”… It took me a while to figure out, as the API docs for this function were a bit tough for me to grasp. Thankfully, I found https://stackoverflow.com/questions/29930110/calculating-the-averages-for-each-key-in-a-pairwise-k-v-rdd-in-spark-with-pyth which explains what the a’s and b’s above all mean. I’ll let you read that Q&A as I did, and we can discuss it in the comments section of this post if there are still questions.

Then, we need to calculate the average from the total salary values and the count of salaries. This is also described in that last link, but is really just total salaries divided by total number of educators with that title.

>>> avgSals = totalSalaryAndCountByTitle.mapValues(lambda v: v[0]/v[1])
>>> avgSals.take(10)
[(u'SPEECH-LANGUAGE PATHOLOGIST', 31909.640552050496), (u'RVI TEACHER', 66967.09206896552), (u'SPECIAL EDUCATION BUS AIDE', 13871.381803278691), (u'TECHNOLOGY SPECIALIST', 54501.812348993306), (u'SCHOOL IMPROVEMENT SPECIALIST', 79359.73837988825), (u'SPECIAL EDUCATION SECRETARY/CLERK', 32192.98714285714), (u'TEACHER OF EMOTIONAL/BEHAVIORAL', 53380.03302631576), (u'OTHER INSTRUCTIONAL PROVIDER', 18972.18256097561), (u'BUS DRIVER', 21016.957100681313), (u'HOSPITAL/HOMEBOUND INSTRUCTOR', 41357.86272727273)]
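To make the a’s and b’s a little more concrete, here is a plain-Python sketch of what I understand aggregateByKey to be doing for a single title. The first lambda (seq_op below) folds one salary into a (total, count) pair within a partition; the second (comb_op) merges the pairs from different partitions. The function names, the two pretend partitions, and the round-number salaries are all made up for illustration.

```python
from functools import reduce

# Starting accumulator: (running salary total, running count).
zero = (0.0, 0)

def seq_op(acc, salary):
    """Fold one salary into a partition-local (total, count) accumulator."""
    return (acc[0] + salary, acc[1] + 1)

def comb_op(left, right):
    """Merge two (total, count) accumulators from different partitions."""
    return (left[0] + right[0], left[1] + right[1])

# Toy salaries for one title, pretending they landed in two partitions.
partition_1 = [40000.0, 50000.0]
partition_2 = [60000.0]

# Step 1: each partition folds its own values, starting from zero.
partials = [reduce(seq_op, part, zero) for part in (partition_1, partition_2)]

# Step 2: the per-partition results are merged, then divided for the average.
total, count = reduce(comb_op, partials)
average = total / count
```

I used 0.0 rather than 0 for the starting total here. With float salaries it makes no difference, but under Python 2 it guards against integer division if the values ever turned out to be ints.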

The answer at https://stackoverflow.com/questions/46171294/how-can-i-count-the-average-from-spark-rdd shows how this all could look with Scala.

Sort it and we can see an average salary of $73,038.71 for Audiologists like the other frameworks also calculated.

>>> avgSals.sortByKey().take(10)
[(u'ADAPTED PHYS ED TEACHER', 56632.125714285714), (u'ADULT EDUCATION DIRECTOR/COORD', 39572.89157894737), (u'ADULT EDUCATION TEACHER', 19230.814603174604), (u'AFTER-SCHOOL PROGRAM WORKER', 39559.009999999995), (u'ALTERNATIVE SCHOOL DIRECTOR', 119174.45999999999), (u'ASSISTANT PRINCIPAL', 76514.0633944955), (u'ATHLETICS DIRECTOR', 122789.04), (u'ATTENDANCE WORKER', 12826.486666666666), (u'AUDIOLOGIST', 73038.71333333333), (u'AUDITOR', 66145.27090909092)]

Put It All Together

Like before, feel free to pull it all together into one line.

>>> salaryKVPs.aggregateByKey((0,0), lambda a,b: (a[0] + b, a[1] + 1), lambda a,b: (a[0] + b[0], a[1] + b[1])).mapValues(lambda v: v[0]/v[1]).sortByKey().take(10)
[(u'ADAPTED PHYS ED TEACHER', 56632.125714285714), (u'ADULT EDUCATION DIRECTOR/COORD', 39572.89157894737), (u'ADULT EDUCATION TEACHER', 19230.814603174604), (u'AFTER-SCHOOL PROGRAM WORKER', 39559.009999999995), (u'ALTERNATIVE SCHOOL DIRECTOR', 119174.45999999999), (u'ASSISTANT PRINCIPAL', 76514.0633944955), (u'ATHLETICS DIRECTOR', 122789.04), (u'ATTENDANCE WORKER', 12826.486666666666), (u'AUDIOLOGIST', 73038.71333333333), (u'AUDITOR', 66145.27090909092)]

Spark SQL Implementation

When I was first learning Spark, I remember the RDD API being described as an “elegant” API. I’m not sure that’s what I would have called it back then (or now). (wink) The good news is that for structured datasets like the Format & Sample Data for Open Georgia we have Spark SQL available for our use. More great news is that, as with RDDs, the Spark website has a good Programming Guide for Spark SQL that you can use as a place to start.

Getting a DataFrame

There are multiple ways to create a DataFrame, but I’ll use a common practice with pyspark which infers the schema using reflection to promote an RDD to a DataFrame. To get started, let’s reread the original file and convert the TSV records into arrays.

>>> allColsAsRDD = sc.textFile("/user/dev1/cleanSalaryTravelReport.tsv").map(lambda val: val.split("\t"))
>>> allColsAsRDD.take(2)
[[u'ABBOTT,DEEDEE W', u'GRADES 9-12 TEACHER', u'52122.1', u'0.0', u'LBOE', u'ATLANTA INDEPENDENT SCHOOL SYSTEM', u'2010'], [u'ABBOTT,RYAN V', u'GRADE 4 TEACHER', u'56567.24', u'0.0', u'LBOE', u'ATLANTA INDEPENDENT SCHOOL SYSTEM', u'2010']]

From here we can create an RDD full of Row objects and then have the Spark SQL API convert it to a DataFrame.

>>> from pyspark.sql import Row
>>> teachersRDD = allColsAsRDD.map(lambda t: Row(name=t[0], title=t[1], salary=float(t[2]), travel=float(t[3]), orgType=t[4], org=t[5], year=int(t[6])))
>>> teachers = spark.createDataFrame(teachersRDD)
>>> teachers.show(10)
+--------------------+--------------------+-------+--------+--------------------+------+----+
|                name|                 org|orgType|  salary|               title|travel|year|
+--------------------+--------------------+-------+--------+--------------------+------+----+
|     ABBOTT,DEEDEE W|ATLANTA INDEPENDE...|   LBOE| 52122.1| GRADES 9-12 TEACHER|   0.0|2010|
|       ABBOTT,RYAN V|ATLANTA INDEPENDE...|   LBOE|56567.24|     GRADE 4 TEACHER|   0.0|2010|
| ABBOUD,CLAUDIA MORA|ATLANTA INDEPENDE...|   LBOE| 63957.5|  GRADES K-5 TEACHER|   0.0|2010|
|ABDUL-JABBAR,KHAD...|ATLANTA INDEPENDE...|   LBOE|16791.73| GRADES 9-12 TEACHER|   0.0|2010|
|ABDUL-RAZACQ,SALA...|ATLANTA INDEPENDE...|   LBOE|45832.92|INSTRUCTIONAL SPE...|   0.0|2010|
|     ABDULLAH,DIANA |ATLANTA INDEPENDE...|   LBOE|10934.94|SPECIAL ED PARAPR...|   0.0|2010|
|  ABDULLAH,NADIYAH W|ATLANTA INDEPENDE...|   LBOE|75109.92|  GRADES 6-8 TEACHER|   0.0|2010|
|ABDULLAH,RHONDALYN Y|ATLANTA INDEPENDE...|   LBOE|28649.34|SPECIAL ED PARAPR...|   0.0|2010|
| ABDULLAH,VALANCIA F|ATLANTA INDEPENDE...|   LBOE|28735.02|SPECIAL ED PARAPR...|   0.0|2010|
|ABDULLAH-MUSA,KWELI |ATLANTA INDEPENDE...|   LBOE|83040.78| SCHOOL PSYCHOLOGIST| 505.2|2010|
+--------------------+--------------------+-------+--------+--------------------+------+----+

Now that looks a bit better than the output of the RDD’s take() method!!
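One of the other ways to get a DataFrame is to supply an explicit schema instead of relying on inference from Row objects. In Spark 2.x a Row built from keyword arguments sorts its fields by name, which appears to be why the columns above come out alphabetically; an explicit schema keeps them in file order. The sketch below is an assumption-laden alternative, not what I ran: teachers_with_schema is a hypothetical helper, and the import sits inside the function only so the snippet loads on a machine without Spark.

```python
def teachers_with_schema(spark, all_cols_rdd):
    """Build the teachers DataFrame from tokenized rows using an explicit schema."""
    # Deferred import so this module can be loaded without pyspark installed.
    from pyspark.sql.types import (StructType, StructField, StringType,
                                   DoubleType, IntegerType)

    # Column order follows the TSV file rather than alphabetical order.
    schema = StructType([
        StructField("name", StringType(), True),
        StructField("title", StringType(), True),
        StructField("salary", DoubleType(), True),
        StructField("travel", DoubleType(), True),
        StructField("orgType", StringType(), True),
        StructField("org", StringType(), True),
        StructField("year", IntegerType(), True),
    ])

    # Convert the numeric columns so the values match the declared types.
    typed = all_cols_rdd.map(
        lambda t: (t[0], t[1], float(t[2]), float(t[3]), t[4], t[5], int(t[6])))
    return spark.createDataFrame(typed, schema)
```

In the pyspark shell this would be called as teachers_with_schema(spark, allColsAsRDD).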

Calculate Statistics via API

Trim down to just educator records for 2010.

>>> filtered = teachers.filter(teachers['orgType'] == 'LBOE').filter(teachers['year'] == 2010)
>>> filtered.show(2)
+---------------+--------------------+-------+--------+-------------------+------+----+
|           name|                 org|orgType|  salary|              title|travel|year|
+---------------+--------------------+-------+--------+-------------------+------+----+
|ABBOTT,DEEDEE W|ATLANTA INDEPENDE...|   LBOE| 52122.1|GRADES 9-12 TEACHER|   0.0|2010|
|  ABBOTT,RYAN V|ATLANTA INDEPENDE...|   LBOE|56567.24|    GRADE 4 TEACHER|   0.0|2010|
+---------------+--------------------+-------+--------+-------------------+------+----+

Now we can just do the grouping and apply some aggregate functions.

>>> from pyspark.sql.functions import min, max, avg, count, col
>>> expr = [count(col("title")),min(col("salary")),max(col("salary")),avg(col("salary"))]
>>> filtered.groupBy("title").agg(*expr).sort("title").show(10)
+--------------------+------------+-----------+-----------+------------------+
|               title|count(title)|min(salary)|max(salary)|       avg(salary)|
+--------------------+------------+-----------+-----------+------------------+
|ADAPTED PHYS ED T...|          35|   19384.24|   96320.19|56632.125714285714|
|ADULT EDUCATION D...|          19|      182.4|  179041.16| 39572.89157894737|
|ADULT EDUCATION T...|          63|      775.2|   60668.84|19230.814603174604|
|AFTER-SCHOOL PROG...|           2|     624.04|   78493.98|39559.009999999995|
|ALTERNATIVE SCHOO...|           2|   111199.8|  127149.12|119174.45999999999|
| ASSISTANT PRINCIPAL|         436|    3418.53|  119646.31|  76514.0633944955|
|  ATHLETICS DIRECTOR|           1|  122789.04|  122789.04|         122789.04|
|   ATTENDANCE WORKER|          12|    7553.42|    23392.9|12826.486666666666|
|         AUDIOLOGIST|           9|   36329.59|  102240.46| 73038.71333333333|
|             AUDITOR|          11|    5380.63|   83811.88| 66145.27090909092|
+--------------------+------------+-----------+-----------+------------------+
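One gotcha worth flagging: importing min and max by name from pyspark.sql.functions rebinds those names in the shell, so a later reduceByKey(min) would pick up Spark’s column function rather than Python’s built-in. A common way around that is to import the module under an alias. The sketch below assumes a DataFrame with title and salary columns like the one above; title_stats and the output column aliases are hypothetical names of my own.

```python
def title_stats(df):
    """Group a teachers-style DataFrame by title: count, min, max and avg salary."""
    # Aliased import: Python's built-in min() and max() stay untouched.
    # Deferred so this module can be loaded without pyspark installed.
    from pyspark.sql import functions as F

    return (df.groupBy("title")
              .agg(F.count("title").alias("employees"),
                   F.min("salary").alias("min_salary"),
                   F.max("salary").alias("max_salary"),
                   F.avg("salary").alias("avg_salary"))
              .sort("title"))
```

In the shell, title_stats(filtered).show(10) should give the same numbers as the table above, just with friendlier column headers.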

Like before, we can “put it all together” on one line with method chaining, so I won’t bore you with that.

Calculate Statistics via SQL

While the API looks fun for us programmers, we probably want to leave code behind that MANY people can read. You know… so we can get promoted to a better job and not be held back by being the only person who can understand our code! (wink)

So, instead… let’s just write some SQL.

>>> teachers.createOrReplaceTempView("teachers")
>>> spark.sql("SELECT title, count(title), min(salary), max(salary), avg(salary) FROM teachers WHERE orgType = 'LBOE' AND year = 2010 GROUP BY title ORDER BY title").show(10)
+--------------------+------------+-----------+-----------+------------------+
|               title|count(title)|min(salary)|max(salary)|       avg(salary)|
+--------------------+------------+-----------+-----------+------------------+
|ADAPTED PHYS ED T...|          35|   19384.24|   96320.19|56632.125714285714|
|ADULT EDUCATION D...|          19|      182.4|  179041.16| 39572.89157894737|
|ADULT EDUCATION T...|          63|      775.2|   60668.84|19230.814603174604|
|AFTER-SCHOOL PROG...|           2|     624.04|   78493.98|39559.009999999995|
|ALTERNATIVE SCHOO...|           2|   111199.8|  127149.12|119174.45999999999|
| ASSISTANT PRINCIPAL|         436|    3418.53|  119646.31|  76514.0633944955|
|  ATHLETICS DIRECTOR|           1|  122789.04|  122789.04|         122789.04|
|   ATTENDANCE WORKER|          12|    7553.42|    23392.9|12826.486666666666|
|         AUDIOLOGIST|           9|   36329.59|  102240.46| 73038.71333333333|
|             AUDITOR|          11|    5380.63|   83811.88| 66145.27090909092|
+--------------------+------------+-----------+-----------+------------------+
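Another nice thing about plain SQL is that the statement can be sanity-checked without a cluster. As a rough sketch, here is the same query run against an in-memory SQLite table from Python’s standard library. SQLite is only a stand-in for Spark SQL here, and the five rows (including the OTHER org type) are made-up toy data, not records from the real file.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE teachers (name TEXT, title TEXT, salary REAL, travel REAL, "
    "orgType TEXT, org TEXT, year INTEGER)")

# Made-up toy rows; two of them should be filtered out by the WHERE clause.
rows = [
    ("A", "AUDITOR", 50000.0, 0.0, "LBOE", "ORG", 2010),
    ("B", "AUDITOR", 70000.0, 0.0, "LBOE", "ORG", 2010),
    ("C", "AUDITOR", 90000.0, 0.0, "LBOE", "ORG", 2012),      # wrong year
    ("D", "BUS DRIVER", 20000.0, 0.0, "LBOE", "ORG", 2010),
    ("E", "BUS DRIVER", 30000.0, 0.0, "OTHER", "ORG", 2010),  # wrong org type
]
conn.executemany("INSERT INTO teachers VALUES (?, ?, ?, ?, ?, ?, ?)", rows)

# The same statement passed to spark.sql() above.
query = ("SELECT title, count(title), min(salary), max(salary), avg(salary) "
         "FROM teachers WHERE orgType = 'LBOE' AND year = 2010 "
         "GROUP BY title ORDER BY title")
stats = conn.execute(query).fetchall()
conn.close()
```

With toy data this small it is easy to work out the expected rows by hand and confirm the filtering and grouping do what you intended before running against the full table.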

Now that wasn’t so bad, was it?