#include <iomanip>
#include <errno.h>
#include "connection.h"
-#include "main.h"
#include "serialization.h"
#include "log.h"
#include "porting.h"
+#include "network/networkpacket.h"
#include "util/serialize.h"
#include "util/numeric.h"
#include "util/string.h"
#undef DEBUG_CONNECTION_KBPS
#else
/* this mutex is used to achieve log message consistency */
-JMutex log_message_mutex;
+std::mutex log_message_mutex;
#define LOG(a) \
{ \
- JMutexAutoLock loglock(log_message_mutex); \
+ MutexAutoLock loglock(log_message_mutex); \
a; \
}
#define PROFILE(a) a
#endif
-static inline float CALC_DTIME(unsigned int lasttime, unsigned int curtime) {
+static inline float CALC_DTIME(u64 lasttime, u64 curtime)
+{
float value = ( curtime - lasttime) / 1000.0;
return MYMAX(MYMIN(value,0.1),0.0);
}
#define PING_TIMEOUT 5.0
+/* maximum number of retries for reliable packets */
+#define MAX_RELIABLE_RETRY 5
+
static u16 readPeerId(u8 *packetdata)
{
return readU16(&packetdata[4]);
void ReliablePacketBuffer::print()
{
- JMutexAutoLock listlock(m_list_mutex);
+ MutexAutoLock listlock(m_list_mutex);
LOG(dout_con<<"Dump of ReliablePacketBuffer:" << std::endl);
unsigned int index = 0;
for(std::list<BufferedPacket>::iterator i = m_list.begin();
}
bool ReliablePacketBuffer::empty()
{
- JMutexAutoLock listlock(m_list_mutex);
+ MutexAutoLock listlock(m_list_mutex);
return m_list.empty();
}
}
bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
{
- JMutexAutoLock listlock(m_list_mutex);
+ MutexAutoLock listlock(m_list_mutex);
if (m_list.empty())
return false;
BufferedPacket p = *m_list.begin();
BufferedPacket ReliablePacketBuffer::popFirst()
{
- JMutexAutoLock listlock(m_list_mutex);
+ MutexAutoLock listlock(m_list_mutex);
if (m_list.empty())
throw NotFoundException("Buffer is empty");
BufferedPacket p = *m_list.begin();
}
BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
{
- JMutexAutoLock listlock(m_list_mutex);
+ MutexAutoLock listlock(m_list_mutex);
RPBSearchResult r = findPacket(seqnum);
if (r == notFound()) {
LOG(dout_con<<"Sequence number: " << seqnum
RPBSearchResult next = r;
- next++;
+ ++next;
if (next != notFound()) {
u16 s = readU16(&(next->data[BASE_HEADER_SIZE+1]));
m_oldest_non_answered_ack = s;
}
void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
{
- JMutexAutoLock listlock(m_list_mutex);
- assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
- u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
- assert(type == TYPE_RELIABLE);
- u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
+ MutexAutoLock listlock(m_list_mutex);
+ if (p.data.getSize() < BASE_HEADER_SIZE + 3) {
+ errorstream << "ReliablePacketBuffer::insert(): Invalid data size for "
+ "reliable packet" << std::endl;
+ return;
+ }
+ u8 type = readU8(&p.data[BASE_HEADER_SIZE + 0]);
+ if (type != TYPE_RELIABLE) {
+ errorstream << "ReliablePacketBuffer::insert(): type is not reliable"
+ << std::endl;
+ return;
+ }
+ u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
- assert(seqnum_in_window(seqnum,next_expected,MAX_RELIABLE_WINDOW_SIZE));
- assert(seqnum != next_expected);
+ if (!seqnum_in_window(seqnum, next_expected, MAX_RELIABLE_WINDOW_SIZE)) {
+ errorstream << "ReliablePacketBuffer::insert(): seqnum is outside of "
+ "expected window " << std::endl;
+ return;
+ }
+ if (seqnum == next_expected) {
+ errorstream << "ReliablePacketBuffer::insert(): seqnum is next expected"
+ << std::endl;
+ return;
+ }
++m_list_size;
- assert(m_list_size <= SEQNUM_MAX+1);
+ sanity_check(m_list_size <= SEQNUM_MAX+1); // FIXME: Handle the error?
// Find the right place for the packet and insert it there
// If list is empty, just add it
/* this is true e.g. on wrap around */
if (seqnum < next_expected) {
while(((s < seqnum) || (s >= next_expected)) && (i != m_list.end())) {
- i++;
+ ++i;
if (i != m_list.end())
s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
}
else
{
while(((s < seqnum) && (s >= next_expected)) && (i != m_list.end())) {
- i++;
+ ++i;
if (i != m_list.end())
s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
}
throw IncomingDataCorruption("duplicated packet isn't same as original one");
}
- assert(readU16(&(i->data[BASE_HEADER_SIZE+1])) == seqnum);
- assert(i->data.getSize() == p.data.getSize());
- assert(i->address == p.address);
-
/* nothing to do this seems to be a resent packet */
/* for paranoia reason data should be compared */
--m_list_size;
void ReliablePacketBuffer::incrementTimeouts(float dtime)
{
- JMutexAutoLock listlock(m_list_mutex);
+ MutexAutoLock listlock(m_list_mutex);
for(std::list<BufferedPacket>::iterator i = m_list.begin();
i != m_list.end(); ++i)
{
std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout,
unsigned int max_packets)
{
- JMutexAutoLock listlock(m_list_mutex);
+ MutexAutoLock listlock(m_list_mutex);
std::list<BufferedPacket> timed_outs;
for(std::list<BufferedPacket>::iterator i = m_list.begin();
i != m_list.end(); ++i)
IncomingSplitBuffer::~IncomingSplitBuffer()
{
- JMutexAutoLock listlock(m_map_mutex);
+ MutexAutoLock listlock(m_map_mutex);
for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
i != m_buf.end(); ++i)
{
*/
SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
{
- JMutexAutoLock listlock(m_map_mutex);
+ MutexAutoLock listlock(m_map_mutex);
u32 headersize = BASE_HEADER_SIZE + 7;
- assert(p.data.getSize() >= headersize);
+ if (p.data.getSize() < headersize) {
+ errorstream << "Invalid data size for split packet" << std::endl;
+ return SharedBuffer<u8>();
+ }
u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
- assert(type == TYPE_SPLIT);
u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
+ if (type != TYPE_SPLIT) {
+ errorstream << "IncomingSplitBuffer::insert(): type is not split"
+ << std::endl;
+ return SharedBuffer<u8>();
+ }
+
// Add if doesn't exist
if (m_buf.find(seqnum) == m_buf.end())
{
{
std::list<u16> remove_queue;
{
- JMutexAutoLock listlock(m_map_mutex);
+ MutexAutoLock listlock(m_map_mutex);
for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
i != m_buf.end(); ++i)
{
for(std::list<u16>::iterator j = remove_queue.begin();
j != remove_queue.end(); ++j)
{
- JMutexAutoLock listlock(m_map_mutex);
+ MutexAutoLock listlock(m_map_mutex);
LOG(dout_con<<"NOTE: Removing timed out unreliable split packet"<<std::endl);
delete m_buf[*j];
m_buf.erase(*j);
u16 Channel::readNextIncomingSeqNum()
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
return next_incoming_seqnum;
}
u16 Channel::incNextIncomingSeqNum()
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
u16 retval = next_incoming_seqnum;
next_incoming_seqnum++;
return retval;
u16 Channel::readNextSplitSeqNum()
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
return next_outgoing_split_seqnum;
}
void Channel::setNextSplitSeqNum(u16 seqnum)
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
next_outgoing_split_seqnum = seqnum;
}
-u16 Channel::getOutgoingSequenceNumber(bool& successfull)
+u16 Channel::getOutgoingSequenceNumber(bool& successful)
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
u16 retval = next_outgoing_seqnum;
u16 lowest_unacked_seqnumber;
// know about difference of two unsigned may be negative in general
// but we already made sure it won't happen in this case
if (((u16)(next_outgoing_seqnum - lowest_unacked_seqnumber)) > window_size) {
- successfull = false;
+ successful = false;
return 0;
}
}
// but we already made sure it won't happen in this case
if ((next_outgoing_seqnum + (u16)(SEQNUM_MAX - lowest_unacked_seqnumber)) >
window_size) {
- successfull = false;
+ successful = false;
return 0;
}
}
u16 Channel::readOutgoingSequenceNumber()
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
return next_outgoing_seqnum;
}
void Channel::UpdateBytesSent(unsigned int bytes, unsigned int packets)
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
current_bytes_transfered += bytes;
current_packet_successfull += packets;
}
void Channel::UpdateBytesReceived(unsigned int bytes) {
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
current_bytes_received += bytes;
}
void Channel::UpdateBytesLost(unsigned int bytes)
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
current_bytes_lost += bytes;
}
void Channel::UpdatePacketLossCounter(unsigned int count)
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
current_packet_loss += count;
}
void Channel::UpdatePacketTooLateCounter()
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
current_packet_too_late++;
}
bool reasonable_amount_of_data_transmitted = false;
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
packet_loss = current_packet_loss;
//packet_too_late = current_packet_too_late;
packets_successfull = current_packet_successfull;
if (bpm_counter > 10.0)
{
{
- JMutexAutoLock internal(m_internal_mutex);
+ MutexAutoLock internal(m_internal_mutex);
cur_kbps =
(((float) current_bytes_transfered)/bpm_counter)/1024.0;
current_bytes_transfered = 0;
bool Peer::IncUseCount()
{
- JMutexAutoLock lock(m_exclusive_access_mutex);
+ MutexAutoLock lock(m_exclusive_access_mutex);
if (!m_pending_deletion)
{
void Peer::DecUseCount()
{
{
- JMutexAutoLock lock(m_exclusive_access_mutex);
- assert(m_usage > 0);
+ MutexAutoLock lock(m_exclusive_access_mutex);
+ sanity_check(m_usage > 0);
m_usage--;
if (!((m_pending_deletion) && (m_usage == 0)))
delete this;
}
-void Peer::RTTStatistics(float rtt, std::string profiler_id,
+void Peer::RTTStatistics(float rtt, const std::string &profiler_id,
unsigned int num_samples) {
if (m_last_rtt > 0) {
m_rtt.jitter_avg = m_rtt.jitter_avg * (num_samples/(num_samples-1)) +
jitter * (1/num_samples);
- if (profiler_id != "")
- {
+ if (profiler_id != "") {
g_profiler->graphAdd(profiler_id + "_rtt", rtt);
g_profiler->graphAdd(profiler_id + "_jitter", jitter);
}
bool Peer::isTimedOut(float timeout)
{
- JMutexAutoLock lock(m_exclusive_access_mutex);
- u32 current_time = porting::getTimeMs();
+ MutexAutoLock lock(m_exclusive_access_mutex);
+ u64 current_time = porting::getTimeMs();
float dtime = CALC_DTIME(m_last_timeout_check,current_time);
m_last_timeout_check = current_time;
void Peer::Drop()
{
{
- JMutexAutoLock usage_lock(m_exclusive_access_mutex);
+ MutexAutoLock usage_lock(m_exclusive_access_mutex);
m_pending_deletion = true;
if (m_usage != 0)
return;
if (timeout > RESEND_TIMEOUT_MAX)
timeout = RESEND_TIMEOUT_MAX;
- JMutexAutoLock usage_lock(m_exclusive_access_mutex);
+ MutexAutoLock usage_lock(m_exclusive_access_mutex);
resend_timeout = timeout;
}
if ( channels[c.channelnum].queued_commands.empty() &&
/* don't queue more packets then window size */
(channels[c.channelnum].queued_reliables.size()
- < (channels[c.channelnum].getWindowSize()/2)))
- {
+ < (channels[c.channelnum].getWindowSize()/2))) {
LOG(dout_con<<m_connection->getDesc()
<<" processing reliable command for peer id: " << c.peer_id
<<" data size: " << c.data.getSize() << std::endl);
- if (!processReliableSendCommand(c,max_packet_size))
- {
+ if (!processReliableSendCommand(c,max_packet_size)) {
channels[c.channelnum].queued_commands.push_back(c);
}
}
- else
- {
+ else {
LOG(dout_con<<m_connection->getDesc()
<<" Queueing reliable command for peer id: " << c.peer_id
<<" data size: " << c.data.getSize() <<std::endl);
- BASE_HEADER_SIZE
- RELIABLE_HEADER_SIZE;
- assert(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE*512);
+ sanity_check(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE*512);
std::list<SharedBuffer<u8> > originals;
u16 split_sequence_number = channels[c.channelnum].readNextSplitSeqNum();
bool have_sequence_number = true;
bool have_initial_sequence_number = false;
- Queue<BufferedPacket> toadd;
+ std::queue<BufferedPacket> toadd;
volatile u16 initial_sequence_number = 0;
for(std::list<SharedBuffer<u8> >::iterator i = originals.begin();
m_connection->GetProtocolID(), m_connection->GetPeerID(),
c.channelnum);
- toadd.push_back(p);
+ toadd.push(p);
}
if (have_sequence_number) {
volatile u16 pcount = 0;
while(toadd.size() > 0) {
- BufferedPacket p = toadd.pop_front();
+ BufferedPacket p = toadd.front();
+ toadd.pop();
// LOG(dout_con<<connection->getDesc()
// << " queuing reliable packet for peer_id: " << c.peer_id
// << " channel: " << (c.channelnum&0xFF)
// << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
// << std::endl)
- channels[c.channelnum].queued_reliables.push_back(p);
+ channels[c.channelnum].queued_reliables.push(p);
pcount++;
}
- assert(channels[c.channelnum].queued_reliables.size() < 0xFFFF);
+ sanity_check(channels[c.channelnum].queued_reliables.size() < 0xFFFF);
return true;
}
else {
}
while(toadd.size() > 0) {
/* remove packet */
- toadd.pop_front();
+ toadd.pop();
bool successfully_put_back_sequence_number
= channels[c.channelnum].putBackSequenceNumber(
(initial_sequence_number+toadd.size() % (SEQNUM_MAX+1)));
- assert(successfully_put_back_sequence_number);
+ FATAL_ERROR_IF(!successfully_put_back_sequence_number, "error");
}
LOG(dout_con<<m_connection->getDesc()
<< " Windowsize exceeded on reliable sending "
unsigned int maxtransfer)
{
- for (unsigned int i = 0; i < CHANNEL_COUNT; i++)
- {
+ for (unsigned int i = 0; i < CHANNEL_COUNT; i++) {
unsigned int commands_processed = 0;
if ((channels[i].queued_commands.size() > 0) &&
(channels[i].queued_reliables.size() < maxtransfer) &&
- (commands_processed < maxcommands))
- {
+ (commands_processed < maxcommands)) {
try {
- ConnectionCommand c = channels[i].queued_commands.pop_front();
- LOG(dout_con<<m_connection->getDesc()
- <<" processing queued reliable command "<<std::endl);
- if (!processReliableSendCommand(c,max_packet_size)) {
- LOG(dout_con<<m_connection->getDesc()
+ ConnectionCommand c = channels[i].queued_commands.front();
+
+ LOG(dout_con << m_connection->getDesc()
+ << " processing queued reliable command " << std::endl);
+
+ // Packet is processed, remove it from queue
+ if (processReliableSendCommand(c,max_packet_size)) {
+ channels[i].queued_commands.pop_front();
+ } else {
+ LOG(dout_con << m_connection->getDesc()
<< " Failed to queue packets for peer_id: " << c.peer_id
<< ", delaying sending of " << c.data.getSize()
<< " bytes" << std::endl);
- channels[i].queued_commands.push_front(c);
}
}
catch (ItemNotFoundException &e) {
u16 UDPPeer::getNextSplitSequenceNumber(u8 channel)
{
- assert(channel < CHANNEL_COUNT);
- return channels[channel].readNextIncomingSeqNum();
+ assert(channel < CHANNEL_COUNT); // Pre-condition
+ return channels[channel].readNextSplitSeqNum();
}
void UDPPeer::setNextSplitSequenceNumber(u8 channel, u16 seqnum)
{
- assert(channel < CHANNEL_COUNT);
+ assert(channel < CHANNEL_COUNT); // Pre-condition
channels[channel].setNextSplitSeqNum(seqnum);
}
BufferedPacket toadd,
bool reliable)
{
- assert(channel < CHANNEL_COUNT);
+ assert(channel < CHANNEL_COUNT); // Pre-condition
return channels[channel].incoming_splits.insert(toadd,reliable);
}
/* Connection Threads */
/******************************************************************************/
-ConnectionSendThread::ConnectionSendThread( unsigned int max_packet_size,
- float timeout) :
+ConnectionSendThread::ConnectionSendThread(unsigned int max_packet_size,
+ float timeout) :
+ Thread("ConnectionSend"),
m_connection(NULL),
m_max_packet_size(max_packet_size),
m_timeout(timeout),
{
}
-void * ConnectionSendThread::Thread()
+void * ConnectionSendThread::run()
{
- assert(m_connection != NULL);
- ThreadStarted();
- log_register_thread("ConnectionSend");
+ assert(m_connection);
LOG(dout_con<<m_connection->getDesc()
<<"ConnectionSend thread started"<<std::endl);
- u32 curtime = porting::getTimeMs();
- u32 lasttime = curtime;
+ u64 curtime = porting::getTimeMs();
+ u64 lasttime = curtime;
PROFILE(std::stringstream ThreadIdentifier);
PROFILE(ThreadIdentifier << "ConnectionSend: [" << m_connection->getDesc() << "]");
- porting::setThreadName("ConnectionSend");
-
/* if stop is requested don't stop immediately but try to send all */
/* packets first */
- while(!StopRequested() || packetsQueued()) {
+ while(!stopRequested() || packetsQueued()) {
BEGIN_DEBUG_EXCEPTION_HANDLER
PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
m_iteration_packets_avaialble = m_max_data_packets_per_iteration;
/* wait for trigger or timeout */
- m_send_sleep_semaphore.Wait(50);
+ m_send_sleep_semaphore.wait(50);
/* remove all triggers */
- while(m_send_sleep_semaphore.Wait(0)) {}
+ while(m_send_sleep_semaphore.wait(0)) {}
lasttime = curtime;
curtime = porting::getTimeMs();
/* send non reliable packets */
sendPackets(dtime);
- END_DEBUG_EXCEPTION_HANDLER(errorstream);
+ END_DEBUG_EXCEPTION_HANDLER
}
PROFILE(g_profiler->remove(ThreadIdentifier.str()));
void ConnectionSendThread::Trigger()
{
- m_send_sleep_semaphore.Post();
+ m_send_sleep_semaphore.post();
}
bool ConnectionSendThread::packetsQueued()
if (dynamic_cast<UDPPeer*>(&peer) == 0)
continue;
- for(u16 i=0; i<CHANNEL_COUNT; i++)
- {
+ for(u16 i=0; i < CHANNEL_COUNT; i++) {
Channel *channel = &(dynamic_cast<UDPPeer*>(&peer))->channels[i];
- if (channel->queued_commands.size() > 0)
- {
+ if (channel->queued_commands.size() > 0) {
return true;
}
}
}
float resend_timeout = dynamic_cast<UDPPeer*>(&peer)->getResendTimeout();
+ bool retry_count_exceeded = false;
for(u16 i=0; i<CHANNEL_COUNT; i++)
{
std::list<BufferedPacket> timed_outs;
channel->UpdateBytesLost(k->data.getSize());
k->resend_count++;
+ if (k-> resend_count > MAX_RELIABLE_RETRY) {
+ retry_count_exceeded = true;
+ timeouted_peers.push_back(peer->id);
+ /* no need to check additional packets if a single one did timeout*/
+ break;
+ }
+
LOG(derr_con<<m_connection->getDesc()
<<"RE-SENDING timed-out RELIABLE to "
<< k->address.serializeString()
// do not handle rtt here as we can't decide if this packet was
// lost or really takes more time to transmit
}
+
+ if (retry_count_exceeded) {
+ break; /* no need to check other channels if we already did timeout */
+ }
+
channel->UpdateTimers(dtime,dynamic_cast<UDPPeer*>(&peer)->getLegacyPeer());
}
+ /* skip to next peer if we did timeout */
+ if (retry_count_exceeded)
+ continue;
+
/* send ping if necessary */
if (dynamic_cast<UDPPeer*>(&peer)->Ping(dtime,data)) {
LOG(dout_con<<m_connection->getDesc()
LOG(derr_con<<m_connection->getDesc()
<<"WARNING: Going to send a reliable packet"
<<" in outgoing buffer" <<std::endl);
- //assert(0);
}
// Send the packet
LOG(dout_con<<m_connection->getDesc()
<<" INFO: dropped packet for non existent peer_id: "
<< peer_id << std::endl);
- assert(reliable && "trying to send raw packet reliable but no peer found!");
+ FATAL_ERROR_IF(!reliable, "Trying to send raw packet reliable but no peer found!");
return false;
}
Channel *channel = &(dynamic_cast<UDPPeer*>(&peer)->channels[channelnum]);
<<" INFO: queueing reliable packet for peer_id: " << peer_id
<<" channel: " << channelnum
<<" seqnum: " << seqnum << std::endl);
- channel->queued_reliables.push_back(p);
+ channel->queued_reliables.push(p);
return false;
}
}
void ConnectionSendThread::processReliableCommand(ConnectionCommand &c)
{
- assert(c.reliable);
+ assert(c.reliable); // Pre-condition
switch(c.type) {
case CONNCMD_NONE:
case CONNCMD_CONNECT:
case CONNCMD_DISCONNECT:
case CONCMD_ACK:
- assert("Got command that shouldn't be reliable as reliable command" == 0);
+ FATAL_ERROR("Got command that shouldn't be reliable as reliable command");
default:
LOG(dout_con<<m_connection->getDesc()
<<" Invalid reliable command type: " << c.type <<std::endl);
void ConnectionSendThread::processNonReliableCommand(ConnectionCommand &c)
{
- assert(!c.reliable);
+ assert(!c.reliable); // Pre-condition
switch(c.type) {
case CONNCMD_NONE:
sendAsPacket(c.peer_id,c.channelnum,c.data,true);
return;
case CONCMD_CREATE_PEER:
- assert("Got command that should be reliable as unreliable command" == 0);
+ FATAL_ERROR("Got command that should be reliable as unreliable command");
default:
LOG(dout_con<<m_connection->getDesc()
<<" Invalid command type: " << c.type <<std::endl);
// Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
m_connection->SetPeerID(PEER_ID_INEXISTENT);
- NetworkPacket* pkt = new NetworkPacket(0,0);
- m_connection->Send(PEER_ID_SERVER, 0, pkt, true);
+ NetworkPacket pkt(0,0);
+ m_connection->Send(PEER_ID_SERVER, 0, &pkt, true);
}
void ConnectionSendThread::disconnect()
for (std::list<u16>::iterator i = peerids.begin();
i != peerids.end();
- i++)
+ ++i)
{
sendAsPacket(*i, 0,data,false);
}
void ConnectionSendThread::send(u16 peer_id, u8 channelnum,
SharedBuffer<u8> data)
{
- assert(channelnum < CHANNEL_COUNT);
+ assert(channelnum < CHANNEL_COUNT); // Pre-condition
PeerHelper peer = m_connection->getPeerNoEx(peer_id);
if (!peer)
for (std::list<u16>::iterator i = peerids.begin();
i != peerids.end();
- i++)
+ ++i)
{
send(*i, channelnum, data);
}
for (std::list<u16>::iterator i = peerids.begin();
i != peerids.end();
- i++)
+ ++i)
{
PeerHelper peer = m_connection->getPeerNoEx(*i);
< dynamic_cast<UDPPeer*>(&peer)->channels[i].getWindowSize())&&
(peer->m_increment_packets_remaining > 0))
{
- BufferedPacket p = dynamic_cast<UDPPeer*>(&peer)->channels[i].queued_reliables.pop_front();
+ BufferedPacket p = dynamic_cast<UDPPeer*>(&peer)->channels[i].queued_reliables.front();
+ dynamic_cast<UDPPeer*>(&peer)->channels[i].queued_reliables.pop();
Channel* channel = &(dynamic_cast<UDPPeer*>(&peer)->channels[i]);
LOG(dout_con<<m_connection->getDesc()
<<" INFO: sending a queued reliable packet "
unsigned int initial_queuesize = m_outgoing_queue.size();
/* send non reliable packets*/
for(unsigned int i=0;i < initial_queuesize;i++) {
- OutgoingPacket packet = m_outgoing_queue.pop_front();
+ OutgoingPacket packet = m_outgoing_queue.front();
+ m_outgoing_queue.pop();
- assert(!packet.reliable &&
- "reliable packets are not allowed in outgoing queue!");
+ if (packet.reliable)
+ continue;
PeerHelper peer = m_connection->getPeerNoEx(packet.peer_id);
if (!peer) {
}
else if (
( peer->m_increment_packets_remaining > 0) ||
- (StopRequested())) {
+ (stopRequested())) {
rawSendAsPacket(packet.peer_id, packet.channelnum,
packet.data, packet.reliable);
peer->m_increment_packets_remaining--;
}
else {
- m_outgoing_queue.push_back(packet);
+ m_outgoing_queue.push(packet);
pending_unreliable[packet.peer_id] = true;
}
}
SharedBuffer<u8> data, bool ack)
{
OutgoingPacket packet(peer_id, channelnum, data, false, ack);
- m_outgoing_queue.push_back(packet);
+ m_outgoing_queue.push(packet);
}
ConnectionReceiveThread::ConnectionReceiveThread(unsigned int max_packet_size) :
+ Thread("ConnectionReceive"),
m_connection(NULL)
{
}
-void * ConnectionReceiveThread::Thread()
+void * ConnectionReceiveThread::run()
{
- assert(m_connection != NULL);
- ThreadStarted();
- log_register_thread("ConnectionReceive");
+ assert(m_connection);
LOG(dout_con<<m_connection->getDesc()
<<"ConnectionReceive thread started"<<std::endl);
PROFILE(std::stringstream ThreadIdentifier);
PROFILE(ThreadIdentifier << "ConnectionReceive: [" << m_connection->getDesc() << "]");
- porting::setThreadName("ConnectionReceive");
-
#ifdef DEBUG_CONNECTION_KBPS
- u32 curtime = porting::getTimeMs();
- u32 lasttime = curtime;
+ u64 curtime = porting::getTimeMs();
+ u64 lasttime = curtime;
float debug_print_timer = 0.0;
#endif
- while(!StopRequested()) {
+ while(!stopRequested()) {
BEGIN_DEBUG_EXCEPTION_HANDLER
PROFILE(ScopeProfiler sp(g_profiler, ThreadIdentifier.str(), SPT_AVG));
}
}
#endif
- END_DEBUG_EXCEPTION_HANDLER(errorstream);
+ END_DEBUG_EXCEPTION_HANDLER
}
+
PROFILE(g_profiler->remove(ThreadIdentifier.str()));
return NULL;
}
throw InvalidIncomingDataException("Channel doesn't exist");
}
- /* preserve original peer_id for later usage */
- u16 packet_peer_id = peer_id;
-
/* Try to identify peer by sender address (may happen on join) */
if (peer_id == PEER_ID_INEXISTENT) {
peer_id = m_connection->lookupPeer(sender);
+ // We do not have to remind the peer of its
+ // peer id as the CONTROLTYPE_SET_PEER_ID
+ // command was sent reliably.
}
/* The peer was not found in our lists. Add it. */
}
}
-
- /* mark peer as seen with id */
- if (!(packet_peer_id == PEER_ID_INEXISTENT))
- peer->setSentWithID();
-
peer->ResetTimeout();
Channel *channel = 0;
u8 type = readU8(&(packetdata[0]));
if (MAX_UDP_PEERS <= 65535 && peer_id >= MAX_UDP_PEERS) {
- errorstream << "Something is wrong with peer_id" << std::endl;
- assert(0);
+ std::string errmsg = "Invalid peer_id=" + itos(peer_id);
+ errorstream << errmsg << std::endl;
+ throw InvalidIncomingDataException(errmsg.c_str());
}
if (type == TYPE_CONTROL)
if (controltype == CONTROLTYPE_ACK)
{
- assert(channel != 0);
- if (packetdata.getSize() < 4)
- throw InvalidIncomingDataException
- ("packetdata.getSize() < 4 (ACK header size)");
+ assert(channel != NULL);
+
+ if (packetdata.getSize() < 4) {
+ throw InvalidIncomingDataException(
+ "packetdata.getSize() < 4 (ACK header size)");
+ }
u16 seqnum = readU16(&packetdata[2]);
LOG(dout_con<<m_connection->getDesc()
// only calculate rtt from straight sent packets
if (p.resend_count == 0) {
// Get round trip time
- unsigned int current_time = porting::getTimeMs();
+ u64 current_time = porting::getTimeMs();
// a overflow is quite unlikely but as it'd result in major
// rtt miscalculation we handle it here
}
else if (type == TYPE_RELIABLE)
{
- assert(channel != 0);
+ assert(channel != NULL);
+
// Recursive reliable packets not allowed
if (reliable)
throw InvalidIncomingDataException("Found nested reliable packets");
}
// We should never get here.
- // If you get here, add an exception or a return to some of the
- // above conditionals.
- assert(0);
- throw BaseException("Error in Channel::ProcessPacket()");
+ FATAL_ERROR("Invalid execution point");
}
/*
Connection
*/
-Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
- bool ipv6) :
- m_udpSocket(ipv6),
- m_command_queue(),
- m_event_queue(),
- m_peer_id(0),
- m_protocol_id(protocol_id),
- m_sendThread(max_packet_size, timeout),
- m_receiveThread(max_packet_size),
- m_info_mutex(),
- m_bc_peerhandler(0),
- m_bc_receive_timeout(0),
- m_shutting_down(false),
- m_next_remote_peer_id(2)
-{
- m_udpSocket.setTimeoutMs(5);
-
- m_sendThread.setParent(this);
- m_receiveThread.setParent(this);
-
- m_sendThread.Start();
- m_receiveThread.Start();
-}
-
Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
bool ipv6, PeerHandler *peerhandler) :
m_udpSocket(ipv6),
m_sendThread.setParent(this);
m_receiveThread.setParent(this);
- m_sendThread.Start();
- m_receiveThread.Start();
+ m_sendThread.start();
+ m_receiveThread.start();
}
{
m_shutting_down = true;
// request threads to stop
- m_sendThread.Stop();
- m_receiveThread.Stop();
+ m_sendThread.stop();
+ m_receiveThread.stop();
//TODO for some unkonwn reason send/receive threads do not exit as they're
// supposed to be but wait on peer timeout. To speed up shutdown we reduce
m_sendThread.setPeerTimeout(0.5);
// wait for threads to finish
- m_sendThread.Wait();
- m_receiveThread.Wait();
+ m_sendThread.wait();
+ m_receiveThread.wait();
// Delete peers
for(std::map<u16, Peer*>::iterator
/* Internal stuff */
void Connection::putEvent(ConnectionEvent &e)
{
- assert(e.type != CONNEVENT_NONE);
+ assert(e.type != CONNEVENT_NONE); // Pre-condition
m_event_queue.push_back(e);
}
PeerHelper Connection::getPeer(u16 peer_id)
{
- JMutexAutoLock peerlock(m_peers_mutex);
+ MutexAutoLock peerlock(m_peers_mutex);
std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
if (node == m_peers.end()) {
}
// Error checking
- assert(node->second->id == peer_id);
+ FATAL_ERROR_IF(node->second->id != peer_id, "Invalid peer id");
return PeerHelper(node->second);
}
PeerHelper Connection::getPeerNoEx(u16 peer_id)
{
- JMutexAutoLock peerlock(m_peers_mutex);
+ MutexAutoLock peerlock(m_peers_mutex);
std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
if (node == m_peers.end()) {
}
// Error checking
- assert(node->second->id == peer_id);
+ FATAL_ERROR_IF(node->second->id != peer_id, "Invalid peer id");
return PeerHelper(node->second);
}
/* find peer_id for address */
u16 Connection::lookupPeer(Address& sender)
{
- JMutexAutoLock peerlock(m_peers_mutex);
+ MutexAutoLock peerlock(m_peers_mutex);
std::map<u16, Peer*>::iterator j;
j = m_peers.begin();
for(; j != m_peers.end(); ++j)
{
Peer *peer = j->second;
- if (peer->isActive())
+ if (peer->isPendingDeletion())
continue;
Address tocheck;
/* lock list as short as possible */
{
- JMutexAutoLock peerlock(m_peers_mutex);
+ MutexAutoLock peerlock(m_peers_mutex);
if (m_peers.find(peer_id) == m_peers.end())
return false;
peer = m_peers[peer_id];
/* Interface */
-ConnectionEvent Connection::getEvent()
-{
- if (m_event_queue.empty()) {
- ConnectionEvent e;
- e.type = CONNEVENT_NONE;
- return e;
- }
- return m_event_queue.pop_frontNoEx();
-}
-
ConnectionEvent Connection::waitEvent(u32 timeout_ms)
{
try {
bool Connection::Connected()
{
- JMutexAutoLock peerlock(m_peers_mutex);
+ MutexAutoLock peerlock(m_peers_mutex);
if (m_peers.size() != 1)
return false;
putCommand(c);
}
-u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
+void Connection::Receive(NetworkPacket* pkt)
{
for(;;) {
ConnectionEvent e = waitEvent(m_bc_receive_timeout);
if (e.type != CONNEVENT_NONE)
- LOG(dout_con<<getDesc()<<": Receive: got event: "
- <<e.describe()<<std::endl);
+ LOG(dout_con << getDesc() << ": Receive: got event: "
+ << e.describe() << std::endl);
switch(e.type) {
case CONNEVENT_NONE:
throw NoIncomingDataException("No incoming data");
case CONNEVENT_DATA_RECEIVED:
- peer_id = e.peer_id;
- data = SharedBuffer<u8>(e.data);
- return e.data.getSize();
+ // Data size is lesser than command size, ignoring packet
+ if (e.data.getSize() < 2) {
+ continue;
+ }
+
+ pkt->putRawPacket(*e.data, e.data.getSize(), e.peer_id);
+ return;
case CONNEVENT_PEER_ADDED: {
UDPPeer tmp(e.peer_id, e.address, this);
if (m_bc_peerhandler)
m_bc_peerhandler->peerAdded(&tmp);
- continue; }
+ continue;
+ }
case CONNEVENT_PEER_REMOVED: {
UDPPeer tmp(e.peer_id, e.address, this);
if (m_bc_peerhandler)
m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
- continue; }
+ continue;
+ }
case CONNEVENT_BIND_FAILED:
throw ConnectionBindFailed("Failed to bind socket "
"(port already in use?)");
void Connection::Send(u16 peer_id, u8 channelnum,
NetworkPacket* pkt, bool reliable)
{
- assert(channelnum < CHANNEL_COUNT);
+ assert(channelnum < CHANNEL_COUNT); // Pre-condition
ConnectionCommand c;
- c.send(peer_id, channelnum, pkt->oldForgePacket(), reliable);
+ c.send(peer_id, channelnum, pkt, reliable);
putCommand(c);
}
{
PeerHelper peer = getPeerNoEx(PEER_ID_SERVER);
- if (!peer) {
- assert("Connection::getLocalStat we couldn't get our own peer? are you serious???" == 0);
- }
+ FATAL_ERROR_IF(!peer, "Connection::getLocalStat we couldn't get our own peer? are you serious???");
float retval = 0.0;
retval += dynamic_cast<UDPPeer*>(&peer)->channels[j].getCurrentLossRateKB();
break;
default:
- assert("Connection::getLocalStat Invalid stat type" == 0);
+ FATAL_ERROR("Connection::getLocalStat Invalid stat type");
}
}
return retval;
/*
Find an unused peer id
*/
- JMutexAutoLock lock(m_peers_mutex);
+ MutexAutoLock lock(m_peers_mutex);
bool out_of_ids = false;
for(;;) {
// Check if exists
void Connection::PrintInfo(std::ostream &out)
{
- m_info_mutex.Lock();
+ m_info_mutex.lock();
out<<getDesc()<<": ";
- m_info_mutex.Unlock();
+ m_info_mutex.unlock();
}
void Connection::PrintInfo()
void Connection::sendAck(u16 peer_id, u8 channelnum, u16 seqnum)
{
- assert(channelnum < CHANNEL_COUNT);
+ assert(channelnum < CHANNEL_COUNT); // Pre-condition
LOG(dout_con<<getDesc()
<<" Queuing ACK command to peer_id: " << peer_id <<
UDPPeer *peer = new UDPPeer(PEER_ID_SERVER, address, this);
{
- JMutexAutoLock lock(m_peers_mutex);
+ MutexAutoLock lock(m_peers_mutex);
m_peers[peer->id] = peer;
m_peer_ids.push_back(peer->id);
}