X-Git-Url: https://git.lizzy.rs/?a=blobdiff_plain;f=src%2Fnetwork%2Fconnection.cpp;h=0ba8c36b28feef2cf22babd7e2c1ab93240bae6c;hb=674d67f312c815e7f10dc00705e352bc392fc2af;hp=0bc13a2f0f25f23a64d13bc6ee152dcc078e2ba4;hpb=13b22e2afb57eb2da0513f2caca4e88549f55d0a;p=minetest.git diff --git a/src/network/connection.cpp b/src/network/connection.cpp index 0bc13a2f0..0ba8c36b2 100644 --- a/src/network/connection.cpp +++ b/src/network/connection.cpp @@ -41,17 +41,23 @@ namespace con /* defines used for debugging and profiling */ /******************************************************************************/ #ifdef NDEBUG -#define LOG(a) a -#define PROFILE(a) + #define LOG(a) a + #define PROFILE(a) #else -/* this mutex is used to achieve log message consistency */ -std::mutex log_message_mutex; -#define LOG(a) \ - { \ - MutexAutoLock loglock(log_message_mutex); \ - a; \ - } -#define PROFILE(a) a + #if 0 + /* this mutex is used to achieve log message consistency */ + std::mutex log_message_mutex; + #define LOG(a) \ + { \ + MutexAutoLock loglock(log_message_mutex); \ + a; \ + } + #else + // Prevent deadlocks until a solution is found after 5.2.0 (TODO) + #define LOG(a) a + #endif + + #define PROFILE(a) a #endif #define PING_TIMEOUT 5.0 @@ -918,7 +924,7 @@ UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection* connection) : Peer(a_address,a_id,connection) { for (Channel &channel : channels) - channel.setWindowSize(g_settings->getU16("max_packets_per_iteration")); + channel.setWindowSize(START_RELIABLE_WINDOW_SIZE); } bool UDPPeer::getAddress(MTProtocols type,Address& toset) @@ -969,22 +975,29 @@ void UDPPeer::PutReliableSendCommand(ConnectionCommand &c, if (m_pending_disconnect) return; - if ( channels[c.channelnum].queued_commands.empty() && + Channel &chan = channels[c.channelnum]; + + if (chan.queued_commands.empty() && /* don't queue more packets then window size */ - (channels[c.channelnum].queued_reliables.size() - < (channels[c.channelnum].getWindowSize()/2))) { + (chan.queued_reliables.size() < chan.getWindowSize() / 2)) { LOG(dout_con<getDesc() <<" processing reliable command for peer id: " << c.peer_id <<" data size: " << c.data.getSize() << std::endl); if (!processReliableSendCommand(c,max_packet_size)) { - channels[c.channelnum].queued_commands.push_back(c); + chan.queued_commands.push_back(c); } } else { LOG(dout_con<getDesc() <<" Queueing reliable command for peer id: " << c.peer_id <<" data size: " << c.data.getSize() <= chan.getWindowSize() / 2) { + LOG(derr_con << m_connection->getDesc() + << "Possible packet stall to peer id: " << c.peer_id + << " queued_commands=" << chan.queued_commands.size() + << std::endl); + } } } @@ -995,6 +1008,8 @@ bool UDPPeer::processReliableSendCommand( if (m_pending_disconnect) return true; + Channel &chan = channels[c.channelnum]; + u32 chunksize_max = max_packet_size - BASE_HEADER_SIZE - RELIABLE_HEADER_SIZE; @@ -1002,13 +1017,13 @@ bool UDPPeer::processReliableSendCommand( sanity_check(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE*512); std::list> originals; - u16 split_sequence_number = channels[c.channelnum].readNextSplitSeqNum(); + u16 split_sequence_number = chan.readNextSplitSeqNum(); if (c.raw) { originals.emplace_back(c.data); } else { makeAutoSplitPacket(c.data, chunksize_max,split_sequence_number, &originals); - channels[c.channelnum].setNextSplitSeqNum(split_sequence_number); + chan.setNextSplitSeqNum(split_sequence_number); } bool have_sequence_number = true; @@ -1017,7 +1032,7 @@ bool UDPPeer::processReliableSendCommand( volatile u16 initial_sequence_number = 0; for (SharedBuffer &original : originals) { - u16 seqnum = channels[c.channelnum].getOutgoingSequenceNumber(have_sequence_number); + u16 seqnum = chan.getOutgoingSequenceNumber(have_sequence_number); /* oops, we don't have enough sequence numbers to send this packet */ if (!have_sequence_number) @@ -1049,10 +1064,10 @@ bool UDPPeer::processReliableSendCommand( // << " channel: " << (c.channelnum&0xFF) // << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1]) // << std::endl) - channels[c.channelnum].queued_reliables.push(p); + chan.queued_reliables.push(p); pcount++; } - sanity_check(channels[c.channelnum].queued_reliables.size() < 0xFFFF); + sanity_check(chan.queued_reliables.size() < 0xFFFF); return true; } @@ -1067,12 +1082,16 @@ bool UDPPeer::processReliableSendCommand( toadd.pop(); bool successfully_put_back_sequence_number - = channels[c.channelnum].putBackSequenceNumber( + = chan.putBackSequenceNumber( (initial_sequence_number+toadd.size() % (SEQNUM_MAX+1))); FATAL_ERROR_IF(!successfully_put_back_sequence_number, "error"); } + // DO NOT REMOVE n_queued! It avoids a deadlock of async locked + // 'log_message_mutex' and 'm_list_mutex'. + u32 n_queued = chan.outgoing_reliables_sent.size(); + LOG(dout_con<getDesc() << " Windowsize exceeded on reliable sending " << c.data.getSize() << " bytes" @@ -1081,7 +1100,7 @@ bool UDPPeer::processReliableSendCommand( << std::endl << "\t\tgot at most : " << packets_available << " packets" << std::endl << "\t\tpackets queued : " - << channels[c.channelnum].outgoing_reliables_sent.size() + << n_queued << std::endl); return false; @@ -1154,7 +1173,9 @@ Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, m_bc_peerhandler(peerhandler) { - m_udpSocket.setTimeoutMs(5); + /* Amount of time Receive() will wait for data, this is entirely different + * from the connection timeout */ + m_udpSocket.setTimeoutMs(500); m_sendThread->setParent(this); m_receiveThread->setParent(this); @@ -1248,7 +1269,8 @@ bool Connection::deletePeer(session_t peer_id, bool timeout) return false; peer = m_peers[peer_id]; m_peers.erase(peer_id); - m_peer_ids.remove(peer_id); + auto it = std::find(m_peer_ids.begin(), m_peer_ids.end(), peer_id); + m_peer_ids.erase(it); } Address peer_address; @@ -1323,16 +1345,21 @@ void Connection::Disconnect() putCommand(c); } -void Connection::Receive(NetworkPacket* pkt) +bool Connection::Receive(NetworkPacket *pkt, u32 timeout) { + /* + Note that this function can potentially wait infinitely if non-data + events keep happening before the timeout expires. + This is not considered to be a problem (is it?) + */ for(;;) { - ConnectionEvent e = waitEvent(m_bc_receive_timeout); + ConnectionEvent e = waitEvent(timeout); if (e.type != CONNEVENT_NONE) LOG(dout_con << getDesc() << ": Receive: got event: " << e.describe() << std::endl); switch(e.type) { case CONNEVENT_NONE: - throw NoIncomingDataException("No incoming data"); + return false; case CONNEVENT_DATA_RECEIVED: // Data size is lesser than command size, ignoring packet if (e.data.getSize() < 2) { @@ -1340,7 +1367,7 @@ void Connection::Receive(NetworkPacket* pkt) } pkt->putRawPacket(*e.data, e.data.getSize(), e.peer_id); - return; + return true; case CONNEVENT_PEER_ADDED: { UDPPeer tmp(e.peer_id, e.address, this); if (m_bc_peerhandler) @@ -1358,7 +1385,19 @@ void Connection::Receive(NetworkPacket* pkt) "(port already in use?)"); } } - throw NoIncomingDataException("No incoming data"); + return false; +} + +void Connection::Receive(NetworkPacket *pkt) +{ + bool any = Receive(pkt, m_bc_receive_timeout); + if (!any) + throw NoIncomingDataException("No incoming data"); +} + +bool Connection::TryReceive(NetworkPacket *pkt) +{ + return Receive(pkt, 0); } void Connection::Send(session_t peer_id, u8 channelnum, @@ -1527,7 +1566,7 @@ void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum) UDPPeer* Connection::createServerPeer(Address& address) { - if (getPeerNoEx(PEER_ID_SERVER) != 0) + if (ConnectedToServer()) { throw ConnectionException("Already connected to a server"); }