X-Git-Url: https://git.lizzy.rs/?a=blobdiff_plain;f=src%2Fnetwork%2Fconnection.cpp;h=0ba8c36b28feef2cf22babd7e2c1ab93240bae6c;hb=674d67f312c815e7f10dc00705e352bc392fc2af;hp=495b15426b85491d769e792bf25d37399d9055dc;hpb=d7d451c647f1fff7983178a9817888f4d0bab695;p=minetest.git

diff --git a/src/network/connection.cpp b/src/network/connection.cpp
index 495b15426..0ba8c36b2 100644
--- a/src/network/connection.cpp
+++ b/src/network/connection.cpp
@@ -41,22 +41,28 @@ namespace con
 /* defines used for debugging and profiling */
 /******************************************************************************/
 #ifdef NDEBUG
-#define LOG(a) a
-#define PROFILE(a)
+	#define LOG(a) a
+	#define PROFILE(a)
 #else
-/* this mutex is used to achieve log message consistency */
-std::mutex log_message_mutex;
-#define LOG(a) \
-	{ \
-	MutexAutoLock loglock(log_message_mutex); \
-	a; \
-	}
-#define PROFILE(a) a
+	#if 0
+	/* this mutex is used to achieve log message consistency */
+	std::mutex log_message_mutex;
+	#define LOG(a) \
+		{ \
+		MutexAutoLock loglock(log_message_mutex); \
+		a; \
+		}
+	#else
+	// Prevent deadlocks until a solution is found after 5.2.0 (TODO)
+	#define LOG(a) a
+	#endif
+
+	#define PROFILE(a) a
 #endif
 
 #define PING_TIMEOUT 5.0
 
-BufferedPacket makePacket(Address &address, SharedBuffer<u8> data,
+BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
 		u32 protocol_id, session_t sender_peer_id, u8 channel)
 {
 	u32 packet_size = data.getSize() + BASE_HEADER_SIZE;
@@ -126,7 +132,7 @@ void makeSplitPacket(const SharedBuffer<u8> &data, u32 chunksize_max, u16 seqnum
 	}
 }
 
-void makeAutoSplitPacket(SharedBuffer<u8> data, u32 chunksize_max,
+void makeAutoSplitPacket(const SharedBuffer<u8> &data, u32 chunksize_max,
 		u16 &split_seqnum, std::list<SharedBuffer<u8>> *list)
 {
 	u32 original_header_size = 1;
@@ -140,7 +146,7 @@ void makeAutoSplitPacket(SharedBuffer<u8> data, u32 chunksize_max,
 	list->push_back(makeOriginalPacket(data));
 }
 
-SharedBuffer<u8> makeReliablePacket(SharedBuffer<u8> data, u16 seqnum)
+SharedBuffer<u8> makeReliablePacket(const SharedBuffer<u8> &data, u16 seqnum)
 {
 	u32 header_size = 3;
 	u32 packet_size = data.getSize() + header_size;
@@ -169,6 +175,7 @@ void ReliablePacketBuffer::print()
 		index++;
 	}
 }
+
 bool ReliablePacketBuffer::empty()
 {
 	MutexAutoLock listlock(m_list_mutex);
@@ -177,12 +184,8 @@ bool ReliablePacketBuffer::empty()
 
 u32 ReliablePacketBuffer::size()
 {
-	return m_list_size;
-}
-
-bool ReliablePacketBuffer::containsPacket(u16 seqnum)
-{
-	return !(findPacket(seqnum) == m_list.end());
+	MutexAutoLock listlock(m_list_mutex);
+	return m_list.size();
 }
 
 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
@@ -191,24 +194,24 @@ RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
 	for(; i != m_list.end(); ++i)
 	{
 		u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
-		/*dout_con<<"findPacket(): finding seqnum="<data[BASE_HEADER_SIZE + 1]);
 	}
 	return p;
 }
+
 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
 {
 	MutexAutoLock listlock(m_list_mutex);
@@ -249,15 +252,17 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
 	}
 	m_list.erase(r);
 
-	--m_list_size;
-	if (m_list_size == 0)
-	{ m_oldest_non_answered_ack = 0; }
-	else
-	{ m_oldest_non_answered_ack = readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]); }
+	if (m_list.empty()) {
+		m_oldest_non_answered_ack = 0;
+	} else {
+		m_oldest_non_answered_ack =
+			readU16(&m_list.begin()->data[BASE_HEADER_SIZE + 1]);
+	}
 	return p;
 }
 
-void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
+
+void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected)
 {
 	MutexAutoLock listlock(m_list_mutex);
 	if (p.data.getSize() < BASE_HEADER_SIZE + 3) {
@@ -284,8 +289,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
 		return;
 	}
 
-	++m_list_size;
-	sanity_check(m_list_size <= SEQNUM_MAX+1); // FIXME: Handle the error?
+	sanity_check(m_list.size() <= SEQNUM_MAX); // FIXME: Handle the error?
 
 	// Find the right place for the packet and insert it there
 	// If list is empty, just add it
@@ -322,6 +326,8 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
 	}
 
 	if (s == seqnum) {
+		/* nothing to do this seems to be a resent packet */
+		/* for paranoia reason data should be compared */
 		if (
 			(readU16(&(i->data[BASE_HEADER_SIZE+1])) != seqnum) ||
 			(i->data.getSize() != p.data.getSize()) ||
@@ -340,16 +346,11 @@ void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
 				p.address.serializeString().c_str());
 			throw IncomingDataCorruption("duplicated packet isn't same as original one");
 		}
-
-		/* nothing to do this seems to be a resent packet */
-		/* for paranoia reason data should be compared */
-		--m_list_size;
 	}
 	/* insert or push back */
 	else if (i != m_list.end()) {
 		m_list.insert(i, p);
-	}
-	else {
+	} else {
 		m_list.push_back(p);
 	}
 
@@ -384,6 +385,48 @@ std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout,
 	return timed_outs;
 }
 
+/*
+	IncomingSplitPacket
+*/
+
+bool IncomingSplitPacket::insert(u32 chunk_num, SharedBuffer<u8> &chunkdata)
+{
+	sanity_check(chunk_num < chunk_count);
+
+	// If chunk already exists, ignore it.
+	// Sometimes two identical packets may arrive when there is network
+	// lag and the server re-sends stuff.
+	if (chunks.find(chunk_num) != chunks.end())
+		return false;
+
+	// Set chunk data in buffer
+	chunks[chunk_num] = chunkdata;
+
+	return true;
+}
+
+SharedBuffer<u8> IncomingSplitPacket::reassemble()
+{
+	sanity_check(allReceived());
+
+	// Calculate total size
+	u32 totalsize = 0;
+	for (const auto &chunk : chunks)
+		totalsize += chunk.second.getSize();
+
+	SharedBuffer<u8> fulldata(totalsize);
+
+	// Copy chunks to data buffer
+	u32 start = 0;
+	for (u32 chunk_i = 0; chunk_i < chunk_count; chunk_i++) {
+		const SharedBuffer<u8> &buf = chunks[chunk_i];
+		memcpy(&fulldata[start], *buf, buf.getSize());
+		start += buf.getSize();
+	}
+
+	return fulldata;
+}
+
 /*
 	IncomingSplitBuffer
 */
@@ -395,10 +438,7 @@ IncomingSplitBuffer::~IncomingSplitBuffer()
 		delete i.second;
 	}
 }
-/*
-	This will throw a GotSplitPacketException when a full
-	split packet is constructed.
-*/
+
 SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool reliable)
 {
 	MutexAutoLock listlock(m_map_mutex);
@@ -417,57 +457,45 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool relia
 				<< std::endl;
 		return SharedBuffer<u8>();
 	}
+	if (chunk_num >= chunk_count) {
+		errorstream << "IncomingSplitBuffer::insert(): chunk_num=" << chunk_num
+				<< " >= chunk_count=" << chunk_count << std::endl;
+		return SharedBuffer<u8>();
+	}
 
 	// Add if doesn't exist
+	IncomingSplitPacket *sp;
 	if (m_buf.find(seqnum) == m_buf.end()) {
-		m_buf[seqnum] = new IncomingSplitPacket(chunk_count, reliable);
+		sp = new IncomingSplitPacket(chunk_count, reliable);
+		m_buf[seqnum] = sp;
+	} else {
+		sp = m_buf[seqnum];
 	}
 
-	IncomingSplitPacket *sp = m_buf[seqnum];
-
-	if (chunk_count != sp->chunk_count)
-		LOG(derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
-				<<" != sp->chunk_count="<<sp->chunk_count
-				<<std::endl);
+	if (chunk_count != sp->chunk_count) {
+		errorstream << "IncomingSplitBuffer::insert(): chunk_count="
+				<< chunk_count << " != sp->chunk_count=" << sp->chunk_count
+				<< std::endl;
+		return SharedBuffer<u8>();
+	}
 	if (reliable != sp->reliable)
 		LOG(derr_con<<"Connection: WARNING: reliable="<<reliable
				<<" != sp->reliable="<<sp->reliable
 				<<std::endl);
 
-	// If chunk already exists, ignore it.
-	// Sometimes two identical packets may arrive when there is network
-	// lag and the server re-sends stuff.
-	if (sp->chunks.find(chunk_num) != sp->chunks.end())
-		return SharedBuffer<u8>();
-
 	// Cut chunk data out of packet
 	u32 chunkdatasize = p.data.getSize() - headersize;
 	SharedBuffer<u8> chunkdata(chunkdatasize);
 	memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
 
-	// Set chunk data in buffer
-	sp->chunks[chunk_num] = chunkdata;
+	if (!sp->insert(chunk_num, chunkdata))
+		return SharedBuffer<u8>();
 
 	// If not all chunks are received, return empty buffer
 	if (!sp->allReceived())
 		return SharedBuffer<u8>();
 
-	// Calculate total size
-	u32 totalsize = 0;
-	for (const auto &chunk : sp->chunks) {
-		totalsize += chunk.second.getSize();
-	}
-
-	SharedBuffer<u8> fulldata(totalsize);
-
-	// Copy chunks to data buffer
-	u32 start = 0;
-	for (u32 chunk_i=0; chunk_i<sp->chunk_count; chunk_i++) {
-		const SharedBuffer<u8> &buf = sp->chunks[chunk_i];
-		u16 buf_chunkdatasize = buf.getSize();
-		memcpy(&fulldata[start], *buf, buf_chunkdatasize);
-		start += buf_chunkdatasize;
-	}
+	SharedBuffer<u8> fulldata = sp->reassemble();
 
 	// Remove sp from buffer
 	m_buf.erase(seqnum);
@@ -475,6 +503,7 @@ SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool relia
 	return fulldata;
 }
 
+
 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
 {
 	std::deque<u16> remove_queue;
@@ -809,9 +838,8 @@ void Peer::DecUseCount()
 	delete this;
 }
 
-void Peer::RTTStatistics(float rtt, const std::string &profiler_id)
-{
-	static const float avg_factor = 100.0f / MAX_RELIABLE_WINDOW_SIZE;
+void Peer::RTTStatistics(float rtt, const std::string &profiler_id,
+		unsigned int num_samples) {
 
 	if (m_last_rtt > 0) {
 		/* set min max values */
@@ -822,14 +850,21 @@ void Peer::RTTStatistics(float rtt, const std::string &profiler_id)
 
 		/* do average calculation */
 		if (m_rtt.avg_rtt < 0.0)
-			m_rtt.avg_rtt = rtt;
+			m_rtt.avg_rtt  = rtt;
 		else
-			m_rtt.avg_rtt += (rtt - m_rtt.avg_rtt) * avg_factor;
+			m_rtt.avg_rtt  = m_rtt.avg_rtt * (num_samples/(num_samples-1)) +
+					rtt * (1/num_samples);
 
 		/* do jitter calculation */
 
 		//just use some neutral value at beginning
-		float jitter = std::fabs(rtt - m_last_rtt);
+		float jitter = m_rtt.jitter_min;
+
+		if (rtt > m_last_rtt)
+			jitter = rtt-m_last_rtt;
+
+		if (rtt <= m_last_rtt)
+			jitter = m_last_rtt - rtt;
 
 		if (jitter < m_rtt.jitter_min)
 			m_rtt.jitter_min = jitter;
@@ -837,13 +872,14 @@ void Peer::RTTStatistics(float rtt, const std::string &profiler_id)
 		if (jitter >= m_rtt.jitter_max)
 			m_rtt.jitter_max = jitter;
 
 		if (m_rtt.jitter_avg < 0.0)
-			m_rtt.jitter_avg = jitter;
+			m_rtt.jitter_avg  = jitter;
 		else
-			m_rtt.jitter_avg += (jitter - m_rtt.jitter_avg) * avg_factor;
+			m_rtt.jitter_avg  = m_rtt.jitter_avg * (num_samples/(num_samples-1)) +
+					jitter * (1/num_samples);
 
 		if (!profiler_id.empty()) {
-			g_profiler->graphAdd(profiler_id + "_rtt", rtt);
-			g_profiler->graphAdd(profiler_id + "_jitter", jitter);
+			g_profiler->graphAdd(profiler_id + " RTT [ms]", rtt * 1000.f);
+			g_profiler->graphAdd(profiler_id + " jitter [ms]", jitter * 1000.f);
 		}
 	}
 	/* save values required for next loop */
@@ -888,7 +924,7 @@ UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection* connection) :
 	Peer(a_address,a_id,connection)
 {
 	for (Channel &channel : channels)
-		channel.setWindowSize(g_settings->getU16("max_packets_per_iteration"));
+		channel.setWindowSize(START_RELIABLE_WINDOW_SIZE);
 }
 
 bool UDPPeer::getAddress(MTProtocols type,Address& toset)
@@ -904,12 +940,16 @@ bool UDPPeer::getAddress(MTProtocols type,Address& toset)
 
 void UDPPeer::reportRTT(float rtt)
 {
-	assert(rtt >= 0.0f);
-
-	RTTStatistics(rtt, "rudp");
+	if (rtt < 0.0) {
+		return;
+	}
+	RTTStatistics(rtt,"rudp",MAX_RELIABLE_WINDOW_SIZE*10);
 
 	float timeout = getStat(AVG_RTT) * RESEND_TIMEOUT_FACTOR;
-	timeout = rangelim(timeout, RESEND_TIMEOUT_MIN, RESEND_TIMEOUT_MAX);
+	if (timeout < RESEND_TIMEOUT_MIN)
+		timeout = RESEND_TIMEOUT_MIN;
+	if (timeout > RESEND_TIMEOUT_MAX)
+		timeout = RESEND_TIMEOUT_MAX;
 
 	MutexAutoLock usage_lock(m_exclusive_access_mutex);
 	resend_timeout = timeout;
@@ -935,22 +975,29 @@ void UDPPeer::PutReliableSendCommand(ConnectionCommand &c,
 	if (m_pending_disconnect)
 		return;
 
-	if ( channels[c.channelnum].queued_commands.empty() &&
+	Channel &chan = channels[c.channelnum];
+
+	if (chan.queued_commands.empty() &&
 		/* don't queue more packets then window size */
-		(channels[c.channelnum].queued_reliables.size()
-		< (channels[c.channelnum].getWindowSize()/2))) {
+		(chan.queued_reliables.size() < chan.getWindowSize() / 2)) {
 		LOG(dout_con<<m_connection->getDesc()
 				<<" processing reliable command for peer id: " << c.peer_id
 				<<" data size: " << c.data.getSize() << std::endl);
 		if (!processReliableSendCommand(c,max_packet_size)) {
-			channels[c.channelnum].queued_commands.push_back(c);
+			chan.queued_commands.push_back(c);
 		}
 	} else {
 		LOG(dout_con<<m_connection->getDesc()
 				<<" Queueing reliable command for peer id: " << c.peer_id
 				<<" data size: " << c.data.getSize() <<std::endl);
-		channels[c.channelnum].queued_commands.push_back(c);
+		chan.queued_commands.push_back(c);
+
+		if (chan.queued_commands.size() >= chan.getWindowSize() / 2) {
+			LOG(derr_con << m_connection->getDesc()
+				<< "Possible packet stall to peer id: " << c.peer_id
+				<< " queued_commands=" << chan.queued_commands.size()
+				<< std::endl);
+		}
 	}
 }
 
@@ -961,6 +1008,8 @@ bool UDPPeer::processReliableSendCommand(
 	if (m_pending_disconnect)
 		return true;
 
+	Channel &chan = channels[c.channelnum];
+
 	u32 chunksize_max = max_packet_size
 							- BASE_HEADER_SIZE
 							- RELIABLE_HEADER_SIZE;
@@ -968,13 +1017,13 @@ bool UDPPeer::processReliableSendCommand(
 	sanity_check(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE*512);
 
 	std::list<SharedBuffer<u8>> originals;
-	u16 split_sequence_number = channels[c.channelnum].readNextSplitSeqNum();
+	u16 split_sequence_number = chan.readNextSplitSeqNum();
 
 	if (c.raw) {
 		originals.emplace_back(c.data);
 	} else {
 		makeAutoSplitPacket(c.data, chunksize_max,split_sequence_number, &originals);
-		channels[c.channelnum].setNextSplitSeqNum(split_sequence_number);
+		chan.setNextSplitSeqNum(split_sequence_number);
 	}
 
 	bool have_sequence_number = true;
@@ -983,7 +1032,7 @@ bool UDPPeer::processReliableSendCommand(
 	volatile u16 initial_sequence_number = 0;
 
 	for (SharedBuffer<u8> &original : originals) {
-		u16 seqnum = channels[c.channelnum].getOutgoingSequenceNumber(have_sequence_number);
+		u16 seqnum = chan.getOutgoingSequenceNumber(have_sequence_number);
 
 		/* oops, we don't have enough sequence numbers to send this packet */
 		if (!have_sequence_number)
@@ -1015,10 +1064,10 @@ bool UDPPeer::processReliableSendCommand(
 //			<< " channel: " << (c.channelnum&0xFF)
 //			<< " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
 //			<< std::endl)
-			channels[c.channelnum].queued_reliables.push(p);
+			chan.queued_reliables.push(p);
 			pcount++;
 		}
-		sanity_check(channels[c.channelnum].queued_reliables.size() < 0xFFFF);
+		sanity_check(chan.queued_reliables.size() < 0xFFFF);
 		return true;
 	}
 
@@ -1033,12 +1082,16 @@ bool UDPPeer::processReliableSendCommand(
 		toadd.pop();
 
 		bool successfully_put_back_sequence_number
-			= channels[c.channelnum].putBackSequenceNumber(
+			= chan.putBackSequenceNumber(
 				(initial_sequence_number+toadd.size() % (SEQNUM_MAX+1)));
 
 		FATAL_ERROR_IF(!successfully_put_back_sequence_number, "error");
 	}
 
+	// DO NOT REMOVE n_queued! It avoids a deadlock of async locked
+	// 'log_message_mutex' and 'm_list_mutex'.
+	u32 n_queued = chan.outgoing_reliables_sent.size();
+
 	LOG(dout_con<<m_connection->getDesc()
 			<< " Windowsize exceeded on reliable sending "
 			<< c.data.getSize() << " bytes"
@@ -1047,7 +1100,7 @@ bool UDPPeer::processReliableSendCommand(
 			<< std::endl << "\t\tgot at most : "
 			<< packets_available << " packets"
 			<< std::endl << "\t\tpackets queued : "
-			<< channels[c.channelnum].outgoing_reliables_sent.size()
+			<< n_queued
 			<< std::endl);
 
 	return false;
@@ -1120,7 +1173,9 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
 	m_bc_peerhandler(peerhandler)
 
 {
-	m_udpSocket.setTimeoutMs(5);
+	/* Amount of time Receive() will wait for data, this is entirely different
+	 * from the connection timeout */
+	m_udpSocket.setTimeoutMs(500);
 
 	m_sendThread->setParent(this);
 	m_receiveThread->setParent(this);
@@ -1214,7 +1269,8 @@ bool Connection::deletePeer(session_t peer_id, bool timeout)
 			return false;
 		peer = m_peers[peer_id];
 		m_peers.erase(peer_id);
-		m_peer_ids.remove(peer_id);
+		auto it = std::find(m_peer_ids.begin(), m_peer_ids.end(), peer_id);
+		m_peer_ids.erase(it);
 	}
 
 	Address peer_address;
@@ -1289,16 +1345,21 @@ void Connection::Disconnect()
 	putCommand(c);
 }
 
-void Connection::Receive(NetworkPacket* pkt)
+bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
 {
+	/*
+		Note that this function can potentially wait infinitely if non-data
+		events keep happening before the timeout expires.
+		This is not considered to be a problem (is it?)
+	*/
 	for(;;) {
-		ConnectionEvent e = waitEvent(m_bc_receive_timeout);
+		ConnectionEvent e = waitEvent(timeout);
 		if (e.type != CONNEVENT_NONE)
 			LOG(dout_con << getDesc() << ": Receive: got event: "
 					<< e.describe() << std::endl);
 		switch(e.type) {
 		case CONNEVENT_NONE:
-			throw NoIncomingDataException("No incoming data");
+			return false;
 		case CONNEVENT_DATA_RECEIVED:
 			// Data size is lesser than command size, ignoring packet
 			if (e.data.getSize() < 2) {
@@ -1306,7 +1367,7 @@ void Connection::Receive(NetworkPacket* pkt)
 			}
 
 			pkt->putRawPacket(*e.data, e.data.getSize(), e.peer_id);
-			return;
+			return true;
 		case CONNEVENT_PEER_ADDED: {
 			UDPPeer tmp(e.peer_id, e.address, this);
 			if (m_bc_peerhandler)
@@ -1324,7 +1385,19 @@ void Connection::Receive(NetworkPacket* pkt)
 				"(port already in use?)");
 		}
 	}
-	throw NoIncomingDataException("No incoming data");
+	return false;
+}
+
+void Connection::Receive(NetworkPacket *pkt)
+{
+	bool any = Receive(pkt, m_bc_receive_timeout);
+	if (!any)
+		throw NoIncomingDataException("No incoming data");
+}
+
+bool Connection::TryReceive(NetworkPacket *pkt)
+{
+	return Receive(pkt, 0);
 }
 
 void Connection::Send(session_t peer_id, u8 channelnum,
@@ -1493,7 +1566,7 @@ void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum)
 
 UDPPeer* Connection::createServerPeer(Address& address)
 {
-	if (getPeerNoEx(PEER_ID_SERVER) != 0)
+	if (ConnectedToServer())
 	{
 		throw ConnectionException("Already connected to a server");
 	}
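
Usage sketch (not part of the diff above): the Connection::Receive hunks replace the old throw-on-timeout contract with a bool-returning overload plus a non-blocking TryReceive(). The snippet below shows roughly how a caller could drive both paths; the function name pollNetwork, the include paths, and the exact namespace of NoIncomingDataException are assumptions for illustration, not code from this commit.

// Illustrative sketch only -- pollNetwork() is a hypothetical caller, not part
// of src/network/connection.cpp. Include paths are assumed.
#include "network/connection.h"
#include "network/networkpacket.h"

static void pollNetwork(con::Connection &conn)
{
	NetworkPacket pkt;

	// Non-blocking path added by this diff: TryReceive() forwards to
	// Receive(pkt, 0) and returns false when no data event is queued.
	while (conn.TryReceive(&pkt)) {
		// ... dispatch pkt ...
	}

	// Blocking path keeps the old behaviour: Receive(pkt) waits up to the
	// configured receive timeout and still throws when nothing arrived.
	try {
		conn.Receive(&pkt);
		// ... dispatch pkt ...
	} catch (con::NoIncomingDataException &) {
		// no data within the timeout
	}
}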