package main import ( "errors" "log" "os" "github.com/influxdata/kapacitor/udf/agent" ) type createTagFromMeasurementNameHandler struct { agent *agent.Agent } func newcreateTagFromMeasurementNameHandler(agent *agent.Agent) *createTagFromMeasurementNameHandler { return &createTagFromMeasurementNameHandler{agent: agent} } // Return the InfoResponse. Describing the properties of this UDF agent. func (*createTagFromMeasurementNameHandler) Info() (*agent.InfoResponse, error) { return &agent.InfoResponse{ Wants: agent.EdgeType_STREAM, Provides: agent.EdgeType_STREAM, Options: map[string]*agent.OptionInfo{}, }, nil } // Initialze the handler based of the provided options. func (*createTagFromMeasurementNameHandler) Init(r *agent.InitRequest) (*agent.InitResponse, error) { init := &agent.InitResponse{ Success: true, Error: "", } return init, nil } // Create a snapshot of the running state of the process. func (*createTagFromMeasurementNameHandler) Snapshot() (*agent.SnapshotResponse, error) { return &agent.SnapshotResponse{}, nil } // Restore a previous snapshot. func (*createTagFromMeasurementNameHandler) Restore(req *agent.RestoreRequest) (*agent.RestoreResponse, error) { return &agent.RestoreResponse{ Success: true, }, nil } func (*createTagFromMeasurementNameHandler) BeginBatch(begin *agent.BeginBatch) error { return errors.New("batching not supported") } func (h *createTagFromMeasurementNameHandler) Point(point *agent.Point) error { point.Tags["measurementName"] = point.Name h.agent.Responses <- &agent.Response{ Message: &agent.Response_Point{ Point: point, }, } return nil } func (*createTagFromMeasurementNameHandler) EndBatch(end *agent.EndBatch) error { return nil } // Stop the handler gracefully. func (h *createTagFromMeasurementNameHandler) Stop() { close(h.agent.Responses) } func main() { a := agent.New(os.Stdin, os.Stdout) h := newcreateTagFromMeasurementNameHandler(a) a.Handler = h log.Println("Starting agent") a.Start() err := a.Wait() if err != nil { log.Fatal(err) } }