3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #ifndef CONNECTION_HEADER
21 #define CONNECTION_HEADER
23 #include "irrlichttypes_bloated.h"
25 #include "exceptions.h"
26 #include "constants.h"
27 #include "network/networkpacket.h"
28 #include "util/pointer.h"
29 #include "util/container.h"
30 #include "util/thread.h"
31 #include "util/numeric.h"
43 class NotFoundException : public BaseException
46 NotFoundException(const char *s):
51 class PeerNotFoundException : public BaseException
54 PeerNotFoundException(const char *s):
59 class ConnectionException : public BaseException
62 ConnectionException(const char *s):
67 class ConnectionBindFailed : public BaseException
70 ConnectionBindFailed(const char *s):
75 class InvalidIncomingDataException : public BaseException
78 InvalidIncomingDataException(const char *s):
83 class InvalidOutgoingDataException : public BaseException
86 InvalidOutgoingDataException(const char *s):
91 class NoIncomingDataException : public BaseException
94 NoIncomingDataException(const char *s):
99 class ProcessedSilentlyException : public BaseException
102 ProcessedSilentlyException(const char *s):
107 class ProcessedQueued : public BaseException
110 ProcessedQueued(const char *s):
115 class IncomingDataCorruption : public BaseException
118 IncomingDataCorruption(const char *s):
123 typedef enum MTProtocols {
126 MTP_MINETEST_RELIABLE_UDP
129 #define SEQNUM_MAX 65535
130 inline bool seqnum_higher(u16 totest, u16 base)
134 if ((totest - base) > (SEQNUM_MAX/2))
141 if ((base - totest) > (SEQNUM_MAX/2))
148 inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size)
150 u16 window_start = next;
151 u16 window_end = ( next + window_size ) % (SEQNUM_MAX+1);
153 if (window_start < window_end)
155 return ((seqnum >= window_start) && (seqnum < window_end));
159 return ((seqnum < window_end) || (seqnum >= window_start));
163 struct BufferedPacket
165 BufferedPacket(u8 *a_data, u32 a_size):
166 data(a_data, a_size), time(0.0), totaltime(0.0), absolute_send_time(-1),
169 BufferedPacket(u32 a_size):
170 data(a_size), time(0.0), totaltime(0.0), absolute_send_time(-1),
173 SharedBuffer<u8> data; // Data of the packet, including headers
174 float time; // Seconds from buffering the packet or re-sending
175 float totaltime; // Seconds from buffering the packet
176 unsigned int absolute_send_time;
177 Address address; // Sender or destination
178 unsigned int resend_count;
181 // This adds the base headers to the data and makes a packet out of it
182 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
183 u32 protocol_id, u16 sender_peer_id, u8 channel);
184 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
185 u32 protocol_id, u16 sender_peer_id, u8 channel);
187 // Add the TYPE_ORIGINAL header to the data
188 SharedBuffer<u8> makeOriginalPacket(
189 SharedBuffer<u8> data);
191 // Split data in chunks and add TYPE_SPLIT headers to them
192 std::list<SharedBuffer<u8> > makeSplitPacket(
193 SharedBuffer<u8> data,
197 // Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
198 // Increments split_seqnum if a split packet is made
199 std::list<SharedBuffer<u8> > makeAutoSplitPacket(
200 SharedBuffer<u8> data,
204 // Add the TYPE_RELIABLE header to the data
205 SharedBuffer<u8> makeReliablePacket(
206 SharedBuffer<u8> data,
209 struct IncomingSplitPacket
211 IncomingSplitPacket()
216 // Key is chunk number, value is data without headers
217 std::map<u16, SharedBuffer<u8> > chunks;
219 float time; // Seconds from adding
220 bool reliable; // If true, isn't deleted on timeout
224 return (chunks.size() == chunk_count);
231 A packet is sent through a channel to a peer with a basic header:
232 TODO: Should we have a receiver_peer_id also?
235 [4] u16 sender_peer_id
239 value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
240 value 1 (PEER_ID_SERVER) is reserved for server
241 these constants are defined in constants.h
243 The lower the number, the higher the priority is.
244 Only channels 0, 1 and 2 exist.
246 #define BASE_HEADER_SIZE 7
247 #define CHANNEL_COUNT 3
251 CONTROL: This is a packet used by the protocol.
252 - When this is processed, nothing is handed to the user.
256 controltype and data description:
259 CONTROLTYPE_SET_PEER_ID
262 - There is no actual reply, but this can be sent in a reliable
263 packet to get a reply
266 #define TYPE_CONTROL 0
267 #define CONTROLTYPE_ACK 0
268 #define CONTROLTYPE_SET_PEER_ID 1
269 #define CONTROLTYPE_PING 2
270 #define CONTROLTYPE_DISCO 3
271 #define CONTROLTYPE_ENABLE_BIG_SEND_WINDOW 4
274 ORIGINAL: This is a plain packet with no control and no error
276 - When this is processed, it is directly handed to the user.
280 #define TYPE_ORIGINAL 1
281 #define ORIGINAL_HEADER_SIZE 1
283 SPLIT: These are sequences of packets forming one bigger piece of
285 - When processed and all the packet_nums 0...packet_count-1 are
286 present (this should be buffered), the resulting data shall be
287 directly handed to the user.
288 - If the data fails to come up in a reasonable time, the buffer shall
289 be silently discarded.
- These can be sent as-is or on top of a RELIABLE packet stream.
299 RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
300 and they shall be delivered in the same order as sent. This is done
301 with a buffer in the receiving and transmitting end.
- When this is processed, the contents of each packet are recursively
processed as packets.
309 #define TYPE_RELIABLE 3
310 #define RELIABLE_HEADER_SIZE 3
311 #define SEQNUM_INITIAL 65500
314 A buffer which stores reliable packets and sorts them internally
315 for fast access to the smallest one.
318 typedef std::list<BufferedPacket>::iterator RPBSearchResult;
320 class ReliablePacketBuffer
323 ReliablePacketBuffer();
325 bool getFirstSeqnum(u16& result);
327 BufferedPacket popFirst();
328 BufferedPacket popSeqnum(u16 seqnum);
329 void insert(BufferedPacket &p,u16 next_expected);
331 void incrementTimeouts(float dtime);
332 std::list<BufferedPacket> getTimedOuts(float timeout,
333 unsigned int max_packets);
337 bool containsPacket(u16 seqnum);
338 RPBSearchResult notFound();
343 RPBSearchResult findPacket(u16 seqnum);
345 std::list<BufferedPacket> m_list;
348 u16 m_oldest_non_answered_ack;
354 A buffer for reconstructing split packets
357 class IncomingSplitBuffer
360 ~IncomingSplitBuffer();
362 Returns a reference counted buffer of length != 0 when a full split
363 packet is constructed. If not, returns one of length 0.
365 SharedBuffer<u8> insert(BufferedPacket &p, bool reliable);
367 void removeUnreliableTimedOuts(float dtime, float timeout);
371 std::map<u16, IncomingSplitPacket*> m_buf;
376 struct OutgoingPacket
380 SharedBuffer<u8> data;
384 OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_,
385 bool reliable_,bool ack_=false):
387 channelnum(channelnum_),
395 enum ConnectionCommandType{
400 CONNCMD_DISCONNECT_PEER,
405 CONCMD_DISABLE_LEGACY
408 struct ConnectionCommand
410 enum ConnectionCommandType type;
418 ConnectionCommand(): type(CONNCMD_NONE), peer_id(PEER_ID_INEXISTENT), reliable(false), raw(false) {}
420 void serve(Address address_)
422 type = CONNCMD_SERVE;
425 void connect(Address address_)
427 type = CONNCMD_CONNECT;
432 type = CONNCMD_DISCONNECT;
434 void disconnect_peer(u16 peer_id_)
436 type = CONNCMD_DISCONNECT_PEER;
439 void send(u16 peer_id_, u8 channelnum_,
440 SharedBuffer<u8> data_, bool reliable_)
444 channelnum = channelnum_;
446 reliable = reliable_;
448 void sendToAll(u8 channelnum_, SharedBuffer<u8> data_, bool reliable_)
450 type = CONNCMD_SEND_TO_ALL;
451 channelnum = channelnum_;
453 reliable = reliable_;
456 void ack(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_)
460 channelnum = channelnum_;
465 void createPeer(u16 peer_id_, SharedBuffer<u8> data_)
467 type = CONCMD_CREATE_PEER;
475 void disableLegacy(u16 peer_id_, SharedBuffer<u8> data_)
477 type = CONCMD_DISABLE_LEGACY;
490 u16 readNextIncomingSeqNum();
491 u16 incNextIncomingSeqNum();
493 u16 getOutgoingSequenceNumber(bool& successfull);
494 u16 readOutgoingSequenceNumber();
495 bool putBackSequenceNumber(u16);
497 u16 readNextSplitSeqNum();
498 void setNextSplitSeqNum(u16 seqnum);
500 // This is for buffering the incoming packets that are coming in
502 ReliablePacketBuffer incoming_reliables;
503 // This is for buffering the sent packets so that the sender can
504 // re-send them if no ACK is received
505 ReliablePacketBuffer outgoing_reliables_sent;
507 //queued reliable packets
508 Queue<BufferedPacket> queued_reliables;
// queue commands prior to splitting them into packets
511 Queue<ConnectionCommand> queued_commands;
513 IncomingSplitBuffer incoming_splits;
518 void UpdatePacketLossCounter(unsigned int count);
519 void UpdatePacketTooLateCounter();
520 void UpdateBytesSent(unsigned int bytes,unsigned int packages=1);
521 void UpdateBytesLost(unsigned int bytes);
522 void UpdateBytesReceived(unsigned int bytes);
524 void UpdateTimers(float dtime, bool legacy_peer);
526 const float getCurrentDownloadRateKB()
527 { JMutexAutoLock lock(m_internal_mutex); return cur_kbps; };
528 const float getMaxDownloadRateKB()
529 { JMutexAutoLock lock(m_internal_mutex); return max_kbps; };
531 const float getCurrentLossRateKB()
532 { JMutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
533 const float getMaxLossRateKB()
534 { JMutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
536 const float getCurrentIncomingRateKB()
537 { JMutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; };
538 const float getMaxIncomingRateKB()
539 { JMutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; };
541 const float getAvgDownloadRateKB()
542 { JMutexAutoLock lock(m_internal_mutex); return avg_kbps; };
543 const float getAvgLossRateKB()
544 { JMutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
545 const float getAvgIncomingRateKB()
546 { JMutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; };
548 const unsigned int getWindowSize() const { return window_size; };
550 void setWindowSize(unsigned int size) { window_size = size; };
552 JMutex m_internal_mutex;
555 u16 next_incoming_seqnum;
557 u16 next_outgoing_seqnum;
558 u16 next_outgoing_split_seqnum;
560 unsigned int current_packet_loss;
561 unsigned int current_packet_too_late;
562 unsigned int current_packet_successfull;
563 float packet_loss_counter;
565 unsigned int current_bytes_transfered;
566 unsigned int current_bytes_received;
567 unsigned int current_bytes_lost;
571 float max_incoming_kbps;
572 float cur_incoming_kbps;
573 float avg_incoming_kbps;
579 unsigned int rate_samples;
603 virtual ~PeerHandler()
608 This is called after the Peer has been inserted into the
609 Connection's peer container.
611 virtual void peerAdded(Peer *peer) = 0;
613 This is called before the Peer has been removed from the
614 Connection's peer container.
616 virtual void deletingPeer(Peer *peer, bool timeout) = 0;
623 PeerHelper(Peer* peer);
626 PeerHelper& operator=(Peer* peer);
627 Peer* operator->() const;
629 Peer* operator&() const;
630 bool operator!=(void* ptr);
658 friend class PeerHelper;
660 Peer(Address address_,u16 id_,Connection* connection) :
662 m_increment_packets_remaining(9),
663 m_increment_bytes_remaining(0),
664 m_pending_deletion(false),
665 m_connection(connection),
670 m_timeout_counter(0.0),
671 m_last_timeout_check(porting::getTimeMs()),
672 m_has_sent_with_id(false)
674 m_rtt.avg_rtt = -1.0;
675 m_rtt.jitter_avg = -1.0;
676 m_rtt.jitter_max = 0.0;
678 m_rtt.jitter_min = FLT_MAX;
679 m_rtt.min_rtt = FLT_MAX;
683 JMutexAutoLock usage_lock(m_exclusive_access_mutex);
684 assert(m_usage == 0);
687 // Unique id of the peer
692 virtual void PutReliableSendCommand(ConnectionCommand &c,
693 unsigned int max_packet_size) {};
695 virtual bool isActive() { return false; };
697 virtual bool getAddress(MTProtocols type, Address& toset) = 0;
700 {JMutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter=0.0; };
702 bool isTimedOut(float timeout);
705 { JMutexAutoLock lock(m_exclusive_access_mutex); m_has_sent_with_id = true; };
708 { JMutexAutoLock lock(m_exclusive_access_mutex); return m_has_sent_with_id; };
710 unsigned int m_increment_packets_remaining;
711 unsigned int m_increment_bytes_remaining;
713 virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
714 virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
715 virtual SharedBuffer<u8> addSpiltPacket(u8 channel,
716 BufferedPacket toadd,
719 fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n");
720 return SharedBuffer<u8>(0);
723 virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
725 virtual float getStat(rtt_stat_type type) const {
728 return m_rtt.min_rtt;
730 return m_rtt.max_rtt;
732 return m_rtt.avg_rtt;
734 return m_rtt.jitter_min;
736 return m_rtt.jitter_max;
738 return m_rtt.jitter_avg;
743 virtual void reportRTT(float rtt) {};
745 void RTTStatistics(float rtt,
746 std::string profiler_id="",
747 unsigned int num_samples=1000);
752 JMutex m_exclusive_access_mutex;
754 bool m_pending_deletion;
756 Connection* m_connection;
758 // Address of the peer
777 // current usage count
778 unsigned int m_usage;
780 // Seconds from last receive
781 float m_timeout_counter;
783 u32 m_last_timeout_check;
785 bool m_has_sent_with_id;
788 class UDPPeer : public Peer
792 friend class PeerHelper;
793 friend class ConnectionReceiveThread;
794 friend class ConnectionSendThread;
795 friend class Connection;
797 UDPPeer(u16 a_id, Address a_address, Connection* connection);
798 virtual ~UDPPeer() {};
800 void PutReliableSendCommand(ConnectionCommand &c,
801 unsigned int max_packet_size);
804 { return ((hasSentWithID()) && (!m_pending_deletion)); };
806 bool getAddress(MTProtocols type, Address& toset);
808 void setNonLegacyPeer();
811 { return m_legacy_peer; }
813 u16 getNextSplitSequenceNumber(u8 channel);
814 void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
816 SharedBuffer<u8> addSpiltPacket(u8 channel,
817 BufferedPacket toadd,
823 Calculates avg_rtt and resend_timeout.
824 rtt=-1 only recalculates resend_timeout
826 void reportRTT(float rtt);
828 void RunCommandQueues(
829 unsigned int max_packet_size,
830 unsigned int maxcommands,
831 unsigned int maxtransfer);
833 float getResendTimeout()
834 { JMutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
836 void setResendTimeout(float timeout)
837 { JMutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
838 bool Ping(float dtime,SharedBuffer<u8>& data);
840 Channel channels[CHANNEL_COUNT];
841 bool m_pending_disconnect;
843 // This is changed dynamically
844 float resend_timeout;
846 bool processReliableSendCommand(
847 ConnectionCommand &c,
848 unsigned int max_packet_size);
857 enum ConnectionEventType{
859 CONNEVENT_DATA_RECEIVED,
860 CONNEVENT_PEER_ADDED,
861 CONNEVENT_PEER_REMOVED,
862 CONNEVENT_BIND_FAILED,
865 struct ConnectionEvent
867 enum ConnectionEventType type;
873 ConnectionEvent(): type(CONNEVENT_NONE) {}
875 std::string describe()
879 return "CONNEVENT_NONE";
880 case CONNEVENT_DATA_RECEIVED:
881 return "CONNEVENT_DATA_RECEIVED";
882 case CONNEVENT_PEER_ADDED:
883 return "CONNEVENT_PEER_ADDED";
884 case CONNEVENT_PEER_REMOVED:
885 return "CONNEVENT_PEER_REMOVED";
886 case CONNEVENT_BIND_FAILED:
887 return "CONNEVENT_BIND_FAILED";
889 return "Invalid ConnectionEvent";
892 void dataReceived(u16 peer_id_, SharedBuffer<u8> data_)
894 type = CONNEVENT_DATA_RECEIVED;
898 void peerAdded(u16 peer_id_, Address address_)
900 type = CONNEVENT_PEER_ADDED;
904 void peerRemoved(u16 peer_id_, bool timeout_, Address address_)
906 type = CONNEVENT_PEER_REMOVED;
913 type = CONNEVENT_BIND_FAILED;
917 class ConnectionSendThread : public JThread {
920 friend class UDPPeer;
922 ConnectionSendThread(unsigned int max_packet_size, float timeout);
928 void setParent(Connection* parent) {
929 assert(parent != NULL);
930 m_connection = parent;
933 void setPeerTimeout(float peer_timeout)
934 { m_timeout = peer_timeout; }
937 void runTimeouts (float dtime);
938 void rawSend (const BufferedPacket &packet);
939 bool rawSendAsPacket(u16 peer_id, u8 channelnum,
940 SharedBuffer<u8> data, bool reliable);
942 void processReliableCommand (ConnectionCommand &c);
943 void processNonReliableCommand (ConnectionCommand &c);
944 void serve (Address bind_address);
945 void connect (Address address);
947 void disconnect_peer(u16 peer_id);
948 void send (u16 peer_id, u8 channelnum,
949 SharedBuffer<u8> data);
950 void sendReliable (ConnectionCommand &c);
951 void sendToAll (u8 channelnum,
952 SharedBuffer<u8> data);
953 void sendToAllReliable(ConnectionCommand &c);
955 void sendPackets (float dtime);
957 void sendAsPacket (u16 peer_id, u8 channelnum,
958 SharedBuffer<u8> data,bool ack=false);
960 void sendAsPacketReliable(BufferedPacket& p, Channel* channel);
962 bool packetsQueued();
964 Connection* m_connection;
965 unsigned int m_max_packet_size;
967 Queue<OutgoingPacket> m_outgoing_queue;
968 JSemaphore m_send_sleep_semaphore;
970 unsigned int m_iteration_packets_avaialble;
971 unsigned int m_max_commands_per_iteration;
972 unsigned int m_max_data_packets_per_iteration;
973 unsigned int m_max_packets_requeued;
976 class ConnectionReceiveThread : public JThread {
978 ConnectionReceiveThread(unsigned int max_packet_size);
982 void setParent(Connection* parent) {
983 assert(parent != NULL);
984 m_connection = parent;
990 // Returns next data from a buffer if possible
991 // If found, returns true; if not, false.
992 // If found, sets peer_id and dst
993 bool getFromBuffers (u16 &peer_id, SharedBuffer<u8> &dst);
995 bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
996 SharedBuffer<u8> &dst);
999 Processes a packet with the basic header stripped out.
1001 packetdata: Data in packet (with no base headers)
1002 peer_id: peer id of the sender of the packet in question
1003 channelnum: channel on which the packet was sent
1004 reliable: true if recursing into a reliable packet
1006 SharedBuffer<u8> processPacket(Channel *channel,
1007 SharedBuffer<u8> packetdata, u16 peer_id,
1008 u8 channelnum, bool reliable);
1011 Connection* m_connection;
1017 friend class ConnectionSendThread;
1018 friend class ConnectionReceiveThread;
1020 Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6);
1021 Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
1022 PeerHandler *peerhandler);
1026 ConnectionEvent getEvent();
1027 ConnectionEvent waitEvent(u32 timeout_ms);
1028 void putCommand(ConnectionCommand &c);
1030 void SetTimeoutMs(int timeout) { m_bc_receive_timeout = timeout; }
1031 void Serve(Address bind_addr);
1032 void Connect(Address address);
1035 u32 Receive(u16 &peer_id, SharedBuffer<u8> &data);
1036 void Send(u16 peer_id, u8 channelnum, NetworkPacket* pkt, bool reliable);
1037 u16 GetPeerID() { return m_peer_id; }
1038 Address GetPeerAddress(u16 peer_id);
1039 float getPeerStat(u16 peer_id, rtt_stat_type type);
1040 float getLocalStat(rate_stat_type type);
1041 const u32 GetProtocolID() const { return m_protocol_id; };
1042 const std::string getDesc();
1043 void DisconnectPeer(u16 peer_id);
1046 PeerHelper getPeer(u16 peer_id);
1047 PeerHelper getPeerNoEx(u16 peer_id);
1048 u16 lookupPeer(Address& sender);
1050 u16 createPeer(Address& sender, MTProtocols protocol, int fd);
1051 UDPPeer* createServerPeer(Address& sender);
1052 bool deletePeer(u16 peer_id, bool timeout);
1054 void SetPeerID(u16 id) { m_peer_id = id; }
1056 void sendAck(u16 peer_id, u8 channelnum, u16 seqnum);
1058 void PrintInfo(std::ostream &out);
1061 std::list<u16> getPeerIDs() { return m_peer_ids; }
1063 UDPSocket m_udpSocket;
1064 MutexedQueue<ConnectionCommand> m_command_queue;
1066 void putEvent(ConnectionEvent &e);
1069 { m_sendThread.Trigger(); }
1071 std::list<Peer*> getPeers();
1073 MutexedQueue<ConnectionEvent> m_event_queue;
1078 std::map<u16, Peer*> m_peers;
1079 std::list<u16> m_peer_ids;
1080 JMutex m_peers_mutex;
1082 ConnectionSendThread m_sendThread;
1083 ConnectionReceiveThread m_receiveThread;
1085 JMutex m_info_mutex;
1087 // Backwards compatibility
1088 PeerHandler *m_bc_peerhandler;
1089 int m_bc_receive_timeout;
1091 bool m_shutting_down;
1093 u16 m_next_remote_peer_id;