How to sort data using MapReduce

Description

Sorting of large amounts of data happens in the shuffle-sort phase of a MapReduce job. The source code below sorts a CSV file by the value of its second column. In the Mapper class, the map function reads each input row and emits a key-value pair, using the second column as the key and the first column as the value. After the map phase completes, the key-value pairs are sorted by key in the shuffle-sort phase. This sorting happens automatically between the map and reduce phases: a comparator determines the order of the keys, and the sorted keys with their corresponding lists of values are fed into the reduce function. The reduce function then iterates over each key's values and writes the results to the output directory.
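The effect of the shuffle-sort phase can be illustrated in plain Java before reading the Hadoop code. The sketch below is a toy simulation (no Hadoop dependency; the class name and sample rows are made up for illustration): the "map" step emits (second column, first column) pairs, and a `TreeMap` stands in for the framework's sorting of keys between map and reduce.

```java
import java.util.*;

public class ShuffleSortDemo {
    // Simulates map + shuffle-sort + reduce: emit (second column, first column),
    // deliver pairs in ascending key order, then write "value <TAB> key" lines
    static List<String> sortPairs(String[] rows) {
        // TreeMap keeps keys sorted, standing in for the shuffle-sort phase
        TreeMap<Integer, List<Integer>> shuffled = new TreeMap<>();
        for (String row : rows) {
            String[] t = row.split(",");
            int key = Integer.parseInt(t[1]);   // second column becomes the key
            int val = Integer.parseInt(t[0]);   // first column becomes the value
            shuffled.computeIfAbsent(key, k -> new ArrayList<>()).add(val);
        }
        // "Reduce" phase: keys arrive in ascending order
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, List<Integer>> e : shuffled.entrySet()) {
            for (int v : e.getValue()) {
                out.add(v + "\t" + e.getKey()); // reducer writes value, then key
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical CSV rows: "firstColumn,secondColumn"
        for (String line : sortPairs(new String[]{"10,3", "20,1", "30,2"})) {
            System.out.println(line);
        }
        // prints:
        // 20	1
        // 30	2
        // 10	3
    }
}
```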

import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SortValues {

    public static void main(String[] args) throws Exception {
        Path inputPath = new Path("hdfs://localhost:54310/home/sortinput");
        Path outputDir = new Path("hdfs://localhost:54310/home/sortoutput");

        // Create configuration
        Configuration conf = new Configuration();

        // Create job
        Job job = Job.getInstance(conf, "Sort the Numbers");
        job.setJarByClass(SortValues.class);

        // Setup MapReduce
        job.setMapperClass(MapTask.class);
        job.setReducerClass(ReduceTask.class);
        job.setNumReduceTasks(1);

        // Specify key / value
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        // job.setSortComparatorClass(IntComparator.class);

        // Input
        FileInputFormat.addInputPath(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Output
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Execute job and exit with its status code
        int code = job.waitForCompletion(true) ? 0 : 1;
        System.exit(code);
    }

    public static class MapTask extends
            Mapper<LongWritable, Text, IntWritable, IntWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            String line = value.toString();
            String[] tokens = line.split(","); // comma is the delimiter between columns
            int keyPart = Integer.parseInt(tokens[0]);
            int valuePart = Integer.parseInt(tokens[1]);
            // Emit (second column, first column) so the framework sorts by the second column
            context.write(new IntWritable(valuePart), new IntWritable(keyPart));
        }
    }

    public static class ReduceTask extends
            Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> list, Context context)
                throws java.io.IOException, InterruptedException {
            // Keys arrive sorted; write each pair back as (first column, second column)
            for (IntWritable value : list) {
                context.write(value, key);
            }
        }
    }
}
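The commented-out `setSortComparatorClass(IntComparator.class)` line (and the otherwise-unused `WritableComparator` and `ByteBuffer` imports) hint at a custom comparator that would change the sort order, e.g. to descending. A `WritableComparator` subclass compares the serialized bytes of two keys. The sketch below is a plain-Java illustration of that byte-level comparison, with no Hadoop dependency so it runs standalone; the class name `DescendingIntCompare` is invented for this example, and in a real job the `compare` logic would live in a `WritableComparator` subclass registered via `setSortComparatorClass`.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class DescendingIntCompare {
    // Serializes an int the way IntWritable does: four big-endian bytes
    static byte[] serialize(int v) {
        return ByteBuffer.allocate(4).putInt(v).array();
    }

    // The byte-level comparison a descending comparator would perform in its
    // compare(byte[], int, int, byte[], int, int) override: deserialize both
    // keys and reverse their natural order
    static int compare(byte[] b1, byte[] b2) {
        int v1 = ByteBuffer.wrap(b1).getInt();
        int v2 = ByteBuffer.wrap(b2).getInt();
        return Integer.compare(v2, v1); // reversed: larger keys sort first
    }

    public static void main(String[] args) {
        Integer[] keys = {3, 1, 2};
        Arrays.sort(keys, (a, b) -> compare(serialize(a), serialize(b)));
        System.out.println(Arrays.toString(keys)); // prints [3, 2, 1]
    }
}
```

With such a comparator registered, the reducer would receive keys largest-first instead of smallest-first, while the rest of the job stays unchanged.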

Input

Output
