Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save forestjohnsonpeoplenet/f0aa7a807f0f1735ea02279338cdeaf1 to your computer and use it in GitHub Desktop.

Select an option

Save forestjohnsonpeoplenet/f0aa7a807f0f1735ea02279338cdeaf1 to your computer and use it in GitHub Desktop.

Revisions

  1. forestjohnsonpeoplenet created this gist Apr 18, 2017.
    98 changes: 98 additions & 0 deletions gistfile1.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,98 @@
    package main

    import (
    "bytes"
    "encoding/gob"
    "errors"
    "log"
    "os"

    "github.com/influxdata/kapacitor/udf"
    udfAgent "github.com/influxdata/kapacitor/udf/agent"
    )

    type CreateTagFromMeasurementNameHandler struct {
    Agent *udfAgent.Agent
    }

    func newCreateTagFromMeasurementNameHandler(agent *udfAgent.Agent) *CreateTagFromMeasurementNameHandler {
    return &CreateTagFromMeasurementNameHandler{
    Agent: agent,
    }
    }

    // Return the InfoResponse. Describing the properties of this UDF agent.
    func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) Info() (*udf.InfoResponse, error) {
    info := &udf.InfoResponse{
    Wants: udf.EdgeType_STREAM,
    Provides: udf.EdgeType_STREAM,
    Options: map[string]*udf.OptionInfo{},
    }
    return info, nil
    }

    // Initialze the handler based of the provided options.
    func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) Init(initRequest *udf.InitRequest) (*udf.InitResponse, error) {
    return &udf.InitResponse{
    Success: true,
    Error: "",
    }, nil
    }

    // Create a snapshot of the running state of the process. NoOp since its stateless.
    func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) Snaphost() (*udf.SnapshotResponse, error) {
    var buf bytes.Buffer
    enc := gob.NewEncoder(&buf)
    enc.Encode(0)

    return &udf.SnapshotResponse{
    Snapshot: buf.Bytes(),
    }, nil
    }

    // Restore a previous snapshot. NoOp since its stateless.
    func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) Restore(req *udf.RestoreRequest) (*udf.RestoreResponse, error) {
    return &udf.RestoreResponse{
    Success: true,
    Error: "",
    }, nil
    }

    // This handler does not do batching
    func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) BeginBatch(*udf.BeginBatch) error {
    return errors.New("batching not supported")
    }

    func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) Point(point *udf.Point) error {
    point.Tags["measurementName"] = point.Name

    createTagFromMeasurementNameHandler.Agent.Responses <- &udf.Response{
    Message: &udf.Response_Point{
    Point: point,
    },
    }
    return nil
    }

    // This handler does not do batching
    func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) EndBatch(*udf.EndBatch) error {
    return errors.New("batching not supported")
    }

    // Stop the handler gracefully.
    func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) Stop() {
    close(createTagFromMeasurementNameHandler.Agent.Responses)
    }

    func main() {
    agent := udfAgent.New(os.Stdin, os.Stdout)
    handler := newCreateTagFromMeasurementNameHandler(agent)
    agent.Handler = handler

    log.Println("Starting agent")
    agent.Start()
    err := agent.Wait()
    if err != nil {
    log.Fatal(err)
    }
    }