// Command main is an experiment harness: it concurrently creates a batch of
// network namespaces, each with a veth pair, from goroutines that are locked
// to their OS threads, and then pauses on stdin so the process can be
// inspected. It also carries helpers for watching netlink neighbor messages.
package main

import (
	"bufio"
	"fmt"
	"os"
	"os/exec"
	"path"
	"runtime"
	"sync"
	"syscall"
	"time"

	"github.com/pivotal-golang/lager"
	"github.com/vishvananda/netlink"
	"github.com/vishvananda/netlink/nl"

	"github.com/cloudfoundry-incubator/ducati-daemon/lib/links"
	"github.com/cloudfoundry-incubator/ducati-daemon/lib/namespace"
	ducati_nl "github.com/cloudfoundry-incubator/ducati-daemon/lib/nl"
)

// main starts `loops` goroutines, each of which locks itself to an OS thread
// and runs three commands: create a namespace, create a veth pair inside it,
// and move the "host-N" end of the pair out of it. Once every goroutine has
// finished its commands the program waits for Enter, releases the goroutines
// by closing `done`, and waits for Enter a second time before exiting.
func main() {
	logger := lager.NewLogger("ducati-d")
	logger = logger.Session("", lager.Data{"pid": os.Getpid()})
	logger.RegisterSink(lager.NewWriterSink(os.Stdout, lager.DEBUG))

	loops := 1000

	// done is closed after the first pause; until then every worker goroutine
	// stays parked (and keeps its locked OS thread) after finishing its work.
	done := make(chan struct{})
	wg := sync.WaitGroup{}
	wg.Add(loops)
	for i := 0; i < loops; i++ {
		iter := i
		nsName := fmt.Sprintf("container-%d", iter)
		hostLink := fmt.Sprintf("host-%d", iter)

		commands := []Command{
			CreateNamespace{
				Namespace: nsName,
			},
			CreateVethPair{
				Master:    hostLink,
				Slave:     fmt.Sprintf("eth%d", iter),
				Namespace: nsName,
			},
			MoveInterface{
				SourceNamespace: nsName,
				TargetNamespace: "",
				InterfaceName:   hostLink,
			},
		}

		go func() {
			runtime.LockOSThread()
			logger.Debug("starting", lager.Data{"iter": iter})
			err := execute(commands)
			if err != nil {
				logger.Fatal("execute-failed", err)
			}
			wg.Done()
			<-done
		}()
	}

	wg.Wait()
	fmt.Printf("\n\n\n---- Press Enter ---\n\n\n")

	// The ReadLine results are deliberately ignored: the reads exist only to
	// pause the program until the operator presses Enter.
	reader := bufio.NewReader(os.Stdin)
	reader.ReadLine()

	close(done)

	fmt.Printf("\n\n\n---- Press Enter ---\n\n\n")
	reader.ReadLine()

	// Disabled neighbor-miss monitoring experiment, kept for reference:
	//
	// namespacePath := os.Args[1]
	// ns := namespace.NewNamespace(namespacePath)
	// misses := make(chan *netlink.Neigh, 100)

	// logger.Debug("Creating netlink socket")
	// var sock *nl.NetlinkSocket
	// ns.Execute(func(_ *os.File) error {
	// 	logger.Debug("Subscribing to socket")
	// 	sock = subscribe(logger)
	// 	return nil
	// })

	// ch := make(chan struct{}, 1)

	// logger.Debug("Monitoring misses", lager.Data{"socket": fmt.Sprintf("%+v", sock)})
	// go monitorMisses(logger, misses, sock)
	// <-ch
}

// Command is a single step that execute can run.
type Command interface {
	// Do performs the step and reports any failure.
	Do() error
}

// CreateNamespace adds a named network namespace via `/sbin/ip netns add`.
type CreateNamespace struct {
	Namespace string
}

// Do runs `/sbin/ip netns add <Namespace>` and returns the command's error.
func (c CreateNamespace) Do() error {
	return exec.Command("/sbin/ip", "netns", "add", c.Namespace).Run()
}

// CreateVethPair creates a veth pair named Master/Slave from within the
// namespace found at /var/run/netns/<Namespace>.
type CreateVethPair struct {
	Master    string
	Slave     string
	Namespace string
}

// Do calls links.Factory.CreateVethPair(Master, Slave, links.VxlanVethMTU)
// through ns.Execute, which presumably runs the callback inside the
// namespace (confirm against the namespace package).
func (c CreateVethPair) Do() error {
	ns := namespace.NewNamespace(path.Join("/var/run/netns", c.Namespace))
	return ns.Execute(func(_ *os.File) error {
		linkFactory := &links.Factory{Netlinker: ducati_nl.Netlink}
		_, _, err := linkFactory.CreateVethPair(c.Master, c.Slave, links.VxlanVethMTU)
		return err
	})
}

// MoveInterface moves the link named InterfaceName out of SourceNamespace.
//
// NOTE: TargetNamespace is never read; the destination is always the network
// namespace referenced by /proc/1/ns/net.
type MoveInterface struct {
	SourceNamespace string
	TargetNamespace string
	InterfaceName   string
}

// Do opens /proc/1/ns/net, looks the link up from within the namespace at
// /var/run/netns/<SourceNamespace>, and passes the link and the opened file
// descriptor to LinkSetNsFd. The file stays open until Do returns.
func (c MoveInterface) Do() error {
	ns := namespace.NewNamespace(path.Join("/var/run/netns", c.SourceNamespace))

	file, err := os.Open("/proc/1/ns/net")
	if err != nil {
		return err
	}
	defer file.Close()

	return ns.Execute(func(_ *os.File) error {
		linkFactory := &links.Factory{Netlinker: ducati_nl.Netlink}
		link, err := linkFactory.FindLink(c.InterfaceName)
		if err != nil {
			return err
		}

		return ducati_nl.Netlink.LinkSetNsFd(link, int(file.Fd()))
	})
}

// execute runs the commands in order and stops at the first failure, which
// is returned with the failing command formatted into the message.
func execute(commands []Command) error {
	for _, c := range commands {
		if err := c.Do(); err != nil {
			return fmt.Errorf("command %+v failed: %s", c, err)
		}
	}
	return nil
}

// subscribe opens a NETLINK_ROUTE socket subscribed to RTNLGRP_NEIGH.
// On failure it logs the error and returns nil.
func subscribe(logger lager.Logger) *nl.NetlinkSocket {
	nlsock, err := nl.Subscribe(syscall.NETLINK_ROUTE, syscall.RTNLGRP_NEIGH)
	if err != nil {
		logger.Error("Failed to subscribe to netlink RTNLGRP_NEIGH messages", err)
		return nil
	}
	return nlsock
}

// monitorMisses receives netlink messages from nlsock forever and hands each
// one to processNeighMsg. A receive error is logged and retried after one
// second. Because subscribe returns nil on failure, a nil socket is logged
// and the function returns instead of panicking inside Receive.
func monitorMisses(logger lager.Logger, misses chan *netlink.Neigh, nlsock *nl.NetlinkSocket) {
	if nlsock == nil {
		logger.Error("Failed to receive from netlink", fmt.Errorf("netlink socket is nil"))
		return
	}

	for {
		msgs, err := nlsock.Receive()
		if err != nil {
			logger.Error("Failed to receive from netlink", err)

			time.Sleep(1 * time.Second)
			continue
		}

		for _, msg := range msgs {
			processNeighMsg(logger, msg, misses)
		}
	}
}

// isNeighResolving reports whether state has any of the INCOMPLETE, STALE,
// DELAY or PROBE bits set.
func isNeighResolving(state int) bool {
	return (state & (netlink.NUD_INCOMPLETE | netlink.NUD_STALE | netlink.NUD_DELAY | netlink.NUD_PROBE)) != 0
}

// neigh wraps netlink.Neigh to give it a human-readable String method.
type neigh netlink.Neigh

// neighStateNames maps each NUD_* state bit to the label String prints, in
// the order the labels appear in the output.
var neighStateNames = []struct {
	bit  int
	name string
}{
	{netlink.NUD_INCOMPLETE, "INCOMPLETE"},
	{netlink.NUD_REACHABLE, "REACHABLE"},
	{netlink.NUD_STALE, "STALE"},
	{netlink.NUD_DELAY, "DELAY"},
	{netlink.NUD_PROBE, "PROBE"},
	{netlink.NUD_FAILED, "FAILED"},
	{netlink.NUD_NOARP, "NOARP"},
	{netlink.NUD_PERMANENT, "PERMANENT"},
}

// String formats the neighbor entry for logging. State is a bit mask, so
// every set flag is listed, joined by " | "; a state with none of the known
// bits set is rendered as "NONE".
func (n *neigh) String() string {
	readableState := ""
	for _, s := range neighStateNames {
		if n.State&s.bit == 0 {
			continue
		}
		// Append rather than assign: several bits can be set at once.
		if readableState != "" {
			readableState += " | "
		}
		readableState += s.name
	}
	if readableState == "" {
		readableState = "NONE"
	}

	return fmt.Sprintf(
		"LinkIndex: %d, Family: %d, State: %s, Type: %d, Flags: %d, IP: %s, HardwareAddr: %s",
		int(n.LinkIndex),
		int(n.Family),
		readableState,
		int(n.Type),
		int(n.Flags),
		n.IP.String(),
		n.HardwareAddr.String(),
	)
}

// processNeighMsg deserializes one netlink message as a neighbor entry, logs
// it, and forwards it on misses when the message type is RTM_GETNEIGH or
// RTM_NEWNEIGH and the entry's state satisfies isNeighResolving. The send on
// misses blocks until the channel has room or a receiver is ready.
func processNeighMsg(logger lager.Logger, msg syscall.NetlinkMessage, misses chan *netlink.Neigh) {
	n, err := netlink.NeighDeserialize(msg.Data)
	if err != nil {
		logger.Error("Failed to deserialize netlink ndmsg", err)
		return
	}

	myNeigh := neigh(*n)
	logger.Debug("netlink-neighbor-message", lager.Data{"msg": myNeigh.String()})

	if msg.Header.Type != syscall.RTM_GETNEIGH && msg.Header.Type != syscall.RTM_NEWNEIGH {
		return
	}

	if !isNeighResolving(n.State) {
		// misses come with NUD_STALE bit set
		return
	}

	misses <- n
}