]> git.lizzy.rs Git - minetest.git/commitdiff
Network: Delete copy constructor and use std::move instead (#11642)
authorSmallJoker <SmallJoker@users.noreply.github.com>
Wed, 1 Dec 2021 19:22:33 +0000 (20:22 +0100)
committerGitHub <noreply@github.com>
Wed, 1 Dec 2021 19:22:33 +0000 (20:22 +0100)
This is a follow-up change which disables class copies where possible to avoid unnecessary memory movements.

src/client/client.cpp
src/network/connection.cpp
src/network/connection.h
src/network/connectionthreads.cpp
src/network/connectionthreads.h
src/unittest/test_connection.cpp
src/util/pointer.h

index 45cc62a33578e4fcef557a27d6b2111ccc5cd2c3..3ee1298ff182646909539621dab0a90917aa506a 100644 (file)
@@ -877,7 +877,7 @@ void Client::ProcessData(NetworkPacket *pkt)
        */
        if(sender_peer_id != PEER_ID_SERVER) {
                infostream << "Client::ProcessData(): Discarding data not "
-                       "coming from server: peer_id=" << sender_peer_id
+                       "coming from server: peer_id=" << sender_peer_id << " command=" << pkt->getCommand()
                        << std::endl;
                return;
        }
index 548b2e3a004c9aa5f60b75652f616226ff8eed9b..2d3cf6e885da9cfb322720a22abf3883f24d6c19 100644 (file)
@@ -62,18 +62,27 @@ namespace con
 
 #define PING_TIMEOUT 5.0
 
-BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
+u16 BufferedPacket::getSeqnum() const
+{
+       if (size() < BASE_HEADER_SIZE + 3)
+               return 0; // should never happen
+
+       return readU16(&data[BASE_HEADER_SIZE + 1]);
+}
+
+BufferedPacketPtr makePacket(Address &address, const SharedBuffer<u8> &data,
                u32 protocol_id, session_t sender_peer_id, u8 channel)
 {
        u32 packet_size = data.getSize() + BASE_HEADER_SIZE;
-       BufferedPacket p(packet_size);
-       p.address = address;
 
-       writeU32(&p.data[0], protocol_id);
-       writeU16(&p.data[4], sender_peer_id);
-       writeU8(&p.data[6], channel);
+       BufferedPacketPtr p(new BufferedPacket(packet_size));
+       p->address = address;
+
+       writeU32(&p->data[0], protocol_id);
+       writeU16(&p->data[4], sender_peer_id);
+       writeU8(&p->data[6], channel);
 
-       memcpy(&p.data[BASE_HEADER_SIZE], *data, data.getSize());
+       memcpy(&p->data[BASE_HEADER_SIZE], *data, data.getSize());
 
        return p;
 }
@@ -169,9 +178,8 @@ void ReliablePacketBuffer::print()
        MutexAutoLock listlock(m_list_mutex);
        LOG(dout_con<<"Dump of ReliablePacketBuffer:" << std::endl);
        unsigned int index = 0;
-       for (BufferedPacket &bufferedPacket : m_list) {
-               u16 s = readU16(&(bufferedPacket.data[BASE_HEADER_SIZE+1]));
-               LOG(dout_con<<index<< ":" << s << std::endl);
+       for (BufferedPacketPtr &packet : m_list) {
+               LOG(dout_con<<index<< ":" << packet->getSeqnum() << std::endl);
                index++;
        }
 }
@@ -188,16 +196,13 @@ u32 ReliablePacketBuffer::size()
        return m_list.size();
 }
 
-RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
+RPBSearchResult ReliablePacketBuffer::findPacketNoLock(u16 seqnum)
 {
-       std::list<BufferedPacket>::iterator i = m_list.begin();
-       for(; i != m_list.end(); ++i)
-       {
-               u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
-               if (s == seqnum)
-                       break;
+       for (auto it = m_list.begin(); it != m_list.end(); ++it) {
+               if ((*it)->getSeqnum() == seqnum)
+                       return it;
        }
-       return i;
+       return m_list.end();
 }
 
 bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
@@ -205,54 +210,54 @@ bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
        MutexAutoLock listlock(m_list_mutex);
        if (m_list.empty())
                return false;
-       const BufferedPacket &p = m_list.front();
-       result = readU16(&p.data[BASE_HEADER_SIZE + 1]);
+       result = m_list.front()->getSeqnum();
        return true;
 }
 
-BufferedPacket ReliablePacketBuffer::popFirst()
+BufferedPacketPtr ReliablePacketBuffer::popFirst()
 {
        MutexAutoLock listlock(m_list_mutex);
        if (m_list.empty())
                throw NotFoundException("Buffer is empty");
-       BufferedPacket p = std::move(m_list.front());
+
+       BufferedPacketPtr p(m_list.front());
        m_list.pop_front();
 
        if (m_list.empty()) {
                m_oldest_non_answered_ack = 0;
        } else {
-               m_oldest_non_answered_ack =
-                               readU16(&m_list.front().data[BASE_HEADER_SIZE + 1]);
+               m_oldest_non_answered_ack = m_list.front()->getSeqnum();
        }
        return p;
 }
 
-BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
+BufferedPacketPtr ReliablePacketBuffer::popSeqnum(u16 seqnum)
 {
        MutexAutoLock listlock(m_list_mutex);
-       RPBSearchResult r = findPacket(seqnum);
-       if (r == notFound()) {
+       RPBSearchResult r = findPacketNoLock(seqnum);
+       if (r == m_list.end()) {
                LOG(dout_con<<"Sequence number: " << seqnum
                                << " not found in reliable buffer"<<std::endl);
                throw NotFoundException("seqnum not found in buffer");
        }
-       BufferedPacket p = std::move(*r);
 
+       BufferedPacketPtr p(*r);
        m_list.erase(r);
 
        if (m_list.empty()) {
                m_oldest_non_answered_ack = 0;
        } else {
-               m_oldest_non_answered_ack =
-                               readU16(&m_list.front().data[BASE_HEADER_SIZE + 1]);
+               m_oldest_non_answered_ack = m_list.front()->getSeqnum();
        }
        return p;
 }
 
-void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
+void ReliablePacketBuffer::insert(BufferedPacketPtr &p_ptr, u16 next_expected)
 {
        MutexAutoLock listlock(m_list_mutex);
-       if (p.data.getSize() < BASE_HEADER_SIZE + 3) {
+       const BufferedPacket &p = *p_ptr;
+
+       if (p.size() < BASE_HEADER_SIZE + 3) {
                errorstream << "ReliablePacketBuffer::insert(): Invalid data size for "
                        "reliable packet" << std::endl;
                return;
@@ -263,7 +268,7 @@ void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
                        << std::endl;
                return;
        }
-       u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
+       const u16 seqnum = p.getSeqnum();
 
        if (!seqnum_in_window(seqnum, next_expected, MAX_RELIABLE_WINDOW_SIZE)) {
                errorstream << "ReliablePacketBuffer::insert(): seqnum is outside of "
@@ -280,44 +285,44 @@ void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
 
        // Find the right place for the packet and insert it there
        // If list is empty, just add it
-       if (m_list.empty())
-       {
-               m_list.push_back(p);
+       if (m_list.empty()) {
+               m_list.push_back(p_ptr);
                m_oldest_non_answered_ack = seqnum;
                // Done.
                return;
        }
 
        // Otherwise find the right place
-       std::list<BufferedPacket>::iterator i = m_list.begin();
+       auto it = m_list.begin();
        // Find the first packet in the list which has a higher seqnum
-       u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
+       u16 s = (*it)->getSeqnum();
 
        /* case seqnum is smaller then next_expected seqnum */
        /* this is true e.g. on wrap around */
        if (seqnum < next_expected) {
-               while(((s < seqnum) || (s >= next_expected)) && (i != m_list.end())) {
-                       ++i;
-                       if (i != m_list.end())
-                               s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
+               while(((s < seqnum) || (s >= next_expected)) && (it != m_list.end())) {
+                       ++it;
+                       if (it != m_list.end())
+                               s = (*it)->getSeqnum();
                }
        }
        /* non wrap around case (at least for incoming and next_expected */
        else
        {
-               while(((s < seqnum) && (s >= next_expected)) && (i != m_list.end())) {
-                       ++i;
-                       if (i != m_list.end())
-                               s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
+               while(((s < seqnum) && (s >= next_expected)) && (it != m_list.end())) {
+                       ++it;
+                       if (it != m_list.end())
+                               s = (*it)->getSeqnum();
                }
        }
 
        if (s == seqnum) {
                /* nothing to do this seems to be a resent packet */
                /* for paranoia reason data should be compared */
+               auto &i = *it;
                if (
-                       (readU16(&(i->data[BASE_HEADER_SIZE+1])) != seqnum) ||
-                       (i->data.getSize() != p.data.getSize()) ||
+                       (i->getSeqnum() != seqnum) ||
+                       (i->size() != p.size()) ||
                        (i->address != p.address)
                        )
                {
@@ -325,51 +330,52 @@ void ReliablePacketBuffer::insert(const BufferedPacket &p, u16 next_expected)
                        fprintf(stderr,
                                        "Duplicated seqnum %d non matching packet detected:\n",
                                        seqnum);
-                       fprintf(stderr, "Old: seqnum: %05d size: %04d, address: %s\n",
-                                       readU16(&(i->data[BASE_HEADER_SIZE+1])),i->data.getSize(),
+                       fprintf(stderr, "Old: seqnum: %05d size: %04zu, address: %s\n",
+                                       i->getSeqnum(), i->size(),
                                        i->address.serializeString().c_str());
-                       fprintf(stderr, "New: seqnum: %05d size: %04u, address: %s\n",
-                                       readU16(&(p.data[BASE_HEADER_SIZE+1])),p.data.getSize(),
+                       fprintf(stderr, "New: seqnum: %05d size: %04zu, address: %s\n",
+                                       p.getSeqnum(), p.size(),
                                        p.address.serializeString().c_str());
                        throw IncomingDataCorruption("duplicated packet isn't same as original one");
                }
        }
        /* insert or push back */
-       else if (i != m_list.end()) {
-               m_list.insert(i, p);
+       else if (it != m_list.end()) {
+               m_list.insert(it, p_ptr);
        } else {
-               m_list.push_back(p);
+               m_list.push_back(p_ptr);
        }
 
        /* update last packet number */
-       m_oldest_non_answered_ack = readU16(&m_list.front().data[BASE_HEADER_SIZE+1]);
+       m_oldest_non_answered_ack = m_list.front()->getSeqnum();
 }
 
 void ReliablePacketBuffer::incrementTimeouts(float dtime)
 {
        MutexAutoLock listlock(m_list_mutex);
-       for (BufferedPacket &bufferedPacket : m_list) {
-               bufferedPacket.time += dtime;
-               bufferedPacket.totaltime += dtime;
+       for (auto &packet : m_list) {
+               packet->time += dtime;
+               packet->totaltime += dtime;
        }
 }
 
-std::list<BufferedPacket>
+std::list<ConstSharedPtr<BufferedPacket>>
        ReliablePacketBuffer::getTimedOuts(float timeout, u32 max_packets)
 {
        MutexAutoLock listlock(m_list_mutex);
-       std::list<BufferedPacket> timed_outs;
-       for (BufferedPacket &bufferedPacket : m_list) {
-               if (bufferedPacket.time >= timeout) {
-                       // caller will resend packet so reset time and increase counter
-                       bufferedPacket.time = 0.0f;
-                       bufferedPacket.resend_count++;
+       std::list<ConstSharedPtr<BufferedPacket>> timed_outs;
+       for (auto &packet : m_list) {
+               if (packet->time < timeout)
+                       continue;
 
-                       timed_outs.push_back(bufferedPacket);
+               // caller will resend packet so reset time and increase counter
+               packet->time = 0.0f;
+               packet->resend_count++;
 
-                       if (timed_outs.size() >= max_packets)
-                               break;
-               }
+               timed_outs.emplace_back(packet);
+
+               if (timed_outs.size() >= max_packets)
+                       break;
        }
        return timed_outs;
 }
@@ -428,11 +434,13 @@ IncomingSplitBuffer::~IncomingSplitBuffer()
        }
 }
 
-SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool reliable)
+SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacketPtr &p_ptr, bool reliable)
 {
        MutexAutoLock listlock(m_map_mutex);
+       const BufferedPacket &p = *p_ptr;
+
        u32 headersize = BASE_HEADER_SIZE + 7;
-       if (p.data.getSize() < headersize) {
+       if (p.size() < headersize) {
                errorstream << "Invalid data size for split packet" << std::endl;
                return SharedBuffer<u8>();
        }
@@ -473,7 +481,7 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool relia
                                <<std::endl);
 
        // Cut chunk data out of packet
-       u32 chunkdatasize = p.data.getSize() - headersize;
+       u32 chunkdatasize = p.size() - headersize;
        SharedBuffer<u8> chunkdata(chunkdatasize);
        memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
 
@@ -520,14 +528,67 @@ void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
        ConnectionCommand
  */
 
-void ConnectionCommand::send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt,
-       bool reliable_)
+ConnectionCommandPtr ConnectionCommand::create(ConnectionCommandType type)
+{
+       return ConnectionCommandPtr(new ConnectionCommand(type));
+}
+
+ConnectionCommandPtr ConnectionCommand::serve(Address address)
+{
+       auto c = create(CONNCMD_SERVE);
+       c->address = address;
+       return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::connect(Address address)
 {
-       type = CONNCMD_SEND;
-       peer_id = peer_id_;
-       channelnum = channelnum_;
-       data = pkt->oldForgePacket();
-       reliable = reliable_;
+       auto c = create(CONNCMD_CONNECT);
+       c->address = address;
+       return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::disconnect()
+{
+       return create(CONNCMD_DISCONNECT);
+}
+
+ConnectionCommandPtr ConnectionCommand::disconnect_peer(session_t peer_id)
+{
+       auto c = create(CONNCMD_DISCONNECT_PEER);
+       c->peer_id = peer_id;
+       return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::send(session_t peer_id, u8 channelnum,
+       NetworkPacket *pkt, bool reliable)
+{
+       auto c = create(CONNCMD_SEND);
+       c->peer_id = peer_id;
+       c->channelnum = channelnum;
+       c->reliable = reliable;
+       c->data = pkt->oldForgePacket();
+       return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::ack(session_t peer_id, u8 channelnum, const Buffer<u8> &data)
+{
+       auto c = create(CONCMD_ACK);
+       c->peer_id = peer_id;
+       c->channelnum = channelnum;
+       c->reliable = false;
+       data.copyTo(c->data);
+       return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::createPeer(session_t peer_id, const Buffer<u8> &data)
+{
+       auto c = create(CONCMD_CREATE_PEER);
+       c->peer_id = peer_id;
+       c->channelnum = 0;
+       c->reliable = true;
+       c->raw = true;
+       data.copyTo(c->data);
+       return c;
 }
 
 /*
@@ -562,39 +623,38 @@ void Channel::setNextSplitSeqNum(u16 seqnum)
 u16 Channel::getOutgoingSequenceNumber(bool& successful)
 {
        MutexAutoLock internal(m_internal_mutex);
+
        u16 retval = next_outgoing_seqnum;
-       u16 lowest_unacked_seqnumber;
+       successful = false;
 
        /* shortcut if there ain't any packet in outgoing list */
-       if (outgoing_reliables_sent.empty())
-       {
+       if (outgoing_reliables_sent.empty()) {
+               successful = true;
                next_outgoing_seqnum++;
                return retval;
        }
 
-       if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber))
-       {
+       u16 lowest_unacked_seqnumber;
+       if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber)) {
                if (lowest_unacked_seqnumber < next_outgoing_seqnum) {
                        // ugly cast but this one is required in order to tell compiler we
                        // know about difference of two unsigned may be negative in general
                        // but we already made sure it won't happen in this case
                        if (((u16)(next_outgoing_seqnum - lowest_unacked_seqnumber)) > m_window_size) {
-                               successful = false;
                                return 0;
                        }
-               }
-               else {
+               } else {
                        // ugly cast but this one is required in order to tell compiler we
                        // know about difference of two unsigned may be negative in general
                        // but we already made sure it won't happen in this case
                        if ((next_outgoing_seqnum + (u16)(SEQNUM_MAX - lowest_unacked_seqnumber)) >
                                        m_window_size) {
-                               successful = false;
                                return 0;
                        }
                }
        }
 
+       successful = true;
        next_outgoing_seqnum++;
        return retval;
 }
@@ -946,45 +1006,45 @@ bool UDPPeer::Ping(float dtime,SharedBuffer<u8>& data)
        return false;
 }
 
-void UDPPeer::PutReliableSendCommand(ConnectionCommand &c,
+void UDPPeer::PutReliableSendCommand(ConnectionCommandPtr &c,
                unsigned int max_packet_size)
 {
        if (m_pending_disconnect)
                return;
 
-       Channel &chan = channels[c.channelnum];
+       Channel &chan = channels[c->channelnum];
 
        if (chan.queued_commands.empty() &&
                        /* don't queue more packets then window size */
-                       (chan.queued_reliables.size() < chan.getWindowSize() / 2)) {
+                       (chan.queued_reliables.size() + 1 < chan.getWindowSize() / 2)) {
                LOG(dout_con<<m_connection->getDesc()
-                               <<" processing reliable command for peer id: " << c.peer_id
-                               <<" data size: " << c.data.getSize() << std::endl);
-               if (!processReliableSendCommand(c,max_packet_size)) {
-                       chan.queued_commands.push_back(c);
-               }
-       }
-       else {
+                               <<" processing reliable command for peer id: " << c->peer_id
+                               <<" data size: " << c->data.getSize() << std::endl);
+               if (processReliableSendCommand(c, max_packet_size))
+                       return;
+       } else {
                LOG(dout_con<<m_connection->getDesc()
-                               <<" Queueing reliable command for peer id: " << c.peer_id
-                               <<" data size: " << c.data.getSize() <<std::endl);
-               chan.queued_commands.push_back(c);
-               if (chan.queued_commands.size() >= chan.getWindowSize() / 2) {
+                               <<" Queueing reliable command for peer id: " << c->peer_id
+                               <<" data size: " << c->data.getSize() <<std::endl);
+
+               if (chan.queued_commands.size() + 1 >= chan.getWindowSize() / 2) {
                        LOG(derr_con << m_connection->getDesc()
-                                       << "Possible packet stall to peer id: " << c.peer_id
+                                       << "Possible packet stall to peer id: " << c->peer_id
                                        << " queued_commands=" << chan.queued_commands.size()
                                        << std::endl);
                }
        }
+       chan.queued_commands.push_back(c);
 }
 
 bool UDPPeer::processReliableSendCommand(
-                               ConnectionCommand &c,
+                               ConnectionCommandPtr &c_ptr,
                                unsigned int max_packet_size)
 {
        if (m_pending_disconnect)
                return true;
 
+       const auto &c = *c_ptr;
        Channel &chan = channels[c.channelnum];
 
        u32 chunksize_max = max_packet_size
@@ -1003,9 +1063,9 @@ bool UDPPeer::processReliableSendCommand(
                chan.setNextSplitSeqNum(split_sequence_number);
        }
 
-       bool have_sequence_number = true;
+       bool have_sequence_number = false;
        bool have_initial_sequence_number = false;
-       std::queue<BufferedPacket> toadd;
+       std::queue<BufferedPacketPtr> toadd;
        volatile u16 initial_sequence_number = 0;
 
        for (SharedBuffer<u8> &original : originals) {
@@ -1024,25 +1084,23 @@ bool UDPPeer::processReliableSendCommand(
                SharedBuffer<u8> reliable = makeReliablePacket(original, seqnum);
 
                // Add base headers and make a packet
-               BufferedPacket p = con::makePacket(address, reliable,
+               BufferedPacketPtr p = con::makePacket(address, reliable,
                                m_connection->GetProtocolID(), m_connection->GetPeerID(),
                                c.channelnum);
 
-               toadd.push(std::move(p));
+               toadd.push(p);
        }
 
        if (have_sequence_number) {
-               volatile u16 pcount = 0;
                while (!toadd.empty()) {
-                       BufferedPacket p = std::move(toadd.front());
+                       BufferedPacketPtr p = toadd.front();
                        toadd.pop();
 //                     LOG(dout_con<<connection->getDesc()
 //                                     << " queuing reliable packet for peer_id: " << c.peer_id
 //                                     << " channel: " << (c.channelnum&0xFF)
 //                                     << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
 //                                     << std::endl)
-                       chan.queued_reliables.push(std::move(p));
-                       pcount++;
+                       chan.queued_reliables.push(p);
                }
                sanity_check(chan.queued_reliables.size() < 0xFFFF);
                return true;
@@ -1051,6 +1109,7 @@ bool UDPPeer::processReliableSendCommand(
        volatile u16 packets_available = toadd.size();
        /* we didn't get a single sequence number no need to fill queue */
        if (!have_initial_sequence_number) {
+               LOG(derr_con << m_connection->getDesc() << "Ran out of sequence numbers!" << std::endl);
                return false;
        }
 
@@ -1096,18 +1155,18 @@ void UDPPeer::RunCommandQueues(
                                (channel.queued_reliables.size() < maxtransfer) &&
                                (commands_processed < maxcommands)) {
                        try {
-                               ConnectionCommand c = channel.queued_commands.front();
+                               ConnectionCommandPtr c = channel.queued_commands.front();
 
                                LOG(dout_con << m_connection->getDesc()
                                                << " processing queued reliable command " << std::endl);
 
                                // Packet is processed, remove it from queue
-                               if (processReliableSendCommand(c,max_packet_size)) {
+                               if (processReliableSendCommand(c, max_packet_size)) {
                                        channel.queued_commands.pop_front();
                                } else {
                                        LOG(dout_con << m_connection->getDesc()
-                                                       << " Failed to queue packets for peer_id: " << c.peer_id
-                                                       << ", delaying sending of " << c.data.getSize()
+                                                       << " Failed to queue packets for peer_id: " << c->peer_id
+                                                       << ", delaying sending of " << c->data.getSize()
                                                        << " bytes" << std::endl);
                                }
                        }
@@ -1130,13 +1189,70 @@ void UDPPeer::setNextSplitSequenceNumber(u8 channel, u16 seqnum)
        channels[channel].setNextSplitSeqNum(seqnum);
 }
 
-SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, const BufferedPacket &toadd,
+SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, BufferedPacketPtr &toadd,
        bool reliable)
 {
        assert(channel < CHANNEL_COUNT); // Pre-condition
        return channels[channel].incoming_splits.insert(toadd, reliable);
 }
 
+/*
+       ConnectionEvent
+*/
+
+const char *ConnectionEvent::describe() const
+{
+       switch(type) {
+       case CONNEVENT_NONE:
+               return "CONNEVENT_NONE";
+       case CONNEVENT_DATA_RECEIVED:
+               return "CONNEVENT_DATA_RECEIVED";
+       case CONNEVENT_PEER_ADDED:
+               return "CONNEVENT_PEER_ADDED";
+       case CONNEVENT_PEER_REMOVED:
+               return "CONNEVENT_PEER_REMOVED";
+       case CONNEVENT_BIND_FAILED:
+               return "CONNEVENT_BIND_FAILED";
+       }
+       return "Invalid ConnectionEvent";
+}
+
+
+ConnectionEventPtr ConnectionEvent::create(ConnectionEventType type)
+{
+       return std::shared_ptr<ConnectionEvent>(new ConnectionEvent(type));
+}
+
+ConnectionEventPtr ConnectionEvent::dataReceived(session_t peer_id, const Buffer<u8> &data)
+{
+       auto e = create(CONNEVENT_DATA_RECEIVED);
+       e->peer_id = peer_id;
+       data.copyTo(e->data);
+       return e;
+}
+
+ConnectionEventPtr ConnectionEvent::peerAdded(session_t peer_id, Address address)
+{
+       auto e = create(CONNEVENT_PEER_ADDED);
+       e->peer_id = peer_id;
+       e->address = address;
+       return e;
+}
+
+ConnectionEventPtr ConnectionEvent::peerRemoved(session_t peer_id, bool is_timeout, Address address)
+{
+       auto e = create(CONNEVENT_PEER_REMOVED);
+       e->peer_id = peer_id;
+       e->timeout = is_timeout;
+       e->address = address;
+       return e;
+}
+
+ConnectionEventPtr ConnectionEvent::bindFailed()
+{
+       return create(CONNEVENT_BIND_FAILED);
+}
+
 /*
        Connection
 */
@@ -1186,18 +1302,12 @@ Connection::~Connection()
 
 /* Internal stuff */
 
-void Connection::putEvent(const ConnectionEvent &e)
+void Connection::putEvent(ConnectionEventPtr e)
 {
-       assert(e.type != CONNEVENT_NONE); // Pre-condition
+       assert(e->type != CONNEVENT_NONE); // Pre-condition
        m_event_queue.push_back(e);
 }
 
-void Connection::putEvent(ConnectionEvent &&e)
-{
-       assert(e.type != CONNEVENT_NONE); // Pre-condition
-       m_event_queue.push_back(std::move(e));
-}
-
 void Connection::TriggerSend()
 {
        m_sendThread->Trigger();
@@ -1260,11 +1370,9 @@ bool Connection::deletePeer(session_t peer_id, bool timeout)
        Address peer_address;
        //any peer has a primary address this never fails!
        peer->getAddress(MTP_PRIMARY, peer_address);
-       // Create event
-       ConnectionEvent e;
-       e.peerRemoved(peer_id, timeout, peer_address);
-       putEvent(e);
 
+       // Create event
+       putEvent(ConnectionEvent::peerRemoved(peer_id, timeout, peer_address));
 
        peer->Drop();
        return true;
@@ -1272,18 +1380,16 @@ bool Connection::deletePeer(session_t peer_id, bool timeout)
 
 /* Interface */
 
-ConnectionEvent Connection::waitEvent(u32 timeout_ms)
+ConnectionEventPtr Connection::waitEvent(u32 timeout_ms)
 {
        try {
                return m_event_queue.pop_front(timeout_ms);
        } catch(ItemNotFoundException &ex) {
-               ConnectionEvent e;
-               e.type = CONNEVENT_NONE;
-               return e;
+               return ConnectionEvent::create(CONNEVENT_NONE);
        }
 }
 
-void Connection::putCommand(const ConnectionCommand &c)
+void Connection::putCommand(ConnectionCommandPtr c)
 {
        if (!m_shutting_down) {
                m_command_queue.push_back(c);
@@ -1291,26 +1397,14 @@ void Connection::putCommand(const ConnectionCommand &c)
        }
 }
 
-void Connection::putCommand(ConnectionCommand &&c)
-{
-       if (!m_shutting_down) {
-               m_command_queue.push_back(std::move(c));
-               m_sendThread->Trigger();
-       }
-}
-
 void Connection::Serve(Address bind_addr)
 {
-       ConnectionCommand c;
-       c.serve(bind_addr);
-       putCommand(c);
+       putCommand(ConnectionCommand::serve(bind_addr));
 }
 
 void Connection::Connect(Address address)
 {
-       ConnectionCommand c;
-       c.connect(address);
-       putCommand(c);
+       putCommand(ConnectionCommand::connect(address));
 }
 
 bool Connection::Connected()
@@ -1332,9 +1426,7 @@ bool Connection::Connected()
 
 void Connection::Disconnect()
 {
-       ConnectionCommand c;
-       c.disconnect();
-       putCommand(c);
+       putCommand(ConnectionCommand::disconnect());
 }
 
 bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
@@ -1345,11 +1437,15 @@ bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
                This is not considered to be a problem (is it?)
        */
        for(;;) {
-               ConnectionEvent e = waitEvent(timeout);
-               if (e.type != CONNEVENT_NONE)
+               ConnectionEventPtr e_ptr = waitEvent(timeout);
+               const ConnectionEvent &e = *e_ptr;
+
+               if (e.type != CONNEVENT_NONE) {
                        LOG(dout_con << getDesc() << ": Receive: got event: "
                                        << e.describe() << std::endl);
-               switch(e.type) {
+               }
+
+               switch (e.type) {
                case CONNEVENT_NONE:
                        return false;
                case CONNEVENT_DATA_RECEIVED:
@@ -1397,10 +1493,7 @@ void Connection::Send(session_t peer_id, u8 channelnum,
 {
        assert(channelnum < CHANNEL_COUNT); // Pre-condition
 
-       ConnectionCommand c;
-
-       c.send(peer_id, channelnum, pkt, reliable);
-       putCommand(std::move(c));
+       putCommand(ConnectionCommand::send(peer_id, channelnum, pkt, reliable));
 }
 
 Address Connection::GetPeerAddress(session_t peer_id)
@@ -1499,41 +1592,31 @@ u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
        LOG(dout_con << getDesc()
                        << "createPeer(): giving peer_id=" << peer_id_new << std::endl);
 
-       ConnectionCommand cmd;
-       Buffer<u8> reply(4);
-       writeU8(&reply[0], PACKET_TYPE_CONTROL);
-       writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
-       writeU16(&reply[2], peer_id_new);
-       cmd.createPeer(peer_id_new,reply);
-       putCommand(std::move(cmd));
+       {
+               Buffer<u8> reply(4);
+               writeU8(&reply[0], PACKET_TYPE_CONTROL);
+               writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
+               writeU16(&reply[2], peer_id_new);
+               putCommand(ConnectionCommand::createPeer(peer_id_new, reply));
+       }
 
        // Create peer addition event
-       ConnectionEvent e;
-       e.peerAdded(peer_id_new, sender);
-       putEvent(e);
+       putEvent(ConnectionEvent::peerAdded(peer_id_new, sender));
 
        // We're now talking to a valid peer_id
        return peer_id_new;
 }
 
-void Connection::PrintInfo(std::ostream &out)
-{
-       m_info_mutex.lock();
-       out<<getDesc()<<": ";
-       m_info_mutex.unlock();
-}
-
 const std::string Connection::getDesc()
 {
+       MutexAutoLock _(m_info_mutex);
        return std::string("con(")+
                        itos(m_udpSocket.GetHandle())+"/"+itos(m_peer_id)+")";
 }
 
 void Connection::DisconnectPeer(session_t peer_id)
 {
-       ConnectionCommand discon;
-       discon.disconnect_peer(peer_id);
-       putCommand(discon);
+       putCommand(ConnectionCommand::disconnect_peer(peer_id));
 }
 
 void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum)
@@ -1545,14 +1628,12 @@ void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum)
                        " channel: " << (channelnum & 0xFF) <<
                        " seqnum: " << seqnum << std::endl);
 
-       ConnectionCommand c;
        SharedBuffer<u8> ack(4);
        writeU8(&ack[0], PACKET_TYPE_CONTROL);
        writeU8(&ack[1], CONTROLTYPE_ACK);
        writeU16(&ack[2], seqnum);
 
-       c.ack(peer_id, channelnum, ack);
-       putCommand(std::move(c));
+       putCommand(ConnectionCommand::ack(peer_id, channelnum, ack));
        m_sendThread->Trigger();
 }
 
index ea74ffb1ca35812a2dd76e46208bc778582fb883..1afb4ae841f63dda3d855829567a04ff02b7a8ce 100644 (file)
@@ -32,6 +32,95 @@ with this program; if not, write to the Free Software Foundation, Inc.,
 #include <vector>
 #include <map>
 
+#define MAX_UDP_PEERS 65535
+
+/*
+=== NOTES ===
+
+A packet is sent through a channel to a peer with a basic header:
+       Header (7 bytes):
+       [0] u32 protocol_id
+       [4] session_t sender_peer_id
+       [6] u8 channel
+sender_peer_id:
+       Unique to each peer.
+       value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
+       value 1 (PEER_ID_SERVER) is reserved for server
+       these constants are defined in constants.h
+channel:
+       Channel numbers have no intrinsic meaning. Currently only 0, 1, 2 exist.
+*/
+#define BASE_HEADER_SIZE 7
+#define CHANNEL_COUNT 3
+
+/*
+Packet types:
+
+CONTROL: This is a packet used by the protocol.
+- When this is processed, nothing is handed to the user.
+       Header (2 byte):
+       [0] u8 type
+       [1] u8 controltype
+controltype and data description:
+       CONTROLTYPE_ACK
+               [2] u16 seqnum
+       CONTROLTYPE_SET_PEER_ID
+               [2] session_t peer_id_new
+       CONTROLTYPE_PING
+       - There is no actual reply, but this can be sent in a reliable
+         packet to get a reply
+       CONTROLTYPE_DISCO
+*/
+enum ControlType : u8 {
+       CONTROLTYPE_ACK = 0,
+       CONTROLTYPE_SET_PEER_ID = 1,
+       CONTROLTYPE_PING = 2,
+       CONTROLTYPE_DISCO = 3,
+};
+
+/*
+ORIGINAL: This is a plain packet with no control and no error
+checking at all.
+- When this is processed, it is directly handed to the user.
+       Header (1 byte):
+       [0] u8 type
+*/
+//#define TYPE_ORIGINAL 1
+#define ORIGINAL_HEADER_SIZE 1
+
+/*
+SPLIT: These are sequences of packets forming one bigger piece of
+data.
+- When processed and all the packet_nums 0...packet_count-1 are
+  present (this should be buffered), the resulting data shall be
+  directly handed to the user.
+- If the data fails to come up in a reasonable time, the buffer shall
+  be silently discarded.
+- These can be sent as-is or atop of a RELIABLE packet stream.
+       Header (7 bytes):
+       [0] u8 type
+       [1] u16 seqnum
+       [3] u16 chunk_count
+       [5] u16 chunk_num
+*/
+//#define TYPE_SPLIT 2
+
+/*
+RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
+and they shall be delivered in the same order as sent. This is done
+with a buffer in the receiving and transmitting end.
+- When this is processed, the contents of each packet is recursively
+  processed as packets.
+       Header (3 bytes):
+       [0] u8 type
+       [1] u16 seqnum
+
+*/
+//#define TYPE_RELIABLE 3
+#define RELIABLE_HEADER_SIZE 3
+#define SEQNUM_INITIAL 65500
+#define SEQNUM_MAX 65535
+
 class NetworkPacket;
 
 namespace con
@@ -46,9 +135,13 @@ typedef enum MTProtocols {
        MTP_MINETEST_RELIABLE_UDP
 } MTProtocols;
 
-#define MAX_UDP_PEERS 65535
-
-#define SEQNUM_MAX 65535
+enum PacketType : u8 {
+       PACKET_TYPE_CONTROL = 0,
+       PACKET_TYPE_ORIGINAL = 1,
+       PACKET_TYPE_SPLIT = 2,
+       PACKET_TYPE_RELIABLE = 3,
+       PACKET_TYPE_MAX
+};
 
 inline bool seqnum_higher(u16 totest, u16 base)
 {
@@ -85,24 +178,40 @@ static inline float CALC_DTIME(u64 lasttime, u64 curtime)
        return MYMAX(MYMIN(value,0.1),0.0);
 }
 
-struct BufferedPacket
-{
-       BufferedPacket(u8 *a_data, u32 a_size):
-               data(a_data, a_size)
-       {}
-       BufferedPacket(u32 a_size):
-               data(a_size)
-       {}
-       Buffer<u8> data; // Data of the packet, including headers
+/*
+       Struct for all kinds of packets. Includes following data:
+               BASE_HEADER
+               u8[] packet data (usually copied from SharedBuffer<u8>)
+*/
+struct BufferedPacket {
+       BufferedPacket(u32 a_size)
+       {
+               m_data.resize(a_size);
+               data = &m_data[0];
+       }
+
+       DISABLE_CLASS_COPY(BufferedPacket)
+
+       u16 getSeqnum() const;
+
+       inline const size_t size() const { return m_data.size(); }
+
+       u8 *data; // Direct memory access
        float time = 0.0f; // Seconds from buffering the packet or re-sending
        float totaltime = 0.0f; // Seconds from buffering the packet
        u64 absolute_send_time = -1;
        Address address; // Sender or destination
        unsigned int resend_count = 0;
+
+private:
+       std::vector<u8> m_data; // Data of the packet, including headers
 };
 
+typedef std::shared_ptr<BufferedPacket> BufferedPacketPtr;
+
+
 // This adds the base headers to the data and makes a packet out of it
-BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
+BufferedPacketPtr makePacket(Address &address, const SharedBuffer<u8> &data,
                u32 protocol_id, session_t sender_peer_id, u8 channel);
 
 // Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
@@ -136,101 +245,12 @@ struct IncomingSplitPacket
        std::map<u16, SharedBuffer<u8>> chunks;
 };
 
-/*
-=== NOTES ===
-
-A packet is sent through a channel to a peer with a basic header:
-       Header (7 bytes):
-       [0] u32 protocol_id
-       [4] session_t sender_peer_id
-       [6] u8 channel
-sender_peer_id:
-       Unique to each peer.
-       value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
-       value 1 (PEER_ID_SERVER) is reserved for server
-       these constants are defined in constants.h
-channel:
-       Channel numbers have no intrinsic meaning. Currently only 0, 1, 2 exist.
-*/
-#define BASE_HEADER_SIZE 7
-#define CHANNEL_COUNT 3
-/*
-Packet types:
-
-CONTROL: This is a packet used by the protocol.
-- When this is processed, nothing is handed to the user.
-       Header (2 byte):
-       [0] u8 type
-       [1] u8 controltype
-controltype and data description:
-       CONTROLTYPE_ACK
-               [2] u16 seqnum
-       CONTROLTYPE_SET_PEER_ID
-               [2] session_t peer_id_new
-       CONTROLTYPE_PING
-       - There is no actual reply, but this can be sent in a reliable
-         packet to get a reply
-       CONTROLTYPE_DISCO
-*/
-//#define TYPE_CONTROL 0
-#define CONTROLTYPE_ACK 0
-#define CONTROLTYPE_SET_PEER_ID 1
-#define CONTROLTYPE_PING 2
-#define CONTROLTYPE_DISCO 3
-
-/*
-ORIGINAL: This is a plain packet with no control and no error
-checking at all.
-- When this is processed, it is directly handed to the user.
-       Header (1 byte):
-       [0] u8 type
-*/
-//#define TYPE_ORIGINAL 1
-#define ORIGINAL_HEADER_SIZE 1
-/*
-SPLIT: These are sequences of packets forming one bigger piece of
-data.
-- When processed and all the packet_nums 0...packet_count-1 are
-  present (this should be buffered), the resulting data shall be
-  directly handed to the user.
-- If the data fails to come up in a reasonable time, the buffer shall
-  be silently discarded.
-- These can be sent as-is or atop of a RELIABLE packet stream.
-       Header (7 bytes):
-       [0] u8 type
-       [1] u16 seqnum
-       [3] u16 chunk_count
-       [5] u16 chunk_num
-*/
-//#define TYPE_SPLIT 2
-/*
-RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
-and they shall be delivered in the same order as sent. This is done
-with a buffer in the receiving and transmitting end.
-- When this is processed, the contents of each packet is recursively
-  processed as packets.
-       Header (3 bytes):
-       [0] u8 type
-       [1] u16 seqnum
-
-*/
-//#define TYPE_RELIABLE 3
-#define RELIABLE_HEADER_SIZE 3
-#define SEQNUM_INITIAL 65500
-
-enum PacketType: u8 {
-       PACKET_TYPE_CONTROL = 0,
-       PACKET_TYPE_ORIGINAL = 1,
-       PACKET_TYPE_SPLIT = 2,
-       PACKET_TYPE_RELIABLE = 3,
-       PACKET_TYPE_MAX
-};
 /*
        A buffer which stores reliable packets and sorts them internally
        for fast access to the smallest one.
 */
 
-typedef std::list<BufferedPacket>::iterator RPBSearchResult;
+typedef std::list<BufferedPacketPtr>::iterator RPBSearchResult;
 
 class ReliablePacketBuffer
 {
@@ -239,12 +259,12 @@ class ReliablePacketBuffer
 
        bool getFirstSeqnum(u16& result);
 
-       BufferedPacket popFirst();
-       BufferedPacket popSeqnum(u16 seqnum);
-       void insert(const BufferedPacket &p, u16 next_expected);
+       BufferedPacketPtr popFirst();
+       BufferedPacketPtr popSeqnum(u16 seqnum);
+       void insert(BufferedPacketPtr &p_ptr, u16 next_expected);
 
        void incrementTimeouts(float dtime);
-       std::list<BufferedPacket> getTimedOuts(float timeout, u32 max_packets);
+       std::list<ConstSharedPtr<BufferedPacket>> getTimedOuts(float timeout, u32 max_packets);
 
        void print();
        bool empty();
@@ -252,10 +272,9 @@ class ReliablePacketBuffer
 
 
 private:
-       RPBSearchResult findPacket(u16 seqnum); // does not perform locking
-       inline RPBSearchResult notFound() { return m_list.end(); }
+       RPBSearchResult findPacketNoLock(u16 seqnum);
 
-       std::list<BufferedPacket> m_list;
+       std::list<BufferedPacketPtr> m_list;
 
        u16 m_oldest_non_answered_ack;
 
@@ -274,7 +293,7 @@ class IncomingSplitBuffer
                Returns a reference counted buffer of length != 0 when a full split
                packet is constructed. If not, returns one of length 0.
        */
-       SharedBuffer<u8> insert(const BufferedPacket &p, bool reliable);
+       SharedBuffer<u8> insert(BufferedPacketPtr &p_ptr, bool reliable);
 
        void removeUnreliableTimedOuts(float dtime, float timeout);
 
@@ -285,25 +304,6 @@ class IncomingSplitBuffer
        std::mutex m_map_mutex;
 };
 
-struct OutgoingPacket
-{
-       session_t peer_id;
-       u8 channelnum;
-       SharedBuffer<u8> data;
-       bool reliable;
-       bool ack;
-
-       OutgoingPacket(session_t peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_,
-                       bool reliable_,bool ack_=false):
-               peer_id(peer_id_),
-               channelnum(channelnum_),
-               data(data_),
-               reliable(reliable_),
-               ack(ack_)
-       {
-       }
-};
-
 enum ConnectionCommandType{
        CONNCMD_NONE,
        CONNCMD_SERVE,
@@ -316,9 +316,13 @@ enum ConnectionCommandType{
        CONCMD_CREATE_PEER
 };
 
+struct ConnectionCommand;
+typedef std::shared_ptr<ConnectionCommand> ConnectionCommandPtr;
+
+// This is very similar to ConnectionEvent
 struct ConnectionCommand
 {
-       enum ConnectionCommandType type = CONNCMD_NONE;
+       const ConnectionCommandType type;
        Address address;
        session_t peer_id = PEER_ID_INEXISTENT;
        u8 channelnum = 0;
@@ -326,48 +330,21 @@ struct ConnectionCommand
        bool reliable = false;
        bool raw = false;
 
-       ConnectionCommand() = default;
-
-       void serve(Address address_)
-       {
-               type = CONNCMD_SERVE;
-               address = address_;
-       }
-       void connect(Address address_)
-       {
-               type = CONNCMD_CONNECT;
-               address = address_;
-       }
-       void disconnect()
-       {
-               type = CONNCMD_DISCONNECT;
-       }
-       void disconnect_peer(session_t peer_id_)
-       {
-               type = CONNCMD_DISCONNECT_PEER;
-               peer_id = peer_id_;
-       }
+       DISABLE_CLASS_COPY(ConnectionCommand);
 
-       void send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt, bool reliable_);
+       static ConnectionCommandPtr serve(Address address);
+       static ConnectionCommandPtr connect(Address address);
+       static ConnectionCommandPtr disconnect();
+       static ConnectionCommandPtr disconnect_peer(session_t peer_id);
+       static ConnectionCommandPtr send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable);
+       static ConnectionCommandPtr ack(session_t peer_id, u8 channelnum, const Buffer<u8> &data);
+       static ConnectionCommandPtr createPeer(session_t peer_id, const Buffer<u8> &data);
 
-       void ack(session_t peer_id_, u8 channelnum_, const Buffer<u8> &data_)
-       {
-               type = CONCMD_ACK;
-               peer_id = peer_id_;
-               channelnum = channelnum_;
-               data = data_;
-               reliable = false;
-       }
+private:
+       ConnectionCommand(ConnectionCommandType type_) :
+               type(type_) {}
 
-       void createPeer(session_t peer_id_, const Buffer<u8> &data_)
-       {
-               type = CONCMD_CREATE_PEER;
-               peer_id = peer_id_;
-               data = data_;
-               channelnum = 0;
-               reliable = true;
-               raw = true;
-       }
+       static ConnectionCommandPtr create(ConnectionCommandType type);
 };
 
 /* maximum window size to use, 0xFFFF is theoretical maximum. don't think about
@@ -402,10 +379,10 @@ class Channel
        ReliablePacketBuffer outgoing_reliables_sent;
 
        //queued reliable packets
-       std::queue<BufferedPacket> queued_reliables;
+       std::queue<BufferedPacketPtr> queued_reliables;
 
        //queue commands prior splitting to packets
-       std::deque<ConnectionCommand> queued_commands;
+       std::deque<ConnectionCommandPtr> queued_commands;
 
        IncomingSplitBuffer incoming_splits;
 
@@ -514,7 +491,7 @@ class Peer {
        public:
                friend class PeerHelper;
 
-               Peer(Address address_,u16 id_,Connection* connection) :
+               Peer(Address address_,session_t id_,Connection* connection) :
                        id(id_),
                        m_connection(connection),
                        address(address_),
@@ -528,11 +505,11 @@ class Peer {
                };
 
                // Unique id of the peer
-               u16 id;
+               const session_t id;
 
                void Drop();
 
-               virtual void PutReliableSendCommand(ConnectionCommand &c,
+               virtual void PutReliableSendCommand(ConnectionCommandPtr &c,
                                                unsigned int max_packet_size) {};
 
                virtual bool getAddress(MTProtocols type, Address& toset) = 0;
@@ -549,7 +526,7 @@ class Peer {
 
                virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
                virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
-               virtual SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd,
+               virtual SharedBuffer<u8> addSplitPacket(u8 channel, BufferedPacketPtr &toadd,
                                bool reliable)
                {
                        errorstream << "Peer::addSplitPacket called,"
@@ -586,7 +563,7 @@ class Peer {
                bool IncUseCount();
                void DecUseCount();
 
-               std::mutex m_exclusive_access_mutex;
+               mutable std::mutex m_exclusive_access_mutex;
 
                bool m_pending_deletion = false;
 
@@ -634,7 +611,7 @@ class UDPPeer : public Peer
        UDPPeer(u16 a_id, Address a_address, Connection* connection);
        virtual ~UDPPeer() = default;
 
-       void PutReliableSendCommand(ConnectionCommand &c,
+       void PutReliableSendCommand(ConnectionCommandPtr &c,
                                                        unsigned int max_packet_size);
 
        bool getAddress(MTProtocols type, Address& toset);
@@ -642,7 +619,7 @@ class UDPPeer : public Peer
        u16 getNextSplitSequenceNumber(u8 channel);
        void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
 
-       SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd,
+       SharedBuffer<u8> addSplitPacket(u8 channel, BufferedPacketPtr &toadd,
                bool reliable);
 
 protected:
@@ -671,7 +648,7 @@ class UDPPeer : public Peer
        float resend_timeout = 0.5;
 
        bool processReliableSendCommand(
-                                       ConnectionCommand &c,
+                                       ConnectionCommandPtr &c_ptr,
                                        unsigned int max_packet_size);
 };
 
@@ -679,7 +656,7 @@ class UDPPeer : public Peer
        Connection
 */
 
-enum ConnectionEventType{
+enum ConnectionEventType {
        CONNEVENT_NONE,
        CONNEVENT_DATA_RECEIVED,
        CONNEVENT_PEER_ADDED,
@@ -687,56 +664,32 @@ enum ConnectionEventType{
        CONNEVENT_BIND_FAILED,
 };
 
+struct ConnectionEvent;
+typedef std::shared_ptr<ConnectionEvent> ConnectionEventPtr;
+
+// This is very similar to ConnectionCommand
 struct ConnectionEvent
 {
-       enum ConnectionEventType type = CONNEVENT_NONE;
+       const ConnectionEventType type;
        session_t peer_id = 0;
        Buffer<u8> data;
        bool timeout = false;
        Address address;
 
-       ConnectionEvent() = default;
+       // We don't want to copy "data"
+       DISABLE_CLASS_COPY(ConnectionEvent);
 
-       const char *describe() const
-       {
-               switch(type) {
-               case CONNEVENT_NONE:
-                       return "CONNEVENT_NONE";
-               case CONNEVENT_DATA_RECEIVED:
-                       return "CONNEVENT_DATA_RECEIVED";
-               case CONNEVENT_PEER_ADDED:
-                       return "CONNEVENT_PEER_ADDED";
-               case CONNEVENT_PEER_REMOVED:
-                       return "CONNEVENT_PEER_REMOVED";
-               case CONNEVENT_BIND_FAILED:
-                       return "CONNEVENT_BIND_FAILED";
-               }
-               return "Invalid ConnectionEvent";
-       }
+       static ConnectionEventPtr create(ConnectionEventType type);
+       static ConnectionEventPtr dataReceived(session_t peer_id, const Buffer<u8> &data);
+       static ConnectionEventPtr peerAdded(session_t peer_id, Address address);
+       static ConnectionEventPtr peerRemoved(session_t peer_id, bool is_timeout, Address address);
+       static ConnectionEventPtr bindFailed();
 
-       void dataReceived(session_t peer_id_, const Buffer<u8> &data_)
-       {
-               type = CONNEVENT_DATA_RECEIVED;
-               peer_id = peer_id_;
-               data = data_;
-       }
-       void peerAdded(session_t peer_id_, Address address_)
-       {
-               type = CONNEVENT_PEER_ADDED;
-               peer_id = peer_id_;
-               address = address_;
-       }
-       void peerRemoved(session_t peer_id_, bool timeout_, Address address_)
-       {
-               type = CONNEVENT_PEER_REMOVED;
-               peer_id = peer_id_;
-               timeout = timeout_;
-               address = address_;
-       }
-       void bindFailed()
-       {
-               type = CONNEVENT_BIND_FAILED;
-       }
+       const char *describe() const;
+
+private:
+       ConnectionEvent(ConnectionEventType type_) :
+               type(type_) {}
 };
 
 class PeerHandler;
@@ -752,10 +705,9 @@ class Connection
        ~Connection();
 
        /* Interface */
-       ConnectionEvent waitEvent(u32 timeout_ms);
-       // Warning: creates an unnecessary copy, prefer putCommand(T&&) if possible
-       void putCommand(const ConnectionCommand &c);
-       void putCommand(ConnectionCommand &&c);
+       ConnectionEventPtr waitEvent(u32 timeout_ms);
+
+       void putCommand(ConnectionCommandPtr c);
 
        void SetTimeoutMs(u32 timeout) { m_bc_receive_timeout = timeout; }
        void Serve(Address bind_addr);
@@ -785,8 +737,6 @@ class Connection
 
        void sendAck(session_t peer_id, u8 channelnum, u16 seqnum);
 
-       void PrintInfo(std::ostream &out);
-
        std::vector<session_t> getPeerIDs()
        {
                MutexAutoLock peerlock(m_peers_mutex);
@@ -795,13 +745,11 @@ class Connection
 
        UDPSocket m_udpSocket;
        // Command queue: user -> SendThread
-       MutexedQueue<ConnectionCommand> m_command_queue;
+       MutexedQueue<ConnectionCommandPtr> m_command_queue;
 
        bool Receive(NetworkPacket *pkt, u32 timeout);
 
-       // Warning: creates an unnecessary copy, prefer putEvent(T&&) if possible
-       void putEvent(const ConnectionEvent &e);
-       void putEvent(ConnectionEvent &&e);
+       void putEvent(ConnectionEventPtr e);
 
        void TriggerSend();
        
@@ -811,7 +759,7 @@ class Connection
        }
 private:
        // Event queue: ReceiveThread -> user
-       MutexedQueue<ConnectionEvent> m_event_queue;
+       MutexedQueue<ConnectionEventPtr> m_event_queue;
 
        session_t m_peer_id = 0;
        u32 m_protocol_id;
@@ -823,7 +771,7 @@ class Connection
        std::unique_ptr<ConnectionSendThread> m_sendThread;
        std::unique_ptr<ConnectionReceiveThread> m_receiveThread;
 
-       std::mutex m_info_mutex;
+       mutable std::mutex m_info_mutex;
 
        // Backwards compatibility
        PeerHandler *m_bc_peerhandler;
index a306ced9bb1d5655ec19286cc83d320b09ec6850..dca065ae1fc7ce25e50cd728c604f4fe05bfc711 100644 (file)
@@ -50,11 +50,11 @@ std::mutex log_conthread_mutex;
 
 #define WINDOW_SIZE 5
 
-static session_t readPeerId(u8 *packetdata)
+static session_t readPeerId(const u8 *packetdata)
 {
        return readU16(&packetdata[4]);
 }
-static u8 readChannel(u8 *packetdata)
+static u8 readChannel(const u8 *packetdata)
 {
        return readU8(&packetdata[6]);
 }
@@ -114,9 +114,9 @@ void *ConnectionSendThread::run()
                }
 
                /* translate commands to packets */
-               ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
-               while (c.type != CONNCMD_NONE) {
-                       if (c.reliable)
+               auto c = m_connection->m_command_queue.pop_frontNoEx(0);
+               while (c && c->type != CONNCMD_NONE) {
+                       if (c->reliable)
                                processReliableCommand(c);
                        else
                                processNonReliableCommand(c);
@@ -227,21 +227,21 @@ void ConnectionSendThread::runTimeouts(float dtime)
                        m_iteration_packets_avaialble -= timed_outs.size();
 
                        for (const auto &k : timed_outs) {
-                               u8 channelnum = readChannel(*k.data);
-                               u16 seqnum = readU16(&(k.data[BASE_HEADER_SIZE + 1]));
+                               u8 channelnum = readChannel(k->data);
+                               u16 seqnum = k->getSeqnum();
 
-                               channel.UpdateBytesLost(k.data.getSize());
+                               channel.UpdateBytesLost(k->size());
 
                                LOG(derr_con << m_connection->getDesc()
                                        << "RE-SENDING timed-out RELIABLE to "
-                                       << k.address.serializeString()
+                                       << k->address.serializeString()
                                        << "(t/o=" << resend_timeout << "): "
-                                       << "count=" << k.resend_count
+                                       << "count=" << k->resend_count
                                        << ", channel=" << ((int) channelnum & 0xff)
                                        << ", seqnum=" << seqnum
                                        << std::endl);
 
-                               rawSend(k);
+                               rawSend(k.get());
 
                                // do not handle rtt here as we can't decide if this packet was
                                // lost or really takes more time to transmit
@@ -274,25 +274,24 @@ void ConnectionSendThread::runTimeouts(float dtime)
        }
 }
 
-void ConnectionSendThread::rawSend(const BufferedPacket &packet)
+void ConnectionSendThread::rawSend(const BufferedPacket *p)
 {
        try {
-               m_connection->m_udpSocket.Send(packet.address, *packet.data,
-                       packet.data.getSize());
+               m_connection->m_udpSocket.Send(p->address, p->data, p->size());
                LOG(dout_con << m_connection->getDesc()
-                       << " rawSend: " << packet.data.getSize()
+                       << " rawSend: " << p->size()
                        << " bytes sent" << std::endl);
        } catch (SendFailedException &e) {
                LOG(derr_con << m_connection->getDesc()
                        << "Connection::rawSend(): SendFailedException: "
-                       << packet.address.serializeString() << std::endl);
+                       << p->address.serializeString() << std::endl);
        }
 }
 
-void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *channel)
+void ConnectionSendThread::sendAsPacketReliable(BufferedPacketPtr &p, Channel *channel)
 {
        try {
-               p.absolute_send_time = porting::getTimeMs();
+               p->absolute_send_time = porting::getTimeMs();
                // Buffer the packet
                channel->outgoing_reliables_sent.insert(p,
                        (channel->readOutgoingSequenceNumber() - MAX_RELIABLE_WINDOW_SIZE)
@@ -305,7 +304,7 @@ void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *chan
        }
 
        // Send the packet
-       rawSend(p);
+       rawSend(p.get());
 }
 
 bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
@@ -321,11 +320,10 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
        Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
 
        if (reliable) {
-               bool have_sequence_number_for_raw_packet = true;
-               u16 seqnum =
-                       channel->getOutgoingSequenceNumber(have_sequence_number_for_raw_packet);
+               bool have_seqnum = false;
+               const u16 seqnum = channel->getOutgoingSequenceNumber(have_seqnum);
 
-               if (!have_sequence_number_for_raw_packet)
+               if (!have_seqnum)
                        return false;
 
                SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
@@ -333,13 +331,12 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
                peer->getAddress(MTP_MINETEST_RELIABLE_UDP, peer_address);
 
                // Add base headers and make a packet
-               BufferedPacket p = con::makePacket(peer_address, reliable,
+               BufferedPacketPtr p = con::makePacket(peer_address, reliable,
                        m_connection->GetProtocolID(), m_connection->GetPeerID(),
                        channelnum);
 
                // first check if our send window is already maxed out
-               if (channel->outgoing_reliables_sent.size()
-                       < channel->getWindowSize()) {
+               if (channel->outgoing_reliables_sent.size() < channel->getWindowSize()) {
                        LOG(dout_con << m_connection->getDesc()
                                << " INFO: sending a reliable packet to peer_id " << peer_id
                                << " channel: " << (u32)channelnum
@@ -352,19 +349,19 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
                        << " INFO: queueing reliable packet for peer_id: " << peer_id
                        << " channel: " << (u32)channelnum
                        << " seqnum: " << seqnum << std::endl);
-               channel->queued_reliables.push(std::move(p));
+               channel->queued_reliables.push(p);
                return false;
        }
 
        Address peer_address;
        if (peer->getAddress(MTP_UDP, peer_address)) {
                // Add base headers and make a packet
-               BufferedPacket p = con::makePacket(peer_address, data,
+               BufferedPacketPtr p = con::makePacket(peer_address, data,
                        m_connection->GetProtocolID(), m_connection->GetPeerID(),
                        channelnum);
 
                // Send the packet
-               rawSend(p);
+               rawSend(p.get());
                return true;
        }
 
@@ -374,11 +371,11 @@ bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
        return false;
 }
 
-void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
+void ConnectionSendThread::processReliableCommand(ConnectionCommandPtr &c)
 {
-       assert(c.reliable);  // Pre-condition
+       assert(c->reliable);  // Pre-condition
 
-       switch (c.type) {
+       switch (c->type) {
                case CONNCMD_NONE:
                        LOG(dout_con << m_connection->getDesc()
                                << "UDP processing reliable CONNCMD_NONE" << std::endl);
@@ -399,7 +396,7 @@ void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
                case CONCMD_CREATE_PEER:
                        LOG(dout_con << m_connection->getDesc()
                                << "UDP processing reliable CONCMD_CREATE_PEER" << std::endl);
-                       if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
+                       if (!rawSendAsPacket(c->peer_id, c->channelnum, c->data, c->reliable)) {
                                /* put to queue if we couldn't send it immediately */
                                sendReliable(c);
                        }
@@ -412,13 +409,14 @@ void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
                        FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
                default:
                        LOG(dout_con << m_connection->getDesc()
-                               << " Invalid reliable command type: " << c.type << std::endl);
+                               << " Invalid reliable command type: " << c->type << std::endl);
        }
 }
 
 
-void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
+void ConnectionSendThread::processNonReliableCommand(ConnectionCommandPtr &c_ptr)
 {
+       const ConnectionCommand &c = *c_ptr;
        assert(!c.reliable); // Pre-condition
 
        switch (c.type) {
@@ -480,9 +478,7 @@ void ConnectionSendThread::serve(Address bind_address)
        }
        catch (SocketException &e) {
                // Create event
-               ConnectionEvent ce;
-               ce.bindFailed();
-               m_connection->putEvent(ce);
+               m_connection->putEvent(ConnectionEvent::bindFailed());
        }
 }
 
@@ -495,9 +491,7 @@ void ConnectionSendThread::connect(Address address)
        UDPPeer *peer = m_connection->createServerPeer(address);
 
        // Create event
-       ConnectionEvent e;
-       e.peerAdded(peer->id, peer->address);
-       m_connection->putEvent(e);
+       m_connection->putEvent(ConnectionEvent::peerAdded(peer->id, peer->address));
 
        Address bind_addr;
 
@@ -586,9 +580,9 @@ void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
        }
 }
 
-void ConnectionSendThread::sendReliable(ConnectionCommand &c)
+void ConnectionSendThread::sendReliable(ConnectionCommandPtr &c)
 {
-       PeerHelper peer = m_connection->getPeerNoEx(c.peer_id);
+       PeerHelper peer = m_connection->getPeerNoEx(c->peer_id);
        if (!peer)
                return;
 
@@ -604,7 +598,7 @@ void ConnectionSendThread::sendToAll(u8 channelnum, const SharedBuffer<u8> &data
        }
 }
 
-void ConnectionSendThread::sendToAllReliable(ConnectionCommand &c)
+void ConnectionSendThread::sendToAllReliable(ConnectionCommandPtr &c)
 {
        std::vector<session_t> peerids = m_connection->getPeerIDs();
 
@@ -663,8 +657,12 @@ void ConnectionSendThread::sendPackets(float dtime)
                // first send queued reliable packets for all peers (if possible)
                for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
                        Channel &channel = udpPeer->channels[i];
-                       u16 next_to_ack = 0;
 
+                       // Reduces logging verbosity
+                       if (channel.queued_reliables.empty())
+                               continue;
+
+                       u16 next_to_ack = 0;
                        channel.outgoing_reliables_sent.getFirstSeqnum(next_to_ack);
                        u16 next_to_receive = 0;
                        channel.incoming_reliables.getFirstSeqnum(next_to_receive);
@@ -694,13 +692,13 @@ void ConnectionSendThread::sendPackets(float dtime)
                                        channel.outgoing_reliables_sent.size()
                                        < channel.getWindowSize() &&
                                        peer->m_increment_packets_remaining > 0) {
-                               BufferedPacket p = std::move(channel.queued_reliables.front());
+                               BufferedPacketPtr p = channel.queued_reliables.front();
                                channel.queued_reliables.pop();
 
                                LOG(dout_con << m_connection->getDesc()
                                        << " INFO: sending a queued reliable packet "
                                        << " channel: " << i
-                                       << ", seqnum: " << readU16(&p.data[BASE_HEADER_SIZE + 1])
+                                       << ", seqnum: " << p->getSeqnum()
                                        << std::endl);
 
                                sendAsPacketReliable(p, &channel);
@@ -881,17 +879,14 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
        try {
                // First, see if there any buffered packets we can process now
                if (packet_queued) {
-                       bool data_left = true;
                        session_t peer_id;
                        SharedBuffer<u8> resultdata;
-                       while (data_left) {
+                       while (true) {
                                try {
-                                       data_left = getFromBuffers(peer_id, resultdata);
-                                       if (data_left) {
-                                               ConnectionEvent e;
-                                               e.dataReceived(peer_id, resultdata);
-                                               m_connection->putEvent(std::move(e));
-                                       }
+                                       if (!getFromBuffers(peer_id, resultdata))
+                                               break;
+
+                                       m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata));
                                }
                                catch (ProcessedSilentlyException &e) {
                                        /* try reading again */
@@ -908,7 +903,7 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
                        return;
 
                if ((received_size < BASE_HEADER_SIZE) ||
-                       (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
+                               (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
                        LOG(derr_con << m_connection->getDesc()
                                << "Receive(): Invalid incoming packet, "
                                << "size: " << received_size
@@ -999,9 +994,7 @@ void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
                                << ", channel: " << (u32)channelnum << ", returned "
                                << resultdata.getSize() << " bytes" << std::endl);
 
-                       ConnectionEvent e;
-                       e.dataReceived(peer_id, resultdata);
-                       m_connection->putEvent(std::move(e));
+                       m_connection->putEvent(ConnectionEvent::dataReceived(peer_id, resultdata));
                }
                catch (ProcessedSilentlyException &e) {
                }
@@ -1026,10 +1019,11 @@ bool ConnectionReceiveThread::getFromBuffers(session_t &peer_id, SharedBuffer<u8
                if (!peer)
                        continue;
 
-               if (dynamic_cast<UDPPeer *>(&peer) == 0)
+               UDPPeer *p = dynamic_cast<UDPPeer *>(&peer);
+               if (!p)
                        continue;
 
-               for (Channel &channel : (dynamic_cast<UDPPeer *>(&peer))->channels) {
+               for (Channel &channel : p->channels) {
                        if (checkIncomingBuffers(&channel, peer_id, dst)) {
                                return true;
                        }
@@ -1042,32 +1036,34 @@ bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel,
        session_t &peer_id, SharedBuffer<u8> &dst)
 {
        u16 firstseqnum = 0;
-       if (channel->incoming_reliables.getFirstSeqnum(firstseqnum)) {
-               if (firstseqnum == channel->readNextIncomingSeqNum()) {
-                       BufferedPacket p = channel->incoming_reliables.popFirst();
-                       peer_id = readPeerId(*p.data);
-                       u8 channelnum = readChannel(*p.data);
-                       u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
+       if (!channel->incoming_reliables.getFirstSeqnum(firstseqnum))
+               return false;
 
-                       LOG(dout_con << m_connection->getDesc()
-                               << "UNBUFFERING TYPE_RELIABLE"
-                               << " seqnum=" << seqnum
-                               << " peer_id=" << peer_id
-                               << " channel=" << ((int) channelnum & 0xff)
-                               << std::endl);
+       if (firstseqnum != channel->readNextIncomingSeqNum())
+               return false;
 
-                       channel->incNextIncomingSeqNum();
+       BufferedPacketPtr p = channel->incoming_reliables.popFirst();
 
-                       u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
-                       // Get out the inside packet and re-process it
-                       SharedBuffer<u8> payload(p.data.getSize() - headers_size);
-                       memcpy(*payload, &p.data[headers_size], payload.getSize());
+       peer_id = readPeerId(p->data); // Carried over to caller function
+       u8 channelnum = readChannel(p->data);
+       u16 seqnum = p->getSeqnum();
 
-                       dst = processPacket(channel, payload, peer_id, channelnum, true);
-                       return true;
-               }
-       }
-       return false;
+       LOG(dout_con << m_connection->getDesc()
+               << "UNBUFFERING TYPE_RELIABLE"
+               << " seqnum=" << seqnum
+               << " peer_id=" << peer_id
+               << " channel=" << ((int) channelnum & 0xff)
+               << std::endl);
+
+       channel->incNextIncomingSeqNum();
+
+       u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
+       // Get out the inside packet and re-process it
+       SharedBuffer<u8> payload(p->size() - headers_size);
+       memcpy(*payload, &p->data[headers_size], payload.getSize());
+
+       dst = processPacket(channel, payload, peer_id, channelnum, true);
+       return true;
 }
 
 SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
@@ -1115,7 +1111,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
        if (packetdata.getSize() < 2)
                throw InvalidIncomingDataException("packetdata.getSize() < 2");
 
-       u8 controltype = readU8(&(packetdata[1]));
+       ControlType controltype = (ControlType)readU8(&(packetdata[1]));
 
        if (controltype == CONTROLTYPE_ACK) {
                assert(channel != NULL);
@@ -1131,7 +1127,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
                        << seqnum << " ]" << std::endl);
 
                try {
-                       BufferedPacket p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
+                       BufferedPacketPtr p = channel->outgoing_reliables_sent.popSeqnum(seqnum);
 
                        // the rtt calculation will be a bit off for re-sent packets but that's okay
                        {
@@ -1140,14 +1136,14 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
 
                                // a overflow is quite unlikely but as it'd result in major
                                // rtt miscalculation we handle it here
-                               if (current_time > p.absolute_send_time) {
-                                       float rtt = (current_time - p.absolute_send_time) / 1000.0;
+                               if (current_time > p->absolute_send_time) {
+                                       float rtt = (current_time - p->absolute_send_time) / 1000.0;
 
                                        // Let peer calculate stuff according to it
                                        // (avg_rtt and resend_timeout)
                                        dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
-                               } else if (p.totaltime > 0) {
-                                       float rtt = p.totaltime;
+                               } else if (p->totaltime > 0) {
+                                       float rtt = p->totaltime;
 
                                        // Let peer calculate stuff according to it
                                        // (avg_rtt and resend_timeout)
@@ -1156,7 +1152,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
                        }
 
                        // put bytes for max bandwidth calculation
-                       channel->UpdateBytesSent(p.data.getSize(), 1);
+                       channel->UpdateBytesSent(p->size(), 1);
                        if (channel->outgoing_reliables_sent.size() == 0)
                                m_connection->TriggerSend();
                } catch (NotFoundException &e) {
@@ -1204,7 +1200,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *chan
                throw ProcessedSilentlyException("Got a DISCO");
        } else {
                LOG(derr_con << m_connection->getDesc()
-                       << "INVALID TYPE_CONTROL: invalid controltype="
+                       << "INVALID controltype="
                        << ((int) controltype & 0xff) << std::endl);
                throw InvalidIncomingDataException("Invalid control type");
        }
@@ -1232,7 +1228,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channe
        if (peer->getAddress(MTP_UDP, peer_address)) {
                // We have to create a packet again for buffering
                // This isn't actually too bad an idea.
-               BufferedPacket packet = makePacket(peer_address,
+               BufferedPacketPtr packet = con::makePacket(peer_address,
                        packetdata,
                        m_connection->GetProtocolID(),
                        peer->id,
@@ -1267,7 +1263,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
        if (packetdata.getSize() < RELIABLE_HEADER_SIZE)
                throw InvalidIncomingDataException("packetdata.getSize() < RELIABLE_HEADER_SIZE");
 
-       u16 seqnum = readU16(&packetdata[1]);
+       const u16 seqnum = readU16(&packetdata[1]);
        bool is_future_packet = false;
        bool is_old_packet = false;
 
@@ -1311,7 +1307,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
                // This one comes later, buffer it.
                // Actually we have to make a packet to buffer one.
                // Well, we have all the ingredients, so just do it.
-               BufferedPacket packet = con::makePacket(
+               BufferedPacketPtr packet = con::makePacket(
                        peer_address,
                        packetdata,
                        m_connection->GetProtocolID(),
@@ -1328,9 +1324,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
                        throw ProcessedQueued("Buffered future reliable packet");
                } catch (AlreadyExistsException &e) {
                } catch (IncomingDataCorruption &e) {
-                       ConnectionCommand discon;
-                       discon.disconnect_peer(peer->id);
-                       m_connection->putCommand(discon);
+                       m_connection->putCommand(ConnectionCommand::disconnect_peer(peer->id));
 
                        LOG(derr_con << m_connection->getDesc()
                                << "INVALID, TYPE_RELIABLE peer_id: " << peer->id
@@ -1351,7 +1345,7 @@ SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *cha
        u16 queued_seqnum = 0;
        if (channel->incoming_reliables.getFirstSeqnum(queued_seqnum)) {
                if (queued_seqnum == seqnum) {
-                       BufferedPacket queued_packet = channel->incoming_reliables.popFirst();
+                       BufferedPacketPtr queued_packet = channel->incoming_reliables.popFirst();
                        /** TODO find a way to verify the new against the old packet */
                }
        }
index 612407c3b5cf9190c6e665bbdb62736dc436ee99..c2e2dae123223c115b2cf022d875a506af99b6d3 100644 (file)
@@ -29,6 +29,25 @@ namespace con
 
 class Connection;
 
+struct OutgoingPacket
+{
+       session_t peer_id;
+       u8 channelnum;
+       SharedBuffer<u8> data;
+       bool reliable;
+       bool ack;
+
+       OutgoingPacket(session_t peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_,
+                       bool reliable_,bool ack_=false):
+               peer_id(peer_id_),
+               channelnum(channelnum_),
+               data(data_),
+               reliable(reliable_),
+               ack(ack_)
+       {
+       }
+};
+
 class ConnectionSendThread : public Thread
 {
 
@@ -51,27 +70,27 @@ class ConnectionSendThread : public Thread
 
 private:
        void runTimeouts(float dtime);
-       void rawSend(const BufferedPacket &packet);
+       void rawSend(const BufferedPacket *p);
        bool rawSendAsPacket(session_t peer_id, u8 channelnum,
                        const SharedBuffer<u8> &data, bool reliable);
 
-       void processReliableCommand(ConnectionCommand &c);
-       void processNonReliableCommand(ConnectionCommand &c);
+       void processReliableCommand(ConnectionCommandPtr &c);
+       void processNonReliableCommand(ConnectionCommandPtr &c);
        void serve(Address bind_address);
        void connect(Address address);
        void disconnect();
        void disconnect_peer(session_t peer_id);
        void send(session_t peer_id, u8 channelnum, const SharedBuffer<u8> &data);
-       void sendReliable(ConnectionCommand &c);
+       void sendReliable(ConnectionCommandPtr &c);
        void sendToAll(u8 channelnum, const SharedBuffer<u8> &data);
-       void sendToAllReliable(ConnectionCommand &c);
+       void sendToAllReliable(ConnectionCommandPtr &c);
 
        void sendPackets(float dtime);
 
        void sendAsPacket(session_t peer_id, u8 channelnum, const SharedBuffer<u8> &data,
                        bool ack = false);
 
-       void sendAsPacketReliable(BufferedPacket &p, Channel *channel);
+       void sendAsPacketReliable(BufferedPacketPtr &p, Channel *channel);
 
        bool packetsQueued();
 
index 23b7e91057a03239e68e5d6969ed80280d97ec44..04fea90d6a09072fe700cf36d321b8ca4e54404a 100644 (file)
@@ -124,7 +124,7 @@ void TestConnection::testHelpers()
        Address a(127,0,0,1, 10);
        const u16 seqnum = 34352;
 
-       con::BufferedPacket p1 = con::makePacket(a, data1,
+       con::BufferedPacketPtr p1 = con::makePacket(a, data1,
                        proto_id, peer_id, channel);
        /*
                We should now have a packet with this data:
@@ -135,10 +135,10 @@ void TestConnection::testHelpers()
                Data:
                        [7] u8 data1[0]
        */
-       UASSERT(readU32(&p1.data[0]) == proto_id);
-       UASSERT(readU16(&p1.data[4]) == peer_id);
-       UASSERT(readU8(&p1.data[6]) == channel);
-       UASSERT(readU8(&p1.data[7]) == data1[0]);
+       UASSERT(readU32(&p1->data[0]) == proto_id);
+       UASSERT(readU16(&p1->data[4]) == peer_id);
+       UASSERT(readU8(&p1->data[6]) == channel);
+       UASSERT(readU8(&p1->data[7]) == data1[0]);
 
        //infostream<<"initial data1[0]="<<((u32)data1[0]&0xff)<<std::endl;
 
index 7fc5de551d207aae26ef9ede883758cbc783b98f..245ac85bfbc028ab808e28ac5353053a331542f8 100644 (file)
@@ -22,6 +22,21 @@ with this program; if not, write to the Free Software Foundation, Inc.,
 #include "irrlichttypes.h"
 #include "debug.h" // For assert()
 #include <cstring>
+#include <memory> // std::shared_ptr
+
+
+template<typename T> class ConstSharedPtr {
+public:
+       ConstSharedPtr(T *ptr) : ptr(ptr) {}
+       ConstSharedPtr(const std::shared_ptr<T> &ptr) : ptr(ptr) {}
+
+       const T* get() const noexcept { return ptr.get(); }
+       const T& operator*() const noexcept { return *ptr.get(); }
+       const T* operator->() const noexcept { return ptr.get(); }
+
+private:
+       std::shared_ptr<T> ptr;
+};
 
 template <typename T>
 class Buffer
@@ -40,17 +55,11 @@ class Buffer
                else
                        data = NULL;
        }
-       Buffer(const Buffer &buffer)
-       {
-               m_size = buffer.m_size;
-               if(m_size != 0)
-               {
-                       data = new T[buffer.m_size];
-                       memcpy(data, buffer.data, buffer.m_size);
-               }
-               else
-                       data = NULL;
-       }
+
+       // Disable class copy
+       Buffer(const Buffer &) = delete;
+       Buffer &operator=(const Buffer &) = delete;
+
        Buffer(Buffer &&buffer)
        {
                m_size = buffer.m_size;
@@ -81,21 +90,6 @@ class Buffer
                drop();
        }
 
-       Buffer& operator=(const Buffer &buffer)
-       {
-               if(this == &buffer)
-                       return *this;
-               drop();
-               m_size = buffer.m_size;
-               if(m_size != 0)
-               {
-                       data = new T[buffer.m_size];
-                       memcpy(data, buffer.data, buffer.m_size);
-               }
-               else
-                       data = NULL;
-               return *this;
-       }
        Buffer& operator=(Buffer &&buffer)
        {
                if(this == &buffer)
@@ -113,6 +107,18 @@ class Buffer
                return *this;
        }
 
+       void copyTo(Buffer &buffer) const
+       {
+               buffer.drop();
+               buffer.m_size = m_size;
+               if (m_size != 0) {
+                       buffer.data = new T[m_size];
+                       memcpy(buffer.data, data, m_size);
+               } else {
+                       buffer.data = nullptr;
+               }
+       }
+
        T & operator[](unsigned int i) const
        {
                return data[i];