package com.cloudera.nile.etl;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroReducer;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapred.Pair;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.cloudera.nile.etl.SessionRecord.SessionEvent;
import com.google.common.collect.Lists;

/**
 * MapReduce job (old {@code mapred} API) that parses httpd-style access-log
 * lines, groups the parsed events by cookie, and emits one Avro
 * {@link SessionRecord} per cookie keyed by
 * {@code "<cookie>_<millis-of-first-event>"}.
 *
 * <p>Usage: {@code NileWebLogProcessor <input path> <output path>}
 */
public class NileWebLogProcessor implements Tool {

  /**
   * Combined access-log line: IP, two unused fields, bracketed timestamp,
   * quoted request ("method resource protocol"), 3-digit status, size,
   * quoted referrer, quoted user-agent, quoted cookie (groups 1-12).
   */
  private static final String LOG_PATTERN =
      "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:\\. /]+\\s[+\\-]\\d{4})\\] "
          + "\"([^\\s]+) ([^\\s]+) ([^\"]+)\" (\\d{3}) (\\d+) \"([^\"]+)\" "
          + "\"([^\"]+)\" \"([^\"]+)\"";

  // Compiled once: Pattern is immutable and thread-safe; the original code
  // recompiled it on every map() call.
  private static final Pattern LOG_REGEX = Pattern.compile(LOG_PATTERN);

  private static final String LOG_DATE_FORMAT_STRING = "yyyy/MM/dd HH:mm:ss.SSS Z";

  // NOTE(review): SimpleDateFormat is NOT thread-safe. Sharing these statics
  // is safe here only because each old-API map/reduce task runs
  // single-threaded in its own JVM; revisit if multithreaded runners
  // (e.g. MultithreadedMapRunner) are ever enabled.
  private static final SimpleDateFormat LOG_DATE_FORMAT =
      new SimpleDateFormat(LOG_DATE_FORMAT_STRING);
  private static final SimpleDateFormat OUTPUT_DATE_FORMAT =
      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

  static {
    // Normalize all emitted timestamps to UTC. Done once here instead of on
    // every map() invocation as before.
    OUTPUT_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
  }

  /** Configuration injected by ToolRunner via setConf(). */
  private Configuration conf;

  /**
   * Parses one log line into a cookie-keyed, tab-separated event value.
   * Malformed lines (regex mismatch) are silently skipped.
   */
  static class HttpdLogParserMapper extends MapReduceBase
      implements Mapper<LongWritable, Text, AvroKey<Utf8>, AvroValue<Utf8>> {

    @Override
    public void map(LongWritable key, Text value,
        OutputCollector<AvroKey<Utf8>, AvroValue<Utf8>> output, Reporter reporter)
        throws IOException {
      Matcher matcher = LOG_REGEX.matcher(value.toString());
      if (!matcher.matches()) {
        return; // not a recognized log line; drop it
      }
      Date timestamp;
      try {
        timestamp = LOG_DATE_FORMAT.parse(matcher.group(4));
      } catch (ParseException e) {
        throw new IOException("Could not parse log date: " + matcher.group(4), e);
      }
      String outTimestamp = OUTPUT_DATE_FORMAT.format(timestamp);
      Utf8 cookie = new Utf8(matcher.group(12));
      // Tab-separated payload read back by HttpdLogParserReducer; the 0-based
      // field indices below MUST stay in sync with that reducer.
      Utf8 outVal = new Utf8(
          matcher.group(1) + "\t"    // 0: IP addr
          + matcher.group(2) + "\t"  // 1: unused
          + matcher.group(3) + "\t"  // 2: unused
          + outTimestamp + "\t"      // 3: timestamp (UTC)
          + matcher.group(5) + "\t"  // 4: method
          + matcher.group(6) + "\t"  // 5: resource
          + matcher.group(7) + "\t"  // 6: protocol
          + matcher.group(8) + "\t"  // 7: response code
          + matcher.group(9) + "\t"  // 8: response size
          + matcher.group(10) + "\t" // 9: referrer
          + matcher.group(11));      // 10: user agent
      output.collect(new AvroKey<Utf8>(cookie), new AvroValue<Utf8>(outVal));
    }
  }

  /**
   * Collects all events for one cookie into a SessionRecord keyed by
   * "cookie_" plus the epoch millis of the first event seen.
   */
  static class HttpdLogParserReducer
      extends AvroReducer<Utf8, Utf8, Pair<Utf8, SessionRecord>> {

    @Override
    public void reduce(Utf8 key, Iterable<Utf8> values,
        AvroCollector<Pair<Utf8, SessionRecord>> collector, Reporter reporter)
        throws IOException {
      String sessionKey = key + "_";
      SessionRecord record = new SessionRecord();
      record.startTime = "start";
      record.endTime = "end";
      record.cookie = key.toString();
      record.events = Lists.newArrayList();
      int numEvents = 0;
      for (Utf8 value : values) {
        // Field layout written by HttpdLogParserMapper (0-based):
        // 0=ip, 1-2=unused, 3=timestamp, 4=method, 5=resource, 6=protocol,
        // 7=response code, 8=response size, 9=referrer, 10=user agent.
        String[] fields = value.toString().split("\t");
        if (numEvents == 0) {
          try {
            // BUGFIX: previously parsed fields[4] (the HTTP method), which
            // could never parse as a date; indices were off by one throughout
            // and fields[11] overran the 11-element array.
            sessionKey += OUTPUT_DATE_FORMAT.parse(fields[3]).getTime();
          } catch (ParseException e) {
            // Was e.printStackTrace(), silently producing a "<cookie>_" key;
            // fail loudly instead, matching the mapper's error handling.
            throw new IOException("Could not parse event timestamp: " + fields[3], e);
          }
        }
        SessionEvent event = new SessionEvent();
        event.ipAddr = fields[0];
        event.timestamp = fields[3];
        event.method = fields[4];
        event.resource = fields[5];
        event.protocol = fields[6];
        event.responseCode = Integer.parseInt(fields[7]);
        event.responseSize = Integer.parseInt(fields[8]);
        event.referrer = fields[9];
        event.userAgent = fields[10];
        record.events.add(event);
        numEvents++;
      }
      record.numEvents = numEvents;
      collector.collect(new Pair<Utf8, SessionRecord>(new Utf8(sessionKey), record));
    }
  }

  @Override
  public void setConf(Configuration conf) {
    // Honor the Configurable contract (was previously a no-op, and getConf()
    // returned null, discarding anything ToolRunner injected).
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  /**
   * Configures and submits the job.
   *
   * @param args {@code [inputPath, outputPath]}
   * @return 0 on success, -1 on bad usage
   */
  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: NileWebLogProcessor <input path> <output path>");
      return -1;
    }
    // Propagate the ToolRunner-supplied configuration (generic options like
    // -D/-fs) into the job; fall back to a fresh conf when run standalone.
    JobConf jobConf = (conf == null)
        ? new JobConf(NileWebLogProcessor.class)
        : new JobConf(conf, NileWebLogProcessor.class);
    FileInputFormat.addInputPath(jobConf, new Path(args[0]));
    FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
    jobConf.setInputFormat(TextInputFormat.class);
    jobConf.setMapperClass(HttpdLogParserMapper.class);
    AvroJob.setReducerClass(jobConf, HttpdLogParserReducer.class);
    Schema stringSchema = Schema.create(Type.STRING);
    // The non-Avro mapper emits AvroKey<Utf8>/AvroValue<Utf8>, so the
    // intermediate (map-output) schema must be declared for Avro
    // serialization to be configured.
    AvroJob.setMapOutputSchema(jobConf, Pair.getPairSchema(stringSchema, stringSchema));
    Schema recordSchema = ReflectData.get().getSchema(SessionRecord.class);
    Schema outSchema = Pair.getPairSchema(stringSchema, recordSchema);
    AvroJob.setOutputSchema(jobConf, outSchema);
    JobClient.runJob(jobConf);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new NileWebLogProcessor(), args);
    System.exit(res);
  }
}