Go! Go! Go! - Moby

This commit is contained in:
Adam Ierymenko 2019-09-20 20:34:31 -07:00
parent 02a6b15e6b
commit b540181990
No known key found for this signature in database
GPG Key ID: C8877CF2D7A5D7F3
5 changed files with 89 additions and 14 deletions

View File

@ -110,6 +110,8 @@ extern "C" int goStateObjectGetFunc(ZT_GoNode *,int,const uint64_t [2],void *,un
extern "C" void goDNSResolverFunc(ZT_GoNode *,const uint8_t *,int,const char *,uintptr_t);
extern "C" int goVirtualNetworkConfigFunc(ZT_GoNode *,ZT_GoTap *,uint64_t,int,const ZT_VirtualNetworkConfig *);
extern "C" void goZtEvent(ZT_GoNode *,int,const void *);
extern "C" void goHandleTapAddedMulticastGroup(ZT_GoNode *,ZT_GoTap *,uint64_t,uint64_t,uint32_t);
extern "C" void goHandleTapRemovedMulticastGroup(ZT_GoNode *,ZT_GoTap *,uint64_t,uint64_t,uint32_t);
static int ZT_GoNode_VirtualNetworkConfigFunction(
ZT_Node *node,
@ -334,11 +336,26 @@ extern "C" ZT_GoNode *ZT_GoNode_new(const char *workingPath)
gn->run = true;
gn->backgroundTaskThread = std::thread([gn] {
int64_t lastScannedMulticastGroups = 0;
while (gn->run) {
std::this_thread::sleep_for(std::chrono::milliseconds(250));
std::this_thread::sleep_for(std::chrono::milliseconds(500));
const int64_t now = OSUtils::now();
if (now >= gn->nextBackgroundTaskDeadline)
gn->node->processBackgroundTasks(nullptr,now,&(gn->nextBackgroundTaskDeadline));
if ((now - lastScannedMulticastGroups) > 5000) {
lastScannedMulticastGroups = now;
std::vector<MulticastGroup> added,removed;
std::lock_guard<std::mutex> tl(gn->taps_l);
for(auto t=gn->taps.begin();t!=gn->taps.end();++t) {
added.clear();
removed.clear();
t->second->scanMulticastGroups(added,removed);
for(auto g=added.begin();g!=added.end();++g)
goHandleTapAddedMulticastGroup(gn,(ZT_GoTap *)t->second.get(),t->first,g->mac().toInt(),g->adi());
for(auto g=removed.begin();g!=removed.end();++g)
goHandleTapRemovedMulticastGroup(gn,(ZT_GoTap *)t->second.get(),t->first,g->mac().toInt(),g->adi());
}
}
}
});

View File

@ -16,6 +16,7 @@ package zerotier
import (
"fmt"
"net"
"sync"
"sync/atomic"
"unsafe"
)
@ -30,9 +31,11 @@ import "C"
// nativeTap is a Tap implementation that wraps a native C++ interface to a system tun/tap device
type nativeTap struct {
tap unsafe.Pointer
networkStatus uint32
enabled uint32
tap unsafe.Pointer
networkStatus uint32
enabled uint32
multicastGroupHandlers []func(bool, *MulticastGroup)
multicastGroupHandlersLock sync.Mutex
}
// SetEnabled sets this tap's enabled state
@ -56,12 +59,12 @@ func (t *nativeTap) AddIP(ip net.IPNet) error {
if bits > 128 || bits < 0 {
return ErrInvalidParameter
}
C.ZT_GoTap_addIp(t.tap, afInet6, unsafe.Pointer(&ip.IP[0]), C.int(bits))
C.ZT_GoTap_addIp(t.tap, C.int(afInet6), unsafe.Pointer(&ip.IP[0]), C.int(bits))
} else if len(ip.IP) == 4 {
if bits > 32 || bits < 0 {
return ErrInvalidParameter
}
C.ZT_GoTap_addIp(t.tap, afInet, unsafe.Pointer(&ip.IP[0]), C.int(bits))
C.ZT_GoTap_addIp(t.tap, C.int(afInet), unsafe.Pointer(&ip.IP[0]), C.int(bits))
}
return ErrInvalidParameter
}
@ -73,14 +76,14 @@ func (t *nativeTap) RemoveIP(ip net.IPNet) error {
if bits > 128 || bits < 0 {
return ErrInvalidParameter
}
C.ZT_GoTap_removeIp(t.tap, afInet6, unsafe.Pointer(&ip.IP[0]), C.int(bits))
C.ZT_GoTap_removeIp(t.tap, C.int(afInet6), unsafe.Pointer(&ip.IP[0]), C.int(bits))
return nil
}
if len(ip.IP) == 4 {
if bits > 32 || bits < 0 {
return ErrInvalidParameter
}
C.ZT_GoTap_removeIp(t.tap, afInet, unsafe.Pointer(&ip.IP[0]), C.int(bits))
C.ZT_GoTap_removeIp(t.tap, C.int(afInet), unsafe.Pointer(&ip.IP[0]), C.int(bits))
return nil
}
return ErrInvalidParameter
@ -98,7 +101,7 @@ func (t *nativeTap) IPs() (ips []net.IPNet, err error) {
count := int(C.ZT_GoTap_ips(t.tap, unsafe.Pointer(&ipbuf[0]), 16384))
ipptr := 0
for i := 0; i < count; i++ {
af := ipbuf[ipptr]
af := int(ipbuf[ipptr])
ipptr++
switch af {
case afInet:
@ -135,3 +138,10 @@ func (t *nativeTap) DeviceName() string {
}
return ""
}
// AddMulticastGroupChangeHandler adds a function to be called when the tap subscribes or unsubscribes to a multicast group.
func (t *nativeTap) AddMulticastGroupChangeHandler(handler func(bool, *MulticastGroup)) {
t.multicastGroupHandlersLock.Lock()
t.multicastGroupHandlers = append(t.multicastGroupHandlers, handler)
t.multicastGroupHandlersLock.Unlock()
}

View File

@ -106,15 +106,26 @@ type NetworkConfig struct {
// Network is a currently joined network
type Network struct {
id NetworkID
config NetworkConfig
configLock sync.RWMutex
tap *Tap
tap Tap
tapLock sync.Mutex
}
// ID gets this network's unique ID
func (n *Network) ID() NetworkID { return n.id }
// Config returns a copy of this network's current configuration
func (n *Network) Config() NetworkConfig {
n.configLock.RLock()
defer n.configLock.RUnlock()
return n.config
}
// Tap gets this network's tap device
func (n *Network) Tap() Tap {
n.tapLock.Lock()
defer n.tapLock.Unlock()
return n.tap
}

View File

@ -29,8 +29,8 @@ import (
)
const (
afInet = C.AF_INET
afInet6 = C.AF_INET6
afInet int = C.AF_INET
afInet6 int = C.AF_INET6
networkStatusRequestingConfiguration = C.ZT_NETWORK_STATUS_REQUESTING_CONFIGURATION
networkStatusOK = C.ZT_NETWORK_STATUS_OK
@ -68,11 +68,11 @@ func goPathLookupFunc(gn unsafe.Pointer, ztAddress C.uint64_t, desiredAddressFam
ip, port := node.pathLookup(uint64(ztAddress))
ip4 := ip.To4()
if len(ip4) == 4 {
*((*C.int)(familyP)) = afInet
*((*C.int)(familyP)) = C.int(afInet)
copy((*[4]byte)(ipP)[:], ip4)
*((*C.int)(portP)) = C.int(port)
} else if len(ip) == 16 {
*((*C.int)(familyP)) = afInet6
*((*C.int)(familyP)) = C.int(afInet6)
copy((*[16]byte)(ipP)[:], ip)
*((*C.int)(portP)) = C.int(port)
}
@ -178,3 +178,39 @@ func goZtEvent(gn unsafe.Pointer, eventType C.int, data unsafe.Pointer) {
node.handleRemoteTrace(uint64(rt.origin), C.GoBytes(unsafe.Pointer(rt.data), C.int(rt.len)))
}
}
func handleTapMulticastGroupChange(gn unsafe.Pointer, nwid, mac C.uint64_t, adi C.uint32_t, added bool) {
nodesByUserPtrLock.RLock()
node := nodesByUserPtr[uintptr(gn)]
nodesByUserPtrLock.RUnlock()
if node == nil {
return
}
node.networksLock.RLock()
network := node.networks[uint64(nwid)]
node.networksLock.RUnlock()
network.tapLock.Lock()
tap, _ := network.tap.(*nativeTap)
network.tapLock.Unlock()
if tap != nil {
mg := &MulticastGroup{MAC: MAC(mac), ADI: uint32(adi)}
tap.multicastGroupHandlersLock.Lock()
defer tap.multicastGroupHandlersLock.Unlock()
for _, h := range tap.multicastGroupHandlers {
h(added, mg)
}
}
}
//export goHandleTapAddedMulticastGroup
func goHandleTapAddedMulticastGroup(gn, tapP unsafe.Pointer, nwid, mac C.uint64_t, adi C.uint32_t) {
handleTapMulticastGroupChange(gn, nwid, mac, adi, true)
}
//export goHandleTapRemovedMulticastGroup
func goHandleTapRemovedMulticastGroup(gn, tapP unsafe.Pointer, nwid, mac C.uint64_t, adi C.uint32_t) {
handleTapMulticastGroupChange(gn, nwid, mac, adi, false)
}

View File

@ -23,4 +23,5 @@ type Tap interface {
RemoveIP(ip net.IPNet) error
IPs() ([]net.IPNet, error)
DeviceName() string
AddMulticastGroupChangeHandler(func(bool, *MulticastGroup))
}