]> git.lizzy.rs Git - dragonfireclient.git/blob - src/network/connectionthreads.cpp
Merge pull request #59 from PrairieAstronomer/readme_irrlicht_change
[dragonfireclient.git] / src / network / connectionthreads.cpp
1 /*
2 Minetest
3 Copyright (C) 2013-2017 celeron55, Perttu Ahola <celeron55@gmail.com>
4 Copyright (C) 2017 celeron55, Loic Blot <loic.blot@unix-experience.fr>
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published by
8 the Free Software Foundation; either version 2.1 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 GNU Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 */
20
21 #include "connectionthreads.h"
22 #include "log.h"
23 #include "profiler.h"
24 #include "settings.h"
25 #include "network/networkpacket.h"
26 #include "util/serialize.h"
27
28 namespace con
29 {
30
31 /******************************************************************************/
32 /* defines used for debugging and profiling                                   */
33 /******************************************************************************/
34 #ifdef NDEBUG
35 #define PROFILE(a)
36 #undef DEBUG_CONNECTION_KBPS
37 #else
/* debug builds: PROFILE(a) expands to its argument so profiling code is compiled in */
39 #define PROFILE(a) a
40 //#define DEBUG_CONNECTION_KBPS
41 #undef DEBUG_CONNECTION_KBPS
42 #endif
43
44 // TODO: Clean this up.
45 #define LOG(a) a
46
47 #define WINDOW_SIZE 5
48
49 static session_t readPeerId(const u8 *packetdata)
50 {
51         return readU16(&packetdata[4]);
52 }
53 static u8 readChannel(const u8 *packetdata)
54 {
55         return readU8(&packetdata[6]);
56 }
57
58 /******************************************************************************/
59 /* Connection Threads                                                         */
60 /******************************************************************************/
61
62 ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
63         float timeout) :
64         Thread("ConnectionSend"),
65         m_max_packet_size(max_packet_size),
66         m_timeout(timeout),
67         m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
68 {
69         SANITY_CHECK(m_max_data_packets_per_iteration > 1);
70 }
71
72 void *ConnectionSendThread::run()
73 {
74         assert(m_connection);
75
76         LOG(dout_con << m_connection->getDesc()
77                 << "ConnectionSend thread started" << std::endl);
78
79         u64 curtime = porting::getTimeMs();
80         u64 lasttime = curtime;
81
82         PROFILE(std::stringstream ThreadIdentifier);
83         PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
84
85         /* if stop is requested don't stop immediately but try to send all        */
86         /* packets first */
87         while (!stopRequested() || packetsQueued()) {
88                 BEGIN_DEBUG_EXCEPTION_HANDLER
89                 PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
90
91                 m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
92
93                 /* wait for trigger or timeout */
94                 m_send_sleep_semaphore.wait(50);
95
96                 /* remove all triggers */
97                 while (m_send_sleep_semaphore.wait(0)) {
98                 }
99
100                 lasttime = curtime;
101                 curtime = porting::getTimeMs();
102                 float dtime = CALC_DTIME(lasttime, curtime);
103
104                 /* first resend timed-out packets */
105                 runTimeouts(dtime);
106                 if (m_iteration_packets_avaialble == 0) {
107                         LOG(warningstream << m_connection->getDesc()
108                                 << " Packet quota used up after re-sending packets, "
109                                 << "max=" << m_max_data_packets_per_iteration << std::endl);
110                 }
111
112                 /* translate commands to packets */
113                 auto c = m_connection->m_command_queue.pop_frontNoEx(0);
114                 while (c && c->type != CONNCMD_NONE) {
115                         if (c->reliable)
116                                 processReliableCommand(c);
117                         else
118                                 processNonReliableCommand(c);
119
120                         c = m_connection->m_command_queue.pop_frontNoEx(0);
121                 }
122
123                 /* send queued packets */
124                 sendPackets(dtime);
125
126                 END_DEBUG_EXCEPTION_HANDLER
127         }
128
129         PROFILE(g_profiler->remove(ThreadIdentifier.str()));
130         return NULL;
131 }
132
133 void ConnectionSendThread::Trigger()
134 {
135         m_send_sleep_semaphore.post();
136 }
137
138 bool ConnectionSendThread::packetsQueued()
139 {
140         std::vector<session_t> peerIds = m_connection->getPeerIDs();
141
142         if (!m_outgoing_queue.empty() && !peerIds.empty())
143                 return true;
144
145         for (session_t peerId : peerIds) {
146                 PeerHelper peer = m_connection->getPeerNoEx(peerId);
147
148                 if (!peer)
149                         continue;
150
151                 if (dynamic_cast<UDPPeer *>(&peer) == 0)
152                         continue;
153
154                 for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
155                         if (!channel.queued_commands.empty()) {
156                                 return true;
157                         }
158                 }
159         }
160
161
162         return false;
163 }
164
165 void ConnectionSendThread::runTimeouts(float dtime)
166 {
167         std::vector<session_t> timeouted_peers;
168         std::vector<session_t> peerIds = m_connection->getPeerIDs();
169
170         const u32 numpeers = m_connection->m_peers.size();
171
172         if (numpeers == 0)
173                 return;
174
175         for (session_t &peerId : peerIds) {
176                 PeerHelper peer = m_connection->getPeerNoEx(peerId);
177
178                 if (!peer)
179                         continue;
180
181                 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
182                 if (!udpPeer)
183                         continue;
184
185                 PROFILE(std::stringstream peerIdentifier);
186                 PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
187                         << ";" << peerId << ";RELIABLE]");
188                 PROFILE(ScopeProfiler
189                 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
190
191                 SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
192
193                 /*
194                         Check peer timeout
195                 */
196                 if (peer->isTimedOut(m_timeout)) {
197                         infostream << m_connection->getDesc()
198                                 << "RunTimeouts(): Peer " << peer->id
199                                 << " has timed out."
200                                 << std::endl;
201                         // Add peer to the list
202                         timeouted_peers.push_back(peer->id);
203                         // Don't bother going through the buffers of this one
204                         continue;
205                 }
206
207                 float resend_timeout = udpPeer->getResendTimeout();
208                 for (Channel &channel : udpPeer->channels) {
209
210                         // Remove timed out incomplete unreliable split packets
211                         channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
212
213                         // Increment reliable packet times
214                         channel.outgoing_reliables_sent.incrementTimeouts(dtime);
215
216                         // Re-send timed out outgoing reliables
217                         auto timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
218                                 (m_max_data_packets_per_iteration / numpeers));
219
220                         channel.UpdatePacketLossCounter(timed_outs.size());
221                         g_profiler->graphAdd("packets_lost", timed_outs.size());
222
223                         m_iteration_packets_avaialble -= timed_outs.size();
224
225                         for (const auto &k : timed_outs) {
226                                 u8 channelnum = readChannel(k->data);
227                                 u16 seqnum = k->getSeqnum();
228
229                                 channel.UpdateBytesLost(k->size());
230
231                                 LOG(derr_con << m_connection->getDesc()
232                                         << "RE-SENDING timed-out RELIABLE to "
233                                         << k->address.serializeString()
234                                         << "(t/o=" << resend_timeout << "): "
235                                         << "count=" << k->resend_count
236                                         << ", channel=" << ((int) channelnum & 0xff)
237                                         << ", seqnum=" << seqnum
238                                         << std::endl);
239
240                                 rawSend(k.get());
241
242                                 // do not handle rtt here as we can't decide if this packet was
243                                 // lost or really takes more time to transmit
244                         }
245
246                         channel.UpdateTimers(dtime);
247                 }
248
249                 /* send ping if necessary */
250                 if (udpPeer->Ping(dtime, data)) {
251                         LOG(dout_con << m_connection->getDesc()
252                                 << "Sending ping for peer_id: " << udpPeer->id << std::endl);
253                         /* this may fail if there ain't a sequence number left */
254                         if (!rawSendAsPacket(udpPeer->id, 0, data, true)) {
255                                 //retrigger with reduced ping interval
256                                 udpPeer->Ping(4.0, data);
257                         }
258                 }
259
260                 udpPeer->RunCommandQueues(m_max_packet_size,
261                         m_max_commands_per_iteration,
262                         m_max_packets_requeued);
263         }
264
265         // Remove timed out peers
266         for (u16 timeouted_peer : timeouted_peers) {
267                 LOG(dout_con << m_connection->getDesc()
268                         << "RunTimeouts(): Removing peer " << timeouted_peer << std::endl);
269                 m_connection->deletePeer(timeouted_peer, true);
270         }
271 }
272
273 void ConnectionSendThread::rawSend(const BufferedPacket *p)
274 {
275         try {
276                 m_connection->m_udpSocket.Send(p->address, p->data, p->size());
277                 LOG(dout_con << m_connection->getDesc()
278                         << " rawSend: " << p->size()
279                         << " bytes sent" << std::endl);
280         } catch (SendFailedException &e) {
281                 LOG(derr_con << m_connection->getDesc()
282                         << "Connection::rawSend(): SendFailedException: "
283                         << p->address.serializeString() << std::endl);
284         }
285 }
286
287 void ConnectionSendThread::sendAsPacketReliable(BufferedPacketPtr &p, Channel *channel)
288 {
289         try {
290                 p->absolute_send_time = porting::getTimeMs();
291                 // Buffer the packet
292                 channel->outgoing_reliables_sent.insert(p,
293                         (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
294                                 % (MAX_RELIABLE_WINDOW_SIZE + 1));
295         }
296         catch (AlreadyExistsException &e) {
297                 LOG(derr_con << m_connection->getDesc()
298                         << "WARNING: Going to send a reliable packet"
299                         << " in outgoing buffer" << std::endl);
300         }
301
302         // Send the packet
303         rawSend(p.get());
304 }
305
306 bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
307         const SharedBuffer<u8> &data, bool reliable)
308 {
309         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
310         if (!peer) {
311                 LOG(errorstream << m_connection->getDesc()
312                         << " dropped " << (reliable ? "reliable " : "")
313                         << "packet for non existent peer_id: " << peer_id << std::endl);
314                 return false;
315         }
316         Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
317
318         if (reliable) {
319                 bool have_seqnum = false;
320                 const u16 seqnum = channel->getOutgoingSequenceNumber(have_seqnum);
321
322                 if (!have_seqnum)
323                         return false;
324
325                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
326                 Address peer_address;
327                 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
328
329                 // Add base headers and make a packet
330                 BufferedPacketPtr p = con::makePacket(peer_address, reliable,
331                         m_connection->GetProtocolID(), m_connection->GetPeerID(),
332                         channelnum);
333
334                 // first check if our send window is already maxed out
335                 if (channel->outgoing_reliables_sent.size() < channel->getWindowSize()) {
336                         LOG(dout_con << m_connection->getDesc()
337                                 << " INFO: sending a reliable packet to peer_id " << peer_id
338                                 << " channel: " << (u32)channelnum
339                                 << " seqnum: " << seqnum << std::endl);
340                         sendAsPacketReliable(p, channel);
341                         return true;
342                 }
343
344                 LOG(dout_con << m_connection->getDesc()
345                         << " INFO: queueing reliable packet for peer_id: " << peer_id
346                         << " channel: " << (u32)channelnum
347                         << " seqnum: " << seqnum << std::endl);
348                 channel->queued_reliables.push(p);
349                 return false;
350         }
351
352         Address peer_address;
353         if (peer->getAddress(MTP_UDP, peer_address)) {
354                 // Add base headers and make a packet
355                 BufferedPacketPtr p = con::makePacket(peer_address, data,
356                         m_connection->GetProtocolID(), m_connection->GetPeerID(),
357                         channelnum);
358
359                 // Send the packet
360                 rawSend(p.get());
361                 return true;
362         }
363
364         LOG(dout_con << m_connection->getDesc()
365                 << " INFO: dropped unreliable packet for peer_id: " << peer_id
366                 << " because of (yet) missing udp address" << std::endl);
367         return false;
368 }
369
370 void ConnectionSendThread::processReliableCommand(ConnectionCommandPtr &c)
371 {
372         assert(c->reliable);  // Pre-condition
373
374         switch (c->type) {
375                 case CONNCMD_NONE:
376                         LOG(dout_con << m_connection->getDesc()
377                                 << "UDP processing reliable CONNCMD_NONE" << std::endl);
378                         return;
379
380                 case CONNCMD_SEND:
381                         LOG(dout_con << m_connection->getDesc()
382                                 << "UDP processing reliable CONNCMD_SEND" << std::endl);
383                         sendReliable(c);
384                         return;
385
386                 case CONNCMD_SEND_TO_ALL:
387                         LOG(dout_con << m_connection->getDesc()
388                                 << "UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
389                         sendToAllReliable(c);
390                         return;
391
392                 case CONCMD_CREATE_PEER:
393                         LOG(dout_con << m_connection->getDesc()
394                                 << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
395                         if (!rawSendAsPacket(c->peer_id, c->channelnum, c->data, c->reliable)) {
396                                 /* put to queue if we couldn't send it immediately */
397                                 sendReliable(c);
398                         }
399                         return;
400
401                 case CONNCMD_SERVE:
402                 case CONNCMD_CONNECT:
403                 case CONNCMD_DISCONNECT:
404                 case CONCMD_ACK:
405                         FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
406                 default:
407                         LOG(dout_con << m_connection->getDesc()
408                                 << " Invalid reliable command type: " << c->type << std::endl);
409         }
410 }
411
412
413 void ConnectionSendThread::processNonReliableCommand(ConnectionCommandPtr &c_ptr)
414 {
415         const ConnectionCommand &c = *c_ptr;
416         assert(!c.reliable); // Pre-condition
417
418         switch (c.type) {
419                 case CONNCMD_NONE:
420                         LOG(dout_con << m_connection->getDesc()
421                                 << " UDP processing CONNCMD_NONE" << std::endl);
422                         return;
423                 case CONNCMD_SERVE:
424                         LOG(dout_con << m_connection->getDesc()
425                                 << " UDP processing CONNCMD_SERVE port="
426                                 << c.address.serializeString() << std::endl);
427                         serve(c.address);
428                         return;
429                 case CONNCMD_CONNECT:
430                         LOG(dout_con << m_connection->getDesc()
431                                 << " UDP processing CONNCMD_CONNECT" << std::endl);
432                         connect(c.address);
433                         return;
434                 case CONNCMD_DISCONNECT:
435                         LOG(dout_con << m_connection->getDesc()
436                                 << " UDP processing CONNCMD_DISCONNECT" << std::endl);
437                         disconnect();
438                         return;
439                 case CONNCMD_DISCONNECT_PEER:
440                         LOG(dout_con << m_connection->getDesc()
441                                 << " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl);
442                         disconnect_peer(c.peer_id);
443                         return;
444                 case CONNCMD_SEND:
445                         LOG(dout_con << m_connection->getDesc()
446                                 << " UDP processing CONNCMD_SEND" << std::endl);
447                         send(c.peer_id, c.channelnum, c.data);
448                         return;
449                 case CONNCMD_SEND_TO_ALL:
450                         LOG(dout_con << m_connection->getDesc()
451                                 << " UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
452                         sendToAll(c.channelnum, c.data);
453                         return;
454                 case CONCMD_ACK:
455                         LOG(dout_con << m_connection->getDesc()
456                                 << " UDP processing CONCMD_ACK" << std::endl);
457                         sendAsPacket(c.peer_id, c.channelnum, c.data, true);
458                         return;
459                 case CONCMD_CREATE_PEER:
460                         FATAL_ERROR("Got command that should be reliable as unreliable command");
461                 default:
462                         LOG(dout_con << m_connection->getDesc()
463                                 << " Invalid command type: " << c.type << std::endl);
464         }
465 }
466
467 void ConnectionSendThread::serve(Address bind_address)
468 {
469         LOG(dout_con << m_connection->getDesc()
470                 << "UDP serving at port " << bind_address.serializeString() << std::endl);
471         try {
472                 m_connection->m_udpSocket.Bind(bind_address);
473                 m_connection->SetPeerID(PEER_ID_SERVER);
474         }
475         catch (SocketException &e) {
476                 // Create event
477                 m_connection->putEvent(ConnectionEvent::bindFailed());
478         }
479 }
480
481 void ConnectionSendThread::connect(Address address)
482 {
483         LOG(dout_con << m_connection->getDesc() << " connecting to "
484                 << address.serializeString()
485                 << ":" << address.getPort() << std::endl);
486
487         UDPPeer *peer = m_connection->createServerPeer(address);
488
489         // Create event
490         m_connection->putEvent(ConnectionEvent::peerAdded(peer->id, peer->address));
491
492         Address bind_addr;
493
494         if (address.isIPv6())
495                 bind_addr.setAddress((IPv6AddressBytes *) NULL);
496         else
497                 bind_addr.setAddress(0, 0, 0, 0);
498
499         m_connection->m_udpSocket.Bind(bind_addr);
500
501         // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
502         m_connection->SetPeerID(PEER_ID_INEXISTENT);
503         NetworkPacket pkt(0, 0);
504         m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
505 }
506
507 void ConnectionSendThread::disconnect()
508 {
509         LOG(dout_con << m_connection->getDesc() << " disconnecting" << std::endl);
510
511         // Create and send DISCO packet
512         SharedBuffer<u8> data(2);
513         writeU8(&data[0], PACKET_TYPE_CONTROL);
514         writeU8(&data[1], CONTROLTYPE_DISCO);
515
516
517         // Send to all
518         std::vector<session_t> peerids = m_connection->getPeerIDs();
519
520         for (session_t peerid : peerids) {
521                 sendAsPacket(peerid, 0, data, false);
522         }
523 }
524
525 void ConnectionSendThread::disconnect_peer(session_t peer_id)
526 {
527         LOG(dout_con << m_connection->getDesc() << " disconnecting peer" << std::endl);
528
529         // Create and send DISCO packet
530         SharedBuffer<u8> data(2);
531         writeU8(&data[0], PACKET_TYPE_CONTROL);
532         writeU8(&data[1], CONTROLTYPE_DISCO);
533         sendAsPacket(peer_id, 0, data, false);
534
535         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
536
537         if (!peer)
538                 return;
539
540         if (dynamic_cast<UDPPeer *>(&peer) == 0) {
541                 return;
542         }
543
544         dynamic_cast<UDPPeer *>(&peer)->m_pending_disconnect = true;
545 }
546
547 void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
548         const SharedBuffer<u8> &data)
549 {
550         assert(channelnum < CHANNEL_COUNT); // Pre-condition
551
552         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
553         if (!peer) {
554                 LOG(dout_con << m_connection->getDesc() << " peer: peer_id=" << peer_id
555                         << ">>>NOT<<< found on sending packet"
556                         << ", channel " << (channelnum % 0xFF)
557                         << ", size: " << data.getSize() << std::endl);
558                 return;
559         }
560
561         LOG(dout_con << m_connection->getDesc() << " sending to peer_id=" << peer_id
562                 << ", channel " << (channelnum % 0xFF)
563                 << ", size: " << data.getSize() << std::endl);
564
565         u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
566
567         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
568         std::list<SharedBuffer<u8>> originals;
569
570         makeAutoSplitPacket(data, chunksize_max, split_sequence_number, &originals);
571
572         peer->setNextSplitSequenceNumber(channelnum, split_sequence_number);
573
574         for (const SharedBuffer<u8> &original : originals) {
575                 sendAsPacket(peer_id, channelnum, original);
576         }
577 }
578
579 void ConnectionSendThread::sendReliable(ConnectionCommandPtr &c)
580 {
581         PeerHelper peer = m_connection->getPeerNoEx(c->peer_id);
582         if (!peer)
583                 return;
584
585         peer->PutReliableSendCommand(c, m_max_packet_size);
586 }
587
588 void ConnectionSendThread::sendToAll(u8 channelnum, const SharedBuffer<u8> &data)
589 {
590         std::vector<session_t> peerids = m_connection->getPeerIDs();
591
592         for (session_t peerid : peerids) {
593                 send(peerid, channelnum, data);
594         }
595 }
596
597 void ConnectionSendThread::sendToAllReliable(ConnectionCommandPtr &c)
598 {
599         std::vector<session_t> peerids = m_connection->getPeerIDs();
600
601         for (session_t peerid : peerids) {
602                 PeerHelper peer = m_connection->getPeerNoEx(peerid);
603
604                 if (!peer)
605                         continue;
606
607                 peer->PutReliableSendCommand(c, m_max_packet_size);
608         }
609 }
610
611 void ConnectionSendThread::sendPackets(float dtime)
612 {
613         std::vector<session_t> peerIds = m_connection->getPeerIDs();
614         std::vector<session_t> pendingDisconnect;
615         std::map<session_t, bool> pending_unreliable;
616
617         const unsigned int peer_packet_quota = m_iteration_packets_avaialble
618                 / MYMAX(peerIds.size(), 1);
619
620         for (session_t peerId : peerIds) {
621                 PeerHelper peer = m_connection->getPeerNoEx(peerId);
622                 //peer may have been removed
623                 if (!peer) {
624                         LOG(dout_con << m_connection->getDesc() << " Peer not found: peer_id="
625                                 << peerId
626                                 << std::endl);
627                         continue;
628                 }
629                 peer->m_increment_packets_remaining = peer_packet_quota;
630
631                 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
632
633                 if (!udpPeer) {
634                         continue;
635                 }
636
637                 if (udpPeer->m_pending_disconnect) {
638                         pendingDisconnect.push_back(peerId);
639                 }
640
641                 PROFILE(std::stringstream
642                 peerIdentifier);
643                 PROFILE(
644                         peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
645                                 << ";RELIABLE]");
646                 PROFILE(ScopeProfiler
647                 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
648
649                 LOG(dout_con << m_connection->getDesc()
650                         << " Handle per peer queues: peer_id=" << peerId
651                         << " packet quota: " << peer->m_increment_packets_remaining << std::endl);
652
653                 // first send queued reliable packets for all peers (if possible)
654                 for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
655                         Channel &channel = udpPeer->channels[i];
656
657                         // Reduces logging verbosity
658                         if (channel.queued_reliables.empty())
659                                 continue;
660
661                         u16 next_to_ack = 0;
662                         channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
663                         u16 next_to_receive = 0;
664                         channel.incoming_reliables.getFirstSeqnum(next_to_receive);
665
666                         LOG(dout_con << m_connection->getDesc() << "\t channel: "
667                                 << i << ", peer quota:"
668                                 << peer->m_increment_packets_remaining
669                                 << std::endl
670                                 << "\t\t\treliables on wire: "
671                                 << channel.outgoing_reliables_sent.size()
672                                 << ", waiting for ack for " << next_to_ack
673                                 << std::endl
674                                 << "\t\t\tincoming_reliables: "
675                                 << channel.incoming_reliables.size()
676                                 << ", next reliable packet: "
677                                 << channel.readNextIncomingSeqNum()
678                                 << ", next queued: " << next_to_receive
679                                 << std::endl
680                                 << "\t\t\treliables queued : "
681                                 << channel.queued_reliables.size()
682                                 << std::endl
683                                 << "\t\t\tqueued commands  : "
684                                 << channel.queued_commands.size()
685                                 << std::endl);
686
687                         while (!channel.queued_reliables.empty() &&
688                                         channel.outgoing_reliables_sent.size()
689                                         < channel.getWindowSize() &&
690                                         peer->m_increment_packets_remaining > 0) {
691                                 BufferedPacketPtr p = channel.queued_reliables.front();
692                                 channel.queued_reliables.pop();
693
694                                 LOG(dout_con << m_connection->getDesc()
695                                         << " INFO: sending a queued reliable packet "
696                                         << " channel: " << i
697                                         << ", seqnum: " << p->getSeqnum()
698                                         << std::endl);
699
700                                 sendAsPacketReliable(p, &channel);
701                                 peer->m_increment_packets_remaining--;
702                         }
703                 }
704         }
705
706         if (!m_outgoing_queue.empty()) {
707                 LOG(dout_con << m_connection->getDesc()
708                         << " Handle non reliable queue ("
709                         << m_outgoing_queue.size() << " pkts)" << std::endl);
710         }
711
712         unsigned int initial_queuesize = m_outgoing_queue.size();
713         /* send non reliable packets*/
714         for (unsigned int i = 0; i < initial_queuesize; i++) {
715                 OutgoingPacket packet = m_outgoing_queue.front();
716                 m_outgoing_queue.pop();
717
718                 if (packet.reliable)
719                         continue;
720
721                 PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
722                 if (!peer) {
723                         LOG(dout_con << m_connection->getDesc()
724                                 << " Outgoing queue: peer_id=" << packet.peer_id
725                                 << ">>>NOT<<< found on sending packet"
726                                 << ", channel " << (packet.channelnum % 0xFF)
727                                 << ", size: " << packet.data.getSize() << std::endl);
728                         continue;
729                 }
730
731                 /* send acks immediately */
732                 if (packet.ack || peer->m_increment_packets_remaining > 0 || stopRequested()) {
733                         rawSendAsPacket(packet.peer_id, packet.channelnum,
734                                 packet.data, packet.reliable);
735                         if (peer->m_increment_packets_remaining > 0)
736                                 peer->m_increment_packets_remaining--;
737                 } else {
738                         m_outgoing_queue.push(packet);
739                         pending_unreliable[packet.peer_id] = true;
740                 }
741         }
742
743         if (peer_packet_quota > 0) {
744                 for (session_t peerId : peerIds) {
745                         PeerHelper peer = m_connection->getPeerNoEx(peerId);
746                         if (!peer)
747                                 continue;
748                         if (peer->m_increment_packets_remaining == 0) {
749                                 LOG(warningstream << m_connection->getDesc()
750                                         << " Packet quota used up for peer_id=" << peerId
751                                         << ", was " << peer_packet_quota << " pkts" << std::endl);
752                         }
753                 }
754         }
755
756         for (session_t peerId : pendingDisconnect) {
757                 if (!pending_unreliable[peerId]) {
758                         m_connection->deletePeer(peerId, false);
759                 }
760         }
761 }
762
763 void ConnectionSendThread::sendAsPacket(session_t peer_id, u8 channelnum,
764         const SharedBuffer<u8> &data, bool ack)
765 {
766         OutgoingPacket packet(peer_id, channelnum, data, false, ack);
767         m_outgoing_queue.push(packet);
768 }
769
770 ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
771         Thread("ConnectionReceive")
772 {
773 }
774
775 void *ConnectionReceiveThread::run()
776 {
777         assert(m_connection);
778
779         LOG(dout_con << m_connection->getDesc()
780                 << "ConnectionReceive thread started" << std::endl);
781
782         PROFILE(std::stringstream
783         ThreadIdentifier);
784         PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
785
786         // use IPv6 minimum allowed MTU as receive buffer size as this is
787         // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
788         // infrastructure
789         const unsigned int packet_maxsize = 1500;
790         SharedBuffer<u8> packetdata(packet_maxsize);
791
792         bool packet_queued = true;
793
794 #ifdef DEBUG_CONNECTION_KBPS
795         u64 curtime = porting::getTimeMs();
796         u64 lasttime = curtime;
797         float debug_print_timer = 0.0;
798 #endif
799
800         while (!stopRequested()) {
801                 BEGIN_DEBUG_EXCEPTION_HANDLER
802                 PROFILE(ScopeProfiler
803                 sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
804
805 #ifdef DEBUG_CONNECTION_KBPS
806                 lasttime = curtime;
807                 curtime = porting::getTimeMs();
808                 float dtime = CALC_DTIME(lasttime,curtime);
809 #endif
810
811                 /* receive packets */
812                 receive(packetdata, packet_queued);
813
814 #ifdef DEBUG_CONNECTION_KBPS
815                 debug_print_timer += dtime;
816                 if (debug_print_timer > 20.0) {
817                         debug_print_timer -= 20.0;
818
819                         std::vector<session_t> peerids = m_connection->getPeerIDs();
820
821                         for (auto id : peerids)
822                         {
823                                 PeerHelper peer = m_connection->getPeerNoEx(id);
824                                 if (!peer)
825                                         continue;
826
827                                 float peer_current = 0.0;
828                                 float peer_loss = 0.0;
829                                 float avg_rate = 0.0;
830                                 float avg_loss = 0.0;
831
832                                 for(u16 j=0; j<CHANNEL_COUNT; j++)
833                                 {
834                                         peer_current +=peer->channels[j].getCurrentDownloadRateKB();
835                                         peer_loss += peer->channels[j].getCurrentLossRateKB();
836                                         avg_rate += peer->channels[j].getAvgDownloadRateKB();
837                                         avg_loss += peer->channels[j].getAvgLossRateKB();
838                                 }
839
840                                 std::stringstream output;
841                                 output << std::fixed << std::setprecision(1);
842                                 output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
843                                 output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
844                                 output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
845                                 output << std::setfill(' ');
846                                 for(u16 j=0; j<CHANNEL_COUNT; j++)
847                                 {
848                                         output << "\tcha " << j << ":"
849                                                 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
850                                                 << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
851                                                 << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
852                                                 << " /"
853                                                 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
854                                                 << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
855                                                 << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
856                                                 << " / WS: " << peer->channels[j].getWindowSize()
857                                                 << std::endl;
858                                 }
859
860                                 fprintf(stderr,"%s\n",output.str().c_str());
861                         }
862                 }
863 #endif
864                 END_DEBUG_EXCEPTION_HANDLER
865         }
866
867         PROFILE(g_profiler->remove(ThreadIdentifier.str()));
868         return NULL;
869 }
870
871 // Receive packets from the network and buffers and create ConnectionEvents
872 void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
873                 bool &packet_queued)
874 {
875         try {
876                 // First, see if there any buffered packets we can process now
877                 if (packet_queued) {
878                         session_t peer_id;
879                         SharedBuffer<u8> resultdata;
880                         while (true) {
881                                 try {
882                                         if (!getFromBuffers(peer_id, resultdata))
883                                                 break;
884
885                                         m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata));
886                                 }
887                                 catch (ProcessedSilentlyException &e) {
888                                         /* try reading again */
889                                 }
890                         }
891                         packet_queued = false;
892                 }
893
894                 // Call Receive() to wait for incoming data
895                 Address sender;
896                 s32 received_size = m_connection->m_udpSocket.Receive(sender,
897                         *packetdata, packetdata.getSize());
898                 if (received_size < 0)
899                         return;
900
901                 if ((received_size < BASE_HEADER_SIZE) ||
902                                 (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
903                         LOG(derr_con << m_connection->getDesc()
904                                 << "Receive(): Invalid incoming packet, "
905                                 << "size: " << received_size
906                                 << ", protocol: "
907                                 << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
908                                 << std::endl);
909                         return;
910                 }
911
912                 session_t peer_id = readPeerId(*packetdata);
913                 u8 channelnum = readChannel(*packetdata);
914
915                 if (channelnum > CHANNEL_COUNT - 1) {
916                         LOG(derr_con << m_connection->getDesc()
917                                 << "Receive(): Invalid channel " << (u32)channelnum << std::endl);
918                         return;
919                 }
920
921                 /* Try to identify peer by sender address (may happen on join) */
922                 if (peer_id == PEER_ID_INEXISTENT) {
923                         peer_id = m_connection->lookupPeer(sender);
924                         // We do not have to remind the peer of its
925                         // peer id as the CONTROLTYPE_SET_PEER_ID
926                         // command was sent reliably.
927                 }
928
929                 if (peer_id == PEER_ID_INEXISTENT) {
930                         /* Ignore it if we are a client */
931                         if (m_connection->ConnectedToServer())
932                                 return;
933                         /* The peer was not found in our lists. Add it. */
934                         peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
935                 }
936
937                 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
938                 if (!peer) {
939                         LOG(dout_con << m_connection->getDesc()
940                                 << " got packet from unknown peer_id: "
941                                 << peer_id << " Ignoring." << std::endl);
942                         return;
943                 }
944
945                 // Validate peer address
946
947                 Address peer_address;
948                 if (peer->getAddress(MTP_UDP, peer_address)) {
949                         if (peer_address != sender) {
950                                 LOG(derr_con << m_connection->getDesc()
951                                         << " Peer " << peer_id << " sending from different address."
952                                         " Ignoring." << std::endl);
953                                 return;
954                         }
955                 } else {
956                         LOG(derr_con << m_connection->getDesc()
957                                 << " Peer " << peer_id << " doesn't have an address?!"
958                                 " Ignoring." << std::endl);
959                         return;
960                 }
961
962                 peer->ResetTimeout();
963
964                 Channel *channel = nullptr;
965                 if (dynamic_cast<UDPPeer *>(&peer)) {
966                         channel = &dynamic_cast<UDPPeer *>(&peer)->channels[channelnum];
967                 } else {
968                         LOG(derr_con << m_connection->getDesc()
969                                 << "Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!"
970                                 " Ignoring." << std::endl);
971                         return;
972                 }
973
974                 channel->UpdateBytesReceived(received_size);
975
976                 // Throw the received packet to channel->processPacket()
977
978                 // Make a new SharedBuffer from the data without the base headers
979                 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
980                 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
981                         strippeddata.getSize());
982
983                 try {
984                         // Process it (the result is some data with no headers made by us)
985                         SharedBuffer<u8> resultdata = processPacket
986                                 (channel, strippeddata, peer_id, channelnum, false);
987
988                         LOG(dout_con << m_connection->getDesc()
989                                 << " ProcessPacket from peer_id: " << peer_id
990                                 << ", channel: " << (u32)channelnum << ", returned "
991                                 << resultdata.getSize() << " bytes" << std::endl);
992
993                         m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata));
994                 }
995                 catch (ProcessedSilentlyException &e) {
996                 }
997                 catch (ProcessedQueued &e) {
998                         // we set it to true anyway (see below)
999                 }
1000
1001                 /* Every time we receive a packet it can happen that a previously
1002                  * buffered packet is now ready to process. */
1003                 packet_queued = true;
1004         }
1005         catch (InvalidIncomingDataException &e) {
1006         }
1007 }
1008
1009 bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8> &dst)
1010 {
1011         std::vector<session_t> peerids = m_connection->getPeerIDs();
1012
1013         for (session_t peerid : peerids) {
1014                 PeerHelper peer = m_connection->getPeerNoEx(peerid);
1015                 if (!peer)
1016                         continue;
1017
1018                 UDPPeer *p = dynamic_cast<UDPPeer *>(&peer);
1019                 if (!p)
1020                         continue;
1021
1022                 for (Channel &channel : p->channels) {
1023                         if (checkIncomingBuffers(&channel, peer_id, dst)) {
1024                                 return true;
1025                         }
1026                 }
1027         }
1028         return false;
1029 }
1030
1031 bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
1032         session_t &peer_id, SharedBuffer<u8> &dst)
1033 {
1034         u16 firstseqnum = 0;
1035         if (!channel->incoming_reliables.getFirstSeqnum(firstseqnum))
1036                 return false;
1037
1038         if (firstseqnum != channel->readNextIncomingSeqNum())
1039                 return false;
1040
1041         BufferedPacketPtr p = channel->incoming_reliables.popFirst();
1042
1043         peer_id = readPeerId(p->data); // Carried over to caller function
1044         u8 channelnum = readChannel(p->data);
1045         u16 seqnum = p->getSeqnum();
1046
1047         LOG(dout_con << m_connection->getDesc()
1048                 << "UNBUFFERING TYPE_RELIABLE"
1049                 << " seqnum=" << seqnum
1050                 << " peer_id=" << peer_id
1051                 << " channel=" << ((int) channelnum & 0xff)
1052                 << std::endl);
1053
1054         channel->incNextIncomingSeqNum();
1055
1056         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1057         // Get out the inside packet and re-process it
1058         SharedBuffer<u8> payload(p->size() - headers_size);
1059         memcpy(*payload, &p->data[headers_size], payload.getSize());
1060
1061         dst = processPacket(channel, payload, peer_id, channelnum, true);
1062         return true;
1063 }
1064
1065 SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
1066         const SharedBuffer<u8> &packetdata, session_t peer_id, u8 channelnum, bool reliable)
1067 {
1068         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
1069
1070         if (!peer) {
1071                 errorstream << "Peer not found (possible timeout)" << std::endl;
1072                 throw ProcessedSilentlyException("Peer not found (possible timeout)");
1073         }
1074
1075         if (packetdata.getSize() < 1)
1076                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1077
1078         u8 type = readU8(&(packetdata[0]));
1079
1080         if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
1081                 std::string errmsg = "Invalid peer_id=" + itos(peer_id);
1082                 errorstream << errmsg << std::endl;
1083                 throw InvalidIncomingDataException(errmsg.c_str());
1084         }
1085
1086         if (type >= PACKET_TYPE_MAX) {
1087                 derr_con << m_connection->getDesc() << "Got invalid type=" << ((int) type & 0xff)
1088                         << std::endl;
1089                 throw InvalidIncomingDataException("Invalid packet type");
1090         }
1091
1092         const PacketTypeHandler &pHandle = packetTypeRouter[type];
1093         return (this->*pHandle.handler)(channel, packetdata, &peer, channelnum, reliable);
1094 }
1095
// Dispatch table for processPacket(): the packet type byte is used directly
// as the index, so the order of the entries must match the numeric values of
// the packet types.
// NOTE(review): the entry order implies control=0, original=1, split=2,
// reliable=3 -- confirm against the packet type definitions.
const ConnectionReceiveThread::PacketTypeHandler
	ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = {
	{&ConnectionReceiveThread::handlePacketType_Control},
	{&ConnectionReceiveThread::handlePacketType_Original},
	{&ConnectionReceiveThread::handlePacketType_Split},
	{&ConnectionReceiveThread::handlePacketType_Reliable},
};
1103
1104 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel,
1105         const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1106 {
1107         if (packetdata.getSize() < 2)
1108                 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1109
1110         ControlType controltype = (ControlType)readU8(&(packetdata[1]));
1111
1112         if (controltype == CONTROLTYPE_ACK) {
1113                 assert(channel != NULL);
1114
1115                 if (packetdata.getSize() < 4) {
1116                         throw InvalidIncomingDataException(
1117                                 "packetdata.getSize() < 4 (ACK header size)");
1118                 }
1119
1120                 u16 seqnum = readU16(&packetdata[2]);
1121                 LOG(dout_con << m_connection->getDesc() << " [ CONTROLTYPE_ACK: channelnum="
1122                         << ((int) channelnum & 0xff) << ", peer_id=" << peer->id << ", seqnum="
1123                         << seqnum << " ]" << std::endl);
1124
1125                 try {
1126                         BufferedPacketPtr p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
1127
1128                         // the rtt calculation will be a bit off for re-sent packets but that's okay
1129                         {
1130                                 // Get round trip time
1131                                 u64 current_time = porting::getTimeMs();
1132
1133                                 // a overflow is quite unlikely but as it'd result in major
1134                                 // rtt miscalculation we handle it here
1135                                 if (current_time > p->absolute_send_time) {
1136                                         float rtt = (current_time - p->absolute_send_time) / 1000.0;
1137
1138                                         // Let peer calculate stuff according to it
1139                                         // (avg_rtt and resend_timeout)
1140                                         dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
1141                                 } else if (p->totaltime > 0) {
1142                                         float rtt = p->totaltime;
1143
1144                                         // Let peer calculate stuff according to it
1145                                         // (avg_rtt and resend_timeout)
1146                                         dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
1147                                 }
1148                         }
1149
1150                         // put bytes for max bandwidth calculation
1151                         channel->UpdateBytesSent(p->size(), 1);
1152                         if (channel->outgoing_reliables_sent.size() == 0)
1153                                 m_connection->TriggerSend();
1154                 } catch (NotFoundException &e) {
1155                         LOG(derr_con << m_connection->getDesc()
1156                                 << "WARNING: ACKed packet not in outgoing queue"
1157                                 << " seqnum=" << seqnum << std::endl);
1158                         channel->UpdatePacketTooLateCounter();
1159                 }
1160
1161                 throw ProcessedSilentlyException("Got an ACK");
1162         } else if (controltype == CONTROLTYPE_SET_PEER_ID) {
1163                 // Got a packet to set our peer id
1164                 if (packetdata.getSize() < 4)
1165                         throw InvalidIncomingDataException
1166                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1167                 session_t peer_id_new = readU16(&packetdata[2]);
1168                 LOG(dout_con << m_connection->getDesc() << "Got new peer id: " << peer_id_new
1169                         << "... " << std::endl);
1170
1171                 if (m_connection->GetPeerID() != PEER_ID_INEXISTENT) {
1172                         LOG(derr_con << m_connection->getDesc()
1173                                 << "WARNING: Not changing existing peer id." << std::endl);
1174                 } else {
1175                         LOG(dout_con << m_connection->getDesc() << "changing own peer id"
1176                                 << std::endl);
1177                         m_connection->SetPeerID(peer_id_new);
1178                 }
1179
1180                 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1181         } else if (controltype == CONTROLTYPE_PING) {
1182                 // Just ignore it, the incoming data already reset
1183                 // the timeout counter
1184                 LOG(dout_con << m_connection->getDesc() << "PING" << std::endl);
1185                 throw ProcessedSilentlyException("Got a PING");
1186         } else if (controltype == CONTROLTYPE_DISCO) {
1187                 // Just ignore it, the incoming data already reset
1188                 // the timeout counter
1189                 LOG(dout_con << m_connection->getDesc() << "DISCO: Removing peer "
1190                         << peer->id << std::endl);
1191
1192                 if (!m_connection->deletePeer(peer->id, false)) {
1193                         derr_con << m_connection->getDesc() << "DISCO: Peer not found" << std::endl;
1194                 }
1195
1196                 throw ProcessedSilentlyException("Got a DISCO");
1197         } else {
1198                 LOG(derr_con << m_connection->getDesc()
1199                         << "INVALID controltype="
1200                         << ((int) controltype & 0xff) << std::endl);
1201                 throw InvalidIncomingDataException("Invalid control type");
1202         }
1203 }
1204
1205 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel,
1206         const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1207 {
1208         if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
1209                 throw InvalidIncomingDataException
1210                         ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
1211         LOG(dout_con << m_connection->getDesc() << "RETURNING TYPE_ORIGINAL to user"
1212                 << std::endl);
1213         // Get the inside packet out and return it
1214         SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1215         memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
1216         return payload;
1217 }
1218
1219 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel,
1220         const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1221 {
1222         Address peer_address;
1223
1224         if (peer->getAddress(MTP_UDP, peer_address)) {
1225                 // We have to create a packet again for buffering
1226                 // This isn't actually too bad an idea.
1227                 BufferedPacketPtr packet = con::makePacket(peer_address,
1228                         packetdata,
1229                         m_connection->GetProtocolID(),
1230                         peer->id,
1231                         channelnum);
1232
1233                 // Buffer the packet
1234                 SharedBuffer<u8> data = peer->addSplitPacket(channelnum, packet, reliable);
1235
1236                 if (data.getSize() != 0) {
1237                         LOG(dout_con << m_connection->getDesc()
1238                                 << "RETURNING TYPE_SPLIT: Constructed full data, "
1239                                 << "size=" << data.getSize() << std::endl);
1240                         return data;
1241                 }
1242                 LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" << std::endl);
1243                 throw ProcessedSilentlyException("Buffered a split packet chunk");
1244         }
1245
1246         // We should never get here.
1247         FATAL_ERROR("Invalid execution point");
1248 }
1249
1250 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel,
1251         const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1252 {
1253         assert(channel != NULL);
1254
1255         // Recursive reliable packets not allowed
1256         if (reliable)
1257                 throw InvalidIncomingDataException("Found nested reliable packets");
1258
1259         if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
1260                 throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1261
1262         const u16 seqnum = readU16(&packetdata[1]);
1263         bool is_future_packet = false;
1264         bool is_old_packet = false;
1265
1266         /* packet is within our receive window send ack */
1267         if (seqnum_in_window(seqnum,
1268                 channel->readNextIncomingSeqNum(), MAX_RELIABLE_WINDOW_SIZE)) {
1269                 m_connection->sendAck(peer->id, channelnum, seqnum);
1270         } else {
1271                 is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
1272                 is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
1273
1274                 /* packet is not within receive window, don't send ack.           *
1275                  * if this was a valid packet it's gonna be retransmitted         */
1276                 if (is_future_packet)
1277                         throw ProcessedSilentlyException(
1278                                 "Received packet newer then expected, not sending ack");
1279
1280                 /* seems like our ack was lost, send another one for a old packet */
1281                 if (is_old_packet) {
1282                         LOG(dout_con << m_connection->getDesc()
1283                                 << "RE-SENDING ACK: peer_id: " << peer->id
1284                                 << ", channel: " << (channelnum & 0xFF)
1285                                 << ", seqnum: " << seqnum << std::endl;)
1286                         m_connection->sendAck(peer->id, channelnum, seqnum);
1287
1288                         // we already have this packet so this one was on wire at least
1289                         // the current timeout
1290                         // we don't know how long this packet was on wire don't do silly guessing
1291                         // dynamic_cast<UDPPeer*>(&peer)->
1292                         //     reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
1293
1294                         throw ProcessedSilentlyException("Retransmitting ack for old packet");
1295                 }
1296         }
1297
1298         if (seqnum != channel->readNextIncomingSeqNum()) {
1299                 Address peer_address;
1300
1301                 // this is a reliable packet so we have a udp address for sure
1302                 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
1303                 // This one comes later, buffer it.
1304                 // Actually we have to make a packet to buffer one.
1305                 // Well, we have all the ingredients, so just do it.
1306                 BufferedPacketPtr packet = con::makePacket(
1307                         peer_address,
1308                         packetdata,
1309                         m_connection->GetProtocolID(),
1310                         peer->id,
1311                         channelnum);
1312                 try {
1313                         channel->incoming_reliables.insert(packet, channel->readNextIncomingSeqNum());
1314
1315                         LOG(dout_con << m_connection->getDesc()
1316                                 << "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id
1317                                 << ", channel: " << (channelnum & 0xFF)
1318                                 << ", seqnum: " << seqnum << std::endl;)
1319
1320                         throw ProcessedQueued("Buffered future reliable packet");
1321                 } catch (AlreadyExistsException &e) {
1322                 } catch (IncomingDataCorruption &e) {
1323                         m_connection->putCommand(ConnectionCommand::disconnect_peer(peer->id));
1324
1325                         LOG(derr_con << m_connection->getDesc()
1326                                 << "INVALID, TYPE_RELIABLE peer_id: " << peer->id
1327                                 << ", channel: " << (channelnum & 0xFF)
1328                                 << ", seqnum: " << seqnum
1329                                 << "DROPPING CLIENT!" << std::endl;)
1330                 }
1331         }
1332
1333         /* we got a packet to process right now */
1334         LOG(dout_con << m_connection->getDesc()
1335                 << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer->id
1336                 << ", channel: " << (channelnum & 0xFF)
1337                 << ", seqnum: " << seqnum << std::endl;)
1338
1339
1340         /* check for resend case */
1341         u16 queued_seqnum = 0;
1342         if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
1343                 if (queued_seqnum == seqnum) {
1344                         BufferedPacketPtr queued_packet = channel->incoming_reliables.popFirst();
1345                         /** TODO find a way to verify the new against the old packet */
1346                 }
1347         }
1348
1349         channel->incNextIncomingSeqNum();
1350
1351         // Get out the inside packet and re-process it
1352         SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1353         memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1354
1355         return processPacket(channel, payload, peer->id, channelnum, true);
1356 }
1357
1358 }