]> git.lizzy.rs Git - mt.git/blob - rudp/conn.go
rudp: partial rewrite with new API supporting io.Readers
[mt.git] / rudp / conn.go
1 package rudp
2
3 import (
4         "net"
5         "sync"
6         "sync/atomic"
7         "time"
8 )
9
10 // A Conn is a connection to a client or server.
11 // All Conn's methods are safe for concurrent use.
12 type Conn struct {
13         udpConn udpConn
14
15         id PeerID
16
17         pkts chan Pkt
18         errs chan error
19
20         timeout *time.Timer
21         ping    *time.Ticker
22
23         closing uint32
24         closed  chan struct{}
25         err     error
26
27         mu       sync.RWMutex
28         remoteID PeerID
29
30         chans [ChannelCount]pktChan // read/write
31 }
32
33 // ID returns the PeerID of the Conn.
34 func (c *Conn) ID() PeerID { return c.id }
35
36 // IsSrv reports whether the Conn is a connection to a server.
37 func (c *Conn) IsSrv() bool { return c.ID() == PeerIDSrv }
38
39 // Closed returns a channel which is closed when the Conn is closed.
40 func (c *Conn) Closed() <-chan struct{} { return c.closed }
41
42 // WhyClosed returns the error that caused the Conn to be closed or nil
43 // if the Conn was closed using the Close method or by the peer.
44 // WhyClosed returns nil if the Conn is not closed.
45 func (c *Conn) WhyClosed() error {
46         select {
47         case <-c.Closed():
48                 return c.err
49         default:
50                 return nil
51         }
52 }
53
54 // LocalAddr returns the local network address.
55 func (c *Conn) LocalAddr() net.Addr { return c.udpConn.LocalAddr() }
56
57 // RemoteAddr returns the remote network address.
58 func (c *Conn) RemoteAddr() net.Addr { return c.udpConn.RemoteAddr() }
59
60 type pktChan struct {
61         // Only accessed by Conn.recvUDPPkts goroutine.
62         inRels  *[0x8000][]byte
63         inRelSN seqnum
64         sendAck func() (<-chan struct{}, error)
65         ackBuf  []byte
66
67         inSplitsMu sync.RWMutex
68         inSplits   map[seqnum]*inSplit
69
70         ackChans sync.Map // map[seqnum]chan struct{}
71
72         outSplitMu sync.Mutex
73         outSplitSN seqnum
74
75         outRelMu  sync.Mutex
76         outRelSN  seqnum
77         outRelWin seqnum
78 }
79
80 type inSplit struct {
81         chunks  [][]byte
82         got     int
83         timeout *time.Timer
84 }
85
86 // Close closes the Conn.
87 // Any blocked Send or Recv calls will return net.ErrClosed.
88 func (c *Conn) Close() error {
89         return c.closeDisco(nil)
90 }
91
92 func (c *Conn) closeDisco(err error) error {
93         c.sendRaw(func(buf []byte) int {
94                 buf[0] = uint8(rawCtl)
95                 buf[1] = uint8(ctlDisco)
96                 return 2
97         }, PktInfo{Unrel: true})()
98
99         return c.close(err)
100 }
101
102 func (c *Conn) close(err error) error {
103         if atomic.SwapUint32(&c.closing, 1) == 1 {
104                 return net.ErrClosed
105         }
106
107         c.timeout.Stop()
108         c.ping.Stop()
109
110         c.err = err
111         defer close(c.closed)
112
113         return c.udpConn.Close()
114 }
115
116 func newConn(uc udpConn, id, remoteID PeerID) *Conn {
117         var c *Conn
118         c = &Conn{
119                 udpConn: uc,
120
121                 id: id,
122
123                 pkts: make(chan Pkt),
124                 errs: make(chan error),
125
126                 timeout: time.AfterFunc(ConnTimeout, func() {
127                         c.closeDisco(ErrTimedOut)
128                 }),
129                 ping: time.NewTicker(PingTimeout),
130
131                 closed: make(chan struct{}),
132
133                 remoteID: remoteID,
134         }
135
136         for i := range c.chans {
137                 c.chans[i] = pktChan{
138                         inRels:  new([0x8000][]byte),
139                         inRelSN: initSeqnum,
140
141                         inSplits: make(map[seqnum]*inSplit),
142
143                         outSplitSN: initSeqnum,
144
145                         outRelSN:  initSeqnum,
146                         outRelWin: initSeqnum,
147                 }
148         }
149
150         c.newAckBuf()
151
152         go c.sendPings(c.ping.C)
153         go c.recvUDPPkts()
154
155         return c
156 }
157
158 func (c *Conn) sendPings(ping <-chan time.Time) {
159         send := c.sendRaw(func(buf []byte) int {
160                 buf[0] = uint8(rawCtl)
161                 buf[1] = uint8(ctlPing)
162                 return 2
163         }, PktInfo{})
164
165         for {
166                 select {
167                 case <-ping:
168                         send()
169                 case <-c.Closed():
170                         return
171                 }
172         }
173 }