package main

import (
	"bytes"
	"encoding/gob"
	"errors"
	"log"
	"os"

	"github.com/influxdata/kapacitor/udf"
	udfAgent "github.com/influxdata/kapacitor/udf/agent"
)

// CreateTagFromMeasurementNameHandler is the UDF handler for this process.
// For every point it receives it stores the point's name under the
// "measurementName" tag and sends the point back through the agent.
type CreateTagFromMeasurementNameHandler struct {
	// Agent is the UDF agent whose Responses channel the handler writes
	// points to and closes on Stop.
	Agent *udfAgent.Agent
}

// newCreateTagFromMeasurementNameHandler returns a handler bound to the given
// agent. The agent is stored as-is; no validation is performed.
func newCreateTagFromMeasurementNameHandler(agent *udfAgent.Agent) *CreateTagFromMeasurementNameHandler {
	h := new(CreateTagFromMeasurementNameHandler)
	h.Agent = agent
	return h
}

// Info describes this UDF: it wants a stream edge, provides a stream edge,
// and declares no options (the option map is empty but non-nil).
func (h *CreateTagFromMeasurementNameHandler) Info() (*udf.InfoResponse, error) {
	return &udf.InfoResponse{
		Wants:    udf.EdgeType_STREAM,
		Provides: udf.EdgeType_STREAM,
		Options:  make(map[string]*udf.OptionInfo),
	}, nil
}

// Init initializes the handler. The request is ignored and the response
// always reports success with an empty error string.
func (h *CreateTagFromMeasurementNameHandler) Init(initRequest *udf.InitRequest) (*udf.InitResponse, error) {
	resp := &udf.InitResponse{Success: true}
	return resp, nil
}

// Snaphost creates a snapshot of the handler's state. The handler keeps no
// state, so the snapshot is only a gob-encoded placeholder value (0).
//
// It returns the encoding error, if any, instead of silently handing back a
// possibly incomplete snapshot.
//
// NOTE(review): the method name is spelled "Snaphost"; presumably this matches
// the agent's Handler interface — confirm before renaming.
func (h *CreateTagFromMeasurementNameHandler) Snaphost() (*udf.SnapshotResponse, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(0); err != nil {
		return nil, err
	}

	return &udf.SnapshotResponse{
		Snapshot: buf.Bytes(),
	}, nil
}

// Restore accepts a previous snapshot. The handler is stateless, so the
// request is ignored and success is always reported.
func (h *CreateTagFromMeasurementNameHandler) Restore(req *udf.RestoreRequest) (*udf.RestoreResponse, error) {
	resp := &udf.RestoreResponse{Success: true}
	return resp, nil
}

// BeginBatch always fails: this handler works on streams only and rejects
// the start of any batch.
func (h *CreateTagFromMeasurementNameHandler) BeginBatch(*udf.BeginBatch) error {
	err := errors.New("batching not supported")
	return err
}

// Point handles a single incoming point: it copies the point's name into the
// "measurementName" tag (overwriting any existing value) and sends the same
// point back on the agent's Responses channel. It always returns nil.
func (h *CreateTagFromMeasurementNameHandler) Point(point *udf.Point) error {
	// Tags may be nil (e.g. a point that carries no tags). Assigning into a
	// nil map panics, so allocate the map before writing to it.
	if point.Tags == nil {
		point.Tags = make(map[string]string, 1)
	}
	point.Tags["measurementName"] = point.Name

	h.Agent.Responses <- &udf.Response{
		Message: &udf.Response_Point{
			Point: point,
		},
	}
	return nil
}

// EndBatch always fails: this handler works on streams only and rejects
// the end of any batch.
func (h *CreateTagFromMeasurementNameHandler) EndBatch(*udf.EndBatch) error {
	err := errors.New("batching not supported")
	return err
}

// Stop shuts the handler down by closing the agent's Responses channel, so
// no further points can be sent after it is called.
func (h *CreateTagFromMeasurementNameHandler) Stop() {
	responses := h.Agent.Responses
	close(responses)
}

// main creates a UDF agent on stdin/stdout, installs the handler, starts the
// agent, and then calls Wait; a non-nil error from Wait is reported through
// log.Fatal.
func main() {
	a := udfAgent.New(os.Stdin, os.Stdout)
	a.Handler = newCreateTagFromMeasurementNameHandler(a)

	log.Println("Starting agent")
	a.Start()
	if err := a.Wait(); err != nil {
		log.Fatal(err)
	}
}
