11 // A PktError is an error that occurred while processing a packet.
12 type PktError struct {
13 Type string // Processing stage that reported the error: "net", "raw" or "rel".
// Error implements the error interface. The message names the packet type
// (e.Type) and includes the packet data (e.Data) hex-encoded.
18 func (e PktError) Error() string {
19 return "error processing " + e.Type + " pkt: " +
20 hex.EncodeToString(e.Data) + ": " +
// Unwrap returns the wrapped error e.Err, so that errors.Is and errors.As
// can inspect the cause of a PktError.
24 func (e PktError) Unwrap() error { return e.Err }
// processNetPkts receives packets from pkts until the channel is closed and
// passes each one to processNetPkt. When processNetPkt fails, the error is
// sent on p.errs as a "net" PktError carrying the packet's data.
26 func (p *Peer) processNetPkts(pkts <-chan netPkt) {
27 for pkt := range pkts {
28 if err := p.processNetPkt(pkt); err != nil {
29 p.errs <- PktError{"net", pkt.Data, err}
36 // A TrailingDataError reports a packet with trailing data;
37 // it doesn't stop a packet from being processed.
// The value itself holds the trailing bytes.
38 type TrailingDataError []byte
// Error implements the error interface. The message is "trailing data: "
// followed by the trailing bytes hex-encoded.
40 func (e TrailingDataError) Error() string {
41 return "trailing data: " + hex.EncodeToString([]byte(e))
// processNetPkt validates the header of a packet received from the network
// (source address, minimum length, protocol ID, channel number) and passes
// a raw packet to processRawPkt. Validation failures are returned; a failure
// of processRawPkt is sent on p.errs as a "raw" PktError instead.
44 func (p *Peer) processNetPkt(pkt netPkt) (err error) {
// Reject packets whose source address differs from the peer's address.
// NOTE(review): the message prints p.Addr(), i.e. the expected address,
// not the offending pkt.SrcAddr — confirm that this is intended.
45 if pkt.SrcAddr.String() != p.Addr().String() {
46 return fmt.Errorf("got pkt from wrong addr: %s", p.Addr().String())
// The packet must contain at least a full header of MtHdrSize bytes.
49 if len(pkt.Data) < MtHdrSize {
50 return io.ErrUnexpectedEOF
// The first four bytes are the big-endian protocol ID.
53 if id := binary.BigEndian.Uint32(pkt.Data[0:4]); id != protoID {
54 return fmt.Errorf("unsupported protocol id: 0x%08x", id)
57 // src PeerID at pkt.Data[4:6]
// Channel numbers must be below ChannelCount.
60 if chno >= ChannelCount {
61 return fmt.Errorf("invalid channel number: %d: >= ChannelCount", chno)
// Reset the peer's timeout to ConnTimeout.
66 p.timeout.Reset(ConnTimeout)
// The payload is everything after the header.
71 Data: pkt.Data[MtHdrSize:],
// Errors from processRawPkt are reported on p.errs rather than returned.
75 if err := p.processRawPkt(rpkt); err != nil {
76 p.errs <- PktError{"raw", rpkt.Data, err}
// processRawPkt processes one raw packet. It switches on the packet type
// stored in the first byte of pkt.Data. Errors returned from a branch are
// prefixed with the kind of packet being handled ("ctl", "ack",
// "set peer id", "ping", "disco", "split", "rel") through deferred calls to
// errWrap. Data left over after a fixed-size control packet is reported as a
// TrailingDataError.
82 func (p *Peer) processRawPkt(pkt rawPkt) (err error) {
// errWrap is meant to be deferred: it replaces the named result err with
// an error built from format, passing err as the last format argument.
83 errWrap := func(format string, a ...interface{}) {
85 err = fmt.Errorf(format, append(a, err)...)
// c is the state of the channel selected by pkt.ChNo.
89 c := &p.chans[pkt.ChNo]
// At least one byte is needed to read the packet type.
91 if len(pkt.Data) < 1 {
92 return fmt.Errorf("can't read pkt type: %w", io.ErrUnexpectedEOF)
94 switch t := rawType(pkt.Data[0]); t {
96 defer errWrap("ctl: %w")
// The second byte of a control packet is the control type.
98 if len(pkt.Data) < 1+1 {
99 return fmt.Errorf("can't read type: %w", io.ErrUnexpectedEOF)
101 switch ct := ctlType(pkt.Data[1]); ct {
103 defer errWrap("ack: %w")
// An ack carries a 2-byte big-endian sequence number at pkt.Data[2:4].
105 if len(pkt.Data) < 1+1+2 {
106 return io.ErrUnexpectedEOF
109 sn := seqnum(binary.BigEndian.Uint16(pkt.Data[2:4]))
// If an ack channel is stored for sn, remove it from c.ackchans and
// close it.
111 if ack, ok := c.ackchans.LoadAndDelete(sn); ok {
112 close(ack.(chan struct{}))
115 if len(pkt.Data) > 1+1+2 {
116 return TrailingDataError(pkt.Data[1+1+2:])
119 defer errWrap("set peer id: %w")
// The new peer id is a 2-byte big-endian value at pkt.Data[2:4].
121 if len(pkt.Data) < 1+1+2 {
122 return io.ErrUnexpectedEOF
125 // Ensure no concurrent senders while peer id changes.
// A peer id may only be assigned while it is still PeerIDNil.
127 if p.idOfPeer != PeerIDNil {
128 return errors.New("peer id already set")
131 p.idOfPeer = PeerID(binary.BigEndian.Uint16(pkt.Data[2:4]))
134 if len(pkt.Data) > 1+1+2 {
135 return TrailingDataError(pkt.Data[1+1+2:])
138 defer errWrap("ping: %w")
// A ping has no payload beyond its two type bytes.
140 if len(pkt.Data) > 1+1 {
141 return TrailingDataError(pkt.Data[1+1:])
144 defer errWrap("disco: %w")
// A disco packet closes the peer.
146 if err := p.Close(); err != nil {
147 return fmt.Errorf("can't close: %w", err)
150 if len(pkt.Data) > 1+1 {
151 return TrailingDataError(pkt.Data[1+1:])
154 return fmt.Errorf("unsupported ctl type: %d", ct)
163 defer errWrap("split: %w")
// Split header after the type byte: sequence number, chunk count and
// chunk index, each a 2-byte big-endian value.
165 if len(pkt.Data) < 1+2+2+2 {
166 return io.ErrUnexpectedEOF
169 sn := seqnum(binary.BigEndian.Uint16(pkt.Data[1:3]))
170 count := binary.BigEndian.Uint16(pkt.Data[3:5])
171 i := binary.BigEndian.Uint16(pkt.Data[5:7])
177 splitpkts := p.chans[pkt.ChNo].insplit
179 // Delete old incomplete split packets
180 // so new ones don't get corrupted.
181 delete(splitpkts, sn-0x8000)
// The first chunk seen for sn allocates one slot per expected chunk.
183 if splitpkts[sn] == nil {
184 splitpkts[sn] = make([][]byte, count)
187 chunks := splitpkts[sn]
// Every chunk of the same split packet must announce the same count.
189 if int(count) != len(chunks) {
190 return fmt.Errorf("chunk count changed on seqnum: %d", sn)
// Store this chunk's payload (the bytes after the 7-byte header).
193 chunks[i] = pkt.Data[7:]
195 for _, chunk := range chunks {
// Concatenate the chunks in index order into data.
202 for _, chunk := range chunks {
203 data = append(data, chunk...)
// Drop the stored chunks for sn.
212 delete(splitpkts, sn)
214 defer errWrap("rel: %w")
// A reliable packet carries a 2-byte big-endian sequence number at
// pkt.Data[1:3], followed by the payload.
216 if len(pkt.Data) < 1+2 {
217 return io.ErrUnexpectedEOF
220 sn := seqnum(binary.BigEndian.Uint16(pkt.Data[1:3]))
// Build the 4-byte ack: rawTypeCtl, ctlAck, then sn in big-endian.
222 ackdata := make([]byte, 1+1+2)
223 ackdata[0] = uint8(rawTypeCtl)
224 ackdata[1] = uint8(ctlAck)
225 binary.BigEndian.PutUint16(ackdata[2:4], uint16(sn))
// The ack is sent before the duplicate check below, so packets that
// were already received are acknowledged again.
231 if _, err := p.sendRaw(ack); err != nil {
232 return fmt.Errorf("can't ack %d: %w", sn, err)
// NOTE(review): presumably relies on seqnum being an unsigned 16-bit
// type so that the subtraction wraps around — confirm against the
// definition of seqnum.
235 if sn-c.inrelsn >= 0x8000 {
236 return nil // Already received.
// Buffer the payload under its sequence number.
239 c.inrel[sn] = pkt.Data[3:]
// Take buffered payloads in sequence order starting at c.inrelsn,
// removing each from c.inrel and advancing c.inrelsn, until the next
// expected sequence number has no buffered entry.
241 for ; c.inrel[c.inrelsn] != nil; c.inrelsn++ {
242 data := c.inrel[c.inrelsn]
243 delete(c.inrel, c.inrelsn)
// Errors from the nested call are reported on p.errs as "rel"
// PktErrors rather than returned.
250 if err := p.processRawPkt(rpkt); err != nil {
251 p.errs <- PktError{"rel", rpkt.Data, err}
255 return fmt.Errorf("unsupported pkt type: %d", t)