]> git.lizzy.rs Git - mt.git/blob - rudp/send.go
c43d0561a1f041a0adab453e3b082636975c0390
[mt.git] / rudp / send.go
1 package rudp
2
3 import (
4         "errors"
5         "fmt"
6         "math"
7         "net"
8         "sync"
9         "time"
10 )
11
12 const (
13         // protoID + src PeerID + channel number
14         MtHdrSize = 4 + 2 + 1
15
16         // rawTypeOrig
17         OrigHdrSize = 1
18
19         // rawTypeSpilt + seqnum + chunk count + chunk number
20         SplitHdrSize = 1 + 2 + 2 + 2
21
22         // rawTypeRel + seqnum
23         RelHdrSize = 1 + 2
24 )
25
26 const (
27         MaxNetPktSize = 512
28
29         MaxUnrelRawPktSize = MaxNetPktSize - MtHdrSize
30         MaxRelRawPktSize   = MaxUnrelRawPktSize - RelHdrSize
31
32         MaxRelPktSize   = (MaxRelRawPktSize - SplitHdrSize) * math.MaxUint16
33         MaxUnrelPktSize = (MaxUnrelRawPktSize - SplitHdrSize) * math.MaxUint16
34 )
35
36 var ErrPktTooBig = errors.New("can't send pkt: too big")
37 var ErrChNoTooBig = errors.New("can't send pkt: channel number >= ChannelCount")
38
39 // Send sends a packet to the Peer.
40 // It returns a channel that's closed when all chunks are acked or an error.
41 // The ack channel is nil if pkt.Unrel is true.
42 func (p *Peer) Send(pkt Pkt) (ack <-chan struct{}, err error) {
43         if pkt.ChNo >= ChannelCount {
44                 return nil, ErrChNoTooBig
45         }
46
47         hdrSize := MtHdrSize
48         if !pkt.Unrel {
49                 hdrSize += RelHdrSize
50         }
51
52         if hdrSize+OrigHdrSize+len(pkt.Data) > MaxNetPktSize {
53                 c := &p.chans[pkt.ChNo]
54
55                 c.outSplitMu.Lock()
56                 sn := c.outSplitSN
57                 c.outSplitSN++
58                 c.outSplitMu.Unlock()
59
60                 chunks := split(pkt.Data, MaxNetPktSize-(hdrSize+SplitHdrSize))
61
62                 if len(chunks) > math.MaxUint16 {
63                         return nil, ErrPktTooBig
64                 }
65
66                 var wg sync.WaitGroup
67
68                 for i, chunk := range chunks {
69                         data := make([]byte, SplitHdrSize+len(chunk))
70                         data[0] = uint8(rawTypeSplit)
71                         be.PutUint16(data[1:3], uint16(sn))
72                         be.PutUint16(data[3:5], uint16(len(chunks)))
73                         be.PutUint16(data[5:7], uint16(i))
74                         copy(data[SplitHdrSize:], chunk)
75
76                         wg.Add(1)
77                         ack, err := p.sendRaw(rawPkt{
78                                 Data:  data,
79                                 ChNo:  pkt.ChNo,
80                                 Unrel: pkt.Unrel,
81                         })
82                         if err != nil {
83                                 return nil, err
84                         }
85                         if !pkt.Unrel {
86                                 go func() {
87                                         <-ack
88                                         wg.Done()
89                                 }()
90                         }
91                 }
92
93                 if pkt.Unrel {
94                         return nil, nil
95                 } else {
96                         ack := make(chan struct{})
97
98                         go func() {
99                                 wg.Wait()
100                                 close(ack)
101                         }()
102
103                         return ack, nil
104                 }
105         }
106
107         return p.sendRaw(rawPkt{
108                 Data:  append([]byte{uint8(rawTypeOrig)}, pkt.Data...),
109                 ChNo:  pkt.ChNo,
110                 Unrel: pkt.Unrel,
111         })
112 }
113
114 // sendRaw sends a raw packet to the Peer.
115 func (p *Peer) sendRaw(pkt rawPkt) (ack <-chan struct{}, err error) {
116         if pkt.ChNo >= ChannelCount {
117                 return nil, ErrChNoTooBig
118         }
119
120         p.mu.RLock()
121         defer p.mu.RUnlock()
122
123         select {
124         case <-p.Disco():
125                 return nil, net.ErrClosed
126         default:
127         }
128
129         if !pkt.Unrel {
130                 return p.sendRel(pkt)
131         }
132
133         data := make([]byte, MtHdrSize+len(pkt.Data))
134         be.PutUint32(data[0:4], protoID)
135         be.PutUint16(data[4:6], uint16(p.idOfPeer))
136         data[6] = pkt.ChNo
137         copy(data[MtHdrSize:], pkt.Data)
138
139         if len(data) > MaxNetPktSize {
140                 return nil, ErrPktTooBig
141         }
142
143         if p.conn != nil {
144                 _, err = p.conn.Write(data)
145         } else {
146                 _, err = p.pc.WriteTo(data, p.Addr())
147         }
148         if err != nil {
149                 return nil, err
150         }
151
152         p.ping.Reset(PingTimeout)
153
154         return nil, nil
155 }
156
157 // sendRel sends a reliable raw packet to the Peer.
158 func (p *Peer) sendRel(pkt rawPkt) (ack <-chan struct{}, err error) {
159         if pkt.Unrel {
160                 panic("pkt.Unrel is true")
161         }
162
163         c := &p.chans[pkt.ChNo]
164
165         c.outRelMu.Lock()
166         defer c.outRelMu.Unlock()
167
168         sn := c.outRelSN
169         for ; sn-c.outRelWin >= 0x8000; c.outRelWin++ {
170                 if ack, ok := c.ackChans.Load(c.outRelWin); ok {
171                         <-ack.(chan struct{})
172                 }
173         }
174         c.outRelSN++
175
176         rwack := make(chan struct{}) // close-only
177         c.ackChans.Store(sn, rwack)
178         ack = rwack
179
180         data := make([]byte, RelHdrSize+len(pkt.Data))
181         data[0] = uint8(rawTypeRel)
182         be.PutUint16(data[1:3], uint16(sn))
183         copy(data[RelHdrSize:], pkt.Data)
184         rel := rawPkt{
185                 Data:  data,
186                 ChNo:  pkt.ChNo,
187                 Unrel: true,
188         }
189
190         if _, err := p.sendRaw(rel); err != nil {
191                 c.ackChans.Delete(sn)
192
193                 return nil, err
194         }
195
196         go func() {
197                 for {
198                         select {
199                         case <-time.After(500 * time.Millisecond):
200                                 if _, err := p.sendRaw(rel); err != nil {
201                                         if errors.Is(err, net.ErrClosed) {
202                                                 return
203                                         }
204                                         p.errs <- fmt.Errorf("failed to re-send timed out reliable seqnum: %d: %w", sn, err)
205                                 }
206                         case <-ack:
207                                 return
208                         case <-p.Disco():
209                                 return
210                         }
211                 }
212         }()
213
214         return ack, nil
215 }
216
217 // SendDisco sends a disconnect packet to the Peer but does not close it.
218 // It returns a channel that's closed when it's acked or an error.
219 // The ack channel is nil if unrel is true.
220 func (p *Peer) SendDisco(chno uint8, unrel bool) (ack <-chan struct{}, err error) {
221         return p.sendRaw(rawPkt{
222                 Data:  []byte{uint8(rawTypeCtl), uint8(ctlDisco)},
223                 ChNo:  chno,
224                 Unrel: unrel,
225         })
226 }
227
228 func split(data []byte, chunksize int) [][]byte {
229         chunks := make([][]byte, 0, (len(data)+chunksize-1)/chunksize)
230
231         for i := 0; i < len(data); i += chunksize {
232                 end := i + chunksize
233                 if end > len(data) {
234                         end = len(data)
235                 }
236
237                 chunks = append(chunks, data[i:end])
238         }
239
240         return chunks
241 }