]> git.lizzy.rs Git - minetest.git/blobdiff - src/connection.h
Fix serializing of signed numbers in serializeStructToString
[minetest.git] / src / connection.h
index f99cd1bf94f1661c53f1bc85a32755a6d26118a8..9d646f4991740a7975d66ad569de17f09f7c4667 100644 (file)
@@ -1,6 +1,6 @@
 /*
-Minetest-c55
-Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
+Minetest
+Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
 
 This program is free software; you can redistribute it and/or modify
 it under the terms of the GNU Lesser General Public License as published by
@@ -27,8 +27,11 @@ with this program; if not, write to the Free Software Foundation, Inc.,
 #include "util/pointer.h"
 #include "util/container.h"
 #include "util/thread.h"
+#include "util/numeric.h"
 #include <iostream>
 #include <fstream>
+#include <list>
+#include <map>
 
 namespace con
 {
@@ -68,14 +71,6 @@ class ConnectionBindFailed : public BaseException
        {}
 };
 
-/*class ThrottlingException : public BaseException
-{
-public:
-       ThrottlingException(const char *s):
-               BaseException(s)
-       {}
-};*/
-
 class InvalidIncomingDataException : public BaseException
 {
 public:
@@ -108,26 +103,74 @@ class ProcessedSilentlyException : public BaseException
        {}
 };
 
+class ProcessedQueued : public BaseException
+{
+public:
+       ProcessedQueued(const char *s):
+               BaseException(s)
+       {}
+};
+
+class IncomingDataCorruption : public BaseException
+{
+public:
+       IncomingDataCorruption(const char *s):
+               BaseException(s)
+       {}
+};
+
+typedef enum MTProtocols {
+       PRIMARY,
+       UDP,
+       MINETEST_RELIABLE_UDP
+} MTProtocols;
+
 #define SEQNUM_MAX 65535
-inline bool seqnum_higher(u16 higher, u16 lower)
+inline bool seqnum_higher(u16 totest, u16 base)
 {
-       if(lower > higher && lower - higher > SEQNUM_MAX/2){
-               return true;
+       if (totest > base)
+       {
+               if((totest - base) > (SEQNUM_MAX/2))
+                       return false;
+               else
+                       return true;
+       }
+       else
+       {
+               if((base - totest) > (SEQNUM_MAX/2))
+                       return true;
+               else
+                       return false;
+       }
+}
+
+inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size)
+{
+       u16 window_start = next;
+       u16 window_end   = ( next + window_size ) % (SEQNUM_MAX+1);
+
+       if (window_start < window_end)
+       {
+               return ((seqnum >= window_start) && (seqnum < window_end));
+       }
+       else
+       {
+               return ((seqnum < window_end) || (seqnum >= window_start));
        }
-       return (higher > lower);
 }
 
 struct BufferedPacket
 {
        BufferedPacket(u8 *a_data, u32 a_size):
-               data(a_data, a_size), time(0.0), totaltime(0.0)
+               data(a_data, a_size), time(0.0), totaltime(0.0), absolute_send_time(-1)
        {}
        BufferedPacket(u32 a_size):
-               data(a_size), time(0.0), totaltime(0.0)
+               data(a_size), time(0.0), totaltime(0.0), absolute_send_time(-1)
        {}
        SharedBuffer<u8> data; // Data of the packet, including headers
        float time; // Seconds from buffering the packet or re-sending
        float totaltime; // Seconds from buffering the packet
+       unsigned int absolute_send_time;
        Address address; // Sender or destination
 };
 
@@ -142,14 +185,14 @@ SharedBuffer<u8> makeOriginalPacket(
                SharedBuffer<u8> data);
 
 // Split data in chunks and add TYPE_SPLIT headers to them
-core::list<SharedBuffer<u8> > makeSplitPacket(
+std::list<SharedBuffer<u8> > makeSplitPacket(
                SharedBuffer<u8> data,
                u32 chunksize_max,
                u16 seqnum);
 
 // Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
 // Increments split_seqnum if a split packet is made
-core::list<SharedBuffer<u8> > makeAutoSplitPacket(
+std::list<SharedBuffer<u8> > makeAutoSplitPacket(
                SharedBuffer<u8> data,
                u32 chunksize_max,
                u16 &split_seqnum);
@@ -167,7 +210,7 @@ struct IncomingSplitPacket
                reliable = false;
        }
        // Key is chunk number, value is data without headers
-       core::map<u16, SharedBuffer<u8> > chunks;
+       std::map<u16, SharedBuffer<u8> > chunks;
        u32 chunk_count;
        float time; // Seconds from adding
        bool reliable; // If true, isn't deleted on timeout
@@ -189,15 +232,14 @@ TODO: Should we have a receiver_peer_id also?
        [6] u8 channel
 sender_peer_id:
        Unique to each peer.
-       value 0 is reserved for making new connections
-       value 1 is reserved for server
+       value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
+       value 1 (PEER_ID_SERVER) is reserved for server
+       these constants are defined in constants.h
 channel:
        The lower the number, the higher the priority is.
        Only channels 0, 1 and 2 exist.
 */
 #define BASE_HEADER_SIZE 7
-#define PEER_ID_INEXISTENT 0
-#define PEER_ID_SERVER 1
 #define CHANNEL_COUNT 3
 /*
 Packet types:
@@ -222,6 +264,8 @@ controltype and data description:
 #define CONTROLTYPE_SET_PEER_ID 1
 #define CONTROLTYPE_PING 2
 #define CONTROLTYPE_DISCO 3
+#define CONTROLTYPE_ENABLE_BIG_SEND_WINDOW 4
+
 /*
 ORIGINAL: This is a plain packet with no control and no error
 checking at all.
@@ -260,7 +304,6 @@ with a buffer in the receiving and transmitting end.
 */
 #define TYPE_RELIABLE 3
 #define RELIABLE_HEADER_SIZE 3
-//#define SEQNUM_INITIAL 0x10
 #define SEQNUM_INITIAL 65500
 
 /*
@@ -268,28 +311,41 @@ with a buffer in the receiving and transmitting end.
        for fast access to the smallest one.
 */
 
-typedef core::list<BufferedPacket>::Iterator RPBSearchResult;
+typedef std::list<BufferedPacket>::iterator RPBSearchResult;
 
 class ReliablePacketBuffer
 {
 public:
-       
-       void print();
-       bool empty();
-       u32 size();
-       RPBSearchResult findPacket(u16 seqnum);
-       RPBSearchResult notFound();
-       u16 getFirstSeqnum();
+       ReliablePacketBuffer();
+
+       bool getFirstSeqnum(u16& result);
+
        BufferedPacket popFirst();
        BufferedPacket popSeqnum(u16 seqnum);
-       void insert(BufferedPacket &p);
+       void insert(BufferedPacket &p,u16 next_expected);
+
        void incrementTimeouts(float dtime);
-       void resetTimedOuts(float timeout);
-       bool anyTotaltimeReached(float timeout);
-       core::list<BufferedPacket> getTimedOuts(float timeout);
+       std::list<BufferedPacket> getTimedOuts(float timeout,
+                       unsigned int max_packets);
+
+       void print();
+       bool empty();
+       bool containsPacket(u16 seqnum);
+       RPBSearchResult notFound();
+       u32 size();
+
 
 private:
-       core::list<BufferedPacket> m_list;
+       RPBSearchResult findPacket(u16 seqnum);
+
+       std::list<BufferedPacket> m_list;
+       u16 m_list_size;
+
+       u16 m_oldest_non_answered_ack;
+
+       JMutex m_list_mutex;
+
+       unsigned int writeptr;
 };
 
 /*
@@ -310,42 +366,228 @@ class IncomingSplitBuffer
        
 private:
        // Key is seqnum
-       core::map<u16, IncomingSplitPacket*> m_buf;
+       std::map<u16, IncomingSplitPacket*> m_buf;
+
+       JMutex m_map_mutex;
 };
 
-class Connection;
+struct OutgoingPacket
+{
+       u16 peer_id;
+       u8 channelnum;
+       SharedBuffer<u8> data;
+       bool reliable;
+       bool ack;
+
+       OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_,
+                       bool reliable_,bool ack_=false):
+               peer_id(peer_id_),
+               channelnum(channelnum_),
+               data(data_),
+               reliable(reliable_),
+               ack(ack_)
+       {
+       }
+};
+
+enum ConnectionCommandType{
+       CONNCMD_NONE,
+       CONNCMD_SERVE,
+       CONNCMD_CONNECT,
+       CONNCMD_DISCONNECT,
+       CONNCMD_DISCONNECT_PEER,
+       CONNCMD_SEND,
+       CONNCMD_SEND_TO_ALL,
+       CONCMD_ACK,
+       CONCMD_CREATE_PEER,
+       CONCMD_DISABLE_LEGACY
+};
 
-struct Channel
+struct ConnectionCommand
 {
-       Channel();
-       ~Channel();
+       enum ConnectionCommandType type;
+       Address address;
+       u16 peer_id;
+       u8 channelnum;
+       Buffer<u8> data;
+       bool reliable;
+       bool raw;
 
-       u16 next_outgoing_seqnum;
-       u16 next_incoming_seqnum;
-       u16 next_outgoing_split_seqnum;
+       ConnectionCommand(): type(CONNCMD_NONE), peer_id(PEER_ID_INEXISTENT), reliable(false), raw(false) {}
+
+       void serve(Address address_)
+       {
+               type = CONNCMD_SERVE;
+               address = address_;
+       }
+       void connect(Address address_)
+       {
+               type = CONNCMD_CONNECT;
+               address = address_;
+       }
+       void disconnect()
+       {
+               type = CONNCMD_DISCONNECT;
+       }
+       void disconnect_peer(u16 peer_id_)
+       {
+               type = CONNCMD_DISCONNECT_PEER;
+               peer_id = peer_id_;
+       }
+       void send(u16 peer_id_, u8 channelnum_,
+                       SharedBuffer<u8> data_, bool reliable_)
+       {
+               type = CONNCMD_SEND;
+               peer_id = peer_id_;
+               channelnum = channelnum_;
+               data = data_;
+               reliable = reliable_;
+       }
+       void sendToAll(u8 channelnum_, SharedBuffer<u8> data_, bool reliable_)
+       {
+               type = CONNCMD_SEND_TO_ALL;
+               channelnum = channelnum_;
+               data = data_;
+               reliable = reliable_;
+       }
+
+       void ack(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_)
+       {
+               type = CONCMD_ACK;
+               peer_id = peer_id_;
+               channelnum = channelnum_;
+               data = data_;
+               reliable = false;
+       }
+
+       void createPeer(u16 peer_id_, SharedBuffer<u8> data_)
+       {
+               type = CONCMD_CREATE_PEER;
+               peer_id = peer_id_;
+               data = data_;
+               channelnum = 0;
+               reliable = true;
+               raw = true;
+       }
+
+       void disableLegacy(u16 peer_id_, SharedBuffer<u8> data_)
+       {
+               type = CONCMD_DISABLE_LEGACY;
+               peer_id = peer_id_;
+               data = data_;
+               channelnum = 0;
+               reliable = true;
+               raw = true;
+       }
+};
+
+class Channel
+{
+
+public:
+       u16 readNextIncomingSeqNum();
+       u16 incNextIncomingSeqNum();
+
+       u16 getOutgoingSequenceNumber(bool& successfull);
+       u16 readOutgoingSequenceNumber();
+       bool putBackSequenceNumber(u16);
+
+       u16 readNextSplitSeqNum();
+       void setNextSplitSeqNum(u16 seqnum);
        
        // This is for buffering the incoming packets that are coming in
        // the wrong order
        ReliablePacketBuffer incoming_reliables;
        // This is for buffering the sent packets so that the sender can
        // re-send them if no ACK is received
-       ReliablePacketBuffer outgoing_reliables;
+       ReliablePacketBuffer outgoing_reliables_sent;
+
+       //queued reliable packets
+       Queue<BufferedPacket> queued_reliables;
+
+       //queue commands prior splitting to packets
+       Queue<ConnectionCommand> queued_commands;
 
        IncomingSplitBuffer incoming_splits;
+
+       Channel();
+       ~Channel();
+
+       void UpdatePacketLossCounter(unsigned int count);
+       void UpdatePacketTooLateCounter();
+       void UpdateBytesSent(unsigned int bytes,unsigned int packages=1);
+       void UpdateBytesLost(unsigned int bytes);
+
+       void UpdateTimers(float dtime);
+
+       const float getCurrentDownloadRateKB()
+               { JMutexAutoLock lock(m_internal_mutex); return cur_kbps; };
+       const float getMaxDownloadRateKB()
+               { JMutexAutoLock lock(m_internal_mutex); return max_kbps; };
+
+       const float getCurrentLossRateKB()
+               { JMutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
+       const float getMaxLossRateKB()
+               { JMutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
+
+       const float getAvgDownloadRateKB()
+               { JMutexAutoLock lock(m_internal_mutex); return avg_kbps; };
+       const float getAvgLossRateKB()
+               { JMutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
+
+       const unsigned int getWindowSize() const { return window_size; };
+
+       void setWindowSize(unsigned int size) { window_size = size; };
+private:
+       JMutex m_internal_mutex;
+       unsigned int window_size;
+
+       u16 next_incoming_seqnum;
+
+       u16 next_outgoing_seqnum;
+       u16 next_outgoing_split_seqnum;
+
+       unsigned int current_packet_loss;
+       unsigned int current_packet_too_late;
+       unsigned int current_packet_successfull;
+       float packet_loss_counter;
+
+       unsigned int current_bytes_transfered;
+       unsigned int current_bytes_lost;
+       float max_kbps;
+       float cur_kbps;
+       float avg_kbps;
+       float max_kbps_lost;
+       float cur_kbps_lost;
+       float avg_kbps_lost;
+       float bpm_counter;
 };
 
 class Peer;
 
+enum PeerChangeType
+{
+       PEER_ADDED,
+       PEER_REMOVED
+};
+struct PeerChange
+{
+       PeerChangeType type;
+       u16 peer_id;
+       bool timeout;
+};
+
 class PeerHandler
 {
 public:
+
        PeerHandler()
        {
        }
        virtual ~PeerHandler()
        {
        }
-       
+
        /*
                This is called after the Peer has been inserted into the
                Connection's peer container.
@@ -358,71 +600,234 @@ class PeerHandler
        virtual void deletingPeer(Peer *peer, bool timeout) = 0;
 };
 
-class Peer
+class PeerHelper
 {
 public:
+       PeerHelper();
+       PeerHelper(Peer* peer);
+       ~PeerHelper();
 
-       Peer(u16 a_id, Address a_address);
-       virtual ~Peer();
-       
+       PeerHelper&   operator=(Peer* peer);
+       Peer*         operator->() const;
+       bool          operator!();
+       Peer*         operator&() const;
+       bool          operator!=(void* ptr);
+
+private:
+       Peer* m_peer;
+};
+
+class Connection;
+
+typedef enum rtt_stat_type {
+       MIN_RTT,
+       MAX_RTT,
+       AVG_RTT,
+       MIN_JITTER,
+       MAX_JITTER,
+       AVG_JITTER
+} rtt_stat_type;
+
+class Peer {
+       public:
+               friend class PeerHelper;
+
+               Peer(Address address_,u16 id_,Connection* connection) :
+                       id(id_),
+                       m_increment_packets_remaining(9),
+                       m_increment_bytes_remaining(0),
+                       m_pending_deletion(false),
+                       m_connection(connection),
+                       address(address_),
+                       m_ping_timer(0.0),
+                       m_last_rtt(-1.0),
+                       m_usage(0),
+                       m_timeout_counter(0.0),
+                       m_last_timeout_check(porting::getTimeMs()),
+                       m_has_sent_with_id(false)
+               {
+                       m_rtt.avg_rtt = -1.0;
+                       m_rtt.jitter_avg = -1.0;
+                       m_rtt.jitter_max = 0.0;
+                       m_rtt.max_rtt = 0.0;
+                       m_rtt.jitter_min = FLT_MAX;
+                       m_rtt.min_rtt = FLT_MAX;
+               };
+
+               virtual ~Peer() {
+                       JMutexAutoLock usage_lock(m_exclusive_access_mutex);
+                       assert(m_usage == 0);
+               };
+
+               // Unique id of the peer
+               u16 id;
+
+               void Drop();
+
+               virtual void PutReliableSendCommand(ConnectionCommand &c,
+                                               unsigned int max_packet_size) {};
+
+               virtual bool isActive() { return false; };
+
+               virtual bool getAddress(MTProtocols type, Address& toset) = 0;
+
+               void ResetTimeout()
+                       {JMutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter=0.0; };
+
+               bool isTimedOut(float timeout);
+
+               void setSentWithID()
+               { JMutexAutoLock lock(m_exclusive_access_mutex); m_has_sent_with_id = true; };
+
+               bool hasSentWithID()
+               { JMutexAutoLock lock(m_exclusive_access_mutex); return m_has_sent_with_id; };
+
+               unsigned int m_increment_packets_remaining;
+               unsigned int m_increment_bytes_remaining;
+
+               virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
+               virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
+               virtual SharedBuffer<u8> addSpiltPacket(u8 channel,
+                                                                                               BufferedPacket toadd,
+                                                                                               bool reliable)
+                               {
+                                       fprintf(stderr,"Peer: addSplitPacket called, this is supposed to be never called!\n");
+                                       return SharedBuffer<u8>(0);
+                               };
+
+               virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
+
+               virtual float getStat(rtt_stat_type type) const {
+                       switch (type) {
+                               case MIN_RTT:
+                                       return m_rtt.min_rtt;
+                               case MAX_RTT:
+                                       return m_rtt.max_rtt;
+                               case AVG_RTT:
+                                       return m_rtt.avg_rtt;
+                               case MIN_JITTER:
+                                       return m_rtt.jitter_min;
+                               case MAX_JITTER:
+                                       return m_rtt.jitter_max;
+                               case AVG_JITTER:
+                                       return m_rtt.jitter_avg;
+                       }
+                       return -1;
+               }
+       protected:
+               virtual void reportRTT(float rtt) {};
+
+               void RTTStatistics(float rtt,
+                                                       std::string profiler_id="",
+                                                       unsigned int num_samples=1000);
+
+               bool IncUseCount();
+               void DecUseCount();
+
+               JMutex m_exclusive_access_mutex;
+
+               bool m_pending_deletion;
+
+               Connection* m_connection;
+
+               // Address of the peer
+               Address address;
+
+               // Ping timer
+               float m_ping_timer;
+       private:
+
+               struct rttstats {
+                       float jitter_min;
+                       float jitter_max;
+                       float jitter_avg;
+                       float min_rtt;
+                       float max_rtt;
+                       float avg_rtt;
+               };
+
+               rttstats m_rtt;
+               float    m_last_rtt;
+
+               // current usage count
+               unsigned int m_usage;
+
+               // Seconds from last receive
+               float m_timeout_counter;
+
+               u32 m_last_timeout_check;
+
+               bool m_has_sent_with_id;
+};
+
+class UDPPeer : public Peer
+{
+public:
+
+       friend class PeerHelper;
+       friend class ConnectionReceiveThread;
+       friend class ConnectionSendThread;
+
+       UDPPeer(u16 a_id, Address a_address, Connection* connection);
+       virtual ~UDPPeer() {};
+
+       void PutReliableSendCommand(ConnectionCommand &c,
+                                                       unsigned int max_packet_size);
+
+       bool isActive()
+       { return ((hasSentWithID()) && (!m_pending_deletion)); };
+
+       bool getAddress(MTProtocols type, Address& toset);
+
+       void setNonLegacyPeer();
+
+       bool getLegacyPeer()
+       { return m_legacy_peer; }
+
+       u16 getNextSplitSequenceNumber(u8 channel);
+       void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
+
+       SharedBuffer<u8> addSpiltPacket(u8 channel,
+                                                                       BufferedPacket toadd,
+                                                                       bool reliable);
+
+
+protected:
        /*
                Calculates avg_rtt and resend_timeout.
-
                rtt=-1 only recalculates resend_timeout
        */
        void reportRTT(float rtt);
 
-       Channel channels[CHANNEL_COUNT];
+       void RunCommandQueues(
+                                       unsigned int max_packet_size,
+                                       unsigned int maxcommands,
+                                       unsigned int maxtransfer);
 
-       // Address of the peer
-       Address address;
-       // Unique id of the peer
-       u16 id;
-       // Seconds from last receive
-       float timeout_counter;
-       // Ping timer
-       float ping_timer;
+       float getResendTimeout()
+               { JMutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
+
+       void setResendTimeout(float timeout)
+               { JMutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
+       bool Ping(float dtime,SharedBuffer<u8>& data);
+
+       Channel channels[CHANNEL_COUNT];
+       bool m_pending_disconnect;
+private:
        // This is changed dynamically
        float resend_timeout;
-       // Updated when an ACK is received
-       float avg_rtt;
-       // This is set to true when the peer has actually sent something
-       // with the id we have given to it
-       bool has_sent_with_id;
-       
-       float m_sendtime_accu;
-       float m_max_packets_per_second;
-       int m_num_sent;
-       int m_max_num_sent;
-
-       // Updated from configuration by Connection
-       float congestion_control_aim_rtt;
-       float congestion_control_max_rate;
-       float congestion_control_min_rate;
-private:
+
+       bool processReliableSendCommand(
+                                       ConnectionCommand &c,
+                                       unsigned int max_packet_size);
+
+       bool m_legacy_peer;
 };
 
 /*
        Connection
 */
 
-struct OutgoingPacket
-{
-       u16 peer_id;
-       u8 channelnum;
-       SharedBuffer<u8> data;
-       bool reliable;
-
-       OutgoingPacket(u16 peer_id_, u8 channelnum_, SharedBuffer<u8> data_,
-                       bool reliable_):
-               peer_id(peer_id_),
-               channelnum(channelnum_),
-               data(data_),
-               reliable(reliable_)
-       {
-       }
-};
-
 enum ConnectionEventType{
        CONNEVENT_NONE,
        CONNEVENT_DATA_RECEIVED,
@@ -448,11 +853,11 @@ struct ConnectionEvent
                        return "CONNEVENT_NONE";
                case CONNEVENT_DATA_RECEIVED:
                        return "CONNEVENT_DATA_RECEIVED";
-               case CONNEVENT_PEER_ADDED: 
+               case CONNEVENT_PEER_ADDED:
                        return "CONNEVENT_PEER_ADDED";
-               case CONNEVENT_PEER_REMOVED: 
+               case CONNEVENT_PEER_REMOVED:
                        return "CONNEVENT_PEER_REMOVED";
-               case CONNEVENT_BIND_FAILED: 
+               case CONNEVENT_BIND_FAILED:
                        return "CONNEVENT_BIND_FAILED";
                }
                return "Invalid ConnectionEvent";
@@ -483,155 +888,175 @@ struct ConnectionEvent
        }
 };
 
-enum ConnectionCommandType{
-       CONNCMD_NONE,
-       CONNCMD_SERVE,
-       CONNCMD_CONNECT,
-       CONNCMD_DISCONNECT,
-       CONNCMD_SEND,
-       CONNCMD_SEND_TO_ALL,
-       CONNCMD_DELETE_PEER,
+class ConnectionSendThread : public JThread {
+
+public:
+       friend class UDPPeer;
+
+       ConnectionSendThread(Connection* parent,
+                                                       unsigned int max_packet_size, float timeout);
+
+       void * Thread       ();
+
+       void Trigger();
+
+       void setPeerTimeout(float peer_timeout)
+               { m_timeout = peer_timeout; }
+
+private:
+       void runTimeouts    (float dtime);
+       void rawSend        (const BufferedPacket &packet);
+       bool rawSendAsPacket(u16 peer_id, u8 channelnum,
+                                                       SharedBuffer<u8> data, bool reliable);
+
+       void processReliableCommand (ConnectionCommand &c);
+       void processNonReliableCommand (ConnectionCommand &c);
+       void serve          (Address bind_address);
+       void connect        (Address address);
+       void disconnect     ();
+       void disconnect_peer(u16 peer_id);
+       void send           (u16 peer_id, u8 channelnum,
+                                                       SharedBuffer<u8> data);
+       void sendReliable   (ConnectionCommand &c);
+       void sendToAll      (u8 channelnum,
+                                                       SharedBuffer<u8> data);
+       void sendToAllReliable(ConnectionCommand &c);
+
+       void sendPackets    (float dtime);
+
+       void sendAsPacket   (u16 peer_id, u8 channelnum,
+                                                       SharedBuffer<u8> data,bool ack=false);
+
+       void sendAsPacketReliable(BufferedPacket& p, Channel* channel);
+
+       bool packetsQueued();
+
+       Connection*           m_connection;
+       unsigned int          m_max_packet_size;
+       float                 m_timeout;
+       Queue<OutgoingPacket> m_outgoing_queue;
+       JSemaphore            m_send_sleep_semaphore;
+
+       unsigned int          m_iteration_packets_avaialble;
+       unsigned int          m_max_commands_per_iteration;
+       unsigned int          m_max_data_packets_per_iteration;
+       unsigned int          m_max_packets_requeued;
 };
 
-struct ConnectionCommand
-{
-       enum ConnectionCommandType type;
-       u16 port;
-       Address address;
-       u16 peer_id;
-       u8 channelnum;
-       Buffer<u8> data;
-       bool reliable;
-       
-       ConnectionCommand(): type(CONNCMD_NONE) {}
+class ConnectionReceiveThread : public JThread {
+public:
+       ConnectionReceiveThread(Connection* parent,
+                                                       unsigned int max_packet_size);
 
-       void serve(u16 port_)
-       {
-               type = CONNCMD_SERVE;
-               port = port_;
-       }
-       void connect(Address address_)
-       {
-               type = CONNCMD_CONNECT;
-               address = address_;
-       }
-       void disconnect()
-       {
-               type = CONNCMD_DISCONNECT;
-       }
-       void send(u16 peer_id_, u8 channelnum_,
-                       SharedBuffer<u8> data_, bool reliable_)
-       {
-               type = CONNCMD_SEND;
-               peer_id = peer_id_;
-               channelnum = channelnum_;
-               data = data_;
-               reliable = reliable_;
-       }
-       void sendToAll(u8 channelnum_, SharedBuffer<u8> data_, bool reliable_)
-       {
-               type = CONNCMD_SEND_TO_ALL;
-               channelnum = channelnum_;
-               data = data_;
-               reliable = reliable_;
-       }
-       void deletePeer(u16 peer_id_)
-       {
-               type = CONNCMD_DELETE_PEER;
-               peer_id = peer_id_;
-       }
+       void * Thread       ();
+
+private:
+       void receive        ();
+
+       // Returns next data from a buffer if possible
+       // If found, returns true; if not, false.
+       // If found, sets peer_id and dst
+       bool getFromBuffers (u16 &peer_id, SharedBuffer<u8> &dst);
+
+       bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
+                                                       SharedBuffer<u8> &dst);
+
+       /*
+               Processes a packet with the basic header stripped out.
+               Parameters:
+                       packetdata: Data in packet (with no base headers)
+                       peer_id: peer id of the sender of the packet in question
+                       channelnum: channel on which the packet was sent
+                       reliable: true if recursing into a reliable packet
+       */
+       SharedBuffer<u8> processPacket(Channel *channel,
+                                                       SharedBuffer<u8> packetdata, u16 peer_id,
+                                                       u8 channelnum, bool reliable);
+
+
+       Connection*           m_connection;
+       unsigned int          m_max_packet_size;
 };
 
-class Connection: public SimpleThread
+class Connection
 {
 public:
-       Connection(u32 protocol_id, u32 max_packet_size, float timeout);
-       Connection(u32 protocol_id, u32 max_packet_size, float timeout,
+       friend class ConnectionSendThread;
+       friend class ConnectionReceiveThread;
+
+       Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6);
+       Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
                        PeerHandler *peerhandler);
        ~Connection();
-       void * Thread();
 
        /* Interface */
-
        ConnectionEvent getEvent();
        ConnectionEvent waitEvent(u32 timeout_ms);
        void putCommand(ConnectionCommand &c);
        
        void SetTimeoutMs(int timeout){ m_bc_receive_timeout = timeout; }
-       void Serve(unsigned short port);
+       void Serve(Address bind_addr);
        void Connect(Address address);
        bool Connected();
        void Disconnect();
        u32 Receive(u16 &peer_id, SharedBuffer<u8> &data);
        void SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable);
        void Send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable);
-       void RunTimeouts(float dtime); // dummy
        u16 GetPeerID(){ return m_peer_id; }
        Address GetPeerAddress(u16 peer_id);
        float GetPeerAvgRTT(u16 peer_id);
-       void DeletePeer(u16 peer_id);
-       
-private:
-       void putEvent(ConnectionEvent &e);
-       void processCommand(ConnectionCommand &c);
-       void send(float dtime);
-       void receive();
-       void runTimeouts(float dtime);
-       void serve(u16 port);
-       void connect(Address address);
-       void disconnect();
-       void sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable);
-       void send(u16 peer_id, u8 channelnum, SharedBuffer<u8> data, bool reliable);
-       void sendAsPacket(u16 peer_id, u8 channelnum,
-                       SharedBuffer<u8> data, bool reliable);
-       void rawSendAsPacket(u16 peer_id, u8 channelnum,
-                       SharedBuffer<u8> data, bool reliable);
-       void rawSend(const BufferedPacket &packet);
-       Peer* getPeer(u16 peer_id);
-       Peer* getPeerNoEx(u16 peer_id);
-       core::list<Peer*> getPeers();
-       bool getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst);
-       // Returns next data from a buffer if possible
-       // If found, returns true; if not, false.
-       // If found, sets peer_id and dst
-       bool checkIncomingBuffers(Channel *channel, u16 &peer_id,
-                       SharedBuffer<u8> &dst);
-       /*
-               Processes a packet with the basic header stripped out.
-               Parameters:
-                       packetdata: Data in packet (with no base headers)
-                       peer_id: peer id of the sender of the packet in question
-                       channelnum: channel on which the packet was sent
-                       reliable: true if recursing into a reliable packet
-       */
-       SharedBuffer<u8> processPacket(Channel *channel,
-                       SharedBuffer<u8> packetdata, u16 peer_id,
-                       u8 channelnum, bool reliable);
+       const u32 GetProtocolID() const { return m_protocol_id; };
+       const std::string getDesc();
+       void DisconnectPeer(u16 peer_id);
+
+protected:
+       PeerHelper getPeer(u16 peer_id);
+       PeerHelper getPeerNoEx(u16 peer_id);
+       u16   lookupPeer(Address& sender);
+
+       u16 createPeer(Address& sender, MTProtocols protocol, int fd);
+       UDPPeer*  createServerPeer(Address& sender);
        bool deletePeer(u16 peer_id, bool timeout);
-       
-       Queue<OutgoingPacket> m_outgoing_queue;
-       MutexedQueue<ConnectionEvent> m_event_queue;
+
+       void SetPeerID(u16 id){ m_peer_id = id; }
+
+       void sendAck(u16 peer_id, u8 channelnum, u16 seqnum);
+
+       void PrintInfo(std::ostream &out);
+       void PrintInfo();
+
+       std::list<u16> getPeerIDs();
+
+       UDPSocket m_udpSocket;
        MutexedQueue<ConnectionCommand> m_command_queue;
-       
-       u32 m_protocol_id;
-       u32 m_max_packet_size;
-       float m_timeout;
-       UDPSocket m_socket;
+
+       void putEvent(ConnectionEvent &e);
+
+       void TriggerSend()
+               { m_sendThread.Trigger(); }
+private:
+       std::list<Peer*> getPeers();
+
+       MutexedQueue<ConnectionEvent> m_event_queue;
+
        u16 m_peer_id;
+       u32 m_protocol_id;
        
-       core::map<u16, Peer*> m_peers;
+       std::map<u16, Peer*> m_peers;
        JMutex m_peers_mutex;
 
+       ConnectionSendThread m_sendThread;
+       ConnectionReceiveThread m_receiveThread;
+
+       JMutex m_info_mutex;
+
        // Backwards compatibility
        PeerHandler *m_bc_peerhandler;
        int m_bc_receive_timeout;
-       
-       void SetPeerID(u16 id){ m_peer_id = id; }
-       u32 GetProtocolID(){ return m_protocol_id; }
-       void PrintInfo(std::ostream &out);
-       void PrintInfo();
-       std::string getDesc();
-       u16 m_indentation;
+
+       bool m_shutting_down;
+
+       u16 m_next_remote_peer_id;
 };
 
 } // namespace