package rudp
import (
- "encoding/binary"
"errors"
"fmt"
"net"
mu sync.Mutex
addr2peer map[string]cltPeer
id2peer map[PeerID]cltPeer
- peerid PeerID
+ peerID PeerID
}
// Listen listens for packets on conn until it is closed.
clt, ok := l.addr2peer[addrstr]
if !ok {
- prev := l.peerid
- for l.id2peer[l.peerid].Peer != nil || l.peerid < PeerIDCltMin {
- if l.peerid == prev-1 {
+ prev := l.peerID
+ for l.id2peer[l.peerID].Peer != nil || l.peerID < PeerIDCltMin {
+ if l.peerID == prev-1 {
return ErrOutOfPeerIDs
}
- l.peerid++
+ l.peerID++
}
pkts := make(chan netPkt, 256)
clt = cltPeer{
- Peer: newPeer(l.conn, pkt.SrcAddr, l.peerid, PeerIDSrv),
+ Peer: newPeer(l.conn, pkt.SrcAddr, l.peerID, PeerIDSrv),
pkts: pkts,
accepted: make(chan struct{}),
}
data := make([]byte, 1+1+2)
data[0] = uint8(rawTypeCtl)
data[1] = uint8(ctlSetPeerID)
- binary.BigEndian.PutUint16(data[2:4], uint16(clt.ID()))
+ be.PutUint16(data[2:4], uint16(clt.ID()))
if _, err := clt.sendRaw(rawPkt{Data: data}); err != nil {
if errors.Is(err, net.ErrClosed) {
return nil
// A Peer is a connection to a client or server.
type Peer struct {
- conn net.PacketConn
+ pc net.PacketConn
addr net.Addr
+ conn net.Conn
disco chan struct{} // close-only
pkts chan Pkt
errs chan error // don't close
- timedout chan struct{} // close-only
+ timedOut chan struct{} // close-only
chans [ChannelCount]pktchan // read/write
ping *time.Ticker
}
-type pktchan struct {
- // Only accessed by Peer.processRawPkt.
- insplit map[seqnum][][]byte
- inrel map[seqnum][]byte
- inrelsn seqnum
-
- ackchans sync.Map // map[seqnum]chan struct{}
-
- outsplitmu sync.Mutex
- outsplitsn seqnum
-
- outrelmu sync.Mutex
- outrelsn seqnum
- outrelwin seqnum
-}
-
// Conn returns the net.PacketConn used to communicate with the Peer.
-func (p *Peer) Conn() net.PacketConn { return p.conn }
+func (p *Peer) Conn() net.PacketConn { return p.pc }
// Addr returns the address of the Peer.
func (p *Peer) Addr() net.Addr { return p.addr }
// TimedOut reports whether the Peer has timed out.
func (p *Peer) TimedOut() bool {
select {
- case <-p.timedout:
+ case <-p.timedOut:
return true
default:
return false
}
}
+type inSplit struct {
+ chunks [][]byte
+ size, got int
+}
+
+type pktchan struct {
+ // Only accessed by Peer.processRawPkt.
+ inSplit *[65536]*inSplit
+ inRel *[65536][]byte
+ inRelSN seqnum
+
+ ackChans sync.Map // map[seqnum]chan struct{}
+
+ outSplitMu sync.Mutex
+ outSplitSN seqnum
+
+ outRelMu sync.Mutex
+ outRelSN seqnum
+ outRelWin seqnum
+}
+
// Recv recieves a packet from the Peer.
// You should keep calling this until it returns net.ErrClosed
// so it doesn't leak a goroutine.
return nil
}
-func newPeer(conn net.PacketConn, addr net.Addr, id, idOfPeer PeerID) *Peer {
+func newPeer(pc net.PacketConn, addr net.Addr, id, idOfPeer PeerID) *Peer {
p := &Peer{
- conn: conn,
+ pc: pc,
addr: addr,
id: id,
idOfPeer: idOfPeer,
errs: make(chan error),
}
+ if conn, ok := pc.(net.Conn); ok && conn.RemoteAddr() != nil {
+ p.conn = conn
+ }
+
for i := range p.chans {
p.chans[i] = pktchan{
- insplit: make(map[seqnum][][]byte),
- inrel: make(map[seqnum][]byte),
- inrelsn: seqnumInit,
+ inSplit: new([65536]*inSplit),
+ inRel: new([65536][]byte),
+ inRelSN: seqnumInit,
- outsplitsn: seqnumInit,
- outrelsn: seqnumInit,
- outrelwin: seqnumInit,
+ outSplitSN: seqnumInit,
+ outRelSN: seqnumInit,
+ outRelWin: seqnumInit,
}
}
- p.timedout = make(chan struct{})
+ p.timedOut = make(chan struct{})
p.timeout = time.AfterFunc(ConnTimeout, func() {
- close(p.timedout)
+ close(p.timedOut)
p.SendDisco(0, true)
p.Close()
}
}
-// Connect connects to the server on conn
-// and closes conn when the returned *Peer disconnects.
-func Connect(conn net.PacketConn, addr net.Addr) *Peer {
- srv := newPeer(conn, addr, PeerIDSrv, PeerIDNil)
+// Connect connects to addr using pc
+// and closes pc when the returned *Peer disconnects.
+func Connect(pc net.PacketConn, addr net.Addr) *Peer {
+ srv := newPeer(pc, addr, PeerIDSrv, PeerIDNil)
pkts := make(chan netPkt)
- go readNetPkts(conn, pkts, srv.errs)
+ go readNetPkts(pc, pkts, srv.errs)
go srv.processNetPkts(pkts)
go func() {
<-srv.Disco()
- conn.Close()
+ pc.Close()
}()
return srv
package rudp
import (
- "encoding/binary"
"errors"
"fmt"
"io"
return io.ErrUnexpectedEOF
}
- if id := binary.BigEndian.Uint32(pkt.Data[0:4]); id != protoID {
+ if id := be.Uint32(pkt.Data[0:4]); id != protoID {
return fmt.Errorf("unsupported protocol id: 0x%08x", id)
}
return io.ErrUnexpectedEOF
}
- sn := seqnum(binary.BigEndian.Uint16(pkt.Data[2:4]))
+ sn := seqnum(be.Uint16(pkt.Data[2:4]))
- if ack, ok := c.ackchans.LoadAndDelete(sn); ok {
+ if ack, ok := c.ackChans.LoadAndDelete(sn); ok {
close(ack.(chan struct{}))
}
return errors.New("peer id already set")
}
- p.idOfPeer = PeerID(binary.BigEndian.Uint16(pkt.Data[2:4]))
+ p.idOfPeer = PeerID(be.Uint16(pkt.Data[2:4]))
p.mu.Unlock()
if len(pkt.Data) > 1+1+2 {
return io.ErrUnexpectedEOF
}
- sn := seqnum(binary.BigEndian.Uint16(pkt.Data[1:3]))
- count := binary.BigEndian.Uint16(pkt.Data[3:5])
- i := binary.BigEndian.Uint16(pkt.Data[5:7])
+ sn := seqnum(be.Uint16(pkt.Data[1:3]))
+ count := be.Uint16(pkt.Data[3:5])
+ i := be.Uint16(pkt.Data[5:7])
if i >= count {
return nil
}
- splitpkts := p.chans[pkt.ChNo].insplit
+ splits := p.chans[pkt.ChNo].inSplit
// Delete old incomplete split packets
// so new ones don't get corrupted.
- delete(splitpkts, sn-0x8000)
+ splits[sn-0x8000] = nil
- if splitpkts[sn] == nil {
- splitpkts[sn] = make([][]byte, count)
+ if splits[sn] == nil {
+ splits[sn] = &inSplit{chunks: make([][]byte, count)}
}
- chunks := splitpkts[sn]
+ s := splits[sn]
- if int(count) != len(chunks) {
- return fmt.Errorf("chunk count changed on seqnum: %d", sn)
+ if int(count) != len(s.chunks) {
+ return fmt.Errorf("chunk count changed on split packet: %d", sn)
}
- chunks[i] = pkt.Data[7:]
+ s.chunks[i] = pkt.Data[7:]
+ s.size += len(s.chunks[i])
+ s.got++
- for _, chunk := range chunks {
- if chunk == nil {
- return nil
+ if s.got == len(s.chunks) {
+ data := make([]byte, 0, s.size)
+ for _, chunk := range s.chunks {
+ data = append(data, chunk...)
}
- }
- var data []byte
- for _, chunk := range chunks {
- data = append(data, chunk...)
- }
+ p.pkts <- Pkt{
+ Data: data,
+ ChNo: pkt.ChNo,
+ Unrel: pkt.Unrel,
+ }
- p.pkts <- Pkt{
- Data: data,
- ChNo: pkt.ChNo,
- Unrel: pkt.Unrel,
+ splits[sn] = nil
}
-
- delete(splitpkts, sn)
case rawTypeRel:
defer errWrap("rel: %w")
return io.ErrUnexpectedEOF
}
- sn := seqnum(binary.BigEndian.Uint16(pkt.Data[1:3]))
+ sn := seqnum(be.Uint16(pkt.Data[1:3]))
- ackdata := make([]byte, 1+1+2)
- ackdata[0] = uint8(rawTypeCtl)
- ackdata[1] = uint8(ctlAck)
- binary.BigEndian.PutUint16(ackdata[2:4], uint16(sn))
- ack := rawPkt{
- Data: ackdata,
+ ack := make([]byte, 1+1+2)
+ ack[0] = uint8(rawTypeCtl)
+ ack[1] = uint8(ctlAck)
+ be.PutUint16(ack[2:4], uint16(sn))
+ if _, err := p.sendRaw(rawPkt{
+ Data: ack,
ChNo: pkt.ChNo,
Unrel: true,
- }
- if _, err := p.sendRaw(ack); err != nil {
+ }); err != nil {
if errors.Is(err, net.ErrClosed) {
return nil
}
return fmt.Errorf("can't ack %d: %w", sn, err)
}
- if sn-c.inrelsn >= 0x8000 {
+ if sn-c.inRelSN >= 0x8000 {
return nil // Already received.
}
- c.inrel[sn] = pkt.Data[3:]
-
- for ; c.inrel[c.inrelsn] != nil; c.inrelsn++ {
- data := c.inrel[c.inrelsn]
- delete(c.inrel, c.inrelsn)
+ c.inRel[sn] = pkt.Data[3:]
+ for ; c.inRel[c.inRelSN] != nil; c.inRelSN++ {
rpkt := rawPkt{
- Data: data,
+ Data: c.inRel[c.inRelSN],
ChNo: pkt.ChNo,
Unrel: false,
}
+ c.inRel[c.inRelSN] = nil
+
if err := p.processRawPkt(rpkt); err != nil {
p.errs <- PktError{"rel", rpkt.Data, err}
}
+++ /dev/null
-/*
-Package rudp implements the low-level Minetest protocol described at
-https://dev.minetest.net/Network_Protocol#Low-level_protocol.
-
-All exported functions and methods in this package are safe for concurrent use
-by multiple goroutines.
-*/
-package rudp
-
-// protoID must be at the start of every network packet.
-const protoID uint32 = 0x4f457403
-
-// PeerIDs aren't actually used to identify peers, network addresses are,
-// these just exist for backward compatability.
-type PeerID uint16
-
-const (
- // Used by clients before the server sets their ID.
- PeerIDNil PeerID = iota
-
- // The server always has this ID.
- PeerIDSrv
-
- // Lowest ID the server can assign to a client.
- PeerIDCltMin
-)
-
-// ChannelCount is the maximum channel number + 1.
-const ChannelCount = 3
-
-/*
-rawPkt.Data format (big endian):
-
- rawType
- switch rawType {
- case rawTypeCtl:
- ctlType
- switch ctlType {
- case ctlAck:
- // Tells peer you received a rawTypeRel
- // and it doesn't need to resend it.
- seqnum
- case ctlSetPeerId:
- // Tells peer to send packets with this Src PeerID.
- PeerId
- case ctlPing:
- // Sent to prevent timeout.
- case ctlDisco:
- // Tells peer that you disconnected.
- }
- case rawTypeOrig:
- Pkt.(Data)
- case rawTypeSplit:
- // Packet larger than MaxNetPktSize split into smaller packets.
- // Packets with I >= Count should be ignored.
- // Once all Count chunks are recieved, they are sorted by I and
- // concatenated to make a Pkt.(Data).
- seqnum // Identifies split packet.
- Count, I uint16
- Chunk...
- case rawTypeRel:
- // Resent until a ctlAck with same seqnum is recieved.
- // seqnums are sequencial and start at seqnumInit,
- // These should be processed in seqnum order.
- seqnum
- rawPkt.Data
- }
-*/
-type rawPkt struct {
- Data []byte
- ChNo uint8
- Unrel bool
-}
-
-type rawType uint8
-
-const (
- rawTypeCtl rawType = iota
- rawTypeOrig
- rawTypeSplit
- rawTypeRel
-)
-
-type ctlType uint8
-
-const (
- ctlAck ctlType = iota
- ctlSetPeerID
- ctlPing
- ctlDisco
-)
-
-type Pkt struct {
- Data []byte
- ChNo uint8
- Unrel bool
-}
-
-// seqnums are sequence numbers used to maintain reliable packet order
-// and to identify split packets.
-type seqnum uint16
-
-const seqnumInit seqnum = 65500
"net"
"os"
- "github.com/anon55555/mt/rudp"
+ "mt/rudp"
)
func main() {
--- /dev/null
+/*
+Package rudp implements the low-level Minetest protocol described at
+https://dev.minetest.net/Network_Protocol#Low-level_protocol.
+
+All exported functions and methods in this package are safe for concurrent use
+by multiple goroutines.
+*/
+package rudp
+
+import "encoding/binary"
+
+var be = binary.BigEndian
+
+// protoID must be at the start of every network packet.
+const protoID uint32 = 0x4f457403
+
+// PeerIDs aren't actually used to identify peers, network addresses are,
+// these just exist for backward compatability.
+type PeerID uint16
+
+const (
+ // Used by clients before the server sets their ID.
+ PeerIDNil PeerID = iota
+
+ // The server always has this ID.
+ PeerIDSrv
+
+ // Lowest ID the server can assign to a client.
+ PeerIDCltMin
+)
+
+// ChannelCount is the maximum channel number + 1.
+const ChannelCount = 3
+
+/*
+rawPkt.Data format (big endian):
+
+ rawType
+ switch rawType {
+ case rawTypeCtl:
+ ctlType
+ switch ctlType {
+ case ctlAck:
+ // Tells peer you received a rawTypeRel
+ // and it doesn't need to resend it.
+ seqnum
+ case ctlSetPeerId:
+ // Tells peer to send packets with this Src PeerID.
+ PeerId
+ case ctlPing:
+ // Sent to prevent timeout.
+ case ctlDisco:
+ // Tells peer that you disconnected.
+ }
+ case rawTypeOrig:
+ Pkt.(Data)
+ case rawTypeSplit:
+ // Packet larger than MaxNetPktSize split into smaller packets.
+ // Packets with I >= Count should be ignored.
+ // Once all Count chunks are recieved, they are sorted by I and
+ // concatenated to make a Pkt.(Data).
+ seqnum // Identifies split packet.
+ Count, I uint16
+ Chunk...
+ case rawTypeRel:
+ // Resent until a ctlAck with same seqnum is recieved.
+ // seqnums are sequencial and start at seqnumInit,
+ // These should be processed in seqnum order.
+ seqnum
+ rawPkt.Data
+ }
+*/
+type rawPkt struct {
+ Data []byte
+ ChNo uint8
+ Unrel bool
+}
+
+type rawType uint8
+
+const (
+ rawTypeCtl rawType = iota
+ rawTypeOrig
+ rawTypeSplit
+ rawTypeRel
+)
+
+type ctlType uint8
+
+const (
+ ctlAck ctlType = iota
+ ctlSetPeerID
+ ctlPing
+ ctlDisco
+)
+
+type Pkt struct {
+ Data []byte
+ ChNo uint8
+ Unrel bool
+}
+
+// seqnums are sequence numbers used to maintain reliable packet order
+// and to identify split packets.
+type seqnum uint16
+
+const seqnumInit seqnum = 65500
package rudp
import (
- "encoding/binary"
"errors"
"fmt"
"math"
return nil, ErrChNoTooBig
}
- hdrsize := MtHdrSize
+ hdrSize := MtHdrSize
if !pkt.Unrel {
- hdrsize += RelHdrSize
+ hdrSize += RelHdrSize
}
- if hdrsize+OrigHdrSize+len(pkt.Data) > MaxNetPktSize {
+ if hdrSize+OrigHdrSize+len(pkt.Data) > MaxNetPktSize {
c := &p.chans[pkt.ChNo]
- c.outsplitmu.Lock()
- sn := c.outsplitsn
- c.outsplitsn++
- c.outsplitmu.Unlock()
+ c.outSplitMu.Lock()
+ sn := c.outSplitSN
+ c.outSplitSN++
+ c.outSplitMu.Unlock()
- chunks := split(pkt.Data, MaxNetPktSize-(hdrsize+SplitHdrSize))
+ chunks := split(pkt.Data, MaxNetPktSize-(hdrSize+SplitHdrSize))
if len(chunks) > math.MaxUint16 {
return nil, ErrPktTooBig
for i, chunk := range chunks {
data := make([]byte, SplitHdrSize+len(chunk))
data[0] = uint8(rawTypeSplit)
- binary.BigEndian.PutUint16(data[1:3], uint16(sn))
- binary.BigEndian.PutUint16(data[3:5], uint16(len(chunks)))
- binary.BigEndian.PutUint16(data[5:7], uint16(i))
+ be.PutUint16(data[1:3], uint16(sn))
+ be.PutUint16(data[3:5], uint16(len(chunks)))
+ be.PutUint16(data[5:7], uint16(i))
copy(data[SplitHdrSize:], chunk)
wg.Add(1)
return nil, err
}
if !pkt.Unrel {
- if ack == nil {
- panic("ack is nil")
- }
go func() {
<-ack
wg.Done()
}
data := make([]byte, MtHdrSize+len(pkt.Data))
- binary.BigEndian.PutUint32(data[0:4], protoID)
- binary.BigEndian.PutUint16(data[4:6], uint16(p.idOfPeer))
+ be.PutUint32(data[0:4], protoID)
+ be.PutUint16(data[4:6], uint16(p.idOfPeer))
data[6] = pkt.ChNo
copy(data[MtHdrSize:], pkt.Data)
return nil, ErrPktTooBig
}
- _, err = p.Conn().WriteTo(data, p.Addr())
- if errors.Is(err, net.ErrWriteToConnected) {
- conn, ok := p.Conn().(net.Conn)
- if !ok {
- return nil, err
- }
- _, err = conn.Write(data)
+ if p.conn != nil {
+ _, err = p.conn.Write(data)
+ } else {
+ _, err = p.pc.WriteTo(data, p.Addr())
}
if err != nil {
return nil, err
// sendRel sends a reliable raw packet to the Peer.
func (p *Peer) sendRel(pkt rawPkt) (ack <-chan struct{}, err error) {
if pkt.Unrel {
- panic("mt/rudp: sendRel: pkt.Unrel is true")
+ panic("pkt.Unrel is true")
}
c := &p.chans[pkt.ChNo]
- c.outrelmu.Lock()
- defer c.outrelmu.Unlock()
+ c.outRelMu.Lock()
+ defer c.outRelMu.Unlock()
- sn := c.outrelsn
- for ; sn-c.outrelwin >= 0x8000; c.outrelwin++ {
- if ack, ok := c.ackchans.Load(c.outrelwin); ok {
+ sn := c.outRelSN
+ for ; sn-c.outRelWin >= 0x8000; c.outRelWin++ {
+ if ack, ok := c.ackChans.Load(c.outRelWin); ok {
<-ack.(chan struct{})
}
}
- c.outrelsn++
+ c.outRelSN++
rwack := make(chan struct{}) // close-only
- c.ackchans.Store(sn, rwack)
+ c.ackChans.Store(sn, rwack)
ack = rwack
- reldata := make([]byte, RelHdrSize+len(pkt.Data))
- reldata[0] = uint8(rawTypeRel)
- binary.BigEndian.PutUint16(reldata[1:3], uint16(sn))
- copy(reldata[RelHdrSize:], pkt.Data)
- relpkt := rawPkt{
- Data: reldata,
+ data := make([]byte, RelHdrSize+len(pkt.Data))
+ data[0] = uint8(rawTypeRel)
+ be.PutUint16(data[1:3], uint16(sn))
+ copy(data[RelHdrSize:], pkt.Data)
+ rel := rawPkt{
+ Data: data,
ChNo: pkt.ChNo,
Unrel: true,
}
- if _, err := p.sendRaw(relpkt); err != nil {
- c.ackchans.Delete(sn)
+ if _, err := p.sendRaw(rel); err != nil {
+ c.ackChans.Delete(sn)
return nil, err
}
for {
select {
case <-time.After(500 * time.Millisecond):
- if _, err := p.sendRaw(relpkt); err != nil {
+ if _, err := p.sendRaw(rel); err != nil {
if errors.Is(err, net.ErrClosed) {
return
}