Skip to content

Instantly share code, notes, and snippets.

@jnatkins
Created November 9, 2012 17:54
Show Gist options
  • Select an option

  • Save jnatkins/4047144 to your computer and use it in GitHub Desktop.

Select an option

Save jnatkins/4047144 to your computer and use it in GitHub Desktop.

Revisions

  1. Jon Natkins created this gist Nov 9, 2012.
    183 changes: 183 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,183 @@
    package com.cloudera.nile.etl;

    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Iterator;
    import java.util.TimeZone;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    import org.apache.avro.Schema;
    import org.apache.avro.Schema.Type;
    import org.apache.avro.mapred.AvroCollector;
    import org.apache.avro.mapred.AvroJob;
    import org.apache.avro.mapred.AvroKey;
    import org.apache.avro.mapred.AvroReducer;
    import org.apache.avro.mapred.AvroValue;
    import org.apache.avro.mapred.Pair;
    import org.apache.avro.reflect.ReflectData;
    import org.apache.avro.util.Utf8;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    import com.cloudera.nile.etl.SessionRecord.SessionEvent;
    import com.google.common.collect.Lists;

    /**
     * MapReduce job that parses httpd combined-format access logs, groups the hits
     * by cookie, and emits one Avro {@code Pair<Utf8, SessionRecord>} per cookie.
     *
     * <p>The mapper validates/normalizes each log line into a tab-separated value
     * keyed by cookie; the reducer assembles the lines for one cookie into a
     * {@link SessionRecord} keyed by {@code "<cookie>_<firstEventEpochMillis>"}.
     */
    public class NileWebLogProcessor implements Tool {

      // Combined log format: IP, two unused fields, bracketed timestamp, quoted
      // request (method/resource/protocol), status, size, then three quoted
      // fields (referrer, user agent, cookie). Compiled once — compiling per
      // record inside map() was pure overhead.
      private static final Pattern LOG_PATTERN = Pattern.compile(
          "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:\\. /]+\\s[+\\-]\\d{4})\\] " +
          "\"([^\\s]+) ([^\\s]+) ([^\"]+)\" (\\d{3}) (\\d+) \"([^\"]+)\" " +
          "\"([^\"]+)\" \"([^\"]+)\"");

      private static final String LOG_DATE_FORMAT_STRING = "yyyy/MM/dd HH:mm:ss.SSS Z";
      private static final String OUTPUT_DATE_FORMAT_STRING = "yyyy-MM-dd HH:mm:ss.SSS";

      // Configuration injected by ToolRunner; stored so getConf() honors the
      // Tool contract (previously discarded, and getConf() returned null).
      private Configuration conf;

      /**
       * Builds a UTC output-timestamp formatter. SimpleDateFormat is not
       * thread-safe, so each mapper/reducer instance creates its own rather
       * than sharing a static one; the timezone is set once here instead of
       * being re-set on every map() call.
       */
      private static SimpleDateFormat newOutputDateFormat() {
        SimpleDateFormat format = new SimpleDateFormat(OUTPUT_DATE_FORMAT_STRING);
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return format;
      }

      /**
       * Parses one access-log line and emits
       * {@code cookie -> ip\tunused\tunused\ttimestamp\tmethod\tresource\tprotocol\tstatus\tsize\treferrer\tuserAgent}
       * (11 tab-separated fields). Lines that do not match the log pattern are
       * silently dropped; unparseable dates fail the task with an IOException.
       */
      static class HttpdLogParserMapper extends MapReduceBase
          implements Mapper<LongWritable, Text, AvroKey<Utf8>, AvroValue<Utf8>> {

        // Per-instance formats: SimpleDateFormat is not thread-safe.
        private final SimpleDateFormat logDateFormat = new SimpleDateFormat(LOG_DATE_FORMAT_STRING);
        private final SimpleDateFormat outputDateFormat = newOutputDateFormat();

        @Override
        public void map(LongWritable key, Text value,
            OutputCollector<AvroKey<Utf8>, AvroValue<Utf8>> output, Reporter reporter)
            throws IOException {

          Matcher matcher = LOG_PATTERN.matcher(value.toString());
          if (!matcher.matches()) {
            return; // malformed line — skip it
          }

          Date timestamp;
          try {
            timestamp = logDateFormat.parse(matcher.group(4));
          } catch (ParseException e) {
            throw new IOException("Could not parse log date: " + matcher.group(4), e);
          }
          String outTimestamp = outputDateFormat.format(timestamp);

          Utf8 cookie = new Utf8(matcher.group(12));            // Cookie
          Utf8 outVal = new Utf8(matcher.group(1) + "\t" +      // IP addr
              matcher.group(2) + "\t" +                         // unused
              matcher.group(3) + "\t" +                         // unused
              outTimestamp + "\t" +                             // Timestamp
              matcher.group(5) + "\t" +                         // Method
              matcher.group(6) + "\t" +                         // Resource
              matcher.group(7) + "\t" +                         // Protocol
              matcher.group(8) + "\t" +                         // Response Code
              matcher.group(9) + "\t" +                         // Response Size
              matcher.group(10) + "\t" +                        // Referrer
              matcher.group(11));                               // User Agent

          output.collect(new AvroKey<Utf8>(cookie), new AvroValue<Utf8>(outVal));
        }
      }

      /**
       * Collects all log events for one cookie into a {@link SessionRecord}.
       * The output key is {@code "<cookie>_<epochMillisOfFirstEvent>"}.
       */
      static class HttpdLogParserReducer
          extends AvroReducer<Utf8, Utf8, Pair<Utf8, SessionRecord>> {

        // Per-instance format: SimpleDateFormat is not thread-safe. UTC matches
        // the mapper's output format so getTime() round-trips correctly.
        private final SimpleDateFormat outputDateFormat = newOutputDateFormat();

        @Override
        public void reduce(Utf8 key, Iterable<Utf8> values,
            AvroCollector<Pair<Utf8, SessionRecord>> collector,
            Reporter reporter) throws IOException {

          String sessionKey = key + "_";
          SessionRecord record = new SessionRecord();
          record.startTime = "start";
          record.endTime = "end";
          record.cookie = key.toString();
          record.events = Lists.newArrayList();
          int numEvents = 0;
          for (Utf8 value : values) {
            // The mapper emits 11 tab-separated fields, indices 0-10. The
            // original code read fields[1]..fields[11]: every field was off by
            // one and fields[11] threw ArrayIndexOutOfBoundsException.
            String[] fields = value.toString().split("\t");

            if (numEvents == 0) {
              try {
                // Key the session on the first event's timestamp (field 3).
                sessionKey += outputDateFormat.parse(fields[3]).getTime();
              } catch (ParseException e) {
                // Previously swallowed via printStackTrace(), silently emitting
                // a malformed session key; fail the task so bad data surfaces.
                throw new IOException("Could not parse event timestamp: " + fields[3], e);
              }
            }

            SessionEvent event = new SessionEvent();
            event.ipAddr = fields[0];
            event.timestamp = fields[3];
            event.method = fields[4];
            event.resource = fields[5];
            event.protocol = fields[6];
            event.responseCode = Integer.valueOf(fields[7]);
            event.responseSize = Integer.valueOf(fields[8]);
            event.referrer = fields[9];
            event.userAgent = fields[10];
            record.events.add(event);

            numEvents++;
          }

          record.numEvents = numEvents;

          Pair<Utf8, SessionRecord> datum =
              new Pair<Utf8, SessionRecord>(new Utf8(sessionKey), record);
          collector.collect(datum);
        }
      }

      @Override
      public void setConf(Configuration conf) {
        this.conf = conf;
      }

      @Override
      public Configuration getConf() {
        return conf;
      }

      /**
       * Configures and runs the job.
       *
       * @param args {@code [inputPath, outputPath]}
       * @return 0 on success, -1 on bad usage
       */
      @Override
      public int run(String[] args) throws Exception {
        if (args.length != 2) {
          System.err.println("Usage: NileWebLogProcessor <input path> <output path>");
          return -1;
        }
        // Seed the JobConf with the ToolRunner-provided configuration so
        // generic options (-D, -files, ...) take effect; previously it was
        // built from scratch and those options were dropped.
        Configuration base = (conf != null) ? conf : new Configuration();
        JobConf jobConf = new JobConf(base, NileWebLogProcessor.class);
        FileInputFormat.addInputPath(jobConf, new Path(args[0]));
        FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));

        jobConf.setInputFormat(TextInputFormat.class);
        jobConf.setMapperClass(HttpdLogParserMapper.class);

        AvroJob.setReducerClass(jobConf, HttpdLogParserReducer.class);

        Schema stringSchema = Schema.create(Type.STRING);
        Schema recordSchema = ReflectData.get().getSchema(SessionRecord.class);
        Schema outSchema = Pair.getPairSchema(stringSchema, recordSchema);
        AvroJob.setOutputSchema(jobConf, outSchema);

        JobClient.runJob(jobConf);
        return 0;
      }

      public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(),
            new NileWebLogProcessor(),
            args);
        System.exit(res);
      }
    }