12 // Recv receives a Pkt from the Conn.
13 func (c *Conn) Recv() (Pkt, error) {
20 return Pkt{}, net.ErrClosed
24 func (c *Conn) gotPkt(pkt Pkt) {
31 func (c *Conn) gotErr(kind string, data []byte, err error) {
33 case c.errs <- fmt.Errorf("%s: %x: %w", kind, data, err):
38 func (c *Conn) recvUDPPkts() {
40 pkt, err := c.udpConn.recvUDP()
46 if err := c.processUDPPkt(pkt); err != nil {
47 c.gotErr("udp", pkt, err)
52 func (c *Conn) processUDPPkt(pkt []byte) error {
54 c.timeout.Reset(ConnTimeout)
58 return io.ErrUnexpectedEOF
61 if id := be.Uint32(pkt[0:4]); id != protoID {
62 return fmt.Errorf("unsupported protocol id: 0x%08x", id)
66 if ch >= ChannelCount {
67 return TooBigChError(ch)
70 if err := c.processRawPkt(pkt[7:], PktInfo{Channel: ch, Unrel: true}); err != nil {
71 c.gotErr("raw", pkt, err)
77 // A TrailingDataError reports trailing data after a packet.
78 type TrailingDataError []byte
80 func (e TrailingDataError) Error() string {
81 return fmt.Sprintf("trailing data: %x", []byte(e))
84 func (c *Conn) processRawPkt(data []byte, pi PktInfo) (err error) {
85 errWrap := func(format string, a ...interface{}) {
87 err = fmt.Errorf(format+": %w", append(a, err)...)
93 switch r := recover(); r {
96 err = io.ErrUnexpectedEOF
103 eat := func(n int) []byte {
112 ch := &c.chans[pi.Channel]
114 switch t := rawType(eat(1)[0]); t {
118 switch ct := ctlType(eat(1)[0]); ct {
122 sn := seqnum(be.Uint16(eat(2)))
124 if ack, ok := ch.ackChans.LoadAndDelete(sn); ok {
125 close(ack.(chan struct{}))
128 defer errWrap("set peer id")
131 if c.remoteID != PeerIDNil {
132 return errors.New("peer id already set")
135 c.remoteID = PeerID(be.Uint16(eat(2)))
140 defer errWrap("ping")
142 defer errWrap("disco")
146 return fmt.Errorf("unsupported ctl type: %d", ct)
150 return TrailingDataError(data[off:])
154 Reader: bytes.NewReader(data[off:]),
158 defer errWrap("split")
160 sn := seqnum(be.Uint16(eat(2)))
161 n := be.Uint16(eat(2))
162 i := be.Uint16(eat(2))
164 defer errWrap("%d", sn)
167 return fmt.Errorf("chunk number (%d) > chunk count (%d)", i, n)
170 ch.inSplitsMu.RLock()
172 ch.inSplitsMu.RUnlock()
175 s = &inSplit{chunks: make([][]byte, n)}
177 s.timeout = time.AfterFunc(ConnTimeout, func() {
179 delete(ch.inSplits, sn)
180 ch.inSplitsMu.Unlock()
186 ch.inSplitsMu.Unlock()
189 if int(n) != len(s.chunks) {
190 return fmt.Errorf("chunk count changed from %d to %d", len(s.chunks), n)
193 if s.chunks[i] == nil {
194 s.chunks[i] = data[off:]
198 if s.got < len(s.chunks) {
199 if s.timeout != nil && s.timeout.Stop() {
200 s.timeout.Reset(ConnTimeout)
205 if s.timeout != nil {
210 delete(ch.inSplits, sn)
211 ch.inSplitsMu.Unlock()
214 Reader: (*net.Buffers)(&s.chunks),
220 sn := seqnum(be.Uint16(eat(2)))
222 defer errWrap("%d", sn)
224 be.PutUint16(ch.ackBuf, uint16(sn))
227 if sn-ch.inRelSN >= 0x8000 {
232 ch.inRels[sn&0x7fff] = data[off:]
234 i := func() seqnum { return ch.inRelSN & 0x7fff }
235 for ; ch.inRels[i()] != nil; ch.inRelSN++ {
236 data := ch.inRels[i()]
238 if err := c.processRawPkt(data, PktInfo{Channel: pi.Channel}); err != nil {
239 c.gotErr("rel", data, err)
243 return fmt.Errorf("unsupported pkt type: %d", t)
249 func (c *Conn) newAckBuf() {
250 for i := range c.chans {
252 ch.sendAck = c.sendRaw(func(buf []byte) int {
253 buf[0] = uint8(rawCtl)
254 buf[1] = uint8(ctlAck)
257 }, PktInfo{Channel: Channel(i), Unrel: true})