X-Git-Url: https://git.lizzy.rs/?a=blobdiff_plain;f=src%2Fnetwork%2Fconnectionthreads.cpp;h=9a6617a1cd9b01f389febb39869a59d23da741de;hb=044a12666e6140f15e073f528a9168348554dc52;hp=9d948c59a2790868b61b9e5ac755d554eb5dfc5c;hpb=968ce9af598024ec71e9ffb2d15c3997a13ad754;p=dragonfireclient.git diff --git a/src/network/connectionthreads.cpp b/src/network/connectionthreads.cpp index 9d948c59a..9a6617a1c 100644 --- a/src/network/connectionthreads.cpp +++ b/src/network/connectionthreads.cpp @@ -73,6 +73,7 @@ ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size, m_timeout(timeout), m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration")) { + SANITY_CHECK(m_max_data_packets_per_iteration > 1); } void *ConnectionSendThread::run() @@ -107,8 +108,13 @@ void *ConnectionSendThread::run() curtime = porting::getTimeMs(); float dtime = CALC_DTIME(lasttime, curtime); - /* first do all the reliable stuff */ + /* first resend timed-out packets */ runTimeouts(dtime); + if (m_iteration_packets_avaialble == 0) { + LOG(warningstream << m_connection->getDesc() + << " Packet quota used up after re-sending packets, " + << "max=" << m_max_data_packets_per_iteration << std::endl); + } /* translate commands to packets */ ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0); @@ -121,7 +127,7 @@ void *ConnectionSendThread::run() c = m_connection->m_command_queue.pop_frontNoEx(0); } - /* send non reliable packets */ + /* send queued packets */ sendPackets(dtime); END_DEBUG_EXCEPTION_HANDLER @@ -193,7 +199,6 @@ void ConnectionSendThread::runTimeouts(float dtime) infostream << m_connection->getDesc() << "RunTimeouts(): Peer " << peer->id << " has timed out." - << " (source=peer->timeout_counter)" << std::endl; // Add peer to the list timeouted_peers.push_back(peer->id); @@ -206,9 +211,6 @@ void ConnectionSendThread::runTimeouts(float dtime) for (Channel &channel : udpPeer->channels) { std::list timed_outs; - if (udpPeer->getLegacyPeer()) - channel.setWindowSize(WINDOW_SIZE); - // Remove timed out incomplete unreliable split packets channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout); @@ -264,7 +266,7 @@ void ConnectionSendThread::runTimeouts(float dtime) break; /* no need to check other channels if we already did timeout */ } - channel.UpdateTimers(dtime, udpPeer->getLegacyPeer()); + channel.UpdateTimers(dtime); } /* skip to next peer if we did timeout */ @@ -289,7 +291,7 @@ void ConnectionSendThread::runTimeouts(float dtime) // Remove timed out peers for (u16 timeouted_peer : timeouted_peers) { - LOG(derr_con << m_connection->getDesc() + LOG(dout_con << m_connection->getDesc() << "RunTimeouts(): Removing peer " << timeouted_peer << std::endl); m_connection->deletePeer(timeouted_peer, true); } @@ -330,15 +332,13 @@ void ConnectionSendThread::sendAsPacketReliable(BufferedPacket &p, Channel *chan } bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum, - SharedBuffer data, bool reliable) + const SharedBuffer &data, bool reliable) { PeerHelper peer = m_connection->getPeerNoEx(peer_id); if (!peer) { - LOG(dout_con << m_connection->getDesc() - << " INFO: dropped packet for non existent peer_id: " - << peer_id << std::endl); - FATAL_ERROR_IF(!reliable, - "Trying to send raw packet reliable but no peer found!"); + LOG(errorstream << m_connection->getDesc() + << " dropped " << (reliable ? "reliable " : "") + << "packet for non existent peer_id: " << peer_id << std::endl); return false; } Channel *channel = &(dynamic_cast(&peer)->channels[channelnum]); @@ -428,15 +428,6 @@ void ConnectionSendThread::processReliableCommand(ConnectionCommand &c) } return; - case CONCMD_DISABLE_LEGACY: - LOG(dout_con << m_connection->getDesc() - << "UDP processing reliable CONCMD_DISABLE_LEGACY" << std::endl); - if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) { - /* put to queue if we couldn't send it immediately */ - sendReliable(c); - } - return; - case CONNCMD_SERVE: case CONNCMD_CONNECT: case CONNCMD_DISCONNECT: @@ -587,7 +578,7 @@ void ConnectionSendThread::disconnect_peer(session_t peer_id) } void ConnectionSendThread::send(session_t peer_id, u8 channelnum, - SharedBuffer data) + const SharedBuffer &data) { assert(channelnum < CHANNEL_COUNT); // Pre-condition @@ -627,7 +618,7 @@ void ConnectionSendThread::sendReliable(ConnectionCommand &c) peer->PutReliableSendCommand(c, m_max_packet_size); } -void ConnectionSendThread::sendToAll(u8 channelnum, SharedBuffer data) +void ConnectionSendThread::sendToAll(u8 channelnum, const SharedBuffer &data) { std::list peerids = m_connection->getPeerIDs(); @@ -656,6 +647,9 @@ void ConnectionSendThread::sendPackets(float dtime) std::list pendingDisconnect; std::map pending_unreliable; + const unsigned int peer_packet_quota = m_iteration_packets_avaialble + / MYMAX(peerIds.size(), 1); + for (session_t peerId : peerIds) { PeerHelper peer = m_connection->getPeerNoEx(peerId); //peer may have been removed @@ -665,8 +659,7 @@ void ConnectionSendThread::sendPackets(float dtime) << std::endl); continue; } - peer->m_increment_packets_remaining = - m_iteration_packets_avaialble / m_connection->m_peers.size(); + peer->m_increment_packets_remaining = peer_packet_quota; UDPPeer *udpPeer = dynamic_cast(&peer); @@ -763,23 +756,30 @@ void ConnectionSendThread::sendPackets(float dtime) } /* send acks immediately */ - if (packet.ack) { + if (packet.ack || peer->m_increment_packets_remaining > 0 || stopRequested()) { rawSendAsPacket(packet.peer_id, packet.channelnum, packet.data, packet.reliable); - peer->m_increment_packets_remaining = - MYMIN(0, peer->m_increment_packets_remaining--); - } else if ( - (peer->m_increment_packets_remaining > 0) || - (stopRequested())) { - rawSendAsPacket(packet.peer_id, packet.channelnum, - packet.data, packet.reliable); - peer->m_increment_packets_remaining--; + if (peer->m_increment_packets_remaining > 0) + peer->m_increment_packets_remaining--; } else { m_outgoing_queue.push(packet); pending_unreliable[packet.peer_id] = true; } } + if (peer_packet_quota > 0) { + for (session_t peerId : peerIds) { + PeerHelper peer = m_connection->getPeerNoEx(peerId); + if (!peer) + continue; + if (peer->m_increment_packets_remaining == 0) { + LOG(warningstream << m_connection->getDesc() + << " Packet quota used up for peer_id=" << peerId + << ", was " << peer_packet_quota << " pkts" << std::endl); + } + } + } + for (session_t peerId : pendingDisconnect) { if (!pending_unreliable[peerId]) { m_connection->deletePeer(peerId, false); @@ -788,7 +788,7 @@ void ConnectionSendThread::sendPackets(float dtime) } void ConnectionSendThread::sendAsPacket(session_t peer_id, u8 channelnum, - SharedBuffer data, bool ack) + const SharedBuffer &data, bool ack) { OutgoingPacket packet(peer_id, channelnum, data, false, ack); m_outgoing_queue.push(packet); @@ -810,6 +810,14 @@ void *ConnectionReceiveThread::run() ThreadIdentifier); PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]"); + // use IPv6 minimum allowed MTU as receive buffer size as this is + // theoretical reliable upper boundary of a udp packet for all IPv6 enabled + // infrastructure + const unsigned int packet_maxsize = 1500; + SharedBuffer packetdata(packet_maxsize); + + bool packet_queued = true; + #ifdef DEBUG_CONNECTION_KBPS u64 curtime = porting::getTimeMs(); u64 lasttime = curtime; @@ -828,7 +836,7 @@ void *ConnectionReceiveThread::run() #endif /* receive packets */ - receive(); + receive(packetdata, packet_queued); #ifdef DEBUG_CONNECTION_KBPS debug_print_timer += dtime; @@ -890,157 +898,142 @@ void *ConnectionReceiveThread::run() } // Receive packets from the network and buffers and create ConnectionEvents -void ConnectionReceiveThread::receive() +void ConnectionReceiveThread::receive(SharedBuffer &packetdata, + bool &packet_queued) { - // use IPv6 minimum allowed MTU as receive buffer size as this is - // theoretical reliable upper boundary of a udp packet for all IPv6 enabled - // infrastructure - unsigned int packet_maxsize = 1500; - SharedBuffer packetdata(packet_maxsize); - - bool packet_queued = true; - - unsigned int loop_count = 0; - - /* first of all read packets from socket */ - /* check for incoming data available */ - while ((loop_count < 10) && - (m_connection->m_udpSocket.WaitData(50))) { - loop_count++; - try { - if (packet_queued) { - bool data_left = true; - session_t peer_id; - SharedBuffer resultdata; - while (data_left) { - try { - data_left = getFromBuffers(peer_id, resultdata); - if (data_left) { - ConnectionEvent e; - e.dataReceived(peer_id, resultdata); - m_connection->putEvent(e); - } - } - catch (ProcessedSilentlyException &e) { - /* try reading again */ + try { + // First, see if there any buffered packets we can process now + if (packet_queued) { + bool data_left = true; + session_t peer_id; + SharedBuffer resultdata; + while (data_left) { + try { + data_left = getFromBuffers(peer_id, resultdata); + if (data_left) { + ConnectionEvent e; + e.dataReceived(peer_id, resultdata); + m_connection->putEvent(e); } } - packet_queued = false; - } - - Address sender; - s32 received_size = m_connection->m_udpSocket.Receive(sender, *packetdata, - packet_maxsize); - - if ((received_size < BASE_HEADER_SIZE) || - (readU32(&packetdata[0]) != m_connection->GetProtocolID())) { - LOG(derr_con << m_connection->getDesc() - << "Receive(): Invalid incoming packet, " - << "size: " << received_size - << ", protocol: " - << ((received_size >= 4) ? readU32(&packetdata[0]) : -1) - << std::endl); - continue; + catch (ProcessedSilentlyException &e) { + /* try reading again */ + } } + packet_queued = false; + } - session_t peer_id = readPeerId(*packetdata); - u8 channelnum = readChannel(*packetdata); + // Call Receive() to wait for incoming data + Address sender; + s32 received_size = m_connection->m_udpSocket.Receive(sender, + *packetdata, packetdata.getSize()); + if (received_size < 0) + return; - if (channelnum > CHANNEL_COUNT - 1) { - LOG(derr_con << m_connection->getDesc() - << "Receive(): Invalid channel " << (u32)channelnum << std::endl); - throw InvalidIncomingDataException("Channel doesn't exist"); - } + if ((received_size < BASE_HEADER_SIZE) || + (readU32(&packetdata[0]) != m_connection->GetProtocolID())) { + LOG(derr_con << m_connection->getDesc() + << "Receive(): Invalid incoming packet, " + << "size: " << received_size + << ", protocol: " + << ((received_size >= 4) ? readU32(&packetdata[0]) : -1) + << std::endl); + return; + } - /* Try to identify peer by sender address (may happen on join) */ - if (peer_id == PEER_ID_INEXISTENT) { - peer_id = m_connection->lookupPeer(sender); - // We do not have to remind the peer of its - // peer id as the CONTROLTYPE_SET_PEER_ID - // command was sent reliably. - } + session_t peer_id = readPeerId(*packetdata); + u8 channelnum = readChannel(*packetdata); - /* The peer was not found in our lists. Add it. */ - if (peer_id == PEER_ID_INEXISTENT) { - peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0); - } + if (channelnum > CHANNEL_COUNT - 1) { + LOG(derr_con << m_connection->getDesc() + << "Receive(): Invalid channel " << (u32)channelnum << std::endl); + return; + } - PeerHelper peer = m_connection->getPeerNoEx(peer_id); + /* Try to identify peer by sender address (may happen on join) */ + if (peer_id == PEER_ID_INEXISTENT) { + peer_id = m_connection->lookupPeer(sender); + // We do not have to remind the peer of its + // peer id as the CONTROLTYPE_SET_PEER_ID + // command was sent reliably. + } - if (!peer) { - LOG(dout_con << m_connection->getDesc() - << " got packet from unknown peer_id: " - << peer_id << " Ignoring." << std::endl); - continue; - } + /* The peer was not found in our lists. Add it. */ + if (peer_id == PEER_ID_INEXISTENT) { + peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0); + } - // Validate peer address + PeerHelper peer = m_connection->getPeerNoEx(peer_id); + if (!peer) { + LOG(dout_con << m_connection->getDesc() + << " got packet from unknown peer_id: " + << peer_id << " Ignoring." << std::endl); + return; + } - Address peer_address; + // Validate peer address - if (peer->getAddress(MTP_UDP, peer_address)) { - if (peer_address != sender) { - LOG(derr_con << m_connection->getDesc() - << m_connection->getDesc() - << " Peer " << peer_id << " sending from different address." - " Ignoring." << std::endl); - continue; - } - } else { - - bool invalid_address = true; - if (invalid_address) { - LOG(derr_con << m_connection->getDesc() - << m_connection->getDesc() - << " Peer " << peer_id << " unknown." - " Ignoring." << std::endl); - continue; - } + Address peer_address; + if (peer->getAddress(MTP_UDP, peer_address)) { + if (peer_address != sender) { + LOG(derr_con << m_connection->getDesc() + << " Peer " << peer_id << " sending from different address." + " Ignoring." << std::endl); + return; } + } else { + LOG(derr_con << m_connection->getDesc() + << " Peer " << peer_id << " doesn't have an address?!" + " Ignoring." << std::endl); + return; + } - peer->ResetTimeout(); - - Channel *channel = 0; + peer->ResetTimeout(); - if (dynamic_cast(&peer) != 0) { - channel = &(dynamic_cast(&peer)->channels[channelnum]); - } + Channel *channel = nullptr; + if (dynamic_cast(&peer)) { + channel = &dynamic_cast(&peer)->channels[channelnum]; + } else { + LOG(derr_con << m_connection->getDesc() + << "Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!" + " Ignoring." << std::endl); + return; + } - if (channel != 0) { - channel->UpdateBytesReceived(received_size); - } + channel->UpdateBytesReceived(received_size); - // Throw the received packet to channel->processPacket() + // Throw the received packet to channel->processPacket() - // Make a new SharedBuffer from the data without the base headers - SharedBuffer strippeddata(received_size - BASE_HEADER_SIZE); - memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE], - strippeddata.getSize()); + // Make a new SharedBuffer from the data without the base headers + SharedBuffer strippeddata(received_size - BASE_HEADER_SIZE); + memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE], + strippeddata.getSize()); - try { - // Process it (the result is some data with no headers made by us) - SharedBuffer resultdata = processPacket - (channel, strippeddata, peer_id, channelnum, false); + try { + // Process it (the result is some data with no headers made by us) + SharedBuffer resultdata = processPacket + (channel, strippeddata, peer_id, channelnum, false); - LOG(dout_con << m_connection->getDesc() - << " ProcessPacket from peer_id: " << peer_id - << ", channel: " << (u32)channelnum << ", returned " - << resultdata.getSize() << " bytes" << std::endl); + LOG(dout_con << m_connection->getDesc() + << " ProcessPacket from peer_id: " << peer_id + << ", channel: " << (u32)channelnum << ", returned " + << resultdata.getSize() << " bytes" << std::endl); - ConnectionEvent e; - e.dataReceived(peer_id, resultdata); - m_connection->putEvent(e); - } - catch (ProcessedSilentlyException &e) { - } - catch (ProcessedQueued &e) { - packet_queued = true; - } - } - catch (InvalidIncomingDataException &e) { + ConnectionEvent e; + e.dataReceived(peer_id, resultdata); + m_connection->putEvent(e); } catch (ProcessedSilentlyException &e) { } + catch (ProcessedQueued &e) { + // we set it to true anyway (see below) + } + + /* Every time we receive a packet it can happen that a previously + * buffered packet is now ready to process. */ + packet_queued = true; + } + catch (InvalidIncomingDataException &e) { } } @@ -1098,7 +1091,7 @@ bool ConnectionReceiveThread::checkIncomingBuffers(Channel *channel, } SharedBuffer ConnectionReceiveThread::processPacket(Channel *channel, - SharedBuffer packetdata, session_t peer_id, u8 channelnum, bool reliable) + const SharedBuffer &packetdata, session_t peer_id, u8 channelnum, bool reliable) { PeerHelper peer = m_connection->getPeerNoEx(peer_id); @@ -1137,7 +1130,7 @@ const ConnectionReceiveThread::PacketTypeHandler }; SharedBuffer ConnectionReceiveThread::handlePacketType_Control(Channel *channel, - SharedBuffer packetdata, Peer *peer, u8 channelnum, bool reliable) + const SharedBuffer &packetdata, Peer *peer, u8 channelnum, bool reliable) { if (packetdata.getSize() < 2) throw InvalidIncomingDataException("packetdata.getSize() < 2"); @@ -1167,16 +1160,19 @@ SharedBuffer ConnectionReceiveThread::handlePacketType_Control(Channel *chan // a overflow is quite unlikely but as it'd result in major // rtt miscalculation we handle it here - float rtt; if (current_time > p.absolute_send_time) { - rtt = (current_time - p.absolute_send_time) / 1000.0f; + float rtt = (current_time - p.absolute_send_time) / 1000.0; + + // Let peer calculate stuff according to it + // (avg_rtt and resend_timeout) + dynamic_cast(peer)->reportRTT(rtt); } else if (p.totaltime > 0) { - rtt = p.totaltime; - } + float rtt = p.totaltime; - // Let peer calculate stuff according to it - // (avg_rtt and resend_timeout) - dynamic_cast(peer)->reportRTT(rtt); + // Let peer calculate stuff according to it + // (avg_rtt and resend_timeout) + dynamic_cast(peer)->reportRTT(rtt); + } } // put bytes for max bandwidth calculation channel->UpdateBytesSent(p.data.getSize(), 1); @@ -1184,7 +1180,8 @@ SharedBuffer ConnectionReceiveThread::handlePacketType_Control(Channel *chan m_connection->TriggerSend(); } catch (NotFoundException &e) { LOG(derr_con << m_connection->getDesc() - << "WARNING: ACKed packet not in outgoing queue" << std::endl); + << "WARNING: ACKed packet not in outgoing queue" + << " seqnum=" << seqnum << std::endl); channel->UpdatePacketTooLateCounter(); } @@ -1207,14 +1204,6 @@ SharedBuffer ConnectionReceiveThread::handlePacketType_Control(Channel *chan m_connection->SetPeerID(peer_id_new); } - ConnectionCommand cmd; - - SharedBuffer reply(2); - writeU8(&reply[0], PACKET_TYPE_CONTROL); - writeU8(&reply[1], CONTROLTYPE_ENABLE_BIG_SEND_WINDOW); - cmd.disableLegacy(PEER_ID_SERVER, reply); - m_connection->putCommand(cmd); - throw ProcessedSilentlyException("Got a SET_PEER_ID"); } else if (controltype == CONTROLTYPE_PING) { // Just ignore it, the incoming data already reset @@ -1232,9 +1221,6 @@ SharedBuffer ConnectionReceiveThread::handlePacketType_Control(Channel *chan } throw ProcessedSilentlyException("Got a DISCO"); - } else if (controltype == CONTROLTYPE_ENABLE_BIG_SEND_WINDOW) { - dynamic_cast(peer)->setNonLegacyPeer(); - throw ProcessedSilentlyException("Got non legacy control"); } else { LOG(derr_con << m_connection->getDesc() << "INVALID TYPE_CONTROL: invalid controltype=" @@ -1244,7 +1230,7 @@ SharedBuffer ConnectionReceiveThread::handlePacketType_Control(Channel *chan } SharedBuffer ConnectionReceiveThread::handlePacketType_Original(Channel *channel, - SharedBuffer packetdata, Peer *peer, u8 channelnum, bool reliable) + const SharedBuffer &packetdata, Peer *peer, u8 channelnum, bool reliable) { if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE) throw InvalidIncomingDataException @@ -1258,7 +1244,7 @@ SharedBuffer ConnectionReceiveThread::handlePacketType_Original(Channel *cha } SharedBuffer ConnectionReceiveThread::handlePacketType_Split(Channel *channel, - SharedBuffer packetdata, Peer *peer, u8 channelnum, bool reliable) + const SharedBuffer &packetdata, Peer *peer, u8 channelnum, bool reliable) { Address peer_address; @@ -1289,7 +1275,7 @@ SharedBuffer ConnectionReceiveThread::handlePacketType_Split(Channel *channe } SharedBuffer ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel, - SharedBuffer packetdata, Peer *peer, u8 channelnum, bool reliable) + const SharedBuffer &packetdata, Peer *peer, u8 channelnum, bool reliable) { assert(channel != NULL);