joining multiple datasets with pig (i/o courtesy of hcatloader & hcatstorer)

My blogging has been drying up lately as I've mostly been focused on trying to add value within the Hortonworks Community Connection (HCC) forums where I ran into this question; https://community.hortonworks.com/questions/50243/pig-inner-join-with-different-keys.html.  This person was having trouble performing an inner join with Pig across four datasets.  This blog post is here to show that this is relatively easy with Pig.

First, let's create some datasets and load them into HDFS.  On these four files, I'll be joining on the first two columns of each so the output will only have three rows.

[root@sandbox hcc]# ls
T1.csv  T2.csv  T3.csv  T4.csv
[root@sandbox hcc]# cat T1.csv 
1,1,t1_A,t1_B
1,2,t1_A,t1_B
1,3,t1_A,t1_B
[root@sandbox hcc]# cat T2.csv 
1,1,t2_C,t2_D
1,2,t2_C,t2_D
1,3,t2_C,t2_D
[root@sandbox hcc]# cat T3.csv 
1,3,t3_E,t3_F
1,2,t3_E,t3_F
1,1,t3_E,t3_F
[root@sandbox hcc]# cat T4.csv 
1,3,t4_G,t4_H
1,2,t4_G,t4_H
1,1,t4_G,t4_H
[root@sandbox hcc]# 
[root@sandbox hcc]# hdfs dfs -mkdir /tmp/hcc
[root@sandbox hcc]# hdfs dfs -put T1.csv T2.csv T3.csv T4.csv /tmp/hcc
[root@sandbox hcc]# hdfs dfs -ls /tmp/hcc
Found 4 items
-rw-r--r--   1 root hdfs         42 2016-08-08 09:47 /tmp/hcc/T1.csv
-rw-r--r--   1 root hdfs         42 2016-08-08 09:47 /tmp/hcc/T2.csv
-rw-r--r--   1 root hdfs         42 2016-08-08 09:47 /tmp/hcc/T3.csv
-rw-r--r--   1 root hdfs         42 2016-08-08 09:47 /tmp/hcc/T4.csv

Since we'll want to leverage HCatLoader (to pick up the schema definitions), let's create & load some Hive tables in a quick way such as described in hadoop mini smoke test (VERY mini) and is shown below.

createLoadTables.hql
CREATE TABLE T1 (k1 int, k2 int, c1 string, c2 string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE;
LOAD DATA INPATH '/tmp/hcc/T1.csv' INTO TABLE T1;
SELECT * FROM T1;

CREATE TABLE T2 (k1 int, k2 int, c1 string, c2 string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE;
LOAD DATA INPATH '/tmp/hcc/T2.csv' INTO TABLE T2;
SELECT * FROM T2;

CREATE TABLE T3 (k1 int, k2 int, c1 string, c2 string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE;
LOAD DATA INPATH '/tmp/hcc/T3.csv' INTO TABLE T3;
SELECT * FROM T3;

CREATE TABLE T4 (k1 int, k2 int, c1 string, c2 string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE;
LOAD DATA INPATH '/tmp/hcc/T4.csv' INTO TABLE T4;
SELECT * FROM T4;

Let's run those commands.

[root@sandbox hcc]# hive -f createLoadTables.hql 

Loading data to table default.t1
Table default.t1 stats: [numFiles=1, totalSize=42]
OK
1    1    t1_A    t1_B
1    2    t1_A    t1_B
1    3    t1_A    t1_B
Time taken: 0.748 seconds, Fetched: 3 row(s)

Loading data to table default.t2
Table default.t2 stats: [numFiles=1, totalSize=42]
OK
1    1    t2_C    t2_D
1    2    t2_C    t2_D
1    3    t2_C    t2_D
Time taken: 0.583 seconds, Fetched: 3 row(s)

Loading data to table default.t3
Table default.t3 stats: [numFiles=1, totalSize=42]
OK
1    3    t3_E    t3_F
1    2    t3_E    t3_F
1    1    t3_E    t3_F
Time taken: 0.607 seconds, Fetched: 3 row(s)

Loading data to table default.t4
Table default.t4 stats: [numFiles=1, totalSize=42]
OK
1    3    t4_G    t4_H
1    2    t4_G    t4_H
1    1    t4_G    t4_H
Time taken: 0.159 seconds, Fetched: 3 row(s)

Now, let's write some Pig Latin to join these things up!!

join.pig
t1 = LOAD 'T1' USING org.apache.hive.hcatalog.pig.HCatLoader();
t2 = LOAD 'T2' USING org.apache.hive.hcatalog.pig.HCatLoader();
t3 = LOAD 'T3' USING org.apache.hive.hcatalog.pig.HCatLoader();
t4 = LOAD 'T4' USING org.apache.hive.hcatalog.pig.HCatLoader();

joined = JOIN t1 BY (k1,k2), t2 BY (k1,k2),
              t3 BY (k1,k2), t4 BY (k1,k2);

DUMP joined;
DESCRIBE joined;

Here's the output of this simple script.

[root@sandbox hcc]# pig -x tez -useHCatalog join.pig

Input(s):
Successfully read 3 records (42 bytes) from: "T3"
Successfully read 3 records (42 bytes) from: "T4"
Successfully read 3 records (42 bytes) from: "T1"
Successfully read 3 records (42 bytes) from: "T2"

Output(s):
Successfully stored 3 records (279 bytes) in: "hdfs://sandbox:8020/tmp/temp-2066780368/tmp1329817571"

(1,1,t1_A,t1_B,1,1,t2_C,t2_D,1,1,t3_E,t3_F,1,1,t4_G,t4_H)
(1,2,t1_A,t1_B,1,2,t2_C,t2_D,1,2,t3_E,t3_F,1,2,t4_G,t4_H)
(1,3,t1_A,t1_B,1,3,t2_C,t2_D,1,3,t3_E,t3_F,1,3,t4_G,t4_H)

joined: {t1::k1: chararray,t1::k2: chararray,t1::c1: chararray,t1::c2: chararray,t2::k1: chararray,t2::k2: chararray,t2::c1: chararray,t2::c2: chararray,t3::k1: chararray,t3::k2: chararray,t3::c1: chararray,t3::c2: chararray,t4::k1: chararray,t4::k2: chararray,t4::c1: chararray,t4::c2: chararray}

As you'll notice the schema's column names gets pretty detailed and basically show the lineage of where they came from with format of origin_alias_name::original_column_name.  If you'd like to store these results in a Hive table, then you will most likely need to project the join results into a relation that has the same schema of the Hive table you want to load into.  Here's a Hive table we can store this into.

createResultsTable.hql
CREATE TABLE JOINED_RESULTS ( k1 int,
    t1c1 string, t1c2 string, t2c1 string, t2c2 string,
    t3c1 string, t3c2 string, t4c1 string, t4c2 string )
PARTITIONED BY ( k2 int )
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
STORED AS TEXTFILE;

You'll notice I created a partition on this table.  I wasn't trying to be nasty; I just wanted to also address the concern of the HCC question I was looking at where this person wanted to store the results into a partitioned table.  In your Pig relation to be loaded into Hive, you'll need to put the partition column(s) at the end of the "regular" columns which creates a somewhat weird looking order of the columns since I used the second join key (i.e. k2) as the partition column.

We can now add the following lines to the earlier join.pig file to project the joined results into something that can be loaded into the new Hive table.

(end of) joinAndSave.pig
prepIt = FOREACH joined GENERATE t1::k1 AS k1,
    t1::c1 AS t1c1, t1::c2 AS t1c2, t2::c1 AS t2c1, t2::c2 AS t2c2,
    t3::c1 AS t3c1, t3::c2 AS t3c2, t4::c1 As t4c1, t4::c2 AS t4c2,
    t1::k2 AS k2;

STORE prepIt INTO 'JOINED_RESULTS' 
    USING org.apache.hive.hcatalog.pig.HCatStorer();

Then we can run this updated Pig script to do the join again and save it into the new Hive (partitioned) table.

[root@sandbox hcc]# pig -x tez -useHCatalog joinAndSave.pig

Input(s):
Successfully read 3 records (42 bytes) from: "T2"
Successfully read 3 records (42 bytes) from: "T1"
Successfully read 3 records (42 bytes) from: "T3"
Successfully read 3 records (42 bytes) from: "T4"

Output(s):
Successfully stored 3 records (180 bytes) in: "JOINED_RESULTS"

We can also verify the results are correct in Hive.

[root@sandbox hcc]# hive -e "SELECT k1, k2, t1c1, t4c2 FROM JOINED_RESULTS"
OK
1    1    t1_A    t4_H
1    2    t1_A    t4_H
1    3    t1_A    t4_H
Time taken: 2.343 seconds, Fetched: 3 row(s)

Since we also made this problem more complicated by introducing a partitioned table, we can also see the underlying partitioned data being spread across three folders as well as one of the partitioned files' contents just to be sure.

[root@sandbox hcc]# hdfs dfs -ls -R /apps/hive/warehouse/joined_results
drwxrwxrwx   - root hdfs          0 2016-08-08 10:46 /apps/hive/warehouse/joined_results/k2=1
-rwxrwxrwx   1 root hdfs          0 2016-08-08 10:46 /apps/hive/warehouse/joined_results/k2=1/_SUCCESS
-rwxrwxrwx   1 root hdfs         60 2016-08-08 10:46 /apps/hive/warehouse/joined_results/k2=1/part-v004-o000-r-00000
drwxrwxrwx   - root hdfs          0 2016-08-08 10:46 /apps/hive/warehouse/joined_results/k2=2
-rwxrwxrwx   1 root hdfs          0 2016-08-08 10:46 /apps/hive/warehouse/joined_results/k2=2/_SUCCESS
-rwxrwxrwx   1 root hdfs         60 2016-08-08 10:46 /apps/hive/warehouse/joined_results/k2=2/part-v004-o000-r-00000
drwxrwxrwx   - root hdfs          0 2016-08-08 10:46 /apps/hive/warehouse/joined_results/k2=3
-rwxrwxrwx   1 root hdfs          0 2016-08-08 10:46 /apps/hive/warehouse/joined_results/k2=3/_SUCCESS
-rwxrwxrwx   1 root hdfs         60 2016-08-08 10:46 /apps/hive/warehouse/joined_results/k2=3/part-v004-o000-r-00000
[root@sandbox hcc]# 
[root@sandbox hcc]# hdfs dfs -cat /apps/hive/warehouse/joined_results/k2=3/part-v004-o000-r-00000
"1","t1_A","t1_B","t2_C","t2_D","t3_E","t3_F","t4_G","t4_H"

https://community.hortonworks.com/questions/2562/appending-to-existing-partition-with-pig.html points out that Pig cannot write to an existing partition in this way and also suggests some approaches to work around that

While it got a tiny bit more complicated at the end when we wanted to save results back into a table (a partitioned one at that!!), the first bit of this post should make it clear that Pig's joining functions are relatively straightforward.  As always, a great place to check out is the docs themselves available at http://pig.apache.org/docs/r0.15.0/basic.html#join-inner and http://pig.apache.org/docs/r0.15.0/basic.html#join-outer.