// ErrPktTooBig is a sentinel error value with the message
// "can't send pkt: too big".
// NOTE(review): presumably returned by Send when a packet exceeds the
// maximum sendable size — the returning code path should be confirmed.
14 var ErrPktTooBig = errors.New("can't send pkt: too big")
16 // A TooBigChError reports a Channel greater than or equal to ChannelCount.
// Its underlying type is Channel, so the error value itself is the rejected
// channel; Send creates it by converting pkt.Channel directly.
17 type TooBigChError Channel
// Error implements the error interface. The message contains the
// ChannelCount limit followed by the rejected channel value.
19 func (e TooBigChError) Error() string {
// The first %d receives ChannelCount, the second receives e itself.
20 return fmt.Sprintf("channel >= ChannelCount (%d): %d", ChannelCount, e)
23 // Send sends a Pkt to the Conn.
24 // Ack is closed when the packet is acknowledged.
25 // Ack is nil if pkt.Unrel is true or err != nil.
// A pkt.Channel that is not below ChannelCount is rejected with a
// TooBigChError.
26 func (c *Conn) Send(pkt Pkt) (ack <-chan struct{}, err error) {
// Validate the channel up front; it is later used to index c.chans.
27 if pkt.Channel >= ChannelCount {
28 return nil, TooBigChError(pkt.Channel)
// Build a packet whose first payload byte is the rawOrig type tag.
32 send := c.sendRaw(func(buf []byte) int {
33 buf[0] = uint8(rawOrig)
// Read payload from pkt into the buffer starting at offset nn.
37 n, err := pkt.Read(buf[nn:])
// Zero-length read: only the returned error is inspected.
// NOTE(review): presumably probes whether pkt still has data that did not
// fit into a single packet — confirm.
45 if _, e = pkt.Read(nil); e != nil {
// Replace pkt.Reader with a reader that yields buf[1:nn] first.
// NOTE(review): presumably puts the bytes already consumed above back in
// front of the remaining data so the split path sees the whole payload —
// confirm.
49 pkt.Reader = io.MultiReader(
50 bytes.NewReader([]byte(buf[1:nn])),
// sends accumulates the send functions appended below; they are run by the
// range loop further down.
66 sends []func() (<-chan struct{}, error)
// Build a packet whose first payload byte is the rawSplit type tag.
74 send := c.sendRaw(func(buf []byte) int {
75 buf[0] = uint8(rawSplit)
// Fill the buffer from offset 7 onwards with payload from pkt.
77 n, err := io.ReadFull(pkt, buf[7:])
// io.ErrUnexpectedEOF (pkt ended before the buffer was full) is excluded
// from this error branch.
78 if err != nil && err != io.ErrUnexpectedEOF {
// Store i as a uint16 in bytes 5-6.
83 be.PutUint16(buf[5:7], i)
// Queue a function that, when called, writes sn into b[1:3] and i into
// b[3:5]; these fields are therefore filled in only when it runs.
99 sends = append(sends, func() (<-chan struct{}, error) {
100 be.PutUint16(b[1:3], uint16(sn))
101 be.PutUint16(b[3:5], i)
// Per-channel state for the packet's channel.
106 ch := &c.chans[pkt.Channel]
111 ch.outSplitMu.Unlock()
// NOTE(review): wg presumably tracks the per-part acknowledgements of the
// queued sends — confirm.
113 var wg sync.WaitGroup
// Invoke every queued send function.
115 for _, send := range sends {
// NOTE(review): presumably the combined ack channel handed back to the
// caller, closed once all parts are acknowledged — confirm.
130 ack := make(chan struct{})
// sendRaw assembles a datagram and returns a function that transmits it.
//
// The datagram begins with a 7-byte header: protoID in bytes 0-3,
// c.remoteID as a uint16 in bytes 4-5 and pi.Channel in byte 6. read is
// called with the buffer space after the header, and its return value is
// taken as the number of payload bytes it wrote there.
//
// The buffer is built when sendRaw is called; the write to c.udpConn happens
// only when the returned function is invoked.
141 func (c *Conn) sendRaw(read func([]byte) int, pi PktInfo) func() (<-chan struct{}, error) {
// Allocate a maximum-size buffer; it is trimmed to the real length below.
143 buf := make([]byte, maxUDPPktSize)
144 be.PutUint32(buf[0:4], protoID)
146 be.PutUint16(buf[4:6], uint16(c.remoteID))
148 buf[6] = uint8(pi.Channel)
// Let read fill the payload, then cut buf down to header plus payload.
149 buf = buf[:7+read(buf[7:])]
// The returned function performs the actual write to the UDP connection.
151 return func() (<-chan struct{}, error) {
152 if _, err := c.udpConn.Write(buf); err != nil {
// NOTE(review): net.ErrClosed is returned somewhere inside the write-error
// branch; which write errors map to it should be confirmed.
154 return nil, net.ErrClosed
// Restart the ping timer with PingTimeout.
// NOTE(review): presumably so that a keep-alive is only sent after a period
// without outgoing traffic — confirm.
157 c.ping.Reset(PingTimeout)
// c.closing is read atomically.
// NOTE(review): the value 1 presumably marks a connection that is shutting
// down — confirm.
158 if atomic.LoadUint32(&c.closing) == 1 {
168 send := c.sendRaw(func(buf []byte) int {
169 buf[0] = uint8(rawRel)
171 return 3 + read(buf[3:])
174 return func() (<-chan struct{}, error) {
175 ch := &c.chans[pi.Channel]
178 defer ch.outRelMu.Unlock()
181 be.PutUint16(snBuf, uint16(sn))
182 for ; sn-ch.outRelWin >= 0x8000; ch.outRelWin++ {
183 if ack, ok := ch.ackChans.Load(ch.outRelWin); ok {
185 case <-ack.(chan struct{}):
191 ack := make(chan struct{})
192 ch.ackChans.Store(sn, ack)
194 if _, err := send(); err != nil {
195 if ack, ok := ch.ackChans.LoadAndDelete(sn); ok {
196 close(ack.(chan struct{}))
203 t := time.NewTimer(500 * time.Millisecond)
212 t.Reset(500 * time.Millisecond)