
LogMapper.java:

package mapreduce;

import java.io.IOException;
import java.text.ParseException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sreejith.loganalyzer.ParseLog;

public class LogMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

    private static final Logger logger = LoggerFactory.getLogger(LogMapper.class);
    private IntWritable hour = new IntWritable();
    private final static IntWritable one = new IntWritable(1);

    // Matches one line of a common-format access log; group 4 is the timestamp
    private static Pattern logPattern = Pattern
            .compile("([^ ]*) ([^ ]*) ([^ ]*) \\[([^]]*)\\]"
                    + " \"([^\"]*)\""
                    + " ([^ ]*) ([^ ]*).*");

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws InterruptedException, IOException {
        logger.info("Mapper started");
        String line = value.toString();
        Matcher matcher = logPattern.matcher(line);
        if (matcher.matches()) {
            String timestamp = matcher.group(4);
            try {
                // Emit (hour of day, 1) for every parsed log line
                hour.set(ParseLog.getHour(timestamp));
            } catch (ParseException e) {
                logger.warn("Exception", e);
            }
            context.write(hour, one);
        }
        logger.info("Mapper completed");
    }
}
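The mapper imports ParseLog from com.sreejith.loganalyzer, which is not shown here. Below is a minimal sketch of what such a helper might look like, assuming the timestamps use the Apache access-log layout dd/MMM/yyyy:HH:mm:ss Z; the format string and field names are assumptions for illustration, not the original class.

package com.sreejith.loganalyzer;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Locale;

public class ParseLog {

    // Assumed timestamp layout, e.g. 10/Oct/2000:13:55:36 -0700
    private static final SimpleDateFormat LOG_FORMAT =
            new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);

    // Returns the hour of day (0-23) parsed from a log timestamp
    public static int getHour(String timestamp) throws ParseException {
        Date date = LOG_FORMAT.parse(timestamp);
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(date);
        return calendar.get(Calendar.HOUR_OF_DAY);
    }
}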
LogReducer.java:

package mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    private static final Logger logger = LoggerFactory.getLogger(LogReducer.class);

    @Override
    public void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        logger.info("Reducer started");
        // Sum the counts emitted by the mappers for this hour
        int sum = 0;
        for (IntWritable value : values) {
            sum = sum + value.get();
        }
        context.write(key, new IntWritable(sum));
        logger.info("Reducer completed");
    }
}
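Because the reduce step only sums per-hour counts, which is associative and commutative, the same class could also run map-side as a combiner to cut shuffle traffic. This is optional and not part of the original driver below; it would be one extra call after setReducerClass:

// Optional (not in the original driver): reuse LogReducer as a combiner
job.setCombinerClass(LogReducer.class);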
LogDriver.java:

package mapreduce;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LogDriver {
    private static final Logger logger = LoggerFactory.getLogger(LogDriver.class);

    public static void main(String[] args) throws Exception {
        logger.info("Code started");

        // Set up the job; Job.getInstance() replaces the deprecated Job() constructor
        Job job = Job.getInstance();
        job.setJarByClass(LogDriver.class);
        job.setJobName("Log Analyzer");

        // Set Mapper and Reducer classes
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);

        // Set output key and value classes
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // Set input and output paths from the command-line arguments
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Wait for job completion and log the result
        boolean success = job.waitForCompletion(true);
        if (success) {
            logger.info("Job completed successfully");
        } else {
            logger.error("Job failed");
        }
    }
}
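To run the job, the three classes (plus the ParseLog helper) are packaged into a jar and submitted with the standard hadoop launcher; the jar name and paths below are only placeholders. The first argument is the HDFS directory holding the log files and the second is an output directory that must not already exist, since FileOutputFormat refuses to overwrite it:

hadoop jar loganalyzer.jar mapreduce.LogDriver /logs/input /logs/output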