/* defines used for debugging and profiling */
/******************************************************************************/
#ifdef NDEBUG
-#define LOG(a) a
-#define PROFILE(a)
+ #define LOG(a) a
+ #define PROFILE(a)
#else
-/* this mutex is used to achieve log message consistency */
-std::mutex log_message_mutex;
-#define LOG(a) \
- { \
- MutexAutoLock loglock(log_message_mutex); \
- a; \
- }
-#define PROFILE(a) a
+ #if 0
+ /* this mutex is used to achieve log message consistency */
+ std::mutex log_message_mutex;
+ #define LOG(a) \
+ { \
+ MutexAutoLock loglock(log_message_mutex); \
+ a; \
+ }
+ #else
+ // Prevent deadlocks until a solution is found after 5.2.0 (TODO)
+ #define LOG(a) a
+ #endif
+
+ #define PROFILE(a) a
#endif
#define PING_TIMEOUT 5.0
-BufferedPacket makePacket(Address &address, SharedBuffer<u8> data,
+BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
u32 protocol_id, session_t sender_peer_id, u8 channel)
{
u32 packet_size = data.getSize() + BASE_HEADER_SIZE;
}
}
-void makeAutoSplitPacket(SharedBuffer<u8> data, u32 chunksize_max,
+void makeAutoSplitPacket(const SharedBuffer<u8> &data, u32 chunksize_max,
u16 &split_seqnum, std::list<SharedBuffer<u8>> *list)
{
u32 original_header_size = 1;
list->push_back(makeOriginalPacket(data));
}
-SharedBuffer<u8> makeReliablePacket(SharedBuffer<u8> data, u16 seqnum)
+SharedBuffer<u8> makeReliablePacket(const SharedBuffer<u8> &data, u16 seqnum)
{
u32 header_size = 3;
u32 packet_size = data.getSize() + header_size;
index++;
}
}
+
bool ReliablePacketBuffer::empty()
{
MutexAutoLock listlock(m_list_mutex);
u32 ReliablePacketBuffer::size()
{
- return m_list_size;
-}
-
-bool ReliablePacketBuffer::containsPacket(u16 seqnum)
-{
- return !(findPacket(seqnum) == m_list.end());
+ MutexAutoLock listlock(m_list_mutex);
+ return m_list.size();
}
RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
for(; i != m_list.end(); ++i)
{
u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
- /*dout_con<<"findPacket(): finding seqnum="<<seqnum
- <<", comparing to s="<<s<<std::endl;*/
if (s == seqnum)
break;
}
return i;
}
+
RPBSearchResult ReliablePacketBuffer::notFound()
{
return m_list.end();
}
+
bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
{
MutexAutoLock listlock(m_list_mutex);
if (m_list.empty())
return false;
- BufferedPacket p = *m_list.begin();
- result = readU16(&p.data[BASE_HEADER_SIZE+1]);
+ const BufferedPacket &p = *m_list.begin();
+ result = readU16(&p.data[BASE_HEADER_SIZE + 1]);
return true;
}
throw NotFoundException("Buffer is empty");
BufferedPacket p = *m_list.begin();
m_list.erase(m_list.begin());
- --m_list_size;
- if (m_list_size == 0) {
+ if (m_list.empty()) {
m_oldest_non_answered_ack = 0;
} else {
m_oldest_non_answered_ack =
- readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]);
+ readU16(&m_list.begin()->data[BASE_HEADER_SIZE + 1]);
}
return p;
}
+
BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
{
MutexAutoLock listlock(m_list_mutex);
}
m_list.erase(r);
- --m_list_size;
- if (m_list_size == 0)
- { m_oldest_non_answered_ack = 0; }
- else
- { m_oldest_non_answered_ack = readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]); }
+ if (m_list.empty()) {
+ m_oldest_non_answered_ack = 0;
+ } else {
+ m_oldest_non_answered_ack =
+ readU16(&m_list.begin()->data[BASE_HEADER_SIZE + 1]);
+ }
return p;
}
-void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
+
+void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected)
{
MutexAutoLock listlock(m_list_mutex);
if (p.data.getSize() < BASE_HEADER_SIZE + 3) {
return;
}
- ++m_list_size;
- sanity_check(m_list_size <= SEQNUM_MAX+1); // FIXME: Handle the error?
+ sanity_check(m_list.size() <= SEQNUM_MAX); // FIXME: Handle the error?
// Find the right place for the packet and insert it there
// If list is empty, just add it
}
if (s == seqnum) {
+ /* nothing to do this seems to be a resent packet */
+ /* for paranoia reason data should be compared */
if (
(readU16(&(i->data[BASE_HEADER_SIZE+1])) != seqnum) ||
(i->data.getSize() != p.data.getSize()) ||
p.address.serializeString().c_str());
throw IncomingDataCorruption("duplicated packet isn't same as original one");
}
-
- /* nothing to do this seems to be a resent packet */
- /* for paranoia reason data should be compared */
- --m_list_size;
}
/* insert or push back */
else if (i != m_list.end()) {
m_list.insert(i, p);
- }
- else {
+ } else {
m_list.push_back(p);
}
return timed_outs;
}
+/*
+ IncomingSplitPacket
+*/
+
+bool IncomingSplitPacket::insert(u32 chunk_num, SharedBuffer<u8> &chunkdata)
+{
+ sanity_check(chunk_num < chunk_count);
+
+ // If chunk already exists, ignore it.
+ // Sometimes two identical packets may arrive when there is network
+ // lag and the server re-sends stuff.
+ if (chunks.find(chunk_num) != chunks.end())
+ return false;
+
+ // Set chunk data in buffer
+ chunks[chunk_num] = chunkdata;
+
+ return true;
+}
+
+SharedBuffer<u8> IncomingSplitPacket::reassemble()
+{
+ sanity_check(allReceived());
+
+ // Calculate total size
+ u32 totalsize = 0;
+ for (const auto &chunk : chunks)
+ totalsize += chunk.second.getSize();
+
+ SharedBuffer<u8> fulldata(totalsize);
+
+ // Copy chunks to data buffer
+ u32 start = 0;
+ for (u32 chunk_i = 0; chunk_i < chunk_count; chunk_i++) {
+ const SharedBuffer<u8> &buf = chunks[chunk_i];
+ memcpy(&fulldata[start], *buf, buf.getSize());
+ start += buf.getSize();
+ }
+
+ return fulldata;
+}
+
/*
IncomingSplitBuffer
*/
delete i.second;
}
}
-/*
- This will throw a GotSplitPacketException when a full
- split packet is constructed.
-*/
+
SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool reliable)
{
MutexAutoLock listlock(m_map_mutex);
<< std::endl;
return SharedBuffer<u8>();
}
+ if (chunk_num >= chunk_count) {
+ errorstream << "IncomingSplitBuffer::insert(): chunk_num=" << chunk_num
+ << " >= chunk_count=" << chunk_count << std::endl;
+ return SharedBuffer<u8>();
+ }
// Add if doesn't exist
+ IncomingSplitPacket *sp;
if (m_buf.find(seqnum) == m_buf.end()) {
- m_buf[seqnum] = new IncomingSplitPacket(chunk_count, reliable);
+ sp = new IncomingSplitPacket(chunk_count, reliable);
+ m_buf[seqnum] = sp;
+ } else {
+ sp = m_buf[seqnum];
}
- IncomingSplitPacket *sp = m_buf[seqnum];
-
- if (chunk_count != sp->chunk_count)
- LOG(derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
- <<" != sp->chunk_count="<<sp->chunk_count
- <<std::endl);
+ if (chunk_count != sp->chunk_count) {
+ errorstream << "IncomingSplitBuffer::insert(): chunk_count="
+ << chunk_count << " != sp->chunk_count=" << sp->chunk_count
+ << std::endl;
+ return SharedBuffer<u8>();
+ }
if (reliable != sp->reliable)
LOG(derr_con<<"Connection: WARNING: reliable="<<reliable
<<" != sp->reliable="<<sp->reliable
<<std::endl);
- // If chunk already exists, ignore it.
- // Sometimes two identical packets may arrive when there is network
- // lag and the server re-sends stuff.
- if (sp->chunks.find(chunk_num) != sp->chunks.end())
- return SharedBuffer<u8>();
-
// Cut chunk data out of packet
u32 chunkdatasize = p.data.getSize() - headersize;
SharedBuffer<u8> chunkdata(chunkdatasize);
memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
- // Set chunk data in buffer
- sp->chunks[chunk_num] = chunkdata;
+ if (!sp->insert(chunk_num, chunkdata))
+ return SharedBuffer<u8>();
// If not all chunks are received, return empty buffer
if (!sp->allReceived())
return SharedBuffer<u8>();
- // Calculate total size
- u32 totalsize = 0;
- for (const auto &chunk : sp->chunks) {
- totalsize += chunk.second.getSize();
- }
-
- SharedBuffer<u8> fulldata(totalsize);
-
- // Copy chunks to data buffer
- u32 start = 0;
- for (u32 chunk_i=0; chunk_i<sp->chunk_count; chunk_i++) {
- const SharedBuffer<u8> &buf = sp->chunks[chunk_i];
- u16 buf_chunkdatasize = buf.getSize();
- memcpy(&fulldata[start], *buf, buf_chunkdatasize);
- start += buf_chunkdatasize;
- }
+ SharedBuffer<u8> fulldata = sp->reassemble();
// Remove sp from buffer
m_buf.erase(seqnum);
return fulldata;
}
+
void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
{
std::deque<u16> remove_queue;
delete this;
}
-void Peer::RTTStatistics(float rtt, const std::string &profiler_id)
-{
- static const float avg_factor = 100.0f / MAX_RELIABLE_WINDOW_SIZE;
+void Peer::RTTStatistics(float rtt, const std::string &profiler_id,
+ unsigned int num_samples) {
if (m_last_rtt > 0) {
/* set min max values */
/* do average calculation */
if (m_rtt.avg_rtt < 0.0)
- m_rtt.avg_rtt = rtt;
+ m_rtt.avg_rtt = rtt;
else
- m_rtt.avg_rtt += (rtt - m_rtt.avg_rtt) * avg_factor;
+ m_rtt.avg_rtt = m_rtt.avg_rtt * (num_samples/(num_samples-1)) +
+ rtt * (1/num_samples);
/* do jitter calculation */
//just use some neutral value at beginning
- float jitter = std::fabs(rtt - m_last_rtt);
+ float jitter = m_rtt.jitter_min;
+
+ if (rtt > m_last_rtt)
+ jitter = rtt-m_last_rtt;
+
+ if (rtt <= m_last_rtt)
+ jitter = m_last_rtt - rtt;
if (jitter < m_rtt.jitter_min)
m_rtt.jitter_min = jitter;
m_rtt.jitter_max = jitter;
if (m_rtt.jitter_avg < 0.0)
- m_rtt.jitter_avg = jitter;
+ m_rtt.jitter_avg = jitter;
else
- m_rtt.jitter_avg += (jitter - m_rtt.jitter_avg) * avg_factor;
+ m_rtt.jitter_avg = m_rtt.jitter_avg * (num_samples/(num_samples-1)) +
+ jitter * (1/num_samples);
if (!profiler_id.empty()) {
- g_profiler->graphAdd(profiler_id + "_rtt", rtt);
- g_profiler->graphAdd(profiler_id + "_jitter", jitter);
+ g_profiler->graphAdd(profiler_id + " RTT [ms]", rtt * 1000.f);
+ g_profiler->graphAdd(profiler_id + " jitter [ms]", jitter * 1000.f);
}
}
/* save values required for next loop */
Peer(a_address,a_id,connection)
{
for (Channel &channel : channels)
- channel.setWindowSize(g_settings->getU16("max_packets_per_iteration"));
+ channel.setWindowSize(START_RELIABLE_WINDOW_SIZE);
}
bool UDPPeer::getAddress(MTProtocols type,Address& toset)
void UDPPeer::reportRTT(float rtt)
{
- assert(rtt >= 0.0f);
-
- RTTStatistics(rtt, "rudp");
+ if (rtt < 0.0) {
+ return;
+ }
+ RTTStatistics(rtt,"rudp",MAX_RELIABLE_WINDOW_SIZE*10);
float timeout = getStat(AVG_RTT) * RESEND_TIMEOUT_FACTOR;
- timeout = rangelim(timeout, RESEND_TIMEOUT_MIN, RESEND_TIMEOUT_MAX);
+ if (timeout < RESEND_TIMEOUT_MIN)
+ timeout = RESEND_TIMEOUT_MIN;
+ if (timeout > RESEND_TIMEOUT_MAX)
+ timeout = RESEND_TIMEOUT_MAX;
MutexAutoLock usage_lock(m_exclusive_access_mutex);
resend_timeout = timeout;
if (m_pending_disconnect)
return;
- if ( channels[c.channelnum].queued_commands.empty() &&
+ Channel &chan = channels[c.channelnum];
+
+ if (chan.queued_commands.empty() &&
/* don't queue more packets then window size */
- (channels[c.channelnum].queued_reliables.size()
- < (channels[c.channelnum].getWindowSize()/2))) {
+ (chan.queued_reliables.size() < chan.getWindowSize() / 2)) {
LOG(dout_con<<m_connection->getDesc()
<<" processing reliable command for peer id: " << c.peer_id
<<" data size: " << c.data.getSize() << std::endl);
if (!processReliableSendCommand(c,max_packet_size)) {
- channels[c.channelnum].queued_commands.push_back(c);
+ chan.queued_commands.push_back(c);
}
}
else {
LOG(dout_con<<m_connection->getDesc()
<<" Queueing reliable command for peer id: " << c.peer_id
<<" data size: " << c.data.getSize() <<std::endl);
- channels[c.channelnum].queued_commands.push_back(c);
+ chan.queued_commands.push_back(c);
+ if (chan.queued_commands.size() >= chan.getWindowSize() / 2) {
+ LOG(derr_con << m_connection->getDesc()
+ << "Possible packet stall to peer id: " << c.peer_id
+ << " queued_commands=" << chan.queued_commands.size()
+ << std::endl);
+ }
}
}
if (m_pending_disconnect)
return true;
+ Channel &chan = channels[c.channelnum];
+
u32 chunksize_max = max_packet_size
- BASE_HEADER_SIZE
- RELIABLE_HEADER_SIZE;
sanity_check(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE*512);
std::list<SharedBuffer<u8>> originals;
- u16 split_sequence_number = channels[c.channelnum].readNextSplitSeqNum();
+ u16 split_sequence_number = chan.readNextSplitSeqNum();
if (c.raw) {
originals.emplace_back(c.data);
} else {
makeAutoSplitPacket(c.data, chunksize_max,split_sequence_number, &originals);
- channels[c.channelnum].setNextSplitSeqNum(split_sequence_number);
+ chan.setNextSplitSeqNum(split_sequence_number);
}
bool have_sequence_number = true;
volatile u16 initial_sequence_number = 0;
for (SharedBuffer<u8> &original : originals) {
- u16 seqnum = channels[c.channelnum].getOutgoingSequenceNumber(have_sequence_number);
+ u16 seqnum = chan.getOutgoingSequenceNumber(have_sequence_number);
/* oops, we don't have enough sequence numbers to send this packet */
if (!have_sequence_number)
// << " channel: " << (c.channelnum&0xFF)
// << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
// << std::endl)
- channels[c.channelnum].queued_reliables.push(p);
+ chan.queued_reliables.push(p);
pcount++;
}
- sanity_check(channels[c.channelnum].queued_reliables.size() < 0xFFFF);
+ sanity_check(chan.queued_reliables.size() < 0xFFFF);
return true;
}
toadd.pop();
bool successfully_put_back_sequence_number
- = channels[c.channelnum].putBackSequenceNumber(
+ = chan.putBackSequenceNumber(
(initial_sequence_number+toadd.size() % (SEQNUM_MAX+1)));
FATAL_ERROR_IF(!successfully_put_back_sequence_number, "error");
}
+ // DO NOT REMOVE n_queued! It avoids a deadlock of async locked
+ // 'log_message_mutex' and 'm_list_mutex'.
+ u32 n_queued = chan.outgoing_reliables_sent.size();
+
LOG(dout_con<<m_connection->getDesc()
<< " Windowsize exceeded on reliable sending "
<< c.data.getSize() << " bytes"
<< std::endl << "\t\tgot at most : "
<< packets_available << " packets"
<< std::endl << "\t\tpackets queued : "
- << channels[c.channelnum].outgoing_reliables_sent.size()
+ << n_queued
<< std::endl);
return false;
m_bc_peerhandler(peerhandler)
{
- m_udpSocket.setTimeoutMs(5);
+ /* Amount of time Receive() will wait for data, this is entirely different
+ * from the connection timeout */
+ m_udpSocket.setTimeoutMs(500);
m_sendThread->setParent(this);
m_receiveThread->setParent(this);
return false;
peer = m_peers[peer_id];
m_peers.erase(peer_id);
- m_peer_ids.remove(peer_id);
+ auto it = std::find(m_peer_ids.begin(), m_peer_ids.end(), peer_id);
+ m_peer_ids.erase(it);
}
Address peer_address;
putCommand(c);
}
-void Connection::Receive(NetworkPacket* pkt)
+bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
{
+ /*
+ Note that this function can potentially wait infinitely if non-data
+ events keep happening before the timeout expires.
+ This is not considered to be a problem (is it?)
+ */
for(;;) {
- ConnectionEvent e = waitEvent(m_bc_receive_timeout);
+ ConnectionEvent e = waitEvent(timeout);
if (e.type != CONNEVENT_NONE)
LOG(dout_con << getDesc() << ": Receive: got event: "
<< e.describe() << std::endl);
switch(e.type) {
case CONNEVENT_NONE:
- throw NoIncomingDataException("No incoming data");
+ return false;
case CONNEVENT_DATA_RECEIVED:
// Data size is lesser than command size, ignoring packet
if (e.data.getSize() < 2) {
}
pkt->putRawPacket(*e.data, e.data.getSize(), e.peer_id);
- return;
+ return true;
case CONNEVENT_PEER_ADDED: {
UDPPeer tmp(e.peer_id, e.address, this);
if (m_bc_peerhandler)
"(port already in use?)");
}
}
- throw NoIncomingDataException("No incoming data");
+ return false;
+}
+
+void Connection::Receive(NetworkPacket *pkt)
+{
+ bool any = Receive(pkt, m_bc_receive_timeout);
+ if (!any)
+ throw NoIncomingDataException("No incoming data");
+}
+
+bool Connection::TryReceive(NetworkPacket *pkt)
+{
+ return Receive(pkt, 0);
}
void Connection::Send(session_t peer_id, u8 channelnum,
UDPPeer* Connection::createServerPeer(Address& address)
{
- if (getPeerNoEx(PEER_ID_SERVER) != 0)
+ if (ConnectedToServer())
{
throw ConnectionException("Already connected to a server");
}