For the purposes of this article, assume that the HAWQ table "employee_expenditure" holds each employee's expenses for every month. The goal is to develop a MapReduce job that reads data from this table and produces the total expenditure for each employee.
Note: An integrated development environment (IDE) such as Eclipse is recommended; it simplifies the process considerably, especially if you are not a Java expert.
Follow the steps below to design a MapReduce job that reads data from a HAWQ table:
1. Create a HAWQ table "employee_expenditure" and populate it with some data.
CREATE TABLE employee_expenditure (employee_id int, expenditure_amt int, expenditure_month text) DISTRIBUTED RANDOMLY;

INSERT INTO employee_expenditure VALUES (1, 20000, 'Jan');
INSERT INTO employee_expenditure VALUES (2, 10000, 'Jan');
INSERT INTO employee_expenditure VALUES (1, 15000, 'Feb');
INSERT INTO employee_expenditure VALUES (2, 10000, 'Feb');
INSERT INTO employee_expenditure VALUES (3, 500, 'Mar');
INSERT INTO employee_expenditure VALUES (1, 20000, 'Mar');

SELECT * FROM employee_expenditure;

 employee_id | expenditure_amt | expenditure_month
-------------+-----------------+-------------------
           1 |           20000 | Jan
           2 |           10000 | Jan
           1 |           15000 | Feb
           2 |           10000 | Feb
           3 |             500 | Mar
           1 |           20000 | Mar
2. Create a Java project, let's name it "HAWQMapReduceProgramming". In Eclipse, go to: File > New > Java Project.
3. Create a folder named "lib"; it will hold all the Hadoop JARs required by the MapReduce program. In Eclipse, go to: File > New > Folder.
4. Import all the required libraries into the "lib" folder from your local filesystem. The required JARs are found in the following directories (a copy sketch follows this step):

/usr/lib/gphd/hadoop/
/usr/lib/gphd/hadoop/lib/
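To copy them from the command line instead, a minimal sketch (the workspace path is an assumption; adjust it to wherever your Eclipse workspace lives):

# Copy the Hadoop JARs into the project's lib folder (assumed workspace path)
cp /usr/lib/gphd/hadoop/*.jar ~/workspace/HAWQMapReduceProgramming/lib/
cp /usr/lib/gphd/hadoop/lib/*.jar ~/workspace/HAWQMapReduceProgramming/lib/

After copying, add the JARs to the project's build path in Eclipse: Project > Properties > Java Build Path > Libraries > Add JARs.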
5. Create a package, say com.HAWQMapReduce.apps, under which we will create the required class. It is good practice to group related classes into a package. In Eclipse, go to: File > New > Package.
6. Create a Java class, say AggregateExpenditure, under the package com.HAWQMapReduce.apps.
7. Write the MapReduce code that aggregates the expenditure for each employee.
// Package declaration
package com.HAWQMapReduce.apps;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// The classes below are specific to the HAWQ MapReduce integration
import com.pivotal.hawq.mapreduce.HAWQException;
import com.pivotal.hawq.mapreduce.HAWQInputFormat;
import com.pivotal.hawq.mapreduce.HAWQRecord;

public class AggregateExpenditure extends Configured implements Tool {

    /* HAWQMapper reads the employee_id and expenditure_amt columns from the
       HAWQ table employee_expenditure and passes them to the reducer for
       aggregation. */
    public static class HAWQMapper extends Mapper<Object, HAWQRecord, IntWritable, IntWritable> {

        public void map(Object key, HAWQRecord value, Context context)
                throws IOException, InterruptedException {
            int employeeidToGroup = 0;
            int expenditureToAggregate = 0;
            // Read the HAWQ record; indices 1 and 2 refer to columns 1 and 2 of the HAWQ table
            try {
                employeeidToGroup = value.getInt(1);
                expenditureToAggregate = value.getInt(2);
            } catch (HAWQException hawqE) {
                throw new IOException(hawqE.getMessage());
            }
            context.write(new IntWritable(employeeidToGroup), new IntWritable(expenditureToAggregate));
        }
    }

    /* HAWQReducer receives the key/value pairs emitted by the mapper above and
       sums the values for each key. */
    public static class HAWQReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    // Execution starts here
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AggregateExpenditure(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception {
        if (args.length < 3) {
            System.out.println("Usage: <hostname:port/database> <table_name> <output_path> [username] [password]");
            return 2;
        }

        // The Job object below carries the configuration that controls the MapReduce program
        Job job = Job.getInstance(getConf());
        job.setJobName("hawq-mapreduce");
        job.setJarByClass(AggregateExpenditure.class);
        job.setMapperClass(HAWQMapper.class);
        job.setReducerClass(HAWQReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        String database_url = args[0];
        String table_name = args[1];
        String output_path = args[2];
        String username = null;
        if (args.length > 3) {
            username = args[3];
        }
        String password = null;
        if (args.length > 4) {
            password = args[4];
        }

        job.setInputFormatClass(HAWQInputFormat.class);
        HAWQInputFormat.setInput(job.getConfiguration(), database_url, username, password, table_name);
        FileOutputFormat.setOutputPath(job, new Path(output_path));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
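Because AggregateExpenditure implements Tool and is launched through ToolRunner, Hadoop's GenericOptionsParser strips generic options such as -libjars (used in step 9 below) before run() receives the remaining arguments, so the class needs no option-parsing code of its own.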
8. Export the MapReduce program created above to a jar file, let's say HAWQAggregateExpenditure.jar. In Eclipse, go to: File > Export, and choose Java > JAR file.
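If you prefer to build the jar from the command line instead of the Eclipse export wizard, here is a minimal sketch (it assumes the source sits under src/ and the JARs from step 4 under lib/):

# Compile against the Hadoop and HAWQ jars collected in step 4
mkdir -p bin
javac -cp "lib/*" -d bin src/com/HAWQMapReduce/apps/AggregateExpenditure.java
# Package the compiled classes into a jar
jar cf HAWQAggregateExpenditure.jar -C bin .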
9. Set the values of LIBJARS and HADOOP_CLASSPATH, then execute the jar.
a. Export the required variables:
export LIBJARS=/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-hadoop-javadoc.jar,/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-mapreduce-ao.jar,/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-mapreduce-common.jar,/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-mapreduce-tool.jar,/usr/local/hawq/lib/postgresql/hawq-mr-io/lib/postgresql-9.2-1003-jdbc4.jar,/usr/local/hawq/lib/postgresql/hawq-mr-io/lib/snakeyaml-1.12.jar

export HADOOP_CLASSPATH=/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-hadoop-javadoc.jar:/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-mapreduce-ao.jar:/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-mapreduce-common.jar:/usr/local/hawq/lib/postgresql/hawq-mr-io/hawq-mapreduce-tool.jar:/usr/local/hawq/lib/postgresql/hawq-mr-io/lib/postgresql-9.2-1003-jdbc4.jar:/usr/local/hawq/lib/postgresql/hawq-mr-io/lib/snakeyaml-1.12.jar

Note that LIBJARS is a comma-separated list (it is passed to the -libjars flag), while HADOOP_CLASSPATH is a standard Java classpath and must be colon-separated.
b. Execute the jar file:
hadoop jar /binary/HAWQAggregateExpenditure.jar com.HAWQMapReduce.apps.AggregateExpenditure -libjars $LIBJARS localhost:5432/gpadmin employee_expenditure /tmp/expenditure

where:
  /binary/HAWQAggregateExpenditure.jar .......... path to the jar file exported in step 8
  com.HAWQMapReduce.apps.AggregateExpenditure .. fully qualified name of the driver class
  -libjars ...................................... flag that ships additional jar files with the job
  localhost:5432/gpadmin ........................ database connection URL (host:port/database)
  employee_expenditure .......................... HAWQ table name
  /tmp/expenditure .............................. HDFS output directory
10. Verify the data in the /tmp/expenditure directory.
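For example, a quick check with the HDFS shell (the part file names can vary with the number of reducers):

hdfs dfs -cat /tmp/expenditure/part-r-*

Given the sample data from step 1, employee 1 should total 55000 (20000 + 15000 + 20000), employee 2 should total 20000 (10000 + 10000), and employee 3 should total 500.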
11. Optional: Create an external table using PXF to view the output with SQL.
CREATE EXTERNAL TABLE aggregate_expenditure (employee_id int, expenditure_aggregate int)
LOCATION ('pxf://phd11-nn.saturn.local:50070/tmp/expenditure/?Fragmenter=HdfsDataFragmenter&Accessor=LineReaderAccessor&Resolver=StringPassResolver')
FORMAT 'TEXT' (DELIMITER E'\t');

[gpadmin@phd11-client ~]$ psql -c "SELECT * FROM aggregate_expenditure;"
 employee_id | expenditure_aggregate