3 Copyright (C) 2013-2017 celeron55, Perttu Ahola <celeron55@gmail.com>
4 Copyright (C) 2017 celeron55, Loic Blot <loic.blot@unix-experience.fr>
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published by
8 the Free Software Foundation; either version 2.1 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 #include "connectionthreads.h"
25 #include "network/networkpacket.h"
26 #include "util/serialize.h"
31 /******************************************************************************/
32 /* defines used for debugging and profiling */
33 /******************************************************************************/
37 #undef DEBUG_CONNECTION_KBPS
39 /* this mutex is used to achieve log message consistency */
40 std::mutex log_conthread_mutex;
43 MutexAutoLock loglock(log_conthread_mutex); \
47 //#define DEBUG_CONNECTION_KBPS
48 #undef DEBUG_CONNECTION_KBPS
51 /* maximum number of retries for reliable packets */
52 #define MAX_RELIABLE_RETRY 5
// Returns the peer id stored in a raw packet: the value readU16() yields at
// byte offset 4 of the buffer.
// NOTE(review): no length check happens here; callers presumably guarantee
// that the buffer holds at least the base header — confirm.
56 static session_t readPeerId(u8 *packetdata)
58 return readU16(&packetdata[4]);
// Returns the channel number stored in a raw packet: the value readU8()
// yields at byte offset 6 of the buffer.
// NOTE(review): the value is not range-checked here; receive() compares it
// against CHANNEL_COUNT before using it as an index.
60 static u8 readChannel(u8 *packetdata)
62 return readU8(&packetdata[6]);
65 /******************************************************************************/
66 /* Connection Threads */
67 /******************************************************************************/
// Constructs the send thread (thread name "ConnectionSend").
// Stores max_packet_size and reads the per-iteration packet budget from the
// "max_packets_per_iteration" setting.
// NOTE(review): part of the parameter list and initializer list is not
// visible in this view.
69 ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
71 Thread("ConnectionSend"),
72 m_max_packet_size(max_packet_size),
74 m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
// The configured budget must be greater than 1.
76 SANITY_CHECK(m_max_data_packets_per_iteration > 1);
// Thread entry point of the send thread.
// Loops while no stop is requested or packets are still queued. In each
// iteration the visible lines reset the packet quota, wait on the send
// semaphore, drain remaining triggers, compute the elapsed time and turn
// queued connection commands into packets.
// NOTE(review): several short lines (the resend/send calls, braces, the
// branch selecting reliable vs. non-reliable handling, the return statement)
// are not visible in this view.
79 void *ConnectionSendThread::run()
83 LOG(dout_con << m_connection->getDesc()
84 << "ConnectionSend thread started" << std::endl);
// Timestamps used to derive the per-iteration dtime below.
86 u64 curtime = porting::getTimeMs();
87 u64 lasttime = curtime;
89 PROFILE(std::stringstream ThreadIdentifier);
90 PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
92 /* if stop is requested don't stop immediately but try to send all */
94 while (!stopRequested() || packetsQueued()) {
95 BEGIN_DEBUG_EXCEPTION_HANDLER
96 PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
// Start every iteration with the full configured packet budget.
98 m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
100 /* wait for trigger or timeout */
101 m_send_sleep_semaphore.wait(50);
103 /* remove all triggers */
104 while (m_send_sleep_semaphore.wait(0)) {
108 curtime = porting::getTimeMs();
109 float dtime = CALC_DTIME(lasttime, curtime);
111 /* first resend timed-out packets */
// Warn when the budget has already been consumed at this point.
113 if (m_iteration_packets_avaialble == 0) {
114 LOG(warningstream << m_connection->getDesc()
115 << " Packet quota used up after re-sending packets, "
116 << "max=" << m_max_data_packets_per_iteration << std::endl);
119 /* translate commands to packets */
// Non-blocking pop (timeout 0); the loop ends once the returned command has
// type CONNCMD_NONE.
120 ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
121 while (c.type != CONNCMD_NONE) {
123 processReliableCommand(c);
125 processNonReliableCommand(c);
127 c = m_connection->m_command_queue.pop_frontNoEx(0);
130 /* send queued packets */
133 END_DEBUG_EXCEPTION_HANDLER
136 PROFILE(g_profiler->remove(ThreadIdentifier.str()));
// Posts the send semaphore that run() waits on at the start of each
// iteration, so a waiting send loop can proceed before its wait times out.
140 void ConnectionSendThread::Trigger()
142 m_send_sleep_semaphore.post();
// Reports whether there is still outgoing work; run() uses this to keep
// looping after a stop request.
// Visible checks: the outgoing queue is non-empty while peers exist, or some
// channel of a UDP peer still has queued commands.
// NOTE(review): the return statements are not visible in this view.
145 bool ConnectionSendThread::packetsQueued()
147 std::vector<session_t> peerIds = m_connection->getPeerIDs();
149 if (!m_outgoing_queue.empty() && !peerIds.empty())
152 for (session_t peerId : peerIds) {
153 PeerHelper peer = m_connection->getPeerNoEx(peerId);
// Only UDPPeer instances are inspected; the cast is repeated for the loop.
158 if (dynamic_cast<UDPPeer *>(&peer) == 0)
161 for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
162 if (!channel.queued_commands.empty()) {
// Per-iteration timeout handling for all peers.
// For each peer: collects it for removal when isTimedOut(m_timeout) is true;
// otherwise, per channel, drops timed-out unreliable splits, ages the sent
// reliable packets, handles those whose resend timeout elapsed and updates
// channel timers. Afterwards a ping may be sent and the peer's command queues
// are run. Finally all collected peers are deleted from the connection.
//
// dtime: elapsed time since the previous iteration, as computed by run().
// NOTE(review): guards, `continue`/`break` statements and the actual re-send
// call are not visible in this view.
172 void ConnectionSendThread::runTimeouts(float dtime)
174 std::vector<session_t> timeouted_peers;
175 std::vector<session_t> peerIds = m_connection->getPeerIDs();
177 for (session_t &peerId : peerIds) {
178 PeerHelper peer = m_connection->getPeerNoEx(peerId);
183 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
187 PROFILE(std::stringstream peerIdentifier);
188 PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
189 << ";" << peerId << ";RELIABLE]");
190 PROFILE(ScopeProfiler
191 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
193 SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
198 if (peer->isTimedOut(m_timeout)) {
199 infostream << m_connection->getDesc()
200 << "RunTimeouts(): Peer " << peer->id
203 // Add peer to the list
204 timeouted_peers.push_back(peer->id);
205 // Don't bother going through the buffers of this one
209 float resend_timeout = udpPeer->getResendTimeout();
210 bool retry_count_exceeded = false;
211 for (Channel &channel : udpPeer->channels) {
212 std::list<BufferedPacket> timed_outs;
214 // Remove timed out incomplete unreliable split packets
215 channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
217 // Increment reliable packet times
218 channel.outgoing_reliables_sent.incrementTimeouts(dtime);
// NOTE(review): numpeers is used as a divisor below; a zero guard is not
// visible in this view — confirm one exists before the division.
220 unsigned int numpeers = m_connection->m_peers.size();
225 // Re-send timed out outgoing reliables
// The second argument splits the per-iteration budget evenly between peers.
226 timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
227 (m_max_data_packets_per_iteration / numpeers));
229 channel.UpdatePacketLossCounter(timed_outs.size());
230 g_profiler->graphAdd("packets_lost", timed_outs.size());
// Timed-out packets are charged against this iteration's packet budget.
232 m_iteration_packets_avaialble -= timed_outs.size();
234 for (std::list<BufferedPacket>::iterator k = timed_outs.begin();
235 k != timed_outs.end(); ++k) {
236 session_t peer_id = readPeerId(*(k->data));
237 u8 channelnum = readChannel(*(k->data));
238 u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE + 1]));
240 channel.UpdateBytesLost(k->data.getSize());
// A packet resent more than MAX_RELIABLE_RETRY times marks the whole peer
// as timed out.
243 if (k->resend_count > MAX_RELIABLE_RETRY) {
244 retry_count_exceeded = true;
245 timeouted_peers.push_back(peer->id);
246 /* no need to check additional packets if a single one did timeout*/
250 LOG(derr_con << m_connection->getDesc()
251 << "RE-SENDING timed-out RELIABLE to "
252 << k->address.serializeString()
253 << "(t/o=" << resend_timeout << "): "
254 << "from_peer_id=" << peer_id
255 << ", channel=" << ((int) channelnum & 0xff)
256 << ", seqnum=" << seqnum
261 // do not handle rtt here as we can't decide if this packet was
262 // lost or really takes more time to transmit
265 if (retry_count_exceeded) {
266 break; /* no need to check other channels if we already did timeout */
269 channel.UpdateTimers(dtime);
272 /* skip to next peer if we did timeout */
273 if (retry_count_exceeded)
276 /* send ping if necessary */
277 if (udpPeer->Ping(dtime, data)) {
278 LOG(dout_con << m_connection->getDesc()
279 << "Sending ping for peer_id: " << udpPeer->id << std::endl);
280 /* this may fail if there is no sequence number left */
281 if (!rawSendAsPacket(udpPeer->id, 0, data, true)) {
282 //retrigger with reduced ping interval
283 udpPeer->Ping(4.0, data);
287 udpPeer->RunCommandQueues(m_max_packet_size,
288 m_max_commands_per_iteration,
289 m_max_packets_requeued);
292 // Remove timed out peers
// NOTE(review): the loop variable is u16 while the vector holds session_t;
// readPeerId() returns a readU16() value as session_t, so the types appear to
// coincide — confirm, or use session_t for consistency.
293 for (u16 timeouted_peer : timeouted_peers) {
294 LOG(dout_con << m_connection->getDesc()
295 << "RunTimeouts(): Removing peer " << timeouted_peer << std::endl);
296 m_connection->deletePeer(timeouted_peer, true);
// Sends the bytes of an already assembled packet to packet.address through
// the connection's UDP socket.
// A SendFailedException is caught and logged with the destination address;
// it is not rethrown in the visible lines.
300 void ConnectionSendThread::rawSend(const BufferedPacket &packet)
303 m_connection->m_udpSocket.Send(packet.address, *packet.data,
304 packet.data.getSize());
305 LOG(dout_con << m_connection->getDesc()
306 << " rawSend: " << packet.data.getSize()
307 << " bytes sent" << std::endl);
308 } catch (SendFailedException &e) {
309 LOG(derr_con << m_connection->getDesc()
310 << "Connection::rawSend(): SendFailedException: "
311 << packet.address.serializeString() << std::endl);
// Registers a reliable packet in the channel's list of sent-but-unacked
// packets, after stamping it with the current time.
// An AlreadyExistsException from the insert is caught and logged.
// NOTE(review): the actual transmission call is not visible in this view.
315 void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel)
// Send timestamp; handlePacketType_Control uses it to compute the RTT when
// the ACK arrives.
318 p.absolute_send_time = porting::getTimeMs();
// Second argument: current outgoing sequence number minus the maximum window
// size, taken modulo (MAX_RELIABLE_WINDOW_SIZE + 1).
320 channel->outgoing_reliables_sent.insert(p,
321 (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
322 % (MAX_RELIABLE_WINDOW_SIZE + 1));
324 catch (AlreadyExistsException &e) {
325 LOG(derr_con << m_connection->getDesc()
326 << "WARNING: Going to send a reliable packet"
327 << " in outgoing buffer" << std::endl);
// Wraps `data` into a packet for peer_id/channelnum and sends or queues it.
// Reliable path: obtains a sequence number, builds a reliable packet, and
// either sends it right away (send window not full) or pushes it onto the
// channel's queued_reliables.
// Unreliable path: builds a plain packet if the peer has an MTP_UDP address,
// otherwise logs that the packet was dropped.
// Callers treat a false return as "could not send now" (see runTimeouts and
// processReliableCommand).
// NOTE(review): the return statements and the reliable/unreliable branch
// line are not visible in this view.
334 bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
335 const SharedBuffer<u8> &data, bool reliable)
337 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
339 LOG(errorstream << m_connection->getDesc()
340 << " dropped " << (reliable ? "reliable " : "")
341 << "packet for non existent peer_id: " << peer_id << std::endl);
// NOTE(review): the dynamic_cast result is dereferenced without a visible
// null check — confirm that the peer is always a UDPPeer here.
344 Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
// Passed by reference to getOutgoingSequenceNumber(), which reports through
// it whether a sequence number could be obtained.
347 bool have_sequence_number_for_raw_packet = true;
349 channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
351 if (!have_sequence_number_for_raw_packet)
// This local shadows the bool parameter `reliable` inside its scope.
354 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
355 Address peer_address;
356 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
358 // Add base headers and make a packet
359 BufferedPacket p = con::makePacket(peer_address, reliable,
360 m_connection->GetProtocolID(), m_connection->GetPeerID(),
363 // first check if our send window is already maxed out
364 if (channel->outgoing_reliables_sent.size()
365 < channel->getWindowSize()) {
366 LOG(dout_con << m_connection->getDesc()
367 << " INFO: sending a reliable packet to peer_id " << peer_id
368 << " channel: " << (u32)channelnum
369 << " seqnum: " << seqnum << std::endl);
370 sendAsPacketReliable(p, channel);
374 LOG(dout_con << m_connection->getDesc()
375 << " INFO: queueing reliable packet for peer_id: " << peer_id
376 << " channel: " << (u32)channelnum
377 << " seqnum: " << seqnum << std::endl);
378 channel->queued_reliables.push(p);
382 Address peer_address;
383 if (peer->getAddress(MTP_UDP, peer_address)) {
384 // Add base headers and make a packet
385 BufferedPacket p = con::makePacket(peer_address, data,
386 m_connection->GetProtocolID(), m_connection->GetPeerID(),
394 LOG(dout_con << m_connection->getDesc()
395 << " INFO: dropped unreliable packet for peer_id: " << peer_id
396 << " because of (yet) missing udp address" << std::endl);
// Dispatches a connection command that is flagged reliable.
// Visible handling: CONNCMD_SEND_TO_ALL goes to sendToAllReliable();
// CONCMD_CREATE_PEER is sent directly via rawSendAsPacket() with a fallback
// when that fails; CONNCMD_CONNECT/CONNCMD_DISCONNECT are fatal errors since
// they must not be reliable; anything else is logged as invalid.
// NOTE(review): the switch statement, several case labels and the fallback
// body are not visible in this view.
400 void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
402 assert(c.reliable); // Pre-condition
406 LOG(dout_con << m_connection->getDesc()
407 << "UDP processing reliable CONNCMD_NONE" << std::endl);
411 LOG(dout_con << m_connection->getDesc()
412 << "UDP processing reliable CONNCMD_SEND" << std::endl);
416 case CONNCMD_SEND_TO_ALL:
417 LOG(dout_con << m_connection->getDesc()
418 << "UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
419 sendToAllReliable(c);
422 case CONCMD_CREATE_PEER:
423 LOG(dout_con << m_connection->getDesc()
424 << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
425 if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
426 /* put to queue if we couldn't send it immediately */
432 case CONNCMD_CONNECT:
433 case CONNCMD_DISCONNECT:
435 FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
437 LOG(dout_con << m_connection->getDesc()
438 << " Invalid reliable command type: " << c.type << std::endl);
// Dispatches a connection command that is NOT flagged reliable.
// Visible handling: disconnect_peer(), send(), sendToAll() and an ACK send
// via sendAsPacket(..., true); CONCMD_CREATE_PEER is a fatal error because it
// must be reliable; unknown types are logged as invalid.
// NOTE(review): the switch statement, several case labels and the calls for
// the SERVE/CONNECT/DISCONNECT cases are not visible in this view.
443 void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
445 assert(!c.reliable); // Pre-condition
449 LOG(dout_con << m_connection->getDesc()
450 << " UDP processing CONNCMD_NONE" << std::endl);
453 LOG(dout_con << m_connection->getDesc()
454 << " UDP processing CONNCMD_SERVE port="
455 << c.address.serializeString() << std::endl);
458 case CONNCMD_CONNECT:
459 LOG(dout_con << m_connection->getDesc()
460 << " UDP processing CONNCMD_CONNECT" << std::endl);
463 case CONNCMD_DISCONNECT:
464 LOG(dout_con << m_connection->getDesc()
465 << " UDP processing CONNCMD_DISCONNECT" << std::endl);
468 case CONNCMD_DISCONNECT_PEER:
469 LOG(dout_con << m_connection->getDesc()
470 << " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl);
471 disconnect_peer(c.peer_id);
474 LOG(dout_con << m_connection->getDesc()
475 << " UDP processing CONNCMD_SEND" << std::endl);
476 send(c.peer_id, c.channelnum, c.data);
478 case CONNCMD_SEND_TO_ALL:
479 LOG(dout_con << m_connection->getDesc()
480 << " UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
481 sendToAll(c.channelnum, c.data);
484 LOG(dout_con << m_connection->getDesc()
485 << " UDP processing CONCMD_ACK" << std::endl);
// The last argument marks the queued packet as an ACK; sendPackets() sends
// ACKs regardless of the remaining quota.
486 sendAsPacket(c.peer_id, c.channelnum, c.data, true);
488 case CONCMD_CREATE_PEER:
489 FATAL_ERROR("Got command that should be reliable as unreliable command");
491 LOG(dout_con << m_connection->getDesc()
492 << " Invalid command type: " << c.type << std::endl);
// Starts serving: binds the connection's UDP socket to bind_address and sets
// the own peer id to PEER_ID_SERVER.
// A SocketException is caught and an event is put on the connection's event
// queue.
// NOTE(review): the construction of that event is not visible in this view.
496 void ConnectionSendThread::serve(Address bind_address)
498 LOG(dout_con << m_connection->getDesc()
499 << "UDP serving at port " << bind_address.serializeString() << std::endl);
501 m_connection->m_udpSocket.Bind(bind_address);
502 m_connection->SetPeerID(PEER_ID_SERVER);
504 catch (SocketException &e) {
508 m_connection->putEvent(ce);
// Starts a client connection to `address`.
// Creates the server peer, emits a peer-added event, binds the local UDP
// socket to a wildcard address of the matching IP family and sends an empty
// reliable packet to PEER_ID_SERVER while the own peer id is still
// PEER_ID_INEXISTENT.
// NOTE(review): some declarations and guards are not visible in this view.
512 void ConnectionSendThread::connect(Address address)
514 LOG(dout_con << m_connection->getDesc() << " connecting to "
515 << address.serializeString()
516 << ":" << address.getPort() << std::endl);
518 UDPPeer *peer = m_connection->createServerPeer(address);
522 e.peerAdded(peer->id, peer->address);
523 m_connection->putEvent(e);
// Choose the wildcard bind address according to the target's IP family.
527 if (address.isIPv6())
528 bind_addr.setAddress((IPv6AddressBytes *) NULL);
530 bind_addr.setAddress(0, 0, 0, 0);
532 m_connection->m_udpSocket.Bind(bind_addr);
534 // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
535 m_connection->SetPeerID(PEER_ID_INEXISTENT);
536 NetworkPacket pkt(0, 0);
537 m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
// Announces a disconnect to every known peer: builds a two-byte control
// packet (PACKET_TYPE_CONTROL, CONTROLTYPE_DISCO) and queues it unreliably
// on channel 0 for each peer id.
540 void ConnectionSendThread::disconnect()
542 LOG(dout_con << m_connection->getDesc() << " disconnecting" << std::endl);
544 // Create and send DISCO packet
545 SharedBuffer<u8> data(2);
546 writeU8(&data[0], PACKET_TYPE_CONTROL);
547 writeU8(&data[1], CONTROLTYPE_DISCO);
551 std::vector<session_t> peerids = m_connection->getPeerIDs();
553 for (session_t peerid : peerids) {
// Queued as a non-ACK packet on channel 0.
554 sendAsPacket(peerid, 0, data, false);
// Disconnects a single peer: queues a DISCO control packet for it on
// channel 0 and, if the peer is a UDPPeer, flags it as pending disconnect.
// sendPackets() later deletes peers carrying that flag.
// NOTE(review): the early-return guards are not visible in this view.
558 void ConnectionSendThread::disconnect_peer(session_t peer_id)
560 LOG(dout_con << m_connection->getDesc() << " disconnecting peer" << std::endl);
562 // Create and send DISCO packet
563 SharedBuffer<u8> data(2);
564 writeU8(&data[0], PACKET_TYPE_CONTROL);
565 writeU8(&data[1], CONTROLTYPE_DISCO);
566 sendAsPacket(peer_id, 0, data, false);
568 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
573 if (dynamic_cast<UDPPeer *>(&peer) == 0) {
577 dynamic_cast<UDPPeer *>(&peer)->m_pending_disconnect = true;
// Sends `data` unreliably to one peer on the given channel.
// The payload is split into chunks of at most
// (m_max_packet_size - BASE_HEADER_SIZE) bytes via makeAutoSplitPacket(); the
// peer's split sequence number is read before and written back after the
// split, and each resulting chunk is queued with sendAsPacket().
// Pre-condition: channelnum < CHANNEL_COUNT.
// NOTE(review): the peer-not-found guard line is not visible in this view.
580 void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
581 const SharedBuffer<u8> &data)
583 assert(channelnum < CHANNEL_COUNT); // Pre-condition
585 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
// NOTE(review): the log lines use `channelnum % 0xFF`, which maps 255 to 0;
// other functions in this file print channels with `& 0xff` — presumably a
// typo that only affects log output; confirm.
587 LOG(dout_con << m_connection->getDesc() << " peer: peer_id=" << peer_id
588 << ">>>NOT<<< found on sending packet"
589 << ", channel " << (channelnum % 0xFF)
590 << ", size: " << data.getSize() << std::endl);
594 LOG(dout_con << m_connection->getDesc() << " sending to peer_id=" << peer_id
595 << ", channel " << (channelnum % 0xFF)
596 << ", size: " << data.getSize() << std::endl);
598 u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
600 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
601 std::list<SharedBuffer<u8>> originals;
603 makeAutoSplitPacket(data, chunksize_max, split_sequence_number, &originals);
// Store the sequence number back in case the split changed it.
605 peer->setNextSplitSequenceNumber(channelnum, split_sequence_number);
607 for (const SharedBuffer<u8> &original : originals) {
608 sendAsPacket(peer_id, channelnum, original);
// Hands a reliable send command to the target peer (c.peer_id) through
// PutReliableSendCommand(), passing the maximum packet size.
// NOTE(review): the peer-not-found guard is not visible in this view.
612 void ConnectionSendThread::sendReliable(ConnectionCommand &c)
614 PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
618 peer->PutReliableSendCommand(c, m_max_packet_size);
// Sends `data` unreliably on `channelnum` to every peer id currently known
// to the connection by calling send() once per peer.
621 void ConnectionSendThread::sendToAll(u8 channelnum, const SharedBuffer<u8> &data)
// The id list is a copy taken before iterating.
623 std::vector<session_t> peerids = m_connection->getPeerIDs();
625 for (session_t peerid : peerids) {
626 send(peerid, channelnum, data);
// Hands the same reliable send command to every known peer through
// PutReliableSendCommand().
// NOTE(review): the per-peer not-found guard is not visible in this view.
630 void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
632 std::vector<session_t> peerids = m_connection->getPeerIDs();
634 for (session_t peerid : peerids) {
635 PeerHelper peer = m_connection->getPeerNoEx(peerid);
640 peer->PutReliableSendCommand(c, m_max_packet_size);
// Flushes queued packets for this iteration.
//  1. Divides the remaining iteration budget evenly between peers.
//  2. Per peer and channel: sends queued reliable packets while the send
//     window and the peer's quota allow it.
//  3. Walks the unreliable outgoing queue once: ACKs, packets of peers with
//     quota left, and everything during shutdown are sent; the rest is
//     re-queued and its peer marked as having pending unreliable data.
//  4. Warns about peers whose quota reached zero.
//  5. Deletes peers flagged for disconnect that have nothing pending.
// NOTE(review): guards, `continue` statements and some declarations are not
// visible in this view.
644 void ConnectionSendThread::sendPackets(float dtime)
646 std::vector<session_t> peerIds = m_connection->getPeerIDs();
647 std::vector<session_t> pendingDisconnect;
648 std::map<session_t, bool> pending_unreliable;
// MYMAX(..., 1) keeps the divisor non-zero when there are no peers.
650 const unsigned int peer_packet_quota = m_iteration_packets_avaialble
651 / MYMAX(peerIds.size(), 1);
653 for (session_t peerId : peerIds) {
654 PeerHelper peer = m_connection->getPeerNoEx(peerId);
655 //peer may have been removed
657 LOG(dout_con << m_connection->getDesc() << " Peer not found: peer_id="
662 peer->m_increment_packets_remaining = peer_packet_quota;
664 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
// Remember peers flagged by disconnect_peer(); they are deleted at the end.
670 if (udpPeer->m_pending_disconnect) {
671 pendingDisconnect.push_back(peerId);
674 PROFILE(std::stringstream
677 peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
679 PROFILE(ScopeProfiler
680 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
682 LOG(dout_con << m_connection->getDesc()
683 << " Handle per peer queues: peer_id=" << peerId
684 << " packet quota: " << peer->m_increment_packets_remaining << std::endl);
686 // first send queued reliable packets for all peers (if possible)
687 for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
688 Channel &channel = udpPeer->channels[i];
// The sequence numbers fetched here are only used by the log output below.
691 channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
692 u16 next_to_receive = 0;
693 channel.incoming_reliables.getFirstSeqnum(next_to_receive);
695 LOG(dout_con << m_connection->getDesc() << "\t channel: "
696 << i << ", peer quota:"
697 << peer->m_increment_packets_remaining
699 << "\t\t\treliables on wire: "
700 << channel.outgoing_reliables_sent.size()
701 << ", waiting for ack for " << next_to_ack
703 << "\t\t\tincoming_reliables: "
704 << channel.incoming_reliables.size()
705 << ", next reliable packet: "
706 << channel.readNextIncomingSeqNum()
707 << ", next queued: " << next_to_receive
709 << "\t\t\treliables queued : "
710 << channel.queued_reliables.size()
712 << "\t\t\tqueued commands : "
713 << channel.queued_commands.size()
// Drain queued reliables while the window has room and quota remains.
716 while (!channel.queued_reliables.empty() &&
717 channel.outgoing_reliables_sent.size()
718 < channel.getWindowSize() &&
719 peer->m_increment_packets_remaining > 0) {
720 BufferedPacket p = channel.queued_reliables.front();
721 channel.queued_reliables.pop();
722 LOG(dout_con << m_connection->getDesc()
723 << " INFO: sending a queued reliable packet "
725 << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
727 sendAsPacketReliable(p, &channel);
728 peer->m_increment_packets_remaining--;
733 if (!m_outgoing_queue.empty()) {
734 LOG(dout_con << m_connection->getDesc()
735 << " Handle non reliable queue ("
736 << m_outgoing_queue.size() << " pkts)" << std::endl);
// Snapshot the size so packets re-queued below are not processed twice in
// this pass.
739 unsigned int initial_queuesize = m_outgoing_queue.size();
740 /* send non reliable packets*/
741 for (unsigned int i = 0; i < initial_queuesize; i++) {
742 OutgoingPacket packet = m_outgoing_queue.front();
743 m_outgoing_queue.pop();
748 PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
750 LOG(dout_con << m_connection->getDesc()
751 << " Outgoing queue: peer_id=" << packet.peer_id
752 << ">>>NOT<<< found on sending packet"
753 << ", channel " << (packet.channelnum % 0xFF)
754 << ", size: " << packet.data.getSize() << std::endl);
758 /* send acks immediately */
759 if (packet.ack || peer->m_increment_packets_remaining > 0 || stopRequested()) {
760 rawSendAsPacket(packet.peer_id, packet.channelnum,
761 packet.data, packet.reliable);
// ACKs and shutdown sends may happen at zero quota; never decrement below 0.
762 if (peer->m_increment_packets_remaining > 0)
763 peer->m_increment_packets_remaining--;
765 m_outgoing_queue.push(packet);
766 pending_unreliable[packet.peer_id] = true;
770 if (peer_packet_quota > 0) {
771 for (session_t peerId : peerIds) {
772 PeerHelper peer = m_connection->getPeerNoEx(peerId);
775 if (peer->m_increment_packets_remaining == 0) {
776 LOG(warningstream << m_connection->getDesc()
777 << " Packet quota used up for peer_id=" << peerId
778 << ", was " << peer_packet_quota << " pkts" << std::endl);
783 for (session_t peerId : pendingDisconnect) {
// operator[] default-inserts `false` for peers with no re-queued packets, so
// those peers are deleted here.
784 if (!pending_unreliable[peerId]) {
785 m_connection->deletePeer(peerId, false);
// Queues `data` as an unreliable outgoing packet for peer_id/channelnum.
// Nothing is transmitted here; the packet is pushed onto m_outgoing_queue,
// which sendPackets() processes.
// ack: stored in the packet; sendPackets() sends ACK packets even when the
// peer's quota is exhausted.
790 void ConnectionSendThread::sendAsPacket(session_t peer_id, u8 channelnum,
791 const SharedBuffer<u8> &data, bool ack)
// The fourth constructor argument (false) marks the packet as not reliable.
793 OutgoingPacket packet(peer_id, channelnum, data, false, ack);
794 m_outgoing_queue.push(packet);
// Constructs the receive thread (thread name "ConnectionReceive").
// NOTE(review): max_packet_size is not used in the visible lines; run()
// allocates its receive buffer with a fixed size instead.
797 ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
798 Thread("ConnectionReceive")
// Thread entry point of the receive thread.
// Allocates a 1500-byte receive buffer and calls receive() in a loop until a
// stop is requested. When DEBUG_CONNECTION_KBPS is defined, a per-peer rate
// report is additionally written to stderr once the accumulated dtime
// exceeds 20.
// NOTE(review): braces, #endif lines and the return statement are not
// visible in this view.
802 void *ConnectionReceiveThread::run()
804 assert(m_connection);
806 LOG(dout_con << m_connection->getDesc()
807 << "ConnectionReceive thread started" << std::endl);
809 PROFILE(std::stringstream
811 PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
813 // use IPv6 minimum allowed MTU as receive buffer size as this is
814 // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
816 const unsigned int packet_maxsize = 1500;
817 SharedBuffer<u8> packetdata(packet_maxsize);
// Passed to receive(), which uses it to decide whether the incoming buffers
// need to be checked again.
819 bool packet_queued = true;
821 #ifdef DEBUG_CONNECTION_KBPS
822 u64 curtime = porting::getTimeMs();
823 u64 lasttime = curtime;
824 float debug_print_timer = 0.0;
827 while (!stopRequested()) {
828 BEGIN_DEBUG_EXCEPTION_HANDLER
829 PROFILE(ScopeProfiler
830 sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
832 #ifdef DEBUG_CONNECTION_KBPS
834 curtime = porting::getTimeMs();
835 float dtime = CALC_DTIME(lasttime,curtime);
838 /* receive packets */
839 receive(packetdata, packet_queued);
841 #ifdef DEBUG_CONNECTION_KBPS
842 debug_print_timer += dtime;
843 if (debug_print_timer > 20.0) {
844 debug_print_timer -= 20.0;
846 std::vector<session_t> peerids = m_connection->getPeerIDs();
848 for (auto id : peerids)
850 PeerHelper peer = m_connection->getPeerNoEx(id);
// Sums over all channels of this peer.
854 float peer_current = 0.0;
855 float peer_loss = 0.0;
856 float avg_rate = 0.0;
857 float avg_loss = 0.0;
// NOTE(review): `channels` is accessed through the PeerHelper directly here,
// whereas the rest of the file first casts to UDPPeer — confirm this debug
// block still compiles when DEBUG_CONNECTION_KBPS is enabled.
859 for(u16 j=0; j<CHANNEL_COUNT; j++)
861 peer_current +=peer->channels[j].getCurrentDownloadRateKB();
862 peer_loss += peer->channels[j].getCurrentLossRateKB();
863 avg_rate += peer->channels[j].getAvgDownloadRateKB();
864 avg_loss += peer->channels[j].getAvgLossRateKB();
867 std::stringstream output;
868 output << std::fixed << std::setprecision(1);
// NOTE(review): `*i` refers to a name that is not declared in the visible
// lines; the enclosing loop variable is `id` — looks like a leftover from an
// iterator-based loop; confirm and fix when enabling this debug code.
869 output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
870 output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
871 output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
872 output << std::setfill(' ');
873 for(u16 j=0; j<CHANNEL_COUNT; j++)
875 output << "\tcha " << j << ":"
876 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
877 << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
878 << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
880 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
881 << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
882 << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
883 << " / WS: " << peer->channels[j].getWindowSize()
887 fprintf(stderr,"%s\n",output.str().c_str());
891 END_DEBUG_EXCEPTION_HANDLER
894 PROFILE(g_profiler->remove(ThreadIdentifier.str()));
898 // Receive packets from the network and buffers and create ConnectionEvents
// Step 1: deliver data that is already waiting in the per-channel incoming
//         buffers (getFromBuffers) as dataReceived events.
// Step 2: read one datagram from the UDP socket, validate size, protocol id
//         and channel number, resolve or create the sending peer, verify the
//         sender address, and pass the payload without the base header to
//         processPacket(); a non-silent result becomes a dataReceived event.
// packetdata: caller-owned receive buffer that is reused between calls.
// packet_queued: in/out flag; set to false after the buffers were checked and
//         to true again when a packet was processed.
// NOTE(review): loop headers, `try` lines, early returns and some
// declarations are not visible in this view.
899 void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
903 // First, see if there any buffered packets we can process now
905 bool data_left = true;
907 SharedBuffer<u8> resultdata;
910 data_left = getFromBuffers(peer_id, resultdata);
913 e.dataReceived(peer_id, resultdata);
914 m_connection->putEvent(e);
917 catch (ProcessedSilentlyException &e) {
918 /* try reading again */
921 packet_queued = false;
924 // Call Receive() to wait for incoming data
926 s32 received_size = m_connection->m_udpSocket.Receive(sender,
927 *packetdata, packetdata.getSize());
928 if (received_size < 0)
// Reject datagrams shorter than the base header or with a foreign protocol
// id.
931 if ((received_size < BASE_HEADER_SIZE) ||
932 (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
933 LOG(derr_con << m_connection->getDesc()
934 << "Receive(): Invalid incoming packet, "
935 << "size: " << received_size
// NOTE(review): if readU32() returns an unsigned type, the -1 alternative is
// converted to unsigned and printed as a large number — confirm this is
// acceptable for the log output.
937 << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
942 session_t peer_id = readPeerId(*packetdata);
943 u8 channelnum = readChannel(*packetdata);
// The channel number comes from the network; it is checked before it is used
// as an index into `channels` below.
945 if (channelnum > CHANNEL_COUNT - 1) {
946 LOG(derr_con << m_connection->getDesc()
947 << "Receive(): Invalid channel " << (u32)channelnum << std::endl);
951 /* Try to identify peer by sender address (may happen on join) */
952 if (peer_id == PEER_ID_INEXISTENT) {
953 peer_id = m_connection->lookupPeer(sender);
954 // We do not have to remind the peer of its
955 // peer id as the CONTROLTYPE_SET_PEER_ID
956 // command was sent reliably.
959 if (peer_id == PEER_ID_INEXISTENT) {
960 /* Ignore it if we are a client */
961 if (m_connection->ConnectedToServer())
963 /* The peer was not found in our lists. Add it. */
964 peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
967 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
969 LOG(dout_con << m_connection->getDesc()
970 << " got packet from unknown peer_id: "
971 << peer_id << " Ignoring." << std::endl);
975 // Validate peer address
977 Address peer_address;
978 if (peer->getAddress(MTP_UDP, peer_address)) {
979 if (peer_address != sender) {
980 LOG(derr_con << m_connection->getDesc()
981 << " Peer " << peer_id << " sending from different address."
982 " Ignoring." << std::endl);
986 LOG(derr_con << m_connection->getDesc()
987 << " Peer " << peer_id << " doesn't have an address?!"
988 " Ignoring." << std::endl);
// Valid traffic from the peer resets its timeout counter.
992 peer->ResetTimeout();
994 Channel *channel = nullptr;
995 if (dynamic_cast<UDPPeer *>(&peer)) {
996 channel = &dynamic_cast<UDPPeer *>(&peer)->channels[channelnum];
998 LOG(derr_con << m_connection->getDesc()
999 << "Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!"
1000 " Ignoring." << std::endl);
1004 channel->UpdateBytesReceived(received_size);
1006 // Throw the received packet to channel->processPacket()
1008 // Make a new SharedBuffer from the data without the base headers
// received_size >= BASE_HEADER_SIZE was checked above, so the size is not
// negative.
1009 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
1010 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
1011 strippeddata.getSize());
1014 // Process it (the result is some data with no headers made by us)
1015 SharedBuffer<u8> resultdata = processPacket
1016 (channel, strippeddata, peer_id, channelnum, false);
1018 LOG(dout_con << m_connection->getDesc()
1019 << " ProcessPacket from peer_id: " << peer_id
1020 << ", channel: " << (u32)channelnum << ", returned "
1021 << resultdata.getSize() << " bytes" << std::endl);
1024 e.dataReceived(peer_id, resultdata);
1025 m_connection->putEvent(e);
1027 catch (ProcessedSilentlyException &e) {
1029 catch (ProcessedQueued &e) {
1030 // we set it to true anyway (see below)
1033 /* Every time we receive a packet it can happen that a previously
1034 * buffered packet is now ready to process. */
1035 packet_queued = true;
1037 catch (InvalidIncomingDataException &e) {
// Scans every channel of every UDP peer for buffered reliable data that is
// ready, using checkIncomingBuffers().
// peer_id and dst are output parameters filled in by checkIncomingBuffers().
// NOTE(review): the return statements and guard bodies are not visible in
// this view.
1041 bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8> &dst)
1043 std::vector<session_t> peerids = m_connection->getPeerIDs();
1045 for (session_t peerid : peerids) {
1046 PeerHelper peer = m_connection->getPeerNoEx(peerid);
// Only UDPPeer instances have channels to inspect.
1050 if (dynamic_cast<UDPPeer *>(&peer) == 0)
1053 for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
1054 if (checkIncomingBuffers(&channel, peer_id, dst)) {
// Checks whether the first buffered reliable packet of `channel` is the one
// expected next. If so, pops it, advances the channel's expected incoming
// sequence number, strips the base and reliable headers and re-processes the
// payload through processPacket() with reliable = true.
// peer_id: output, set to the peer id read from the buffered packet.
// dst: output, receives the result of processPacket().
// NOTE(review): the return statements are not visible in this view.
1062 bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
1063 session_t &peer_id, SharedBuffer<u8> &dst)
1065 u16 firstseqnum = 0;
1066 if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) {
// Deliver strictly in order: only the expected sequence number qualifies.
1067 if (firstseqnum == channel->readNextIncomingSeqNum()) {
1068 BufferedPacket p = channel->incoming_reliables.popFirst();
1069 peer_id = readPeerId(*p.data);
1070 u8 channelnum = readChannel(*p.data);
1071 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
1073 LOG(dout_con << m_connection->getDesc()
1074 << "UNBUFFERING TYPE_RELIABLE"
1075 << " seqnum=" << seqnum
1076 << " peer_id=" << peer_id
1077 << " channel=" << ((int) channelnum & 0xff)
1080 channel->incNextIncomingSeqNum();
1082 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1083 // Get out the inside packet and re-process it
// NOTE(review): assumes the buffered packet is at least headers_size bytes
// long; otherwise the subtraction would wrap — presumably guaranteed when the
// packet was buffered; confirm.
1084 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1085 memcpy(*payload, &p.data[headers_size], payload.getSize());
1087 dst = processPacket(channel, payload, peer_id, channelnum, true);
// Validates a header-stripped packet and dispatches it to the handler
// registered for its type in packetTypeRouter.
// packetdata: payload without the base header; its first byte is the type.
// Returns the handler's result.
// Throws ProcessedSilentlyException when the peer no longer exists, and
// InvalidIncomingDataException for an empty payload, an out-of-range peer id
// or a packet type >= PACKET_TYPE_MAX.
// NOTE(review): the peer-not-found guard line is not visible in this view.
1094 SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
1095 const SharedBuffer<u8> &packetdata, session_t peer_id, u8 channelnum, bool reliable)
1097 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
1100 errorstream << "Peer not found (possible timeout)" << std::endl;
1101 throw ProcessedSilentlyException("Peer not found (possible timeout)");
1104 if (packetdata.getSize() < 1)
1105 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1107 u8 type = readU8(&(packetdata[0]));
// The range check only applies when MAX_UDP_PEERS is at most 65535.
1109 if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
1110 std::string errmsg = "Invalid peer_id=" + itos(peer_id);
1111 errorstream << errmsg << std::endl;
1112 throw InvalidIncomingDataException(errmsg.c_str());
// Guard the table lookup below against out-of-range types.
1115 if (type >= PACKET_TYPE_MAX) {
1116 derr_con << m_connection->getDesc() << "Got invalid type=" << ((int) type & 0xff)
1118 throw InvalidIncomingDataException("Invalid packet type");
1121 const PacketTypeHandler &pHandle = packetTypeRouter[type];
1122 return (this->*pHandle.handler)(channel, packetdata, &peer, channelnum, reliable);
// Dispatch table used by processPacket(): indexed by the packet type byte,
// each entry holds the member function that handles that type.
// NOTE(review): the entry order presumably matches the numeric values of the
// PACKET_TYPE_* constants (control, original, split, reliable) — confirm
// against their definition.
1125 const ConnectionReceiveThread::PacketTypeHandler
1126 ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = {
1127 {&ConnectionReceiveThread::handlePacketType_Control},
1128 {&ConnectionReceiveThread::handlePacketType_Original},
1129 {&ConnectionReceiveThread::handlePacketType_Split},
1130 {&ConnectionReceiveThread::handlePacketType_Reliable},
1133 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel,
1134 const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1136 if (packetdata.getSize() < 2)
1137 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1139 u8 controltype = readU8(&(packetdata[1]));
1141 if (controltype == CONTROLTYPE_ACK) {
1142 assert(channel != NULL);
1144 if (packetdata.getSize() < 4) {
1145 throw InvalidIncomingDataException(
1146 "packetdata.getSize() < 4 (ACK header size)");
1149 u16 seqnum = readU16(&packetdata[2]);
1150 LOG(dout_con << m_connection->getDesc() << " [ CONTROLTYPE_ACK: channelnum="
1151 << ((int) channelnum & 0xff) << ", peer_id=" << peer->id << ", seqnum="
1152 << seqnum << " ]" << std::endl);
1155 BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
1157 // only calculate rtt from straight sent packets
1158 if (p.resend_count == 0) {
1159 // Get round trip time
1160 u64 current_time = porting::getTimeMs();
1162 // an overflow is quite unlikely but as it'd result in major
1163 // rtt miscalculation we handle it here
1164 if (current_time > p.absolute_send_time) {
1165 float rtt = (current_time - p.absolute_send_time) / 1000.0;
1167 // Let peer calculate stuff according to it
1168 // (avg_rtt and resend_timeout)
1169 dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
1170 } else if (p.totaltime > 0) {
1171 float rtt = p.totaltime;
1173 // Let peer calculate stuff according to it
1174 // (avg_rtt and resend_timeout)
1175 dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
1178 // put bytes for max bandwidth calculation
1179 channel->UpdateBytesSent(p.data.getSize(), 1);
1180 if (channel->outgoing_reliables_sent.size() == 0)
1181 m_connection->TriggerSend();
1182 } catch (NotFoundException &e) {
1183 LOG(derr_con << m_connection->getDesc()
1184 << "WARNING: ACKed packet not in outgoing queue"
1185 << " seqnum=" << seqnum << std::endl);
1186 channel->UpdatePacketTooLateCounter();
1189 throw ProcessedSilentlyException("Got an ACK");
1190 } else if (controltype == CONTROLTYPE_SET_PEER_ID) {
1191 // Got a packet to set our peer id
1192 if (packetdata.getSize() < 4)
1193 throw InvalidIncomingDataException
1194 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1195 session_t peer_id_new = readU16(&packetdata[2]);
1196 LOG(dout_con << m_connection->getDesc() << "Got new peer id: " << peer_id_new
1197 << "... " << std::endl);
1199 if (m_connection->GetPeerID() != PEER_ID_INEXISTENT) {
1200 LOG(derr_con << m_connection->getDesc()
1201 << "WARNING: Not changing existing peer id." << std::endl);
1203 LOG(dout_con << m_connection->getDesc() << "changing own peer id"
1205 m_connection->SetPeerID(peer_id_new);
1208 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1209 } else if (controltype == CONTROLTYPE_PING) {
1210 // Just ignore it, the incoming data already reset
1211 // the timeout counter
1212 LOG(dout_con << m_connection->getDesc() << "PING" << std::endl);
1213 throw ProcessedSilentlyException("Got a PING");
1214 } else if (controltype == CONTROLTYPE_DISCO) {
1215 // Just ignore it, the incoming data already reset
1216 // the timeout counter
1217 LOG(dout_con << m_connection->getDesc() << "DISCO: Removing peer "
1218 << peer->id << std::endl);
1220 if (!m_connection->deletePeer(peer->id, false)) {
1221 derr_con << m_connection->getDesc() << "DISCO: Peer not found" << std::endl;
1224 throw ProcessedSilentlyException("Got a DISCO");
1226 LOG(derr_con << m_connection->getDesc()
1227 << "INVALID TYPE_CONTROL: invalid controltype="
1228 << ((int) controltype & 0xff) << std::endl);
1229 throw InvalidIncomingDataException("Invalid control type");
1233 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel,
1234 const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1236 if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
1237 throw InvalidIncomingDataException
1238 ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
1239 LOG(dout_con << m_connection->getDesc() << "RETURNING TYPE_ORIGINAL to user"
1241 // Get the inside packet out and return it
1242 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1243 memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
1247 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel,
1248 const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1250 Address peer_address;
1252 if (peer->getAddress(MTP_UDP, peer_address)) {
1253 // We have to create a packet again for buffering
1254 // This isn't actually too bad an idea.
1255 BufferedPacket packet = makePacket(peer_address,
1257 m_connection->GetProtocolID(),
1261 // Buffer the packet
1262 SharedBuffer<u8> data = peer->addSplitPacket(channelnum, packet, reliable);
1264 if (data.getSize() != 0) {
1265 LOG(dout_con << m_connection->getDesc()
1266 << "RETURNING TYPE_SPLIT: Constructed full data, "
1267 << "size=" << data.getSize() << std::endl);
1270 LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" << std::endl);
1271 throw ProcessedSilentlyException("Buffered a split packet chunk");
1274 // We should never get here.
1275 FATAL_ERROR("Invalid execution point");
1278 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel,
1279 const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1281 assert(channel != NULL);
1283 // Recursive reliable packets not allowed
1285 throw InvalidIncomingDataException("Found nested reliable packets");
1287 if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
1288 throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1290 u16 seqnum = readU16(&packetdata[1]);
1291 bool is_future_packet = false;
1292 bool is_old_packet = false;
1294 /* packet is within our receive window send ack */
1295 if (seqnum_in_window(seqnum,
1296 channel->readNextIncomingSeqNum(), MAX_RELIABLE_WINDOW_SIZE)) {
1297 m_connection->sendAck(peer->id, channelnum, seqnum);
1299 is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
1300 is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
1302 /* packet is not within receive window, don't send ack. *
1303 * if this was a valid packet it's gonna be retransmitted */
1304 if (is_future_packet)
1305 throw ProcessedSilentlyException(
1306 "Received packet newer then expected, not sending ack");
1308 /* seems like our ack was lost, send another one for a old packet */
1309 if (is_old_packet) {
1310 LOG(dout_con << m_connection->getDesc()
1311 << "RE-SENDING ACK: peer_id: " << peer->id
1312 << ", channel: " << (channelnum & 0xFF)
1313 << ", seqnum: " << seqnum << std::endl;)
1314 m_connection->sendAck(peer->id, channelnum, seqnum);
1316 // we already have this packet so this one was on wire at least
1317 // the current timeout
1318 // we don't know how long this packet was on wire don't do silly guessing
1319 // dynamic_cast<UDPPeer*>(&peer)->
1320 // reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
1322 throw ProcessedSilentlyException("Retransmitting ack for old packet");
1326 if (seqnum != channel->readNextIncomingSeqNum()) {
1327 Address peer_address;
1329 // this is a reliable packet so we have a udp address for sure
1330 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
1331 // This one comes later, buffer it.
1332 // Actually we have to make a packet to buffer one.
1333 // Well, we have all the ingredients, so just do it.
1334 BufferedPacket packet = con::makePacket(
1337 m_connection->GetProtocolID(),
1341 channel->incoming_reliables.insert(packet, channel->readNextIncomingSeqNum());
1343 LOG(dout_con << m_connection->getDesc()
1344 << "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id
1345 << ", channel: " << (channelnum & 0xFF)
1346 << ", seqnum: " << seqnum << std::endl;)
1348 throw ProcessedQueued("Buffered future reliable packet");
1349 } catch (AlreadyExistsException &e) {
1350 } catch (IncomingDataCorruption &e) {
1351 ConnectionCommand discon;
1352 discon.disconnect_peer(peer->id);
1353 m_connection->putCommand(discon);
1355 LOG(derr_con << m_connection->getDesc()
1356 << "INVALID, TYPE_RELIABLE peer_id: " << peer->id
1357 << ", channel: " << (channelnum & 0xFF)
1358 << ", seqnum: " << seqnum
1359 << "DROPPING CLIENT!" << std::endl;)
1363 /* we got a packet to process right now */
1364 LOG(dout_con << m_connection->getDesc()
1365 << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer->id
1366 << ", channel: " << (channelnum & 0xFF)
1367 << ", seqnum: " << seqnum << std::endl;)
1370 /* check for resend case */
1371 u16 queued_seqnum = 0;
1372 if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
1373 if (queued_seqnum == seqnum) {
1374 BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
1375 /** TODO find a way to verify the new against the old packet */
1379 channel->incNextIncomingSeqNum();
1381 // Get out the inside packet and re-process it
1382 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1383 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1385 return processPacket(channel, payload, peer->id, channelnum, true);