11 // A PktError is an error that occured while processing a packet.
12 type PktError struct {
13 Type string // "net", "raw" or "rel".
18 func (e PktError) Error() string {
19 return fmt.Sprintf("error processing %s pkt: %x: %v", e.Type, e.Data, e.Err)
22 func (e PktError) Unwrap() error { return e.Err }
24 func (p *Peer) processNetPkts(pkts <-chan netPkt) {
25 for pkt := range pkts {
26 if err := p.processNetPkt(pkt); err != nil {
27 p.errs <- PktError{"net", pkt.Data, err}
34 // A TrailingDataError reports a packet with trailing data,
35 // it doesn't stop a packet from being processed.
36 type TrailingDataError []byte
38 func (e TrailingDataError) Error() string {
39 return fmt.Sprintf("trailing data: %x", []byte(e))
42 func (p *Peer) processNetPkt(pkt netPkt) (err error) {
43 if pkt.SrcAddr.String() != p.Addr().String() {
44 return fmt.Errorf("got pkt from wrong addr: %s", p.Addr().String())
47 if len(pkt.Data) < MtHdrSize {
48 return io.ErrUnexpectedEOF
51 if id := binary.BigEndian.Uint32(pkt.Data[0:4]); id != protoID {
52 return fmt.Errorf("unsupported protocol id: 0x%08x", id)
55 // src PeerID at pkt.Data[4:6]
58 if chno >= ChannelCount {
59 return fmt.Errorf("invalid channel number: %d: >= ChannelCount", chno)
64 p.timeout.Reset(ConnTimeout)
69 Data: pkt.Data[MtHdrSize:],
73 if err := p.processRawPkt(rpkt); err != nil {
74 p.errs <- PktError{"raw", rpkt.Data, err}
80 func (p *Peer) processRawPkt(pkt rawPkt) (err error) {
81 errWrap := func(format string, a ...interface{}) {
83 err = fmt.Errorf(format, append(a, err)...)
87 c := &p.chans[pkt.ChNo]
89 if len(pkt.Data) < 1 {
90 return fmt.Errorf("can't read pkt type: %w", io.ErrUnexpectedEOF)
92 switch t := rawType(pkt.Data[0]); t {
94 defer errWrap("ctl: %w")
96 if len(pkt.Data) < 1+1 {
97 return fmt.Errorf("can't read type: %w", io.ErrUnexpectedEOF)
99 switch ct := ctlType(pkt.Data[1]); ct {
101 defer errWrap("ack: %w")
103 if len(pkt.Data) < 1+1+2 {
104 return io.ErrUnexpectedEOF
107 sn := seqnum(binary.BigEndian.Uint16(pkt.Data[2:4]))
109 if ack, ok := c.ackchans.LoadAndDelete(sn); ok {
110 close(ack.(chan struct{}))
113 if len(pkt.Data) > 1+1+2 {
114 return TrailingDataError(pkt.Data[1+1+2:])
117 defer errWrap("set peer id: %w")
119 if len(pkt.Data) < 1+1+2 {
120 return io.ErrUnexpectedEOF
123 // Ensure no concurrent senders while peer id changes.
125 if p.idOfPeer != PeerIDNil {
126 return errors.New("peer id already set")
129 p.idOfPeer = PeerID(binary.BigEndian.Uint16(pkt.Data[2:4]))
132 if len(pkt.Data) > 1+1+2 {
133 return TrailingDataError(pkt.Data[1+1+2:])
136 defer errWrap("ping: %w")
138 if len(pkt.Data) > 1+1 {
139 return TrailingDataError(pkt.Data[1+1:])
142 defer errWrap("disco: %w")
146 if len(pkt.Data) > 1+1 {
147 return TrailingDataError(pkt.Data[1+1:])
150 return fmt.Errorf("unsupported ctl type: %d", ct)
159 defer errWrap("split: %w")
161 if len(pkt.Data) < 1+2+2+2 {
162 return io.ErrUnexpectedEOF
165 sn := seqnum(binary.BigEndian.Uint16(pkt.Data[1:3]))
166 count := binary.BigEndian.Uint16(pkt.Data[3:5])
167 i := binary.BigEndian.Uint16(pkt.Data[5:7])
173 splitpkts := p.chans[pkt.ChNo].insplit
175 // Delete old incomplete split packets
176 // so new ones don't get corrupted.
177 delete(splitpkts, sn-0x8000)
179 if splitpkts[sn] == nil {
180 splitpkts[sn] = make([][]byte, count)
183 chunks := splitpkts[sn]
185 if int(count) != len(chunks) {
186 return fmt.Errorf("chunk count changed on seqnum: %d", sn)
189 chunks[i] = pkt.Data[7:]
191 for _, chunk := range chunks {
198 for _, chunk := range chunks {
199 data = append(data, chunk...)
208 delete(splitpkts, sn)
210 defer errWrap("rel: %w")
212 if len(pkt.Data) < 1+2 {
213 return io.ErrUnexpectedEOF
216 sn := seqnum(binary.BigEndian.Uint16(pkt.Data[1:3]))
218 ackdata := make([]byte, 1+1+2)
219 ackdata[0] = uint8(rawTypeCtl)
220 ackdata[1] = uint8(ctlAck)
221 binary.BigEndian.PutUint16(ackdata[2:4], uint16(sn))
227 if _, err := p.sendRaw(ack); err != nil {
228 if errors.Is(err, net.ErrClosed) {
231 return fmt.Errorf("can't ack %d: %w", sn, err)
234 if sn-c.inrelsn >= 0x8000 {
235 return nil // Already received.
238 c.inrel[sn] = pkt.Data[3:]
240 for ; c.inrel[c.inrelsn] != nil; c.inrelsn++ {
241 data := c.inrel[c.inrelsn]
242 delete(c.inrel, c.inrelsn)
249 if err := p.processRawPkt(rpkt); err != nil {
250 p.errs <- PktError{"rel", rpkt.Data, err}
254 return fmt.Errorf("unsupported pkt type: %d", t)