10 // A PktError is an error that occured while processing a packet.
11 type PktError struct {
12 Type string // "net", "raw" or "rel".
17 func (e PktError) Error() string {
18 return fmt.Sprintf("error processing %s pkt: %x: %v", e.Type, e.Data, e.Err)
21 func (e PktError) Unwrap() error { return e.Err }
23 func (p *Peer) processNetPkts(pkts <-chan netPkt) {
24 for pkt := range pkts {
25 if err := p.processNetPkt(pkt); err != nil {
26 p.errs <- PktError{"net", pkt.Data, err}
33 // A TrailingDataError reports a packet with trailing data,
34 // it doesn't stop a packet from being processed.
35 type TrailingDataError []byte
37 func (e TrailingDataError) Error() string {
38 return fmt.Sprintf("trailing data: %x", []byte(e))
41 func (p *Peer) processNetPkt(pkt netPkt) (err error) {
42 if pkt.SrcAddr.String() != p.Addr().String() {
43 return fmt.Errorf("got pkt from wrong addr: %s", p.Addr().String())
46 if len(pkt.Data) < MtHdrSize {
47 return io.ErrUnexpectedEOF
50 if id := be.Uint32(pkt.Data[0:4]); id != protoID {
51 return fmt.Errorf("unsupported protocol id: 0x%08x", id)
54 // src PeerID at pkt.Data[4:6]
57 if chno >= ChannelCount {
58 return fmt.Errorf("invalid channel number: %d: >= ChannelCount", chno)
63 p.timeout.Reset(ConnTimeout)
68 Data: pkt.Data[MtHdrSize:],
72 if err := p.processRawPkt(rpkt); err != nil {
73 p.errs <- PktError{"raw", rpkt.Data, err}
79 func (p *Peer) processRawPkt(pkt rawPkt) (err error) {
80 errWrap := func(format string, a ...interface{}) {
82 err = fmt.Errorf(format, append(a, err)...)
86 c := &p.chans[pkt.ChNo]
88 if len(pkt.Data) < 1 {
89 return fmt.Errorf("can't read pkt type: %w", io.ErrUnexpectedEOF)
91 switch t := rawType(pkt.Data[0]); t {
93 defer errWrap("ctl: %w")
95 if len(pkt.Data) < 1+1 {
96 return fmt.Errorf("can't read type: %w", io.ErrUnexpectedEOF)
98 switch ct := ctlType(pkt.Data[1]); ct {
100 defer errWrap("ack: %w")
102 if len(pkt.Data) < 1+1+2 {
103 return io.ErrUnexpectedEOF
106 sn := seqnum(be.Uint16(pkt.Data[2:4]))
108 if ack, ok := c.ackChans.LoadAndDelete(sn); ok {
109 close(ack.(chan struct{}))
112 if len(pkt.Data) > 1+1+2 {
113 return TrailingDataError(pkt.Data[1+1+2:])
116 defer errWrap("set peer id: %w")
118 if len(pkt.Data) < 1+1+2 {
119 return io.ErrUnexpectedEOF
122 // Ensure no concurrent senders while peer id changes.
124 if p.idOfPeer != PeerIDNil {
125 return errors.New("peer id already set")
128 p.idOfPeer = PeerID(be.Uint16(pkt.Data[2:4]))
131 if len(pkt.Data) > 1+1+2 {
132 return TrailingDataError(pkt.Data[1+1+2:])
135 defer errWrap("ping: %w")
137 if len(pkt.Data) > 1+1 {
138 return TrailingDataError(pkt.Data[1+1:])
141 defer errWrap("disco: %w")
145 if len(pkt.Data) > 1+1 {
146 return TrailingDataError(pkt.Data[1+1:])
149 return fmt.Errorf("unsupported ctl type: %d", ct)
158 defer errWrap("split: %w")
160 if len(pkt.Data) < 1+2+2+2 {
161 return io.ErrUnexpectedEOF
164 sn := seqnum(be.Uint16(pkt.Data[1:3]))
165 count := be.Uint16(pkt.Data[3:5])
166 i := be.Uint16(pkt.Data[5:7])
172 splits := p.chans[pkt.ChNo].inSplit
174 // Delete old incomplete split packets
175 // so new ones don't get corrupted.
176 splits[sn-0x8000] = nil
178 if splits[sn] == nil {
179 splits[sn] = &inSplit{chunks: make([][]byte, count)}
184 if int(count) != len(s.chunks) {
185 return fmt.Errorf("chunk count changed on split packet: %d", sn)
188 s.chunks[i] = pkt.Data[7:]
189 s.size += len(s.chunks[i])
192 if s.got == len(s.chunks) {
193 data := make([]byte, 0, s.size)
194 for _, chunk := range s.chunks {
195 data = append(data, chunk...)
207 defer errWrap("rel: %w")
209 if len(pkt.Data) < 1+2 {
210 return io.ErrUnexpectedEOF
213 sn := seqnum(be.Uint16(pkt.Data[1:3]))
215 ack := make([]byte, 1+1+2)
216 ack[0] = uint8(rawTypeCtl)
217 ack[1] = uint8(ctlAck)
218 be.PutUint16(ack[2:4], uint16(sn))
219 if _, err := p.sendRaw(rawPkt{
224 if errors.Is(err, net.ErrClosed) {
227 return fmt.Errorf("can't ack %d: %w", sn, err)
230 if sn-c.inRelSN >= 0x8000 {
231 return nil // Already received.
234 c.inRel[sn] = pkt.Data[3:]
236 for ; c.inRel[c.inRelSN] != nil; c.inRelSN++ {
238 Data: c.inRel[c.inRelSN],
242 c.inRel[c.inRelSN] = nil
244 if err := p.processRawPkt(rpkt); err != nil {
245 p.errs <- PktError{"rel", rpkt.Data, err}
249 return fmt.Errorf("unsupported pkt type: %d", t)