Skip to content

Instantly share code, notes, and snippets.

@forestjohnsonpeoplenet
Created July 19, 2017 15:40
Show Gist options
  • Select an option

  • Save forestjohnsonpeoplenet/a5fb6fd2916b696e167e753c37fd9f10 to your computer and use it in GitHub Desktop.

Select an option

Save forestjohnsonpeoplenet/a5fb6fd2916b696e167e753c37fd9f10 to your computer and use it in GitHub Desktop.

Revisions

  1. forestjohnsonpeoplenet created this gist Jul 19, 2017.
    83 changes: 83 additions & 0 deletions main.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,83 @@
    package main

    import (
    "errors"
    "log"
    "os"
    "github.com/influxdata/kapacitor/udf/agent"
    )

    type createTagFromMeasurementNameHandler struct {
    agent *agent.Agent
    }

    func newcreateTagFromMeasurementNameHandler(agent *agent.Agent) *createTagFromMeasurementNameHandler {
    return &createTagFromMeasurementNameHandler{agent: agent}
    }

    // Return the InfoResponse. Describing the properties of this UDF agent.
    func (*createTagFromMeasurementNameHandler) Info() (*agent.InfoResponse, error) {
    return &agent.InfoResponse{
    Wants: agent.EdgeType_STREAM,
    Provides: agent.EdgeType_STREAM,
    Options: map[string]*agent.OptionInfo{},
    }, nil
    }

    // Initialze the handler based of the provided options.
    func (*createTagFromMeasurementNameHandler) Init(r *agent.InitRequest) (*agent.InitResponse, error) {
    init := &agent.InitResponse{
    Success: true,
    Error: "",
    }
    return init, nil
    }

    // Create a snapshot of the running state of the process.
    func (*createTagFromMeasurementNameHandler) Snapshot() (*agent.SnapshotResponse, error) {
    return &agent.SnapshotResponse{}, nil
    }

    // Restore a previous snapshot.
    func (*createTagFromMeasurementNameHandler) Restore(req *agent.RestoreRequest) (*agent.RestoreResponse, error) {
    return &agent.RestoreResponse{
    Success: true,
    }, nil
    }

    func (*createTagFromMeasurementNameHandler) BeginBatch(begin *agent.BeginBatch) error {
    return errors.New("batching not supported")
    }

    func (h *createTagFromMeasurementNameHandler) Point(point *agent.Point) error {
    point.Tags["measurementName"] = point.Name

    h.agent.Responses <- &agent.Response{
    Message: &agent.Response_Point{
    Point: point,
    },
    }
    return nil
    }

    func (*createTagFromMeasurementNameHandler) EndBatch(end *agent.EndBatch) error {
    return nil
    }

    // Stop the handler gracefully.
    func (h *createTagFromMeasurementNameHandler) Stop() {
    close(h.agent.Responses)
    }

    func main() {
    a := agent.New(os.Stdin, os.Stdout)
    h := newcreateTagFromMeasurementNameHandler(a)
    a.Handler = h

    log.Println("Starting agent")
    a.Start()
    err := a.Wait()
    if err != nil {
    log.Fatal(err)
    }
    }