]> git.lizzy.rs Git - dragonfireclient.git/blobdiff - src/network/connection.cpp
Merge pull request #59 from PrairieAstronomer/readme_irrlicht_change
[dragonfireclient.git] / src / network / connection.cpp
index a4970954ffd1272313b7421d1e779452582db95e..6fb676f25c4fe30e8e23195dbc91d1fffa4ea84e 100644 (file)
@@ -41,39 +41,37 @@ namespace con
 /* defines used for debugging and profiling                                   */
 /******************************************************************************/
 #ifdef NDEBUG
-       #define LOG(a) a
        #define PROFILE(a)
 #else
-       #if 0
-       /* this mutex is used to achieve log message consistency */
-       std::mutex log_message_mutex;
-       #define LOG(a)                                                                 \
-               {                                                                          \
-               MutexAutoLock loglock(log_message_mutex);                                 \
-               a;                                                                         \
-               }
-       #else
-       // Prevent deadlocks until a solution is found after 5.2.0 (TODO)
-       #define LOG(a) a
-       #endif
-
        #define PROFILE(a) a
 #endif
 
+// TODO: Clean this up.
+#define LOG(a) a
+
 #define PING_TIMEOUT 5.0
 
-BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
+u16 BufferedPacket::getSeqnum() const
+{
+       if (size() < BASE_HEADER_SIZE + 3)
+               return 0; // should never happen
+
+       return readU16(&data[BASE_HEADER_SIZE + 1]);
+}
+
+BufferedPacketPtr makePacket(Address &address, const SharedBuffer<u8> &data,
                u32 protocol_id, session_t sender_peer_id, u8 channel)
 {
        u32 packet_size = data.getSize() + BASE_HEADER_SIZE;
-       BufferedPacket p(packet_size);
-       p.address = address;
 
-       writeU32(&p.data[0], protocol_id);
-       writeU16(&p.data[4], sender_peer_id);
-       writeU8(&p.data[6], channel);
+       BufferedPacketPtr p(new BufferedPacket(packet_size));
+       p->address = address;
+
+       writeU32(&p->data[0], protocol_id);
+       writeU16(&p->data[4], sender_peer_id);
+       writeU8(&p->data[6], channel);
 
-       memcpy(&p.data[BASE_HEADER_SIZE], *data, data.getSize());
+       memcpy(&p->data[BASE_HEADER_SIZE], *data, data.getSize());
 
        return p;
 }
@@ -169,9 +167,8 @@ void ReliablePacketBuffer::print()
        MutexAutoLock listlock(m_list_mutex);
        LOG(dout_con<<"Dump of ReliablePacketBuffer:" << std::endl);
        unsigned int index = 0;
-       for (BufferedPacket &bufferedPacket : m_list) {
-               u16 s = readU16(&(bufferedPacket.data[BASE_HEADER_SIZE+1]));
-               LOG(dout_con<<index<< ":" << s << std::endl);
+       for (BufferedPacketPtr &packet : m_list) {
+               LOG(dout_con<<index<< ":" << packet->getSeqnum() << std::endl);
                index++;
        }
 }
@@ -188,16 +185,13 @@ u32 ReliablePacketBuffer::size()
        return m_list.size();
 }
 
-RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
+RPBSearchResult ReliablePacketBuffer::findPacketNoLock(u16 seqnum)
 {
-       std::list<BufferedPacket>::iterator i = m_list.begin();
-       for(; i != m_list.end(); ++i)
-       {
-               u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
-               if (s == seqnum)
-                       break;
+       for (auto it = m_list.begin(); it != m_list.end(); ++it) {
+               if ((*it)->getSeqnum() == seqnum)
+                       return it;
        }
-       return i;
+       return m_list.end();
 }
 
 bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
@@ -205,54 +199,54 @@ bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
        MutexAutoLock listlock(m_list_mutex);
        if (m_list.empty())
                return false;
-       const BufferedPacket &p = m_list.front();
-       result = readU16(&p.data[BASE_HEADER_SIZE + 1]);
+       result = m_list.front()->getSeqnum();
        return true;
 }
 
-BufferedPacket ReliablePacketBuffer::popFirst()
+BufferedPacketPtr ReliablePacketBuffer::popFirst()
 {
        MutexAutoLock listlock(m_list_mutex);
        if (m_list.empty())
                throw NotFoundException("Buffer is empty");
-       BufferedPacket p = std::move(m_list.front());
+
+       BufferedPacketPtr p(m_list.front());
        m_list.pop_front();
 
        if (m_list.empty()) {
                m_oldest_non_answered_ack = 0;
        } else {
-               m_oldest_non_answered_ack =
-                               readU16(&m_list.front().data[BASE_HEADER_SIZE + 1]);
+               m_oldest_non_answered_ack = m_list.front()->getSeqnum();
        }
        return p;
 }
 
-BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
+BufferedPacketPtr ReliablePacketBuffer::popSeqnum(u16 seqnum)
 {
        MutexAutoLock listlock(m_list_mutex);
-       RPBSearchResult r = findPacket(seqnum);
-       if (r == notFound()) {
+       RPBSearchResult r = findPacketNoLock(seqnum);
+       if (r == m_list.end()) {
                LOG(dout_con<<"Sequence number: " << seqnum
                                << " not found in reliable buffer"<<std::endl);
                throw NotFoundException("seqnum not found in buffer");
        }
-       BufferedPacket p = std::move(*r);
 
+       BufferedPacketPtr p(*r);
        m_list.erase(r);
 
        if (m_list.empty()) {
                m_oldest_non_answered_ack = 0;
        } else {
-               m_oldest_non_answered_ack =
-                               readU16(&m_list.front().data[BASE_HEADER_SIZE + 1]);
+               m_oldest_non_answered_ack = m_list.front()->getSeqnum();
        }
        return p;
 }
 
-void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
+void ReliablePacketBuffer::insert(BufferedPacketPtr &p_ptr, u16 next_expected)
 {
        MutexAutoLock listlock(m_list_mutex);
-       if (p.data.getSize() < BASE_HEADER_SIZE + 3) {
+       const BufferedPacket &p = *p_ptr;
+
+       if (p.size() < BASE_HEADER_SIZE + 3) {
                errorstream << "ReliablePacketBuffer::insert(): Invalid data size for "
                        "reliable packet" << std::endl;
                return;
@@ -263,7 +257,7 @@ void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
                        << std::endl;
                return;
        }
-       u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
+       const u16 seqnum = p.getSeqnum();
 
        if (!seqnum_in_window(seqnum, next_expected, MAX_RELIABLE_WINDOW_SIZE)) {
                errorstream << "ReliablePacketBuffer::insert(): seqnum is outside of "
@@ -280,44 +274,44 @@ void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
 
        // Find the right place for the packet and insert it there
        // If list is empty, just add it
-       if (m_list.empty())
-       {
-               m_list.push_back(p);
+       if (m_list.empty()) {
+               m_list.push_back(p_ptr);
                m_oldest_non_answered_ack = seqnum;
                // Done.
                return;
        }
 
        // Otherwise find the right place
-       std::list<BufferedPacket>::iterator i = m_list.begin();
+       auto it = m_list.begin();
        // Find the first packet in the list which has a higher seqnum
-       u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
+       u16 s = (*it)->getSeqnum();
 
        /* case seqnum is smaller then next_expected seqnum */
        /* this is true e.g. on wrap around */
        if (seqnum < next_expected) {
-               while(((s < seqnum) || (s >= next_expected)) && (i != m_list.end())) {
-                       ++i;
-                       if (i != m_list.end())
-                               s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
+               while(((s < seqnum) || (s >= next_expected)) && (it != m_list.end())) {
+                       ++it;
+                       if (it != m_list.end())
+                               s = (*it)->getSeqnum();
                }
        }
        /* non wrap around case (at least for incoming and next_expected */
        else
        {
-               while(((s < seqnum) && (s >= next_expected)) && (i != m_list.end())) {
-                       ++i;
-                       if (i != m_list.end())
-                               s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
+               while(((s < seqnum) && (s >= next_expected)) && (it != m_list.end())) {
+                       ++it;
+                       if (it != m_list.end())
+                               s = (*it)->getSeqnum();
                }
        }
 
        if (s == seqnum) {
                /* nothing to do this seems to be a resent packet */
                /* for paranoia reason data should be compared */
+               auto &i = *it;
                if (
-                       (readU16(&(i->data[BASE_HEADER_SIZE+1])) != seqnum) ||
-                       (i->data.getSize() != p.data.getSize()) ||
+                       (i->getSeqnum() != seqnum) ||
+                       (i->size() != p.size()) ||
                        (i->address != p.address)
                        )
                {
@@ -325,51 +319,52 @@ void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
                        fprintf(stderr,
                                        "Duplicated seqnum %d non matching packet detected:\n",
                                        seqnum);
-                       fprintf(stderr, "Old: seqnum: %05d size: %04d, address: %s\n",
-                                       readU16(&(i->data[BASE_HEADER_SIZE+1])),i->data.getSize(),
+                       fprintf(stderr, "Old: seqnum: %05d size: %04zu, address: %s\n",
+                                       i->getSeqnum(), i->size(),
                                        i->address.serializeString().c_str());
-                       fprintf(stderr, "New: seqnum: %05d size: %04u, address: %s\n",
-                                       readU16(&(p.data[BASE_HEADER_SIZE+1])),p.data.getSize(),
+                       fprintf(stderr, "New: seqnum: %05d size: %04zu, address: %s\n",
+                                       p.getSeqnum(), p.size(),
                                        p.address.serializeString().c_str());
                        throw IncomingDataCorruption("duplicated packet isn't same as original one");
                }
        }
        /* insert or push back */
-       else if (i != m_list.end()) {
-               m_list.insert(i, p);
+       else if (it != m_list.end()) {
+               m_list.insert(it, p_ptr);
        } else {
-               m_list.push_back(p);
+               m_list.push_back(p_ptr);
        }
 
        /* update last packet number */
-       m_oldest_non_answered_ack = readU16(&m_list.front().data[BASE_HEADER_SIZE+1]);
+       m_oldest_non_answered_ack = m_list.front()->getSeqnum();
 }
 
 void ReliablePacketBuffer::incrementTimeouts(float dtime)
 {
        MutexAutoLock listlock(m_list_mutex);
-       for (BufferedPacket &bufferedPacket : m_list) {
-               bufferedPacket.time += dtime;
-               bufferedPacket.totaltime += dtime;
+       for (auto &packet : m_list) {
+               packet->time += dtime;
+               packet->totaltime += dtime;
        }
 }
 
-std::list<BufferedPacket>
+std::list<ConstSharedPtr<BufferedPacket>>
        ReliablePacketBuffer::getTimedOuts(float timeout, u32 max_packets)
 {
        MutexAutoLock listlock(m_list_mutex);
-       std::list<BufferedPacket> timed_outs;
-       for (BufferedPacket &bufferedPacket : m_list) {
-               if (bufferedPacket.time >= timeout) {
-                       // caller will resend packet so reset time and increase counter
-                       bufferedPacket.time = 0.0f;
-                       bufferedPacket.resend_count++;
+       std::list<ConstSharedPtr<BufferedPacket>> timed_outs;
+       for (auto &packet : m_list) {
+               if (packet->time < timeout)
+                       continue;
 
-                       timed_outs.push_back(bufferedPacket);
+               // caller will resend packet so reset time and increase counter
+               packet->time = 0.0f;
+               packet->resend_count++;
 
-                       if (timed_outs.size() >= max_packets)
-                               break;
-               }
+               timed_outs.emplace_back(packet);
+
+               if (timed_outs.size() >= max_packets)
+                       break;
        }
        return timed_outs;
 }
@@ -428,11 +423,13 @@ IncomingSplitBuffer::~IncomingSplitBuffer()
        }
 }
 
-SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool reliable)
+SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacketPtr &p_ptr, bool reliable)
 {
        MutexAutoLock listlock(m_map_mutex);
+       const BufferedPacket &p = *p_ptr;
+
        u32 headersize = BASE_HEADER_SIZE + 7;
-       if (p.data.getSize() < headersize) {
+       if (p.size() < headersize) {
                errorstream << "Invalid data size for split packet" << std::endl;
                return SharedBuffer<u8>();
        }
@@ -473,7 +470,7 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool relia
                                <<std::endl);
 
        // Cut chunk data out of packet
-       u32 chunkdatasize = p.data.getSize() - headersize;
+       u32 chunkdatasize = p.size() - headersize;
        SharedBuffer<u8> chunkdata(chunkdatasize);
        memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
 
@@ -520,14 +517,67 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
        ConnectionCommand
  */
 
-void ConnectionCommand::send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt,
-       bool reliable_)
+ConnectionCommandPtr ConnectionCommand::create(ConnectionCommandType type)
+{
+       return ConnectionCommandPtr(new ConnectionCommand(type));
+}
+
+ConnectionCommandPtr ConnectionCommand::serve(Address address)
 {
-       type = CONNCMD_SEND;
-       peer_id = peer_id_;
-       channelnum = channelnum_;
-       data = pkt->oldForgePacket();
-       reliable = reliable_;
+       auto c = create(CONNCMD_SERVE);
+       c->address = address;
+       return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::connect(Address address)
+{
+       auto c = create(CONNCMD_CONNECT);
+       c->address = address;
+       return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::disconnect()
+{
+       return create(CONNCMD_DISCONNECT);
+}
+
+ConnectionCommandPtr ConnectionCommand::disconnect_peer(session_t peer_id)
+{
+       auto c = create(CONNCMD_DISCONNECT_PEER);
+       c->peer_id = peer_id;
+       return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::send(session_t peer_id, u8 channelnum,
+       NetworkPacket *pkt, bool reliable)
+{
+       auto c = create(CONNCMD_SEND);
+       c->peer_id = peer_id;
+       c->channelnum = channelnum;
+       c->reliable = reliable;
+       c->data = pkt->oldForgePacket();
+       return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::ack(session_t peer_id, u8 channelnum, const Buffer<u8> &data)
+{
+       auto c = create(CONCMD_ACK);
+       c->peer_id = peer_id;
+       c->channelnum = channelnum;
+       c->reliable = false;
+       data.copyTo(c->data);
+       return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::createPeer(session_t peer_id, const Buffer<u8> &data)
+{
+       auto c = create(CONCMD_CREATE_PEER);
+       c->peer_id = peer_id;
+       c->channelnum = 0;
+       c->reliable = true;
+       c->raw = true;
+       data.copyTo(c->data);
+       return c;
 }
 
 /*
@@ -562,39 +612,38 @@ void Channel::setNextSplitSeqNum(u16 seqnum)
 u16 Channel::getOutgoingSequenceNumber(bool& successful)
 {
        MutexAutoLock internal(m_internal_mutex);
+
        u16 retval = next_outgoing_seqnum;
-       u16 lowest_unacked_seqnumber;
+       successful = false;
 
        /* shortcut if there ain't any packet in outgoing list */
-       if (outgoing_reliables_sent.empty())
-       {
+       if (outgoing_reliables_sent.empty()) {
+               successful = true;
                next_outgoing_seqnum++;
                return retval;
        }
 
-       if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber))
-       {
+       u16 lowest_unacked_seqnumber;
+       if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber)) {
                if (lowest_unacked_seqnumber < next_outgoing_seqnum) {
                        // ugly cast but this one is required in order to tell compiler we
                        // know about difference of two unsigned may be negative in general
                        // but we already made sure it won't happen in this case
-                       if (((u16)(next_outgoing_seqnum - lowest_unacked_seqnumber)) > window_size) {
-                               successful = false;
+                       if (((u16)(next_outgoing_seqnum - lowest_unacked_seqnumber)) > m_window_size) {
                                return 0;
                        }
-               }
-               else {
+               } else {
                        // ugly cast but this one is required in order to tell compiler we
                        // know about difference of two unsigned may be negative in general
                        // but we already made sure it won't happen in this case
                        if ((next_outgoing_seqnum + (u16)(SEQNUM_MAX - lowest_unacked_seqnumber)) >
-                               window_size) {
-                               successful = false;
+                                       m_window_size) {
                                return 0;
                        }
                }
        }
 
+       successful = true;
        next_outgoing_seqnum++;
        return retval;
 }
@@ -666,7 +715,7 @@ void Channel::UpdateTimers(float dtime)
                        //packet_too_late = current_packet_too_late;
                        packets_successful = current_packet_successful;
 
-                       if (current_bytes_transfered > (unsigned int) (window_size*512/2)) {
+                       if (current_bytes_transfered > (unsigned int) (m_window_size*512/2)) {
                                reasonable_amount_of_data_transmitted = true;
                        }
                        current_packet_loss = 0;
@@ -681,37 +730,25 @@ void Channel::UpdateTimers(float dtime)
                if (packets_successful > 0) {
                        successful_to_lost_ratio = packet_loss/packets_successful;
                } else if (packet_loss > 0) {
-                       window_size = std::max(
-                                       (window_size - 10),
-                                       MIN_RELIABLE_WINDOW_SIZE);
+                       setWindowSize(m_window_size - 10);
                        done = true;
                }
 
                if (!done) {
-                       if ((successful_to_lost_ratio < 0.01f) &&
-                               (window_size < MAX_RELIABLE_WINDOW_SIZE)) {
+                       if (successful_to_lost_ratio < 0.01f) {
                                /* don't even think about increasing if we didn't even
                                 * use major parts of our window */
                                if (reasonable_amount_of_data_transmitted)
-                                       window_size = std::min(
-                                                       (window_size + 100),
-                                                       MAX_RELIABLE_WINDOW_SIZE);
-                       } else if ((successful_to_lost_ratio < 0.05f) &&
-                                       (window_size < MAX_RELIABLE_WINDOW_SIZE)) {
+                                       setWindowSize(m_window_size + 100);
+                       } else if (successful_to_lost_ratio < 0.05f) {
                                /* don't even think about increasing if we didn't even
                                 * use major parts of our window */
                                if (reasonable_amount_of_data_transmitted)
-                                       window_size = std::min(
-                                                       (window_size + 50),
-                                                       MAX_RELIABLE_WINDOW_SIZE);
+                                       setWindowSize(m_window_size + 50);
                        } else if (successful_to_lost_ratio > 0.15f) {
-                               window_size = std::max(
-                                               (window_size - 100),
-                                               MIN_RELIABLE_WINDOW_SIZE);
+                               setWindowSize(m_window_size - 100);
                        } else if (successful_to_lost_ratio > 0.1f) {
-                               window_size = std::max(
-                                               (window_size - 50),
-                                               MIN_RELIABLE_WINDOW_SIZE);
+                               setWindowSize(m_window_size - 50);
                        }
                }
        }
@@ -958,45 +995,45 @@ bool UDPPeer::Ping(float dtime,SharedBuffer<u8>& data)
        return false;
 }
 
-void UDPPeer::PutReliableSendCommand(ConnectionCommand &c,
+void UDPPeer::PutReliableSendCommand(ConnectionCommandPtr &c,
                unsigned int max_packet_size)
 {
        if (m_pending_disconnect)
                return;
 
-       Channel &chan = channels[c.channelnum];
+       Channel &chan = channels[c->channelnum];
 
        if (chan.queued_commands.empty() &&
                        /* don't queue more packets then window size */
-                       (chan.queued_reliables.size() < chan.getWindowSize() / 2)) {
+                       (chan.queued_reliables.size() + 1 < chan.getWindowSize() / 2)) {
                LOG(dout_con<<m_connection->getDesc()
-                               <<" processing reliable command for peer id: " << c.peer_id
-                               <<" data size: " << c.data.getSize() << std::endl);
-               if (!processReliableSendCommand(c,max_packet_size)) {
-                       chan.queued_commands.push_back(c);
-               }
-       }
-       else {
+                               <<" processing reliable command for peer id: " << c->peer_id
+                               <<" data size: " << c->data.getSize() << std::endl);
+               if (processReliableSendCommand(c, max_packet_size))
+                       return;
+       } else {
                LOG(dout_con<<m_connection->getDesc()
-                               <<" Queueing reliable command for peer id: " << c.peer_id
-                               <<" data size: " << c.data.getSize() <<std::endl);
-               chan.queued_commands.push_back(c);
-               if (chan.queued_commands.size() >= chan.getWindowSize() / 2) {
+                               <<" Queueing reliable command for peer id: " << c->peer_id
+                               <<" data size: " << c->data.getSize() <<std::endl);
+
+               if (chan.queued_commands.size() + 1 >= chan.getWindowSize() / 2) {
                        LOG(derr_con << m_connection->getDesc()
-                                       << "Possible packet stall to peer id: " << c.peer_id
+                                       << "Possible packet stall to peer id: " << c->peer_id
                                        << " queued_commands=" << chan.queued_commands.size()
                                        << std::endl);
                }
        }
+       chan.queued_commands.push_back(c);
 }
 
 bool UDPPeer::processReliableSendCommand(
-                               ConnectionCommand &c,
+                               ConnectionCommandPtr &c_ptr,
                                unsigned int max_packet_size)
 {
        if (m_pending_disconnect)
                return true;
 
+       const auto &c = *c_ptr;
        Channel &chan = channels[c.channelnum];
 
        u32 chunksize_max = max_packet_size
@@ -1015,9 +1052,9 @@ bool UDPPeer::processReliableSendCommand(
                chan.setNextSplitSeqNum(split_sequence_number);
        }
 
-       bool have_sequence_number = true;
+       bool have_sequence_number = false;
        bool have_initial_sequence_number = false;
-       std::queue<BufferedPacket> toadd;
+       std::queue<BufferedPacketPtr> toadd;
        volatile u16 initial_sequence_number = 0;
 
        for (SharedBuffer<u8> &original : originals) {
@@ -1036,25 +1073,23 @@ bool UDPPeer::processReliableSendCommand(
                SharedBuffer<u8> reliable = makeReliablePacket(original, seqnum);
 
                // Add base headers and make a packet
-               BufferedPacket p = con::makePacket(address, reliable,
+               BufferedPacketPtr p = con::makePacket(address, reliable,
                                m_connection->GetProtocolID(), m_connection->GetPeerID(),
                                c.channelnum);
 
-               toadd.push(std::move(p));
+               toadd.push(p);
        }
 
        if (have_sequence_number) {
-               volatile u16 pcount = 0;
                while (!toadd.empty()) {
-                       BufferedPacket p = std::move(toadd.front());
+                       BufferedPacketPtr p = toadd.front();
                        toadd.pop();
 //                     LOG(dout_con<<connection->getDesc()
 //                                     << " queuing reliable packet for peer_id: " << c.peer_id
 //                                     << " channel: " << (c.channelnum&0xFF)
 //                                     << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
 //                                     << std::endl)
-                       chan.queued_reliables.push(std::move(p));
-                       pcount++;
+                       chan.queued_reliables.push(p);
                }
                sanity_check(chan.queued_reliables.size() < 0xFFFF);
                return true;
@@ -1063,6 +1098,7 @@ bool UDPPeer::processReliableSendCommand(
        volatile u16 packets_available = toadd.size();
        /* we didn't get a single sequence number no need to fill queue */
        if (!have_initial_sequence_number) {
+               LOG(derr_con << m_connection->getDesc() << "Ran out of sequence numbers!" << std::endl);
                return false;
        }
 
@@ -1108,18 +1144,18 @@ void UDPPeer::RunCommandQueues(
                                (channel.queued_reliables.size() < maxtransfer) &&
                                (commands_processed < maxcommands)) {
                        try {
-                               ConnectionCommand c = channel.queued_commands.front();
+                               ConnectionCommandPtr c = channel.queued_commands.front();
 
                                LOG(dout_con << m_connection->getDesc()
                                                << " processing queued reliable command " << std::endl);
 
                                // Packet is processed, remove it from queue
-                               if (processReliableSendCommand(c,max_packet_size)) {
+                               if (processReliableSendCommand(c, max_packet_size)) {
                                        channel.queued_commands.pop_front();
                                } else {
                                        LOG(dout_con << m_connection->getDesc()
-                                                       << " Failed to queue packets for peer_id: " << c.peer_id
-                                                       << ", delaying sending of " << c.data.getSize()
+                                                       << " Failed to queue packets for peer_id: " << c->peer_id
+                                                       << ", delaying sending of " << c->data.getSize()
                                                        << " bytes" << std::endl);
                                }
                        }
@@ -1142,13 +1178,70 @@ void UDPPeer::setNextSplitSequenceNumber(u8 channel, u16 seqnum)
        channels[channel].setNextSplitSeqNum(seqnum);
 }
 
-SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, const BufferedPacket &toadd,
+SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, BufferedPacketPtr &toadd,
        bool reliable)
 {
        assert(channel < CHANNEL_COUNT); // Pre-condition
        return channels[channel].incoming_splits.insert(toadd, reliable);
 }
 
+/*
+       ConnectionEvent
+*/
+
+const char *ConnectionEvent::describe() const
+{
+       switch(type) {
+       case CONNEVENT_NONE:
+               return "CONNEVENT_NONE";
+       case CONNEVENT_DATA_RECEIVED:
+               return "CONNEVENT_DATA_RECEIVED";
+       case CONNEVENT_PEER_ADDED:
+               return "CONNEVENT_PEER_ADDED";
+       case CONNEVENT_PEER_REMOVED:
+               return "CONNEVENT_PEER_REMOVED";
+       case CONNEVENT_BIND_FAILED:
+               return "CONNEVENT_BIND_FAILED";
+       }
+       return "Invalid ConnectionEvent";
+}
+
+
+ConnectionEventPtr ConnectionEvent::create(ConnectionEventType type)
+{
+       return std::shared_ptr<ConnectionEvent>(new ConnectionEvent(type));
+}
+
+ConnectionEventPtr ConnectionEvent::dataReceived(session_t peer_id, const Buffer<u8> &data)
+{
+       auto e = create(CONNEVENT_DATA_RECEIVED);
+       e->peer_id = peer_id;
+       data.copyTo(e->data);
+       return e;
+}
+
+ConnectionEventPtr ConnectionEvent::peerAdded(session_t peer_id, Address address)
+{
+       auto e = create(CONNEVENT_PEER_ADDED);
+       e->peer_id = peer_id;
+       e->address = address;
+       return e;
+}
+
+ConnectionEventPtr ConnectionEvent::peerRemoved(session_t peer_id, bool is_timeout, Address address)
+{
+       auto e = create(CONNEVENT_PEER_REMOVED);
+       e->peer_id = peer_id;
+       e->timeout = is_timeout;
+       e->address = address;
+       return e;
+}
+
+ConnectionEventPtr ConnectionEvent::bindFailed()
+{
+       return create(CONNEVENT_BIND_FAILED);
+}
+
 /*
        Connection
 */
@@ -1198,18 +1291,12 @@ Connection::~Connection()
 
 /* Internal stuff */
 
-void Connection::putEvent(const ConnectionEvent &e)
+void Connection::putEvent(ConnectionEventPtr e)
 {
-       assert(e.type != CONNEVENT_NONE); // Pre-condition
+       assert(e->type != CONNEVENT_NONE); // Pre-condition
        m_event_queue.push_back(e);
 }
 
-void Connection::putEvent(ConnectionEvent &&e)
-{
-       assert(e.type != CONNEVENT_NONE); // Pre-condition
-       m_event_queue.push_back(std::move(e));
-}
-
 void Connection::TriggerSend()
 {
        m_sendThread->Trigger();
@@ -1272,11 +1359,9 @@ bool Connection::deletePeer(session_t peer_id, bool timeout)
        Address peer_address;
        //any peer has a primary address this never fails!
        peer->getAddress(MTP_PRIMARY, peer_address);
-       // Create event
-       ConnectionEvent e;
-       e.peerRemoved(peer_id, timeout, peer_address);
-       putEvent(e);
 
+       // Create event
+       putEvent(ConnectionEvent::peerRemoved(peer_id, timeout, peer_address));
 
        peer->Drop();
        return true;
@@ -1284,18 +1369,16 @@ bool Connection::deletePeer(session_t peer_id, bool timeout)
 
 /* Interface */
 
-ConnectionEvent Connection::waitEvent(u32 timeout_ms)
+ConnectionEventPtr Connection::waitEvent(u32 timeout_ms)
 {
        try {
                return m_event_queue.pop_front(timeout_ms);
        } catch(ItemNotFoundException &ex) {
-               ConnectionEvent e;
-               e.type = CONNEVENT_NONE;
-               return e;
+               return ConnectionEvent::create(CONNEVENT_NONE);
        }
 }
 
-void Connection::putCommand(const ConnectionCommand &c)
+void Connection::putCommand(ConnectionCommandPtr c)
 {
        if (!m_shutting_down) {
                m_command_queue.push_back(c);
@@ -1303,26 +1386,14 @@ void Connection::putCommand(const ConnectionCommand &c)
        }
 }
 
-void Connection::putCommand(ConnectionCommand &&c)
-{
-       if (!m_shutting_down) {
-               m_command_queue.push_back(std::move(c));
-               m_sendThread->Trigger();
-       }
-}
-
 void Connection::Serve(Address bind_addr)
 {
-       ConnectionCommand c;
-       c.serve(bind_addr);
-       putCommand(c);
+       putCommand(ConnectionCommand::serve(bind_addr));
 }
 
 void Connection::Connect(Address address)
 {
-       ConnectionCommand c;
-       c.connect(address);
-       putCommand(c);
+       putCommand(ConnectionCommand::connect(address));
 }
 
 bool Connection::Connected()
@@ -1344,9 +1415,7 @@ bool Connection::Connected()
 
 void Connection::Disconnect()
 {
-       ConnectionCommand c;
-       c.disconnect();
-       putCommand(c);
+       putCommand(ConnectionCommand::disconnect());
 }
 
 bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
@@ -1357,11 +1426,15 @@ bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
                This is not considered to be a problem (is it?)
        */
        for(;;) {
-               ConnectionEvent e = waitEvent(timeout);
-               if (e.type != CONNEVENT_NONE)
+               ConnectionEventPtr e_ptr = waitEvent(timeout);
+               const ConnectionEvent &e = *e_ptr;
+
+               if (e.type != CONNEVENT_NONE) {
                        LOG(dout_con << getDesc() << ": Receive: got event: "
                                        << e.describe() << std::endl);
-               switch(e.type) {
+               }
+
+               switch (e.type) {
                case CONNEVENT_NONE:
                        return false;
                case CONNEVENT_DATA_RECEIVED:
@@ -1409,10 +1482,7 @@ void Connection::Send(session_t peer_id, u8 channelnum,
 {
        assert(channelnum < CHANNEL_COUNT); // Pre-condition
 
-       ConnectionCommand c;
-
-       c.send(peer_id, channelnum, pkt, reliable);
-       putCommand(std::move(c));
+       putCommand(ConnectionCommand::send(peer_id, channelnum, pkt, reliable));
 }
 
 Address Connection::GetPeerAddress(session_t peer_id)
@@ -1511,41 +1581,31 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
        LOG(dout_con << getDesc()
                        << "createPeer(): giving peer_id=" << peer_id_new << std::endl);
 
-       ConnectionCommand cmd;
-       Buffer<u8> reply(4);
-       writeU8(&reply[0], PACKET_TYPE_CONTROL);
-       writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
-       writeU16(&reply[2], peer_id_new);
-       cmd.createPeer(peer_id_new,reply);
-       putCommand(std::move(cmd));
+       {
+               Buffer<u8> reply(4);
+               writeU8(&reply[0], PACKET_TYPE_CONTROL);
+               writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
+               writeU16(&reply[2], peer_id_new);
+               putCommand(ConnectionCommand::createPeer(peer_id_new, reply));
+       }
 
        // Create peer addition event
-       ConnectionEvent e;
-       e.peerAdded(peer_id_new, sender);
-       putEvent(e);
+       putEvent(ConnectionEvent::peerAdded(peer_id_new, sender));
 
        // We're now talking to a valid peer_id
        return peer_id_new;
 }
 
-void Connection::PrintInfo(std::ostream &out)
-{
-       m_info_mutex.lock();
-       out<<getDesc()<<": ";
-       m_info_mutex.unlock();
-}
-
 const std::string Connection::getDesc()
 {
+       MutexAutoLock _(m_info_mutex);
        return std::string("con(")+
                        itos(m_udpSocket.GetHandle())+"/"+itos(m_peer_id)+")";
 }
 
 void Connection::DisconnectPeer(session_t peer_id)
 {
-       ConnectionCommand discon;
-       discon.disconnect_peer(peer_id);
-       putCommand(discon);
+       putCommand(ConnectionCommand::disconnect_peer(peer_id));
 }
 
 void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum)
@@ -1557,14 +1617,12 @@ void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum)
                        " channel: " << (channelnum & 0xFF) <<
                        " seqnum: " << seqnum << std::endl);
 
-       ConnectionCommand c;
        SharedBuffer<u8> ack(4);
        writeU8(&ack[0], PACKET_TYPE_CONTROL);
        writeU8(&ack[1], CONTROLTYPE_ACK);
        writeU16(&ack[2], seqnum);
 
-       c.ack(peer_id, channelnum, ack);
-       putCommand(std::move(c));
+       putCommand(ConnectionCommand::ack(peer_id, channelnum, ack));
        m_sendThread->Trigger();
 }