]> git.lizzy.rs Git - dragonfireclient.git/blob - src/network/connectionthreads.cpp
Improve protocol-level receiving code (#9617)
[dragonfireclient.git] / src / network / connectionthreads.cpp
1 /*
2 Minetest
3 Copyright (C) 2013-2017 celeron55, Perttu Ahola <celeron55@gmail.com>
4 Copyright (C) 2017 celeron55, Loic Blot <loic.blot@unix-experience.fr>
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published by
8 the Free Software Foundation; either version 2.1 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 GNU Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 */
20
21 #include "connectionthreads.h"
22 #include "log.h"
23 #include "profiler.h"
24 #include "settings.h"
25 #include "network/networkpacket.h"
26 #include "util/serialize.h"
27
28 namespace con
29 {
30
31 /******************************************************************************/
32 /* defines used for debugging and profiling                                   */
33 /******************************************************************************/
#ifdef NDEBUG
/* NDEBUG builds: log statements run without locking, profiling code is compiled out */
#define LOG(a) a
#define PROFILE(a)
#undef DEBUG_CONNECTION_KBPS
#else
/* this mutex is used to achieve log message consistency */
std::mutex log_conthread_mutex;
/*
	Runs the wrapped statement while holding log_conthread_mutex.
	The do/while(0) wrapper makes the expansion a single statement, so that
	`if (x) LOG(...); else ...` parses the same way as with the NDEBUG variant.
*/
#define LOG(a)                                                                \
	do {                                                                      \
		MutexAutoLock loglock(log_conthread_mutex);                           \
		a;                                                                    \
	} while (0)
/* PROFILE(a) emits its argument verbatim (it is used for declarations too) */
#define PROFILE(a) a
//#define DEBUG_CONNECTION_KBPS
#undef DEBUG_CONNECTION_KBPS
#endif

/* maximum number of retries for reliable packets */
#define MAX_RELIABLE_RETRY 5

#define WINDOW_SIZE 5
55
56 static session_t readPeerId(u8 *packetdata)
57 {
58         return readU16(&packetdata[4]);
59 }
60 static u8 readChannel(u8 *packetdata)
61 {
62         return readU8(&packetdata[6]);
63 }
64
65 /******************************************************************************/
66 /* Connection Threads                                                         */
67 /******************************************************************************/
68
69 ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
70         float timeout) :
71         Thread("ConnectionSend"),
72         m_max_packet_size(max_packet_size),
73         m_timeout(timeout),
74         m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
75 {
76         SANITY_CHECK(m_max_data_packets_per_iteration > 1);
77 }
78
79 void *ConnectionSendThread::run()
80 {
81         assert(m_connection);
82
83         LOG(dout_con << m_connection->getDesc()
84                 << "ConnectionSend thread started" << std::endl);
85
86         u64 curtime = porting::getTimeMs();
87         u64 lasttime = curtime;
88
89         PROFILE(std::stringstream ThreadIdentifier);
90         PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
91
92         /* if stop is requested don't stop immediately but try to send all        */
93         /* packets first */
94         while (!stopRequested() || packetsQueued()) {
95                 BEGIN_DEBUG_EXCEPTION_HANDLER
96                 PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
97
98                 m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
99
100                 /* wait for trigger or timeout */
101                 m_send_sleep_semaphore.wait(50);
102
103                 /* remove all triggers */
104                 while (m_send_sleep_semaphore.wait(0)) {
105                 }
106
107                 lasttime = curtime;
108                 curtime = porting::getTimeMs();
109                 float dtime = CALC_DTIME(lasttime, curtime);
110
111                 /* first resend timed-out packets */
112                 runTimeouts(dtime);
113                 if (m_iteration_packets_avaialble == 0) {
114                         LOG(warningstream << m_connection->getDesc()
115                                 << " Packet quota used up after re-sending packets, "
116                                 << "max=" << m_max_data_packets_per_iteration << std::endl);
117                 }
118
119                 /* translate commands to packets */
120                 ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
121                 while (c.type != CONNCMD_NONE) {
122                         if (c.reliable)
123                                 processReliableCommand(c);
124                         else
125                                 processNonReliableCommand(c);
126
127                         c = m_connection->m_command_queue.pop_frontNoEx(0);
128                 }
129
130                 /* send queued packets */
131                 sendPackets(dtime);
132
133                 END_DEBUG_EXCEPTION_HANDLER
134         }
135
136         PROFILE(g_profiler->remove(ThreadIdentifier.str()));
137         return NULL;
138 }
139
140 void ConnectionSendThread::Trigger()
141 {
142         m_send_sleep_semaphore.post();
143 }
144
145 bool ConnectionSendThread::packetsQueued()
146 {
147         std::list<session_t> peerIds = m_connection->getPeerIDs();
148
149         if (!m_outgoing_queue.empty() && !peerIds.empty())
150                 return true;
151
152         for (session_t peerId : peerIds) {
153                 PeerHelper peer = m_connection->getPeerNoEx(peerId);
154
155                 if (!peer)
156                         continue;
157
158                 if (dynamic_cast<UDPPeer *>(&peer) == 0)
159                         continue;
160
161                 for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
162                         if (!channel.queued_commands.empty()) {
163                                 return true;
164                         }
165                 }
166         }
167
168
169         return false;
170 }
171
172 void ConnectionSendThread::runTimeouts(float dtime)
173 {
174         std::list<session_t> timeouted_peers;
175         std::list<session_t> peerIds = m_connection->getPeerIDs();
176
177         for (session_t &peerId : peerIds) {
178                 PeerHelper peer = m_connection->getPeerNoEx(peerId);
179
180                 if (!peer)
181                         continue;
182
183                 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
184                 if (!udpPeer)
185                         continue;
186
187                 PROFILE(std::stringstream peerIdentifier);
188                 PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
189                         << ";" << peerId << ";RELIABLE]");
190                 PROFILE(ScopeProfiler
191                 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
192
193                 SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
194
195                 /*
196                         Check peer timeout
197                 */
198                 if (peer->isTimedOut(m_timeout)) {
199                         infostream << m_connection->getDesc()
200                                 << "RunTimeouts(): Peer " << peer->id
201                                 << " has timed out."
202                                 << std::endl;
203                         // Add peer to the list
204                         timeouted_peers.push_back(peer->id);
205                         // Don't bother going through the buffers of this one
206                         continue;
207                 }
208
209                 float resend_timeout = udpPeer->getResendTimeout();
210                 bool retry_count_exceeded = false;
211                 for (Channel &channel : udpPeer->channels) {
212                         std::list<BufferedPacket> timed_outs;
213
214                         // Remove timed out incomplete unreliable split packets
215                         channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
216
217                         // Increment reliable packet times
218                         channel.outgoing_reliables_sent.incrementTimeouts(dtime);
219
220                         unsigned int numpeers = m_connection->m_peers.size();
221
222                         if (numpeers == 0)
223                                 return;
224
225                         // Re-send timed out outgoing reliables
226                         timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
227                                 (m_max_data_packets_per_iteration / numpeers));
228
229                         channel.UpdatePacketLossCounter(timed_outs.size());
230                         g_profiler->graphAdd("packets_lost", timed_outs.size());
231
232                         m_iteration_packets_avaialble -= timed_outs.size();
233
234                         for (std::list<BufferedPacket>::iterator k = timed_outs.begin();
235                                 k != timed_outs.end(); ++k) {
236                                 session_t peer_id = readPeerId(*(k->data));
237                                 u8 channelnum = readChannel(*(k->data));
238                                 u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE + 1]));
239
240                                 channel.UpdateBytesLost(k->data.getSize());
241                                 k->resend_count++;
242
243                                 if (k->resend_count > MAX_RELIABLE_RETRY) {
244                                         retry_count_exceeded = true;
245                                         timeouted_peers.push_back(peer->id);
246                                         /* no need to check additional packets if a single one did timeout*/
247                                         break;
248                                 }
249
250                                 LOG(derr_con << m_connection->getDesc()
251                                         << "RE-SENDING timed-out RELIABLE to "
252                                         << k->address.serializeString()
253                                         << "(t/o=" << resend_timeout << "): "
254                                         << "from_peer_id=" << peer_id
255                                         << ", channel=" << ((int) channelnum & 0xff)
256                                         << ", seqnum=" << seqnum
257                                         << std::endl);
258
259                                 rawSend(*k);
260
261                                 // do not handle rtt here as we can't decide if this packet was
262                                 // lost or really takes more time to transmit
263                         }
264
265                         if (retry_count_exceeded) {
266                                 break; /* no need to check other channels if we already did timeout */
267                         }
268
269                         channel.UpdateTimers(dtime);
270                 }
271
272                 /* skip to next peer if we did timeout */
273                 if (retry_count_exceeded)
274                         continue;
275
276                 /* send ping if necessary */
277                 if (udpPeer->Ping(dtime, data)) {
278                         LOG(dout_con << m_connection->getDesc()
279                                 << "Sending ping for peer_id: " << udpPeer->id << std::endl);
280                         /* this may fail if there ain't a sequence number left */
281                         if (!rawSendAsPacket(udpPeer->id, 0, data, true)) {
282                                 //retrigger with reduced ping interval
283                                 udpPeer->Ping(4.0, data);
284                         }
285                 }
286
287                 udpPeer->RunCommandQueues(m_max_packet_size,
288                         m_max_commands_per_iteration,
289                         m_max_packets_requeued);
290         }
291
292         // Remove timed out peers
293         for (u16 timeouted_peer : timeouted_peers) {
294                 LOG(dout_con << m_connection->getDesc()
295                         << "RunTimeouts(): Removing peer " << timeouted_peer << std::endl);
296                 m_connection->deletePeer(timeouted_peer, true);
297         }
298 }
299
300 void ConnectionSendThread::rawSend(const BufferedPacket &packet)
301 {
302         try {
303                 m_connection->m_udpSocket.Send(packet.address, *packet.data,
304                         packet.data.getSize());
305                 LOG(dout_con << m_connection->getDesc()
306                         << " rawSend: " << packet.data.getSize()
307                         << " bytes sent" << std::endl);
308         } catch (SendFailedException &e) {
309                 LOG(derr_con << m_connection->getDesc()
310                         << "Connection::rawSend(): SendFailedException: "
311                         << packet.address.serializeString() << std::endl);
312         }
313 }
314
315 void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel)
316 {
317         try {
318                 p.absolute_send_time = porting::getTimeMs();
319                 // Buffer the packet
320                 channel->outgoing_reliables_sent.insert(p,
321                         (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
322                                 % (MAX_RELIABLE_WINDOW_SIZE + 1));
323         }
324         catch (AlreadyExistsException &e) {
325                 LOG(derr_con << m_connection->getDesc()
326                         << "WARNING: Going to send a reliable packet"
327                         << " in outgoing buffer" << std::endl);
328         }
329
330         // Send the packet
331         rawSend(p);
332 }
333
334 bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
335         const SharedBuffer<u8> &data, bool reliable)
336 {
337         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
338         if (!peer) {
339                 LOG(dout_con << m_connection->getDesc()
340                         << " INFO: dropped packet for non existent peer_id: "
341                         << peer_id << std::endl);
342                 FATAL_ERROR_IF(!reliable,
343                         "Trying to send raw packet reliable but no peer found!");
344                 return false;
345         }
346         Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
347
348         if (reliable) {
349                 bool have_sequence_number_for_raw_packet = true;
350                 u16 seqnum =
351                         channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
352
353                 if (!have_sequence_number_for_raw_packet)
354                         return false;
355
356                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
357                 Address peer_address;
358                 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
359
360                 // Add base headers and make a packet
361                 BufferedPacket p = con::makePacket(peer_address, reliable,
362                         m_connection->GetProtocolID(), m_connection->GetPeerID(),
363                         channelnum);
364
365                 // first check if our send window is already maxed out
366                 if (channel->outgoing_reliables_sent.size()
367                         < channel->getWindowSize()) {
368                         LOG(dout_con << m_connection->getDesc()
369                                 << " INFO: sending a reliable packet to peer_id " << peer_id
370                                 << " channel: " << (u32)channelnum
371                                 << " seqnum: " << seqnum << std::endl);
372                         sendAsPacketReliable(p, channel);
373                         return true;
374                 }
375
376                 LOG(dout_con << m_connection->getDesc()
377                         << " INFO: queueing reliable packet for peer_id: " << peer_id
378                         << " channel: " << (u32)channelnum
379                         << " seqnum: " << seqnum << std::endl);
380                 channel->queued_reliables.push(p);
381                 return false;
382         }
383
384         Address peer_address;
385         if (peer->getAddress(MTP_UDP, peer_address)) {
386                 // Add base headers and make a packet
387                 BufferedPacket p = con::makePacket(peer_address, data,
388                         m_connection->GetProtocolID(), m_connection->GetPeerID(),
389                         channelnum);
390
391                 // Send the packet
392                 rawSend(p);
393                 return true;
394         }
395
396         LOG(dout_con << m_connection->getDesc()
397                 << " INFO: dropped unreliable packet for peer_id: " << peer_id
398                 << " because of (yet) missing udp address" << std::endl);
399         return false;
400 }
401
402 void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
403 {
404         assert(c.reliable);  // Pre-condition
405
406         switch (c.type) {
407                 case CONNCMD_NONE:
408                         LOG(dout_con << m_connection->getDesc()
409                                 << "UDP processing reliable CONNCMD_NONE" << std::endl);
410                         return;
411
412                 case CONNCMD_SEND:
413                         LOG(dout_con << m_connection->getDesc()
414                                 << "UDP processing reliable CONNCMD_SEND" << std::endl);
415                         sendReliable(c);
416                         return;
417
418                 case CONNCMD_SEND_TO_ALL:
419                         LOG(dout_con << m_connection->getDesc()
420                                 << "UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
421                         sendToAllReliable(c);
422                         return;
423
424                 case CONCMD_CREATE_PEER:
425                         LOG(dout_con << m_connection->getDesc()
426                                 << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
427                         if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
428                                 /* put to queue if we couldn't send it immediately */
429                                 sendReliable(c);
430                         }
431                         return;
432
433                 case CONNCMD_SERVE:
434                 case CONNCMD_CONNECT:
435                 case CONNCMD_DISCONNECT:
436                 case CONCMD_ACK:
437                         FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
438                 default:
439                         LOG(dout_con << m_connection->getDesc()
440                                 << " Invalid reliable command type: " << c.type << std::endl);
441         }
442 }
443
444
445 void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
446 {
447         assert(!c.reliable); // Pre-condition
448
449         switch (c.type) {
450                 case CONNCMD_NONE:
451                         LOG(dout_con << m_connection->getDesc()
452                                 << " UDP processing CONNCMD_NONE" << std::endl);
453                         return;
454                 case CONNCMD_SERVE:
455                         LOG(dout_con << m_connection->getDesc()
456                                 << " UDP processing CONNCMD_SERVE port="
457                                 << c.address.serializeString() << std::endl);
458                         serve(c.address);
459                         return;
460                 case CONNCMD_CONNECT:
461                         LOG(dout_con << m_connection->getDesc()
462                                 << " UDP processing CONNCMD_CONNECT" << std::endl);
463                         connect(c.address);
464                         return;
465                 case CONNCMD_DISCONNECT:
466                         LOG(dout_con << m_connection->getDesc()
467                                 << " UDP processing CONNCMD_DISCONNECT" << std::endl);
468                         disconnect();
469                         return;
470                 case CONNCMD_DISCONNECT_PEER:
471                         LOG(dout_con << m_connection->getDesc()
472                                 << " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl);
473                         disconnect_peer(c.peer_id);
474                         return;
475                 case CONNCMD_SEND:
476                         LOG(dout_con << m_connection->getDesc()
477                                 << " UDP processing CONNCMD_SEND" << std::endl);
478                         send(c.peer_id, c.channelnum, c.data);
479                         return;
480                 case CONNCMD_SEND_TO_ALL:
481                         LOG(dout_con << m_connection->getDesc()
482                                 << " UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
483                         sendToAll(c.channelnum, c.data);
484                         return;
485                 case CONCMD_ACK:
486                         LOG(dout_con << m_connection->getDesc()
487                                 << " UDP processing CONCMD_ACK" << std::endl);
488                         sendAsPacket(c.peer_id, c.channelnum, c.data, true);
489                         return;
490                 case CONCMD_CREATE_PEER:
491                         FATAL_ERROR("Got command that should be reliable as unreliable command");
492                 default:
493                         LOG(dout_con << m_connection->getDesc()
494                                 << " Invalid command type: " << c.type << std::endl);
495         }
496 }
497
498 void ConnectionSendThread::serve(Address bind_address)
499 {
500         LOG(dout_con << m_connection->getDesc()
501                 << "UDP serving at port " << bind_address.serializeString() << std::endl);
502         try {
503                 m_connection->m_udpSocket.Bind(bind_address);
504                 m_connection->SetPeerID(PEER_ID_SERVER);
505         }
506         catch (SocketException &e) {
507                 // Create event
508                 ConnectionEvent ce;
509                 ce.bindFailed();
510                 m_connection->putEvent(ce);
511         }
512 }
513
514 void ConnectionSendThread::connect(Address address)
515 {
516         LOG(dout_con << m_connection->getDesc() << " connecting to "
517                 << address.serializeString()
518                 << ":" << address.getPort() << std::endl);
519
520         UDPPeer *peer = m_connection->createServerPeer(address);
521
522         // Create event
523         ConnectionEvent e;
524         e.peerAdded(peer->id, peer->address);
525         m_connection->putEvent(e);
526
527         Address bind_addr;
528
529         if (address.isIPv6())
530                 bind_addr.setAddress((IPv6AddressBytes *) NULL);
531         else
532                 bind_addr.setAddress(0, 0, 0, 0);
533
534         m_connection->m_udpSocket.Bind(bind_addr);
535
536         // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
537         m_connection->SetPeerID(PEER_ID_INEXISTENT);
538         NetworkPacket pkt(0, 0);
539         m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
540 }
541
542 void ConnectionSendThread::disconnect()
543 {
544         LOG(dout_con << m_connection->getDesc() << " disconnecting" << std::endl);
545
546         // Create and send DISCO packet
547         SharedBuffer<u8> data(2);
548         writeU8(&data[0], PACKET_TYPE_CONTROL);
549         writeU8(&data[1], CONTROLTYPE_DISCO);
550
551
552         // Send to all
553         std::list<session_t> peerids = m_connection->getPeerIDs();
554
555         for (session_t peerid : peerids) {
556                 sendAsPacket(peerid, 0, data, false);
557         }
558 }
559
560 void ConnectionSendThread::disconnect_peer(session_t peer_id)
561 {
562         LOG(dout_con << m_connection->getDesc() << " disconnecting peer" << std::endl);
563
564         // Create and send DISCO packet
565         SharedBuffer<u8> data(2);
566         writeU8(&data[0], PACKET_TYPE_CONTROL);
567         writeU8(&data[1], CONTROLTYPE_DISCO);
568         sendAsPacket(peer_id, 0, data, false);
569
570         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
571
572         if (!peer)
573                 return;
574
575         if (dynamic_cast<UDPPeer *>(&peer) == 0) {
576                 return;
577         }
578
579         dynamic_cast<UDPPeer *>(&peer)->m_pending_disconnect = true;
580 }
581
582 void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
583         const SharedBuffer<u8> &data)
584 {
585         assert(channelnum < CHANNEL_COUNT); // Pre-condition
586
587         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
588         if (!peer) {
589                 LOG(dout_con << m_connection->getDesc() << " peer: peer_id=" << peer_id
590                         << ">>>NOT<<< found on sending packet"
591                         << ", channel " << (channelnum % 0xFF)
592                         << ", size: " << data.getSize() << std::endl);
593                 return;
594         }
595
596         LOG(dout_con << m_connection->getDesc() << " sending to peer_id=" << peer_id
597                 << ", channel " << (channelnum % 0xFF)
598                 << ", size: " << data.getSize() << std::endl);
599
600         u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
601
602         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
603         std::list<SharedBuffer<u8>> originals;
604
605         makeAutoSplitPacket(data, chunksize_max, split_sequence_number, &originals);
606
607         peer->setNextSplitSequenceNumber(channelnum, split_sequence_number);
608
609         for (const SharedBuffer<u8> &original : originals) {
610                 sendAsPacket(peer_id, channelnum, original);
611         }
612 }
613
614 void ConnectionSendThread::sendReliable(ConnectionCommand &c)
615 {
616         PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
617         if (!peer)
618                 return;
619
620         peer->PutReliableSendCommand(c, m_max_packet_size);
621 }
622
623 void ConnectionSendThread::sendToAll(u8 channelnum, const SharedBuffer<u8> &data)
624 {
625         std::list<session_t> peerids = m_connection->getPeerIDs();
626
627         for (session_t peerid : peerids) {
628                 send(peerid, channelnum, data);
629         }
630 }
631
632 void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
633 {
634         std::list<session_t> peerids = m_connection->getPeerIDs();
635
636         for (session_t peerid : peerids) {
637                 PeerHelper peer = m_connection->getPeerNoEx(peerid);
638
639                 if (!peer)
640                         continue;
641
642                 peer->PutReliableSendCommand(c, m_max_packet_size);
643         }
644 }
645
646 void ConnectionSendThread::sendPackets(float dtime)
647 {
648         std::list<session_t> peerIds = m_connection->getPeerIDs();
649         std::list<session_t> pendingDisconnect;
650         std::map<session_t, bool> pending_unreliable;
651
652         const unsigned int peer_packet_quota = m_iteration_packets_avaialble
653                 / MYMAX(peerIds.size(), 1);
654
655         for (session_t peerId : peerIds) {
656                 PeerHelper peer = m_connection->getPeerNoEx(peerId);
657                 //peer may have been removed
658                 if (!peer) {
659                         LOG(dout_con << m_connection->getDesc() << " Peer not found: peer_id="
660                                 << peerId
661                                 << std::endl);
662                         continue;
663                 }
664                 peer->m_increment_packets_remaining = peer_packet_quota;
665
666                 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
667
668                 if (!udpPeer) {
669                         continue;
670                 }
671
672                 if (udpPeer->m_pending_disconnect) {
673                         pendingDisconnect.push_back(peerId);
674                 }
675
676                 PROFILE(std::stringstream
677                 peerIdentifier);
678                 PROFILE(
679                         peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
680                                 << ";RELIABLE]");
681                 PROFILE(ScopeProfiler
682                 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
683
684                 LOG(dout_con << m_connection->getDesc()
685                         << " Handle per peer queues: peer_id=" << peerId
686                         << " packet quota: " << peer->m_increment_packets_remaining << std::endl);
687
688                 // first send queued reliable packets for all peers (if possible)
689                 for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
690                         Channel &channel = udpPeer->channels[i];
691                         u16 next_to_ack = 0;
692
693                         channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
694                         u16 next_to_receive = 0;
695                         channel.incoming_reliables.getFirstSeqnum(next_to_receive);
696
697                         LOG(dout_con << m_connection->getDesc() << "\t channel: "
698                                 << i << ", peer quota:"
699                                 << peer->m_increment_packets_remaining
700                                 << std::endl
701                                 << "\t\t\treliables on wire: "
702                                 << channel.outgoing_reliables_sent.size()
703                                 << ", waiting for ack for " << next_to_ack
704                                 << std::endl
705                                 << "\t\t\tincoming_reliables: "
706                                 << channel.incoming_reliables.size()
707                                 << ", next reliable packet: "
708                                 << channel.readNextIncomingSeqNum()
709                                 << ", next queued: " << next_to_receive
710                                 << std::endl
711                                 << "\t\t\treliables queued : "
712                                 << channel.queued_reliables.size()
713                                 << std::endl
714                                 << "\t\t\tqueued commands  : "
715                                 << channel.queued_commands.size()
716                                 << std::endl);
717
718                         while (!channel.queued_reliables.empty() &&
719                                         channel.outgoing_reliables_sent.size()
720                                         < channel.getWindowSize() &&
721                                         peer->m_increment_packets_remaining > 0) {
722                                 BufferedPacket p = channel.queued_reliables.front();
723                                 channel.queued_reliables.pop();
724                                 LOG(dout_con << m_connection->getDesc()
725                                         << " INFO: sending a queued reliable packet "
726                                         << " channel: " << i
727                                         << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
728                                         << std::endl);
729                                 sendAsPacketReliable(p, &channel);
730                                 peer->m_increment_packets_remaining--;
731                         }
732                 }
733         }
734
735         if (!m_outgoing_queue.empty()) {
736                 LOG(dout_con << m_connection->getDesc()
737                         << " Handle non reliable queue ("
738                         << m_outgoing_queue.size() << " pkts)" << std::endl);
739         }
740
741         unsigned int initial_queuesize = m_outgoing_queue.size();
742         /* send non reliable packets*/
743         for (unsigned int i = 0; i < initial_queuesize; i++) {
744                 OutgoingPacket packet = m_outgoing_queue.front();
745                 m_outgoing_queue.pop();
746
747                 if (packet.reliable)
748                         continue;
749
750                 PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
751                 if (!peer) {
752                         LOG(dout_con << m_connection->getDesc()
753                                 << " Outgoing queue: peer_id=" << packet.peer_id
754                                 << ">>>NOT<<< found on sending packet"
755                                 << ", channel " << (packet.channelnum % 0xFF)
756                                 << ", size: " << packet.data.getSize() << std::endl);
757                         continue;
758                 }
759
760                 /* send acks immediately */
761                 if (packet.ack || peer->m_increment_packets_remaining > 0 || stopRequested()) {
762                         rawSendAsPacket(packet.peer_id, packet.channelnum,
763                                 packet.data, packet.reliable);
764                         if (peer->m_increment_packets_remaining > 0)
765                                 peer->m_increment_packets_remaining--;
766                 } else {
767                         m_outgoing_queue.push(packet);
768                         pending_unreliable[packet.peer_id] = true;
769                 }
770         }
771
772         if (peer_packet_quota > 0) {
773                 for (session_t peerId : peerIds) {
774                         PeerHelper peer = m_connection->getPeerNoEx(peerId);
775                         if (!peer)
776                                 continue;
777                         if (peer->m_increment_packets_remaining == 0) {
778                                 LOG(warningstream << m_connection->getDesc()
779                                         << " Packet quota used up for peer_id=" << peerId
780                                         << ", was " << peer_packet_quota << " pkts" << std::endl);
781                         }
782                 }
783         }
784
785         for (session_t peerId : pendingDisconnect) {
786                 if (!pending_unreliable[peerId]) {
787                         m_connection->deletePeer(peerId, false);
788                 }
789         }
790 }
791
792 void ConnectionSendThread::sendAsPacket(session_t peer_id, u8 channelnum,
793         const SharedBuffer<u8> &data, bool ack)
794 {
795         OutgoingPacket packet(peer_id, channelnum, data, false, ack);
796         m_outgoing_queue.push(packet);
797 }
798
799 ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
800         Thread("ConnectionReceive")
801 {
802 }
803
804 void *ConnectionReceiveThread::run()
805 {
806         assert(m_connection);
807
808         LOG(dout_con << m_connection->getDesc()
809                 << "ConnectionReceive thread started" << std::endl);
810
811         PROFILE(std::stringstream
812         ThreadIdentifier);
813         PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
814
815         // use IPv6 minimum allowed MTU as receive buffer size as this is
816         // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
817         // infrastructure
818         const unsigned int packet_maxsize = 1500;
819         SharedBuffer<u8> packetdata(packet_maxsize);
820
821         bool packet_queued = true;
822
823 #ifdef DEBUG_CONNECTION_KBPS
824         u64 curtime = porting::getTimeMs();
825         u64 lasttime = curtime;
826         float debug_print_timer = 0.0;
827 #endif
828
829         while (!stopRequested()) {
830                 BEGIN_DEBUG_EXCEPTION_HANDLER
831                 PROFILE(ScopeProfiler
832                 sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
833
834 #ifdef DEBUG_CONNECTION_KBPS
835                 lasttime = curtime;
836                 curtime = porting::getTimeMs();
837                 float dtime = CALC_DTIME(lasttime,curtime);
838 #endif
839
840                 /* receive packets */
841                 receive(packetdata, packet_queued);
842
843 #ifdef DEBUG_CONNECTION_KBPS
844                 debug_print_timer += dtime;
845                 if (debug_print_timer > 20.0) {
846                         debug_print_timer -= 20.0;
847
848                         std::list<session_t> peerids = m_connection->getPeerIDs();
849
850                         for (std::list<session_t>::iterator i = peerids.begin();
851                                         i != peerids.end();
852                                         i++)
853                         {
854                                 PeerHelper peer = m_connection->getPeerNoEx(*i);
855                                 if (!peer)
856                                         continue;
857
858                                 float peer_current = 0.0;
859                                 float peer_loss = 0.0;
860                                 float avg_rate = 0.0;
861                                 float avg_loss = 0.0;
862
863                                 for(u16 j=0; j<CHANNEL_COUNT; j++)
864                                 {
865                                         peer_current +=peer->channels[j].getCurrentDownloadRateKB();
866                                         peer_loss += peer->channels[j].getCurrentLossRateKB();
867                                         avg_rate += peer->channels[j].getAvgDownloadRateKB();
868                                         avg_loss += peer->channels[j].getAvgLossRateKB();
869                                 }
870
871                                 std::stringstream output;
872                                 output << std::fixed << std::setprecision(1);
873                                 output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
874                                 output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
875                                 output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
876                                 output << std::setfill(' ');
877                                 for(u16 j=0; j<CHANNEL_COUNT; j++)
878                                 {
879                                         output << "\tcha " << j << ":"
880                                                 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
881                                                 << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
882                                                 << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
883                                                 << " /"
884                                                 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
885                                                 << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
886                                                 << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
887                                                 << " / WS: " << peer->channels[j].getWindowSize()
888                                                 << std::endl;
889                                 }
890
891                                 fprintf(stderr,"%s\n",output.str().c_str());
892                         }
893                 }
894 #endif
895                 END_DEBUG_EXCEPTION_HANDLER
896         }
897
898         PROFILE(g_profiler->remove(ThreadIdentifier.str()));
899         return NULL;
900 }
901
902 // Receive packets from the network and buffers and create ConnectionEvents
903 void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
904                 bool &packet_queued)
905 {
906         try {
907                 // First, see if there any buffered packets we can process now
908                 if (packet_queued) {
909                         bool data_left = true;
910                         session_t peer_id;
911                         SharedBuffer<u8> resultdata;
912                         while (data_left) {
913                                 try {
914                                         data_left = getFromBuffers(peer_id, resultdata);
915                                         if (data_left) {
916                                                 ConnectionEvent e;
917                                                 e.dataReceived(peer_id, resultdata);
918                                                 m_connection->putEvent(e);
919                                         }
920                                 }
921                                 catch (ProcessedSilentlyException &e) {
922                                         /* try reading again */
923                                 }
924                         }
925                         packet_queued = false;
926                 }
927
928                 // Call Receive() to wait for incoming data
929                 Address sender;
930                 s32 received_size = m_connection->m_udpSocket.Receive(sender,
931                         *packetdata, packetdata.getSize());
932                 if (received_size < 0)
933                         return;
934
935                 if ((received_size < BASE_HEADER_SIZE) ||
936                         (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
937                         LOG(derr_con << m_connection->getDesc()
938                                 << "Receive(): Invalid incoming packet, "
939                                 << "size: " << received_size
940                                 << ", protocol: "
941                                 << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
942                                 << std::endl);
943                         return;
944                 }
945
946                 session_t peer_id = readPeerId(*packetdata);
947                 u8 channelnum = readChannel(*packetdata);
948
949                 if (channelnum > CHANNEL_COUNT - 1) {
950                         LOG(derr_con << m_connection->getDesc()
951                                 << "Receive(): Invalid channel " << (u32)channelnum << std::endl);
952                         return;
953                 }
954
955                 /* Try to identify peer by sender address (may happen on join) */
956                 if (peer_id == PEER_ID_INEXISTENT) {
957                         peer_id = m_connection->lookupPeer(sender);
958                         // We do not have to remind the peer of its
959                         // peer id as the CONTROLTYPE_SET_PEER_ID
960                         // command was sent reliably.
961                 }
962
963                 /* The peer was not found in our lists. Add it. */
964                 if (peer_id == PEER_ID_INEXISTENT) {
965                         peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
966                 }
967
968                 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
969                 if (!peer) {
970                         LOG(dout_con << m_connection->getDesc()
971                                 << " got packet from unknown peer_id: "
972                                 << peer_id << " Ignoring." << std::endl);
973                         return;
974                 }
975
976                 // Validate peer address
977
978                 Address peer_address;
979                 if (peer->getAddress(MTP_UDP, peer_address)) {
980                         if (peer_address != sender) {
981                                 LOG(derr_con << m_connection->getDesc()
982                                         << " Peer " << peer_id << " sending from different address."
983                                         " Ignoring." << std::endl);
984                                 return;
985                         }
986                 } else {
987                         LOG(derr_con << m_connection->getDesc()
988                                 << " Peer " << peer_id << " doesn't have an address?!"
989                                 " Ignoring." << std::endl);
990                         return;
991                 }
992
993                 peer->ResetTimeout();
994
995                 Channel *channel = nullptr;
996                 if (dynamic_cast<UDPPeer *>(&peer)) {
997                         channel = &dynamic_cast<UDPPeer *>(&peer)->channels[channelnum];
998                 } else {
999                         LOG(derr_con << m_connection->getDesc()
1000                                 << "Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!"
1001                                 " Ignoring." << std::endl);
1002                         return;
1003                 }
1004
1005                 channel->UpdateBytesReceived(received_size);
1006
1007                 // Throw the received packet to channel->processPacket()
1008
1009                 // Make a new SharedBuffer from the data without the base headers
1010                 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
1011                 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
1012                         strippeddata.getSize());
1013
1014                 try {
1015                         // Process it (the result is some data with no headers made by us)
1016                         SharedBuffer<u8> resultdata = processPacket
1017                                 (channel, strippeddata, peer_id, channelnum, false);
1018
1019                         LOG(dout_con << m_connection->getDesc()
1020                                 << " ProcessPacket from peer_id: " << peer_id
1021                                 << ", channel: " << (u32)channelnum << ", returned "
1022                                 << resultdata.getSize() << " bytes" << std::endl);
1023
1024                         ConnectionEvent e;
1025                         e.dataReceived(peer_id, resultdata);
1026                         m_connection->putEvent(e);
1027                 }
1028                 catch (ProcessedSilentlyException &e) {
1029                 }
1030                 catch (ProcessedQueued &e) {
1031                         // we set it to true anyway (see below)
1032                 }
1033
1034                 /* Every time we receive a packet it can happen that a previously
1035                  * buffered packet is now ready to process. */
1036                 packet_queued = true;
1037         }
1038         catch (InvalidIncomingDataException &e) {
1039         }
1040 }
1041
1042 bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8> &dst)
1043 {
1044         std::list<session_t> peerids = m_connection->getPeerIDs();
1045
1046         for (session_t peerid : peerids) {
1047                 PeerHelper peer = m_connection->getPeerNoEx(peerid);
1048                 if (!peer)
1049                         continue;
1050
1051                 if (dynamic_cast<UDPPeer *>(&peer) == 0)
1052                         continue;
1053
1054                 for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
1055                         if (checkIncomingBuffers(&channel, peer_id, dst)) {
1056                                 return true;
1057                         }
1058                 }
1059         }
1060         return false;
1061 }
1062
1063 bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
1064         session_t &peer_id, SharedBuffer<u8> &dst)
1065 {
1066         u16 firstseqnum = 0;
1067         if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) {
1068                 if (firstseqnum == channel->readNextIncomingSeqNum()) {
1069                         BufferedPacket p = channel->incoming_reliables.popFirst();
1070                         peer_id = readPeerId(*p.data);
1071                         u8 channelnum = readChannel(*p.data);
1072                         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
1073
1074                         LOG(dout_con << m_connection->getDesc()
1075                                 << "UNBUFFERING TYPE_RELIABLE"
1076                                 << " seqnum=" << seqnum
1077                                 << " peer_id=" << peer_id
1078                                 << " channel=" << ((int) channelnum & 0xff)
1079                                 << std::endl);
1080
1081                         channel->incNextIncomingSeqNum();
1082
1083                         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1084                         // Get out the inside packet and re-process it
1085                         SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1086                         memcpy(*payload, &p.data[headers_size], payload.getSize());
1087
1088                         dst = processPacket(channel, payload, peer_id, channelnum, true);
1089                         return true;
1090                 }
1091         }
1092         return false;
1093 }
1094
1095 SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
1096         const SharedBuffer<u8> &packetdata, session_t peer_id, u8 channelnum, bool reliable)
1097 {
1098         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
1099
1100         if (!peer) {
1101                 errorstream << "Peer not found (possible timeout)" << std::endl;
1102                 throw ProcessedSilentlyException("Peer not found (possible timeout)");
1103         }
1104
1105         if (packetdata.getSize() < 1)
1106                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1107
1108         u8 type = readU8(&(packetdata[0]));
1109
1110         if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
1111                 std::string errmsg = "Invalid peer_id=" + itos(peer_id);
1112                 errorstream << errmsg << std::endl;
1113                 throw InvalidIncomingDataException(errmsg.c_str());
1114         }
1115
1116         if (type >= PACKET_TYPE_MAX) {
1117                 derr_con << m_connection->getDesc() << "Got invalid type=" << ((int) type & 0xff)
1118                         << std::endl;
1119                 throw InvalidIncomingDataException("Invalid packet type");
1120         }
1121
1122         const PacketTypeHandler &pHandle = packetTypeRouter[type];
1123         return (this->*pHandle.handler)(channel, packetdata, &peer, channelnum, reliable);
1124 }
1125
1126 const ConnectionReceiveThread::PacketTypeHandler
1127         ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = {
1128         {&ConnectionReceiveThread::handlePacketType_Control},
1129         {&ConnectionReceiveThread::handlePacketType_Original},
1130         {&ConnectionReceiveThread::handlePacketType_Split},
1131         {&ConnectionReceiveThread::handlePacketType_Reliable},
1132 };
1133
1134 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel,
1135         const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1136 {
1137         if (packetdata.getSize() < 2)
1138                 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1139
1140         u8 controltype = readU8(&(packetdata[1]));
1141
1142         if (controltype == CONTROLTYPE_ACK) {
1143                 assert(channel != NULL);
1144
1145                 if (packetdata.getSize() < 4) {
1146                         throw InvalidIncomingDataException(
1147                                 "packetdata.getSize() < 4 (ACK header size)");
1148                 }
1149
1150                 u16 seqnum = readU16(&packetdata[2]);
1151                 LOG(dout_con << m_connection->getDesc() << " [ CONTROLTYPE_ACK: channelnum="
1152                         << ((int) channelnum & 0xff) << ", peer_id=" << peer->id << ", seqnum="
1153                         << seqnum << " ]" << std::endl);
1154
1155                 try {
1156                         BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
1157
1158                         // only calculate rtt from straight sent packets
1159                         if (p.resend_count == 0) {
1160                                 // Get round trip time
1161                                 u64 current_time = porting::getTimeMs();
1162
1163                                 // a overflow is quite unlikely but as it'd result in major
1164                                 // rtt miscalculation we handle it here
1165                                 if (current_time > p.absolute_send_time) {
1166                                         float rtt = (current_time - p.absolute_send_time) / 1000.0;
1167
1168                                         // Let peer calculate stuff according to it
1169                                         // (avg_rtt and resend_timeout)
1170                                         dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
1171                                 } else if (p.totaltime > 0) {
1172                                         float rtt = p.totaltime;
1173
1174                                         // Let peer calculate stuff according to it
1175                                         // (avg_rtt and resend_timeout)
1176                                         dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
1177                                 }
1178                         }
1179                         // put bytes for max bandwidth calculation
1180                         channel->UpdateBytesSent(p.data.getSize(), 1);
1181                         if (channel->outgoing_reliables_sent.size() == 0)
1182                                 m_connection->TriggerSend();
1183                 } catch (NotFoundException &e) {
1184                         LOG(derr_con << m_connection->getDesc()
1185                                 << "WARNING: ACKed packet not in outgoing queue"
1186                                 << " seqnum=" << seqnum << std::endl);
1187                         channel->UpdatePacketTooLateCounter();
1188                 }
1189
1190                 throw ProcessedSilentlyException("Got an ACK");
1191         } else if (controltype == CONTROLTYPE_SET_PEER_ID) {
1192                 // Got a packet to set our peer id
1193                 if (packetdata.getSize() < 4)
1194                         throw InvalidIncomingDataException
1195                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1196                 session_t peer_id_new = readU16(&packetdata[2]);
1197                 LOG(dout_con << m_connection->getDesc() << "Got new peer id: " << peer_id_new
1198                         << "... " << std::endl);
1199
1200                 if (m_connection->GetPeerID() != PEER_ID_INEXISTENT) {
1201                         LOG(derr_con << m_connection->getDesc()
1202                                 << "WARNING: Not changing existing peer id." << std::endl);
1203                 } else {
1204                         LOG(dout_con << m_connection->getDesc() << "changing own peer id"
1205                                 << std::endl);
1206                         m_connection->SetPeerID(peer_id_new);
1207                 }
1208
1209                 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1210         } else if (controltype == CONTROLTYPE_PING) {
1211                 // Just ignore it, the incoming data already reset
1212                 // the timeout counter
1213                 LOG(dout_con << m_connection->getDesc() << "PING" << std::endl);
1214                 throw ProcessedSilentlyException("Got a PING");
1215         } else if (controltype == CONTROLTYPE_DISCO) {
1216                 // Just ignore it, the incoming data already reset
1217                 // the timeout counter
1218                 LOG(dout_con << m_connection->getDesc() << "DISCO: Removing peer "
1219                         << peer->id << std::endl);
1220
1221                 if (!m_connection->deletePeer(peer->id, false)) {
1222                         derr_con << m_connection->getDesc() << "DISCO: Peer not found" << std::endl;
1223                 }
1224
1225                 throw ProcessedSilentlyException("Got a DISCO");
1226         } else {
1227                 LOG(derr_con << m_connection->getDesc()
1228                         << "INVALID TYPE_CONTROL: invalid controltype="
1229                         << ((int) controltype & 0xff) << std::endl);
1230                 throw InvalidIncomingDataException("Invalid control type");
1231         }
1232 }
1233
1234 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel,
1235         const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1236 {
1237         if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
1238                 throw InvalidIncomingDataException
1239                         ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
1240         LOG(dout_con << m_connection->getDesc() << "RETURNING TYPE_ORIGINAL to user"
1241                 << std::endl);
1242         // Get the inside packet out and return it
1243         SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1244         memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
1245         return payload;
1246 }
1247
1248 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel,
1249         const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1250 {
1251         Address peer_address;
1252
1253         if (peer->getAddress(MTP_UDP, peer_address)) {
1254                 // We have to create a packet again for buffering
1255                 // This isn't actually too bad an idea.
1256                 BufferedPacket packet = makePacket(peer_address,
1257                         packetdata,
1258                         m_connection->GetProtocolID(),
1259                         peer->id,
1260                         channelnum);
1261
1262                 // Buffer the packet
1263                 SharedBuffer<u8> data = peer->addSplitPacket(channelnum, packet, reliable);
1264
1265                 if (data.getSize() != 0) {
1266                         LOG(dout_con << m_connection->getDesc()
1267                                 << "RETURNING TYPE_SPLIT: Constructed full data, "
1268                                 << "size=" << data.getSize() << std::endl);
1269                         return data;
1270                 }
1271                 LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" << std::endl);
1272                 throw ProcessedSilentlyException("Buffered a split packet chunk");
1273         }
1274
1275         // We should never get here.
1276         FATAL_ERROR("Invalid execution point");
1277 }
1278
1279 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel,
1280         const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1281 {
1282         assert(channel != NULL);
1283
1284         // Recursive reliable packets not allowed
1285         if (reliable)
1286                 throw InvalidIncomingDataException("Found nested reliable packets");
1287
1288         if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
1289                 throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1290
1291         u16 seqnum = readU16(&packetdata[1]);
1292         bool is_future_packet = false;
1293         bool is_old_packet = false;
1294
1295         /* packet is within our receive window send ack */
1296         if (seqnum_in_window(seqnum,
1297                 channel->readNextIncomingSeqNum(), MAX_RELIABLE_WINDOW_SIZE)) {
1298                 m_connection->sendAck(peer->id, channelnum, seqnum);
1299         } else {
1300                 is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
1301                 is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
1302
1303                 /* packet is not within receive window, don't send ack.           *
1304                  * if this was a valid packet it's gonna be retransmitted         */
1305                 if (is_future_packet)
1306                         throw ProcessedSilentlyException(
1307                                 "Received packet newer then expected, not sending ack");
1308
1309                 /* seems like our ack was lost, send another one for a old packet */
1310                 if (is_old_packet) {
1311                         LOG(dout_con << m_connection->getDesc()
1312                                 << "RE-SENDING ACK: peer_id: " << peer->id
1313                                 << ", channel: " << (channelnum & 0xFF)
1314                                 << ", seqnum: " << seqnum << std::endl;)
1315                         m_connection->sendAck(peer->id, channelnum, seqnum);
1316
1317                         // we already have this packet so this one was on wire at least
1318                         // the current timeout
1319                         // we don't know how long this packet was on wire don't do silly guessing
1320                         // dynamic_cast<UDPPeer*>(&peer)->
1321                         //     reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
1322
1323                         throw ProcessedSilentlyException("Retransmitting ack for old packet");
1324                 }
1325         }
1326
1327         if (seqnum != channel->readNextIncomingSeqNum()) {
1328                 Address peer_address;
1329
1330                 // this is a reliable packet so we have a udp address for sure
1331                 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
1332                 // This one comes later, buffer it.
1333                 // Actually we have to make a packet to buffer one.
1334                 // Well, we have all the ingredients, so just do it.
1335                 BufferedPacket packet = con::makePacket(
1336                         peer_address,
1337                         packetdata,
1338                         m_connection->GetProtocolID(),
1339                         peer->id,
1340                         channelnum);
1341                 try {
1342                         channel->incoming_reliables.insert(packet, channel->readNextIncomingSeqNum());
1343
1344                         LOG(dout_con << m_connection->getDesc()
1345                                 << "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id
1346                                 << ", channel: " << (channelnum & 0xFF)
1347                                 << ", seqnum: " << seqnum << std::endl;)
1348
1349                         throw ProcessedQueued("Buffered future reliable packet");
1350                 } catch (AlreadyExistsException &e) {
1351                 } catch (IncomingDataCorruption &e) {
1352                         ConnectionCommand discon;
1353                         discon.disconnect_peer(peer->id);
1354                         m_connection->putCommand(discon);
1355
1356                         LOG(derr_con << m_connection->getDesc()
1357                                 << "INVALID, TYPE_RELIABLE peer_id: " << peer->id
1358                                 << ", channel: " << (channelnum & 0xFF)
1359                                 << ", seqnum: " << seqnum
1360                                 << "DROPPING CLIENT!" << std::endl;)
1361                 }
1362         }
1363
1364         /* we got a packet to process right now */
1365         LOG(dout_con << m_connection->getDesc()
1366                 << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer->id
1367                 << ", channel: " << (channelnum & 0xFF)
1368                 << ", seqnum: " << seqnum << std::endl;)
1369
1370
1371         /* check for resend case */
1372         u16 queued_seqnum = 0;
1373         if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
1374                 if (queued_seqnum == seqnum) {
1375                         BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
1376                         /** TODO find a way to verify the new against the old packet */
1377                 }
1378         }
1379
1380         channel->incNextIncomingSeqNum();
1381
1382         // Get out the inside packet and re-process it
1383         SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1384         memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1385
1386         return processPacket(channel, payload, peer->id, channelnum, true);
1387 }
1388
1389 }