Skip to content

Instantly share code, notes, and snippets.

@oza
Created September 28, 2011 08:14
Show Gist options
  • Select an option

  • Save oza/1247325 to your computer and use it in GitHub Desktop.

Select an option

Save oza/1247325 to your computer and use it in GitHub Desktop.

Revisions

  1. oza renamed this gist Sep 28, 2011. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  2. oza revised this gist Sep 28, 2011. 1 changed file with 37 additions and 25 deletions.
    62 changes: 37 additions & 25 deletions out_hbase.rb
    Original file line number Diff line number Diff line change
    @@ -1,57 +1,69 @@
    module Fluent

    class HbaseOutput < Fluent::TimeSlicedOutput

    class HbaseOutput < Fluent::BufferedOutput
    Fluent::Plugin.register_output('hbase', self)

    def initialize
    super
    require 'rubygems'
    require 'stargate'
    end
    end

    def configure(conf)
    super

    if hbase_host = conf['hbase_host']
    @hbase_host = hbase_host
    unless @hbase_host
    raise ConfigError, "'hbase_host' parameter is required on file output"
    if host = conf['host']
    @host = host
    unless @host
    raise ConfigError, "'host' parameter is required on file output"
    end

    if hbase_table = conf['hbase_table']
    @hbase_table = hbase_table

    if table = conf['table']
    @table = table
    end
    unless @base_table
    unless @table
    raise ConfigError, "'hbase_table' parameter is required on file output"
    end


    if hbase_cfamily = conf['hbase_cfamily']
    @hbase_cfamily = hbase_cfamily
    if @localtime
    @formatter = Proc.new {|tag,event|
    "#{Time.at(event.time).iso8601}\t#{tag}\t#{event.record.to_json}\n"
    }
    else
    @formatter = Proc.new {|tag,event|
    "#{Time.at(event.time).utc.iso8601}\t#{tag}\t#{event.record.to_json}\n"
    }
    end
    unless @base_cfamily
    raise ConfigError, "'hbase_cfamily' parameter is required on file output"
    end

    end

    def start
    super
    # FIXME : authentication may be required.
    @hbase = Stargate::Client.new(@hbase_host)
    @client = Stargate::Client.new(@host)
    end

    def shutdown
    @hbase.close
    @client.close
    end

    def format(tag, event)
    "#{event.record.to_json}\n"
    @formatter.call(tag, event)
    end

    def write(chunk)
    chunk.read.split("\n").map { |data|
    key, value = JSON.parse(data)
    @hbase.client.create_row(@hbase_table, key, Time.now.to_i, value)
    }
    end
    end
    # TODO : Error handling
    records = []
    chunk.split('\n') { |element|
    key, cf, json_record = element.split('\t')
    p json_record
    record = JSON.parse(json_val).each { |k, v|
    k = cf + ":" + k
    }
    p record
    @client.create_row(@table, key, Time.now.to_i, record)
    }
    end

    end
  3. oza revised this gist Sep 28, 2011. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion out_hbase.rb
    Original file line number Diff line number Diff line change
    @@ -51,7 +51,7 @@ def format(tag, event)
    def write(chunk)
    chunk.read.split("\n").map { |data|
    key, value = JSON.parse(data)
    @hbase.client.create_row(@hbase_table, key, Time.now.to_i, data)
    @hbase.client.create_row(@hbase_table, key, Time.now.to_i, value)
    }
    end
    end
  4. oza revised this gist Sep 28, 2011. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion out_hbase.rb
    Original file line number Diff line number Diff line change
    @@ -52,6 +52,6 @@ def write(chunk)
    chunk.read.split("\n").map { |data|
    key, value = JSON.parse(data)
    @hbase.client.create_row(@hbase_table, key, Time.now.to_i, data)
    }
    }
    end
    end
  5. oza revised this gist Sep 28, 2011. 1 changed file with 0 additions and 2 deletions.
    2 changes: 0 additions & 2 deletions out_hbase.rb
    Original file line number Diff line number Diff line change
    @@ -53,7 +53,5 @@ def write(chunk)
    key, value = JSON.parse(data)
    @hbase.client.create_row(@hbase_table, key, Time.now.to_i, data)
    }
    end
    }
    end
    end
  6. oza revised this gist Sep 28, 2011. 1 changed file with 5 additions and 2 deletions.
    7 changes: 5 additions & 2 deletions out_hbase.rb
    Original file line number Diff line number Diff line change
    @@ -49,8 +49,11 @@ def format(tag, event)
    end

    def write(chunk)
    chunk.read.split("\n").map { |data|
    @hbase.client.create_row(@hbase_table, @hbase_cfamily, Time.now.to_i, JSON.parse(data))
    chunk.read.split("\n").map { |data|
    key, value = JSON.parse(data)
    @hbase.client.create_row(@hbase_table, key, Time.now.to_i, data)
    }
    end
    }
    end
    end
  7. oza created this gist Sep 28, 2011.
    56 changes: 56 additions & 0 deletions out_hbase.rb
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,56 @@
    module Fluent

    class HbaseOutput < Fluent::TimeSlicedOutput
    Fluent::Plugin.register_output('hbase', self)

    def initialize
    super
    require 'stargate'
    end

    def configure(conf)
    super

    if hbase_host = conf['hbase_host']
    @hbase_host = hbase_host
    unless @hbase_host
    raise ConfigError, "'hbase_host' parameter is required on file output"
    end

    if hbase_table = conf['hbase_table']
    @hbase_table = hbase_table
    end
    unless @base_table
    raise ConfigError, "'hbase_table' parameter is required on file output"
    end


    if hbase_cfamily = conf['hbase_cfamily']
    @hbase_cfamily = hbase_cfamily
    end
    unless @base_cfamily
    raise ConfigError, "'hbase_cfamily' parameter is required on file output"
    end

    end

    def start
    super
    # FIXME : authentication may be required.
    @hbase = Stargate::Client.new(@hbase_host)
    end

    def shutdown
    @hbase.close
    end

    def format(tag, event)
    "#{event.record.to_json}\n"
    end

    def write(chunk)
    chunk.read.split("\n").map { |data|
    @hbase.client.create_row(@hbase_table, @hbase_cfamily, Time.now.to_i, JSON.parse(data))
    }
    end
    end