Created
September 28, 2011 08:14
-
-
Save oza/1247325 to your computer and use it in GitHub Desktop.
Revisions
-
oza renamed this gist
Sep 28, 2011 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
oza revised this gist
Sep 28, 2011 . 1 changed file with 37 additions and 25 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,57 +1,69 @@ module Fluent class HbaseOutput < Fluent::BufferedOutput Fluent::Plugin.register_output('hbase', self) def initialize super require 'rubygems' require 'stargate' end def configure(conf) super if host = conf['host'] @host = host unless @host raise ConfigError, "'host' parameter is required on file output" end if table = conf['table'] @table = table end unless @table raise ConfigError, "'hbase_table' parameter is required on file output" end if @localtime @formatter = Proc.new {|tag,event| "#{Time.at(event.time).iso8601}\t#{tag}\t#{event.record.to_json}\n" } else @formatter = Proc.new {|tag,event| "#{Time.at(event.time).utc.iso8601}\t#{tag}\t#{event.record.to_json}\n" } end end def start super # FIXME : authentication may be required. @client = Stargate::Client.new(@host) end def shutdown @client.close end def format(tag, event) @formatter.call(tag, event) end def write(chunk) # TODO : Error handling records = [] chunk.split('\n') { |element| key, cf, json_record = element.split('\t') p json_record record = JSON.parse(json_val).each { |k, v| k = cf + ":" + k } p record @client.create_row(@table, key, Time.now.to_i, record) } end end -
oza revised this gist
Sep 28, 2011 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -51,7 +51,7 @@ def format(tag, event) def write(chunk) chunk.read.split("\n").map { |data| key, value = JSON.parse(data) @hbase.client.create_row(@hbase_table, key, Time.now.to_i, value) } end end -
oza revised this gist
Sep 28, 2011 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -52,6 +52,6 @@ def write(chunk) chunk.read.split("\n").map { |data| key, value = JSON.parse(data) @hbase.client.create_row(@hbase_table, key, Time.now.to_i, data) } end end -
oza revised this gist
Sep 28, 2011 . 1 changed file with 0 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -53,7 +53,5 @@ def write(chunk) key, value = JSON.parse(data) @hbase.client.create_row(@hbase_table, key, Time.now.to_i, data) } end end -
oza revised this gist
Sep 28, 2011 . 1 changed file with 5 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -49,8 +49,11 @@ def format(tag, event) end def write(chunk) chunk.read.split("\n").map { |data| key, value = JSON.parse(data) @hbase.client.create_row(@hbase_table, key, Time.now.to_i, data) } end } end end -
oza created this gist
Sep 28, 2011 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,56 @@ module Fluent class HbaseOutput < Fluent::TimeSlicedOutput Fluent::Plugin.register_output('hbase', self) def initialize super require 'stargate' end def configure(conf) super if hbase_host = conf['hbase_host'] @hbase_host = hbase_host unless @hbase_host raise ConfigError, "'hbase_host' parameter is required on file output" end if hbase_table = conf['hbase_table'] @hbase_table = hbase_table end unless @base_table raise ConfigError, "'hbase_table' parameter is required on file output" end if hbase_cfamily = conf['hbase_cfamily'] @hbase_cfamily = hbase_cfamily end unless @base_cfamily raise ConfigError, "'hbase_cfamily' parameter is required on file output" end end def start super # FIXME : authentication may be required. @hbase = Stargate::Client.new(@hbase_host) end def shutdown @hbase.close end def format(tag, event) "#{event.record.to_json}\n" end def write(chunk) chunk.read.split("\n").map { |data| @hbase.client.create_row(@hbase_table, @hbase_cfamily, Time.now.to_i, JSON.parse(data)) } end end