+ JMutexAutoLock lock(m_exclusive_access_mutex);
+ assert(m_usage > 0);
+ m_usage--;
+
+ if (!((m_pending_deletion) && (m_usage == 0)))
+ return;
+ }
+ delete this;
+}
+
+void Peer::RTTStatistics(float rtt,
+ std::string profiler_id,
+ unsigned int num_samples) {
+
+ if (m_last_rtt > 0) {
+ /* set min max values */
+ if (rtt < m_rtt.min_rtt)
+ m_rtt.min_rtt = rtt;
+ if (rtt >= m_rtt.max_rtt)
+ m_rtt.max_rtt = rtt;
+
+ /* do average calculation */
+ if(m_rtt.avg_rtt < 0.0)
+ m_rtt.avg_rtt = rtt;
+ else
+ m_rtt.avg_rtt = m_rtt.avg_rtt * (num_samples/(num_samples-1)) +
+ rtt * (1/num_samples);
+
+ /* do jitter calculation */
+
+ //just use some neutral value at beginning
+ float jitter = m_rtt.jitter_min;
+
+ if (rtt > m_last_rtt)
+ jitter = rtt-m_last_rtt;
+
+ if (rtt <= m_last_rtt)
+ jitter = m_last_rtt - rtt;
+
+ if (jitter < m_rtt.jitter_min)
+ m_rtt.jitter_min = jitter;
+ if (jitter >= m_rtt.jitter_max)
+ m_rtt.jitter_max = jitter;
+
+ if(m_rtt.jitter_avg < 0.0)
+ m_rtt.jitter_avg = jitter;
+ else
+ m_rtt.jitter_avg = m_rtt.jitter_avg * (num_samples/(num_samples-1)) +
+ jitter * (1/num_samples);
+
+ if (profiler_id != "")
+ {
+ g_profiler->graphAdd(profiler_id + "_rtt", rtt);
+ g_profiler->graphAdd(profiler_id + "_jitter", jitter);
+ }
+ }
+ /* save values required for next loop */
+ m_last_rtt = rtt;
+}
+
+bool Peer::isTimedOut(float timeout)
+{
+ JMutexAutoLock lock(m_exclusive_access_mutex);
+ u32 current_time = porting::getTimeMs();
+
+ float dtime = CALC_DTIME(m_last_timeout_check,current_time);
+ m_last_timeout_check = current_time;
+
+ m_timeout_counter += dtime;
+
+ return m_timeout_counter > timeout;
+}
+
/*
 * Mark this peer for deletion and delete it immediately if nobody is
 * currently using it.  If m_usage is non-zero, deletion is deferred: the
 * flag m_pending_deletion is checked by the usage-decrement path, which
 * performs the delete once the count reaches zero.
 */
void Peer::Drop()
{
	{
		JMutexAutoLock usage_lock(m_exclusive_access_mutex);
		m_pending_deletion = true;
		// Somebody still holds a reference -- leave deletion to them.
		if (m_usage != 0)
			return;
	}

	// Clean up the per-peer profiler entries.  PROFILE() presumably
	// compiles to nothing when profiling is disabled -- macro not visible
	// in this chunk.
	PROFILE(std::stringstream peerIdentifier1);
	PROFILE(peerIdentifier1 << "runTimeouts[" << m_connection->getDesc() << ";" << id << ";RELIABLE]");
	PROFILE(g_profiler->remove(peerIdentifier1.str()));
	PROFILE(std::stringstream peerIdentifier2);
	PROFILE(peerIdentifier2 << "sendPackets[" << m_connection->getDesc() << ";" << id << ";RELIABLE]");
	PROFILE(ScopeProfiler peerprofiler(g_profiler, peerIdentifier2.str(), SPT_AVG));

	// Safe: the lock scope above is already closed and no members are
	// touched after this point.
	delete this;
}
+
// Construct a UDP peer.  Starts in "legacy" mode (see setNonLegacyPeer(),
// which enables the configurable send-window size) with a conservative
// initial reliable-resend timeout of 0.5 s.
UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection* connection) :
	Peer(a_address,a_id,connection),
	m_pending_disconnect(false),
	resend_timeout(0.5),
	m_legacy_peer(true)
{
}
+
+bool UDPPeer::getAddress(MTProtocols type,Address& toset)
+{
+ if ((type == UDP) || (type == MINETEST_RELIABLE_UDP) || (type == PRIMARY))
+ {
+ toset = address;
+ return true;
+ }
+
+ return false;
+}
+
+void UDPPeer::setNonLegacyPeer()
+{
+ m_legacy_peer = false;
+ for(unsigned int i=0; i< CHANNEL_COUNT; i++)
+ {
+ channels->setWindowSize(g_settings->getU16("max_packets_per_iteration"));
+ }
+}
+
+void UDPPeer::reportRTT(float rtt)
+{
+ if (rtt < 0.0) {
+ return;
+ }
+ RTTStatistics(rtt,"rudp",MAX_RELIABLE_WINDOW_SIZE*10);
+
+ float timeout = getStat(AVG_RTT) * RESEND_TIMEOUT_FACTOR;
+ if(timeout < RESEND_TIMEOUT_MIN)
+ timeout = RESEND_TIMEOUT_MIN;
+ if(timeout > RESEND_TIMEOUT_MAX)
+ timeout = RESEND_TIMEOUT_MAX;
+
+ JMutexAutoLock usage_lock(m_exclusive_access_mutex);
+ resend_timeout = timeout;
+}
+
+bool UDPPeer::Ping(float dtime,SharedBuffer<u8>& data)
+{
+ m_ping_timer += dtime;
+ if(m_ping_timer >= PING_TIMEOUT)
+ {
+ // Create and send PING packet
+ writeU8(&data[0], TYPE_CONTROL);
+ writeU8(&data[1], CONTROLTYPE_PING);
+ m_ping_timer = 0.0;
+ return true;
+ }
+ return false;
+}
+
/*
 * Hand a reliable command to this peer for sending.
 *
 * The command is processed immediately only if no commands are already
 * queued on its channel (preserves ordering) and less than half the
 * channel's send window is occupied by queued reliables; otherwise it is
 * appended to the channel's command queue for later processing.
 * Dropped silently while a disconnect is pending.
 */
void UDPPeer::PutReliableSendCommand(ConnectionCommand &c,
				unsigned int max_packet_size)
{
	if (m_pending_disconnect)
		return;

	if ( channels[c.channelnum].queued_commands.empty() &&
			/* don't queue more packets then window size */
			(channels[c.channelnum].queued_reliables.size()
			< (channels[c.channelnum].getWindowSize()/2)))
	{
		LOG(dout_con<<m_connection->getDesc()
				<<" processing reliable command for peer id: " << c.peer_id
				<<" data size: " << c.data.getSize() << std::endl);
		// processReliableSendCommand() returns false when the channel ran
		// out of sequence numbers; keep the command queued for a retry.
		if (!processReliableSendCommand(c,max_packet_size))
		{
			channels[c.channelnum].queued_commands.push_back(c);
		}
	}
	else
	{
		LOG(dout_con<<m_connection->getDesc()
				<<" Queueing reliable command for peer id: " << c.peer_id
				<<" data size: " << c.data.getSize() <<std::endl);
		channels[c.channelnum].queued_commands.push_back(c);
	}
}
+
+bool UDPPeer::processReliableSendCommand(
+ ConnectionCommand &c,
+ unsigned int max_packet_size)
+{
+ if (m_pending_disconnect)
+ return true;
+
+ u32 chunksize_max = max_packet_size
+ - BASE_HEADER_SIZE
+ - RELIABLE_HEADER_SIZE;
+
+ assert(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE*512);
+
+ std::list<SharedBuffer<u8> > originals;
+ u16 split_sequence_number = channels[c.channelnum].readNextSplitSeqNum();
+
+ if (c.raw)
+ {
+ originals.push_back(c.data);
+ }
+ else {
+ originals = makeAutoSplitPacket(c.data, chunksize_max,split_sequence_number);
+ channels[c.channelnum].setNextSplitSeqNum(split_sequence_number);
+ }
+
+ bool have_sequence_number = true;
+ bool have_initial_sequence_number = false;
+ Queue<BufferedPacket> toadd;
+ volatile u16 initial_sequence_number = 0;
+
+ for(std::list<SharedBuffer<u8> >::iterator i = originals.begin();
+ i != originals.end(); ++i)
+ {
+ u16 seqnum = channels[c.channelnum].getOutgoingSequenceNumber(have_sequence_number);
+
+ /* oops, we don't have enough sequence numbers to send this packet */
+ if (!have_sequence_number)
+ break;
+
+ if (!have_initial_sequence_number)
+ {
+ initial_sequence_number = seqnum;
+ have_initial_sequence_number = true;
+ }
+
+ SharedBuffer<u8> reliable = makeReliablePacket(*i, seqnum);
+
+ // Add base headers and make a packet
+ BufferedPacket p = con::makePacket(address, reliable,
+ m_connection->GetProtocolID(), m_connection->GetPeerID(),
+ c.channelnum);
+
+ toadd.push_back(p);
+ }
+
+ if (have_sequence_number) {
+ volatile u16 pcount = 0;
+ while(toadd.size() > 0) {
+ BufferedPacket p = toadd.pop_front();
+// LOG(dout_con<<connection->getDesc()
+// << " queuing reliable packet for peer_id: " << c.peer_id
+// << " channel: " << (c.channelnum&0xFF)
+// << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
+// << std::endl)
+ channels[c.channelnum].queued_reliables.push_back(p);
+ pcount++;
+ }
+ assert(channels[c.channelnum].queued_reliables.size() < 0xFFFF);
+ return true;
+ }
+ else {
+ volatile u16 packets_available = toadd.size();
+ /* we didn't get a single sequence number no need to fill queue */
+ if (!have_initial_sequence_number)
+ {
+ return false;
+ }
+ while(toadd.size() > 0) {
+ /* remove packet */
+ toadd.pop_front();
+
+ bool successfully_put_back_sequence_number
+ = channels[c.channelnum].putBackSequenceNumber(
+ (initial_sequence_number+toadd.size() % (SEQNUM_MAX+1)));
+
+ assert(successfully_put_back_sequence_number);
+ }
+ LOG(dout_con<<m_connection->getDesc()
+ << " Windowsize exceeded on reliable sending " << c.data.getSize() << " bytes"
+ << std::endl << "\t\tinitial_sequence_number: " << initial_sequence_number
+ << std::endl << "\t\tgot at most : " << packets_available << " packets"
+ << std::endl << "\t\tpackets queued : " << channels[c.channelnum].outgoing_reliables_sent.size()
+ << std::endl);
+ return false;
+ }
+}
+
+void UDPPeer::RunCommandQueues(
+ unsigned int max_packet_size,
+ unsigned int maxcommands,
+ unsigned int maxtransfer)
+{
+
+ for (unsigned int i = 0; i < CHANNEL_COUNT; i++)
+ {
+ unsigned int commands_processed = 0;
+
+ if ((channels[i].queued_commands.size() > 0) &&
+ (channels[i].queued_reliables.size() < maxtransfer) &&
+ (commands_processed < maxcommands))
+ {
+ try {
+ ConnectionCommand c = channels[i].queued_commands.pop_front();
+ LOG(dout_con<<m_connection->getDesc()
+ <<" processing queued reliable command "<<std::endl);
+ if (!processReliableSendCommand(c,max_packet_size)) {
+ LOG(dout_con<<m_connection->getDesc()
+ << " Failed to queue packets for peer_id: " << c.peer_id
+ << ", delaying sending of " << c.data.getSize() << " bytes" << std::endl);
+ channels[i].queued_commands.push_front(c);
+ }
+ }
+ catch (ItemNotFoundException &e) {
+ // intentionally empty
+ }
+ }
+ }
+}
+
+u16 UDPPeer::getNextSplitSequenceNumber(u8 channel)
+{
+ assert(channel < CHANNEL_COUNT);
+ return channels[channel].readNextIncomingSeqNum();
+}
+
// Set the next split-packet sequence number to use on `channel`.
void UDPPeer::setNextSplitSequenceNumber(u8 channel, u16 seqnum)
{
	assert(channel < CHANNEL_COUNT);
	channels[channel].setNextSplitSeqNum(seqnum);
}
+
// Feed one chunk of a split packet into the channel's reassembly buffer and
// return whatever incoming_splits.insert() yields -- presumably the fully
// reassembled packet once all chunks arrived (insert() not visible in this
// chunk; confirm against IncomingSplitBuffer).
// NOTE(review): "Spilt" is a typo for "Split"; kept because callers use
// this spelling.
SharedBuffer<u8> UDPPeer::addSpiltPacket(u8 channel,
				BufferedPacket toadd,
				bool reliable)
{
	assert(channel < CHANNEL_COUNT);
	return channels[channel].incoming_splits.insert(toadd,reliable);
}
+
+/******************************************************************************/
+/* Connection Threads */
+/******************************************************************************/
+
// Construct the send thread for connection `parent`.  The per-iteration
// data-packet budget comes from the "max_packets_per_iteration" setting;
// m_max_commands_per_iteration starts at 1 (NOTE(review): possibly adjusted
// elsewhere -- not visible in this chunk).
ConnectionSendThread::ConnectionSendThread(Connection* parent,
				unsigned int max_packet_size,
				float timeout) :
	m_connection(parent),
	m_max_packet_size(max_packet_size),
	m_timeout(timeout),
	m_max_commands_per_iteration(1),
	m_max_data_packets_per_iteration(g_settings->getU16("max_packets_per_iteration")),
	m_max_packets_requeued(256)
{
}
+
/*
 * Main loop of the connection send thread.
 *
 * Each iteration: wake on Trigger() or after 50 ms, run reliable-packet
 * timeout handling, translate all pending connection commands into packets,
 * then send queued non-reliable packets.  After a stop request the loop
 * keeps running until all queued packets have been flushed.
 */
void * ConnectionSendThread::Thread()
{
	ThreadStarted();
	log_register_thread("ConnectionSend");

	LOG(dout_con<<m_connection->getDesc()
			<<"ConnectionSend thread started"<<std::endl);

	u32 curtime = porting::getTimeMs();
	u32 lasttime = curtime;

	PROFILE(std::stringstream ThreadIdentifier);
	PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");

	porting::setThreadName("ConnectionSend");

	/* if stop is requested don't stop immediately but try to send all */
	/* packets first */
	while(!StopRequested() || packetsQueued()) {
		BEGIN_DEBUG_EXCEPTION_HANDLER
		PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));

		// Reset this iteration's data-packet budget.
		// (NOTE(review): member name carries a typo, "avaialble".)
		m_iteration_packets_avaialble = m_max_data_packets_per_iteration;

		/* wait for trigger or timeout */
		m_send_sleep_semaphore.Wait(50);

		/* remove all triggers */
		while(m_send_sleep_semaphore.Wait(0)) {}

		lasttime = curtime;
		curtime = porting::getTimeMs();
		float dtime = CALC_DTIME(lasttime,curtime);

		/* first do all the reliable stuff */
		runTimeouts(dtime);

		/* translate commands to packets */
		ConnectionCommand c = m_connection->m_command_queue.pop_frontNoEx(0);
		while(c.type != CONNCMD_NONE)
		{
			if (c.reliable)
				processReliableCommand(c);
			else
				processNonReliableCommand(c);

			c = m_connection->m_command_queue.pop_frontNoEx(0);
		}

		/* send non reliable packets */
		sendPackets(dtime);

		END_DEBUG_EXCEPTION_HANDLER(derr_con);
	}

	PROFILE(g_profiler->remove(ThreadIdentifier.str()));
	return NULL;
}
+
// Wake the send thread: posts the semaphore that Thread() waits on at the
// start of every iteration.
void ConnectionSendThread::Trigger()
{
	m_send_sleep_semaphore.Post();
}
+
+bool ConnectionSendThread::packetsQueued()
+{
+ std::list<u16> peerIds = m_connection->getPeerIDs();
+
+ if ((this->m_outgoing_queue.size() > 0) && (peerIds.size() > 0))
+ return true;
+
+ for(std::list<u16>::iterator j = peerIds.begin();
+ j != peerIds.end(); ++j)
+ {
+ PeerHelper peer = m_connection->getPeerNoEx(*j);
+
+ if (!peer)
+ continue;
+
+ if (dynamic_cast<UDPPeer*>(&peer) == 0)
+ continue;
+
+ for(u16 i=0; i<CHANNEL_COUNT; i++)
+ {
+ Channel *channel = &(dynamic_cast<UDPPeer*>(&peer))->channels[i];
+
+ if (channel->queued_commands.size() > 0)
+ {
+ return true;
+ }
+ }
+ }
+
+
+ return false;
+}
+
+void ConnectionSendThread::runTimeouts(float dtime)
+{
+ std::list<u16> timeouted_peers;
+ std::list<u16> peerIds = m_connection->getPeerIDs();
+
+ for(std::list<u16>::iterator j = peerIds.begin();
+ j != peerIds.end(); ++j)
+ {
+ PeerHelper peer = m_connection->getPeerNoEx(*j);
+
+ if (!peer)
+ continue;
+
+ if(dynamic_cast<UDPPeer*>(&peer) == 0)
+ continue;
+
+ PROFILE(std::stringstream peerIdentifier);
+ PROFILE(peerIdentifier << "runTimeouts[" << m_connection->getDesc() << ";" << *j << ";RELIABLE]");
+ PROFILE(ScopeProfiler peerprofiler(g_profiler, peerIdentifier.str(), SPT_AVG));
+
+ SharedBuffer<u8> data(2); // data for sending ping, required here because of goto
+