@oza
Created September 28, 2011 08:14
A Fluentd output plugin for HBase.
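module Fluent
  # Buffered output plugin that writes events to HBase through the Stargate REST gateway.
  class HbaseOutput < Fluent::TimeSlicedOutput
    Fluent::Plugin.register_output('hbase', self)

    def initialize
      super
      require 'json'
      require 'stargate'
    end

    def configure(conf)
      super
      @hbase_host = conf['hbase_host']
      unless @hbase_host
        raise ConfigError, "'hbase_host' parameter is required on hbase output"
      end
      @hbase_table = conf['hbase_table']
      unless @hbase_table
        raise ConfigError, "'hbase_table' parameter is required on hbase output"
      end
      @hbase_cfamily = conf['hbase_cfamily']
      unless @hbase_cfamily
        raise ConfigError, "'hbase_cfamily' parameter is required on hbase output"
      end
    end

    def start
      super
      # FIXME: authentication may be required.
      @hbase = Stargate::Client.new(@hbase_host)
    end

    def shutdown
      # Flush buffered chunks first; write still needs the client.
      super
      # Guarded because it is unclear whether the Stargate client exposes #close.
      @hbase.close if @hbase.respond_to?(:close)
    end

    # Assumption: the event time serves as the row key; the original code
    # serialized only the record and never defined a key.
    def format(tag, event)
      "#{[event.time.to_s, event.record].to_json}\n"
    end

    def write(chunk)
      chunk.read.split("\n").each { |data|
        key, record = JSON.parse(data)
        # Assumption: columns are passed as {:name => 'family:qualifier', :value => ...}
        # hashes, as in the hbase-stargate README; verify against your gem version.
        columns = record.map { |name, value|
          { :name => "#{@hbase_cfamily}:#{name}", :value => value.to_s }
        }
        @hbase.create_row(@hbase_table, key, Time.now.to_i, columns)
      }
    end
  end
end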
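
The write step turns each buffered JSON line into one HBase row. Below is a minimal sketch of that mapping, runnable without Fluentd or the stargate gem. The helper name `rows_from_chunk`, the two-element `[row_key, record]` line layout, and the `family:qualifier` column naming are assumptions made for illustration; the gist itself does not define them.

```ruby
require 'json'

# Hypothetical helper: convert newline-delimited JSON into [row_key, columns] pairs.
# Assumes every line is a two-element JSON array: [row_key, record_hash].
def rows_from_chunk(data, cfamily)
  data.split("\n").reject(&:empty?).map do |line|
    key, record = JSON.parse(line)
    # One column per record field, qualified with the column family.
    columns = record.map do |qualifier, value|
      { :name => "#{cfamily}:#{qualifier}", :value => value.to_s }
    end
    [key.to_s, columns]
  end
end

# Sample chunk in the assumed layout; the values are made up.
chunk = [
  [1317197640, { 'host' => 'web1', 'status' => 200 }].to_json,
  [1317197641, { 'host' => 'web2', 'status' => 404 }].to_json
].join("\n") + "\n"

rows_from_chunk(chunk, 'log').each do |key, columns|
  puts "#{key} #{columns.inspect}"
end
```

Keeping the parsing separate from the HBase call makes it possible to check the row and column layout before pointing the plugin at a real table.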