]> git.lizzy.rs Git - mt.git/commitdiff
rudp: fix errors returned by Peer.Recv other than net.ErrClosed when the Peer is...
authoranon5 <anon5clam@protonmail.com>
Sun, 28 Feb 2021 18:54:41 +0000 (18:54 +0000)
committeranon5 <anon5clam@protonmail.com>
Sun, 28 Feb 2021 18:54:41 +0000 (18:54 +0000)
rudp/listen.go
rudp/peer.go
rudp/process.go
rudp/proxy/proxy.go
rudp/send.go

index 2eda819596798b9e3f7b15e1bba940bb54597dfc..2d702c47913195f0da202c370b222499d4791e2a 100644 (file)
@@ -115,6 +115,9 @@ func (l *Listener) processNetPkt(pkt netPkt) error {
                data[1] = uint8(ctlSetPeerID)
                binary.BigEndian.PutUint16(data[2:4], uint16(clt.ID()))
                if _, err := clt.sendRaw(rawPkt{Data: data}); err != nil {
+                       if errors.Is(err, net.ErrClosed) {
+                               return nil
+                       }
                        return fmt.Errorf("can't set client peer id: %w", err)
                }
 
@@ -145,7 +148,8 @@ func (l *Listener) processNetPkt(pkt netPkt) error {
                select {
                case clt.pkts <- pkt:
                default:
-                       return fmt.Errorf("ignoring net pkt from %s because buf is full", addrstr)
+                       // It's OK to drop packets if the buffer is full
+                       // because MT RUDP can cope with packet loss.
                }
        }
 
index 9dca93d3a878eb3cf727a07e645dd1b5f1f0d2ea..4d8df4765b108a3ff1c5c9ba01e320c4742cd11a 100644 (file)
@@ -1,6 +1,7 @@
 package rudp
 
 import (
+       "errors"
        "fmt"
        "net"
        "sync"
@@ -167,6 +168,9 @@ func (p *Peer) sendPings(ping <-chan time.Time) {
                select {
                case <-ping:
                        if _, err := p.sendRaw(pkt); err != nil {
+                               if errors.Is(err, net.ErrClosed) {
+                                       return
+                               }
                                p.errs <- fmt.Errorf("can't send ping: %w", err)
                        }
                case <-p.Disco():
@@ -176,7 +180,7 @@ func (p *Peer) sendPings(ping <-chan time.Time) {
 }
 
 // Connect connects to the server on conn
-// and closes conn when the Peer disconnects.
+// and closes conn when the returned *Peer disconnects.
 func Connect(conn net.PacketConn, addr net.Addr) *Peer {
        srv := newPeer(conn, addr, PeerIDSrv, PeerIDNil)
 
index 70c04a23f3eb161428b15bbd13da47a3472e0e45..c85aba4edafec01fb008bc174a4c20b4c0e066a7 100644 (file)
@@ -5,6 +5,7 @@ import (
        "errors"
        "fmt"
        "io"
+       "net"
 )
 
 // A PktError is an error that occured while processing a packet.
@@ -140,9 +141,7 @@ func (p *Peer) processRawPkt(pkt rawPkt) (err error) {
                case ctlDisco:
                        defer errWrap("disco: %w")
 
-                       if err := p.Close(); err != nil {
-                               return fmt.Errorf("can't close: %w", err)
-                       }
+                       p.Close()
 
                        if len(pkt.Data) > 1+1 {
                                return TrailingDataError(pkt.Data[1+1:])
@@ -226,6 +225,9 @@ func (p *Peer) processRawPkt(pkt rawPkt) (err error) {
                        Unrel: true,
                }
                if _, err := p.sendRaw(ack); err != nil {
+                       if errors.Is(err, net.ErrClosed) {
+                               return nil
+                       }
                        return fmt.Errorf("can't ack %d: %w", sn, err)
                }
 
index 833260b7db525ea307cf210a251e7cef6ef796a0..a80b4487ef4da3f564f0bc9ef539c8662ec49b80 100644 (file)
@@ -10,12 +10,13 @@ and listen:port is the address to listen on.
 package main
 
 import (
+       "errors"
        "fmt"
        "log"
        "net"
        "os"
 
-       "mt/rudp"
+       "github.com/anon55555/mt/rudp"
 )
 
 func main() {
@@ -61,7 +62,7 @@ func proxy(src, dest *rudp.Peer) {
        for {
                pkt, err := src.Recv()
                if err != nil {
-                       if err == net.ErrClosed {
+                       if errors.Is(err, net.ErrClosed) {
                                msg := src.Addr().String() + " disconnected"
                                if src.TimedOut() {
                                        msg += " (timed out)"
index 2615c593681ed72330c0046068628799240f93b4..ce3f013292a2dbaebc1c3f3b19446f5fd5d2cdd2 100644 (file)
@@ -205,6 +205,9 @@ func (p *Peer) sendRel(pkt rawPkt) (ack <-chan struct{}, err error) {
                        select {
                        case <-time.After(500 * time.Millisecond):
                                if _, err := p.sendRaw(relpkt); err != nil {
+                                       if errors.Is(err, net.ErrClosed) {
+                                               return
+                                       }
                                        p.errs <- fmt.Errorf("failed to re-send timed out reliable seqnum: %d: %w", sn, err)
                                }
                        case <-ack: