]> git.lizzy.rs Git - minetest.git/blob - src/network/connectionthreads.cpp
RTT fixes (#7428)
[minetest.git] / src / network / connectionthreads.cpp
1 /*
2 Minetest
3 Copyright (C) 2013-2017 celeron55, Perttu Ahola <celeron55@gmail.com>
4 Copyright (C) 2017 celeron55, Loic Blot <loic.blot@unix-experience.fr>
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published by
8 the Free Software Foundation; either version 2.1 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 GNU Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 */
20
21 #include "connectionthreads.h"
22 #include "log.h"
23 #include "profiler.h"
24 #include "settings.h"
25 #include "network/networkpacket.h"
26 #include "util/serialize.h"
27
28 namespace con
29 {
30
31 /******************************************************************************/
32 /* defines used for debugging and profiling                                   */
33 /******************************************************************************/
34 #ifdef NDEBUG
35 #define LOG(a) a
36 #define PROFILE(a)
37 #undef DEBUG_CONNECTION_KBPS
38 #else
39 /* this mutex is used to achieve log message consistency */
40 std::mutex log_conthread_mutex;
41 #define LOG(a)                                                                \
42         {                                                                         \
43         MutexAutoLock loglock(log_conthread_mutex);                                 \
44         a;                                                                        \
45         }
46 #define PROFILE(a) a
47 //#define DEBUG_CONNECTION_KBPS
48 #undef DEBUG_CONNECTION_KBPS
49 #endif
50
51 /* maximum number of retries for reliable packets */
52 #define MAX_RELIABLE_RETRY 5
53
54 #define WINDOW_SIZE 5
55
56 static session_t readPeerId(u8 *packetdata)
57 {
58         return readU16(&packetdata[4]);
59 }
60 static u8 readChannel(u8 *packetdata)
61 {
62         return readU8(&packetdata[6]);
63 }
64
65 /******************************************************************************/
66 /* Connection Threads                                                         */
67 /******************************************************************************/
68
69 ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
70         float timeout) :
71         Thread("ConnectionSend"),
72         m_max_packet_size(max_packet_size),
73         m_timeout(timeout),
74         m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
75 {
76 }
77
78 void *ConnectionSendThread::run()
79 {
80         assert(m_connection);
81
82         LOG(dout_con << m_connection->getDesc()
83                 << "ConnectionSend thread started" << std::endl);
84
85         u64 curtime = porting::getTimeMs();
86         u64 lasttime = curtime;
87
88         PROFILE(std::stringstream ThreadIdentifier);
89         PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
90
91         /* if stop is requested don't stop immediately but try to send all        */
92         /* packets first */
93         while (!stopRequested() || packetsQueued()) {
94                 BEGIN_DEBUG_EXCEPTION_HANDLER
95                 PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
96
97                 m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
98
99                 /* wait for trigger or timeout */
100                 m_send_sleep_semaphore.wait(50);
101
102                 /* remove all triggers */
103                 while (m_send_sleep_semaphore.wait(0)) {
104                 }
105
106                 lasttime = curtime;
107                 curtime = porting::getTimeMs();
108                 float dtime = CALC_DTIME(lasttime, curtime);
109
110                 /* first do all the reliable stuff */
111                 runTimeouts(dtime);
112
113                 /* translate commands to packets */
114                 ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
115                 while (c.type != CONNCMD_NONE) {
116                         if (c.reliable)
117                                 processReliableCommand(c);
118                         else
119                                 processNonReliableCommand(c);
120
121                         c = m_connection->m_command_queue.pop_frontNoEx(0);
122                 }
123
124                 /* send non reliable packets */
125                 sendPackets(dtime);
126
127                 END_DEBUG_EXCEPTION_HANDLER
128         }
129
130         PROFILE(g_profiler->remove(ThreadIdentifier.str()));
131         return NULL;
132 }
133
134 void ConnectionSendThread::Trigger()
135 {
136         m_send_sleep_semaphore.post();
137 }
138
139 bool ConnectionSendThread::packetsQueued()
140 {
141         std::list<session_t> peerIds = m_connection->getPeerIDs();
142
143         if (!m_outgoing_queue.empty() && !peerIds.empty())
144                 return true;
145
146         for (session_t peerId : peerIds) {
147                 PeerHelper peer = m_connection->getPeerNoEx(peerId);
148
149                 if (!peer)
150                         continue;
151
152                 if (dynamic_cast<UDPPeer *>(&peer) == 0)
153                         continue;
154
155                 for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
156                         if (!channel.queued_commands.empty()) {
157                                 return true;
158                         }
159                 }
160         }
161
162
163         return false;
164 }
165
166 void ConnectionSendThread::runTimeouts(float dtime)
167 {
168         std::list<session_t> timeouted_peers;
169         std::list<session_t> peerIds = m_connection->getPeerIDs();
170
171         for (session_t &peerId : peerIds) {
172                 PeerHelper peer = m_connection->getPeerNoEx(peerId);
173
174                 if (!peer)
175                         continue;
176
177                 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
178                 if (!udpPeer)
179                         continue;
180
181                 PROFILE(std::stringstream peerIdentifier);
182                 PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
183                         << ";" << peerId << ";RELIABLE]");
184                 PROFILE(ScopeProfiler
185                 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
186
187                 SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
188
189                 /*
190                         Check peer timeout
191                 */
192                 if (peer->isTimedOut(m_timeout)) {
193                         infostream << m_connection->getDesc()
194                                 << "RunTimeouts(): Peer " << peer->id
195                                 << " has timed out."
196                                 << " (source=peer->timeout_counter)"
197                                 << std::endl;
198                         // Add peer to the list
199                         timeouted_peers.push_back(peer->id);
200                         // Don't bother going through the buffers of this one
201                         continue;
202                 }
203
204                 float resend_timeout = udpPeer->getResendTimeout();
205                 bool retry_count_exceeded = false;
206                 for (Channel &channel : udpPeer->channels) {
207                         std::list<BufferedPacket> timed_outs;
208
209                         if (udpPeer->getLegacyPeer())
210                                 channel.setWindowSize(WINDOW_SIZE);
211
212                         // Remove timed out incomplete unreliable split packets
213                         channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
214
215                         // Increment reliable packet times
216                         channel.outgoing_reliables_sent.incrementTimeouts(dtime);
217
218                         unsigned int numpeers = m_connection->m_peers.size();
219
220                         if (numpeers == 0)
221                                 return;
222
223                         // Re-send timed out outgoing reliables
224                         timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
225                                 (m_max_data_packets_per_iteration / numpeers));
226
227                         channel.UpdatePacketLossCounter(timed_outs.size());
228                         g_profiler->graphAdd("packets_lost", timed_outs.size());
229
230                         m_iteration_packets_avaialble -= timed_outs.size();
231
232                         for (std::list<BufferedPacket>::iterator k = timed_outs.begin();
233                                 k != timed_outs.end(); ++k) {
234                                 session_t peer_id = readPeerId(*(k->data));
235                                 u8 channelnum = readChannel(*(k->data));
236                                 u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE + 1]));
237
238                                 channel.UpdateBytesLost(k->data.getSize());
239                                 k->resend_count++;
240
241                                 if (k->resend_count > MAX_RELIABLE_RETRY) {
242                                         retry_count_exceeded = true;
243                                         timeouted_peers.push_back(peer->id);
244                                         /* no need to check additional packets if a single one did timeout*/
245                                         break;
246                                 }
247
248                                 LOG(derr_con << m_connection->getDesc()
249                                         << "RE-SENDING timed-out RELIABLE to "
250                                         << k->address.serializeString()
251                                         << "(t/o=" << resend_timeout << "): "
252                                         << "from_peer_id=" << peer_id
253                                         << ", channel=" << ((int) channelnum & 0xff)
254                                         << ", seqnum=" << seqnum
255                                         << std::endl);
256
257                                 rawSend(*k);
258
259                                 // do not handle rtt here as we can't decide if this packet was
260                                 // lost or really takes more time to transmit
261                         }
262
263                         if (retry_count_exceeded) {
264                                 break; /* no need to check other channels if we already did timeout */
265                         }
266
267                         channel.UpdateTimers(dtime, udpPeer->getLegacyPeer());
268                 }
269
270                 /* skip to next peer if we did timeout */
271                 if (retry_count_exceeded)
272                         continue;
273
274                 /* send ping if necessary */
275                 if (udpPeer->Ping(dtime, data)) {
276                         LOG(dout_con << m_connection->getDesc()
277                                 << "Sending ping for peer_id: " << udpPeer->id << std::endl);
278                         /* this may fail if there ain't a sequence number left */
279                         if (!rawSendAsPacket(udpPeer->id, 0, data, true)) {
280                                 //retrigger with reduced ping interval
281                                 udpPeer->Ping(4.0, data);
282                         }
283                 }
284
285                 udpPeer->RunCommandQueues(m_max_packet_size,
286                         m_max_commands_per_iteration,
287                         m_max_packets_requeued);
288         }
289
290         // Remove timed out peers
291         for (u16 timeouted_peer : timeouted_peers) {
292                 LOG(derr_con << m_connection->getDesc()
293                         << "RunTimeouts(): Removing peer " << timeouted_peer << std::endl);
294                 m_connection->deletePeer(timeouted_peer, true);
295         }
296 }
297
298 void ConnectionSendThread::rawSend(const BufferedPacket &packet)
299 {
300         try {
301                 m_connection->m_udpSocket.Send(packet.address, *packet.data,
302                         packet.data.getSize());
303                 LOG(dout_con << m_connection->getDesc()
304                         << " rawSend: " << packet.data.getSize()
305                         << " bytes sent" << std::endl);
306         } catch (SendFailedException &e) {
307                 LOG(derr_con << m_connection->getDesc()
308                         << "Connection::rawSend(): SendFailedException: "
309                         << packet.address.serializeString() << std::endl);
310         }
311 }
312
313 void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel)
314 {
315         try {
316                 p.absolute_send_time = porting::getTimeMs();
317                 // Buffer the packet
318                 channel->outgoing_reliables_sent.insert(p,
319                         (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
320                                 % (MAX_RELIABLE_WINDOW_SIZE + 1));
321         }
322         catch (AlreadyExistsException &e) {
323                 LOG(derr_con << m_connection->getDesc()
324                         << "WARNING: Going to send a reliable packet"
325                         << " in outgoing buffer" << std::endl);
326         }
327
328         // Send the packet
329         rawSend(p);
330 }
331
332 bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
333         SharedBuffer<u8> data, bool reliable)
334 {
335         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
336         if (!peer) {
337                 LOG(dout_con << m_connection->getDesc()
338                         << " INFO: dropped packet for non existent peer_id: "
339                         << peer_id << std::endl);
340                 FATAL_ERROR_IF(!reliable,
341                         "Trying to send raw packet reliable but no peer found!");
342                 return false;
343         }
344         Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
345
346         if (reliable) {
347                 bool have_sequence_number_for_raw_packet = true;
348                 u16 seqnum =
349                         channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
350
351                 if (!have_sequence_number_for_raw_packet)
352                         return false;
353
354                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
355                 Address peer_address;
356                 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
357
358                 // Add base headers and make a packet
359                 BufferedPacket p = con::makePacket(peer_address, reliable,
360                         m_connection->GetProtocolID(), m_connection->GetPeerID(),
361                         channelnum);
362
363                 // first check if our send window is already maxed out
364                 if (channel->outgoing_reliables_sent.size()
365                         < channel->getWindowSize()) {
366                         LOG(dout_con << m_connection->getDesc()
367                                 << " INFO: sending a reliable packet to peer_id " << peer_id
368                                 << " channel: " << (u32)channelnum
369                                 << " seqnum: " << seqnum << std::endl);
370                         sendAsPacketReliable(p, channel);
371                         return true;
372                 }
373
374                 LOG(dout_con << m_connection->getDesc()
375                         << " INFO: queueing reliable packet for peer_id: " << peer_id
376                         << " channel: " << (u32)channelnum
377                         << " seqnum: " << seqnum << std::endl);
378                 channel->queued_reliables.push(p);
379                 return false;
380         }
381
382         Address peer_address;
383         if (peer->getAddress(MTP_UDP, peer_address)) {
384                 // Add base headers and make a packet
385                 BufferedPacket p = con::makePacket(peer_address, data,
386                         m_connection->GetProtocolID(), m_connection->GetPeerID(),
387                         channelnum);
388
389                 // Send the packet
390                 rawSend(p);
391                 return true;
392         }
393
394         LOG(dout_con << m_connection->getDesc()
395                 << " INFO: dropped unreliable packet for peer_id: " << peer_id
396                 << " because of (yet) missing udp address" << std::endl);
397         return false;
398 }
399
400 void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
401 {
402         assert(c.reliable);  // Pre-condition
403
404         switch (c.type) {
405                 case CONNCMD_NONE:
406                         LOG(dout_con << m_connection->getDesc()
407                                 << "UDP processing reliable CONNCMD_NONE" << std::endl);
408                         return;
409
410                 case CONNCMD_SEND:
411                         LOG(dout_con << m_connection->getDesc()
412                                 << "UDP processing reliable CONNCMD_SEND" << std::endl);
413                         sendReliable(c);
414                         return;
415
416                 case CONNCMD_SEND_TO_ALL:
417                         LOG(dout_con << m_connection->getDesc()
418                                 << "UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
419                         sendToAllReliable(c);
420                         return;
421
422                 case CONCMD_CREATE_PEER:
423                         LOG(dout_con << m_connection->getDesc()
424                                 << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
425                         if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
426                                 /* put to queue if we couldn't send it immediately */
427                                 sendReliable(c);
428                         }
429                         return;
430
431                 case CONCMD_DISABLE_LEGACY:
432                         LOG(dout_con << m_connection->getDesc()
433                                 << "UDP processing reliable CONCMD_DISABLE_LEGACY" << std::endl);
434                         if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
435                                 /* put to queue if we couldn't send it immediately */
436                                 sendReliable(c);
437                         }
438                         return;
439
440                 case CONNCMD_SERVE:
441                 case CONNCMD_CONNECT:
442                 case CONNCMD_DISCONNECT:
443                 case CONCMD_ACK:
444                         FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
445                 default:
446                         LOG(dout_con << m_connection->getDesc()
447                                 << " Invalid reliable command type: " << c.type << std::endl);
448         }
449 }
450
451
452 void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
453 {
454         assert(!c.reliable); // Pre-condition
455
456         switch (c.type) {
457                 case CONNCMD_NONE:
458                         LOG(dout_con << m_connection->getDesc()
459                                 << " UDP processing CONNCMD_NONE" << std::endl);
460                         return;
461                 case CONNCMD_SERVE:
462                         LOG(dout_con << m_connection->getDesc()
463                                 << " UDP processing CONNCMD_SERVE port="
464                                 << c.address.serializeString() << std::endl);
465                         serve(c.address);
466                         return;
467                 case CONNCMD_CONNECT:
468                         LOG(dout_con << m_connection->getDesc()
469                                 << " UDP processing CONNCMD_CONNECT" << std::endl);
470                         connect(c.address);
471                         return;
472                 case CONNCMD_DISCONNECT:
473                         LOG(dout_con << m_connection->getDesc()
474                                 << " UDP processing CONNCMD_DISCONNECT" << std::endl);
475                         disconnect();
476                         return;
477                 case CONNCMD_DISCONNECT_PEER:
478                         LOG(dout_con << m_connection->getDesc()
479                                 << " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl);
480                         disconnect_peer(c.peer_id);
481                         return;
482                 case CONNCMD_SEND:
483                         LOG(dout_con << m_connection->getDesc()
484                                 << " UDP processing CONNCMD_SEND" << std::endl);
485                         send(c.peer_id, c.channelnum, c.data);
486                         return;
487                 case CONNCMD_SEND_TO_ALL:
488                         LOG(dout_con << m_connection->getDesc()
489                                 << " UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
490                         sendToAll(c.channelnum, c.data);
491                         return;
492                 case CONCMD_ACK:
493                         LOG(dout_con << m_connection->getDesc()
494                                 << " UDP processing CONCMD_ACK" << std::endl);
495                         sendAsPacket(c.peer_id, c.channelnum, c.data, true);
496                         return;
497                 case CONCMD_CREATE_PEER:
498                         FATAL_ERROR("Got command that should be reliable as unreliable command");
499                 default:
500                         LOG(dout_con << m_connection->getDesc()
501                                 << " Invalid command type: " << c.type << std::endl);
502         }
503 }
504
505 void ConnectionSendThread::serve(Address bind_address)
506 {
507         LOG(dout_con << m_connection->getDesc()
508                 << "UDP serving at port " << bind_address.serializeString() << std::endl);
509         try {
510                 m_connection->m_udpSocket.Bind(bind_address);
511                 m_connection->SetPeerID(PEER_ID_SERVER);
512         }
513         catch (SocketException &e) {
514                 // Create event
515                 ConnectionEvent ce;
516                 ce.bindFailed();
517                 m_connection->putEvent(ce);
518         }
519 }
520
521 void ConnectionSendThread::connect(Address address)
522 {
523         LOG(dout_con << m_connection->getDesc() << " connecting to "
524                 << address.serializeString()
525                 << ":" << address.getPort() << std::endl);
526
527         UDPPeer *peer = m_connection->createServerPeer(address);
528
529         // Create event
530         ConnectionEvent e;
531         e.peerAdded(peer->id, peer->address);
532         m_connection->putEvent(e);
533
534         Address bind_addr;
535
536         if (address.isIPv6())
537                 bind_addr.setAddress((IPv6AddressBytes *) NULL);
538         else
539                 bind_addr.setAddress(0, 0, 0, 0);
540
541         m_connection->m_udpSocket.Bind(bind_addr);
542
543         // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
544         m_connection->SetPeerID(PEER_ID_INEXISTENT);
545         NetworkPacket pkt(0, 0);
546         m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
547 }
548
549 void ConnectionSendThread::disconnect()
550 {
551         LOG(dout_con << m_connection->getDesc() << " disconnecting" << std::endl);
552
553         // Create and send DISCO packet
554         SharedBuffer<u8> data(2);
555         writeU8(&data[0], PACKET_TYPE_CONTROL);
556         writeU8(&data[1], CONTROLTYPE_DISCO);
557
558
559         // Send to all
560         std::list<session_t> peerids = m_connection->getPeerIDs();
561
562         for (session_t peerid : peerids) {
563                 sendAsPacket(peerid, 0, data, false);
564         }
565 }
566
567 void ConnectionSendThread::disconnect_peer(session_t peer_id)
568 {
569         LOG(dout_con << m_connection->getDesc() << " disconnecting peer" << std::endl);
570
571         // Create and send DISCO packet
572         SharedBuffer<u8> data(2);
573         writeU8(&data[0], PACKET_TYPE_CONTROL);
574         writeU8(&data[1], CONTROLTYPE_DISCO);
575         sendAsPacket(peer_id, 0, data, false);
576
577         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
578
579         if (!peer)
580                 return;
581
582         if (dynamic_cast<UDPPeer *>(&peer) == 0) {
583                 return;
584         }
585
586         dynamic_cast<UDPPeer *>(&peer)->m_pending_disconnect = true;
587 }
588
589 void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
590         SharedBuffer<u8> data)
591 {
592         assert(channelnum < CHANNEL_COUNT); // Pre-condition
593
594         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
595         if (!peer) {
596                 LOG(dout_con << m_connection->getDesc() << " peer: peer_id=" << peer_id
597                         << ">>>NOT<<< found on sending packet"
598                         << ", channel " << (channelnum % 0xFF)
599                         << ", size: " << data.getSize() << std::endl);
600                 return;
601         }
602
603         LOG(dout_con << m_connection->getDesc() << " sending to peer_id=" << peer_id
604                 << ", channel " << (channelnum % 0xFF)
605                 << ", size: " << data.getSize() << std::endl);
606
607         u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
608
609         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
610         std::list<SharedBuffer<u8>> originals;
611
612         makeAutoSplitPacket(data, chunksize_max, split_sequence_number, &originals);
613
614         peer->setNextSplitSequenceNumber(channelnum, split_sequence_number);
615
616         for (const SharedBuffer<u8> &original : originals) {
617                 sendAsPacket(peer_id, channelnum, original);
618         }
619 }
620
621 void ConnectionSendThread::sendReliable(ConnectionCommand &c)
622 {
623         PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
624         if (!peer)
625                 return;
626
627         peer->PutReliableSendCommand(c, m_max_packet_size);
628 }
629
630 void ConnectionSendThread::sendToAll(u8 channelnum, SharedBuffer<u8> data)
631 {
632         std::list<session_t> peerids = m_connection->getPeerIDs();
633
634         for (session_t peerid : peerids) {
635                 send(peerid, channelnum, data);
636         }
637 }
638
639 void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
640 {
641         std::list<session_t> peerids = m_connection->getPeerIDs();
642
643         for (session_t peerid : peerids) {
644                 PeerHelper peer = m_connection->getPeerNoEx(peerid);
645
646                 if (!peer)
647                         continue;
648
649                 peer->PutReliableSendCommand(c, m_max_packet_size);
650         }
651 }
652
653 void ConnectionSendThread::sendPackets(float dtime)
654 {
655         std::list<session_t> peerIds = m_connection->getPeerIDs();
656         std::list<session_t> pendingDisconnect;
657         std::map<session_t, bool> pending_unreliable;
658
659         for (session_t peerId : peerIds) {
660                 PeerHelper peer = m_connection->getPeerNoEx(peerId);
661                 //peer may have been removed
662                 if (!peer) {
663                         LOG(dout_con << m_connection->getDesc() << " Peer not found: peer_id="
664                                 << peerId
665                                 << std::endl);
666                         continue;
667                 }
668                 peer->m_increment_packets_remaining =
669                         m_iteration_packets_avaialble / m_connection->m_peers.size();
670
671                 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
672
673                 if (!udpPeer) {
674                         continue;
675                 }
676
677                 if (udpPeer->m_pending_disconnect) {
678                         pendingDisconnect.push_back(peerId);
679                 }
680
681                 PROFILE(std::stringstream
682                 peerIdentifier);
683                 PROFILE(
684                         peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
685                                 << ";RELIABLE]");
686                 PROFILE(ScopeProfiler
687                 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
688
689                 LOG(dout_con << m_connection->getDesc()
690                         << " Handle per peer queues: peer_id=" << peerId
691                         << " packet quota: " << peer->m_increment_packets_remaining << std::endl);
692
693                 // first send queued reliable packets for all peers (if possible)
694                 for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
695                         Channel &channel = udpPeer->channels[i];
696                         u16 next_to_ack = 0;
697
698                         channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
699                         u16 next_to_receive = 0;
700                         channel.incoming_reliables.getFirstSeqnum(next_to_receive);
701
702                         LOG(dout_con << m_connection->getDesc() << "\t channel: "
703                                 << i << ", peer quota:"
704                                 << peer->m_increment_packets_remaining
705                                 << std::endl
706                                 << "\t\t\treliables on wire: "
707                                 << channel.outgoing_reliables_sent.size()
708                                 << ", waiting for ack for " << next_to_ack
709                                 << std::endl
710                                 << "\t\t\tincoming_reliables: "
711                                 << channel.incoming_reliables.size()
712                                 << ", next reliable packet: "
713                                 << channel.readNextIncomingSeqNum()
714                                 << ", next queued: " << next_to_receive
715                                 << std::endl
716                                 << "\t\t\treliables queued : "
717                                 << channel.queued_reliables.size()
718                                 << std::endl
719                                 << "\t\t\tqueued commands  : "
720                                 << channel.queued_commands.size()
721                                 << std::endl);
722
723                         while (!channel.queued_reliables.empty() &&
724                                         channel.outgoing_reliables_sent.size()
725                                         < channel.getWindowSize() &&
726                                         peer->m_increment_packets_remaining > 0) {
727                                 BufferedPacket p = channel.queued_reliables.front();
728                                 channel.queued_reliables.pop();
729                                 LOG(dout_con << m_connection->getDesc()
730                                         << " INFO: sending a queued reliable packet "
731                                         << " channel: " << i
732                                         << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
733                                         << std::endl);
734                                 sendAsPacketReliable(p, &channel);
735                                 peer->m_increment_packets_remaining--;
736                         }
737                 }
738         }
739
740         if (!m_outgoing_queue.empty()) {
741                 LOG(dout_con << m_connection->getDesc()
742                         << " Handle non reliable queue ("
743                         << m_outgoing_queue.size() << " pkts)" << std::endl);
744         }
745
746         unsigned int initial_queuesize = m_outgoing_queue.size();
747         /* send non reliable packets*/
748         for (unsigned int i = 0; i < initial_queuesize; i++) {
749                 OutgoingPacket packet = m_outgoing_queue.front();
750                 m_outgoing_queue.pop();
751
752                 if (packet.reliable)
753                         continue;
754
755                 PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
756                 if (!peer) {
757                         LOG(dout_con << m_connection->getDesc()
758                                 << " Outgoing queue: peer_id=" << packet.peer_id
759                                 << ">>>NOT<<< found on sending packet"
760                                 << ", channel " << (packet.channelnum % 0xFF)
761                                 << ", size: " << packet.data.getSize() << std::endl);
762                         continue;
763                 }
764
765                 /* send acks immediately */
766                 if (packet.ack) {
767                         rawSendAsPacket(packet.peer_id, packet.channelnum,
768                                 packet.data, packet.reliable);
769                         peer->m_increment_packets_remaining =
770                                 MYMIN(0, peer->m_increment_packets_remaining--);
771                 } else if (
772                         (peer->m_increment_packets_remaining > 0) ||
773                                 (stopRequested())) {
774                         rawSendAsPacket(packet.peer_id, packet.channelnum,
775                                 packet.data, packet.reliable);
776                         peer->m_increment_packets_remaining--;
777                 } else {
778                         m_outgoing_queue.push(packet);
779                         pending_unreliable[packet.peer_id] = true;
780                 }
781         }
782
783         for (session_t peerId : pendingDisconnect) {
784                 if (!pending_unreliable[peerId]) {
785                         m_connection->deletePeer(peerId, false);
786                 }
787         }
788 }
789
790 void ConnectionSendThread::sendAsPacket(session_t peer_id, u8 channelnum,
791         SharedBuffer<u8> data, bool ack)
792 {
793         OutgoingPacket packet(peer_id, channelnum, data, false, ack);
794         m_outgoing_queue.push(packet);
795 }
796
797 ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
798         Thread("ConnectionReceive")
799 {
800 }
801
802 void *ConnectionReceiveThread::run()
803 {
804         assert(m_connection);
805
806         LOG(dout_con << m_connection->getDesc()
807                 << "ConnectionReceive thread started" << std::endl);
808
809         PROFILE(std::stringstream
810         ThreadIdentifier);
811         PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
812
813 #ifdef DEBUG_CONNECTION_KBPS
814         u64 curtime = porting::getTimeMs();
815         u64 lasttime = curtime;
816         float debug_print_timer = 0.0;
817 #endif
818
819         while (!stopRequested()) {
820                 BEGIN_DEBUG_EXCEPTION_HANDLER
821                 PROFILE(ScopeProfiler
822                 sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
823
824 #ifdef DEBUG_CONNECTION_KBPS
825                 lasttime = curtime;
826                 curtime = porting::getTimeMs();
827                 float dtime = CALC_DTIME(lasttime,curtime);
828 #endif
829
830                 /* receive packets */
831                 receive();
832
833 #ifdef DEBUG_CONNECTION_KBPS
834                 debug_print_timer += dtime;
835                 if (debug_print_timer > 20.0) {
836                         debug_print_timer -= 20.0;
837
838                         std::list<session_t> peerids = m_connection->getPeerIDs();
839
840                         for (std::list<session_t>::iterator i = peerids.begin();
841                                         i != peerids.end();
842                                         i++)
843                         {
844                                 PeerHelper peer = m_connection->getPeerNoEx(*i);
845                                 if (!peer)
846                                         continue;
847
848                                 float peer_current = 0.0;
849                                 float peer_loss = 0.0;
850                                 float avg_rate = 0.0;
851                                 float avg_loss = 0.0;
852
853                                 for(u16 j=0; j<CHANNEL_COUNT; j++)
854                                 {
855                                         peer_current +=peer->channels[j].getCurrentDownloadRateKB();
856                                         peer_loss += peer->channels[j].getCurrentLossRateKB();
857                                         avg_rate += peer->channels[j].getAvgDownloadRateKB();
858                                         avg_loss += peer->channels[j].getAvgLossRateKB();
859                                 }
860
861                                 std::stringstream output;
862                                 output << std::fixed << std::setprecision(1);
863                                 output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
864                                 output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
865                                 output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
866                                 output << std::setfill(' ');
867                                 for(u16 j=0; j<CHANNEL_COUNT; j++)
868                                 {
869                                         output << "\tcha " << j << ":"
870                                                 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
871                                                 << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
872                                                 << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
873                                                 << " /"
874                                                 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
875                                                 << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
876                                                 << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
877                                                 << " / WS: " << peer->channels[j].getWindowSize()
878                                                 << std::endl;
879                                 }
880
881                                 fprintf(stderr,"%s\n",output.str().c_str());
882                         }
883                 }
884 #endif
885                 END_DEBUG_EXCEPTION_HANDLER
886         }
887
888         PROFILE(g_profiler->remove(ThreadIdentifier.str()));
889         return NULL;
890 }
891
892 // Receive packets from the network and buffers and create ConnectionEvents
893 void ConnectionReceiveThread::receive()
894 {
895         // use IPv6 minimum allowed MTU as receive buffer size as this is
896         // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
897         // infrastructure
898         unsigned int packet_maxsize = 1500;
899         SharedBuffer<u8> packetdata(packet_maxsize);
900
901         bool packet_queued = true;
902
903         unsigned int loop_count = 0;
904
905         /* first of all read packets from socket */
906         /* check for incoming data available */
907         while ((loop_count < 10) &&
908                 (m_connection->m_udpSocket.WaitData(50))) {
909                 loop_count++;
910                 try {
911                         if (packet_queued) {
912                                 bool data_left = true;
913                                 session_t peer_id;
914                                 SharedBuffer<u8> resultdata;
915                                 while (data_left) {
916                                         try {
917                                                 data_left = getFromBuffers(peer_id, resultdata);
918                                                 if (data_left) {
919                                                         ConnectionEvent e;
920                                                         e.dataReceived(peer_id, resultdata);
921                                                         m_connection->putEvent(e);
922                                                 }
923                                         }
924                                         catch (ProcessedSilentlyException &e) {
925                                                 /* try reading again */
926                                         }
927                                 }
928                                 packet_queued = false;
929                         }
930
931                         Address sender;
932                         s32 received_size = m_connection->m_udpSocket.Receive(sender, *packetdata,
933                                 packet_maxsize);
934
935                         if ((received_size < BASE_HEADER_SIZE) ||
936                                 (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
937                                 LOG(derr_con << m_connection->getDesc()
938                                         << "Receive(): Invalid incoming packet, "
939                                         << "size: " << received_size
940                                         << ", protocol: "
941                                         << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
942                                         << std::endl);
943                                 continue;
944                         }
945
946                         session_t peer_id = readPeerId(*packetdata);
947                         u8 channelnum = readChannel(*packetdata);
948
949                         if (channelnum > CHANNEL_COUNT - 1) {
950                                 LOG(derr_con << m_connection->getDesc()
951                                         << "Receive(): Invalid channel " << (u32)channelnum << std::endl);
952                                 throw InvalidIncomingDataException("Channel doesn't exist");
953                         }
954
955                         /* Try to identify peer by sender address (may happen on join) */
956                         if (peer_id == PEER_ID_INEXISTENT) {
957                                 peer_id = m_connection->lookupPeer(sender);
958                                 // We do not have to remind the peer of its
959                                 // peer id as the CONTROLTYPE_SET_PEER_ID
960                                 // command was sent reliably.
961                         }
962
963                         /* The peer was not found in our lists. Add it. */
964                         if (peer_id == PEER_ID_INEXISTENT) {
965                                 peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
966                         }
967
968                         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
969
970                         if (!peer) {
971                                 LOG(dout_con << m_connection->getDesc()
972                                         << " got packet from unknown peer_id: "
973                                         << peer_id << " Ignoring." << std::endl);
974                                 continue;
975                         }
976
977                         // Validate peer address
978
979                         Address peer_address;
980
981                         if (peer->getAddress(MTP_UDP, peer_address)) {
982                                 if (peer_address != sender) {
983                                         LOG(derr_con << m_connection->getDesc()
984                                                 << m_connection->getDesc()
985                                                 << " Peer " << peer_id << " sending from different address."
986                                                 " Ignoring." << std::endl);
987                                         continue;
988                                 }
989                         } else {
990
991                                 bool invalid_address = true;
992                                 if (invalid_address) {
993                                         LOG(derr_con << m_connection->getDesc()
994                                                 << m_connection->getDesc()
995                                                 << " Peer " << peer_id << " unknown."
996                                                 " Ignoring." << std::endl);
997                                         continue;
998                                 }
999                         }
1000
1001                         peer->ResetTimeout();
1002
1003                         Channel *channel = 0;
1004
1005                         if (dynamic_cast<UDPPeer *>(&peer) != 0) {
1006                                 channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
1007                         }
1008
1009                         if (channel != 0) {
1010                                 channel->UpdateBytesReceived(received_size);
1011                         }
1012
1013                         // Throw the received packet to channel->processPacket()
1014
1015                         // Make a new SharedBuffer from the data without the base headers
1016                         SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
1017                         memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
1018                                 strippeddata.getSize());
1019
1020                         try {
1021                                 // Process it (the result is some data with no headers made by us)
1022                                 SharedBuffer<u8> resultdata = processPacket
1023                                         (channel, strippeddata, peer_id, channelnum, false);
1024
1025                                 LOG(dout_con << m_connection->getDesc()
1026                                         << " ProcessPacket from peer_id: " << peer_id
1027                                         << ", channel: " << (u32)channelnum << ", returned "
1028                                         << resultdata.getSize() << " bytes" << std::endl);
1029
1030                                 ConnectionEvent e;
1031                                 e.dataReceived(peer_id, resultdata);
1032                                 m_connection->putEvent(e);
1033                         }
1034                         catch (ProcessedSilentlyException &e) {
1035                         }
1036                         catch (ProcessedQueued &e) {
1037                                 packet_queued = true;
1038                         }
1039                 }
1040                 catch (InvalidIncomingDataException &e) {
1041                 }
1042                 catch (ProcessedSilentlyException &e) {
1043                 }
1044         }
1045 }
1046
1047 bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8> &dst)
1048 {
1049         std::list<session_t> peerids = m_connection->getPeerIDs();
1050
1051         for (session_t peerid : peerids) {
1052                 PeerHelper peer = m_connection->getPeerNoEx(peerid);
1053                 if (!peer)
1054                         continue;
1055
1056                 if (dynamic_cast<UDPPeer *>(&peer) == 0)
1057                         continue;
1058
1059                 for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
1060                         if (checkIncomingBuffers(&channel, peer_id, dst)) {
1061                                 return true;
1062                         }
1063                 }
1064         }
1065         return false;
1066 }
1067
1068 bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
1069         session_t &peer_id, SharedBuffer<u8> &dst)
1070 {
1071         u16 firstseqnum = 0;
1072         if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) {
1073                 if (firstseqnum == channel->readNextIncomingSeqNum()) {
1074                         BufferedPacket p = channel->incoming_reliables.popFirst();
1075                         peer_id = readPeerId(*p.data);
1076                         u8 channelnum = readChannel(*p.data);
1077                         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
1078
1079                         LOG(dout_con << m_connection->getDesc()
1080                                 << "UNBUFFERING TYPE_RELIABLE"
1081                                 << " seqnum=" << seqnum
1082                                 << " peer_id=" << peer_id
1083                                 << " channel=" << ((int) channelnum & 0xff)
1084                                 << std::endl);
1085
1086                         channel->incNextIncomingSeqNum();
1087
1088                         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1089                         // Get out the inside packet and re-process it
1090                         SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1091                         memcpy(*payload, &p.data[headers_size], payload.getSize());
1092
1093                         dst = processPacket(channel, payload, peer_id, channelnum, true);
1094                         return true;
1095                 }
1096         }
1097         return false;
1098 }
1099
1100 SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
1101         SharedBuffer<u8> packetdata, session_t peer_id, u8 channelnum, bool reliable)
1102 {
1103         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
1104
1105         if (!peer) {
1106                 errorstream << "Peer not found (possible timeout)" << std::endl;
1107                 throw ProcessedSilentlyException("Peer not found (possible timeout)");
1108         }
1109
1110         if (packetdata.getSize() < 1)
1111                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1112
1113         u8 type = readU8(&(packetdata[0]));
1114
1115         if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
1116                 std::string errmsg = "Invalid peer_id=" + itos(peer_id);
1117                 errorstream << errmsg << std::endl;
1118                 throw InvalidIncomingDataException(errmsg.c_str());
1119         }
1120
1121         if (type >= PACKET_TYPE_MAX) {
1122                 derr_con << m_connection->getDesc() << "Got invalid type=" << ((int) type & 0xff)
1123                         << std::endl;
1124                 throw InvalidIncomingDataException("Invalid packet type");
1125         }
1126
1127         const PacketTypeHandler &pHandle = packetTypeRouter[type];
1128         return (this->*pHandle.handler)(channel, packetdata, &peer, channelnum, reliable);
1129 }
1130
1131 const ConnectionReceiveThread::PacketTypeHandler
1132         ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = {
1133         {&ConnectionReceiveThread::handlePacketType_Control},
1134         {&ConnectionReceiveThread::handlePacketType_Original},
1135         {&ConnectionReceiveThread::handlePacketType_Split},
1136         {&ConnectionReceiveThread::handlePacketType_Reliable},
1137 };
1138
1139 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel,
1140         SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
1141 {
1142         if (packetdata.getSize() < 2)
1143                 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1144
1145         u8 controltype = readU8(&(packetdata[1]));
1146
1147         if (controltype == CONTROLTYPE_ACK) {
1148                 assert(channel != NULL);
1149
1150                 if (packetdata.getSize() < 4) {
1151                         throw InvalidIncomingDataException(
1152                                 "packetdata.getSize() < 4 (ACK header size)");
1153                 }
1154
1155                 u16 seqnum = readU16(&packetdata[2]);
1156                 LOG(dout_con << m_connection->getDesc() << " [ CONTROLTYPE_ACK: channelnum="
1157                         << ((int) channelnum & 0xff) << ", peer_id=" << peer->id << ", seqnum="
1158                         << seqnum << " ]" << std::endl);
1159
1160                 try {
1161                         BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
1162
1163                         // only calculate rtt from straight sent packets
1164                         if (p.resend_count == 0) {
1165                                 // Get round trip time
1166                                 u64 current_time = porting::getTimeMs();
1167
1168                                 // a overflow is quite unlikely but as it'd result in major
1169                                 // rtt miscalculation we handle it here
1170                                 float rtt;
1171                                 if (current_time > p.absolute_send_time) {
1172                                         rtt = (current_time - p.absolute_send_time) / 1000.0f;
1173                                 } else if (p.totaltime > 0) {
1174                                         rtt = p.totaltime;
1175                                 }
1176
1177                                 // Let peer calculate stuff according to it
1178                                 // (avg_rtt and resend_timeout)
1179                                 dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
1180                         }
1181                         // put bytes for max bandwidth calculation
1182                         channel->UpdateBytesSent(p.data.getSize(), 1);
1183                         if (channel->outgoing_reliables_sent.size() == 0)
1184                                 m_connection->TriggerSend();
1185                 } catch (NotFoundException &e) {
1186                         LOG(derr_con << m_connection->getDesc()
1187                                 << "WARNING: ACKed packet not in outgoing queue" << std::endl);
1188                         channel->UpdatePacketTooLateCounter();
1189                 }
1190
1191                 throw ProcessedSilentlyException("Got an ACK");
1192         } else if (controltype == CONTROLTYPE_SET_PEER_ID) {
1193                 // Got a packet to set our peer id
1194                 if (packetdata.getSize() < 4)
1195                         throw InvalidIncomingDataException
1196                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1197                 session_t peer_id_new = readU16(&packetdata[2]);
1198                 LOG(dout_con << m_connection->getDesc() << "Got new peer id: " << peer_id_new
1199                         << "... " << std::endl);
1200
1201                 if (m_connection->GetPeerID() != PEER_ID_INEXISTENT) {
1202                         LOG(derr_con << m_connection->getDesc()
1203                                 << "WARNING: Not changing existing peer id." << std::endl);
1204                 } else {
1205                         LOG(dout_con << m_connection->getDesc() << "changing own peer id"
1206                                 << std::endl);
1207                         m_connection->SetPeerID(peer_id_new);
1208                 }
1209
1210                 ConnectionCommand cmd;
1211
1212                 SharedBuffer<u8> reply(2);
1213                 writeU8(&reply[0], PACKET_TYPE_CONTROL);
1214                 writeU8(&reply[1], CONTROLTYPE_ENABLE_BIG_SEND_WINDOW);
1215                 cmd.disableLegacy(PEER_ID_SERVER, reply);
1216                 m_connection->putCommand(cmd);
1217
1218                 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1219         } else if (controltype == CONTROLTYPE_PING) {
1220                 // Just ignore it, the incoming data already reset
1221                 // the timeout counter
1222                 LOG(dout_con << m_connection->getDesc() << "PING" << std::endl);
1223                 throw ProcessedSilentlyException("Got a PING");
1224         } else if (controltype == CONTROLTYPE_DISCO) {
1225                 // Just ignore it, the incoming data already reset
1226                 // the timeout counter
1227                 LOG(dout_con << m_connection->getDesc() << "DISCO: Removing peer "
1228                         << peer->id << std::endl);
1229
1230                 if (!m_connection->deletePeer(peer->id, false)) {
1231                         derr_con << m_connection->getDesc() << "DISCO: Peer not found" << std::endl;
1232                 }
1233
1234                 throw ProcessedSilentlyException("Got a DISCO");
1235         } else if (controltype == CONTROLTYPE_ENABLE_BIG_SEND_WINDOW) {
1236                 dynamic_cast<UDPPeer *>(peer)->setNonLegacyPeer();
1237                 throw ProcessedSilentlyException("Got non legacy control");
1238         } else {
1239                 LOG(derr_con << m_connection->getDesc()
1240                         << "INVALID TYPE_CONTROL: invalid controltype="
1241                         << ((int) controltype & 0xff) << std::endl);
1242                 throw InvalidIncomingDataException("Invalid control type");
1243         }
1244 }
1245
1246 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel,
1247         SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
1248 {
1249         if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
1250                 throw InvalidIncomingDataException
1251                         ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
1252         LOG(dout_con << m_connection->getDesc() << "RETURNING TYPE_ORIGINAL to user"
1253                 << std::endl);
1254         // Get the inside packet out and return it
1255         SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1256         memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
1257         return payload;
1258 }
1259
1260 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel,
1261         SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
1262 {
1263         Address peer_address;
1264
1265         if (peer->getAddress(MTP_UDP, peer_address)) {
1266                 // We have to create a packet again for buffering
1267                 // This isn't actually too bad an idea.
1268                 BufferedPacket packet = makePacket(peer_address,
1269                         packetdata,
1270                         m_connection->GetProtocolID(),
1271                         peer->id,
1272                         channelnum);
1273
1274                 // Buffer the packet
1275                 SharedBuffer<u8> data = peer->addSplitPacket(channelnum, packet, reliable);
1276
1277                 if (data.getSize() != 0) {
1278                         LOG(dout_con << m_connection->getDesc()
1279                                 << "RETURNING TYPE_SPLIT: Constructed full data, "
1280                                 << "size=" << data.getSize() << std::endl);
1281                         return data;
1282                 }
1283                 LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" << std::endl);
1284                 throw ProcessedSilentlyException("Buffered a split packet chunk");
1285         }
1286
1287         // We should never get here.
1288         FATAL_ERROR("Invalid execution point");
1289 }
1290
1291 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel,
1292         SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
1293 {
1294         assert(channel != NULL);
1295
1296         // Recursive reliable packets not allowed
1297         if (reliable)
1298                 throw InvalidIncomingDataException("Found nested reliable packets");
1299
1300         if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
1301                 throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1302
1303         u16 seqnum = readU16(&packetdata[1]);
1304         bool is_future_packet = false;
1305         bool is_old_packet = false;
1306
1307         /* packet is within our receive window send ack */
1308         if (seqnum_in_window(seqnum,
1309                 channel->readNextIncomingSeqNum(), MAX_RELIABLE_WINDOW_SIZE)) {
1310                 m_connection->sendAck(peer->id, channelnum, seqnum);
1311         } else {
1312                 is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
1313                 is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
1314
1315                 /* packet is not within receive window, don't send ack.           *
1316                  * if this was a valid packet it's gonna be retransmitted         */
1317                 if (is_future_packet)
1318                         throw ProcessedSilentlyException(
1319                                 "Received packet newer then expected, not sending ack");
1320
1321                 /* seems like our ack was lost, send another one for a old packet */
1322                 if (is_old_packet) {
1323                         LOG(dout_con << m_connection->getDesc()
1324                                 << "RE-SENDING ACK: peer_id: " << peer->id
1325                                 << ", channel: " << (channelnum & 0xFF)
1326                                 << ", seqnum: " << seqnum << std::endl;)
1327                         m_connection->sendAck(peer->id, channelnum, seqnum);
1328
1329                         // we already have this packet so this one was on wire at least
1330                         // the current timeout
1331                         // we don't know how long this packet was on wire don't do silly guessing
1332                         // dynamic_cast<UDPPeer*>(&peer)->
1333                         //     reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
1334
1335                         throw ProcessedSilentlyException("Retransmitting ack for old packet");
1336                 }
1337         }
1338
1339         if (seqnum != channel->readNextIncomingSeqNum()) {
1340                 Address peer_address;
1341
1342                 // this is a reliable packet so we have a udp address for sure
1343                 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
1344                 // This one comes later, buffer it.
1345                 // Actually we have to make a packet to buffer one.
1346                 // Well, we have all the ingredients, so just do it.
1347                 BufferedPacket packet = con::makePacket(
1348                         peer_address,
1349                         packetdata,
1350                         m_connection->GetProtocolID(),
1351                         peer->id,
1352                         channelnum);
1353                 try {
1354                         channel->incoming_reliables.insert(packet, channel->readNextIncomingSeqNum());
1355
1356                         LOG(dout_con << m_connection->getDesc()
1357                                 << "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id
1358                                 << ", channel: " << (channelnum & 0xFF)
1359                                 << ", seqnum: " << seqnum << std::endl;)
1360
1361                         throw ProcessedQueued("Buffered future reliable packet");
1362                 } catch (AlreadyExistsException &e) {
1363                 } catch (IncomingDataCorruption &e) {
1364                         ConnectionCommand discon;
1365                         discon.disconnect_peer(peer->id);
1366                         m_connection->putCommand(discon);
1367
1368                         LOG(derr_con << m_connection->getDesc()
1369                                 << "INVALID, TYPE_RELIABLE peer_id: " << peer->id
1370                                 << ", channel: " << (channelnum & 0xFF)
1371                                 << ", seqnum: " << seqnum
1372                                 << "DROPPING CLIENT!" << std::endl;)
1373                 }
1374         }
1375
1376         /* we got a packet to process right now */
1377         LOG(dout_con << m_connection->getDesc()
1378                 << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer->id
1379                 << ", channel: " << (channelnum & 0xFF)
1380                 << ", seqnum: " << seqnum << std::endl;)
1381
1382
1383         /* check for resend case */
1384         u16 queued_seqnum = 0;
1385         if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
1386                 if (queued_seqnum == seqnum) {
1387                         BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
1388                         /** TODO find a way to verify the new against the old packet */
1389                 }
1390         }
1391
1392         channel->incNextIncomingSeqNum();
1393
1394         // Get out the inside packet and re-process it
1395         SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1396         memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1397
1398         return processPacket(channel, payload, peer->id, channelnum, true);
1399 }
1400
1401 }