fprintf(stderr, "Old: seqnum: %05d size: %04d, address: %s\n",
readU16(&(i->data[BASE_HEADER_SIZE+1])),i->data.getSize(),
i->address.serializeString().c_str());
- fprintf(stderr, "New: seqnum: %05d size: %04d, address: %s\n",
+ fprintf(stderr, "New: seqnum: %05d size: %04u, address: %s\n",
readU16(&(p.data[BASE_HEADER_SIZE+1])),p.data.getSize(),
p.address.serializeString().c_str());
throw IncomingDataCorruption("duplicated packet isn't same as original one");
unsigned int packet_loss = 11; /* use a neutral value for initialization */
unsigned int packets_successfull = 0;
- unsigned int packet_too_late = 0;
+ //unsigned int packet_too_late = 0;
bool reasonable_amount_of_data_transmitted = false;
{
JMutexAutoLock internal(m_internal_mutex);
packet_loss = current_packet_loss;
- packet_too_late = current_packet_too_late;
+ //packet_too_late = current_packet_too_late;
packets_successfull = current_packet_successfull;
if (current_bytes_transfered > (unsigned int) (window_size*512/2))
/* Connection Threads */
/******************************************************************************/
-ConnectionSendThread::ConnectionSendThread(Connection* parent,
- unsigned int max_packet_size,
+ConnectionSendThread::ConnectionSendThread( unsigned int max_packet_size,
float timeout) :
- m_connection(parent),
+ m_connection(NULL),
m_max_packet_size(max_packet_size),
m_timeout(timeout),
m_max_commands_per_iteration(1),
void * ConnectionSendThread::Thread()
{
+ assert(m_connection != NULL);
ThreadStarted();
log_register_thread("ConnectionSend");
/* send non reliable packets */
sendPackets(dtime);
- END_DEBUG_EXCEPTION_HANDLER(derr_con);
+ END_DEBUG_EXCEPTION_HANDLER(errorstream);
}
PROFILE(g_profiler->remove(ThreadIdentifier.str()));
{
std::list<u16> peerIds = m_connection->getPeerIDs();
- if ((this->m_outgoing_queue.size() > 0) && (peerIds.size() > 0))
+ if (!m_outgoing_queue.empty() && !peerIds.empty())
return true;
for(std::list<u16>::iterator j = peerIds.begin();
(m_max_data_packets_per_iteration/numpeers));
channel->UpdatePacketLossCounter(timed_outs.size());
+ g_profiler->graphAdd("packets_lost", timed_outs.size());
m_iteration_packets_avaialble -= timed_outs.size();
u16 seqnum = readU16(&(k->data[BASE_HEADER_SIZE+1]));
channel->UpdateBytesLost(k->data.getSize());
+ k->resend_count++;
LOG(derr_con<<m_connection->getDesc()
<<"RE-SENDING timed-out RELIABLE to "
m_outgoing_queue.push_back(packet);
}
-ConnectionReceiveThread::ConnectionReceiveThread(Connection* parent,
- unsigned int max_packet_size) :
- m_connection(parent)
+ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
+ m_connection(NULL)
{
}
void * ConnectionReceiveThread::Thread()
{
+ assert(m_connection != NULL);
ThreadStarted();
log_register_thread("ConnectionReceive");
}
}
#endif
- END_DEBUG_EXCEPTION_HANDLER(derr_con);
+ END_DEBUG_EXCEPTION_HANDLER(errorstream);
}
PROFILE(g_profiler->remove(ThreadIdentifier.str()));
return NULL;
LOG(derr_con<<m_connection->getDesc()
<<"Receive(): Invalid incoming packet, "
<<"size: " << received_size
- <<", protocol: " << readU32(&packetdata[0]) <<std::endl);
+ <<", protocol: "
+ << ((received_size >= 4) ? readU32(&packetdata[0]) : -1)
+ << std::endl);
continue;
}
SharedBuffer<u8> ConnectionReceiveThread::processPacket(Channel *channel,
SharedBuffer<u8> packetdata, u16 peer_id, u8 channelnum, bool reliable)
{
- PeerHelper peer = m_connection->getPeer(peer_id);
+ PeerHelper peer = m_connection->getPeerNoEx(peer_id);
+
+ if (!peer) {
+ errorstream << "Peer not found (possible timeout)" << std::endl;
+ throw ProcessedSilentlyException("Peer not found (possible timeout)");
+ }
if(packetdata.getSize() < 1)
throw InvalidIncomingDataException("packetdata.getSize() < 1");
u8 type = readU8(&(packetdata[0]));
+ if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
+ errorstream << "Something is wrong with peer_id" << std::endl;
+ assert(0);
+ }
+
if(type == TYPE_CONTROL)
{
if(packetdata.getSize() < 2)
u8 controltype = readU8(&(packetdata[1]));
- if( (controltype == CONTROLTYPE_ACK)
- && (peer_id <= MAX_UDP_PEERS))
+ if(controltype == CONTROLTYPE_ACK)
{
assert(channel != 0);
if(packetdata.getSize() < 4)
try{
BufferedPacket p =
channel->outgoing_reliables_sent.popSeqnum(seqnum);
- // Get round trip time
- unsigned int current_time = porting::getTimeMs();
- if (current_time > p.absolute_send_time)
- {
- float rtt = (current_time - p.absolute_send_time) / 1000.0;
+ // only calculate rtt from straight sent packets
+ if (p.resend_count == 0) {
+ // Get round trip time
+ unsigned int current_time = porting::getTimeMs();
- // Let peer calculate stuff according to it
- // (avg_rtt and resend_timeout)
- dynamic_cast<UDPPeer*>(&peer)->reportRTT(rtt);
- }
- else if (p.totaltime > 0)
- {
- float rtt = p.totaltime;
+ // a overflow is quite unlikely but as it'd result in major
+ // rtt miscalculation we handle it here
+ if (current_time > p.absolute_send_time)
+ {
+ float rtt = (current_time - p.absolute_send_time) / 1000.0;
+
+ // Let peer calculate stuff according to it
+ // (avg_rtt and resend_timeout)
+ dynamic_cast<UDPPeer*>(&peer)->reportRTT(rtt);
+ }
+ else if (p.totaltime > 0)
+ {
+ float rtt = p.totaltime;
- // Let peer calculate stuff according to it
- // (avg_rtt and resend_timeout)
- dynamic_cast<UDPPeer*>(&peer)->reportRTT(rtt);
+ // Let peer calculate stuff according to it
+ // (avg_rtt and resend_timeout)
+ dynamic_cast<UDPPeer*>(&peer)->reportRTT(rtt);
+ }
}
//put bytes for max bandwidth calculation
channel->UpdateBytesSent(p.data.getSize(),1);
}
throw ProcessedSilentlyException("Got an ACK");
}
- else if((controltype == CONTROLTYPE_SET_PEER_ID)
- && (peer_id <= MAX_UDP_PEERS))
+ else if(controltype == CONTROLTYPE_SET_PEER_ID)
{
// Got a packet to set our peer id
if(packetdata.getSize() < 4)
throw ProcessedSilentlyException("Got a SET_PEER_ID");
}
- else if((controltype == CONTROLTYPE_PING)
- && (peer_id <= MAX_UDP_PEERS))
+ else if(controltype == CONTROLTYPE_PING)
{
// Just ignore it, the incoming data already reset
// the timeout counter
throw ProcessedSilentlyException("Got a DISCO");
}
- else if((controltype == CONTROLTYPE_ENABLE_BIG_SEND_WINDOW)
- && (peer_id <= MAX_UDP_PEERS))
+ else if(controltype == CONTROLTYPE_ENABLE_BIG_SEND_WINDOW)
{
dynamic_cast<UDPPeer*>(&peer)->setNonLegacyPeer();
throw ProcessedSilentlyException("Got non legacy control");
//TODO throw some error
}
}
- else if((peer_id <= MAX_UDP_PEERS) && (type == TYPE_RELIABLE))
+ else if(type == TYPE_RELIABLE)
{
assert(channel != 0);
// Recursive reliable packets not allowed
// we already have this packet so this one was on wire at least
// the current timeout
- dynamic_cast<UDPPeer*>(&peer)->reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
+ // we don't know how long this packet was on wire don't do silly guessing
+ // dynamic_cast<UDPPeer*>(&peer)->reportRTT(dynamic_cast<UDPPeer*>(&peer)->getResendTimeout());
throw ProcessedSilentlyException("Retransmitting ack for old packet");
}
m_event_queue(),
m_peer_id(0),
m_protocol_id(protocol_id),
- m_sendThread(this, max_packet_size, timeout),
- m_receiveThread(this, max_packet_size),
+ m_sendThread(max_packet_size, timeout),
+ m_receiveThread(max_packet_size),
m_info_mutex(),
m_bc_peerhandler(0),
m_bc_receive_timeout(0),
{
m_udpSocket.setTimeoutMs(5);
+ m_sendThread.setParent(this);
+ m_receiveThread.setParent(this);
+
m_sendThread.Start();
m_receiveThread.Start();
}
m_event_queue(),
m_peer_id(0),
m_protocol_id(protocol_id),
- m_sendThread(this, max_packet_size, timeout),
- m_receiveThread(this, max_packet_size),
+ m_sendThread(max_packet_size, timeout),
+ m_receiveThread(max_packet_size),
m_info_mutex(),
m_bc_peerhandler(peerhandler),
m_bc_receive_timeout(0),
{
m_udpSocket.setTimeoutMs(5);
+ m_sendThread.setParent(this);
+ m_receiveThread.setParent(this);
+
m_sendThread.Start();
m_receiveThread.Start();