Created
March 16, 2012 12:39
-
-
Save abh1nav/2049902 to your computer and use it in GitHub Desktop.
Revisions
-
abh1nav revised this gist
Mar 16, 2012 . 1 changed file with 2 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -14,7 +14,7 @@ public class S3 implements IRichBolt { /** * To hold the tuples generated by S1 and S2 */ private class InputCollector { public Tuple s1 = null; @@ -63,7 +63,7 @@ public void execute(Tuple input) { this.inputs.remove(messageId); } this.collector.ack(input); } private InputCollector getOrCreateInputCollector(String messageId) { -
abh1nav created this gist
Mar 16, 2012 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,88 @@ package com.twitsprout.sproutscore.bolts; import java.util.HashMap; import java.util.Map; import com.google.common.collect.Maps; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; public class S3 implements IRichBolt { /** * To hold the tuples generated by B1 and B2 */ private class InputCollector { public Tuple s1 = null; public Tuple s2 = null; } private OutputCollector collector; private HashMap<String, InputCollector> inputs; @SuppressWarnings("rawtypes") @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.inputs = Maps.newHashMap(); } @Override public void execute(Tuple input) { String messageId = (String) input.getValueByField("messageId"); InputCollector ic = getOrCreateInputCollector(messageId); switch(input.getSourceComponent()) { case "BoltIDofS1": ic.s1 = input; break; case "BoltIDofS2": ic.s2 = input; break; } if(ic.s1 != null && ic.s2 != null) { /** * Run your code for S3 * now that you have both tuples from S1 and S2 * for the given message Id */ // remember to remove this InputCollector object Map this.inputs.remove(messageId); } collector.ack(input); } private InputCollector getOrCreateInputCollector(String messageId) { InputCollector ic; if(this.inputs.containsKey(messageId)) { ic = this.inputs.get(messageId); } else { ic = new InputCollector(); this.inputs.put(messageId, ic); } return ic; } @Override public void cleanup() {} @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } }