How to Write a MapReduce Program in Java

Steps for Writing a MapReduce Program in Java

  • Description:
    A MapReduce program in Java processes large datasets in parallel across a distributed Hadoop cluster. The framework splits the work into two major phases, illustrated by the sketch after this list:
    • Map Phase: The Mapper processes each input record and converts it into intermediate key-value pairs.
    • Reduce Phase: The Reducer aggregates the intermediate key-value pairs emitted by the Mappers to produce the final output.
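
    In this log-analysis example, the Mapper extracts the hour of day from each log line's timestamp and emits the pair (hour, 1); the Reducer then sums the counts for each hour. A minimal sketch of the data flow, using a hypothetical Apache access-log line (the totals shown are made up):

        // Hypothetical input line:
        //   127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /index.html HTTP/1.0" 200 2326
        // The Map phase emits one pair per line:      (13, 1)    <- hour 13, count 1
        // The Reduce phase sums the pairs per hour:   (13, 42)   <- hypothetical total for hour 13
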
Source Code
  • LogMapper.java:
    package mapreduce;
    
    import java.io.IOException;
    import java.text.ParseException;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import com.sreejith.loganalyzer.ParseLog;
    
    public class LogMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    
        private static Logger logger = LoggerFactory.getLogger(LogMapper.class);
        private IntWritable hour = new IntWritable();
        private final static IntWritable one = new IntWritable(1);
        private static Pattern logPattern = Pattern
             .compile("([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\]"
                     + " \"([^\"]*)\""
                     + " ([^ ]*) ([^ ]*).*");
    
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            logger.info("Mapper started");
            String line = value.toString();
            Matcher matcher = logPattern.matcher(line);
            if (matcher.matches()) {
                // Group 4 is the bracketed timestamp, e.g. "10/Oct/2000:13:55:36 -0700"
                String timestamp = matcher.group(4);
                try {
                    hour.set(ParseLog.getHour(timestamp));
                    // Write (hour, 1) only after a successful parse, so a stale
                    // hour value from a previous record is never emitted
                    context.write(hour, one);
                } catch (ParseException e) {
                    logger.warn("Could not parse timestamp: " + timestamp, e);
                }
            }
            logger.info("Mapper completed");
        }
    }
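
    The Mapper above depends on ParseLog.getHour from the com.sreejith.loganalyzer package, which is imported but not shown in this example. A minimal sketch of what that helper might look like, assuming Apache common-log timestamps of the form dd/MMM/yyyy:HH:mm:ss Z (the implementation below is an assumption, not the original class):

    package com.sreejith.loganalyzer;

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Locale;

    // Hypothetical implementation of the helper imported by LogMapper;
    // the original class is not included in this example's source.
    public class ParseLog {

        // Returns the hour of day (0-23) for a timestamp such as
        // "10/Oct/2000:13:55:36 -0700", evaluated in the JVM's default time zone.
        public static int getHour(String timestamp) throws ParseException {
            SimpleDateFormat format = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);
            Calendar calendar = Calendar.getInstance();
            calendar.setTime(format.parse(timestamp));
            return calendar.get(Calendar.HOUR_OF_DAY);
        }
    }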
                            
  • LogReducer.java:
    package mapreduce;
    
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class LogReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
    
        private static Logger logger = LoggerFactory.getLogger(LogReducer.class);
    
        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            logger.info("Reducer started");
            int sum = 0;
            for (IntWritable value : values) {
                sum = sum + value.get();
            }
            context.write(key, new IntWritable(sum));
            logger.info("Reducer completed");
        }
    }
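
    Because the reduce step is a simple associative sum, LogReducer can also serve as a combiner, pre-aggregating counts on the map side to cut shuffle traffic. This is optional and not part of the original driver; if desired, add one line to LogDriver after the reducer is set:

    job.setCombinerClass(LogReducer.class);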
                            
  • LogDriver.java:
    package mapreduce;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class LogDriver {
    
        private static Logger logger = LoggerFactory.getLogger(LogDriver.class);
    
        public static void main(String[] args) throws Exception {        
            logger.info("Code started");
    
            // Set up the job
        Job job = Job.getInstance();  // Job.getInstance() replaces the deprecated Job() constructor
            job.setJarByClass(LogDriver.class);
            job.setJobName("Log Analyzer");
    
            // Set Mapper and Reducer classes
            job.setMapperClass(LogMapper.class);
            job.setReducerClass(LogReducer.class);
    
            // Set output key and value classes
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntWritable.class);
    
            // Set input and output paths
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            // Wait for job completion
            boolean success = job.waitForCompletion(true);
            
        // Log the result and exit with a matching status code
        if (success) {
            logger.info("Job completed successfully");
        } else {
            logger.error("Job failed");
        }
        System.exit(success ? 0 : 1);
        }
    }
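
    After packaging the three classes (together with the ParseLog helper) into a jar, the job can be submitted to the cluster with the hadoop jar command. The jar name and HDFS paths below are placeholders; args[0] is the input path and args[1] the output path, which must not already exist:

    hadoop jar loganalyzer.jar mapreduce.LogDriver /user/hadoop/logs/access.log /user/hadoop/logs/output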
                            