Skip to content

Instantly share code, notes, and snippets.

@abh1nav
Created March 16, 2012 12:39
Show Gist options
  • Select an option

  • Save abh1nav/2049902 to your computer and use it in GitHub Desktop.

Select an option

Save abh1nav/2049902 to your computer and use it in GitHub Desktop.

Revisions

  1. abh1nav revised this gist Mar 16, 2012. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions S3.java
    Original file line number Diff line number Diff line change
    @@ -14,7 +14,7 @@
    public class S3 implements IRichBolt {

    /**
    * To hold the tuples generated by B1 and B2
    * To hold the tuples generated by S1 and S2
    */
    private class InputCollector {
    public Tuple s1 = null;
    @@ -63,7 +63,7 @@ public void execute(Tuple input) {
    this.inputs.remove(messageId);
    }

    collector.ack(input);
    this.collector.ack(input);
    }

    private InputCollector getOrCreateInputCollector(String messageId) {
  2. abh1nav created this gist Mar 16, 2012.
    88 changes: 88 additions & 0 deletions S3.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,88 @@
    package com.twitsprout.sproutscore.bolts;

    import java.util.HashMap;
    import java.util.Map;

    import com.google.common.collect.Maps;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Tuple;

    public class S3 implements IRichBolt {

    /**
    * To hold the tuples generated by B1 and B2
    */
    private class InputCollector {
    public Tuple s1 = null;
    public Tuple s2 = null;
    }

    private OutputCollector collector;
    private HashMap<String, InputCollector> inputs;

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context,
    OutputCollector collector) {

    this.collector = collector;
    this.inputs = Maps.newHashMap();

    }

    @Override
    public void execute(Tuple input) {

    String messageId = (String) input.getValueByField("messageId");
    InputCollector ic = getOrCreateInputCollector(messageId);

    switch(input.getSourceComponent()) {

    case "BoltIDofS1":
    ic.s1 = input;
    break;

    case "BoltIDofS2":
    ic.s2 = input;
    break;
    }

    if(ic.s1 != null && ic.s2 != null) {
    /**
    * Run your code for S3
    * now that you have both tuples from S1 and S2
    * for the given message Id
    */


    // remember to remove this InputCollector object Map
    this.inputs.remove(messageId);
    }

    collector.ack(input);
    }

    private InputCollector getOrCreateInputCollector(String messageId) {
    InputCollector ic;
    if(this.inputs.containsKey(messageId)) {
    ic = this.inputs.get(messageId);
    }
    else {
    ic = new InputCollector();
    this.inputs.put(messageId, ic);
    }

    return ic;
    }

    @Override
    public void cleanup() {}

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) { }

    }