3 Copyright (C) 2013-2017 celeron55, Perttu Ahola <celeron55@gmail.com>
4 Copyright (C) 2017 celeron55, Loic Blot <loic.blot@unix-experience.fr>
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU Lesser General Public License as published by
8 the Free Software Foundation; either version 2.1 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU Lesser General Public License for more details.
16 You should have received a copy of the GNU Lesser General Public License along
17 with this program; if not, write to the Free Software Foundation, Inc.,
18 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 #include "connectionthreads.h"
25 #include "network/networkpacket.h"
26 #include "util/serialize.h"
31 /******************************************************************************/
32 /* defines used for debugging and profiling */
33 /******************************************************************************/
// Connection throughput (KBPS) debug output is switched off via #undef.
37 #undef DEBUG_CONNECTION_KBPS
39 /* this mutex is used to achieve log message consistency */
40 std::mutex log_conthread_mutex;
43 MutexAutoLock loglock(log_conthread_mutex); \
47 //#define DEBUG_CONNECTION_KBPS
48 #undef DEBUG_CONNECTION_KBPS
// runTimeouts() marks a peer as timed out once a reliable packet's
// resend_count exceeds this value.
51 /* maximum number of retries for reliable packets */
52 #define MAX_RELIABLE_RETRY 5
// Reads the peer id field: a 16-bit value at byte offset 4 of a raw packet.
// No bounds check is done here; the caller must pass a large enough buffer.
56 static session_t readPeerId(u8 *packetdata)
58 return readU16(&packetdata[4]);
// Reads the channel number field: a single byte at offset 6 of a raw packet.
// No bounds check is done here; the caller must pass a large enough buffer.
60 static u8 readChannel(u8 *packetdata)
62 return readU8(&packetdata[6]);
65 /******************************************************************************/
66 /* Connection Threads */
67 /******************************************************************************/
// Constructs the send thread (thread name "ConnectionSend").
// max_packet_size is stored in m_max_packet_size; the per-iteration packet
// budget is read from the "max_packets_per_iteration" setting.
69 ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
71 Thread("ConnectionSend"),
72 m_max_packet_size(max_packet_size),
74 m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
// Main loop of the send thread.
// Each iteration resets the per-iteration packet budget, waits on the send
// semaphore (wait(50); presumably milliseconds - TODO confirm), drains any
// further pending triggers, computes the elapsed time and then translates all
// queued ConnectionCommands into packets via processReliableCommand() /
// processNonReliableCommand().
// The loop keeps running after a stop request for as long as packetsQueued()
// reports pending data, so queued packets still get a chance to be sent.
78 void *ConnectionSendThread::run()
82 LOG(dout_con << m_connection->getDesc()
83 << "ConnectionSend thread started" << std::endl);
85 u64 curtime = porting::getTimeMs();
86 u64 lasttime = curtime;
88 PROFILE(std::stringstream ThreadIdentifier);
89 PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
91 /* if stop is requested don't stop immediately but try to send all */
93 while (!stopRequested() || packetsQueued()) {
94 BEGIN_DEBUG_EXCEPTION_HANDLER
95 PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
// fresh packet budget for this iteration
97 m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
99 /* wait for trigger or timeout */
100 m_send_sleep_semaphore.wait(50);
102 /* remove all triggers */
103 while (m_send_sleep_semaphore.wait(0)) {
107 curtime = porting::getTimeMs();
108 float dtime = CALC_DTIME(lasttime, curtime);
110 /* first do all the reliable stuff */
113 /* translate commands to packets */
// pop_frontNoEx(0) yields a CONNCMD_NONE command once the queue is drained
114 ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
115 while (c.type != CONNCMD_NONE) {
117 processReliableCommand(c);
119 processNonReliableCommand(c);
121 c = m_connection->m_command_queue.pop_frontNoEx(0);
124 /* send non reliable packets */
127 END_DEBUG_EXCEPTION_HANDLER
130 PROFILE(g_profiler->remove(ThreadIdentifier.str()));
// Wakes the send loop early by posting the semaphore that run() waits on.
134 void ConnectionSendThread::Trigger()
136 m_send_sleep_semaphore.post();
// Reports whether there is still outgoing work: either the unreliable
// outgoing queue is non-empty while peers exist, or some UDP peer has a
// channel with queued commands. Used by run() to delay thread shutdown.
139 bool ConnectionSendThread::packetsQueued()
141 std::list<session_t> peerIds = m_connection->getPeerIDs();
143 if (!m_outgoing_queue.empty() && !peerIds.empty())
146 for (session_t peerId : peerIds) {
147 PeerHelper peer = m_connection->getPeerNoEx(peerId);
// only UDP peers carry channels
152 if (dynamic_cast<UDPPeer *>(&peer) == 0)
155 for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
156 if (!channel.queued_commands.empty()) {
// Periodic housekeeping for all peers.
// dtime: time elapsed since the previous iteration (as computed by run()).
// For every UDP peer this
//  - collects peers whose isTimedOut(m_timeout) check fires,
//  - expires incomplete unreliable split packets and ages sent reliables,
//  - fetches timed-out reliable packets for re-sending and marks the peer as
//    timed out once a packet's resend_count exceeds MAX_RELIABLE_RETRY,
//  - sends a ping when UDPPeer::Ping() asks for one,
//  - runs the peer's command queues.
// Peers collected in timeouted_peers are deleted at the end.
166 void ConnectionSendThread::runTimeouts(float dtime)
168 std::list<session_t> timeouted_peers;
169 std::list<session_t> peerIds = m_connection->getPeerIDs();
171 for (session_t &peerId : peerIds) {
172 PeerHelper peer = m_connection->getPeerNoEx(peerId);
177 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
181 PROFILE(std::stringstream peerIdentifier);
182 PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc()
183 << ";" << peerId << ";RELIABLE]");
184 PROFILE(ScopeProfiler
185 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
187 SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
192 if (peer->isTimedOut(m_timeout)) {
193 infostream << m_connection->getDesc()
194 << "RunTimeouts(): Peer " << peer->id
196 << " (source=peer->timeout_counter)"
198 // Add peer to the list
199 timeouted_peers.push_back(peer->id);
200 // Don't bother going through the buffers of this one
204 float resend_timeout = udpPeer->getResendTimeout();
205 bool retry_count_exceeded = false;
206 for (Channel &channel : udpPeer->channels) {
207 std::list<BufferedPacket> timed_outs;
209 // Remove timed out incomplete unreliable split packets
210 channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
212 // Increment reliable packet times
213 channel.outgoing_reliables_sent.incrementTimeouts(dtime);
// the per-iteration budget is shared evenly between all known peers
215 unsigned int numpeers = m_connection->m_peers.size();
220 // Re-send timed out outgoing reliables
221 timed_outs = channel.outgoing_reliables_sent.getTimedOuts(resend_timeout,
222 (m_max_data_packets_per_iteration / numpeers));
224 channel.UpdatePacketLossCounter(timed_outs.size());
225 g_profiler->graphAdd("packets_lost", timed_outs.size());
// re-sends consume part of this iteration's packet budget
227 m_iteration_packets_avaialble -= timed_outs.size();
229 for (std::list<BufferedPacket>::iterator k = timed_outs.begin();
230 k != timed_outs.end(); ++k) {
231 session_t peer_id = readPeerId(*(k->data));
232 u8 channelnum = readChannel(*(k->data));
233 u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE + 1]));
235 channel.UpdateBytesLost(k->data.getSize());
238 if (k->resend_count > MAX_RELIABLE_RETRY) {
239 retry_count_exceeded = true;
240 timeouted_peers.push_back(peer->id);
241 /* no need to check additional packets if a single one did timeout*/
245 LOG(derr_con << m_connection->getDesc()
246 << "RE-SENDING timed-out RELIABLE to "
247 << k->address.serializeString()
248 << "(t/o=" << resend_timeout << "): "
249 << "from_peer_id=" << peer_id
250 << ", channel=" << ((int) channelnum & 0xff)
251 << ", seqnum=" << seqnum
256 // do not handle rtt here as we can't decide if this packet was
257 // lost or really takes more time to transmit
260 if (retry_count_exceeded) {
261 break; /* no need to check other channels if we already did timeout */
264 channel.UpdateTimers(dtime);
267 /* skip to next peer if we did timeout */
268 if (retry_count_exceeded)
271 /* send ping if necessary */
272 if (udpPeer->Ping(dtime, data)) {
273 LOG(dout_con << m_connection->getDesc()
274 << "Sending ping for peer_id: " << udpPeer->id << std::endl);
275 /* this may fail if there ain't a sequence number left */
276 if (!rawSendAsPacket(udpPeer->id, 0, data, true)) {
277 //retrigger with reduced ping interval
278 udpPeer->Ping(4.0, data);
282 udpPeer->RunCommandQueues(m_max_packet_size,
283 m_max_commands_per_iteration,
284 m_max_packets_requeued);
287 // Remove timed out peers
// iterate with the list's own element type (session_t) instead of a
// hard-coded u16 so the loop stays correct if session_t ever changes
288 for (session_t timeouted_peer : timeouted_peers) {
289 LOG(derr_con << m_connection->getDesc()
290 << "RunTimeouts(): Removing peer " << timeouted_peer << std::endl);
291 m_connection->deletePeer(timeouted_peer, true);
// Writes one already-built packet to the UDP socket.
// A SendFailedException is caught and only logged together with the
// destination address; it is not propagated to the caller.
295 void ConnectionSendThread::rawSend(const BufferedPacket &packet)
298 m_connection->m_udpSocket.Send(packet.address, *packet.data,
299 packet.data.getSize());
300 LOG(dout_con << m_connection->getDesc()
301 << " rawSend: " << packet.data.getSize()
302 << " bytes sent" << std::endl);
303 } catch (SendFailedException &e) {
304 LOG(derr_con << m_connection->getDesc()
305 << "Connection::rawSend(): SendFailedException: "
306 << packet.address.serializeString() << std::endl);
// Sends a reliable packet: stamps it with the current time and records it in
// the channel's outgoing_reliables_sent buffer (awaiting its ACK).
// An AlreadyExistsException from the buffer insert is caught and logged.
310 void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel)
// send time is used later for the round trip time calculation on ACK
313 p.absolute_send_time = porting::getTimeMs();
// second argument: sequence number derived from the channel's current
// outgoing sequence number minus the maximum window size, wrapped modulo
// (MAX_RELIABLE_WINDOW_SIZE + 1)
315 channel->outgoing_reliables_sent.insert(p,
316 (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
317 % (MAX_RELIABLE_WINDOW_SIZE + 1));
319 catch (AlreadyExistsException &e) {
320 LOG(derr_con << m_connection->getDesc()
321 << "WARNING: Going to send a reliable packet"
322 << " in outgoing buffer" << std::endl);
// Wraps `data` into a packet for peer_id/channelnum and sends or queues it.
// reliable == true: a sequence number is requested from the channel, the data
//   is wrapped by makeReliablePacket() and either sent right away (send window
//   not full) or pushed onto the channel's queued_reliables.
// reliable == false: the packet is built for the peer's MTP_UDP address; if
//   the peer has no such address yet the packet is dropped with a log entry.
// The ping caller in runTimeouts() treats a false return as "could not send".
329 bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
330 const SharedBuffer<u8> &data, bool reliable)
332 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
334 LOG(dout_con << m_connection->getDesc()
335 << " INFO: dropped packet for non existent peer_id: "
336 << peer_id << std::endl);
// NOTE(review): the condition fires for *unreliable* packets while the
// message talks about a reliable one - looks inverted, confirm intent.
337 FATAL_ERROR_IF(!reliable,
338 "Trying to send raw packet reliable but no peer found!");
// NOTE(review): result of dynamic_cast is dereferenced without a null check
341 Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
344 bool have_sequence_number_for_raw_packet = true;
346 channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
// flag is cleared by getOutgoingSequenceNumber() when none is available
348 if (!have_sequence_number_for_raw_packet)
351 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
352 Address peer_address;
353 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
355 // Add base headers and make a packet
356 BufferedPacket p = con::makePacket(peer_address, reliable,
357 m_connection->GetProtocolID(), m_connection->GetPeerID(),
360 // first check if our send window is already maxed out
361 if (channel->outgoing_reliables_sent.size()
362 < channel->getWindowSize()) {
363 LOG(dout_con << m_connection->getDesc()
364 << " INFO: sending a reliable packet to peer_id " << peer_id
365 << " channel: " << (u32)channelnum
366 << " seqnum: " << seqnum << std::endl);
367 sendAsPacketReliable(p, channel);
// window full: keep the packet until sendPackets() finds room for it
371 LOG(dout_con << m_connection->getDesc()
372 << " INFO: queueing reliable packet for peer_id: " << peer_id
373 << " channel: " << (u32)channelnum
374 << " seqnum: " << seqnum << std::endl);
375 channel->queued_reliables.push(p);
379 Address peer_address;
380 if (peer->getAddress(MTP_UDP, peer_address)) {
381 // Add base headers and make a packet
382 BufferedPacket p = con::makePacket(peer_address, data,
383 m_connection->GetProtocolID(), m_connection->GetPeerID(),
391 LOG(dout_con << m_connection->getDesc()
392 << " INFO: dropped unreliable packet for peer_id: " << peer_id
393 << " because of (yet) missing udp address" << std::endl);
// Dispatches a command that was flagged reliable (asserted on entry).
// SEND_TO_ALL is forwarded to sendToAllReliable(); CREATE_PEER is sent
// directly through rawSendAsPacket() and handled further if that fails.
// CONNECT/DISCONNECT must never arrive as reliable and trigger FATAL_ERROR.
397 void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
399 assert(c.reliable); // Pre-condition
403 LOG(dout_con << m_connection->getDesc()
404 << "UDP processing reliable CONNCMD_NONE" << std::endl);
408 LOG(dout_con << m_connection->getDesc()
409 << "UDP processing reliable CONNCMD_SEND" << std::endl);
413 case CONNCMD_SEND_TO_ALL:
414 LOG(dout_con << m_connection->getDesc()
415 << "UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
416 sendToAllReliable(c);
419 case CONCMD_CREATE_PEER:
420 LOG(dout_con << m_connection->getDesc()
421 << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
422 if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
423 /* put to queue if we couldn't send it immediately */
429 case CONNCMD_CONNECT:
430 case CONNCMD_DISCONNECT:
432 FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
434 LOG(dout_con << m_connection->getDesc()
435 << " Invalid reliable command type: " << c.type << std::endl);
// Dispatches a command that was flagged unreliable (asserted on entry).
// Each case logs the command type; the visible handlers are
// disconnect_peer(), send(), sendToAll() and sendAsPacket() (ACKs, queued
// with ack == true). CONCMD_CREATE_PEER must always be reliable and triggers
// FATAL_ERROR here.
440 void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
442 assert(!c.reliable); // Pre-condition
446 LOG(dout_con << m_connection->getDesc()
447 << " UDP processing CONNCMD_NONE" << std::endl);
450 LOG(dout_con << m_connection->getDesc()
451 << " UDP processing CONNCMD_SERVE port="
452 << c.address.serializeString() << std::endl);
455 case CONNCMD_CONNECT:
456 LOG(dout_con << m_connection->getDesc()
457 << " UDP processing CONNCMD_CONNECT" << std::endl);
460 case CONNCMD_DISCONNECT:
461 LOG(dout_con << m_connection->getDesc()
462 << " UDP processing CONNCMD_DISCONNECT" << std::endl);
465 case CONNCMD_DISCONNECT_PEER:
466 LOG(dout_con << m_connection->getDesc()
467 << " UDP processing CONNCMD_DISCONNECT_PEER" << std::endl);
468 disconnect_peer(c.peer_id);
471 LOG(dout_con << m_connection->getDesc()
472 << " UDP processing CONNCMD_SEND" << std::endl);
473 send(c.peer_id, c.channelnum, c.data);
475 case CONNCMD_SEND_TO_ALL:
476 LOG(dout_con << m_connection->getDesc()
477 << " UDP processing CONNCMD_SEND_TO_ALL" << std::endl);
478 sendToAll(c.channelnum, c.data);
481 LOG(dout_con << m_connection->getDesc()
482 << " UDP processing CONCMD_ACK" << std::endl);
483 sendAsPacket(c.peer_id, c.channelnum, c.data, true);
485 case CONCMD_CREATE_PEER:
486 FATAL_ERROR("Got command that should be reliable as unreliable command");
488 LOG(dout_con << m_connection->getDesc()
489 << " Invalid command type: " << c.type << std::endl);
// Starts serving: binds the UDP socket to bind_address and takes the server
// peer id (PEER_ID_SERVER). A SocketException is caught and reported to the
// connection as an event.
493 void ConnectionSendThread::serve(Address bind_address)
495 LOG(dout_con << m_connection->getDesc()
496 << "UDP serving at port " << bind_address.serializeString() << std::endl);
498 m_connection->m_udpSocket.Bind(bind_address);
499 m_connection->SetPeerID(PEER_ID_SERVER);
501 catch (SocketException &e) {
505 m_connection->putEvent(ce);
// Connects to a server at `address`: creates the server peer, publishes a
// peer-added event, binds the local socket to the wildcard address of the
// matching IP family and sends an empty reliable packet to the server while
// our own peer id is still PEER_ID_INEXISTENT.
509 void ConnectionSendThread::connect(Address address)
511 LOG(dout_con << m_connection->getDesc() << " connecting to "
512 << address.serializeString()
513 << ":" << address.getPort() << std::endl);
515 UDPPeer *peer = m_connection->createServerPeer(address);
519 e.peerAdded(peer->id, peer->address);
520 m_connection->putEvent(e);
// wildcard bind address, IPv6 or IPv4 depending on the target
524 if (address.isIPv6())
525 bind_addr.setAddress((IPv6AddressBytes *) NULL);
527 bind_addr.setAddress(0, 0, 0, 0);
529 m_connection->m_udpSocket.Bind(bind_addr);
531 // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
532 m_connection->SetPeerID(PEER_ID_INEXISTENT);
533 NetworkPacket pkt(0, 0);
534 m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
// Announces a disconnect to every known peer by queueing a two byte control
// packet (PACKET_TYPE_CONTROL, CONTROLTYPE_DISCO) on channel 0.
537 void ConnectionSendThread::disconnect()
539 LOG(dout_con << m_connection->getDesc() << " disconnecting" << std::endl);
541 // Create and send DISCO packet
542 SharedBuffer<u8> data(2);
543 writeU8(&data[0], PACKET_TYPE_CONTROL);
544 writeU8(&data[1], CONTROLTYPE_DISCO);
548 std::list<session_t> peerids = m_connection->getPeerIDs();
550 for (session_t peerid : peerids) {
551 sendAsPacket(peerid, 0, data, false);
// Disconnects a single peer: queues a DISCO control packet for it on channel
// 0 and flags the UDP peer as m_pending_disconnect. sendPackets() deletes
// flagged peers once no unreliable packet for them is pending any more.
555 void ConnectionSendThread::disconnect_peer(session_t peer_id)
557 LOG(dout_con << m_connection->getDesc() << " disconnecting peer" << std::endl);
559 // Create and send DISCO packet
560 SharedBuffer<u8> data(2);
561 writeU8(&data[0], PACKET_TYPE_CONTROL);
562 writeU8(&data[1], CONTROLTYPE_DISCO);
563 sendAsPacket(peer_id, 0, data, false);
565 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
570 if (dynamic_cast<UDPPeer *>(&peer) == 0) {
574 dynamic_cast<UDPPeer *>(&peer)->m_pending_disconnect = true;
// Queues `data` for unreliable delivery to one peer.
// peer_id:    destination peer; if unknown the data is dropped with a log entry
// channelnum: channel to send on, must be < CHANNEL_COUNT (asserted)
// data:       payload; split by makeAutoSplitPacket() into chunks of at most
//             m_max_packet_size - BASE_HEADER_SIZE bytes, each of which is
//             queued through sendAsPacket()
577 void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
578 const SharedBuffer<u8> &data)
580 assert(channelnum < CHANNEL_COUNT); // Pre-condition
582 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
// the channel number is masked (not reduced modulo 255) so it is streamed as
// a number instead of a character and 0xFF is not shown as 0
584 LOG(dout_con << m_connection->getDesc() << " peer: peer_id=" << peer_id
585 << ">>>NOT<<< found on sending packet"
586 << ", channel " << (channelnum & 0xFF)
587 << ", size: " << data.getSize() << std::endl);
591 LOG(dout_con << m_connection->getDesc() << " sending to peer_id=" << peer_id
592 << ", channel " << (channelnum & 0xFF)
593 << ", size: " << data.getSize() << std::endl);
// the split sequence number is read, handed to the splitter and written back
595 u16 split_sequence_number = peer->getNextSplitSequenceNumber(channelnum);
597 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
598 std::list<SharedBuffer<u8>> originals;
600 makeAutoSplitPacket(data, chunksize_max, split_sequence_number, &originals);
602 peer->setNextSplitSequenceNumber(channelnum, split_sequence_number);
604 for (const SharedBuffer<u8> &original : originals) {
605 sendAsPacket(peer_id, channelnum, original);
// Hands a reliable send command to its target peer, which stores it via
// PutReliableSendCommand() together with the maximum packet size.
609 void ConnectionSendThread::sendReliable(ConnectionCommand &c)
611 PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
615 peer->PutReliableSendCommand(c, m_max_packet_size);
// Sends `data` unreliably on `channelnum` to every currently known peer by
// calling send() once per peer id.
618 void ConnectionSendThread::sendToAll(u8 channelnum, const SharedBuffer<u8> &data)
620 std::list<session_t> peerids = m_connection->getPeerIDs();
622 for (session_t peerid : peerids) {
623 send(peerid, channelnum, data);
// Hands the reliable command `c` to every currently known peer via
// PutReliableSendCommand().
627 void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
629 std::list<session_t> peerids = m_connection->getPeerIDs();
631 for (session_t peerid : peerids) {
632 PeerHelper peer = m_connection->getPeerNoEx(peerid);
637 peer->PutReliableSendCommand(c, m_max_packet_size);
// Flushes queued packets for all peers within the per-iteration budget.
// dtime: time elapsed since the previous iteration.
// Phase 1 (per peer): assign the peer its share of the remaining budget and
//   send queued reliable packets while the channel's send window and the
//   peer's quota allow it.
// Phase 2: walk the unreliable outgoing queue once. ACKs are always sent;
//   other packets are sent while the peer has quota left (or unconditionally
//   per the second half of the condition), otherwise they are re-queued.
// Finally, peers flagged m_pending_disconnect are deleted unless an
// unreliable packet for them had to be re-queued.
641 void ConnectionSendThread::sendPackets(float dtime)
643 std::list<session_t> peerIds = m_connection->getPeerIDs();
644 std::list<session_t> pendingDisconnect;
645 std::map<session_t, bool> pending_unreliable;
647 for (session_t peerId : peerIds) {
648 PeerHelper peer = m_connection->getPeerNoEx(peerId);
649 //peer may have been removed
651 LOG(dout_con << m_connection->getDesc() << " Peer not found: peer_id="
// every peer gets an equal share of what is left of this iteration's budget
656 peer->m_increment_packets_remaining =
657 m_iteration_packets_avaialble / m_connection->m_peers.size();
659 UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
665 if (udpPeer->m_pending_disconnect) {
666 pendingDisconnect.push_back(peerId);
669 PROFILE(std::stringstream
672 peerIdentifier << "sendPackets[" << m_connection->getDesc() << ";" << peerId
674 PROFILE(ScopeProfiler
675 peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
677 LOG(dout_con << m_connection->getDesc()
678 << " Handle per peer queues: peer_id=" << peerId
679 << " packet quota: " << peer->m_increment_packets_remaining << std::endl);
681 // first send queued reliable packets for all peers (if possible)
682 for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
683 Channel &channel = udpPeer->channels[i];
686 channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
687 u16 next_to_receive = 0;
688 channel.incoming_reliables.getFirstSeqnum(next_to_receive);
690 LOG(dout_con << m_connection->getDesc() << "\t channel: "
691 << i << ", peer quota:"
692 << peer->m_increment_packets_remaining
694 << "\t\t\treliables on wire: "
695 << channel.outgoing_reliables_sent.size()
696 << ", waiting for ack for " << next_to_ack
698 << "\t\t\tincoming_reliables: "
699 << channel.incoming_reliables.size()
700 << ", next reliable packet: "
701 << channel.readNextIncomingSeqNum()
702 << ", next queued: " << next_to_receive
704 << "\t\t\treliables queued : "
705 << channel.queued_reliables.size()
707 << "\t\t\tqueued commands : "
708 << channel.queued_commands.size()
// drain queued reliables while window space and peer quota remain
711 while (!channel.queued_reliables.empty() &&
712 channel.outgoing_reliables_sent.size()
713 < channel.getWindowSize() &&
714 peer->m_increment_packets_remaining > 0) {
715 BufferedPacket p = channel.queued_reliables.front();
716 channel.queued_reliables.pop();
717 LOG(dout_con << m_connection->getDesc()
718 << " INFO: sending a queued reliable packet "
720 << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
722 sendAsPacketReliable(p, &channel);
723 peer->m_increment_packets_remaining--;
728 if (!m_outgoing_queue.empty()) {
729 LOG(dout_con << m_connection->getDesc()
730 << " Handle non reliable queue ("
731 << m_outgoing_queue.size() << " pkts)" << std::endl);
// only the packets present now are visited; re-queued ones wait for the
// next iteration
734 unsigned int initial_queuesize = m_outgoing_queue.size();
735 /* send non reliable packets*/
736 for (unsigned int i = 0; i < initial_queuesize; i++) {
737 OutgoingPacket packet = m_outgoing_queue.front();
738 m_outgoing_queue.pop();
743 PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
745 LOG(dout_con << m_connection->getDesc()
746 << " Outgoing queue: peer_id=" << packet.peer_id
747 << ">>>NOT<<< found on sending packet"
748 << ", channel " << (packet.channelnum & 0xFF)
749 << ", size: " << packet.data.getSize() << std::endl);
753 /* send acks immediately */
755 rawSendAsPacket(packet.peer_id, packet.channelnum,
756 packet.data, packet.reliable);
// An ACK is sent regardless of quota but still consumes one unit of it.
// Decrement with a floor of zero; the previous MYMIN(0, x--) form reset
// the quota instead of decrementing it and had a side effect inside a
// macro argument.
757 if (peer->m_increment_packets_remaining > 0)
758 peer->m_increment_packets_remaining--;
760 (peer->m_increment_packets_remaining > 0) ||
762 rawSendAsPacket(packet.peer_id, packet.channelnum,
763 packet.data, packet.reliable);
764 peer->m_increment_packets_remaining--;
// no quota left: put the packet back and remember the peer has pending data
766 m_outgoing_queue.push(packet);
767 pending_unreliable[packet.peer_id] = true;
// delete disconnecting peers only after their last unreliable packet went out
771 for (session_t peerId : pendingDisconnect) {
772 if (!pending_unreliable[peerId]) {
773 m_connection->deletePeer(peerId, false);
// Queues `data` as an unreliable OutgoingPacket for the given peer/channel.
// `ack` marks the packet as an ACK, which sendPackets() sends without
// checking the peer's quota. Nothing is written to the socket here.
778 void ConnectionSendThread::sendAsPacket(session_t peer_id, u8 channelnum,
779 const SharedBuffer<u8> &data, bool ack)
781 OutgoingPacket packet(peer_id, channelnum, data, false, ack);
782 m_outgoing_queue.push(packet);
// Constructs the receive thread (thread name "ConnectionReceive").
// NOTE(review): max_packet_size is not used in the visible initializer list.
785 ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
786 Thread("ConnectionReceive")
// Main loop of the receive thread; runs until a stop is requested.
// When DEBUG_CONNECTION_KBPS is defined it additionally accumulates the
// elapsed time and, every 20 time units of debug_print_timer, prints per-peer
// and per-channel download/loss rates to stderr.
790 void *ConnectionReceiveThread::run()
792 assert(m_connection);
794 LOG(dout_con << m_connection->getDesc()
795 << "ConnectionReceive thread started" << std::endl);
797 PROFILE(std::stringstream
799 PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
801 #ifdef DEBUG_CONNECTION_KBPS
802 u64 curtime = porting::getTimeMs();
803 u64 lasttime = curtime;
804 float debug_print_timer = 0.0;
807 while (!stopRequested()) {
808 BEGIN_DEBUG_EXCEPTION_HANDLER
809 PROFILE(ScopeProfiler
810 sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
812 #ifdef DEBUG_CONNECTION_KBPS
814 curtime = porting::getTimeMs();
815 float dtime = CALC_DTIME(lasttime,curtime);
818 /* receive packets */
821 #ifdef DEBUG_CONNECTION_KBPS
822 debug_print_timer += dtime;
823 if (debug_print_timer > 20.0) {
824 debug_print_timer -= 20.0;
826 std::list<session_t> peerids = m_connection->getPeerIDs();
828 for (std::list<session_t>::iterator i = peerids.begin();
832 PeerHelper peer = m_connection->getPeerNoEx(*i);
// sums over all channels of this peer
836 float peer_current = 0.0;
837 float peer_loss = 0.0;
838 float avg_rate = 0.0;
839 float avg_loss = 0.0;
841 for(u16 j=0; j<CHANNEL_COUNT; j++)
843 peer_current +=peer->channels[j].getCurrentDownloadRateKB();
844 peer_loss += peer->channels[j].getCurrentLossRateKB();
845 avg_rate += peer->channels[j].getAvgDownloadRateKB();
846 avg_loss += peer->channels[j].getAvgLossRateKB();
849 std::stringstream output;
850 output << std::fixed << std::setprecision(1);
851 output << "OUT to Peer " << *i << " RATES (good / loss) " << std::endl;
852 output << "\tcurrent (sum): " << peer_current << "kb/s "<< peer_loss << "kb/s" << std::endl;
853 output << "\taverage (sum): " << avg_rate << "kb/s "<< avg_loss << "kb/s" << std::endl;
854 output << std::setfill(' ');
// one line per channel: current/average/max rates plus window size
855 for(u16 j=0; j<CHANNEL_COUNT; j++)
857 output << "\tcha " << j << ":"
858 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentDownloadRateKB() <<"kb/s"
859 << " AVG: " << std::setw(6) << peer->channels[j].getAvgDownloadRateKB() <<"kb/s"
860 << " MAX: " << std::setw(6) << peer->channels[j].getMaxDownloadRateKB() <<"kb/s"
862 << " CUR: " << std::setw(6) << peer->channels[j].getCurrentLossRateKB() <<"kb/s"
863 << " AVG: " << std::setw(6) << peer->channels[j].getAvgLossRateKB() <<"kb/s"
864 << " MAX: " << std::setw(6) << peer->channels[j].getMaxLossRateKB() <<"kb/s"
865 << " / WS: " << peer->channels[j].getWindowSize()
869 fprintf(stderr,"%s\n",output.str().c_str());
873 END_DEBUG_EXCEPTION_HANDLER
876 PROFILE(g_profiler->remove(ThreadIdentifier.str()));
880 // Receive packets from the network and buffers and create ConnectionEvents
// Loops while loop_count < 10 and the socket reports data within WaitData(50).
// Per round it first delivers packets that became deliverable from the
// reliable buffers (getFromBuffers), then reads one datagram, validates
// size, protocol id, channel number and sender address, and passes the
// payload without base header to processPacket(). Data returned by
// processPacket() is published as a dataReceived event.
881 void ConnectionReceiveThread::receive()
883 // use IPv6 minimum allowed MTU as receive buffer size as this is
884 // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
886 unsigned int packet_maxsize = 1500;
887 SharedBuffer<u8> packetdata(packet_maxsize);
889 bool packet_queued = true;
891 unsigned int loop_count = 0;
893 /* first of all read packets from socket */
894 /* check for incoming data available */
895 while ((loop_count < 10) &&
896 (m_connection->m_udpSocket.WaitData(50))) {
900 bool data_left = true;
902 SharedBuffer<u8> resultdata;
905 data_left = getFromBuffers(peer_id, resultdata);
908 e.dataReceived(peer_id, resultdata);
909 m_connection->putEvent(e);
912 catch (ProcessedSilentlyException &e) {
913 /* try reading again */
916 packet_queued = false;
920 s32 received_size = m_connection->m_udpSocket.Receive(sender, *packetdata,
// reject datagrams that are too short for the base header or carry a
// foreign protocol id
923 if ((received_size < BASE_HEADER_SIZE) ||
924 (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
925 LOG(derr_con << m_connection->getDesc()
926 << "Receive(): Invalid incoming packet, "
927 << "size: " << received_size
929 << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
934 session_t peer_id = readPeerId(*packetdata);
935 u8 channelnum = readChannel(*packetdata);
937 if (channelnum > CHANNEL_COUNT - 1) {
938 LOG(derr_con << m_connection->getDesc()
939 << "Receive(): Invalid channel " << (u32)channelnum << std::endl);
940 throw InvalidIncomingDataException("Channel doesn't exist");
943 /* Try to identify peer by sender address (may happen on join) */
944 if (peer_id == PEER_ID_INEXISTENT) {
945 peer_id = m_connection->lookupPeer(sender);
946 // We do not have to remind the peer of its
947 // peer id as the CONTROLTYPE_SET_PEER_ID
948 // command was sent reliably.
951 /* The peer was not found in our lists. Add it. */
952 if (peer_id == PEER_ID_INEXISTENT) {
953 peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
956 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
959 LOG(dout_con << m_connection->getDesc()
960 << " got packet from unknown peer_id: "
961 << peer_id << " Ignoring." << std::endl);
965 // Validate peer address
967 Address peer_address;
969 if (peer->getAddress(MTP_UDP, peer_address)) {
970 if (peer_address != sender) {
// connection description is logged once (it used to be streamed twice)
971 LOG(derr_con << m_connection->getDesc()
973 << " Peer " << peer_id << " sending from different address."
974 " Ignoring." << std::endl);
979 bool invalid_address = true;
980 if (invalid_address) {
// connection description is logged once (it used to be streamed twice)
981 LOG(derr_con << m_connection->getDesc()
983 << " Peer " << peer_id << " unknown."
984 " Ignoring." << std::endl);
// any valid packet from the peer counts as a sign of life
989 peer->ResetTimeout();
991 Channel *channel = 0;
993 if (dynamic_cast<UDPPeer *>(&peer) != 0) {
994 channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
998 channel->UpdateBytesReceived(received_size);
1001 // Throw the received packet to channel->processPacket()
1003 // Make a new SharedBuffer from the data without the base headers
1004 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
1005 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
1006 strippeddata.getSize());
1009 // Process it (the result is some data with no headers made by us)
1010 SharedBuffer<u8> resultdata = processPacket
1011 (channel, strippeddata, peer_id, channelnum, false);
1013 LOG(dout_con << m_connection->getDesc()
1014 << " ProcessPacket from peer_id: " << peer_id
1015 << ", channel: " << (u32)channelnum << ", returned "
1016 << resultdata.getSize() << " bytes" << std::endl);
1019 e.dataReceived(peer_id, resultdata);
1020 m_connection->putEvent(e);
1022 catch (ProcessedSilentlyException &e) {
1024 catch (ProcessedQueued &e) {
// a reliable packet was buffered: check the buffers again next round
1025 packet_queued = true;
1028 catch (InvalidIncomingDataException &e) {
1030 catch (ProcessedSilentlyException &e) {
// Scans the channels of all UDP peers for a buffered reliable packet that can
// be delivered now (see checkIncomingBuffers()). On success peer_id and dst
// are filled in by checkIncomingBuffers().
1035 bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8> &dst)
1037 std::list<session_t> peerids = m_connection->getPeerIDs();
1039 for (session_t peerid : peerids) {
1040 PeerHelper peer = m_connection->getPeerNoEx(peerid);
// only UDP peers carry channels
1044 if (dynamic_cast<UDPPeer *>(&peer) == 0)
1047 for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
1048 if (checkIncomingBuffers(&channel, peer_id, dst)) {
// Delivers the next in-order reliable packet of `channel`, if it is buffered.
// When the first buffered sequence number equals the one the channel expects
// next, the packet is popped, the expected sequence number is advanced and
// the inner payload (base and reliable headers removed) is run through
// processPacket() with reliable == true; the result is stored in dst and the
// sender's id in peer_id.
1056 bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
1057 session_t &peer_id, SharedBuffer<u8> &dst)
1059 u16 firstseqnum = 0;
1060 if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) {
1061 if (firstseqnum == channel->readNextIncomingSeqNum()) {
1062 BufferedPacket p = channel->incoming_reliables.popFirst();
1063 peer_id = readPeerId(*p.data);
1064 u8 channelnum = readChannel(*p.data);
1065 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
1067 LOG(dout_con << m_connection->getDesc()
1068 << "UNBUFFERING TYPE_RELIABLE"
1069 << " seqnum=" << seqnum
1070 << " peer_id=" << peer_id
1071 << " channel=" << ((int) channelnum & 0xff)
1074 channel->incNextIncomingSeqNum();
1076 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1077 // Get out the inside packet and re-process it
1078 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1079 memcpy(*payload, &p.data[headers_size], payload.getSize());
1081 dst = processPacket(channel, payload, peer_id, channelnum, true);
// Validates a packet (base header already removed) and dispatches it to the
// handler registered for its type in packetTypeRouter.
// Throws ProcessedSilentlyException when the peer no longer exists and
// InvalidIncomingDataException for empty data, an out-of-range peer id or an
// unknown packet type. Returns whatever the type handler returns.
1088 SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
1089 const SharedBuffer<u8> &packetdata, session_t peer_id, u8 channelnum, bool reliable)
1091 PeerHelper peer = m_connection->getPeerNoEx(peer_id);
1094 errorstream << "Peer not found (possible timeout)" << std::endl;
1095 throw ProcessedSilentlyException("Peer not found (possible timeout)");
1098 if (packetdata.getSize() < 1)
1099 throw InvalidIncomingDataException("packetdata.getSize() < 1");
// first byte of the stripped packet is the packet type
1101 u8 type = readU8(&(packetdata[0]));
1103 if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
1104 std::string errmsg = "Invalid peer_id=" + itos(peer_id);
1105 errorstream << errmsg << std::endl;
1106 throw InvalidIncomingDataException(errmsg.c_str());
// the type is used as an index below, so it must be range-checked first
1109 if (type >= PACKET_TYPE_MAX) {
1110 derr_con << m_connection->getDesc() << "Got invalid type=" << ((int) type & 0xff)
1112 throw InvalidIncomingDataException("Invalid packet type");
1115 const PacketTypeHandler &pHandle = packetTypeRouter[type];
1116 return (this->*pHandle.handler)(channel, packetdata, &peer, channelnum, reliable);
// Dispatch table used by processPacket(): the packet type byte is the index,
// so the entry order must match the numeric values of the packet types.
1119 const ConnectionReceiveThread::PacketTypeHandler
1120 ConnectionReceiveThread::packetTypeRouter[PACKET_TYPE_MAX] = {
1121 {&ConnectionReceiveThread::handlePacketType_Control},
1122 {&ConnectionReceiveThread::handlePacketType_Original},
1123 {&ConnectionReceiveThread::handlePacketType_Split},
1124 {&ConnectionReceiveThread::handlePacketType_Reliable},
// Handles control packets; byte 1 of packetdata selects the control type.
//  ACK:         removes the acknowledged packet from outgoing_reliables_sent,
//               reports the round trip time and updates send statistics
//  SET_PEER_ID: adopts the peer id assigned to us unless we already have one
//  PING:        nothing to do beyond logging
//  DISCO:       removes the sending peer
// Every recognised control type ends by throwing ProcessedSilentlyException,
// i.e. control packets never yield payload data. Malformed packets and
// unknown control types throw InvalidIncomingDataException.
1127 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel,
1128 const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1130 if (packetdata.getSize() < 2)
1131 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1133 u8 controltype = readU8(&(packetdata[1]));
1135 if (controltype == CONTROLTYPE_ACK) {
1136 assert(channel != NULL);
1138 if (packetdata.getSize() < 4) {
1139 throw InvalidIncomingDataException(
1140 "packetdata.getSize() < 4 (ACK header size)");
1143 u16 seqnum = readU16(&packetdata[2]);
1144 LOG(dout_con << m_connection->getDesc() << " [ CONTROLTYPE_ACK: channelnum="
1145 << ((int) channelnum & 0xff) << ", peer_id=" << peer->id << ", seqnum="
1146 << seqnum << " ]" << std::endl);
// throws NotFoundException (handled below) if the packet is not pending
1149 BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
1151 // only calculate rtt from straight sent packets
1152 if (p.resend_count == 0) {
1153 // Get round trip time
1154 u64 current_time = porting::getTimeMs();
1156 // an overflow is quite unlikely but as it'd result in major
1157 // rtt miscalculation we handle it here
1158 if (current_time > p.absolute_send_time) {
1159 float rtt = (current_time - p.absolute_send_time) / 1000.0;
1161 // Let peer calculate stuff according to it
1162 // (avg_rtt and resend_timeout)
// NOTE(review): dynamic_cast result is used without a null check
1163 dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
1164 } else if (p.totaltime > 0) {
1165 float rtt = p.totaltime;
1167 // Let peer calculate stuff according to it
1168 // (avg_rtt and resend_timeout)
// NOTE(review): dynamic_cast result is used without a null check
1169 dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
1172 // put bytes for max bandwidth calculation
1173 channel->UpdateBytesSent(p.data.getSize(), 1);
// nothing left in flight: wake the send thread
1174 if (channel->outgoing_reliables_sent.size() == 0)
1175 m_connection->TriggerSend();
1176 } catch (NotFoundException &e) {
1177 LOG(derr_con << m_connection->getDesc()
1178 << "WARNING: ACKed packet not in outgoing queue" << std::endl);
1179 channel->UpdatePacketTooLateCounter();
1182 throw ProcessedSilentlyException("Got an ACK");
1183 } else if (controltype == CONTROLTYPE_SET_PEER_ID) {
1184 // Got a packet to set our peer id
1185 if (packetdata.getSize() < 4)
1186 throw InvalidIncomingDataException
1187 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1188 session_t peer_id_new = readU16(&packetdata[2]);
1189 LOG(dout_con << m_connection->getDesc() << "Got new peer id: " << peer_id_new
1190 << "... " << std::endl);
// an already assigned peer id is never overwritten
1192 if (m_connection->GetPeerID() != PEER_ID_INEXISTENT) {
1193 LOG(derr_con << m_connection->getDesc()
1194 << "WARNING: Not changing existing peer id." << std::endl);
1196 LOG(dout_con << m_connection->getDesc() << "changing own peer id"
1198 m_connection->SetPeerID(peer_id_new);
1201 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1202 } else if (controltype == CONTROLTYPE_PING) {
1203 // Just ignore it, the incoming data already reset
1204 // the timeout counter
1205 LOG(dout_con << m_connection->getDesc() << "PING" << std::endl);
1206 throw ProcessedSilentlyException("Got a PING");
1207 } else if (controltype == CONTROLTYPE_DISCO) {
1208 // The peer announced that it disconnects:
1209 // remove it from the connection
1210 LOG(dout_con << m_connection->getDesc() << "DISCO: Removing peer "
1211 << peer->id << std::endl);
1213 if (!m_connection->deletePeer(peer->id, false)) {
1214 derr_con << m_connection->getDesc() << "DISCO: Peer not found" << std::endl;
1217 throw ProcessedSilentlyException("Got a DISCO");
1219 LOG(derr_con << m_connection->getDesc()
1220 << "INVALID TYPE_CONTROL: invalid controltype="
1221 << ((int) controltype & 0xff) << std::endl);
1222 throw InvalidIncomingDataException("Invalid control type");
1226 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel,
1227 const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1229 if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
1230 throw InvalidIncomingDataException
1231 ("packetdata.getSize() <= ORIGINAL_HEADER_SIZE");
1232 LOG(dout_con << m_connection->getDesc() << "RETURNING TYPE_ORIGINAL to user"
1234 // Get the inside packet out and return it
1235 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1236 memcpy(*payload, &(packetdata[ORIGINAL_HEADER_SIZE]), payload.getSize());
1240 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel,
1241 const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1243 Address peer_address;
1245 if (peer->getAddress(MTP_UDP, peer_address)) {
1246 // We have to create a packet again for buffering
1247 // This isn't actually too bad an idea.
1248 BufferedPacket packet = makePacket(peer_address,
1250 m_connection->GetProtocolID(),
1254 // Buffer the packet
1255 SharedBuffer<u8> data = peer->addSplitPacket(channelnum, packet, reliable);
1257 if (data.getSize() != 0) {
1258 LOG(dout_con << m_connection->getDesc()
1259 << "RETURNING TYPE_SPLIT: Constructed full data, "
1260 << "size=" << data.getSize() << std::endl);
1263 LOG(dout_con << m_connection->getDesc() << "BUFFERED TYPE_SPLIT" << std::endl);
1264 throw ProcessedSilentlyException("Buffered a split packet chunk");
1267 // We should never get here.
1268 FATAL_ERROR("Invalid execution point");
/*
 * Handle a reliable packet.
 *
 * The u16 sequence number is read from bytes 1-2 of the packet and compared
 * with channel->readNextIncomingSeqNum():
 *   - inside the receive window (MAX_RELIABLE_WINDOW_SIZE): an ACK is sent
 *     and handling continues below
 *   - outside the window and newer than expected: no ACK, throws
 *     ProcessedSilentlyException
 *   - outside the window and older than expected: the ACK is sent again,
 *     throws ProcessedSilentlyException
 *
 * A packet whose seqnum is not the next expected one is inserted into
 * channel->incoming_reliables and ProcessedQueued is thrown. Otherwise the
 * next expected seqnum is incremented, the first RELIABLE_HEADER_SIZE bytes
 * are stripped and the rest is passed to processPacket() with reliable=true.
 *
 * Throws InvalidIncomingDataException for nested reliable packets and for
 * packets shorter than RELIABLE_HEADER_SIZE. channel must be non-null.
 */
1271 SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel,
1272 const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
1274 assert(channel != NULL);
1276 // Recursive reliable packets not allowed
1278 throw InvalidIncomingDataException("Found nested reliable packets");
1280 if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
1281 throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
// Sequence number is the u16 stored right after the first byte
1283 u16 seqnum = readU16(&packetdata[1]);
1284 bool is_future_packet = false;
1285 bool is_old_packet = false;
1287 /* packet is within our receive window send ack */
1288 if (seqnum_in_window(seqnum,
1289 channel->readNextIncomingSeqNum(), MAX_RELIABLE_WINDOW_SIZE)) {
1290 m_connection->sendAck(peer->id, channelnum, seqnum);
// Outside the window: classify the packet relative to the next expected seqnum
1292 is_future_packet = seqnum_higher(seqnum, channel->readNextIncomingSeqNum());
1293 is_old_packet = seqnum_higher(channel->readNextIncomingSeqNum(), seqnum);
1295 /* packet is not within receive window, don't send ack. *
1296 * if this was a valid packet it's gonna be retransmitted */
1297 if (is_future_packet)
1298 throw ProcessedSilentlyException(
1299 "Received packet newer then expected, not sending ack");
1301 /* seems like our ack was lost, send another one for a old packet */
1302 if (is_old_packet) {
1303 LOG(dout_con << m_connection->getDesc()
1304 << "RE-SENDING ACK: peer_id: " << peer->id
1305 << ", channel: " << (channelnum & 0xFF)
1306 << ", seqnum: " << seqnum << std::endl;)
1307 m_connection->sendAck(peer->id, channelnum, seqnum);
1309 // we already have this packet so this one was on wire at least
1310 // the current timeout
1311 // we don't know how long this packet was on wire don't do silly guessing
1312 // dynamic_cast<UDPPeer*>(&peer)->
1313 // reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
1315 throw ProcessedSilentlyException("Retransmitting ack for old packet");
// Acknowledged, but not the next packet in sequence: keep it for later
1319 if (seqnum != channel->readNextIncomingSeqNum()) {
1320 Address peer_address;
1322 // this is a reliable packet so we have a udp address for sure
// NOTE(review): the return value of getAddress() is not checked here
1323 peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
1324 // This one comes later, buffer it.
1325 // Actually we have to make a packet to buffer one.
1326 // Well, we have all the ingredients, so just do it.
1327 BufferedPacket packet = con::makePacket(
1330 m_connection->GetProtocolID(),
1334 channel->incoming_reliables.insert(packet, channel->readNextIncomingSeqNum());
1336 LOG(dout_con << m_connection->getDesc()
1337 << "BUFFERING, TYPE_RELIABLE peer_id: " << peer->id
1338 << ", channel: " << (channelnum & 0xFF)
1339 << ", seqnum: " << seqnum << std::endl;)
// ProcessedQueued is not matched by the handlers below, so it leaves this function
1341 throw ProcessedQueued("Buffered future reliable packet");
// An already-buffered packet is deliberately ignored (empty handler)
1342 } catch (AlreadyExistsException &e) {
// Corrupt data: queue a command that disconnects the sending peer
1343 } catch (IncomingDataCorruption &e) {
1344 ConnectionCommand discon;
1345 discon.disconnect_peer(peer->id);
1346 m_connection->putCommand(discon);
1348 LOG(derr_con << m_connection->getDesc()
1349 << "INVALID, TYPE_RELIABLE peer_id: " << peer->id
1350 << ", channel: " << (channelnum & 0xFF)
1351 << ", seqnum: " << seqnum
1352 << "DROPPING CLIENT!" << std::endl;)
// NOTE(review): neither catch handler appears to throw or return, so an
// out-of-order packet that hit one of them seems to continue into the
// in-order path below and advance the expected seqnum - confirm intended.
1356 /* we got a packet to process right now */
1357 LOG(dout_con << m_connection->getDesc()
1358 << "RECURSIVE, TYPE_RELIABLE peer_id: " << peer->id
1359 << ", channel: " << (channelnum & 0xFF)
1360 << ", seqnum: " << seqnum << std::endl;)
1363 /* check for resend case */
// If the first buffered packet has this same seqnum, pop it; the popped
// copy is not used further
1364 u16 queued_seqnum = 0;
1365 if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
1366 if (queued_seqnum == seqnum) {
1367 BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
1368 /** TODO find a way to verify the new against the old packet */
1372 channel->incNextIncomingSeqNum();
1374 // Get out the inside packet and re-process it
1375 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1376 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
// reliable=true lets the nested-reliable guard at the top reject a
// reliable packet wrapped inside this one
1378 return processPacket(channel, payload, peer->id, channelnum, true);