package main import ( "bytes" "encoding/gob" "errors" "log" "os" "github.com/influxdata/kapacitor/udf" udfAgent "github.com/influxdata/kapacitor/udf/agent" ) type CreateTagFromMeasurementNameHandler struct { Agent *udfAgent.Agent } func newCreateTagFromMeasurementNameHandler(agent *udfAgent.Agent) *CreateTagFromMeasurementNameHandler { return &CreateTagFromMeasurementNameHandler{ Agent: agent, } } // Return the InfoResponse. Describing the properties of this UDF agent. func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) Info() (*udf.InfoResponse, error) { info := &udf.InfoResponse{ Wants: udf.EdgeType_STREAM, Provides: udf.EdgeType_STREAM, Options: map[string]*udf.OptionInfo{}, } return info, nil } // Initialze the handler based of the provided options. func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) Init(initRequest *udf.InitRequest) (*udf.InitResponse, error) { return &udf.InitResponse{ Success: true, Error: "", }, nil } // Create a snapshot of the running state of the process. NoOp since its stateless. func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) Snaphost() (*udf.SnapshotResponse, error) { var buf bytes.Buffer enc := gob.NewEncoder(&buf) enc.Encode(0) return &udf.SnapshotResponse{ Snapshot: buf.Bytes(), }, nil } // Restore a previous snapshot. NoOp since its stateless. func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) Restore(req *udf.RestoreRequest) (*udf.RestoreResponse, error) { return &udf.RestoreResponse{ Success: true, Error: "", }, nil } // This handler does not do batching func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) BeginBatch(*udf.BeginBatch) error { return errors.New("batching not supported") } func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) Point(point *udf.Point) error { point.Tags["measurementName"] = point.Name createTagFromMeasurementNameHandler.Agent.Responses <- &udf.Response{ Message: &udf.Response_Point{ Point: point, }, } return nil } // This handler does not do batching func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) EndBatch(*udf.EndBatch) error { return errors.New("batching not supported") } // Stop the handler gracefully. func (createTagFromMeasurementNameHandler *CreateTagFromMeasurementNameHandler) Stop() { close(createTagFromMeasurementNameHandler.Agent.Responses) } func main() { agent := udfAgent.New(os.Stdin, os.Stdout) handler := newCreateTagFromMeasurementNameHandler(agent) agent.Handler = handler log.Println("Starting agent") agent.Start() err := agent.Wait() if err != nil { log.Fatal(err) } }