13 // protoID + src PeerID + channel number
19 // rawTypeSplit + seqnum + chunk count + chunk number
// 1 type byte + 2-byte seqnum + 2-byte chunk count + 2-byte chunk number.
20 SplitHdrSize = 1 + 2 + 2 + 2
22 // rawTypeRel + seqnum
// Room left in one network packet once the MtHdrSize header is subtracted.
29 MaxUnrelRawPktSize = MaxNetPktSize - MtHdrSize
// Reliable raw packets additionally give up RelHdrSize bytes.
30 MaxRelRawPktSize = MaxUnrelRawPktSize - RelHdrSize
// Upper bounds for split packets: the per-chunk payload (raw size minus the
// split header) multiplied by math.MaxUint16, the largest chunk count that
// Send accepts.
32 MaxRelPktSize = (MaxRelRawPktSize - SplitHdrSize) * math.MaxUint16
33 MaxUnrelPktSize = (MaxUnrelRawPktSize - SplitHdrSize) * math.MaxUint16
// ErrPktTooBig is returned by Send when a packet would need more than
// math.MaxUint16 chunks, and by sendRaw when the framed datagram is larger
// than MaxNetPktSize.
36 var ErrPktTooBig = errors.New("can't send pkt: too big")
// ErrChNoTooBig is returned by Send and sendRaw when the packet's channel
// number is >= ChannelCount.
37 var ErrChNoTooBig = errors.New("can't send pkt: channel number >= ChannelCount")
39 // Send sends a packet to the Peer.
40 // It returns a channel that's closed when all chunks are acked or an error.
41 // The ack channel is nil if pkt.Unrel is true.
//
// A channel number >= ChannelCount is rejected with ErrChNoTooBig. A payload
// that does not fit into a single network packet is cut into chunks, each
// sent with its own split header; ErrPktTooBig is returned when more than
// math.MaxUint16 chunks would be needed.
42 func (p *Peer) Send(pkt Pkt) (ack <-chan struct{}, err error) {
43 if pkt.ChNo >= ChannelCount {
44 return nil, ErrChNoTooBig
// Headers plus payload exceed one network packet: take the split path.
// NOTE(review): hdrSize is computed on lines not visible here; presumably it
// depends on pkt.Unrel. Confirm.
52 if hdrSize+OrigHdrSize+len(pkt.Data) > MaxNetPktSize {
53 c := &p.chans[pkt.ChNo]
// Each chunk leaves room for the outer headers plus the split header.
60 chunks := split(pkt.Data, MaxNetPktSize-(hdrSize+SplitHdrSize))
// The chunk count is written into a uint16 header field below, so it must
// not exceed math.MaxUint16.
62 if len(chunks) > math.MaxUint16 {
63 return nil, ErrPktTooBig
68 for i, chunk := range chunks {
// Split header layout: byte 0 is the rawTypeSplit type byte, bytes 1-2 the
// seqnum, bytes 3-4 the chunk count, bytes 5-6 the chunk index; the chunk
// payload follows at offset SplitHdrSize.
// NOTE(review): sn is assigned on lines not visible here, and be is
// presumably encoding/binary's BigEndian. Confirm.
69 data := make([]byte, SplitHdrSize+len(chunk))
70 data[0] = uint8(rawTypeSplit)
71 be.PutUint16(data[1:3], uint16(sn))
72 be.PutUint16(data[3:5], uint16(len(chunks)))
73 be.PutUint16(data[5:7], uint16(i))
74 copy(data[SplitHdrSize:], chunk)
// Every chunk goes out through sendRaw as its own raw packet.
77 ack, err := p.sendRaw(rawPkt{
// NOTE(review): how this channel is tied to the per-chunk acks happens on
// lines not visible here.
96 ack := make(chan struct{})
// Unsplit form: the payload is sent as one raw packet prefixed with the
// rawTypeOrig type byte.
107 return p.sendRaw(rawPkt{
108 Data: append([]byte{uint8(rawTypeOrig)}, pkt.Data...),
114 // sendRaw sends a raw packet to the Peer.
//
// It returns ErrChNoTooBig for a channel number >= ChannelCount and
// ErrPktTooBig when the framed datagram would exceed MaxNetPktSize. One of
// its paths hands the packet to sendRel instead of writing it directly.
115 func (p *Peer) sendRaw(pkt rawPkt) (ack <-chan struct{}, err error) {
116 if pkt.ChNo >= ChannelCount {
117 return nil, ErrChNoTooBig
// NOTE(review): the guard for this return is not visible here; presumably
// the Peer has already been closed. Confirm.
125 return nil, net.ErrClosed
// NOTE(review): guard not visible; presumably taken when pkt.Unrel is false,
// since sendRel panics with "pkt.Unrel is true". Confirm.
130 return p.sendRel(pkt)
// Frame the payload: protoID in bytes 0-3, p.idOfPeer in bytes 4-5, payload
// from offset MtHdrSize onward.
// NOTE(review): the remaining header bytes are written on a line not
// visible here.
133 data := make([]byte, MtHdrSize+len(pkt.Data))
134 be.PutUint32(data[0:4], protoID)
135 be.PutUint16(data[4:6], uint16(p.idOfPeer))
137 copy(data[MtHdrSize:], pkt.Data)
// Refuse to emit a datagram larger than MaxNetPktSize.
139 if len(data) > MaxNetPktSize {
140 return nil, ErrPktTooBig
// Two write paths: p.conn.Write, or p.pc.WriteTo addressed to p.Addr().
// The condition choosing between them is not visible here.
144 _, err = p.conn.Write(data)
146 _, err = p.pc.WriteTo(data, p.Addr())
// NOTE(review): presumably postpones the next keep-alive ping because
// traffic was just sent. Confirm.
152 p.ping.Reset(PingTimeout)
157 // sendRel sends a reliable raw packet to the Peer.
//
// It registers a close-only ack channel under the packet's seqnum, wraps the
// payload in a reliable header (type byte + seqnum), sends it through sendRaw
// and re-sends it after a 500ms timeout.
158 func (p *Peer) sendRel(pkt rawPkt) (ack <-chan struct{}, err error) {
// NOTE(review): guard not visible; presumably `if pkt.Unrel`. Callers must
// only pass reliable packets.
160 panic("pkt.Unrel is true")
163 c := &p.chans[pkt.ChNo]
// The matching Lock call is on a line not visible here.
166 defer c.outRelMu.Unlock()
// While sn is 0x8000 or more ahead of the window start, receive from the ack
// channel stored for the window start (if any) and then advance the window.
// NOTE(review): this wait runs after the deferred Unlock was registered, so
// presumably c.outRelMu is held while blocking. Confirm this is intended.
169 for ; sn-c.outRelWin >= 0x8000; c.outRelWin++ {
170 if ack, ok := c.ackChans.Load(c.outRelWin); ok {
171 <-ack.(chan struct{})
176 rwack := make(chan struct{}) // close-only
177 c.ackChans.Store(sn, rwack)
// Reliable header: rawTypeRel type byte, then the seqnum in bytes 1-2; the
// payload follows at offset RelHdrSize.
180 data := make([]byte, RelHdrSize+len(pkt.Data))
181 data[0] = uint8(rawTypeRel)
182 be.PutUint16(data[1:3], uint16(sn))
183 copy(data[RelHdrSize:], pkt.Data)
// If the first send fails, drop the ack channel registered above.
190 if _, err := p.sendRaw(rel); err != nil {
191 c.ackChans.Delete(sn)
// Timeout branch: re-send the packet after 500ms. The surrounding select and
// its other cases are not visible here.
199 case <-time.After(500 * time.Millisecond):
200 if _, err := p.sendRaw(rel); err != nil {
// net.ErrClosed is handled separately (on lines not visible here); the
// error is also reported on p.errs, presumably only when it is not
// net.ErrClosed. Confirm.
201 if errors.Is(err, net.ErrClosed) {
204 p.errs <- fmt.Errorf("failed to re-send timed out reliable seqnum: %d: %w", sn, err)
217 // SendDisco sends a disconnect packet to the Peer but does not close it.
218 // It returns a channel that's closed when it's acked or an error.
219 // The ack channel is nil if unrel is true.
220 func (p *Peer) SendDisco(chno uint8, unrel bool) (ack <-chan struct{}, err error) {
// The disconnect is a two-byte control payload: the rawTypeCtl type byte
// followed by ctlDisco. The remaining rawPkt fields are set on lines not
// visible here.
221 return p.sendRaw(rawPkt{
222 Data: []byte{uint8(rawTypeCtl), uint8(ctlDisco)},
// split cuts data into consecutive sub-slices, appended in order, whose start
// offsets are chunksize bytes apart.
// NOTE(review): chunksize == 0 panics with an integer division by zero in the
// capacity computation below; callers must pass a positive chunksize.
228 func split(data []byte, chunksize int) [][]byte {
// Capacity is ceil(len(data)/chunksize), so the appends below should not
// need to grow the slice.
229 chunks := make([][]byte, 0, (len(data)+chunksize-1)/chunksize)
231 for i := 0; i < len(data); i += chunksize {
// NOTE(review): end is computed on lines not visible here; presumably
// i+chunksize clamped to len(data). Confirm.
// Each chunk aliases data's backing array; nothing is copied.
237 chunks = append(chunks, data[i:end])