]> git.lizzy.rs Git - dragonfireclient.git/blob - src/network/connectionthreads.cpp
Various network performance improvements (#8125)
[dragonfireclient.git] / src / network / connectionthreads.cpp
1 /*
2 Minetest
3 Copyright (C) 2013-2017 celeron55, Perttu Ahola <celeron55@gmail.com>
4 Copyright (C) 2017 celeron55, Loic Blot <loic.blot@unix-experience.fr>
5
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published by
8 the Free Software Foundation; either version 2.1 of the License, or
9 (at your option) any later version.
10
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 GNU Lesser General Public License for more details.
15
16 You should have received a copy of the GNU Lesser General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 */
20
21 #include "connectionthreads.h"
22 #include "log.h"
23 #include "profiler.h"
24 #include "settings.h"
25 #include "network/networkpacket.h"
26 #include "util/serialize.h"
27
28 namespace con
29 {
30
31 /******************************************************************************/
32 /* defines used for debugging and profiling                                   */
33 /******************************************************************************/
34 #ifdef NDEBUG
35 #define LOG(a) a
36 #define PROFILE(a)
37 #undef DEBUG_CONNECTION_KBPS
38 #else
39 /* this mutex is used to achieve log message consistency */
40 std::mutex log_conthread_mutex;
41 #define LOG(a)                                                                \
42         {                                                                         \
43         MutexAutoLock loglock(log_conthread_mutex);                                 \
44         a;                                                                        \
45         }
46 #define PROFILE(a) a
47 //#define DEBUG_CONNECTION_KBPS
48 #undef DEBUG_CONNECTION_KBPS
49 #endif
50
51 /* maximum number of retries for reliable packets */
52 #define MAX_RELIABLE_RETRY 5
53
54 #define WINDOW_SIZE 5
55
56 static session_t readPeerId(u8 *packetdata)
57 {
58         return readU16(&packetdata[4]);
59 }
60 static u8 readChannel(u8 *packetdata)
61 {
62         return readU8(&packetdata[6]);
63 }
64
65 /******************************************************************************/
66 /* Connection Threads                                                         */
67 /******************************************************************************/
68
69 ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
70         float timeout) :
71         Thread("ConnectionSend"),
72         m_max_packet_size(max_packet_size),
73         m_timeout(timeout),
74         m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
75 {
76 }
77
78 void *ConnectionSendThread::run()
79 {
80         assert(m_connection);
81
82         LOG(dout_con << m_connection->getDesc()
83                 << "ConnectionSend thread started" << std::endl);
84
85         u64 curtime = porting::getTimeMs();
86         u64 lasttime = curtime;
87
88         PROFILE(std::stringstream ThreadIdentifier);
89         PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
90
91         /* if stop is requested don't stop immediately but try to send all        */
92         /* packets first */
93         while (!stopRequested() || packetsQueued()) {
94                 BEGIN_DEBUG_EXCEPTION_HANDLER
95                 PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
96
97                 m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
98
99                 /* wait for trigger or timeout */
100                 m_send_sleep_semaphore.wait(50);
101
102                 /* remove all triggers */
103                 while (m_send_sleep_semaphore.wait(0)) {
104                 }
105
106                 lasttime = curtime;
107                 curtime = porting::getTimeMs();
108                 float dtime = CALC_DTIME(lasttime, curtime);
109
110                 /* first do all the reliable stuff */
111                 runTimeouts(dtime);
112
113                 /* translate commands to packets */
114                 ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
115                 while (c.type != CONNCMD_NONE) {
116                         if (c.reliable)
117                                 processReliableCommand(c);
118                         else
119                                 processNonReliableCommand(c);
120
121                         c = m_connection->m_command_queue.pop_frontNoEx(0);
122                 }
123
124                 /* send non reliable packets */
125                 sendPackets(dtime);
126
127                 END_DEBUG_EXCEPTION_HANDLER
128         }
129
130         PROFILE(g_profiler->remove(ThreadIdentifier.str()));
131         return NULL;
132 }
133
134 void ConnectionSendThread::Trigger()
135 {
136         m_send_sleep_semaphore.post();
137 }
138
139 bool ConnectionSendThread::packetsQueued()
140 {
141         std::list<session_t> peerIds = m_connection->getPeerIDs();
142
143         if (!m_outgoing_queue.empty() && !peerIds.empty())
144                 return true;
145
146         for (session_t peerId : peerIds) {
147                 PeerHelper peer = m_connection->getPeerNoEx(peerId);
148
149                 if (!peer)
150                         continue;
151
152                 if (dynamic_cast<UDPPeer *>(&peer) == 0)
153                         continue;
154
155                 for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
156                         if (!channel.queued_commands.empty()) {
157                                 return true;
158                         }
159                 }
160         }
161
162
163         return false;
164 }
165
166 void ConnectionSendThread::runTimeouts(float dtime)
167 {
168         std::list<session_t> timeouted_peers;
169         std::list<session_t> peerIds = m_connection->getPeerIDs();
170
171         for (session_t &peerId : peerIds) {
172                 PeerHelper peer = m_connection->getPeerNoEx(peerId);
173
174                 if (!peer)
175                         continue;
176
177                 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
178                 if (!udpPeer)
179                         continue;
180
181                 PROFILE(std::stringstream peerIdentifier);
182                 PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
183                         << ";" << peerId << ";RELIABLE]");
184                 PROFILE(ScopeProfiler
185                 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
186
187                 SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
188
189                 /*
190                         Check peer timeout
191                 */
192                 if (peer->isTimedOut(m_timeout)) {
193                         infostream << m_connection->getDesc()
194                                 << "RunTimeouts(): Peer " << peer->id
195                                 << " has timed out."
196                                 << " (source=peer->timeout_counter)"
197                                 << std::endl;
198                         // Add peer to the list
199                         timeouted_peers.push_back(peer->id);
200                         // Don't bother going through the buffers of this one
201                         continue;
202                 }
203
204                 float resend_timeout = udpPeer->getResendTimeout();
205                 bool retry_count_exceeded = false;
206                 for (Channel &channel : udpPeer->channels) {
207                         std::list<BufferedPacket> timed_outs;
208
209                         // Remove timed out incomplete unreliable split packets
210                         channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
211
212                         // Increment reliable packet times
213                         channel.outgoing_reliables_sent.incrementTimeouts(dtime);
214
215                         unsigned int numpeers = m_connection->m_peers.size();
216
217                         if (numpeers == 0)
218                                 return;
219
220                         // Re-send timed out outgoing reliables
221                         timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
222                                 (m_max_data_packets_per_iteration / numpeers));
223
224                         channel.UpdatePacketLossCounter(timed_outs.size());
225                         g_profiler->graphAdd("packets_lost", timed_outs.size());
226
227                         m_iteration_packets_avaialble -= timed_outs.size();
228
229                         for (std::list<BufferedPacket>::iterator k = timed_outs.begin();
230                                 k != timed_outs.end(); ++k) {
231                                 session_t peer_id = readPeerId(*(k->data));
232                                 u8 channelnum = readChannel(*(k->data));
233                                 u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE + 1]));
234
235                                 channel.UpdateBytesLost(k->data.getSize());
236                                 k->resend_count++;
237
238                                 if (k->resend_count > MAX_RELIABLE_RETRY) {
239                                         retry_count_exceeded = true;
240                                         timeouted_peers.push_back(peer->id);
241                                         /* no need to check additional packets if a single one did timeout*/
242                                         break;
243                                 }
244
245                                 LOG(derr_con << m_connection->getDesc()
246                                         << "RE-SENDING timed-out RELIABLE to "
247                                         << k->address.serializeString()
248                                         << "(t/o=" << resend_timeout << "): "
249                                         << "from_peer_id=" << peer_id
250                                         << ", channel=" << ((int) channelnum & 0xff)
251                                         << ", seqnum=" << seqnum
252                                         << std::endl);
253
254                                 rawSend(*k);
255
256                                 // do not handle rtt here as we can't decide if this packet was
257                                 // lost or really takes more time to transmit
258                         }
259
260                         if (retry_count_exceeded) {
261                                 break; /* no need to check other channels if we already did timeout */
262                         }
263
264                         channel.UpdateTimers(dtime);
265                 }
266
267                 /* skip to next peer if we did timeout */
268                 if (retry_count_exceeded)
269                         continue;
270
271                 /* send ping if necessary */
272                 if (udpPeer->Ping(dtime, data)) {
273                         LOG(dout_con << m_connection->getDesc()
274                                 << "Sending ping for peer_id: " << udpPeer->id << std::endl);
275                         /* this may fail if there ain't a sequence number left */
276                         if (!rawSendAsPacket(udpPeer->id, 0, data, true)) {
277                                 //retrigger with reduced ping interval
278                                 udpPeer->Ping(4.0, data);
279                         }
280                 }
281
282                 udpPeer->RunCommandQueues(m_max_packet_size,
283                         m_max_commands_per_iteration,
284                         m_max_packets_requeued);
285         }
286
287         // Remove timed out peers
288         for (u16 timeouted_peer : timeouted_peers) {
289                 LOG(derr_con << m_connection->getDesc()
290                         << "RunTimeouts(): Removing peer " << timeouted_peer << std::endl);
291                 m_connection->deletePeer(timeouted_peer, true);
292         }
293 }
294
295 void ConnectionSendThread::rawSend(const BufferedPacket &packet)
296 {
297         try {
298                 m_connection->m_udpSocket.Send(packet.address, *packet.data,
299                         packet.data.getSize());
300                 LOG(dout_con << m_connection->getDesc()
301                         << " rawSend: " << packet.data.getSize()
302                         << " bytes sent" << std::endl);
303         } catch (SendFailedException &e) {
304                 LOG(derr_con << m_connection->getDesc()
305                         << "Connection::rawSend(): SendFailedException: "
306                         << packet.address.serializeString() << std::endl);
307         }
308 }
309
310 void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel)
311 {
312         try {
313                 p.absolute_send_time = porting::getTimeMs();
314                 // Buffer the packet
315                 channel->outgoing_reliables_sent.insert(p,
316                         (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
317                                 % (MAX_RELIABLE_WINDOW_SIZE + 1));
318         }
319         catch (AlreadyExistsException &e) {
320                 LOG(derr_con << m_connection->getDesc()
321                         << "WARNING: Going to send a reliable packet"
322                         << " in outgoing buffer" << std::endl);
323         }
324
325         // Send the packet
326         rawSend(p);
327 }
328
329 bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
330         const SharedBuffer<u8> &data, bool reliable)
331 {
332         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
333         if (!peer) {
334                 LOG(dout_con << m_connection->getDesc()
335                         << " INFO: dropped packet for non existent peer_id: "
336                         << peer_id << std::endl);
337                 FATAL_ERROR_IF(!reliable,
338                         "Trying to send raw packet reliable but no peer found!");
339                 return false;
340         }
341         Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
342
343         if (reliable) {
344                 bool have_sequence_number_for_raw_packet = true;
345                 u16 seqnum =
346                         channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
347
348                 if (!have_sequence_number_for_raw_packet)
349                         return false;
350
351                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
352                 Address peer_address;
353                 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
354
355                 // Add base headers and make a packet
356                 BufferedPacket p = con::makePacket(peer_address, reliable,
357                         m_connection->GetProtocolID(), m_connection->GetPeerID(),
358                         channelnum);
359
360                 // first check if our send window is already maxed out
361                 if (channel->outgoing_reliables_sent.size()
362                         < channel->getWindowSize()) {
363                         LOG(dout_con << m_connection->getDesc()
364                                 << " INFO: sending a reliable packet to peer_id " << peer_id
365                                 << " channel: " << (u32)channelnum
366                                 << " seqnum: " << seqnum << std::endl);
367                         sendAsPacketReliable(p, channel);
368                         return true;
369                 }
370
371                 LOG(dout_con << m_connection->getDesc()
372                         << " INFO: queueing reliable packet for peer_id: " << peer_id
373                         << " channel: " << (u32)channelnum
374                         << " seqnum: " << seqnum << std::endl);
375                 channel->queued_reliables.push(p);
376                 return false;
377         }
378
379         Address peer_address;
380         if (peer->getAddress(MTP_UDP, peer_address)) {
381                 // Add base headers and make a packet
382                 BufferedPacket p = con::makePacket(peer_address, data,
383                         m_connection->GetProtocolID(), m_connection->GetPeerID(),
384                         channelnum);
385
386                 // Send the packet
387                 rawSend(p);
388                 return true;
389         }
390
391         LOG(dout_con << m_connection->getDesc()
392                 << " INFO: dropped unreliable packet for peer_id: " << peer_id
393                 << " because of (yet) missing udp address" << std::endl);
394         return false;
395 }
396
397 void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
398 {
399         assert(c.reliable);  // Pre-condition
400
401         switch (c.type) {
402                 case CONNCMD_NONE:
403                         LOG(dout_con << m_connection->getDesc()
404                                 << "UDP processing reliable CONNCMD_NONE" << std::endl);
405                         return;
406
407                 case CONNCMD_SEND:
408                         LOG(dout_con << m_connection->getDesc()
409                                 << "UDP processing reliable CONNCMD_SEND" << std::endl);
410                         sendReliable(c);
411                         return;
412
413                 case CONNCMD_SEND_TO_ALL:
414                         LOG(dout_con << m_connection->getDesc()
415                                 << "UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
416                         sendToAllReliable(c);
417                         return;
418
419                 case CONCMD_CREATE_PEER:
420                         LOG(dout_con << m_connection->getDesc()
421                                 << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
422                         if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
423                                 /* put to queue if we couldn't send it immediately */
424                                 sendReliable(c);
425                         }
426                         return;
427
428                 case CONNCMD_SERVE:
429                 case CONNCMD_CONNECT:
430                 case CONNCMD_DISCONNECT:
431                 case CONCMD_ACK:
432                         FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
433                 default:
434                         LOG(dout_con << m_connection->getDesc()
435                                 << " Invalid reliable command type: " << c.type << std::endl);
436         }
437 }
438
439
440 void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
441 {
442         assert(!c.reliable); // Pre-condition
443
444         switch (c.type) {
445                 case CONNCMD_NONE:
446                         LOG(dout_con << m_connection->getDesc()
447                                 << " UDP processing CONNCMD_NONE" << std::endl);
448                         return;
449                 case CONNCMD_SERVE:
450                         LOG(dout_con << m_connection->getDesc()
451                                 << " UDP processing CONNCMD_SERVE port="
452                                 << c.address.serializeString() << std::endl);
453                         serve(c.address);
454                         return;
455                 case CONNCMD_CONNECT:
456                         LOG(dout_con << m_connection->getDesc()
457                                 << " UDP processing CONNCMD_CONNECT" << std::endl);
458                         connect(c.address);
459                         return;
460                 case CONNCMD_DISCONNECT:
461                         LOG(dout_con << m_connection->getDesc()
462                                 << " UDP processing CONNCMD_DISCONNECT" << std::endl);
463                         disconnect();
464                         return;
465                 case CONNCMD_DISCONNECT_PEER:
466                         LOG(dout_con << m_connection->getDesc()
467                                 << " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl);
468                         disconnect_peer(c.peer_id);
469                         return;
470                 case CONNCMD_SEND:
471                         LOG(dout_con << m_connection->getDesc()
472                                 << " UDP processing CONNCMD_SEND" << std::endl);
473                         send(c.peer_id, c.channelnum, c.data);
474                         return;
475                 case CONNCMD_SEND_TO_ALL:
476                         LOG(dout_con << m_connection->getDesc()
477                                 << " UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
478                         sendToAll(c.channelnum, c.data);
479                         return;
480                 case CONCMD_ACK:
481                         LOG(dout_con << m_connection->getDesc()
482                                 << " UDP processing CONCMD_ACK" << std::endl);
483                         sendAsPacket(c.peer_id, c.channelnum, c.data, true);
484                         return;
485                 case CONCMD_CREATE_PEER:
486                         FATAL_ERROR("Got command that should be reliable as unreliable command");
487                 default:
488                         LOG(dout_con << m_connection->getDesc()
489                                 << " Invalid command type: " << c.type << std::endl);
490         }
491 }
492
493 void ConnectionSendThread::serve(Address bind_address)
494 {
495         LOG(dout_con << m_connection->getDesc()
496                 << "UDP serving at port " << bind_address.serializeString() << std::endl);
497         try {
498                 m_connection->m_udpSocket.Bind(bind_address);
499                 m_connection->SetPeerID(PEER_ID_SERVER);
500         }
501         catch (SocketException &e) {
502                 // Create event
503                 ConnectionEvent ce;
504                 ce.bindFailed();
505                 m_connection->putEvent(ce);
506         }
507 }
508
509 void ConnectionSendThread::connect(Address address)
510 {
511         LOG(dout_con << m_connection->getDesc() << " connecting to "
512                 << address.serializeString()
513                 << ":" << address.getPort() << std::endl);
514
515         UDPPeer *peer = m_connection->createServerPeer(address);
516
517         // Create event
518         ConnectionEvent e;
519         e.peerAdded(peer->id, peer->address);
520         m_connection->putEvent(e);
521
522         Address bind_addr;
523
524         if (address.isIPv6())
525                 bind_addr.setAddress((IPv6AddressBytes *) NULL);
526         else
527                 bind_addr.setAddress(0, 0, 0, 0);
528
529         m_connection->m_udpSocket.Bind(bind_addr);
530
531         // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
532         m_connection->SetPeerID(PEER_ID_INEXISTENT);
533         NetworkPacket pkt(0, 0);
534         m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
535 }
536
537 void ConnectionSendThread::disconnect()
538 {
539         LOG(dout_con << m_connection->getDesc() << " disconnecting" << std::endl);
540
541         // Create and send DISCO packet
542         SharedBuffer<u8> data(2);
543         writeU8(&data[0], PACKET_TYPE_CONTROL);
544         writeU8(&data[1], CONTROLTYPE_DISCO);
545
546
547         // Send to all
548         std::list<session_t> peerids = m_connection->getPeerIDs();
549
550         for (session_t peerid : peerids) {
551                 sendAsPacket(peerid, 0, data, false);
552         }
553 }
554
555 void ConnectionSendThread::disconnect_peer(session_t peer_id)
556 {
557         LOG(dout_con << m_connection->getDesc() << " disconnecting peer" << std::endl);
558
559         // Create and send DISCO packet
560         SharedBuffer<u8> data(2);
561         writeU8(&data[0], PACKET_TYPE_CONTROL);
562         writeU8(&data[1], CONTROLTYPE_DISCO);
563         sendAsPacket(peer_id, 0, data, false);
564
565         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
566
567         if (!peer)
568                 return;
569
570         if (dynamic_cast<UDPPeer *>(&peer) == 0) {
571                 return;
572         }
573
574         dynamic_cast<UDPPeer *>(&peer)->m_pending_disconnect = true;
575 }
576
577 void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
578         const SharedBuffer<u8> &data)
579 {
580         assert(channelnum < CHANNEL_COUNT); // Pre-condition
581
582         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
583         if (!peer) {
584                 LOG(dout_con << m_connection->getDesc() << " peer: peer_id=" << peer_id
585                         << ">>>NOT<<< found on sending packet"
586                         << ", channel " << (channelnum % 0xFF)
587                         << ", size: " << data.getSize() << std::endl);
588                 return;
589         }
590
591         LOG(dout_con << m_connection->getDesc() << " sending to peer_id=" << peer_id
592                 << ", channel " << (channelnum % 0xFF)
593                 << ", size: " << data.getSize() << std::endl);
594
595         u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
596
597         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
598         std::list<SharedBuffer<u8>> originals;
599
600         makeAutoSplitPacket(data, chunksize_max, split_sequence_number, &originals);
601
602         peer->setNextSplitSequenceNumber(channelnum, split_sequence_number);
603
604         for (const SharedBuffer<u8> &original : originals) {
605                 sendAsPacket(peer_id, channelnum, original);
606         }
607 }
608
609 void ConnectionSendThread::sendReliable(ConnectionCommand &c)
610 {
611         PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
612         if (!peer)
613                 return;
614
615         peer->PutReliableSendCommand(c, m_max_packet_size);
616 }
617
618 void ConnectionSendThread::sendToAll(u8 channelnum, const SharedBuffer<u8> &data)
619 {
620         std::list<session_t> peerids = m_connection->getPeerIDs();
621
622         for (session_t peerid : peerids) {
623                 send(peerid, channelnum, data);
624         }
625 }
626
627 void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
628 {
629         std::list<session_t> peerids = m_connection->getPeerIDs();
630
631         for (session_t peerid : peerids) {
632                 PeerHelper peer = m_connection->getPeerNoEx(peerid);
633
634                 if (!peer)
635                         continue;
636
637                 peer->PutReliableSendCommand(c, m_max_packet_size);
638         }
639 }
640
641 void ConnectionSendThread::sendPackets(float dtime)
642 {
643         std::list<session_t> peerIds = m_connection->getPeerIDs();
644         std::list<session_t> pendingDisconnect;
645         std::map<session_t, bool> pending_unreliable;
646
647         for (session_t peerId : peerIds) {
648                 PeerHelper peer = m_connection->getPeerNoEx(peerId);
649                 //peer may have been removed
650                 if (!peer) {
651                         LOG(dout_con << m_connection->getDesc() << " Peer not found: peer_id="
652                                 << peerId
653                                 << std::endl);
654                         continue;
655                 }
656                 peer->m_increment_packets_remaining =
657                         m_iteration_packets_avaialble / m_connection->m_peers.size();
658
659                 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
660
661                 if (!udpPeer) {
662                         continue;
663                 }
664
665                 if (udpPeer->m_pending_disconnect) {
666                         pendingDisconnect.push_back(peerId);
667                 }
668
669                 PROFILE(std::stringstream
670                 peerIdentifier);
671                 PROFILE(
672                         peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
673                                 << ";RELIABLE]");
674                 PROFILE(ScopeProfiler
675                 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
676
677                 LOG(dout_con << m_connection->getDesc()
678                         << " Handle per peer queues: peer_id=" << peerId
679                         << " packet quota: " << peer->m_increment_packets_remaining << std::endl);
680
681                 // first send queued reliable packets for all peers (if possible)
682                 for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
683                         Channel &channel = udpPeer->channels[i];
684                         u16 next_to_ack = 0;
685
686                         channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
687                         u16 next_to_receive = 0;
688                         channel.incoming_reliables.getFirstSeqnum(next_to_receive);
689
690                         LOG(dout_con << m_connection->getDesc() << "\t channel: "
691                                 << i << ", peer quota:"
692                                 << peer->m_increment_packets_remaining
693                                 << std::endl
694                                 << "\t\t\treliables on wire: "
695                                 << channel.outgoing_reliables_sent.size()
696                                 << ", waiting for ack for " << next_to_ack
697                                 << std::endl
698                                 << "\t\t\tincoming_reliables: "
699                                 << channel.incoming_reliables.size()
700                                 << ", next reliable packet: "
701                                 << channel.readNextIncomingSeqNum()
702                                 << ", next queued: " << next_to_receive
703                                 << std::endl
704                                 << "\t\t\treliables queued : "
705                                 << channel.queued_reliables.size()
706                                 << std::endl
707                                 << "\t\t\tqueued commands  : "
708                                 << channel.queued_commands.size()
709                                 << std::endl);
710
711                         while (!channel.queued_reliables.empty() &&
712                                         channel.outgoing_reliables_sent.size()
713                                         < channel.getWindowSize() &&
714                                         peer->m_increment_packets_remaining > 0) {
715                                 BufferedPacket p = channel.queued_reliables.front();
716                                 channel.queued_reliables.pop();
717                                 LOG(dout_con << m_connection->getDesc()
718                                         << " INFO: sending a queued reliable packet "
719                                         << " channel: " << i
720                                         << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
721                                         << std::endl);
722                                 sendAsPacketReliable(p, &channel);
723                                 peer->m_increment_packets_remaining--;
724                         }
725                 }
726         }
727
728         if (!m_outgoing_queue.empty()) {
729                 LOG(dout_con << m_connection->getDesc()
730                         << " Handle non reliable queue ("
731                         << m_outgoing_queue.size() << " pkts)" << std::endl);
732         }
733
734         unsigned int initial_queuesize = m_outgoing_queue.size();
735         /* send non reliable packets*/
736         for (unsigned int i = 0; i < initial_queuesize; i++) {
737                 OutgoingPacket packet = m_outgoing_queue.front();
738                 m_outgoing_queue.pop();
739
740                 if (packet.reliable)
741                         continue;
742
743                 PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
744                 if (!peer) {
745                         LOG(dout_con << m_connection->getDesc()
746                                 << " Outgoing queue: peer_id=" << packet.peer_id
747                                 << ">>>NOT<<< found on sending packet"
748                                 << ", channel " << (packet.channelnum % 0xFF)
749                                 << ", size: " << packet.data.getSize() << std::endl);
750                         continue;
751                 }
752
753                 /* send acks immediately */
754                 if (packet.ack) {
755                         rawSendAsPacket(packet.peer_id, packet.channelnum,
756                                 packet.data, packet.reliable);
757                         peer->m_increment_packets_remaining =
758                                 MYMIN(0, peer->m_increment_packets_remaining--);
759                 } else if (
760                         (peer->m_increment_packets_remaining > 0) ||
761                                 (stopRequested())) {
762                         rawSendAsPacket(packet.peer_id, packet.channelnum,
763                                 packet.data, packet.reliable);
764                         peer->m_increment_packets_remaining--;
765                 } else {
766                         m_outgoing_queue.push(packet);
767                         pending_unreliable[packet.peer_id] = true;
768                 }
769         }
770
771         for (session_t peerId : pendingDisconnect) {
772                 if (!pending_unreliable[peerId]) {
773                         m_connection->deletePeer(peerId, false);
774                 }
775         }
776 }
777
778 void ConnectionSendThread::sendAsPacket(session_t peer_id, u8 channelnum,
779         const SharedBuffer<u8> &data, bool ack)
780 {
781         OutgoingPacket packet(peer_id, channelnum, data, false, ack);
782         m_outgoing_queue.push(packet);
783 }
784
785 ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
786         Thread("ConnectionReceive")
787 {
788 }
789
790 void *ConnectionReceiveThread::run()
791 {
792         assert(m_connection);
793
794         LOG(dout_con << m_connection->getDesc()
795                 << "ConnectionReceive thread started" << std::endl);
796
797         PROFILE(std::stringstream
798         ThreadIdentifier);
799         PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
800
801 #ifdef DEBUG_CONNECTION_KBPS
802         u64 curtime = porting::getTimeMs();
803         u64 lasttime = curtime;
804         float debug_print_timer = 0.0;
805 #endif
806
807         while (!stopRequested()) {
808                 BEGIN_DEBUG_EXCEPTION_HANDLER
809                 PROFILE(ScopeProfiler
810                 sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
811
812 #ifdef DEBUG_CONNECTION_KBPS
813                 lasttime = curtime;
814                 curtime = porting::getTimeMs();
815                 float dtime = CALC_DTIME(lasttime,curtime);
816 #endif
817
818                 /* receive packets */
819                 receive();
820
821 #ifdef DEBUG_CONNECTION_KBPS
822                 debug_print_timer += dtime;
823                 if (debug_print_timer > 20.0) {
824                         debug_print_timer -= 20.0;
825
826                         std::list<session_t> peerids = m_connection->getPeerIDs();
827
828                         for (std::list<session_t>::iterator i = peerids.begin();
829                                         i != peerids.end();
830                                         i++)
831                         {
832                                 PeerHelper peer = m_connection->getPeerNoEx(*i);
833                                 if (!peer)
834                                         continue;
835
836                                 float peer_current = 0.0;
837                                 float peer_loss = 0.0;
838                                 float avg_rate = 0.0;
839                                 float avg_loss = 0.0;
840
841                                 for(u16 j=0; j<CHANNEL_COUNT; j++)
842                                 {
843                                         peer_current +=peer->channels[j].getCurrentDownloadRateKB();
844                                         peer_loss += peer->channels[j].getCurrentLossRateKB();
845                                         avg_rate += peer->channels[j].getAvgDownloadRateKB();
846                                         avg_loss += peer->channels[j].getAvgLossRateKB();
847                                 }
848
849                                 std::stringstream output;
850                                 output << std::fixed << std::setprecision(1);
851                                 output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
852                                 output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
853                                 output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
854                                 output << std::setfill(' ');
855                                 for(u16 j=0; j<CHANNEL_COUNT; j++)
856                                 {
857                                         output << "\tcha " << j << ":"
858                                                 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
859                                                 << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
860                                                 << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
861                                                 << " /"
862                                                 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
863                                                 << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
864                                                 << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
865                                                 << " / WS: " << peer->channels[j].getWindowSize()
866                                                 << std::endl;
867                                 }
868
869                                 fprintf(stderr,"%s\n",output.str().c_str());
870                         }
871                 }
872 #endif
873                 END_DEBUG_EXCEPTION_HANDLER
874         }
875
876         PROFILE(g_profiler->remove(ThreadIdentifier.str()));
877         return NULL;
878 }
879
880 // Receive packets from the network and buffers and create ConnectionEvents
881 void ConnectionReceiveThread::receive()
882 {
883         // use IPv6 minimum allowed MTU as receive buffer size as this is
884         // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
885         // infrastructure
886         unsigned int packet_maxsize = 1500;
887         SharedBuffer<u8> packetdata(packet_maxsize);
888
889         bool packet_queued = true;
890
891         unsigned int loop_count = 0;
892
893         /* first of all read packets from socket */
894         /* check for incoming data available */
895         while ((loop_count < 10) &&
896                 (m_connection->m_udpSocket.WaitData(50))) {
897                 loop_count++;
898                 try {
899                         if (packet_queued) {
900                                 bool data_left = true;
901                                 session_t peer_id;
902                                 SharedBuffer<u8> resultdata;
903                                 while (data_left) {
904                                         try {
905                                                 data_left = getFromBuffers(peer_id, resultdata);
906                                                 if (data_left) {
907                                                         ConnectionEvent e;
908                                                         e.dataReceived(peer_id, resultdata);
909                                                         m_connection->putEvent(e);
910                                                 }
911                                         }
912                                         catch (ProcessedSilentlyException &e) {
913                                                 /* try reading again */
914                                         }
915                                 }
916                                 packet_queued = false;
917                         }
918
919                         Address sender;
920                         s32 received_size = m_connection->m_udpSocket.Receive(sender, *packetdata,
921                                 packet_maxsize);
922
923                         if ((received_size < BASE_HEADER_SIZE) ||
924                                 (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
925                                 LOG(derr_con << m_connection->getDesc()
926                                         << "Receive(): Invalid incoming packet, "
927                                         << "size: " << received_size
928                                         << ", protocol: "
929                                         << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
930                                         << std::endl);
931                                 continue;
932                         }
933
934                         session_t peer_id = readPeerId(*packetdata);
935                         u8 channelnum = readChannel(*packetdata);
936
937                         if (channelnum > CHANNEL_COUNT - 1) {
938                                 LOG(derr_con << m_connection->getDesc()
939                                         << "Receive(): Invalid channel " << (u32)channelnum << std::endl);
940                                 throw InvalidIncomingDataException("Channel doesn't exist");
941                         }
942
943                         /* Try to identify peer by sender address (may happen on join) */
944                         if (peer_id == PEER_ID_INEXISTENT) {
945                                 peer_id = m_connection->lookupPeer(sender);
946                                 // We do not have to remind the peer of its
947                                 // peer id as the CONTROLTYPE_SET_PEER_ID
948                                 // command was sent reliably.
949                         }
950
951                         /* The peer was not found in our lists. Add it. */
952                         if (peer_id == PEER_ID_INEXISTENT) {
953                                 peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
954                         }
955
956                         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
957
958                         if (!peer) {
959                                 LOG(dout_con << m_connection->getDesc()
960                                         << " got packet from unknown peer_id: "
961                                         << peer_id << " Ignoring." << std::endl);
962                                 continue;
963                         }
964
965                         // Validate peer address
966
967                         Address peer_address;
968
969                         if (peer->getAddress(MTP_UDP, peer_address)) {
970                                 if (peer_address != sender) {
971                                         LOG(derr_con << m_connection->getDesc()
972                                                 << m_connection->getDesc()
973                                                 << " Peer " << peer_id << " sending from different address."
974                                                 " Ignoring." << std::endl);
975                                         continue;
976                                 }
977                         } else {
978
979                                 bool invalid_address = true;
980                                 if (invalid_address) {
981                                         LOG(derr_con << m_connection->getDesc()
982                                                 << m_connection->getDesc()
983                                                 << " Peer " << peer_id << " unknown."
984                                                 " Ignoring." << std::endl);
985                                         continue;
986                                 }
987                         }
988
989                         peer->ResetTimeout();
990
991                         Channel *channel = 0;
992
993                         if (dynamic_cast<UDPPeer *>(&peer) != 0) {
994                                 channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
995                         }
996
997                         if (channel != 0) {
998                                 channel->UpdateBytesReceived(received_size);
999                         }
1000
1001                         // Throw the received packet to channel->processPacket()
1002
1003                         // Make a new SharedBuffer from the data without the base headers
1004                         SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
1005                         memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
1006                                 strippeddata.getSize());
1007
1008                         try {
1009                                 // Process it (the result is some data with no headers made by us)
1010                                 SharedBuffer<u8> resultdata = processPacket
1011                                         (channel, strippeddata, peer_id, channelnum, false);
1012
1013                                 LOG(dout_con << m_connection->getDesc()
1014                                         << " ProcessPacket from peer_id: " << peer_id
1015                                         << ", channel: " << (u32)channelnum << ", returned "
1016                                         << resultdata.getSize() << " bytes" << std::endl);
1017
1018                                 ConnectionEvent e;
1019                                 e.dataReceived(peer_id, resultdata);
1020                                 m_connection->putEvent(e);
1021                         }
1022                         catch (ProcessedSilentlyException &e) {
1023                         }
1024                         catch (ProcessedQueued &e) {
1025                                 packet_queued = true;
1026                         }
1027                 }
1028                 catch (InvalidIncomingDataException &e) {
1029                 }
1030                 catch (ProcessedSilentlyException &e) {
1031                 }
1032         }
1033 }
1034
1035 bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8> &dst)
1036 {
1037         std::list<session_t> peerids = m_connection->getPeerIDs();
1038
1039         for (session_t peerid : peerids) {
1040                 PeerHelper peer = m_connection->getPeerNoEx(peerid);
1041                 if (!peer)
1042                         continue;
1043
1044                 if (dynamic_cast<UDPPeer *>(&peer) == 0)
1045                         continue;
1046
1047                 for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
1048                         if (checkIncomingBuffers(&channel, peer_id, dst)) {
1049                                 return true;
1050                         }
1051                 }
1052         }
1053         return false;
1054 }
1055
1056 bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
1057         session_t &peer_id, SharedBuffer<u8> &dst)
1058 {
1059         u16 firstseqnum = 0;
1060         if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) {
1061                 if (firstseqnum == channel->readNextIncomingSeqNum()) {
1062                         BufferedPacket p = channel->incoming_reliables.popFirst();
1063                         peer_id = readPeerId(*p.data);
1064                         u8 channelnum = readChannel(*p.data);
1065                         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
1066
1067                         LOG(dout_con << m_connection->getDesc()
1068                                 << "UNBUFFERING TYPE_RELIABLE"
1069                                 << " seqnum=" << seqnum
1070                                 << " peer_id=" << peer_id
1071                                 << " channel=" << ((int) channelnum & 0xff)
1072                                 << std::endl);
1073
1074                         channel->incNextIncomingSeqNum();
1075
1076                         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1077                         // Get out the inside packet and re-process it
1078                         SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1079                         memcpy(*payload, &p.data[headers_size], payload.getSize());
1080
1081                         dst = processPacket(channel, payload, peer_id, channelnum, true);
1082                         return true;
1083                 }
1084         }
1085         return false;
1086 }
1087
1088 SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
1089         const SharedBuffer<u8> &packetdata, session_t peer_id, u8 channelnum, bool reliable)
1090 {
1091         PeerHelper peer = m_connection->getPeerNoEx(peer_id);
1092
1093         if (!peer) {
1094                 errorstream << "Peer not found (possible timeout)" << std::endl;
1095                 throw ProcessedSilentlyException("Peer not found (possible timeout)");
1096         }
1097
1098         if (packetdata.getSize() < 1)
1099                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1100
1101         u8 type = readU8(&(packetdata[0]));
1102
1103         if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
1104                 std::string errmsg = "Invalid peer_id=" + itos(peer_id);
1105                 errorstream << errmsg << std::endl;
1106                 throw InvalidIncomingDataException(errmsg.c_str());
1107         }
1108
1109         if (type >= PACKET_TYPE_MAX) {
1110                 derr_con << m_connection->getDesc() << "Got invalid type=" << ((int) type & 0xff)
1111                         << std::endl;
1112                 throw InvalidIncomingDataException("Invalid packet type");
1113         }
1114
1115         const PacketTypeHandler &pHandle = packetTypeRouter[type];
1116         return (this->*pHandle.handler)(channel, packetdata, &peer, channelnum, reliable);
1117 }
1118
1119 const ConnectionReceiveThread::PacketTypeHandler
1120         ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = {
1121         {&ConnectionReceiveThread::handlePacketType_Control},
1122         {&ConnectionReceiveThread::handlePacketType_Original},
1123         {&ConnectionReceiveThread::handlePacketType_Split},
1124         {&ConnectionReceiveThread::handlePacketType_Reliable},
1125 };
1126
1127 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel,
1128         const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1129 {
1130         if (packetdata.getSize() < 2)
1131                 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1132
1133         u8 controltype = readU8(&(packetdata[1]));
1134
1135         if (controltype == CONTROLTYPE_ACK) {
1136                 assert(channel != NULL);
1137
1138                 if (packetdata.getSize() < 4) {
1139                         throw InvalidIncomingDataException(
1140                                 "packetdata.getSize() < 4 (ACK header size)");
1141                 }
1142
1143                 u16 seqnum = readU16(&packetdata[2]);
1144                 LOG(dout_con << m_connection->getDesc() << " [ CONTROLTYPE_ACK: channelnum="
1145                         << ((int) channelnum & 0xff) << ", peer_id=" << peer->id << ", seqnum="
1146                         << seqnum << " ]" << std::endl);
1147
1148                 try {
1149                         BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
1150
1151                         // only calculate rtt from straight sent packets
1152                         if (p.resend_count == 0) {
1153                                 // Get round trip time
1154                                 u64 current_time = porting::getTimeMs();
1155
1156                                 // a overflow is quite unlikely but as it'd result in major
1157                                 // rtt miscalculation we handle it here
1158                                 if (current_time > p.absolute_send_time) {
1159                                         float rtt = (current_time - p.absolute_send_time) / 1000.0;
1160
1161                                         // Let peer calculate stuff according to it
1162                                         // (avg_rtt and resend_timeout)
1163                                         dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
1164                                 } else if (p.totaltime > 0) {
1165                                         float rtt = p.totaltime;
1166
1167                                         // Let peer calculate stuff according to it
1168                                         // (avg_rtt and resend_timeout)
1169                                         dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
1170                                 }
1171                         }
1172                         // put bytes for max bandwidth calculation
1173                         channel->UpdateBytesSent(p.data.getSize(), 1);
1174                         if (channel->outgoing_reliables_sent.size() == 0)
1175                                 m_connection->TriggerSend();
1176                 } catch (NotFoundException &e) {
1177                         LOG(derr_con << m_connection->getDesc()
1178                                 << "WARNING: ACKed packet not in outgoing queue" << std::endl);
1179                         channel->UpdatePacketTooLateCounter();
1180                 }
1181
1182                 throw ProcessedSilentlyException("Got an ACK");
1183         } else if (controltype == CONTROLTYPE_SET_PEER_ID) {
1184                 // Got a packet to set our peer id
1185                 if (packetdata.getSize() < 4)
1186                         throw InvalidIncomingDataException
1187                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1188                 session_t peer_id_new = readU16(&packetdata[2]);
1189                 LOG(dout_con << m_connection->getDesc() << "Got new peer id: " << peer_id_new
1190                         << "... " << std::endl);
1191
1192                 if (m_connection->GetPeerID() != PEER_ID_INEXISTENT) {
1193                         LOG(derr_con << m_connection->getDesc()
1194                                 << "WARNING: Not changing existing peer id." << std::endl);
1195                 } else {
1196                         LOG(dout_con << m_connection->getDesc() << "changing own peer id"
1197                                 << std::endl);
1198                         m_connection->SetPeerID(peer_id_new);
1199                 }
1200
1201                 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1202         } else if (controltype == CONTROLTYPE_PING) {
1203                 // Just ignore it, the incoming data already reset
1204                 // the timeout counter
1205                 LOG(dout_con << m_connection->getDesc() << "PING" << std::endl);
1206                 throw ProcessedSilentlyException("Got a PING");
1207         } else if (controltype == CONTROLTYPE_DISCO) {
1208                 // Just ignore it, the incoming data already reset
1209                 // the timeout counter
1210                 LOG(dout_con << m_connection->getDesc() << "DISCO: Removing peer "
1211                         << peer->id << std::endl);
1212
1213                 if (!m_connection->deletePeer(peer->id, false)) {
1214                         derr_con << m_connection->getDesc() << "DISCO: Peer not found" << std::endl;
1215                 }
1216
1217                 throw ProcessedSilentlyException("Got a DISCO");
1218         } else {
1219                 LOG(derr_con << m_connection->getDesc()
1220                         << "INVALID TYPE_CONTROL: invalid controltype="
1221                         << ((int) controltype & 0xff) << std::endl);
1222                 throw InvalidIncomingDataException("Invalid control type");
1223         }
1224 }
1225
1226 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel,
1227         const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1228 {
1229         if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
1230                 throw InvalidIncomingDataException
1231                         ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
1232         LOG(dout_con << m_connection->getDesc() << "RETURNING TYPE_ORIGINAL to user"
1233                 << std::endl);
1234         // Get the inside packet out and return it
1235         SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1236         memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
1237         return payload;
1238 }
1239
1240 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel,
1241         const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1242 {
1243         Address peer_address;
1244
1245         if (peer->getAddress(MTP_UDP, peer_address)) {
1246                 // We have to create a packet again for buffering
1247                 // This isn't actually too bad an idea.
1248                 BufferedPacket packet = makePacket(peer_address,
1249                         packetdata,
1250                         m_connection->GetProtocolID(),
1251                         peer->id,
1252                         channelnum);
1253
1254                 // Buffer the packet
1255                 SharedBuffer<u8> data = peer->addSplitPacket(channelnum, packet, reliable);
1256
1257                 if (data.getSize() != 0) {
1258                         LOG(dout_con << m_connection->getDesc()
1259                                 << "RETURNING TYPE_SPLIT: Constructed full data, "
1260                                 << "size=" << data.getSize() << std::endl);
1261                         return data;
1262                 }
1263                 LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" << std::endl);
1264                 throw ProcessedSilentlyException("Buffered a split packet chunk");
1265         }
1266
1267         // We should never get here.
1268         FATAL_ERROR("Invalid execution point");
1269 }
1270
1271 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel,
1272         const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1273 {
1274         assert(channel != NULL);
1275
1276         // Recursive reliable packets not allowed
1277         if (reliable)
1278                 throw InvalidIncomingDataException("Found nested reliable packets");
1279
1280         if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
1281                 throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1282
1283         u16 seqnum = readU16(&packetdata[1]);
1284         bool is_future_packet = false;
1285         bool is_old_packet = false;
1286
1287         /* packet is within our receive window send ack */
1288         if (seqnum_in_window(seqnum,
1289                 channel->readNextIncomingSeqNum(), MAX_RELIABLE_WINDOW_SIZE)) {
1290                 m_connection->sendAck(peer->id, channelnum, seqnum);
1291         } else {
1292                 is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
1293                 is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
1294
1295                 /* packet is not within receive window, don't send ack.           *
1296                  * if this was a valid packet it's gonna be retransmitted         */
1297                 if (is_future_packet)
1298                         throw ProcessedSilentlyException(
1299                                 "Received packet newer then expected, not sending ack");
1300
1301                 /* seems like our ack was lost, send another one for a old packet */
1302                 if (is_old_packet) {
1303                         LOG(dout_con << m_connection->getDesc()
1304                                 << "RE-SENDING ACK: peer_id: " << peer->id
1305                                 << ", channel: " << (channelnum & 0xFF)
1306                                 << ", seqnum: " << seqnum << std::endl;)
1307                         m_connection->sendAck(peer->id, channelnum, seqnum);
1308
1309                         // we already have this packet so this one was on wire at least
1310                         // the current timeout
1311                         // we don't know how long this packet was on wire don't do silly guessing
1312                         // dynamic_cast<UDPPeer*>(&peer)->
1313                         //     reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
1314
1315                         throw ProcessedSilentlyException("Retransmitting ack for old packet");
1316                 }
1317         }
1318
1319         if (seqnum != channel->readNextIncomingSeqNum()) {
1320                 Address peer_address;
1321
1322                 // this is a reliable packet so we have a udp address for sure
1323                 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
1324                 // This one comes later, buffer it.
1325                 // Actually we have to make a packet to buffer one.
1326                 // Well, we have all the ingredients, so just do it.
1327                 BufferedPacket packet = con::makePacket(
1328                         peer_address,
1329                         packetdata,
1330                         m_connection->GetProtocolID(),
1331                         peer->id,
1332                         channelnum);
1333                 try {
1334                         channel->incoming_reliables.insert(packet, channel->readNextIncomingSeqNum());
1335
1336                         LOG(dout_con << m_connection->getDesc()
1337                                 << "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id
1338                                 << ", channel: " << (channelnum & 0xFF)
1339                                 << ", seqnum: " << seqnum << std::endl;)
1340
1341                         throw ProcessedQueued("Buffered future reliable packet");
1342                 } catch (AlreadyExistsException &e) {
1343                 } catch (IncomingDataCorruption &e) {
1344                         ConnectionCommand discon;
1345                         discon.disconnect_peer(peer->id);
1346                         m_connection->putCommand(discon);
1347
1348                         LOG(derr_con << m_connection->getDesc()
1349                                 << "INVALID, TYPE_RELIABLE peer_id: " << peer->id
1350                                 << ", channel: " << (channelnum & 0xFF)
1351                                 << ", seqnum: " << seqnum
1352                                 << "DROPPING CLIENT!" << std::endl;)
1353                 }
1354         }
1355
1356         /* we got a packet to process right now */
1357         LOG(dout_con << m_connection->getDesc()
1358                 << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer->id
1359                 << ", channel: " << (channelnum & 0xFF)
1360                 << ", seqnum: " << seqnum << std::endl;)
1361
1362
1363         /* check for resend case */
1364         u16 queued_seqnum = 0;
1365         if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
1366                 if (queued_seqnum == seqnum) {
1367                         BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
1368                         /** TODO find a way to verify the new against the old packet */
1369                 }
1370         }
1371
1372         channel->incNextIncomingSeqNum();
1373
1374         // Get out the inside packet and re-process it
1375         SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1376         memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1377
1378         return processPacket(channel, payload, peer->id, channelnum, true);
1379 }
1380
1381 }