3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 #include "irrlichttypes.h"
23 #include "peerhandler.h"
25 #include "constants.h"
26 #include "util/pointer.h"
27 #include "util/container.h"
28 #include "util/thread.h"
29 #include "util/numeric.h"
30 #include "networkprotocol.h"
35 #define MAX_UDP_PEERS 65535
40 A packet is sent through a channel to a peer with a basic header:
43 [4] session_t sender_peer_id
47 value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
48 value 1 (PEER_ID_SERVER) is reserved for server
49 these constants are defined in constants.h
51 Channel numbers have no intrinsic meaning. Currently only 0, 1, 2 exist.
53 #define BASE_HEADER_SIZE 7
54 #define CHANNEL_COUNT 3
59 CONTROL: This is a packet used by the protocol.
60 - When this is processed, nothing is handed to the user.
64 controltype and data description:
67 CONTROLTYPE_SET_PEER_ID
68 [2] session_t peer_id_new
70 - There is no actual reply, but this can be sent in a reliable
// Sub-type tag of a CONTROL packet; the underlying storage type is u8.
// NOTE(review): only the enumerators with values 1 and 3 are visible in this
// excerpt — take the remaining values from the full header.
74 enum ControlType : u8 {
76 CONTROLTYPE_SET_PEER_ID = 1, // payload per the protocol notes: session_t peer_id_new
78 CONTROLTYPE_DISCO = 3,
82 ORIGINAL: This is a plain packet with no control and no error
84 - When this is processed, it is directly handed to the user.
88 //#define TYPE_ORIGINAL 1
89 #define ORIGINAL_HEADER_SIZE 1
92 SPLIT: These are sequences of packets forming one bigger piece of
94 - When processed and all the packet_nums 0...packet_count-1 are
95 present (this should be buffered), the resulting data shall be
96 directly handed to the user.
97 - If the data fails to come up in a reasonable time, the buffer shall
98 be silently discarded.
99 - These can be sent as-is or on top of a RELIABLE packet stream.
106 //#define TYPE_SPLIT 2
109 RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
110 and they shall be delivered in the same order as sent. This is done
111 with a buffer in the receiving and transmitting end.
112 - When this is processed, the contents of each packet are recursively
113 processed as packets.
119 //#define TYPE_RELIABLE 3
120 #define RELIABLE_HEADER_SIZE 3
121 #define SEQNUM_INITIAL 65500
122 #define SEQNUM_MAX 65535
129 class ConnectionReceiveThread;
130 class ConnectionSendThread;
// Selects a transport protocol; taken as a parameter by getAddress() and
// createPeer() further down in this header.
// NOTE(review): only MTP_MINETEST_RELIABLE_UDP is visible in this excerpt.
132 typedef enum MTProtocols {
135 MTP_MINETEST_RELIABLE_UDP
// Packet type tag, stored as u8. The values 1..3 match the commented-out
// legacy TYPE_ORIGINAL / TYPE_SPLIT / TYPE_RELIABLE defines in this header.
138 enum PacketType : u8 {
139 PACKET_TYPE_CONTROL = 0, // protocol-internal packet
140 PACKET_TYPE_ORIGINAL = 1, // plain packet
141 PACKET_TYPE_SPLIT = 2, // one chunk of a bigger piece of data
142 PACKET_TYPE_RELIABLE = 3, // ACKed, in-order delivery
// Compares two 16-bit sequence numbers. Both visible tests check whether the
// distance between the two numbers exceeds half of the sequence space
// (SEQNUM_MAX/2).
// NOTE(review): the enclosing condition and the return statements are not
// visible in this excerpt; presumably a distance above half the space is
// interpreted as a wrap-around — confirm against the full source.
146 inline bool seqnum_higher(u16 totest, u16 base)
150 if ((totest - base) > (SEQNUM_MAX/2))
156 if ((base - totest) > (SEQNUM_MAX/2))
162 inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size)
164 u16 window_start = next;
165 u16 window_end = ( next + window_size ) % (SEQNUM_MAX+1);
167 if (window_start < window_end) {
168 return ((seqnum >= window_start) && (seqnum < window_end));
172 return ((seqnum < window_end) || (seqnum >= window_start));
175 static inline float CALC_DTIME(u64 lasttime, u64 curtime)
177 float value = ( curtime - lasttime) / 1000.0;
178 return MYMAX(MYMIN(value,0.1),0.0);
182 Struct for all kinds of packets. Includes following data:
184 u8[] packet data (usually copied from SharedBuffer<u8>)
// One packet's bytes (held in m_data) together with the bookkeeping fields
// used while the packet is buffered, sent or re-sent.
186 struct BufferedPacket {
// Creates a packet whose byte buffer has a_size elements.
// NOTE(review): the rest of the constructor body is not visible in this
// excerpt; presumably it also points `data` at the buffer — confirm.
187 BufferedPacket(u32 a_size)
189 m_data.resize(a_size);
// Copying is disabled through the project macro.
193 DISABLE_CLASS_COPY(BufferedPacket)
// Declared here, defined elsewhere.
195 u16 getSeqnum() const;
// Number of bytes currently held in m_data.
197 inline size_t size() const { return m_data.size(); }
// NOTE(review): `data` has no default member initializer, unlike the fields
// that follow it.
199 u8 *data; // Direct memory access
200 float time = 0.0f; // Seconds from buffering the packet or re-sending
201 float totaltime = 0.0f; // Seconds from buffering the packet
// Initialized from -1; assuming u64 is unsigned this is the largest
// representable value (presumably a "not sent yet" marker — confirm).
202 u64 absolute_send_time = -1;
203 Address address; // Sender or destination
204 unsigned int resend_count = 0;
207 std::vector<u8> m_data; // Data of the packet, including headers
210 typedef std::shared_ptr<BufferedPacket> BufferedPacketPtr;
// Packet construction helpers (declarations only; defined elsewhere).
// The TYPE_* names used in the comments below correspond to the
// PACKET_TYPE_* enumerators of PacketType.
213 // This adds the base headers to the data and makes a packet out of it
// NOTE(review): protocol_id, sender_peer_id and channel are presumably the
// fields of the BASE_HEADER_SIZE-byte base header — confirm in the definition.
214 BufferedPacketPtr makePacket(Address &address, const SharedBuffer<u8> &data,
215 u32 protocol_id, session_t sender_peer_id, u8 channel);
217 // Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
218 // Increments split_seqnum if a split packet is made
// split_seqnum is taken by reference so the increment is visible to the
// caller. NOTE(review): the resulting buffers are presumably stored in *list.
219 void makeAutoSplitPacket(const SharedBuffer<u8> &data, u32 chunksize_max,
220 u16 &split_seqnum, std::list<SharedBuffer<u8>> *list);
222 // Add the TYPE_RELIABLE header to the data
// Returns a new buffer; the input buffer is passed as const.
223 SharedBuffer<u8> makeReliablePacket(const SharedBuffer<u8> &data, u16 seqnum);
// Reassembly state for one split packet: chunks are collected in `chunks`
// until their number reaches chunk_count.
225 struct IncomingSplitPacket
// cc: number of chunks the packet consists of; r: value for `reliable`.
227 IncomingSplitPacket(u32 cc, bool r):
228 chunk_count(cc), reliable(r) {}
// No default construction: both values above must be supplied.
230 IncomingSplitPacket() = delete;
232 float time = 0.0f; // Seconds from adding
234 bool reliable; // If true, isn't deleted on timeout
// True once the number of stored chunks equals chunk_count.
236 bool allReceived() const
238 return (chunks.size() == chunk_count);
// NOTE(review): chunk_num is u32 here while the map below is keyed by u16.
240 bool insert(u32 chunk_num, SharedBuffer<u8> &chunkdata);
// NOTE(review): presumably concatenates the chunks in key order — confirm
// in the definition.
241 SharedBuffer<u8> reassemble();
244 // Key is chunk number, value is data without headers
245 std::map<u16, SharedBuffer<u8>> chunks;
249 A buffer which stores reliable packets and sorts them internally
250 for fast access to the smallest one.
253 typedef std::list<BufferedPacketPtr>::iterator RPBSearchResult;
// Container for reliable packets, backed by a std::list of shared packet
// pointers plus a mutex (m_list_mutex).
255 class ReliablePacketBuffer
258 ReliablePacketBuffer() = default;
// Writes a sequence number into `result`.
// NOTE(review): the bool presumably reports whether the buffer held any
// packet — confirm in the definition.
260 bool getFirstSeqnum(u16& result);
// Removal operations; both hand the removed packet back to the caller.
262 BufferedPacketPtr popFirst();
263 BufferedPacketPtr popSeqnum(u16 seqnum);
264 void insert(BufferedPacketPtr &p_ptr, u16 next_expected);
// Adds dtime to the per-packet timers (see BufferedPacket::time/totaltime).
// NOTE(review): presumed from the name — confirm.
266 void incrementTimeouts(float dtime);
// Returns read-only handles to packets; max_packets presumably bounds the
// size of the returned list — confirm.
267 std::list<ConstSharedPtr<BufferedPacket>> getTimedOuts(float timeout, u32 max_packets);
// NOTE(review): the "NoLock" suffix suggests the caller must already hold
// m_list_mutex — confirm.
275 RPBSearchResult findPacketNoLock(u16 seqnum);
277 std::list<BufferedPacketPtr> m_list;
// NOTE(review): no default member initializer is visible for this field.
279 u16 m_oldest_non_answered_ack;
281 std::mutex m_list_mutex;
285 A buffer for reconstructing split packets
// Collects the chunks of incoming split packets, keyed by a u16 value
// (presumably the split sequence number — confirm).
288 class IncomingSplitBuffer
// NOTE(review): m_buf stores raw IncomingSplitPacket pointers; presumably
// this destructor releases them — confirm in the definition.
291 ~IncomingSplitBuffer();
293 Returns a reference counted buffer of length != 0 when a full split
294 packet is constructed. If not, returns one of length 0.
296 SharedBuffer<u8> insert(BufferedPacketPtr &p_ptr, bool reliable);
// Only unreliable entries are affected, per the name; compare
// IncomingSplitPacket::reliable ("If true, isn't deleted on timeout").
298 void removeUnreliableTimedOuts(float dtime, float timeout);
302 std::map<u16, IncomingSplitPacket*> m_buf;
304 std::mutex m_map_mutex;
// Kind of a ConnectionCommand (stored in ConnectionCommand::type).
// NOTE(review): only CONNCMD_DISCONNECT_PEER is visible in this excerpt.
307 enum ConnectionCommandType{
312 CONNCMD_DISCONNECT_PEER,
319 struct ConnectionCommand;
320 typedef std::shared_ptr<ConnectionCommand> ConnectionCommandPtr;
322 // This is very similar to ConnectionEvent
// A command handed to the connection via shared pointers
// (ConnectionCommandPtr). Instances are obtained through the static factory
// functions below; each returns a ConnectionCommandPtr.
323 struct ConnectionCommand
// Fixed at construction.
325 const ConnectionCommandType type;
327 session_t peer_id = PEER_ID_INEXISTENT;
330 bool reliable = false;
// Copying is disabled through the project macro.
333 DISABLE_CLASS_COPY(ConnectionCommand);
// Factory functions, one per kind of command.
335 static ConnectionCommandPtr serve(Address address);
336 static ConnectionCommandPtr connect(Address address);
337 static ConnectionCommandPtr disconnect();
338 static ConnectionCommandPtr disconnect_peer(session_t peer_id);
339 static ConnectionCommandPtr send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable);
340 static ConnectionCommandPtr ack(session_t peer_id, u8 channelnum, const Buffer<u8> &data);
341 static ConnectionCommandPtr createPeer(session_t peer_id, const Buffer<u8> &data);
// NOTE(review): the access specifier for the constructor and create() is not
// visible in this excerpt; presumably they are private helpers of the
// factories above — confirm.
344 ConnectionCommand(ConnectionCommandType type_) :
347 static ConnectionCommandPtr create(ConnectionCommandType type);
350 /* maximum window size to use, 0xFFFF is theoretical maximum. don't think about
351 * touching it, the less you're away from it the more likely data corruption
354 #define MAX_RELIABLE_WINDOW_SIZE 0x8000
355 /* starting value for window size */
356 #define START_RELIABLE_WINDOW_SIZE 0x400
357 /* minimum value for window size */
358 #define MIN_RELIABLE_WINDOW_SIZE 0x40
// Sequence number accessors (declarations only).
// NOTE(review): by their names, read* returns a counter without changing it
// and inc*/get* advances it — confirm in the definitions.
364 u16 readNextIncomingSeqNum();
365 u16 incNextIncomingSeqNum();
// `successful` is an output flag set by the call.
367 u16 getOutgoingSequenceNumber(bool& successful);
368 u16 readOutgoingSequenceNumber();
369 bool putBackSequenceNumber(u16);
371 u16 readNextSplitSeqNum();
372 void setNextSplitSeqNum(u16 seqnum);
374 // This is for buffering the incoming packets that are coming in
376 ReliablePacketBuffer incoming_reliables;
377 // This is for buffering the sent packets so that the sender can
378 // re-send them if no ACK is received
379 ReliablePacketBuffer outgoing_reliables_sent;
381 // Queued reliable packets
382 std::queue<BufferedPacketPtr> queued_reliables;
384 // Queued commands, prior to being split into packets
385 std::deque<ConnectionCommandPtr> queued_commands;
387 IncomingSplitBuffer incoming_splits;
390 ~Channel() = default;
// Statistics updates (declarations only); these feed the counters and rates
// exposed by the get*RateKB() accessors.
392 void UpdatePacketLossCounter(unsigned int count);
393 void UpdatePacketTooLateCounter();
// `packages` defaults to 1.
394 void UpdateBytesSent(unsigned int bytes,unsigned int packages=1);
395 void UpdateBytesLost(unsigned int bytes);
396 void UpdateBytesReceived(unsigned int bytes);
398 void UpdateTimers(float dtime);
400 float getCurrentDownloadRateKB()
401 { MutexAutoLock lock(m_internal_mutex); return cur_kbps; };
402 float getMaxDownloadRateKB()
403 { MutexAutoLock lock(m_internal_mutex); return max_kbps; };
405 float getCurrentLossRateKB()
406 { MutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
407 float getMaxLossRateKB()
408 { MutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
410 float getCurrentIncomingRateKB()
411 { MutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; };
412 float getMaxIncomingRateKB()
413 { MutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; };
415 float getAvgDownloadRateKB()
416 { MutexAutoLock lock(m_internal_mutex); return avg_kbps; };
417 float getAvgLossRateKB()
418 { MutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
419 float getAvgIncomingRateKB()
420 { MutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; };
422 u16 getWindowSize() const { return m_window_size; };
424 void setWindowSize(long size)
426 m_window_size = (u16)rangelim(size, MIN_RELIABLE_WINDOW_SIZE, MAX_RELIABLE_WINDOW_SIZE);
// Taken by the get*RateKB() accessors before they read the rate fields below.
430 std::mutex m_internal_mutex;
// Starts at the minimum window size.
// NOTE(review): START_RELIABLE_WINDOW_SIZE is defined in this header but is
// not what this initializer uses — confirm which start value is intended.
431 u16 m_window_size = MIN_RELIABLE_WINDOW_SIZE;
// All three sequence counters start at SEQNUM_INITIAL.
433 u16 next_incoming_seqnum = SEQNUM_INITIAL;
435 u16 next_outgoing_seqnum = SEQNUM_INITIAL;
436 u16 next_outgoing_split_seqnum = SEQNUM_INITIAL;
// Packet counters.
438 unsigned int current_packet_loss = 0;
439 unsigned int current_packet_too_late = 0;
440 unsigned int current_packet_successful = 0;
441 float packet_loss_counter = 0.0f;
// Byte counters.
443 unsigned int current_bytes_transfered = 0;
444 unsigned int current_bytes_received = 0;
445 unsigned int current_bytes_lost = 0;
// Rates returned by get{Max,Current,Avg}DownloadRateKB().
446 float max_kbps = 0.0f;
447 float cur_kbps = 0.0f;
448 float avg_kbps = 0.0f;
// Rates returned by get{Max,Current,Avg}IncomingRateKB().
449 float max_incoming_kbps = 0.0f;
450 float cur_incoming_kbps = 0.0f;
451 float avg_incoming_kbps = 0.0f;
// Rates returned by get{Max,Current,Avg}LossRateKB().
452 float max_kbps_lost = 0.0f;
453 float cur_kbps_lost = 0.0f;
454 float avg_kbps_lost = 0.0f;
455 float bpm_counter = 0.0f;
457 unsigned int rate_samples = 0;
// Members of a small wrapper around a Peer pointer (m_peer).
// NOTE(review): Peer declares PeerHelper as a friend and keeps a usage count
// (m_usage); presumably this wrapper maintains that count — confirm in the
// definitions, which are not part of this excerpt.
465 PeerHelper() = default;
// Not marked explicit, so a Peer* converts implicitly.
466 PeerHelper(Peer* peer);
469 PeerHelper& operator=(Peer* peer);
470 Peer* operator->() const;
// Overloaded address-of: `&helper` yields a Peer*, not a PeerHelper*.
472 Peer* operator&() const;
// Allows comparisons such as `helper != nullptr`.
473 bool operator!=(void* ptr);
476 Peer *m_peer = nullptr;
492 friend class PeerHelper;
// Constructor: stores the owning connection and records the current time
// (porting::getTimeMs()) as the last timeout check.
// NOTE(review): the remaining member initializers are not visible in this
// excerpt.
494 Peer(Address address_,session_t id_,Connection* connection) :
496 m_connection(connection),
498 m_last_timeout_check(porting::getTimeMs())
// Sanity check: aborts with "Reference counting failure" if the usage count
// is not zero.
// NOTE(review): these two lines presumably form the destructor body; its
// signature is not visible in this excerpt.
503 MutexAutoLock usage_lock(m_exclusive_access_mutex);
504 FATAL_ERROR_IF(m_usage != 0, "Reference counting failure");
507 // Unique id of the peer
512 virtual void PutReliableSendCommand(ConnectionCommandPtr &c,
513 unsigned int max_packet_size) {};
515 virtual bool getAddress(MTProtocols type, Address& toset) = 0;
517 bool isPendingDeletion()
518 { MutexAutoLock lock(m_exclusive_access_mutex); return m_pending_deletion; };
// Resets m_timeout_counter to zero while holding m_exclusive_access_mutex.
// NOTE(review): the signature this body belongs to is not visible in this
// excerpt.
521 {MutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter = 0.0; };
// Declared here, defined elsewhere.
523 bool isTimedOut(float timeout);
525 unsigned int m_increment_packets_remaining = 0;
527 virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
528 virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
// Base-class fallback: writes a message to errorstream and returns an empty
// (size 0) buffer. UDPPeer declares its own addSplitPacket.
// NOTE(review): the end of the parameter list is not visible in this excerpt.
529 virtual SharedBuffer<u8> addSplitPacket(u8 channel, BufferedPacketPtr &toadd,
532 errorstream << "Peer::addSplitPacket called,"
533 << " this is supposed to be never called!" << std::endl;
534 return SharedBuffer<u8>(0);
537 virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
// Returns one of the round-trip statistics stored in m_rtt.
// NOTE(review): the switch and case labels that choose the field for a given
// rtt_stat_type are not visible in this excerpt.
539 virtual float getStat(rtt_stat_type type) const {
542 return m_rtt.min_rtt;
544 return m_rtt.max_rtt;
546 return m_rtt.avg_rtt;
548 return m_rtt.jitter_min;
550 return m_rtt.jitter_max;
552 return m_rtt.jitter_avg;
557 virtual void reportRTT(float rtt) {};
// Declared here, defined elsewhere. profiler_id defaults to an empty string
// and num_samples to 1000.
// NOTE(review): presumably folds `rtt` into the m_rtt statistics — confirm.
559 void RTTStatistics(float rtt,
560 const std::string &profiler_id = "",
561 unsigned int num_samples = 1000);
// Declared mutable so that it can also be locked from const member functions.
566 mutable std::mutex m_exclusive_access_mutex;
568 bool m_pending_deletion = false;
570 Connection* m_connection;
572 // Address of the peer
576 float m_ping_timer = 0.0f;
// Round-trip statistics (fields of the rttstats struct, read by getStat()).
// The minima start at FLT_MAX and the maxima at 0; the averages start at -1
// (presumably meaning "no sample yet" — confirm).
580 float jitter_min = FLT_MAX;
581 float jitter_max = 0.0f;
582 float jitter_avg = -1.0f;
583 float min_rtt = FLT_MAX;
584 float max_rtt = 0.0f;
585 float avg_rtt = -1.0f;
587 rttstats() = default;
591 float m_last_rtt = -1.0f;
// Checked against zero in the "Reference counting failure" assertion.
593 // current usage count
594 unsigned int m_usage = 0;
596 // Seconds from last receive
597 float m_timeout_counter = 0.0f;
// Set from porting::getTimeMs() in the constructor.
599 u64 m_last_timeout_check;
// Peer implementation that owns CHANNEL_COUNT channels and a resend timeout.
// It re-declares the virtual functions of Peer listed below.
// NOTE(review): none of these re-declarations carries `override`.
602 class UDPPeer : public Peer
606 friend class PeerHelper;
607 friend class ConnectionReceiveThread;
608 friend class ConnectionSendThread;
609 friend class Connection;
// NOTE(review): the id is taken as u16 here, while Peer's constructor takes
// a session_t.
611 UDPPeer(u16 a_id, Address a_address, Connection* connection);
612 virtual ~UDPPeer() = default;
614 void PutReliableSendCommand(ConnectionCommandPtr &c,
615 unsigned int max_packet_size);
617 bool getAddress(MTProtocols type, Address& toset);
619 u16 getNextSplitSequenceNumber(u8 channel);
620 void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
// NOTE(review): the end of this parameter list is not visible in this
// excerpt.
622 SharedBuffer<u8> addSplitPacket(u8 channel, BufferedPacketPtr &toadd,
627 Calculates avg_rtt and resend_timeout.
628 rtt=-1 only recalculates resend_timeout
630 void reportRTT(float rtt);
632 void RunCommandQueues(
633 unsigned int max_packet_size,
634 unsigned int maxcommands,
635 unsigned int maxtransfer);
// Both resend_timeout accessors hold m_exclusive_access_mutex.
637 float getResendTimeout()
638 { MutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
640 void setResendTimeout(float timeout)
641 { MutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
642 bool Ping(float dtime,SharedBuffer<u8>& data);
// One Channel per channel number (CHANNEL_COUNT is 3).
644 Channel channels[CHANNEL_COUNT];
645 bool m_pending_disconnect = false;
647 // This is changed dynamically
648 float resend_timeout = 0.5;
650 bool processReliableSendCommand(
651 ConnectionCommandPtr &c_ptr,
652 unsigned int max_packet_size);
// Kind of a ConnectionEvent (stored in ConnectionEvent::type). Each visible
// enumerator has a matching ConnectionEvent factory function.
// NOTE(review): the first enumerator is not visible in this excerpt.
659 enum ConnectionEventType {
661 CONNEVENT_DATA_RECEIVED,
662 CONNEVENT_PEER_ADDED,
663 CONNEVENT_PEER_REMOVED,
664 CONNEVENT_BIND_FAILED,
667 struct ConnectionEvent;
668 typedef std::shared_ptr<ConnectionEvent> ConnectionEventPtr;
670 // This is very similar to ConnectionCommand
// An event passed around through shared pointers (ConnectionEventPtr).
// Instances are obtained through the static factory functions below.
671 struct ConnectionEvent
// Fixed at construction.
673 const ConnectionEventType type;
674 session_t peer_id = 0;
675: 676 bool timeout = false;
679 // We don't want to copy "data"
680 DISABLE_CLASS_COPY(ConnectionEvent);
// Factory functions; each returns a ConnectionEventPtr.
682 static ConnectionEventPtr create(ConnectionEventType type);
683 static ConnectionEventPtr dataReceived(session_t peer_id, const Buffer<u8> &data);
684 static ConnectionEventPtr peerAdded(session_t peer_id, Address address);
685 static ConnectionEventPtr peerRemoved(session_t peer_id, bool is_timeout, Address address);
686 static ConnectionEventPtr bindFailed();
// Returns a C string describing the event.
// NOTE(review): presumably a static name for `type` — confirm.
688 const char *describe() const;
691 ConnectionEvent(ConnectionEventType type_) :
// The send and receive threads get access to the non-public members.
700 friend class ConnectionSendThread;
701 friend class ConnectionReceiveThread;
// Constructor (declared here, defined elsewhere).
703 Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
704 PeerHandler *peerhandler);
// NOTE(review): presumably takes the next event from m_event_queue, waiting
// at most timeout_ms — confirm in the definition.
708 ConnectionEventPtr waitEvent(u32 timeout_ms);
// NOTE(review): presumably appends to m_command_queue — confirm.
710 void putCommand(ConnectionCommandPtr c);
712 void SetTimeoutMs(u32 timeout) { m_bc_receive_timeout = timeout; }
// User-facing operations (declarations only). ConnectionCommand offers a
// factory of matching name for Serve, Connect and Send.
713 void Serve(Address bind_addr);
714 void Connect(Address address);
717 void Receive(NetworkPacket* pkt);
// NOTE(review): presumably the bool reports whether a packet was stored in
// *pkt — confirm.
718 bool TryReceive(NetworkPacket *pkt);
719 void Send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable);
720 session_t GetPeerID() const { return m_peer_id; }
// Per-peer and local queries (declarations only).
721 Address GetPeerAddress(session_t peer_id);
722 float getPeerStat(session_t peer_id, rtt_stat_type type);
723 float getLocalStat(rate_stat_type type);
724 u32 GetProtocolID() const { return m_protocol_id; };
725 const std::string getDesc();
726 void DisconnectPeer(session_t peer_id);
// Peer lookup and management (declarations only).
// The result of getPeerNoEx() can be compared against nullptr (see
// ConnectedToServer()).
729 PeerHelper getPeerNoEx(session_t peer_id);
// NOTE(review): these two return u16, while peer ids are session_t elsewhere
// in this interface.
730 u16 lookupPeer(Address& sender);
732 u16 createPeer(Address& sender, MTProtocols protocol, int fd);
733 UDPPeer* createServerPeer(Address& sender);
734 bool deletePeer(session_t peer_id, bool timeout);
736 void SetPeerID(session_t id) { m_peer_id = id; }
738 void sendAck(session_t peer_id, u8 channelnum, u16 seqnum);
// Takes m_peers_mutex before producing the id list.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably it returns a copy of m_peer_ids — confirm.
740 std::vector<session_t> getPeerIDs()
742 MutexAutoLock peerlock(m_peers_mutex);
746 UDPSocket m_udpSocket;
747 // Command queue: user -> SendThread
748 MutexedQueue<ConnectionCommandPtr> m_command_queue;
// Overload of Receive() with an additional timeout; returns bool.
750 bool Receive(NetworkPacket *pkt, u32 timeout);
// NOTE(review): presumably appends to m_event_queue — confirm.
752 void putEvent(ConnectionEventPtr e);
756 bool ConnectedToServer()
758 return getPeerNoEx(PEER_ID_SERVER) != nullptr;
761 // Event queue: ReceiveThread -> user
762 MutexedQueue<ConnectionEventPtr> m_event_queue;
// Own peer id; 0 is the value reserved as PEER_ID_INEXISTENT.
764 session_t m_peer_id = 0;
// Peers by id, held as raw pointers.
// NOTE(review): ownership of the Peer objects is not visible from here.
767 std::map<session_t, Peer *> m_peers;
768 std::vector<session_t> m_peer_ids;
// Locked by getPeerIDs().
769 std::mutex m_peers_mutex;
771 std::unique_ptr<ConnectionSendThread> m_sendThread;
772 std::unique_ptr<ConnectionReceiveThread> m_receiveThread;
774 mutable std::mutex m_info_mutex;
776 // Backwards compatibility
777 PeerHandler *m_bc_peerhandler;
// Written by SetTimeoutMs().
778 u32 m_bc_receive_timeout = 0;
780 bool m_shutting_down = false;
// Starts at 2: ids 0 (PEER_ID_INEXISTENT) and 1 (PEER_ID_SERVER) are reserved.
782 session_t m_next_remote_peer_id = 2;