3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 #include "irrlichttypes_bloated.h"
23 #include "peerhandler.h"
25 #include "constants.h"
26 #include "util/pointer.h"
27 #include "util/container.h"
28 #include "util/thread.h"
29 #include "util/numeric.h"
30 #include "networkprotocol.h"
41 class ConnectionReceiveThread;
42 class ConnectionSendThread;
44 typedef enum MTProtocols
48 MTP_MINETEST_RELIABLE_UDP
51 #define MAX_UDP_PEERS 65535
53 #define SEQNUM_MAX 65535
55 inline bool seqnum_higher(u16 totest, u16 base)
58 if ((totest - base) > (SEQNUM_MAX / 2))
64 if ((base - totest) > (SEQNUM_MAX / 2))
70 inline bool seqnum_in_window(u16 seqnum, u16 next, u16 window_size)
72 u16 window_start = next;
73 u16 window_end = (next + window_size) % (SEQNUM_MAX + 1);
75 if (window_start < window_end) {
76 return ((seqnum >= window_start) && (seqnum < window_end));
79 return ((seqnum < window_end) || (seqnum >= window_start));
82 static inline float CALC_DTIME(u64 lasttime, u64 curtime)
84 float value = (curtime - lasttime) / 1000.0;
85 return MYMAX(MYMIN(value, 0.1), 0.0);
90 BufferedPacket(u8 *a_data, u32 a_size) : data(a_data, a_size) {}
91 BufferedPacket(u32 a_size) : data(a_size) {}
92 Buffer<u8> data; // Data of the packet, including headers
93 float time = 0.0f; // Seconds from buffering the packet or re-sending
94 float totaltime = 0.0f; // Seconds from buffering the packet
95 u64 absolute_send_time = -1;
96 Address address; // Sender or destination
97 unsigned int resend_count = 0;
100 // This adds the base headers to the data and makes a packet out of it
101 BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data, u32 protocol_id,
102 session_t sender_peer_id, u8 channel);
104 // Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
105 // Increments split_seqnum if a split packet is made
106 void makeAutoSplitPacket(const SharedBuffer<u8> &data, u32 chunksize_max,
107 u16 &split_seqnum, std::list<SharedBuffer<u8>> *list);
109 // Add the TYPE_RELIABLE header to the data
110 SharedBuffer<u8> makeReliablePacket(const SharedBuffer<u8> &data, u16 seqnum);
// Bookkeeping for one split packet whose chunks are still being collected.
112 struct IncomingSplitPacket
// Stores cc as the expected number of chunks and r as the reliable flag.
114 IncomingSplitPacket(u32 cc, bool r) : chunk_count(cc), reliable(r) {}
// Default construction is disabled; both values above must be supplied.
116 IncomingSplitPacket() = delete;
118 float time = 0.0f; // Seconds from adding
120 bool reliable; // If true, isn't deleted on timeout
// True once the number of stored chunks equals the expected chunk count.
122 bool allReceived() const { return (chunks.size() == chunk_count); }
// Defined out of line. NOTE(review): presumably stores chunkdata under
// chunk_num and reports whether it was accepted -- confirm in the .cpp.
// NOTE(review): chunk_num is u32 while the map below is keyed by u16 --
// confirm the narrowing is checked.
123 bool insert(u32 chunk_num, SharedBuffer<u8> &chunkdata);
// Defined out of line. NOTE(review): presumably joins the stored chunks into
// one buffer -- confirm in the .cpp.
124 SharedBuffer<u8> reassemble();
127 // Key is chunk number, value is data without headers
128 std::map<u16, SharedBuffer<u8>> chunks;
134 A packet is sent through a channel to a peer with a basic header:
137 [4] session_t sender_peer_id
141 value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
142 value 1 (PEER_ID_SERVER) is reserved for server
143 these constants are defined in constants.h
145 Channel numbers have no intrinsic meaning. Currently only 0, 1, 2 exist.
147 #define BASE_HEADER_SIZE 7
148 #define CHANNEL_COUNT 3
152 CONTROL: This is a packet used by the protocol.
153 - When this is processed, nothing is handed to the user.
157 controltype and data description:
160 CONTROLTYPE_SET_PEER_ID
161 [2] session_t peer_id_new
163 - There is no actual reply, but this can be sent in a reliable
164 packet to get a reply
167 //#define TYPE_CONTROL 0
168 #define CONTROLTYPE_ACK 0
169 #define CONTROLTYPE_SET_PEER_ID 1
170 #define CONTROLTYPE_PING 2
171 #define CONTROLTYPE_DISCO 3
174 ORIGINAL: This is a plain packet with no control and no error
176 - When this is processed, it is directly handed to the user.
180 //#define TYPE_ORIGINAL 1
181 #define ORIGINAL_HEADER_SIZE 1
183 SPLIT: These are sequences of packets forming one bigger piece of
185 - When processed and all the packet_nums 0...packet_count-1 are
186 present (this should be buffered), the resulting data shall be
187 directly handed to the user.
188 - If the data fails to come up in a reasonable time, the buffer shall
189 be silently discarded.
190 - These can be sent as-is or atop of a RELIABLE packet stream.
197 //#define TYPE_SPLIT 2
199 RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
200 and they shall be delivered in the same order as sent. This is done
201 with a buffer in the receiving and transmitting end.
202 - When this is processed, the contents of each packet is recursively
203 processed as packets.
209 //#define TYPE_RELIABLE 3
210 #define RELIABLE_HEADER_SIZE 3
211 #define SEQNUM_INITIAL 65500
215 PACKET_TYPE_CONTROL = 0,
216 PACKET_TYPE_ORIGINAL = 1,
217 PACKET_TYPE_SPLIT = 2,
218 PACKET_TYPE_RELIABLE = 3,
222 A buffer which stores reliable packets and sorts them internally
223 for fast access to the smallest one.
226 typedef std::list<BufferedPacket>::iterator RPBSearchResult;
// Container for buffered reliable packets, kept in m_list.
// NOTE(review): m_list is presumably guarded by m_list_mutex in the member
// functions; findPacket is the one explicitly marked as not locking.
228 class ReliablePacketBuffer
231 ReliablePacketBuffer() = default;
// Writes a sequence number into result. NOTE(review): the bool presumably
// reports whether the buffer held any packet -- confirm in the .cpp.
233 bool getFirstSeqnum(u16 &result);
// Hand a buffered packet back to the caller: the first one, or the one with
// the given sequence number. What happens when no such packet exists is not
// visible here.
235 BufferedPacket popFirst();
236 BufferedPacket popSeqnum(u16 seqnum);
// NOTE(review): next_expected is presumably the next sequence number the
// receiver is waiting for -- confirm in the .cpp.
237 void insert(BufferedPacket &p, u16 next_expected);
// NOTE(review): presumably advances the per-packet timers by dtime -- confirm.
239 void incrementTimeouts(float dtime);
// Returns a list of packets; NOTE(review): presumably those whose timer
// exceeds timeout, at most max_packets of them -- confirm.
240 std::list<BufferedPacket> getTimedOuts(float timeout, unsigned int max_packets);
// Search helpers working on iterators into m_list (RPBSearchResult).
244 RPBSearchResult notFound();
248 RPBSearchResult findPacket(u16 seqnum); // does not perform locking
250 std::list<BufferedPacket> m_list;
// NOTE(review): no in-class initializer here and the constructor above is
// defaulted, so this starts out indeterminate -- confirm it is always written
// before it is read.
252 u16 m_oldest_non_answered_ack;
254 std::mutex m_list_mutex;
258 A buffer for reconstructing split packets
// Holds partially received split packets in m_buf, keyed by a u16.
261 class IncomingSplitBuffer
// User-declared destructor. NOTE(review): m_buf stores raw
// IncomingSplitPacket pointers, so this presumably releases them -- confirm
// in the .cpp.
264 ~IncomingSplitBuffer();
266 Returns a reference counted buffer of length != 0 when a full split
267 packet is constructed. If not, returns one of length 0.
269 SharedBuffer<u8> insert(const BufferedPacket &p, bool reliable);
// NOTE(review): going by the name and parameters, ages the entries by dtime
// and drops non-reliable ones older than timeout -- confirm in the .cpp.
271 void removeUnreliableTimedOuts(float dtime, float timeout);
// NOTE(review): the key is presumably the split sequence number -- confirm.
275 std::map<u16, IncomingSplitPacket *> m_buf;
// NOTE(review): presumably protects m_buf -- confirm in the .cpp.
277 std::mutex m_map_mutex;
// One packet handed to the sending side, together with its destination
// (peer id, channel number) and delivery flags.
280 struct OutgoingPacket
// Payload; initialized from the constructor's data_ argument.
284 SharedBuffer<u8> data;
// Copies each argument into the member of the same name (without the
// trailing underscore). ack_ defaults to false, so callers that do not send
// acknowledgements can leave it out.
288 OutgoingPacket(session_t peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_,
289 bool reliable_, bool ack_ = false) :
291 channelnum(channelnum_), data(data_), reliable(reliable_),
297 enum ConnectionCommandType
303 CONNCMD_DISCONNECT_PEER,
// A request passed to the connection: a command type plus the fields that
// command needs. The named member functions below each select one command
// type by assigning `type`.
310 struct ConnectionCommand
// Defaults describe an empty command: no type, no peer, not reliable.
312 enum ConnectionCommandType type = CONNCMD_NONE;
314 session_t peer_id = PEER_ID_INEXISTENT;
317 bool reliable = false;
320 ConnectionCommand() = default;
// Copy assignment. Every field is copied as-is except `data`, which gets a
// freshly constructed buffer built from the source's contents and size.
// NOTE(review): no assignment of `type` is visible in this view -- confirm it
// is copied as well.
// NOTE(review): no matching copy constructor is visible; if the implicit one
// is used, copy construction would not go through this buffer copy -- confirm.
321 ConnectionCommand &operator=(const ConnectionCommand &other)
324 address = other.address;
325 peer_id = other.peer_id;
326 channelnum = other.channelnum;
327 // We must copy the buffer here to prevent race condition
328 data = SharedBuffer<u8>(*other.data, other.data.getSize());
329 reliable = other.reliable;
// Marks this command as CONNCMD_SERVE for the given address.
334 void serve(Address address_)
336 type = CONNCMD_SERVE;
// Marks this command as CONNCMD_CONNECT for the given address.
339 void connect(Address address_)
341 type = CONNCMD_CONNECT;
// Marks this command as CONNCMD_DISCONNECT; no further fields are set.
344 void disconnect() { type = CONNCMD_DISCONNECT; }
// Marks this command as CONNCMD_DISCONNECT_PEER for one peer.
345 void disconnect_peer(session_t peer_id_)
347 type = CONNCMD_DISCONNECT_PEER;
// Defined out of line; takes the payload as a NetworkPacket.
351 void send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt, bool reliable_);
// Fills in an acknowledgement command; stores the channel number given.
353 void ack(session_t peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_)
357 channelnum = channelnum_;
// Marks this command as CONCMD_CREATE_PEER. Note the enumerator is spelled
// CONCMD_ (one N), unlike the CONNCMD_ prefix used by the others.
362 void createPeer(session_t peer_id_, const SharedBuffer<u8> &data_)
364 type = CONCMD_CREATE_PEER;
373 /* maximum window size to use, 0xFFFF is theoretical maximum. don't think about
374 * touching it, the less you're away from it the more likely data corruption
377 #define MAX_RELIABLE_WINDOW_SIZE 0x8000
378 /* starting value for window size */
379 #define START_RELIABLE_WINDOW_SIZE 0x400
380 /* minimum value for window size */
381 #define MIN_RELIABLE_WINDOW_SIZE 0x40
387 u16 readNextIncomingSeqNum();
388 u16 incNextIncomingSeqNum();
390 u16 getOutgoingSequenceNumber(bool &successfull);
391 u16 readOutgoingSequenceNumber();
392 bool putBackSequenceNumber(u16);
394 u16 readNextSplitSeqNum();
395 void setNextSplitSeqNum(u16 seqnum);
397 // This is for buffering the incoming packets that are coming in
399 ReliablePacketBuffer incoming_reliables;
400 // This is for buffering the sent packets so that the sender can
401 // re-send them if no ACK is received
402 ReliablePacketBuffer outgoing_reliables_sent;
404 // queued reliable packets
405 std::queue<BufferedPacket> queued_reliables;
407 // queue commands prior splitting to packets
408 std::deque<ConnectionCommand> queued_commands;
410 IncomingSplitBuffer incoming_splits;
413 ~Channel() = default;
415 void UpdatePacketLossCounter(unsigned int count);
416 void UpdatePacketTooLateCounter();
417 void UpdateBytesSent(unsigned int bytes, unsigned int packages = 1);
418 void UpdateBytesLost(unsigned int bytes);
419 void UpdateBytesReceived(unsigned int bytes);
421 void UpdateTimers(float dtime);
// Statistics accessors. Each rate getter takes m_internal_mutex for the
// duration of the call and hands back one of the float rate members by value.
// NOTE(review): `const` on a by-value float/unsigned return has no effect on
// callers and may trigger qualifier warnings; it could be dropped.

// Download rate getters (current / maximum).
423 const float getCurrentDownloadRateKB()
425 MutexAutoLock lock(m_internal_mutex);
428 const float getMaxDownloadRateKB()
430 MutexAutoLock lock(m_internal_mutex);
// Returns cur_kbps_lost.
434 const float getCurrentLossRateKB()
436 MutexAutoLock lock(m_internal_mutex);
437 return cur_kbps_lost;
// Returns max_kbps_lost.
439 const float getMaxLossRateKB()
441 MutexAutoLock lock(m_internal_mutex);
442 return max_kbps_lost;
// Returns cur_incoming_kbps.
445 const float getCurrentIncomingRateKB()
447 MutexAutoLock lock(m_internal_mutex);
448 return cur_incoming_kbps;
// Returns max_incoming_kbps.
450 const float getMaxIncomingRateKB()
452 MutexAutoLock lock(m_internal_mutex);
453 return max_incoming_kbps;
// Average download rate getter.
456 const float getAvgDownloadRateKB()
458 MutexAutoLock lock(m_internal_mutex);
// Returns avg_kbps_lost.
461 const float getAvgLossRateKB()
463 MutexAutoLock lock(m_internal_mutex);
464 return avg_kbps_lost;
// Returns avg_incoming_kbps.
466 const float getAvgIncomingRateKB()
468 MutexAutoLock lock(m_internal_mutex);
469 return avg_incoming_kbps;
// Window size accessors. Unlike the rate getters these do not take
// m_internal_mutex.
// NOTE(review): both use unsigned int while the window_size member is
// declared as int, so values convert between signed and unsigned on the way
// in and out -- confirm the range never makes that matter.
472 const unsigned int getWindowSize() const { return window_size; };
474 void setWindowSize(unsigned int size) { window_size = size; };
477 std::mutex m_internal_mutex;
478 int window_size = MIN_RELIABLE_WINDOW_SIZE;
480 u16 next_incoming_seqnum = SEQNUM_INITIAL;
482 u16 next_outgoing_seqnum = SEQNUM_INITIAL;
483 u16 next_outgoing_split_seqnum = SEQNUM_INITIAL;
485 unsigned int current_packet_loss = 0;
486 unsigned int current_packet_too_late = 0;
487 unsigned int current_packet_successful = 0;
488 float packet_loss_counter = 0.0f;
490 unsigned int current_bytes_transfered = 0;
491 unsigned int current_bytes_received = 0;
492 unsigned int current_bytes_lost = 0;
493 float max_kbps = 0.0f;
494 float cur_kbps = 0.0f;
495 float avg_kbps = 0.0f;
496 float max_incoming_kbps = 0.0f;
497 float cur_incoming_kbps = 0.0f;
498 float avg_incoming_kbps = 0.0f;
499 float max_kbps_lost = 0.0f;
500 float cur_kbps_lost = 0.0f;
501 float avg_kbps_lost = 0.0f;
502 float bpm_counter = 0.0f;
504 unsigned int rate_samples = 0;
512 PeerHelper() = default;
513 PeerHelper(Peer *peer);
516 PeerHelper &operator=(Peer *peer);
517 Peer *operator->() const;
519 Peer *operator&() const;
520 bool operator!=(void *ptr);
523 Peer *m_peer = nullptr;
541 friend class PeerHelper;
543 Peer(Address address_, u16 id_, Connection *connection) :
544 id(id_), m_connection(connection), address(address_),
545 m_last_timeout_check(porting::getTimeMs()){};
549 MutexAutoLock usage_lock(m_exclusive_access_mutex);
550 FATAL_ERROR_IF(m_usage != 0, "Reference counting failure");
553 // Unique id of the peer
558 virtual void PutReliableSendCommand(
559 ConnectionCommand &c, unsigned int max_packet_size){};
561 virtual bool getAddress(MTProtocols type, Address &toset) = 0;
// Reports the m_pending_deletion flag. The flag is read while holding
// m_exclusive_access_mutex.
563 bool isPendingDeletion()
565 MutexAutoLock lock(m_exclusive_access_mutex);
566 return m_pending_deletion;
571 MutexAutoLock lock(m_exclusive_access_mutex);
572 m_timeout_counter = 0.0;
575 bool isTimedOut(float timeout);
577 unsigned int m_increment_packets_remaining = 0;
579 virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
580 virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum){};
// Fallback implementation of the virtual split-packet hook. It ignores its
// arguments, writes an error message to errorstream and returns a
// SharedBuffer constructed with size 0. The message states that this version
// is never supposed to be called.
581 virtual SharedBuffer<u8> addSplitPacket(
582 u8 channel, const BufferedPacket &toadd, bool reliable)
584 errorstream << "Peer::addSplitPacket called,"
585 << " this is supposed to be never called!" << std::endl;
586 return SharedBuffer<u8>(0);
589 virtual bool Ping(float dtime, SharedBuffer<u8> &data) { return false; };
591 virtual float getStat(rtt_stat_type type) const
595 return m_rtt.min_rtt;
597 return m_rtt.max_rtt;
599 return m_rtt.avg_rtt;
601 return m_rtt.jitter_min;
603 return m_rtt.jitter_max;
605 return m_rtt.jitter_avg;
611 virtual void reportRTT(float rtt){};
613 void RTTStatistics(float rtt, const std::string &profiler_id = "",
614 unsigned int num_samples = 1000);
619 std::mutex m_exclusive_access_mutex;
621 bool m_pending_deletion = false;
623 Connection *m_connection;
625 // Address of the peer
629 float m_ping_timer = 0.0f;
634 float jitter_min = FLT_MAX;
635 float jitter_max = 0.0f;
636 float jitter_avg = -1.0f;
637 float min_rtt = FLT_MAX;
638 float max_rtt = 0.0f;
639 float avg_rtt = -1.0f;
641 rttstats() = default;
645 float m_last_rtt = -1.0f;
647 // current usage count
648 unsigned int m_usage = 0;
650 // Seconds from last receive
651 float m_timeout_counter = 0.0f;
653 u64 m_last_timeout_check;
// Peer implementation derived from Peer. Owns CHANNEL_COUNT Channel objects
// and a resend timeout that is read and written under
// m_exclusive_access_mutex.
656 class UDPPeer : public Peer
// These classes get access to the non-public members.
659 friend class PeerHelper;
660 friend class ConnectionReceiveThread;
661 friend class ConnectionSendThread;
662 friend class Connection;
664 UDPPeer(u16 a_id, Address a_address, Connection *connection);
665 virtual ~UDPPeer() = default;
// The following declarations repeat signatures that are declared virtual
// earlier in this header (presumably in Peer).
// NOTE(review): none of them is marked `override`, so a signature mismatch
// would silently declare a new function -- consider adding it.
667 void PutReliableSendCommand(ConnectionCommand &c, unsigned int max_packet_size);
669 bool getAddress(MTProtocols type, Address &toset);
671 u16 getNextSplitSequenceNumber(u8 channel);
672 void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
674 SharedBuffer<u8> addSplitPacket(
675 u8 channel, const BufferedPacket &toadd, bool reliable);
679 Calculates avg_rtt and resend_timeout.
680 rtt=-1 only recalculates resend_timeout
682 void reportRTT(float rtt);
// Defined out of line. NOTE(review): judging by the parameter names, the
// three limits bound packet size, number of commands and amount transferred
// per call -- confirm in the .cpp.
684 void RunCommandQueues(unsigned int max_packet_size, unsigned int maxcommands,
685 unsigned int maxtransfer);
// Returns resend_timeout, read while holding m_exclusive_access_mutex.
687 float getResendTimeout()
689 MutexAutoLock lock(m_exclusive_access_mutex);
690 return resend_timeout;
// Overwrites resend_timeout while holding m_exclusive_access_mutex.
693 void setResendTimeout(float timeout)
695 MutexAutoLock lock(m_exclusive_access_mutex);
696 resend_timeout = timeout;
698 bool Ping(float dtime, SharedBuffer<u8> &data);
// One Channel per channel number (CHANNEL_COUNT of them).
700 Channel channels[CHANNEL_COUNT];
701 bool m_pending_disconnect = false;
704 // This is changed dynamically
705 float resend_timeout = 0.5;
// Defined out of line; takes the same arguments as PutReliableSendCommand.
707 bool processReliableSendCommand(
708 ConnectionCommand &c, unsigned int max_packet_size);
715 enum ConnectionEventType
718 CONNEVENT_DATA_RECEIVED,
719 CONNEVENT_PEER_ADDED,
720 CONNEVENT_PEER_REMOVED,
721 CONNEVENT_BIND_FAILED,
// A notification produced by the connection: an event type plus the fields
// that event carries. The named member functions below each select one event
// type by assigning `type`.
724 struct ConnectionEvent
// Defaults describe an empty event: no type, peer id 0, not a timeout.
726 enum ConnectionEventType type = CONNEVENT_NONE;
727 session_t peer_id = 0;
729 bool timeout = false;
732 ConnectionEvent() = default;
// Returns the event type's enumerator name as text, or
// "Invalid ConnectionEvent" when none of the listed cases applies.
734 std::string describe()
738 return "CONNEVENT_NONE";
739 case CONNEVENT_DATA_RECEIVED:
740 return "CONNEVENT_DATA_RECEIVED";
741 case CONNEVENT_PEER_ADDED:
742 return "CONNEVENT_PEER_ADDED";
743 case CONNEVENT_PEER_REMOVED:
744 return "CONNEVENT_PEER_REMOVED";
745 case CONNEVENT_BIND_FAILED:
746 return "CONNEVENT_BIND_FAILED";
748 return "Invalid ConnectionEvent";
// Marks this event as CONNEVENT_DATA_RECEIVED for the given peer and payload.
751 void dataReceived(session_t peer_id_, const SharedBuffer<u8> &data_)
753 type = CONNEVENT_DATA_RECEIVED;
// Marks this event as CONNEVENT_PEER_ADDED for the given peer and address.
757 void peerAdded(session_t peer_id_, Address address_)
759 type = CONNEVENT_PEER_ADDED;
// Marks this event as CONNEVENT_PEER_REMOVED; timeout_ tells whether the
// removal is reported as a timeout.
763 void peerRemoved(session_t peer_id_, bool timeout_, Address address_)
765 type = CONNEVENT_PEER_REMOVED;
// Marks this event as CONNEVENT_BIND_FAILED; no further fields are set.
770 void bindFailed() { type = CONNEVENT_BIND_FAILED; }
778 friend class ConnectionSendThread;
779 friend class ConnectionReceiveThread;
781 Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
782 PeerHandler *peerhandler);
786 ConnectionEvent waitEvent(u32 timeout_ms);
787 void putCommand(ConnectionCommand &c);
789 void SetTimeoutMs(u32 timeout) { m_bc_receive_timeout = timeout; }
790 void Serve(Address bind_addr);
791 void Connect(Address address);
794 void Receive(NetworkPacket *pkt);
795 bool TryReceive(NetworkPacket *pkt);
796 void Send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable);
797 session_t GetPeerID() const { return m_peer_id; }
798 Address GetPeerAddress(session_t peer_id);
799 float getPeerStat(session_t peer_id, rtt_stat_type type);
800 float getLocalStat(rate_stat_type type);
801 const u32 GetProtocolID() const { return m_protocol_id; };
802 const std::string getDesc();
803 void DisconnectPeer(session_t peer_id);
806 PeerHelper getPeerNoEx(session_t peer_id);
807 u16 lookupPeer(Address &sender);
809 u16 createPeer(Address &sender, MTProtocols protocol, int fd);
810 UDPPeer *createServerPeer(Address &sender);
811 bool deletePeer(session_t peer_id, bool timeout);
813 void SetPeerID(session_t id) { m_peer_id = id; }
815 void sendAck(session_t peer_id, u8 channelnum, u16 seqnum);
817 void PrintInfo(std::ostream &out);
819 std::list<session_t> getPeerIDs()
821 MutexAutoLock peerlock(m_peers_mutex);
825 UDPSocket m_udpSocket;
826 MutexedQueue<ConnectionCommand> m_command_queue;
828 bool Receive(NetworkPacket *pkt, u32 timeout);
830 void putEvent(ConnectionEvent &e);
835 MutexedQueue<ConnectionEvent> m_event_queue;
837 session_t m_peer_id = 0;
840 std::map<session_t, Peer *> m_peers;
841 std::list<session_t> m_peer_ids;
842 std::mutex m_peers_mutex;
844 std::unique_ptr<ConnectionSendThread> m_sendThread;
845 std::unique_ptr<ConnectionReceiveThread> m_receiveThread;
847 std::mutex m_info_mutex;
849 // Backwards compatibility
850 PeerHandler *m_bc_peerhandler;
851 u32 m_bc_receive_timeout = 0;
853 bool m_shutting_down = false;
855 session_t m_next_remote_peer_id = 2;