LogMapper.java:

package mapreduce;

import java.io.IOException;
import java.text.ParseException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sreejith.loganalyzer.ParseLog;

public class LogMapper extends
        Mapper<LongWritable, Text, IntWritable, IntWritable> {

    private static Logger logger = LoggerFactory.getLogger(LogMapper.class);
    private IntWritable hour = new IntWritable();
    private final static IntWritable one = new IntWritable(1);

    // Matches an Apache access-log line; the bracketed timestamp is group 4.
    private static Pattern logPattern = Pattern
            .compile("([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\]"
                    + " \"([^\"]*)\""
                    + " ([^ ]*) ([^ ]*).*");

    public void map(LongWritable key, Text value, Context context)
            throws InterruptedException, IOException {
        logger.info("Mapper started");
        String line = value.toString();

        Matcher matcher = logPattern.matcher(line);
        if (matcher.matches()) {
            String timestamp = matcher.group(4);
            try {
                hour.set(ParseLog.getHour(timestamp));
                // Emit (hour, 1) only when the timestamp parses; writing
                // outside the try block would count a stale hour value.
                context.write(hour, one);
            } catch (ParseException e) {
                logger.warn("Exception", e);
            }
        }
        logger.info("Mapper Completed");
    }
}
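The mapper delegates timestamp parsing to ParseLog.getHour from the com.sreejith.loganalyzer package, which is not listed here. A minimal sketch of that helper, assuming the bracketed timestamp follows the usual Apache access-log format (for example 10/Oct/2000:13:55:36 -0700), might look like this:

ParseLog.java (hypothetical sketch):

package com.sreejith.loganalyzer;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Locale;

// Hypothetical stand-in for the ParseLog helper referenced by LogMapper.
// Assumes the access-log timestamp format dd/MMM/yyyy:HH:mm:ss Z.
public class ParseLog {

    private static final SimpleDateFormat FORMAT =
            new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);

    // Returns the hour of day (0-23) of the given timestamp, interpreted
    // in the JVM's default time zone.
    public static int getHour(String timestamp) throws ParseException {
        Date date = FORMAT.parse(timestamp);
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(date);
        return calendar.get(Calendar.HOUR_OF_DAY);
    }
}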
LogReducer.java:

package mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogReducer extends
        Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    private static Logger logger = LoggerFactory.getLogger(LogReducer.class);

    public void reduce(IntWritable key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        logger.info("Reducer started");
        int sum = 0;
        // Sum the 1s emitted by the mapper for this hour.
        for (IntWritable value : values) {
            sum = sum + value.get();
        }
        context.write(key, new IntWritable(sum));
        logger.info("Reducer completed");
    }
}
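Because the reduce function simply sums integer counts, it is associative and commutative, so the same class can also serve as a combiner to reduce the data shuffled from mappers to reducers. If you want that optimization (it is not part of the original driver below), a single extra line in the driver is enough:

// Optional: also run LogReducer as a map-side combiner.
job.setCombinerClass(LogReducer.class);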
LogDriver.java:

package mapreduce;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogDriver {

    private static Logger logger = LoggerFactory.getLogger(LogDriver.class);

    public static void main(String[] args) throws Exception {
        logger.info("Code started");

        // Set up the job
        Job job = Job.getInstance(); // Use Job.getInstance() instead of new Job()
        job.setJarByClass(LogDriver.class);
        job.setJobName("Log Analyzer");

        // Set Mapper and Reducer classes
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);

        // Set output key and value classes
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // Set input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Wait for job completion
        boolean success = job.waitForCompletion(true);

        // Log the result of job completion
        if (success) {
            logger.info("Job completed successfully");
        } else {
            logger.error("Job failed");
        }
    }
}
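Once the three classes are packaged into a jar, the job can be submitted with the standard hadoop jar launcher. The jar name and the HDFS input/output paths below are placeholders, not values from the original project; substitute your own, and note that the output directory must not already exist:

hadoop jar loganalyzer.jar mapreduce.LogDriver /input/access_log /output/hourly_counts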