m_timeout(timeout),
m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration"))
{
+ SANITY_CHECK(m_max_data_packets_per_iteration > 1);
}
void *ConnectionSendThread::run()
curtime = porting::getTimeMs();
float dtime = CALC_DTIME(lasttime, curtime);
- /* first do all the reliable stuff */
+ /* first resend timed-out packets */
runTimeouts(dtime);
+ if (m_iteration_packets_avaialble == 0) {
+ LOG(warningstream << m_connection->getDesc()
+ << " Packet quota used up after re-sending packets, "
+ << "max=" << m_max_data_packets_per_iteration << std::endl);
+ }
/* translate commands to packets */
ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
c = m_connection->m_command_queue.pop_frontNoEx(0);
}
- /* send non reliable packets */
+ /* send queued packets */
sendPackets(dtime);
END_DEBUG_EXCEPTION_HANDLER
infostream << m_connection->getDesc()
<< "RunTimeouts(): Peer " << peer->id
<< " has timed out."
- << " (source=peer->timeout_counter)"
<< std::endl;
// Add peer to the list
timeouted_peers.push_back(peer->id);
for (Channel &channel : udpPeer->channels) {
std::list<BufferedPacket> timed_outs;
- if (udpPeer->getLegacyPeer())
- channel.setWindowSize(WINDOW_SIZE);
-
// Remove timed out incomplete unreliable split packets
channel.incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
break; /* no need to check other channels if we already did timeout */
}
- channel.UpdateTimers(dtime, udpPeer->getLegacyPeer());
+ channel.UpdateTimers(dtime);
}
/* skip to next peer if we did timeout */
// Remove timed out peers
for (u16 timeouted_peer : timeouted_peers) {
- LOG(derr_con << m_connection->getDesc()
+ LOG(dout_con << m_connection->getDesc()
<< "RunTimeouts(): Removing peer " << timeouted_peer << std::endl);
m_connection->deletePeer(timeouted_peer, true);
}
}
bool ConnectionSendThread::rawSendAsPacket(session_t peer_id, u8 channelnum,
- SharedBuffer<u8> data, bool reliable)
+ const SharedBuffer<u8> &data, bool reliable)
{
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
if (!peer) {
- LOG(dout_con << m_connection->getDesc()
- << " INFO: dropped packet for non existent peer_id: "
- << peer_id << std::endl);
- FATAL_ERROR_IF(!reliable,
- "Trying to send raw packet reliable but no peer found!");
+ LOG(errorstream << m_connection->getDesc()
+ << " dropped " << (reliable ? "reliable " : "")
+ << "packet for non existent peer_id: " << peer_id << std::endl);
return false;
}
Channel *channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
}
return;
- case CONCMD_DISABLE_LEGACY:
- LOG(dout_con << m_connection->getDesc()
- << "UDP processing reliable CONCMD_DISABLE_LEGACY" << std::endl);
- if (!rawSendAsPacket(c.peer_id, c.channelnum, c.data, c.reliable)) {
- /* put to queue if we couldn't send it immediately */
- sendReliable(c);
- }
- return;
-
case CONNCMD_SERVE:
case CONNCMD_CONNECT:
case CONNCMD_DISCONNECT:
}
void ConnectionSendThread::send(session_t peer_id, u8 channelnum,
- SharedBuffer<u8> data)
+ const SharedBuffer<u8> &data)
{
assert(channelnum < CHANNEL_COUNT); // Pre-condition
peer->PutReliableSendCommand(c, m_max_packet_size);
}
-void ConnectionSendThread::sendToAll(u8 channelnum, SharedBuffer<u8> data)
+void ConnectionSendThread::sendToAll(u8 channelnum, const SharedBuffer<u8> &data)
{
std::list<session_t> peerids = m_connection->getPeerIDs();
std::list<session_t> pendingDisconnect;
std::map<session_t, bool> pending_unreliable;
+ const unsigned int peer_packet_quota = m_iteration_packets_avaialble
+ / MYMAX(peerIds.size(), 1);
+
for (session_t peerId : peerIds) {
PeerHelper peer = m_connection->getPeerNoEx(peerId);
//peer may have been removed
<< std::endl);
continue;
}
- peer->m_increment_packets_remaining =
- m_iteration_packets_avaialble / m_connection->m_peers.size();
+ peer->m_increment_packets_remaining = peer_packet_quota;
UDPPeer *udpPeer = dynamic_cast<UDPPeer *>(&peer);
}
/* send acks immediately */
- if (packet.ack) {
+ if (packet.ack || peer->m_increment_packets_remaining > 0 || stopRequested()) {
rawSendAsPacket(packet.peer_id, packet.channelnum,
packet.data, packet.reliable);
- peer->m_increment_packets_remaining =
- MYMIN(0, peer->m_increment_packets_remaining--);
- } else if (
- (peer->m_increment_packets_remaining > 0) ||
- (stopRequested())) {
- rawSendAsPacket(packet.peer_id, packet.channelnum,
- packet.data, packet.reliable);
- peer->m_increment_packets_remaining--;
+ if (peer->m_increment_packets_remaining > 0)
+ peer->m_increment_packets_remaining--;
} else {
m_outgoing_queue.push(packet);
pending_unreliable[packet.peer_id] = true;
}
}
+ if (peer_packet_quota > 0) {
+ for (session_t peerId : peerIds) {
+ PeerHelper peer = m_connection->getPeerNoEx(peerId);
+ if (!peer)
+ continue;
+ if (peer->m_increment_packets_remaining == 0) {
+ LOG(warningstream << m_connection->getDesc()
+ << " Packet quota used up for peer_id=" << peerId
+ << ", was " << peer_packet_quota << " pkts" << std::endl);
+ }
+ }
+ }
+
for (session_t peerId : pendingDisconnect) {
if (!pending_unreliable[peerId]) {
m_connection->deletePeer(peerId, false);
}
void ConnectionSendThread::sendAsPacket(session_t peer_id, u8 channelnum,
- SharedBuffer<u8> data, bool ack)
+ const SharedBuffer<u8> &data, bool ack)
{
OutgoingPacket packet(peer_id, channelnum, data, false, ack);
m_outgoing_queue.push(packet);
ThreadIdentifier);
PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
+ // use IPv6 minimum allowed MTU as receive buffer size as this is
+ // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
+ // infrastructure
+ const unsigned int packet_maxsize = 1500;
+ SharedBuffer<u8> packetdata(packet_maxsize);
+
+ bool packet_queued = true;
+
#ifdef DEBUG_CONNECTION_KBPS
u64 curtime = porting::getTimeMs();
u64 lasttime = curtime;
#endif
/* receive packets */
- receive();
+ receive(packetdata, packet_queued);
#ifdef DEBUG_CONNECTION_KBPS
debug_print_timer += dtime;
}
// Receive packets from the network and buffers and create ConnectionEvents
-void ConnectionReceiveThread::receive()
+void ConnectionReceiveThread::receive(SharedBuffer<u8> &packetdata,
+ bool &packet_queued)
{
- // use IPv6 minimum allowed MTU as receive buffer size as this is
- // theoretical reliable upper boundary of a udp packet for all IPv6 enabled
- // infrastructure
- unsigned int packet_maxsize = 1500;
- SharedBuffer<u8> packetdata(packet_maxsize);
-
- bool packet_queued = true;
-
- unsigned int loop_count = 0;
-
- /* first of all read packets from socket */
- /* check for incoming data available */
- while ((loop_count < 10) &&
- (m_connection->m_udpSocket.WaitData(50))) {
- loop_count++;
- try {
- if (packet_queued) {
- bool data_left = true;
- session_t peer_id;
- SharedBuffer<u8> resultdata;
- while (data_left) {
- try {
- data_left = getFromBuffers(peer_id, resultdata);
- if (data_left) {
- ConnectionEvent e;
- e.dataReceived(peer_id, resultdata);
- m_connection->putEvent(e);
- }
- }
- catch (ProcessedSilentlyException &e) {
- /* try reading again */
+ try {
+ // First, see if there any buffered packets we can process now
+ if (packet_queued) {
+ bool data_left = true;
+ session_t peer_id;
+ SharedBuffer<u8> resultdata;
+ while (data_left) {
+ try {
+ data_left = getFromBuffers(peer_id, resultdata);
+ if (data_left) {
+ ConnectionEvent e;
+ e.dataReceived(peer_id, resultdata);
+ m_connection->putEvent(e);
}
}
- packet_queued = false;
- }
-
- Address sender;
- s32 received_size = m_connection->m_udpSocket.Receive(sender, *packetdata,
- packet_maxsize);
-
- if ((received_size < BASE_HEADER_SIZE) ||
- (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
- LOG(derr_con << m_connection->getDesc()
- << "Receive(): Invalid incoming packet, "
- << "size: " << received_size
- << ", protocol: "
- << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
- << std::endl);
- continue;
+ catch (ProcessedSilentlyException &e) {
+ /* try reading again */
+ }
}
+ packet_queued = false;
+ }
- session_t peer_id = readPeerId(*packetdata);
- u8 channelnum = readChannel(*packetdata);
+ // Call Receive() to wait for incoming data
+ Address sender;
+ s32 received_size = m_connection->m_udpSocket.Receive(sender,
+ *packetdata, packetdata.getSize());
+ if (received_size < 0)
+ return;
- if (channelnum > CHANNEL_COUNT - 1) {
- LOG(derr_con << m_connection->getDesc()
- << "Receive(): Invalid channel " << (u32)channelnum << std::endl);
- throw InvalidIncomingDataException("Channel doesn't exist");
- }
+ if ((received_size < BASE_HEADER_SIZE) ||
+ (readU32(&packetdata[0]) != m_connection->GetProtocolID())) {
+ LOG(derr_con << m_connection->getDesc()
+ << "Receive(): Invalid incoming packet, "
+ << "size: " << received_size
+ << ", protocol: "
+ << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
+ << std::endl);
+ return;
+ }
- /* Try to identify peer by sender address (may happen on join) */
- if (peer_id == PEER_ID_INEXISTENT) {
- peer_id = m_connection->lookupPeer(sender);
- // We do not have to remind the peer of its
- // peer id as the CONTROLTYPE_SET_PEER_ID
- // command was sent reliably.
- }
+ session_t peer_id = readPeerId(*packetdata);
+ u8 channelnum = readChannel(*packetdata);
- /* The peer was not found in our lists. Add it. */
- if (peer_id == PEER_ID_INEXISTENT) {
- peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
- }
+ if (channelnum > CHANNEL_COUNT - 1) {
+ LOG(derr_con << m_connection->getDesc()
+ << "Receive(): Invalid channel " << (u32)channelnum << std::endl);
+ return;
+ }
- PeerHelper peer = m_connection->getPeerNoEx(peer_id);
+ /* Try to identify peer by sender address (may happen on join) */
+ if (peer_id == PEER_ID_INEXISTENT) {
+ peer_id = m_connection->lookupPeer(sender);
+ // We do not have to remind the peer of its
+ // peer id as the CONTROLTYPE_SET_PEER_ID
+ // command was sent reliably.
+ }
- if (!peer) {
- LOG(dout_con << m_connection->getDesc()
- << " got packet from unknown peer_id: "
- << peer_id << " Ignoring." << std::endl);
- continue;
- }
+ /* The peer was not found in our lists. Add it. */
+ if (peer_id == PEER_ID_INEXISTENT) {
+ peer_id = m_connection->createPeer(sender, MTP_MINETEST_RELIABLE_UDP, 0);
+ }
- // Validate peer address
+ PeerHelper peer = m_connection->getPeerNoEx(peer_id);
+ if (!peer) {
+ LOG(dout_con << m_connection->getDesc()
+ << " got packet from unknown peer_id: "
+ << peer_id << " Ignoring." << std::endl);
+ return;
+ }
- Address peer_address;
+ // Validate peer address
- if (peer->getAddress(MTP_UDP, peer_address)) {
- if (peer_address != sender) {
- LOG(derr_con << m_connection->getDesc()
- << m_connection->getDesc()
- << " Peer " << peer_id << " sending from different address."
- " Ignoring." << std::endl);
- continue;
- }
- } else {
-
- bool invalid_address = true;
- if (invalid_address) {
- LOG(derr_con << m_connection->getDesc()
- << m_connection->getDesc()
- << " Peer " << peer_id << " unknown."
- " Ignoring." << std::endl);
- continue;
- }
+ Address peer_address;
+ if (peer->getAddress(MTP_UDP, peer_address)) {
+ if (peer_address != sender) {
+ LOG(derr_con << m_connection->getDesc()
+ << " Peer " << peer_id << " sending from different address."
+ " Ignoring." << std::endl);
+ return;
}
+ } else {
+ LOG(derr_con << m_connection->getDesc()
+ << " Peer " << peer_id << " doesn't have an address?!"
+ " Ignoring." << std::endl);
+ return;
+ }
- peer->ResetTimeout();
-
- Channel *channel = 0;
+ peer->ResetTimeout();
- if (dynamic_cast<UDPPeer *>(&peer) != 0) {
- channel = &(dynamic_cast<UDPPeer *>(&peer)->channels[channelnum]);
- }
+ Channel *channel = nullptr;
+ if (dynamic_cast<UDPPeer *>(&peer)) {
+ channel = &dynamic_cast<UDPPeer *>(&peer)->channels[channelnum];
+ } else {
+ LOG(derr_con << m_connection->getDesc()
+ << "Receive(): peer_id=" << peer_id << " isn't an UDPPeer?!"
+ " Ignoring." << std::endl);
+ return;
+ }
- if (channel != 0) {
- channel->UpdateBytesReceived(received_size);
- }
+ channel->UpdateBytesReceived(received_size);
- // Throw the received packet to channel->processPacket()
+ // Throw the received packet to channel->processPacket()
- // Make a new SharedBuffer from the data without the base headers
- SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
- memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
- strippeddata.getSize());
+ // Make a new SharedBuffer from the data without the base headers
+ SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
+ memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
+ strippeddata.getSize());
- try {
- // Process it (the result is some data with no headers made by us)
- SharedBuffer<u8> resultdata = processPacket
- (channel, strippeddata, peer_id, channelnum, false);
+ try {
+ // Process it (the result is some data with no headers made by us)
+ SharedBuffer<u8> resultdata = processPacket
+ (channel, strippeddata, peer_id, channelnum, false);
- LOG(dout_con << m_connection->getDesc()
- << " ProcessPacket from peer_id: " << peer_id
- << ", channel: " << (u32)channelnum << ", returned "
- << resultdata.getSize() << " bytes" << std::endl);
+ LOG(dout_con << m_connection->getDesc()
+ << " ProcessPacket from peer_id: " << peer_id
+ << ", channel: " << (u32)channelnum << ", returned "
+ << resultdata.getSize() << " bytes" << std::endl);
- ConnectionEvent e;
- e.dataReceived(peer_id, resultdata);
- m_connection->putEvent(e);
- }
- catch (ProcessedSilentlyException &e) {
- }
- catch (ProcessedQueued &e) {
- packet_queued = true;
- }
- }
- catch (InvalidIncomingDataException &e) {
+ ConnectionEvent e;
+ e.dataReceived(peer_id, resultdata);
+ m_connection->putEvent(e);
}
catch (ProcessedSilentlyException &e) {
}
+ catch (ProcessedQueued &e) {
+ // we set it to true anyway (see below)
+ }
+
+ /* Every time we receive a packet it can happen that a previously
+ * buffered packet is now ready to process. */
+ packet_queued = true;
+ }
+ catch (InvalidIncomingDataException &e) {
}
}
}
SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
- SharedBuffer<u8> packetdata, session_t peer_id, u8 channelnum, bool reliable)
+ const SharedBuffer<u8> &packetdata, session_t peer_id, u8 channelnum, bool reliable)
{
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
};
SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Control(Channel *channel,
- SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
+ const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
{
if (packetdata.getSize() < 2)
throw InvalidIncomingDataException("packetdata.getSize() < 2");
// a overflow is quite unlikely but as it'd result in major
// rtt miscalculation we handle it here
- float rtt;
if (current_time > p.absolute_send_time) {
- rtt = (current_time - p.absolute_send_time) / 1000.0f;
+ float rtt = (current_time - p.absolute_send_time) / 1000.0;
+
+ // Let peer calculate stuff according to it
+ // (avg_rtt and resend_timeout)
+ dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
} else if (p.totaltime > 0) {
- rtt = p.totaltime;
- }
+ float rtt = p.totaltime;
- // Let peer calculate stuff according to it
- // (avg_rtt and resend_timeout)
- dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
+ // Let peer calculate stuff according to it
+ // (avg_rtt and resend_timeout)
+ dynamic_cast<UDPPeer *>(peer)->reportRTT(rtt);
+ }
}
// put bytes for max bandwidth calculation
channel->UpdateBytesSent(p.data.getSize(), 1);
m_connection->TriggerSend();
} catch (NotFoundException &e) {
LOG(derr_con << m_connection->getDesc()
- << "WARNING: ACKed packet not in outgoing queue" << std::endl);
+ << "WARNING: ACKed packet not in outgoing queue"
+ << " seqnum=" << seqnum << std::endl);
channel->UpdatePacketTooLateCounter();
}
m_connection->SetPeerID(peer_id_new);
}
- ConnectionCommand cmd;
-
- SharedBuffer<u8> reply(2);
- writeU8(&reply[0], PACKET_TYPE_CONTROL);
- writeU8(&reply[1], CONTROLTYPE_ENABLE_BIG_SEND_WINDOW);
- cmd.disableLegacy(PEER_ID_SERVER, reply);
- m_connection->putCommand(cmd);
-
throw ProcessedSilentlyException("Got a SET_PEER_ID");
} else if (controltype == CONTROLTYPE_PING) {
// Just ignore it, the incoming data already reset
}
throw ProcessedSilentlyException("Got a DISCO");
- } else if (controltype == CONTROLTYPE_ENABLE_BIG_SEND_WINDOW) {
- dynamic_cast<UDPPeer *>(peer)->setNonLegacyPeer();
- throw ProcessedSilentlyException("Got non legacy control");
} else {
LOG(derr_con << m_connection->getDesc()
<< "INVALID TYPE_CONTROL: invalid controltype="
}
SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Original(Channel *channel,
- SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
+ const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
{
if (packetdata.getSize() <= ORIGINAL_HEADER_SIZE)
throw InvalidIncomingDataException
}
SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Split(Channel *channel,
- SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
+ const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
{
Address peer_address;
}
SharedBuffer<u8> ConnectionReceiveThread::handlePacketType_Reliable(Channel *channel,
- SharedBuffer<u8> packetdata, Peer *peer, u8 channelnum, bool reliable)
+ const SharedBuffer<u8> &packetdata, Peer *peer, u8 channelnum, bool reliable)
{
assert(channel != NULL);