X-Git-Url: https://git.lizzy.rs/?a=blobdiff_plain;f=src%2Fconnection.h;h=9d646f4991740a7975d66ad569de17f09f7c4667;hb=5fefc4bbf6380960f11f0b125fc51b6efdc19e2e;hp=570bc92ab22ba817873c0e549af2a59882b46940;hpb=4b6138e69b65271b0e568f821a4d1bd285affedd;p=minetest.git diff --git a/src/connection.h b/src/connection.h index 570bc92ab..9d646f499 100644 --- a/src/connection.h +++ b/src/connection.h @@ -1,18 +1,18 @@ /* -Minetest-c55 -Copyright (C) 2010 celeron55, Perttu Ahola +Minetest +Copyright (C) 2013 celeron55, Perttu Ahola This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; either version 2 of the License, or +it under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. +GNU Lesser General Public License for more details. -You should have received a copy of the GNU General Public License along +You should have received a copy of the GNU Lesser General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ @@ -20,14 +20,18 @@ with this program; if not, write to the Free Software Foundation, Inc., #ifndef CONNECTION_HEADER #define CONNECTION_HEADER -#include -#include -#include "debug.h" -#include "common_irrlicht.h" +#include "irrlichttypes_bloated.h" #include "socket.h" -#include "utility.h" #include "exceptions.h" #include "constants.h" +#include "util/pointer.h" +#include "util/container.h" +#include "util/thread.h" +#include "util/numeric.h" +#include +#include +#include +#include namespace con { @@ -59,13 +63,13 @@ class ConnectionException : public BaseException {} }; -/*class ThrottlingException : public BaseException +class ConnectionBindFailed : public BaseException { public: - ThrottlingException(const char *s): + ConnectionBindFailed(const char *s): BaseException(s) {} -};*/ +}; class InvalidIncomingDataException : public BaseException { @@ -99,35 +103,74 @@ class ProcessedSilentlyException : public BaseException {} }; -inline u16 readPeerId(u8 *packetdata) +class ProcessedQueued : public BaseException { - return readU16(&packetdata[4]); -} -inline u8 readChannel(u8 *packetdata) +public: + ProcessedQueued(const char *s): + BaseException(s) + {} +}; + +class IncomingDataCorruption : public BaseException { - return readU8(&packetdata[6]); -} +public: + IncomingDataCorruption(const char *s): + BaseException(s) + {} +}; + +typedef enum MTProtocols { + PRIMARY, + UDP, + MINETEST_RELIABLE_UDP +} MTProtocols; #define SEQNUM_MAX 65535 -inline bool seqnum_higher(u16 higher, u16 lower) +inline bool seqnum_higher(u16 totest, u16 base) { - if(lower > higher && lower - higher > SEQNUM_MAX/2){ - return true; + if (totest > base) + { + if((totest - base) > (SEQNUM_MAX/2)) + return false; + else + return true; + } + else + { + if((base - totest) > (SEQNUM_MAX/2)) + return true; + else + return false; + } +} + +inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size) +{ + u16 window_start = next; + u16 window_end = ( next + window_size ) % (SEQNUM_MAX+1); + + if (window_start < window_end) + { + return ((seqnum >= window_start) && (seqnum < window_end)); + } + else + { + return ((seqnum < window_end) || (seqnum >= window_start)); } - return (higher > lower); } struct BufferedPacket { BufferedPacket(u8 *a_data, u32 a_size): - data(a_data, a_size), time(0.0), totaltime(0.0) + data(a_data, a_size), time(0.0), totaltime(0.0), absolute_send_time(-1) {} BufferedPacket(u32 a_size): - data(a_size), time(0.0), totaltime(0.0) + data(a_size), time(0.0), totaltime(0.0), absolute_send_time(-1) {} SharedBuffer data; // Data of the packet, including headers float time; // Seconds from buffering the packet or re-sending float totaltime; // Seconds from buffering the packet + unsigned int absolute_send_time; Address address; // Sender or destination }; @@ -142,14 +185,14 @@ SharedBuffer makeOriginalPacket( SharedBuffer data); // Split data in chunks and add TYPE_SPLIT headers to them -core::list > makeSplitPacket( +std::list > makeSplitPacket( SharedBuffer data, u32 chunksize_max, u16 seqnum); // Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet // Increments split_seqnum if a split packet is made -core::list > makeAutoSplitPacket( +std::list > makeAutoSplitPacket( SharedBuffer data, u32 chunksize_max, u16 &split_seqnum); @@ -167,7 +210,7 @@ struct IncomingSplitPacket reliable = false; } // Key is chunk number, value is data without headers - core::map > chunks; + std::map > chunks; u32 chunk_count; float time; // Seconds from adding bool reliable; // If true, isn't deleted on timeout @@ -189,15 +232,14 @@ TODO: Should we have a receiver_peer_id also? [6] u8 channel sender_peer_id: Unique to each peer. - value 0 is reserved for making new connections - value 1 is reserved for server + value 0 (PEER_ID_INEXISTENT) is reserved for making new connections + value 1 (PEER_ID_SERVER) is reserved for server + these constants are defined in constants.h channel: The lower the number, the higher the priority is. Only channels 0, 1 and 2 exist. */ #define BASE_HEADER_SIZE 7 -#define PEER_ID_INEXISTENT 0 -#define PEER_ID_SERVER 1 #define CHANNEL_COUNT 3 /* Packet types: @@ -222,6 +264,8 @@ controltype and data description: #define CONTROLTYPE_SET_PEER_ID 1 #define CONTROLTYPE_PING 2 #define CONTROLTYPE_DISCO 3 +#define CONTROLTYPE_ENABLE_BIG_SEND_WINDOW 4 + /* ORIGINAL: This is a plain packet with no control and no error checking at all. @@ -260,7 +304,6 @@ with a buffer in the receiving and transmitting end. */ #define TYPE_RELIABLE 3 #define RELIABLE_HEADER_SIZE 3 -//#define SEQNUM_INITIAL 0x10 #define SEQNUM_INITIAL 65500 /* @@ -268,28 +311,41 @@ with a buffer in the receiving and transmitting end. for fast access to the smallest one. */ -typedef core::list::Iterator RPBSearchResult; +typedef std::list::iterator RPBSearchResult; class ReliablePacketBuffer { public: - - void print(); - bool empty(); - u32 size(); - RPBSearchResult findPacket(u16 seqnum); - RPBSearchResult notFound(); - u16 getFirstSeqnum(); + ReliablePacketBuffer(); + + bool getFirstSeqnum(u16& result); + BufferedPacket popFirst(); BufferedPacket popSeqnum(u16 seqnum); - void insert(BufferedPacket &p); + void insert(BufferedPacket &p,u16 next_expected); + void incrementTimeouts(float dtime); - void resetTimedOuts(float timeout); - bool anyTotaltimeReached(float timeout); - core::list getTimedOuts(float timeout); + std::list getTimedOuts(float timeout, + unsigned int max_packets); + + void print(); + bool empty(); + bool containsPacket(u16 seqnum); + RPBSearchResult notFound(); + u32 size(); + private: - core::list m_list; + RPBSearchResult findPacket(u16 seqnum); + + std::list m_list; + u16 m_list_size; + + u16 m_oldest_non_answered_ack; + + JMutex m_list_mutex; + + unsigned int writeptr; }; /* @@ -310,42 +366,228 @@ class IncomingSplitBuffer private: // Key is seqnum - core::map m_buf; + std::map m_buf; + + JMutex m_map_mutex; }; -class Connection; +struct OutgoingPacket +{ + u16 peer_id; + u8 channelnum; + SharedBuffer data; + bool reliable; + bool ack; + + OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer data_, + bool reliable_,bool ack_=false): + peer_id(peer_id_), + channelnum(channelnum_), + data(data_), + reliable(reliable_), + ack(ack_) + { + } +}; + +enum ConnectionCommandType{ + CONNCMD_NONE, + CONNCMD_SERVE, + CONNCMD_CONNECT, + CONNCMD_DISCONNECT, + CONNCMD_DISCONNECT_PEER, + CONNCMD_SEND, + CONNCMD_SEND_TO_ALL, + CONCMD_ACK, + CONCMD_CREATE_PEER, + CONCMD_DISABLE_LEGACY +}; -struct Channel +struct ConnectionCommand { - Channel(); - ~Channel(); + enum ConnectionCommandType type; + Address address; + u16 peer_id; + u8 channelnum; + Buffer data; + bool reliable; + bool raw; - u16 next_outgoing_seqnum; - u16 next_incoming_seqnum; - u16 next_outgoing_split_seqnum; + ConnectionCommand(): type(CONNCMD_NONE), peer_id(PEER_ID_INEXISTENT), reliable(false), raw(false) {} + + void serve(Address address_) + { + type = CONNCMD_SERVE; + address = address_; + } + void connect(Address address_) + { + type = CONNCMD_CONNECT; + address = address_; + } + void disconnect() + { + type = CONNCMD_DISCONNECT; + } + void disconnect_peer(u16 peer_id_) + { + type = CONNCMD_DISCONNECT_PEER; + peer_id = peer_id_; + } + void send(u16 peer_id_, u8 channelnum_, + SharedBuffer data_, bool reliable_) + { + type = CONNCMD_SEND; + peer_id = peer_id_; + channelnum = channelnum_; + data = data_; + reliable = reliable_; + } + void sendToAll(u8 channelnum_, SharedBuffer data_, bool reliable_) + { + type = CONNCMD_SEND_TO_ALL; + channelnum = channelnum_; + data = data_; + reliable = reliable_; + } + + void ack(u16 peer_id_, u8 channelnum_, SharedBuffer data_) + { + type = CONCMD_ACK; + peer_id = peer_id_; + channelnum = channelnum_; + data = data_; + reliable = false; + } + + void createPeer(u16 peer_id_, SharedBuffer data_) + { + type = CONCMD_CREATE_PEER; + peer_id = peer_id_; + data = data_; + channelnum = 0; + reliable = true; + raw = true; + } + + void disableLegacy(u16 peer_id_, SharedBuffer data_) + { + type = CONCMD_DISABLE_LEGACY; + peer_id = peer_id_; + data = data_; + channelnum = 0; + reliable = true; + raw = true; + } +}; + +class Channel +{ + +public: + u16 readNextIncomingSeqNum(); + u16 incNextIncomingSeqNum(); + + u16 getOutgoingSequenceNumber(bool& successfull); + u16 readOutgoingSequenceNumber(); + bool putBackSequenceNumber(u16); + + u16 readNextSplitSeqNum(); + void setNextSplitSeqNum(u16 seqnum); // This is for buffering the incoming packets that are coming in // the wrong order ReliablePacketBuffer incoming_reliables; // This is for buffering the sent packets so that the sender can // re-send them if no ACK is received - ReliablePacketBuffer outgoing_reliables; + ReliablePacketBuffer outgoing_reliables_sent; + + //queued reliable packets + Queue queued_reliables; + + //queue commands prior splitting to packets + Queue queued_commands; IncomingSplitBuffer incoming_splits; + + Channel(); + ~Channel(); + + void UpdatePacketLossCounter(unsigned int count); + void UpdatePacketTooLateCounter(); + void UpdateBytesSent(unsigned int bytes,unsigned int packages=1); + void UpdateBytesLost(unsigned int bytes); + + void UpdateTimers(float dtime); + + const float getCurrentDownloadRateKB() + { JMutexAutoLock lock(m_internal_mutex); return cur_kbps; }; + const float getMaxDownloadRateKB() + { JMutexAutoLock lock(m_internal_mutex); return max_kbps; }; + + const float getCurrentLossRateKB() + { JMutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; }; + const float getMaxLossRateKB() + { JMutexAutoLock lock(m_internal_mutex); return max_kbps_lost; }; + + const float getAvgDownloadRateKB() + { JMutexAutoLock lock(m_internal_mutex); return avg_kbps; }; + const float getAvgLossRateKB() + { JMutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; }; + + const unsigned int getWindowSize() const { return window_size; }; + + void setWindowSize(unsigned int size) { window_size = size; }; +private: + JMutex m_internal_mutex; + unsigned int window_size; + + u16 next_incoming_seqnum; + + u16 next_outgoing_seqnum; + u16 next_outgoing_split_seqnum; + + unsigned int current_packet_loss; + unsigned int current_packet_too_late; + unsigned int current_packet_successfull; + float packet_loss_counter; + + unsigned int current_bytes_transfered; + unsigned int current_bytes_lost; + float max_kbps; + float cur_kbps; + float avg_kbps; + float max_kbps_lost; + float cur_kbps_lost; + float avg_kbps_lost; + float bpm_counter; }; class Peer; +enum PeerChangeType +{ + PEER_ADDED, + PEER_REMOVED +}; +struct PeerChange +{ + PeerChangeType type; + u16 peer_id; + bool timeout; +}; + class PeerHandler { public: + PeerHandler() { } virtual ~PeerHandler() { } - + /* This is called after the Peer has been inserted into the Connection's peer container. @@ -358,79 +600,247 @@ class PeerHandler virtual void deletingPeer(Peer *peer, bool timeout) = 0; }; -class Peer +class PeerHelper { public: + PeerHelper(); + PeerHelper(Peer* peer); + ~PeerHelper(); - Peer(u16 a_id, Address a_address); - virtual ~Peer(); - + PeerHelper& operator=(Peer* peer); + Peer* operator->() const; + bool operator!(); + Peer* operator&() const; + bool operator!=(void* ptr); + +private: + Peer* m_peer; +}; + +class Connection; + +typedef enum rtt_stat_type { + MIN_RTT, + MAX_RTT, + AVG_RTT, + MIN_JITTER, + MAX_JITTER, + AVG_JITTER +} rtt_stat_type; + +class Peer { + public: + friend class PeerHelper; + + Peer(Address address_,u16 id_,Connection* connection) : + id(id_), + m_increment_packets_remaining(9), + m_increment_bytes_remaining(0), + m_pending_deletion(false), + m_connection(connection), + address(address_), + m_ping_timer(0.0), + m_last_rtt(-1.0), + m_usage(0), + m_timeout_counter(0.0), + m_last_timeout_check(porting::getTimeMs()), + m_has_sent_with_id(false) + { + m_rtt.avg_rtt = -1.0; + m_rtt.jitter_avg = -1.0; + m_rtt.jitter_max = 0.0; + m_rtt.max_rtt = 0.0; + m_rtt.jitter_min = FLT_MAX; + m_rtt.min_rtt = FLT_MAX; + }; + + virtual ~Peer() { + JMutexAutoLock usage_lock(m_exclusive_access_mutex); + assert(m_usage == 0); + }; + + // Unique id of the peer + u16 id; + + void Drop(); + + virtual void PutReliableSendCommand(ConnectionCommand &c, + unsigned int max_packet_size) {}; + + virtual bool isActive() { return false; }; + + virtual bool getAddress(MTProtocols type, Address& toset) = 0; + + void ResetTimeout() + {JMutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter=0.0; }; + + bool isTimedOut(float timeout); + + void setSentWithID() + { JMutexAutoLock lock(m_exclusive_access_mutex); m_has_sent_with_id = true; }; + + bool hasSentWithID() + { JMutexAutoLock lock(m_exclusive_access_mutex); return m_has_sent_with_id; }; + + unsigned int m_increment_packets_remaining; + unsigned int m_increment_bytes_remaining; + + virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; }; + virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {}; + virtual SharedBuffer addSpiltPacket(u8 channel, + BufferedPacket toadd, + bool reliable) + { + fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n"); + return SharedBuffer(0); + }; + + virtual bool Ping(float dtime, SharedBuffer& data) { return false; }; + + virtual float getStat(rtt_stat_type type) const { + switch (type) { + case MIN_RTT: + return m_rtt.min_rtt; + case MAX_RTT: + return m_rtt.max_rtt; + case AVG_RTT: + return m_rtt.avg_rtt; + case MIN_JITTER: + return m_rtt.jitter_min; + case MAX_JITTER: + return m_rtt.jitter_max; + case AVG_JITTER: + return m_rtt.jitter_avg; + } + return -1; + } + protected: + virtual void reportRTT(float rtt) {}; + + void RTTStatistics(float rtt, + std::string profiler_id="", + unsigned int num_samples=1000); + + bool IncUseCount(); + void DecUseCount(); + + JMutex m_exclusive_access_mutex; + + bool m_pending_deletion; + + Connection* m_connection; + + // Address of the peer + Address address; + + // Ping timer + float m_ping_timer; + private: + + struct rttstats { + float jitter_min; + float jitter_max; + float jitter_avg; + float min_rtt; + float max_rtt; + float avg_rtt; + }; + + rttstats m_rtt; + float m_last_rtt; + + // current usage count + unsigned int m_usage; + + // Seconds from last receive + float m_timeout_counter; + + u32 m_last_timeout_check; + + bool m_has_sent_with_id; +}; + +class UDPPeer : public Peer +{ +public: + + friend class PeerHelper; + friend class ConnectionReceiveThread; + friend class ConnectionSendThread; + + UDPPeer(u16 a_id, Address a_address, Connection* connection); + virtual ~UDPPeer() {}; + + void PutReliableSendCommand(ConnectionCommand &c, + unsigned int max_packet_size); + + bool isActive() + { return ((hasSentWithID()) && (!m_pending_deletion)); }; + + bool getAddress(MTProtocols type, Address& toset); + + void setNonLegacyPeer(); + + bool getLegacyPeer() + { return m_legacy_peer; } + + u16 getNextSplitSequenceNumber(u8 channel); + void setNextSplitSequenceNumber(u8 channel, u16 seqnum); + + SharedBuffer addSpiltPacket(u8 channel, + BufferedPacket toadd, + bool reliable); + + +protected: /* Calculates avg_rtt and resend_timeout. - rtt=-1 only recalculates resend_timeout */ void reportRTT(float rtt); - Channel channels[CHANNEL_COUNT]; + void RunCommandQueues( + unsigned int max_packet_size, + unsigned int maxcommands, + unsigned int maxtransfer); - // Address of the peer - Address address; - // Unique id of the peer - u16 id; - // Seconds from last receive - float timeout_counter; - // Ping timer - float ping_timer; + float getResendTimeout() + { JMutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; } + + void setResendTimeout(float timeout) + { JMutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; } + bool Ping(float dtime,SharedBuffer& data); + + Channel channels[CHANNEL_COUNT]; + bool m_pending_disconnect; +private: // This is changed dynamically float resend_timeout; - // Updated when an ACK is received - float avg_rtt; - // This is set to true when the peer has actually sent something - // with the id we have given to it - bool has_sent_with_id; - - float m_sendtime_accu; - float m_max_packets_per_second; - int m_num_sent; - int m_max_num_sent; - -private: + + bool processReliableSendCommand( + ConnectionCommand &c, + unsigned int max_packet_size); + + bool m_legacy_peer; }; /* Connection */ -struct OutgoingPacket -{ - u16 peer_id; - u8 channelnum; - SharedBuffer data; - bool reliable; - - OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer data_, - bool reliable_): - peer_id(peer_id_), - channelnum(channelnum_), - data(data_), - reliable(reliable_) - { - } -}; - enum ConnectionEventType{ CONNEVENT_NONE, CONNEVENT_DATA_RECEIVED, CONNEVENT_PEER_ADDED, CONNEVENT_PEER_REMOVED, + CONNEVENT_BIND_FAILED, }; struct ConnectionEvent { enum ConnectionEventType type; u16 peer_id; - SharedBuffer data; + Buffer data; bool timeout; Address address; @@ -443,10 +853,12 @@ struct ConnectionEvent return "CONNEVENT_NONE"; case CONNEVENT_DATA_RECEIVED: return "CONNEVENT_DATA_RECEIVED"; - case CONNEVENT_PEER_ADDED: + case CONNEVENT_PEER_ADDED: return "CONNEVENT_PEER_ADDED"; - case CONNEVENT_PEER_REMOVED: + case CONNEVENT_PEER_REMOVED: return "CONNEVENT_PEER_REMOVED"; + case CONNEVENT_BIND_FAILED: + return "CONNEVENT_BIND_FAILED"; } return "Invalid ConnectionEvent"; } @@ -470,157 +882,181 @@ struct ConnectionEvent timeout = timeout_; address = address_; } + void bindFailed() + { + type = CONNEVENT_BIND_FAILED; + } }; -enum ConnectionCommandType{ - CONNCMD_NONE, - CONNCMD_SERVE, - CONNCMD_CONNECT, - CONNCMD_DISCONNECT, - CONNCMD_SEND, - CONNCMD_SEND_TO_ALL, - CONNCMD_DELETE_PEER, +class ConnectionSendThread : public JThread { + +public: + friend class UDPPeer; + + ConnectionSendThread(Connection* parent, + unsigned int max_packet_size, float timeout); + + void * Thread (); + + void Trigger(); + + void setPeerTimeout(float peer_timeout) + { m_timeout = peer_timeout; } + +private: + void runTimeouts (float dtime); + void rawSend (const BufferedPacket &packet); + bool rawSendAsPacket(u16 peer_id, u8 channelnum, + SharedBuffer data, bool reliable); + + void processReliableCommand (ConnectionCommand &c); + void processNonReliableCommand (ConnectionCommand &c); + void serve (Address bind_address); + void connect (Address address); + void disconnect (); + void disconnect_peer(u16 peer_id); + void send (u16 peer_id, u8 channelnum, + SharedBuffer data); + void sendReliable (ConnectionCommand &c); + void sendToAll (u8 channelnum, + SharedBuffer data); + void sendToAllReliable(ConnectionCommand &c); + + void sendPackets (float dtime); + + void sendAsPacket (u16 peer_id, u8 channelnum, + SharedBuffer data,bool ack=false); + + void sendAsPacketReliable(BufferedPacket& p, Channel* channel); + + bool packetsQueued(); + + Connection* m_connection; + unsigned int m_max_packet_size; + float m_timeout; + Queue m_outgoing_queue; + JSemaphore m_send_sleep_semaphore; + + unsigned int m_iteration_packets_avaialble; + unsigned int m_max_commands_per_iteration; + unsigned int m_max_data_packets_per_iteration; + unsigned int m_max_packets_requeued; }; -struct ConnectionCommand -{ - enum ConnectionCommandType type; - u16 port; - Address address; - u16 peer_id; - u8 channelnum; - SharedBuffer data; - bool reliable; - - ConnectionCommand(): type(CONNCMD_NONE) {} +class ConnectionReceiveThread : public JThread { +public: + ConnectionReceiveThread(Connection* parent, + unsigned int max_packet_size); - void serve(u16 port_) - { - type = CONNCMD_SERVE; - port = port_; - } - void connect(Address address_) - { - type = CONNCMD_CONNECT; - address = address_; - } - void disconnect() - { - type = CONNCMD_DISCONNECT; - } - void send(u16 peer_id_, u8 channelnum_, - SharedBuffer data_, bool reliable_) - { - type = CONNCMD_SEND; - peer_id = peer_id_; - channelnum = channelnum_; - data = data_; - reliable = reliable_; - } - void sendToAll(u8 channelnum_, SharedBuffer data_, bool reliable_) - { - type = CONNCMD_SEND_TO_ALL; - channelnum = channelnum_; - data = data_; - reliable = reliable_; - } - void deletePeer(u16 peer_id_) - { - type = CONNCMD_DELETE_PEER; - peer_id = peer_id_; - } + void * Thread (); + +private: + void receive (); + + // Returns next data from a buffer if possible + // If found, returns true; if not, false. + // If found, sets peer_id and dst + bool getFromBuffers (u16 &peer_id, SharedBuffer &dst); + + bool checkIncomingBuffers(Channel *channel, u16 &peer_id, + SharedBuffer &dst); + + /* + Processes a packet with the basic header stripped out. + Parameters: + packetdata: Data in packet (with no base headers) + peer_id: peer id of the sender of the packet in question + channelnum: channel on which the packet was sent + reliable: true if recursing into a reliable packet + */ + SharedBuffer processPacket(Channel *channel, + SharedBuffer packetdata, u16 peer_id, + u8 channelnum, bool reliable); + + + Connection* m_connection; + unsigned int m_max_packet_size; }; -class Connection: public SimpleThread +class Connection { public: - Connection(u32 protocol_id, u32 max_packet_size, float timeout); - Connection(u32 protocol_id, u32 max_packet_size, float timeout, + friend class ConnectionSendThread; + friend class ConnectionReceiveThread; + + Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6); + Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6, PeerHandler *peerhandler); ~Connection(); - void * Thread(); /* Interface */ - ConnectionEvent getEvent(); ConnectionEvent waitEvent(u32 timeout_ms); void putCommand(ConnectionCommand &c); void SetTimeoutMs(int timeout){ m_bc_receive_timeout = timeout; } - void Serve(unsigned short port); + void Serve(Address bind_addr); void Connect(Address address); bool Connected(); void Disconnect(); - u32 Receive(u16 &peer_id, u8 *data, u32 datasize); + u32 Receive(u16 &peer_id, SharedBuffer &data); void SendToAll(u8 channelnum, SharedBuffer data, bool reliable); void Send(u16 peer_id, u8 channelnum, SharedBuffer data, bool reliable); - void RunTimeouts(float dtime); // dummy u16 GetPeerID(){ return m_peer_id; } Address GetPeerAddress(u16 peer_id); float GetPeerAvgRTT(u16 peer_id); - void DeletePeer(u16 peer_id); - -private: - void putEvent(ConnectionEvent &e); - void processCommand(ConnectionCommand &c); - void send(float dtime); - void receive(); - void runTimeouts(float dtime); - void serve(u16 port); - void connect(Address address); - void disconnect(); - void sendToAll(u8 channelnum, SharedBuffer data, bool reliable); - void send(u16 peer_id, u8 channelnum, SharedBuffer data, bool reliable); - void sendAsPacket(u16 peer_id, u8 channelnum, - SharedBuffer data, bool reliable); - void rawSendAsPacket(u16 peer_id, u8 channelnum, - SharedBuffer data, bool reliable); - void rawSend(const BufferedPacket &packet); - Peer* getPeer(u16 peer_id); - Peer* getPeerNoEx(u16 peer_id); - core::list getPeers(); - bool getFromBuffers(u16 &peer_id, SharedBuffer &dst); - // Returns next data from a buffer if possible - // If found, returns true; if not, false. - // If found, sets peer_id and dst - bool checkIncomingBuffers(Channel *channel, u16 &peer_id, - SharedBuffer &dst); - /* - Processes a packet with the basic header stripped out. - Parameters: - packetdata: Data in packet (with no base headers) - peer_id: peer id of the sender of the packet in question - channelnum: channel on which the packet was sent - reliable: true if recursing into a reliable packet - */ - SharedBuffer processPacket(Channel *channel, - SharedBuffer packetdata, u16 peer_id, - u8 channelnum, bool reliable); + const u32 GetProtocolID() const { return m_protocol_id; }; + const std::string getDesc(); + void DisconnectPeer(u16 peer_id); + +protected: + PeerHelper getPeer(u16 peer_id); + PeerHelper getPeerNoEx(u16 peer_id); + u16 lookupPeer(Address& sender); + + u16 createPeer(Address& sender, MTProtocols protocol, int fd); + UDPPeer* createServerPeer(Address& sender); bool deletePeer(u16 peer_id, bool timeout); - - Queue m_outgoing_queue; - MutexedQueue m_event_queue; + + void SetPeerID(u16 id){ m_peer_id = id; } + + void sendAck(u16 peer_id, u8 channelnum, u16 seqnum); + + void PrintInfo(std::ostream &out); + void PrintInfo(); + + std::list getPeerIDs(); + + UDPSocket m_udpSocket; MutexedQueue m_command_queue; - - u32 m_protocol_id; - u32 m_max_packet_size; - float m_timeout; - UDPSocket m_socket; + + void putEvent(ConnectionEvent &e); + + void TriggerSend() + { m_sendThread.Trigger(); } +private: + std::list getPeers(); + + MutexedQueue m_event_queue; + u16 m_peer_id; + u32 m_protocol_id; - core::map m_peers; + std::map m_peers; JMutex m_peers_mutex; + ConnectionSendThread m_sendThread; + ConnectionReceiveThread m_receiveThread; + + JMutex m_info_mutex; + // Backwards compatibility PeerHandler *m_bc_peerhandler; int m_bc_receive_timeout; - - void SetPeerID(u16 id){ m_peer_id = id; } - u32 GetProtocolID(){ return m_protocol_id; } - void PrintInfo(std::ostream &out); - void PrintInfo(); - std::string getDesc(); - u16 m_indentation; + + bool m_shutting_down; + + u16 m_next_remote_peer_id; }; } // namespace