3 Copyright (C) 2013-2017 celeron55, Perttu Ahola <celeron55@gmail.com>
4 Copyright (C) 2017 celeron55, Loic Blot <loic.blot@unix-experience.fr>
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published by
8 the Free Software Foundation; either version 2.1 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 #include "connectionthreads.h"
25 #include "network/networkpacket.h"
26 #include "util/serialize.h"
31 /******************************************************************************/
32 /* defines used for debugging and profiling */
33 /******************************************************************************/
36 #undef DEBUG_CONNECTION_KBPS
38 /* this mutex is used to achieve log message consistency */
40 //#define DEBUG_CONNECTION_KBPS
41 #undef DEBUG_CONNECTION_KBPS
44 // TODO: Clean this up.
49 static session_t readPeerId(const u8 *packetdata)
51 return readU16(&packetdata[4]);
53 static u8 readChannel(const u8 *packetdata)
55 return readU8(&packetdata[6]);
58 /******************************************************************************/
59 /* Connection Threads */
60 /******************************************************************************/
62 ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
64 Thread("ConnectionSend"),
65 m_max_packet_size(max_packet_size),
67 m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
69 SANITY_CHECK(m_max_data_packets_per_iteration > 1);
72 void *ConnectionSendThread::run()
76 LOG(dout_con << m_connection->getDesc()
77 << "ConnectionSend thread started" << std::endl);
79 u64 curtime = porting::getTimeMs();
80 u64 lasttime = curtime;
82 PROFILE(std::stringstream ThreadIdentifier);
83 PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
85 /* if stop is requested don't stop immediately but try to send all */
87 while (!stopRequested() || packetsQueued()) {
88 BEGIN_DEBUG_EXCEPTION_HANDLER
89 PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
91 m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
93 /* wait for trigger or timeout */
94 m_send_sleep_semaphore.wait(50);
96 /* remove all triggers */
97 while (m_send_sleep_semaphore.wait(0)) {
101 curtime = porting::getTimeMs();
102 float dtime = CALC_DTIME(lasttime, curtime);
104 /* first resend timed-out packets */
106 if (m_iteration_packets_avaialble == 0) {
107 LOG(warningstream << m_connection->getDesc()
108 << " Packet quota used up after re-sending packets, "
109 << "max=" << m_max_data_packets_per_iteration << std::endl);
112 /* translate commands to packets */
113 auto c = m_connection->m_command_queue.pop_frontNoEx(0);
114 while (c && c->type != CONNCMD_NONE) {
116 processReliableCommand(c);
118 processNonReliableCommand(c);
120 c = m_connection->m_command_queue.pop_frontNoEx(0);
123 /* send queued packets */
126 END_DEBUG_EXCEPTION_HANDLER
129 PROFILE(g_profiler->remove(ThreadIdentifier.str()));
133 void ConnectionSendThread::Trigger()
135 m_send_sleep_semaphore.post();
138 bool ConnectionSendThread::packetsQueued()
140 std::vector<session_t> peerIds = m_connection->getPeerIDs();
142 if (!m_outgoing_queue.empty() && !peerIds.empty())
145 for (session_t peerId : peerIds) {
146 PeerHelper peer = m_connection->getPeerNoEx(peerId);
151 if (dynamic_cast<UDPPeer *>(&peer) == 0)
154 for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
155 if (!channel.queued_commands.empty()) {
165 void ConnectionSendThread::runTimeouts(float dtime)
167 std::vector<session_t> timeouted_peers;
168 std::vector<session_t> peerIds = m_connection->getPeerIDs();
170 const u32 numpeers = m_connection->m_peers.size();
175 for (session_t &peerId : peerIds) {
176 PeerHelper peer = m_connection->getPeerNoEx(peerId);
181 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
185 PROFILE(std::stringstream peerIdentifier);
186 PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
187 << ";" << peerId << ";RELIABLE]");
188 PROFILE(ScopeProfiler
189 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
191 SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
196 if (peer->isTimedOut(m_timeout)) {
197 infostream << m_connection->getDesc()
198 << "RunTimeouts(): Peer " << peer->id
201 // Add peer to the list
202 timeouted_peers.push_back(peer->id);
203 // Don't bother going through the buffers of this one
207 float resend_timeout = udpPeer->getResendTimeout();
208 for (Channel &channel : udpPeer->channels) {
210 // Remove timed out incomplete unreliable split packets
211 channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
213 // Increment reliable packet times
214 channel.outgoing_reliables_sent.incrementTimeouts(dtime);
216 // Re-send timed out outgoing reliables
217 auto timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
218 (m_max_data_packets_per_iteration / numpeers));
220 channel.UpdatePacketLossCounter(timed_outs.size());
221 g_profiler->graphAdd("packets_lost", timed_outs.size());
223 m_iteration_packets_avaialble -= timed_outs.size();
225 for (const auto &k : timed_outs) {
226 u8 channelnum = readChannel(k->data);
227 u16 seqnum = k->getSeqnum();
229 channel.UpdateBytesLost(k->size());
231 LOG(derr_con << m_connection->getDesc()
232 << "RE-SENDING timed-out RELIABLE to "
233 << k->address.serializeString()
234 << "(t/o=" << resend_timeout << "): "
235 << "count=" << k->resend_count
236 << ", channel=" << ((int) channelnum & 0xff)
237 << ", seqnum=" << seqnum
242 // do not handle rtt here as we can't decide if this packet was
243 // lost or really takes more time to transmit
246 channel.UpdateTimers(dtime);
249 /* send ping if necessary */
250 if (udpPeer->Ping(dtime, data)) {
251 LOG(dout_con << m_connection->getDesc()
252 << "Sending ping for peer_id: " << udpPeer->id << std::endl);
253 /* this may fail if there ain't a sequence number left */
254 if (!rawSendAsPacket(udpPeer->id, 0, data, true)) {
255 //retrigger with reduced ping interval
256 udpPeer->Ping(4.0, data);
260 udpPeer->RunCommandQueues(m_max_packet_size,
261 m_max_commands_per_iteration,
262 m_max_packets_requeued);
265 // Remove timed out peers
266 for (u16 timeouted_peer : timeouted_peers) {
267 LOG(dout_con << m_connection->getDesc()
268 << "RunTimeouts(): Removing peer " << timeouted_peer << std::endl);
269 m_connection->deletePeer(timeouted_peer, true);
273 void ConnectionSendThread::rawSend(const BufferedPacket *p)
276 m_connection->m_udpSocket.Send(p->address, p->data, p->size());
277 LOG(dout_con << m_connection->getDesc()
278 << " rawSend: " << p->size()
279 << " bytes sent" << std::endl);
280 } catch (SendFailedException &e) {
281 LOG(derr_con << m_connection->getDesc()
282 << "Connection::rawSend(): SendFailedException: "
283 << p->address.serializeString() << std::endl);
287 void ConnectionSendThread::sendAsPacketReliable(BufferedPacketPtr &p, Channel *channel)
290 p->absolute_send_time = porting::getTimeMs();
292 channel->outgoing_reliables_sent.insert(p,
293 (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
294 % (MAX_RELIABLE_WINDOW_SIZE + 1));
296 catch (AlreadyExistsException &e) {
297 LOG(derr_con << m_connection->getDesc()
298 << "WARNING: Going to send a reliable packet"
299 << " in outgoing buffer" << std::endl);
306 bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
307 const SharedBuffer<u8> &data, bool reliable)
309 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
311 LOG(errorstream << m_connection->getDesc()
312 << " dropped " << (reliable ? "reliable " : "")
313 << "packet for non existent peer_id: " << peer_id << std::endl);
316 Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
319 bool have_seqnum = false;
320 const u16 seqnum = channel->getOutgoingSequenceNumber(have_seqnum);
325 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
326 Address peer_address;
327 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
329 // Add base headers and make a packet
330 BufferedPacketPtr p = con::makePacket(peer_address, reliable,
331 m_connection->GetProtocolID(), m_connection->GetPeerID(),
334 // first check if our send window is already maxed out
335 if (channel->outgoing_reliables_sent.size() < channel->getWindowSize()) {
336 LOG(dout_con << m_connection->getDesc()
337 << " INFO: sending a reliable packet to peer_id " << peer_id
338 << " channel: " << (u32)channelnum
339 << " seqnum: " << seqnum << std::endl);
340 sendAsPacketReliable(p, channel);
344 LOG(dout_con << m_connection->getDesc()
345 << " INFO: queueing reliable packet for peer_id: " << peer_id
346 << " channel: " << (u32)channelnum
347 << " seqnum: " << seqnum << std::endl);
348 channel->queued_reliables.push(p);
352 Address peer_address;
353 if (peer->getAddress(MTP_UDP, peer_address)) {
354 // Add base headers and make a packet
355 BufferedPacketPtr p = con::makePacket(peer_address, data,
356 m_connection->GetProtocolID(), m_connection->GetPeerID(),
364 LOG(dout_con << m_connection->getDesc()
365 << " INFO: dropped unreliable packet for peer_id: " << peer_id
366 << " because of (yet) missing udp address" << std::endl);
370 void ConnectionSendThread::processReliableCommand(ConnectionCommandPtr &c)
372 assert(c->reliable); // Pre-condition
376 LOG(dout_con << m_connection->getDesc()
377 << "UDP processing reliable CONNCMD_NONE" << std::endl);
381 LOG(dout_con << m_connection->getDesc()
382 << "UDP processing reliable CONNCMD_SEND" << std::endl);
386 case CONNCMD_SEND_TO_ALL:
387 LOG(dout_con << m_connection->getDesc()
388 << "UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
389 sendToAllReliable(c);
392 case CONCMD_CREATE_PEER:
393 LOG(dout_con << m_connection->getDesc()
394 << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
395 if (!rawSendAsPacket(c->peer_id, c->channelnum, c->data, c->reliable)) {
396 /* put to queue if we couldn't send it immediately */
402 case CONNCMD_CONNECT:
403 case CONNCMD_DISCONNECT:
405 FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
407 LOG(dout_con << m_connection->getDesc()
408 << " Invalid reliable command type: " << c->type << std::endl);
413 void ConnectionSendThread::processNonReliableCommand(ConnectionCommandPtr &c_ptr)
415 const ConnectionCommand &c = *c_ptr;
416 assert(!c.reliable); // Pre-condition
420 LOG(dout_con << m_connection->getDesc()
421 << " UDP processing CONNCMD_NONE" << std::endl);
424 LOG(dout_con << m_connection->getDesc()
425 << " UDP processing CONNCMD_SERVE port="
426 << c.address.serializeString() << std::endl);
429 case CONNCMD_CONNECT:
430 LOG(dout_con << m_connection->getDesc()
431 << " UDP processing CONNCMD_CONNECT" << std::endl);
434 case CONNCMD_DISCONNECT:
435 LOG(dout_con << m_connection->getDesc()
436 << " UDP processing CONNCMD_DISCONNECT" << std::endl);
439 case CONNCMD_DISCONNECT_PEER:
440 LOG(dout_con << m_connection->getDesc()
441 << " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl);
442 disconnect_peer(c.peer_id);
445 LOG(dout_con << m_connection->getDesc()
446 << " UDP processing CONNCMD_SEND" << std::endl);
447 send(c.peer_id, c.channelnum, c.data);
449 case CONNCMD_SEND_TO_ALL:
450 LOG(dout_con << m_connection->getDesc()
451 << " UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
452 sendToAll(c.channelnum, c.data);
455 LOG(dout_con << m_connection->getDesc()
456 << " UDP processing CONCMD_ACK" << std::endl);
457 sendAsPacket(c.peer_id, c.channelnum, c.data, true);
459 case CONCMD_CREATE_PEER:
460 FATAL_ERROR("Got command that should be reliable as unreliable command");
462 LOG(dout_con << m_connection->getDesc()
463 << " Invalid command type: " << c.type << std::endl);
467 void ConnectionSendThread::serve(Address bind_address)
469 LOG(dout_con << m_connection->getDesc()
470 << "UDP serving at port " << bind_address.serializeString() << std::endl);
472 m_connection->m_udpSocket.Bind(bind_address);
473 m_connection->SetPeerID(PEER_ID_SERVER);
475 catch (SocketException &e) {
477 m_connection->putEvent(ConnectionEvent::bindFailed());
481 void ConnectionSendThread::connect(Address address)
483 LOG(dout_con << m_connection->getDesc() << " connecting to "
484 << address.serializeString()
485 << ":" << address.getPort() << std::endl);
487 UDPPeer *peer = m_connection->createServerPeer(address);
490 m_connection->putEvent(ConnectionEvent::peerAdded(peer->id, peer->address));
494 if (address.isIPv6())
495 bind_addr.setAddress((IPv6AddressBytes *) NULL);
497 bind_addr.setAddress(0, 0, 0, 0);
499 m_connection->m_udpSocket.Bind(bind_addr);
501 // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
502 m_connection->SetPeerID(PEER_ID_INEXISTENT);
503 NetworkPacket pkt(0, 0);
504 m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
507 void ConnectionSendThread::disconnect()
509 LOG(dout_con << m_connection->getDesc() << " disconnecting" << std::endl);
511 // Create and send DISCO packet
512 SharedBuffer<u8> data(2);
513 writeU8(&data[0], PACKET_TYPE_CONTROL);
514 writeU8(&data[1], CONTROLTYPE_DISCO);
518 std::vector<session_t> peerids = m_connection->getPeerIDs();
520 for (session_t peerid : peerids) {
521 sendAsPacket(peerid, 0, data, false);
525 void ConnectionSendThread::disconnect_peer(session_t peer_id)
527 LOG(dout_con << m_connection->getDesc() << " disconnecting peer" << std::endl);
529 // Create and send DISCO packet
530 SharedBuffer<u8> data(2);
531 writeU8(&data[0], PACKET_TYPE_CONTROL);
532 writeU8(&data[1], CONTROLTYPE_DISCO);
533 sendAsPacket(peer_id, 0, data, false);
535 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
540 if (dynamic_cast<UDPPeer *>(&peer) == 0) {
544 dynamic_cast<UDPPeer *>(&peer)->m_pending_disconnect = true;
547 void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
548 const SharedBuffer<u8> &data)
550 assert(channelnum < CHANNEL_COUNT); // Pre-condition
552 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
554 LOG(dout_con << m_connection->getDesc() << " peer: peer_id=" << peer_id
555 << ">>>NOT<<< found on sending packet"
556 << ", channel " << (channelnum % 0xFF)
557 << ", size: " << data.getSize() << std::endl);
561 LOG(dout_con << m_connection->getDesc() << " sending to peer_id=" << peer_id
562 << ", channel " << (channelnum % 0xFF)
563 << ", size: " << data.getSize() << std::endl);
565 u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
567 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
568 std::list<SharedBuffer<u8>> originals;
570 makeAutoSplitPacket(data, chunksize_max, split_sequence_number, &originals);
572 peer->setNextSplitSequenceNumber(channelnum, split_sequence_number);
574 for (const SharedBuffer<u8> &original : originals) {
575 sendAsPacket(peer_id, channelnum, original);
579 void ConnectionSendThread::sendReliable(ConnectionCommandPtr &c)
581 PeerHelper peer = m_connection->getPeerNoEx(c->peer_id);
585 peer->PutReliableSendCommand(c, m_max_packet_size);
588 void ConnectionSendThread::sendToAll(u8 channelnum, const SharedBuffer<u8> &data)
590 std::vector<session_t> peerids = m_connection->getPeerIDs();
592 for (session_t peerid : peerids) {
593 send(peerid, channelnum, data);
597 void ConnectionSendThread::sendToAllReliable(ConnectionCommandPtr &c)
599 std::vector<session_t> peerids = m_connection->getPeerIDs();
601 for (session_t peerid : peerids) {
602 PeerHelper peer = m_connection->getPeerNoEx(peerid);
607 peer->PutReliableSendCommand(c, m_max_packet_size);
611 void ConnectionSendThread::sendPackets(float dtime)
613 std::vector<session_t> peerIds = m_connection->getPeerIDs();
614 std::vector<session_t> pendingDisconnect;
615 std::map<session_t, bool> pending_unreliable;
617 const unsigned int peer_packet_quota = m_iteration_packets_avaialble
618 / MYMAX(peerIds.size(), 1);
620 for (session_t peerId : peerIds) {
621 PeerHelper peer = m_connection->getPeerNoEx(peerId);
622 //peer may have been removed
624 LOG(dout_con << m_connection->getDesc() << " Peer not found: peer_id="
629 peer->m_increment_packets_remaining = peer_packet_quota;
631 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
637 if (udpPeer->m_pending_disconnect) {
638 pendingDisconnect.push_back(peerId);
641 PROFILE(std::stringstream
644 peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
646 PROFILE(ScopeProfiler
647 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
649 LOG(dout_con << m_connection->getDesc()
650 << " Handle per peer queues: peer_id=" << peerId
651 << " packet quota: " << peer->m_increment_packets_remaining << std::endl);
653 // first send queued reliable packets for all peers (if possible)
654 for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
655 Channel &channel = udpPeer->channels[i];
657 // Reduces logging verbosity
658 if (channel.queued_reliables.empty())
662 channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
663 u16 next_to_receive = 0;
664 channel.incoming_reliables.getFirstSeqnum(next_to_receive);
666 LOG(dout_con << m_connection->getDesc() << "\t channel: "
667 << i << ", peer quota:"
668 << peer->m_increment_packets_remaining
670 << "\t\t\treliables on wire: "
671 << channel.outgoing_reliables_sent.size()
672 << ", waiting for ack for " << next_to_ack
674 << "\t\t\tincoming_reliables: "
675 << channel.incoming_reliables.size()
676 << ", next reliable packet: "
677 << channel.readNextIncomingSeqNum()
678 << ", next queued: " << next_to_receive
680 << "\t\t\treliables queued : "
681 << channel.queued_reliables.size()
683 << "\t\t\tqueued commands : "
684 << channel.queued_commands.size()
687 while (!channel.queued_reliables.empty() &&
688 channel.outgoing_reliables_sent.size()
689 < channel.getWindowSize() &&
690 peer->m_increment_packets_remaining > 0) {
691 BufferedPacketPtr p = channel.queued_reliables.front();
692 channel.queued_reliables.pop();
694 LOG(dout_con << m_connection->getDesc()
695 << " INFO: sending a queued reliable packet "
697 << ", seqnum: " << p->getSeqnum()
700 sendAsPacketReliable(p, &channel);
701 peer->m_increment_packets_remaining--;
706 if (!m_outgoing_queue.empty()) {
707 LOG(dout_con << m_connection->getDesc()
708 << " Handle non reliable queue ("
709 << m_outgoing_queue.size() << " pkts)" << std::endl);
712 unsigned int initial_queuesize = m_outgoing_queue.size();
713 /* send non reliable packets*/
714 for (unsigned int i = 0; i < initial_queuesize; i++) {
715 OutgoingPacket packet = m_outgoing_queue.front();
716 m_outgoing_queue.pop();
721 PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
723 LOG(dout_con << m_connection->getDesc()
724 << " Outgoing queue: peer_id=" << packet.peer_id
725 << ">>>NOT<<< found on sending packet"
726 << ", channel " << (packet.channelnum % 0xFF)
727 << ", size: " << packet.data.getSize() << std::endl);
731 /* send acks immediately */
732 if (packet.ack || peer->m_increment_packets_remaining > 0 || stopRequested()) {
733 rawSendAsPacket(packet.peer_id, packet.channelnum,
734 packet.data, packet.reliable);
735 if (peer->m_increment_packets_remaining > 0)
736 peer->m_increment_packets_remaining--;
738 m_outgoing_queue.push(packet);
739 pending_unreliable[packet.peer_id] = true;
743 if (peer_packet_quota > 0) {
744 for (session_t peerId : peerIds) {
745 PeerHelper peer = m_connection->getPeerNoEx(peerId);
748 if (peer->m_increment_packets_remaining == 0) {
749 LOG(warningstream << m_connection->getDesc()
750 << " Packet quota used up for peer_id=" << peerId
751 << ", was " << peer_packet_quota << " pkts" << std::endl);
756 for (session_t peerId : pendingDisconnect) {
757 if (!pending_unreliable[peerId]) {
758 m_connection->deletePeer(peerId, false);
763 void ConnectionSendThread::sendAsPacket(session_t peer_id, u8 channelnum,
764 const SharedBuffer<u8> &data, bool ack)
766 OutgoingPacket packet(peer_id, channelnum, data, false, ack);
767 m_outgoing_queue.push(packet);
770 ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
771 Thread("ConnectionReceive")
775 void *ConnectionReceiveThread::run()
777 assert(m_connection);
779 LOG(dout_con << m_connection->getDesc()
780 << "ConnectionReceive thread started" << std::endl);
782 PROFILE(std::stringstream
784 PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
786 // use IPv6 minimum allowed MTU as receive buffer size as this is
787 // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
789 const unsigned int packet_maxsize = 1500;
790 SharedBuffer<u8> packetdata(packet_maxsize);
792 bool packet_queued = true;
794 #ifdef DEBUG_CONNECTION_KBPS
795 u64 curtime = porting::getTimeMs();
796 u64 lasttime = curtime;
797 float debug_print_timer = 0.0;
800 while (!stopRequested()) {
801 BEGIN_DEBUG_EXCEPTION_HANDLER
802 PROFILE(ScopeProfiler
803 sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
805 #ifdef DEBUG_CONNECTION_KBPS
807 curtime = porting::getTimeMs();
808 float dtime = CALC_DTIME(lasttime,curtime);
811 /* receive packets */
812 receive(packetdata, packet_queued);
814 #ifdef DEBUG_CONNECTION_KBPS
815 debug_print_timer += dtime;
816 if (debug_print_timer > 20.0) {
817 debug_print_timer -= 20.0;
819 std::vector<session_t> peerids = m_connection->getPeerIDs();
821 for (auto id : peerids)
823 PeerHelper peer = m_connection->getPeerNoEx(id);
827 float peer_current = 0.0;
828 float peer_loss = 0.0;
829 float avg_rate = 0.0;
830 float avg_loss = 0.0;
832 for(u16 j=0; j<CHANNEL_COUNT; j++)
834 peer_current +=peer->channels[j].getCurrentDownloadRateKB();
835 peer_loss += peer->channels[j].getCurrentLossRateKB();
836 avg_rate += peer->channels[j].getAvgDownloadRateKB();
837 avg_loss += peer->channels[j].getAvgLossRateKB();
840 std::stringstream output;
841 output << std::fixed << std::setprecision(1);
842 output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
843 output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
844 output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
845 output << std::setfill(' ');
846 for(u16 j=0; j<CHANNEL_COUNT; j++)
848 output << "\tcha " << j << ":"
849 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
850 << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
851 << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
853 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
854 << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
855 << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
856 << " / WS: " << peer->channels[j].getWindowSize()
860 fprintf(stderr,"%s\n",output.str().c_str());
864 END_DEBUG_EXCEPTION_HANDLER
867 PROFILE(g_profiler->remove(ThreadIdentifier.str()));
871 // Receive packets from the network and buffers and create ConnectionEvents
872 void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
876 // First, see if there any buffered packets we can process now
879 SharedBuffer<u8> resultdata;
882 if (!getFromBuffers(peer_id, resultdata))
885 m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata));
887 catch (ProcessedSilentlyException &e) {
888 /* try reading again */
891 packet_queued = false;
894 // Call Receive() to wait for incoming data
896 s32 received_size = m_connection->m_udpSocket.Receive(sender,
897 *packetdata, packetdata.getSize());
898 if (received_size < 0)
901 if ((received_size < BASE_HEADER_SIZE) ||
902 (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
903 LOG(derr_con << m_connection->getDesc()
904 << "Receive(): Invalid incoming packet, "
905 << "size: " << received_size
907 << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
912 session_t peer_id = readPeerId(*packetdata);
913 u8 channelnum = readChannel(*packetdata);
915 if (channelnum > CHANNEL_COUNT - 1) {
916 LOG(derr_con << m_connection->getDesc()
917 << "Receive(): Invalid channel " << (u32)channelnum << std::endl);
921 /* Try to identify peer by sender address (may happen on join) */
922 if (peer_id == PEER_ID_INEXISTENT) {
923 peer_id = m_connection->lookupPeer(sender);
924 // We do not have to remind the peer of its
925 // peer id as the CONTROLTYPE_SET_PEER_ID
926 // command was sent reliably.
929 if (peer_id == PEER_ID_INEXISTENT) {
930 /* Ignore it if we are a client */
931 if (m_connection->ConnectedToServer())
933 /* The peer was not found in our lists. Add it. */
934 peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
937 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
939 LOG(dout_con << m_connection->getDesc()
940 << " got packet from unknown peer_id: "
941 << peer_id << " Ignoring." << std::endl);
945 // Validate peer address
947 Address peer_address;
948 if (peer->getAddress(MTP_UDP, peer_address)) {
949 if (peer_address != sender) {
950 LOG(derr_con << m_connection->getDesc()
951 << " Peer " << peer_id << " sending from different address."
952 " Ignoring." << std::endl);
956 LOG(derr_con << m_connection->getDesc()
957 << " Peer " << peer_id << " doesn't have an address?!"
958 " Ignoring." << std::endl);
962 peer->ResetTimeout();
964 Channel *channel = nullptr;
965 if (dynamic_cast<UDPPeer *>(&peer)) {
966 channel = &dynamic_cast<UDPPeer *>(&peer)->channels[channelnum];
968 LOG(derr_con << m_connection->getDesc()
969 << "Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!"
970 " Ignoring." << std::endl);
974 channel->UpdateBytesReceived(received_size);
976 // Throw the received packet to channel->processPacket()
978 // Make a new SharedBuffer from the data without the base headers
979 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
980 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
981 strippeddata.getSize());
984 // Process it (the result is some data with no headers made by us)
985 SharedBuffer<u8> resultdata = processPacket
986 (channel, strippeddata, peer_id, channelnum, false);
988 LOG(dout_con << m_connection->getDesc()
989 << " ProcessPacket from peer_id: " << peer_id
990 << ", channel: " << (u32)channelnum << ", returned "
991 << resultdata.getSize() << " bytes" << std::endl);
993 m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata));
995 catch (ProcessedSilentlyException &e) {
997 catch (ProcessedQueued &e) {
998 // we set it to true anyway (see below)
1001 /* Every time we receive a packet it can happen that a previously
1002 * buffered packet is now ready to process. */
1003 packet_queued = true;
1005 catch (InvalidIncomingDataException &e) {
1009 bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8> &dst)
1011 std::vector<session_t> peerids = m_connection->getPeerIDs();
1013 for (session_t peerid : peerids) {
1014 PeerHelper peer = m_connection->getPeerNoEx(peerid);
1018 UDPPeer *p = dynamic_cast<UDPPeer *>(&peer);
1022 for (Channel &channel : p->channels) {
1023 if (checkIncomingBuffers(&channel, peer_id, dst)) {
1031 bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
1032 session_t &peer_id, SharedBuffer<u8> &dst)
1034 u16 firstseqnum = 0;
1035 if (!channel->incoming_reliables.getFirstSeqnum(firstseqnum))
1038 if (firstseqnum != channel->readNextIncomingSeqNum())
1041 BufferedPacketPtr p = channel->incoming_reliables.popFirst();
1043 peer_id = readPeerId(p->data); // Carried over to caller function
1044 u8 channelnum = readChannel(p->data);
1045 u16 seqnum = p->getSeqnum();
1047 LOG(dout_con << m_connection->getDesc()
1048 << "UNBUFFERING TYPE_RELIABLE"
1049 << " seqnum=" << seqnum
1050 << " peer_id=" << peer_id
1051 << " channel=" << ((int) channelnum & 0xff)
1054 channel->incNextIncomingSeqNum();
1056 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1057 // Get out the inside packet and re-process it
1058 SharedBuffer<u8> payload(p->size() - headers_size);
1059 memcpy(*payload, &p->data[headers_size], payload.getSize());
1061 dst = processPacket(channel, payload, peer_id, channelnum, true);
1065 SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
1066 const SharedBuffer<u8> &packetdata, session_t peer_id, u8 channelnum, bool reliable)
1068 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
1071 errorstream << "Peer not found (possible timeout)" << std::endl;
1072 throw ProcessedSilentlyException("Peer not found (possible timeout)");
1075 if (packetdata.getSize() < 1)
1076 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1078 u8 type = readU8(&(packetdata[0]));
1080 if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
1081 std::string errmsg = "Invalid peer_id=" + itos(peer_id);
1082 errorstream << errmsg << std::endl;
1083 throw InvalidIncomingDataException(errmsg.c_str());
1086 if (type >= PACKET_TYPE_MAX) {
1087 derr_con << m_connection->getDesc() << "Got invalid type=" << ((int) type & 0xff)
1089 throw InvalidIncomingDataException("Invalid packet type");
1092 const PacketTypeHandler &pHandle = packetTypeRouter[type];
1093 return (this->*pHandle.handler)(channel, packetdata, &peer, channelnum, reliable);
1096 const ConnectionReceiveThread::PacketTypeHandler
1097 ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = {
1098 {&ConnectionReceiveThread::handlePacketType_Control},
1099 {&ConnectionReceiveThread::handlePacketType_Original},
1100 {&ConnectionReceiveThread::handlePacketType_Split},
1101 {&ConnectionReceiveThread::handlePacketType_Reliable},
1104 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel,
1105 const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1107 if (packetdata.getSize() < 2)
1108 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1110 ControlType controltype = (ControlType)readU8(&(packetdata[1]));
1112 if (controltype == CONTROLTYPE_ACK) {
1113 assert(channel != NULL);
1115 if (packetdata.getSize() < 4) {
1116 throw InvalidIncomingDataException(
1117 "packetdata.getSize() < 4 (ACK header size)");
1120 u16 seqnum = readU16(&packetdata[2]);
1121 LOG(dout_con << m_connection->getDesc() << " [ CONTROLTYPE_ACK: channelnum="
1122 << ((int) channelnum & 0xff) << ", peer_id=" << peer->id << ", seqnum="
1123 << seqnum << " ]" << std::endl);
1126 BufferedPacketPtr p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
1128 // the rtt calculation will be a bit off for re-sent packets but that's okay
1130 // Get round trip time
1131 u64 current_time = porting::getTimeMs();
1133 // a overflow is quite unlikely but as it'd result in major
1134 // rtt miscalculation we handle it here
1135 if (current_time > p->absolute_send_time) {
1136 float rtt = (current_time - p->absolute_send_time) / 1000.0;
1138 // Let peer calculate stuff according to it
1139 // (avg_rtt and resend_timeout)
1140 dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
1141 } else if (p->totaltime > 0) {
1142 float rtt = p->totaltime;
1144 // Let peer calculate stuff according to it
1145 // (avg_rtt and resend_timeout)
1146 dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
1150 // put bytes for max bandwidth calculation
1151 channel->UpdateBytesSent(p->size(), 1);
1152 if (channel->outgoing_reliables_sent.size() == 0)
1153 m_connection->TriggerSend();
1154 } catch (NotFoundException &e) {
1155 LOG(derr_con << m_connection->getDesc()
1156 << "WARNING: ACKed packet not in outgoing queue"
1157 << " seqnum=" << seqnum << std::endl);
1158 channel->UpdatePacketTooLateCounter();
1161 throw ProcessedSilentlyException("Got an ACK");
1162 } else if (controltype == CONTROLTYPE_SET_PEER_ID) {
1163 // Got a packet to set our peer id
1164 if (packetdata.getSize() < 4)
1165 throw InvalidIncomingDataException
1166 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1167 session_t peer_id_new = readU16(&packetdata[2]);
1168 LOG(dout_con << m_connection->getDesc() << "Got new peer id: " << peer_id_new
1169 << "... " << std::endl);
1171 if (m_connection->GetPeerID() != PEER_ID_INEXISTENT) {
1172 LOG(derr_con << m_connection->getDesc()
1173 << "WARNING: Not changing existing peer id." << std::endl);
1175 LOG(dout_con << m_connection->getDesc() << "changing own peer id"
1177 m_connection->SetPeerID(peer_id_new);
1180 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1181 } else if (controltype == CONTROLTYPE_PING) {
1182 // Just ignore it, the incoming data already reset
1183 // the timeout counter
1184 LOG(dout_con << m_connection->getDesc() << "PING" << std::endl);
1185 throw ProcessedSilentlyException("Got a PING");
1186 } else if (controltype == CONTROLTYPE_DISCO) {
1187 // Just ignore it, the incoming data already reset
1188 // the timeout counter
1189 LOG(dout_con << m_connection->getDesc() << "DISCO: Removing peer "
1190 << peer->id << std::endl);
1192 if (!m_connection->deletePeer(peer->id, false)) {
1193 derr_con << m_connection->getDesc() << "DISCO: Peer not found" << std::endl;
1196 throw ProcessedSilentlyException("Got a DISCO");
1198 LOG(derr_con << m_connection->getDesc()
1199 << "INVALID controltype="
1200 << ((int) controltype & 0xff) << std::endl);
1201 throw InvalidIncomingDataException("Invalid control type");
1205 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel,
1206 const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1208 if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
1209 throw InvalidIncomingDataException
1210 ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
1211 LOG(dout_con << m_connection->getDesc() << "RETURNING TYPE_ORIGINAL to user"
1213 // Get the inside packet out and return it
1214 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1215 memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
1219 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel,
1220 const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1222 Address peer_address;
1224 if (peer->getAddress(MTP_UDP, peer_address)) {
1225 // We have to create a packet again for buffering
1226 // This isn't actually too bad an idea.
1227 BufferedPacketPtr packet = con::makePacket(peer_address,
1229 m_connection->GetProtocolID(),
1233 // Buffer the packet
1234 SharedBuffer<u8> data = peer->addSplitPacket(channelnum, packet, reliable);
1236 if (data.getSize() != 0) {
1237 LOG(dout_con << m_connection->getDesc()
1238 << "RETURNING TYPE_SPLIT: Constructed full data, "
1239 << "size=" << data.getSize() << std::endl);
1242 LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" << std::endl);
1243 throw ProcessedSilentlyException("Buffered a split packet chunk");
1246 // We should never get here.
1247 FATAL_ERROR("Invalid execution point");
1250 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel,
1251 const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1253 assert(channel != NULL);
1255 // Recursive reliable packets not allowed
1257 throw InvalidIncomingDataException("Found nested reliable packets");
1259 if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
1260 throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1262 const u16 seqnum = readU16(&packetdata[1]);
1263 bool is_future_packet = false;
1264 bool is_old_packet = false;
1266 /* packet is within our receive window send ack */
1267 if (seqnum_in_window(seqnum,
1268 channel->readNextIncomingSeqNum(), MAX_RELIABLE_WINDOW_SIZE)) {
1269 m_connection->sendAck(peer->id, channelnum, seqnum);
1271 is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
1272 is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
1274 /* packet is not within receive window, don't send ack. *
1275 * if this was a valid packet it's gonna be retransmitted */
1276 if (is_future_packet)
1277 throw ProcessedSilentlyException(
1278 "Received packet newer then expected, not sending ack");
1280 /* seems like our ack was lost, send another one for a old packet */
1281 if (is_old_packet) {
1282 LOG(dout_con << m_connection->getDesc()
1283 << "RE-SENDING ACK: peer_id: " << peer->id
1284 << ", channel: " << (channelnum & 0xFF)
1285 << ", seqnum: " << seqnum << std::endl;)
1286 m_connection->sendAck(peer->id, channelnum, seqnum);
1288 // we already have this packet so this one was on wire at least
1289 // the current timeout
1290 // we don't know how long this packet was on wire don't do silly guessing
1291 // dynamic_cast<UDPPeer*>(&peer)->
1292 // reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
1294 throw ProcessedSilentlyException("Retransmitting ack for old packet");
1298 if (seqnum != channel->readNextIncomingSeqNum()) {
1299 Address peer_address;
1301 // this is a reliable packet so we have a udp address for sure
1302 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
1303 // This one comes later, buffer it.
1304 // Actually we have to make a packet to buffer one.
1305 // Well, we have all the ingredients, so just do it.
1306 BufferedPacketPtr packet = con::makePacket(
1309 m_connection->GetProtocolID(),
1313 channel->incoming_reliables.insert(packet, channel->readNextIncomingSeqNum());
1315 LOG(dout_con << m_connection->getDesc()
1316 << "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id
1317 << ", channel: " << (channelnum & 0xFF)
1318 << ", seqnum: " << seqnum << std::endl;)
1320 throw ProcessedQueued("Buffered future reliable packet");
1321 } catch (AlreadyExistsException &e) {
1322 } catch (IncomingDataCorruption &e) {
1323 m_connection->putCommand(ConnectionCommand::disconnect_peer(peer->id));
1325 LOG(derr_con << m_connection->getDesc()
1326 << "INVALID, TYPE_RELIABLE peer_id: " << peer->id
1327 << ", channel: " << (channelnum & 0xFF)
1328 << ", seqnum: " << seqnum
1329 << "DROPPING CLIENT!" << std::endl;)
1333 /* we got a packet to process right now */
1334 LOG(dout_con << m_connection->getDesc()
1335 << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer->id
1336 << ", channel: " << (channelnum & 0xFF)
1337 << ", seqnum: " << seqnum << std::endl;)
1340 /* check for resend case */
1341 u16 queued_seqnum = 0;
1342 if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
1343 if (queued_seqnum == seqnum) {
1344 BufferedPacketPtr queued_packet = channel->incoming_reliables.popFirst();
1345 /** TODO find a way to verify the new against the old packet */
1349 channel->incNextIncomingSeqNum();
1351 // Get out the inside packet and re-process it
1352 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1353 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1355 return processPacket(channel, payload, peer->id, channelnum, true);