Skip to content

Instantly share code, notes, and snippets.

@fallenarchon
Last active July 18, 2016 17:34
Show Gist options
  • Select an option

  • Save fallenarchon/3a14da6038fbf5b86de341bc7c69e2f8 to your computer and use it in GitHub Desktop.

Select an option

Save fallenarchon/3a14da6038fbf5b86de341bc7c69e2f8 to your computer and use it in GitHub Desktop.
Sample go client to listen for LCM packets broadcast over UDP multicast. https://lcm-proj.github.io/
package main
import (
"bytes"
"fmt"
)
func checkErr(err error) {
if err != nil {
panic(err)
}
}
func main() {
fmt.Println("Starting")
output := make(chan *bytes.Reader)
provider := lcm.NewUDPMulticastProvider(nil)
provider.Subscribe("sample", output)
defer provider.Close()
for reader := range output {
fmt.Printf("Received %v bytes from provider\n", reader.Len())
}
}
package main
import (
"bytes"
"encoding/binary"
"log"
"net"
"sync"
)
const (
//LCM identifer for 'short packet'; ascii of "LC02"
magicShort = 0x4c433032
//LCM identifer for 'fragment packet'; ascii of "LC03"
magicFragment = 0x4c433033
)
// UDPMulticastProvider listens for LCM packets on a given UDPAddr and delivers
// any valid packets to subscribed channels that match LCM header information. Currently
// only supports LCM short packet format
//
//
// LCM short packet format
//
// ------------------------------------------------------------------------------------------
// | 4 byte ID | 4 byte sequence number | null terminated LCM channel name string | payload |
// ------------------------------------------------------------------------------------------
//
type UDPMulticastProvider struct {
addr *net.UDPAddr
sock *net.UDPConn
subscriptions map[string]chan *bytes.Reader
subLock *sync.Mutex
isActive bool
}
// NewUDPMulticastProvider is the default constructor for UDPMulticastProvider.
// If addr is nil, the socket listen address defaults to 239.255.76.67:7667. The provider
// doesn't start listening to the network until the first Subscribe() is called
func NewUDPMulticastProvider(addr *net.UDPAddr) *UDPMulticastProvider {
if addr == nil {
addr = &net.UDPAddr{IP: []byte{239, 255, 76, 67}, Port: 7667}
}
return &UDPMulticastProvider{
addr: addr,
subLock: &sync.Mutex{},
subscriptions: make(map[string]chan *bytes.Reader),
}
}
// Subscribe takes a go channel to deliver any LCM packets that match the supplied LCM channel name.
// Starts listening after the first call
func (u *UDPMulticastProvider) Subscribe(channel string, output chan *bytes.Reader) {
u.subLock.Lock()
//Verify that we aren't already subscribed to channel
if _, ok := u.subscriptions[channel]; ok {
return
}
u.subscriptions[channel] = output
u.subLock.Unlock()
//Start listening for LCM packets if we aren't already listening
if u.sock == nil {
u.isActive = true
go u.listen()
}
}
// Unsubscribe removes the supplied LCM channel name from the list of subscriptions. If no subscriptions remain,
// the socket is closed
func (u *UDPMulticastProvider) Unsubscribe(channel string) {
u.subLock.Lock()
defer u.subLock.Unlock()
delete(u.subscriptions, channel)
//Close connection if there are no more subscriptions
if len(u.subscriptions) == 0 {
u.Close()
}
}
// Close closes the socket connection
func (u *UDPMulticastProvider) Close() {
u.isActive = false
if u.sock != nil {
u.sock.Close()
}
u.sock = nil
}
func (u *UDPMulticastProvider) listen() {
//Guard against opening another socket if already open
if u.sock != nil {
return
}
var err error
u.sock, err = net.ListenMulticastUDP("udp", nil, u.addr)
if err != nil {
log.Printf("Error establishing LCM socket %v", err.Error())
return
}
var payload = make([]byte, 65536)
u.isActive = true
for {
if !u.isActive {
return
}
len, _, err := u.sock.ReadFromUDP(payload)
if err != nil {
log.Printf("Error reading UDP packet: %v\n", err.Error())
}
u.processPacket(payload[:len])
}
}
func (u *UDPMulticastProvider) processPacket(packet []byte) {
//Check for LCM magic identifier
r := bytes.NewReader(packet)
var lcmID int32
err := binary.Read(r, binary.BigEndian, &lcmID)
if err != nil {
log.Printf("Error reading LCM packet identifier: %v\n", err.Error())
return
}
if lcmID == magicShort {
u.processShortMessage(r)
}
//TODO implement fragment message handling
}
func (u *UDPMulticastProvider) processShortMessage(packetReader *bytes.Reader) {
//Skip past 4 bytes of sequence ID since it is not used.
//Magic number 1 in Seek() whence argument means to move relative to the current offset.
//See io.Seeker go doc for more details
packetReader.Seek(4, 1)
//Fetch channel name from null terminated series of ASCII bytes
var channelBytes []byte
for {
b, err := packetReader.ReadByte()
if err != nil {
break
}
//We reached end of null terminated string
if b == 0 {
break
}
channelBytes = append(channelBytes, b)
}
channelName := string(channelBytes)
//Deliver packet if channel name matches a subscription
u.subLock.Lock()
defer u.subLock.Unlock()
if output, ok := u.subscriptions[channelName]; ok {
output <- packetReader
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment