/* defines used for debugging and profiling */
/******************************************************************************/
#ifdef NDEBUG
-#define LOG(a) a
-#define PROFILE(a)
+ #define PROFILE(a)
#else
-/* this mutex is used to achieve log message consistency */
-std::mutex log_message_mutex;
-#define LOG(a) \
- { \
- MutexAutoLock loglock(log_message_mutex); \
- a; \
- }
-#define PROFILE(a) a
+ #define PROFILE(a) a
#endif
+// TODO: Clean this up.
+#define LOG(a) a
+
#define PING_TIMEOUT 5.0
-BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
+u16 BufferedPacket::getSeqnum() const
+{
+ if (size() < BASE_HEADER_SIZE + 3)
+ return 0; // should never happen
+
+ return readU16(&data[BASE_HEADER_SIZE + 1]);
+}
+
+BufferedPacketPtr makePacket(Address &address, const SharedBuffer<u8> &data,
u32 protocol_id, session_t sender_peer_id, u8 channel)
{
u32 packet_size = data.getSize() + BASE_HEADER_SIZE;
- BufferedPacket p(packet_size);
- p.address = address;
- writeU32(&p.data[0], protocol_id);
- writeU16(&p.data[4], sender_peer_id);
- writeU8(&p.data[6], channel);
+ BufferedPacketPtr p(new BufferedPacket(packet_size));
+ p->address = address;
+
+ writeU32(&p->data[0], protocol_id);
+ writeU16(&p->data[4], sender_peer_id);
+ writeU8(&p->data[6], channel);
- memcpy(&p.data[BASE_HEADER_SIZE], *data, data.getSize());
+ memcpy(&p->data[BASE_HEADER_SIZE], *data, data.getSize());
return p;
}
MutexAutoLock listlock(m_list_mutex);
LOG(dout_con<<"Dump of ReliablePacketBuffer:" << std::endl);
unsigned int index = 0;
- for (BufferedPacket &bufferedPacket : m_list) {
- u16 s = readU16(&(bufferedPacket.data[BASE_HEADER_SIZE+1]));
- LOG(dout_con<<index<< ":" << s << std::endl);
+ for (BufferedPacketPtr &packet : m_list) {
+ LOG(dout_con<<index<< ":" << packet->getSeqnum() << std::endl);
index++;
}
}
+
bool ReliablePacketBuffer::empty()
{
MutexAutoLock listlock(m_list_mutex);
u32 ReliablePacketBuffer::size()
{
- return m_list_size;
-}
-
-bool ReliablePacketBuffer::containsPacket(u16 seqnum)
-{
- return !(findPacket(seqnum) == m_list.end());
+ MutexAutoLock listlock(m_list_mutex);
+ return m_list.size();
}
-RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
+RPBSearchResult ReliablePacketBuffer::findPacketNoLock(u16 seqnum)
{
- std::list<BufferedPacket>::iterator i = m_list.begin();
- for(; i != m_list.end(); ++i)
- {
- u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
- /*dout_con<<"findPacket(): finding seqnum="<<seqnum
- <<", comparing to s="<<s<<std::endl;*/
- if (s == seqnum)
- break;
+ for (auto it = m_list.begin(); it != m_list.end(); ++it) {
+ if ((*it)->getSeqnum() == seqnum)
+ return it;
}
- return i;
-}
-RPBSearchResult ReliablePacketBuffer::notFound()
-{
return m_list.end();
}
+
bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
{
MutexAutoLock listlock(m_list_mutex);
if (m_list.empty())
return false;
- BufferedPacket p = *m_list.begin();
- result = readU16(&p.data[BASE_HEADER_SIZE+1]);
+ result = m_list.front()->getSeqnum();
return true;
}
-BufferedPacket ReliablePacketBuffer::popFirst()
+BufferedPacketPtr ReliablePacketBuffer::popFirst()
{
MutexAutoLock listlock(m_list_mutex);
if (m_list.empty())
throw NotFoundException("Buffer is empty");
- BufferedPacket p = *m_list.begin();
- m_list.erase(m_list.begin());
- --m_list_size;
- if (m_list_size == 0) {
+ BufferedPacketPtr p(m_list.front());
+ m_list.pop_front();
+
+ if (m_list.empty()) {
m_oldest_non_answered_ack = 0;
} else {
- m_oldest_non_answered_ack =
- readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]);
+ m_oldest_non_answered_ack = m_list.front()->getSeqnum();
}
return p;
}
-BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
+
+BufferedPacketPtr ReliablePacketBuffer::popSeqnum(u16 seqnum)
{
MutexAutoLock listlock(m_list_mutex);
- RPBSearchResult r = findPacket(seqnum);
- if (r == notFound()) {
+ RPBSearchResult r = findPacketNoLock(seqnum);
+ if (r == m_list.end()) {
LOG(dout_con<<"Sequence number: " << seqnum
<< " not found in reliable buffer"<<std::endl);
throw NotFoundException("seqnum not found in buffer");
}
- BufferedPacket p = *r;
-
-
- RPBSearchResult next = r;
- ++next;
- if (next != notFound()) {
- u16 s = readU16(&(next->data[BASE_HEADER_SIZE+1]));
- m_oldest_non_answered_ack = s;
- }
+ BufferedPacketPtr p(*r);
m_list.erase(r);
- --m_list_size;
- if (m_list_size == 0)
- { m_oldest_non_answered_ack = 0; }
- else
- { m_oldest_non_answered_ack = readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]); }
+ if (m_list.empty()) {
+ m_oldest_non_answered_ack = 0;
+ } else {
+ m_oldest_non_answered_ack = m_list.front()->getSeqnum();
+ }
return p;
}
-void ReliablePacketBuffer::insert(BufferedPacket &p,u16 next_expected)
+
+void ReliablePacketBuffer::insert(BufferedPacketPtr &p_ptr, u16 next_expected)
{
MutexAutoLock listlock(m_list_mutex);
- if (p.data.getSize() < BASE_HEADER_SIZE + 3) {
+ const BufferedPacket &p = *p_ptr;
+
+ if (p.size() < BASE_HEADER_SIZE + 3) {
errorstream << "ReliablePacketBuffer::insert(): Invalid data size for "
"reliable packet" << std::endl;
return;
<< std::endl;
return;
}
- u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
+ const u16 seqnum = p.getSeqnum();
if (!seqnum_in_window(seqnum, next_expected, MAX_RELIABLE_WINDOW_SIZE)) {
errorstream << "ReliablePacketBuffer::insert(): seqnum is outside of "
return;
}
- ++m_list_size;
- sanity_check(m_list_size <= SEQNUM_MAX+1); // FIXME: Handle the error?
+ sanity_check(m_list.size() <= SEQNUM_MAX); // FIXME: Handle the error?
// Find the right place for the packet and insert it there
// If list is empty, just add it
- if (m_list.empty())
- {
- m_list.push_back(p);
+ if (m_list.empty()) {
+ m_list.push_back(p_ptr);
m_oldest_non_answered_ack = seqnum;
// Done.
return;
}
// Otherwise find the right place
- std::list<BufferedPacket>::iterator i = m_list.begin();
+ auto it = m_list.begin();
// Find the first packet in the list which has a higher seqnum
- u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
+ u16 s = (*it)->getSeqnum();
/* case seqnum is smaller then next_expected seqnum */
/* this is true e.g. on wrap around */
if (seqnum < next_expected) {
- while(((s < seqnum) || (s >= next_expected)) && (i != m_list.end())) {
- ++i;
- if (i != m_list.end())
- s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
+ while(((s < seqnum) || (s >= next_expected)) && (it != m_list.end())) {
+ ++it;
+ if (it != m_list.end())
+ s = (*it)->getSeqnum();
}
}
/* non wrap around case (at least for incoming and next_expected */
else
{
- while(((s < seqnum) && (s >= next_expected)) && (i != m_list.end())) {
- ++i;
- if (i != m_list.end())
- s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
+ while(((s < seqnum) && (s >= next_expected)) && (it != m_list.end())) {
+ ++it;
+ if (it != m_list.end())
+ s = (*it)->getSeqnum();
}
}
if (s == seqnum) {
+ /* nothing to do this seems to be a resent packet */
+ /* for paranoia reason data should be compared */
+ auto &i = *it;
if (
- (readU16(&(i->data[BASE_HEADER_SIZE+1])) != seqnum) ||
- (i->data.getSize() != p.data.getSize()) ||
+ (i->getSeqnum() != seqnum) ||
+ (i->size() != p.size()) ||
(i->address != p.address)
)
{
fprintf(stderr,
"Duplicated seqnum %d non matching packet detected:\n",
seqnum);
- fprintf(stderr, "Old: seqnum: %05d size: %04d, address: %s\n",
- readU16(&(i->data[BASE_HEADER_SIZE+1])),i->data.getSize(),
+ fprintf(stderr, "Old: seqnum: %05d size: %04zu, address: %s\n",
+ i->getSeqnum(), i->size(),
i->address.serializeString().c_str());
- fprintf(stderr, "New: seqnum: %05d size: %04u, address: %s\n",
- readU16(&(p.data[BASE_HEADER_SIZE+1])),p.data.getSize(),
+ fprintf(stderr, "New: seqnum: %05d size: %04zu, address: %s\n",
+ p.getSeqnum(), p.size(),
p.address.serializeString().c_str());
throw IncomingDataCorruption("duplicated packet isn't same as original one");
}
-
- /* nothing to do this seems to be a resent packet */
- /* for paranoia reason data should be compared */
- --m_list_size;
}
/* insert or push back */
- else if (i != m_list.end()) {
- m_list.insert(i, p);
- }
- else {
- m_list.push_back(p);
+ else if (it != m_list.end()) {
+ m_list.insert(it, p_ptr);
+ } else {
+ m_list.push_back(p_ptr);
}
/* update last packet number */
- m_oldest_non_answered_ack = readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]);
+ m_oldest_non_answered_ack = m_list.front()->getSeqnum();
}
void ReliablePacketBuffer::incrementTimeouts(float dtime)
{
MutexAutoLock listlock(m_list_mutex);
- for (BufferedPacket &bufferedPacket : m_list) {
- bufferedPacket.time += dtime;
- bufferedPacket.totaltime += dtime;
+ for (auto &packet : m_list) {
+ packet->time += dtime;
+ packet->totaltime += dtime;
}
}
-std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout,
- unsigned int max_packets)
+std::list<ConstSharedPtr<BufferedPacket>>
+ ReliablePacketBuffer::getTimedOuts(float timeout, u32 max_packets)
{
MutexAutoLock listlock(m_list_mutex);
- std::list<BufferedPacket> timed_outs;
- for (BufferedPacket &bufferedPacket : m_list) {
- if (bufferedPacket.time >= timeout) {
- timed_outs.push_back(bufferedPacket);
-
- //this packet will be sent right afterwards reset timeout here
- bufferedPacket.time = 0.0f;
- if (timed_outs.size() >= max_packets)
- break;
- }
+ std::list<ConstSharedPtr<BufferedPacket>> timed_outs;
+ for (auto &packet : m_list) {
+ if (packet->time < timeout)
+ continue;
+
+ // caller will resend packet so reset time and increase counter
+ packet->time = 0.0f;
+ packet->resend_count++;
+
+ timed_outs.emplace_back(packet);
+
+ if (timed_outs.size() >= max_packets)
+ break;
}
return timed_outs;
}
+/*
+ IncomingSplitPacket
+*/
+
+bool IncomingSplitPacket::insert(u32 chunk_num, SharedBuffer<u8> &chunkdata)
+{
+ sanity_check(chunk_num < chunk_count);
+
+ // If chunk already exists, ignore it.
+ // Sometimes two identical packets may arrive when there is network
+ // lag and the server re-sends stuff.
+ if (chunks.find(chunk_num) != chunks.end())
+ return false;
+
+ // Set chunk data in buffer
+ chunks[chunk_num] = chunkdata;
+
+ return true;
+}
+
+SharedBuffer<u8> IncomingSplitPacket::reassemble()
+{
+ sanity_check(allReceived());
+
+ // Calculate total size
+ u32 totalsize = 0;
+ for (const auto &chunk : chunks)
+ totalsize += chunk.second.getSize();
+
+ SharedBuffer<u8> fulldata(totalsize);
+
+ // Copy chunks to data buffer
+ u32 start = 0;
+ for (u32 chunk_i = 0; chunk_i < chunk_count; chunk_i++) {
+ const SharedBuffer<u8> &buf = chunks[chunk_i];
+ memcpy(&fulldata[start], *buf, buf.getSize());
+ start += buf.getSize();
+ }
+
+ return fulldata;
+}
+
/*
IncomingSplitBuffer
*/
delete i.second;
}
}
-/*
- This will throw a GotSplitPacketException when a full
- split packet is constructed.
-*/
-SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool reliable)
+
+SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacketPtr &p_ptr, bool reliable)
{
MutexAutoLock listlock(m_map_mutex);
+ const BufferedPacket &p = *p_ptr;
+
u32 headersize = BASE_HEADER_SIZE + 7;
- if (p.data.getSize() < headersize) {
+ if (p.size() < headersize) {
errorstream << "Invalid data size for split packet" << std::endl;
return SharedBuffer<u8>();
}
<< std::endl;
return SharedBuffer<u8>();
}
+ if (chunk_num >= chunk_count) {
+ errorstream << "IncomingSplitBuffer::insert(): chunk_num=" << chunk_num
+ << " >= chunk_count=" << chunk_count << std::endl;
+ return SharedBuffer<u8>();
+ }
// Add if doesn't exist
+ IncomingSplitPacket *sp;
if (m_buf.find(seqnum) == m_buf.end()) {
- m_buf[seqnum] = new IncomingSplitPacket(chunk_count, reliable);
+ sp = new IncomingSplitPacket(chunk_count, reliable);
+ m_buf[seqnum] = sp;
+ } else {
+ sp = m_buf[seqnum];
}
- IncomingSplitPacket *sp = m_buf[seqnum];
-
- if (chunk_count != sp->chunk_count)
- LOG(derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
- <<" != sp->chunk_count="<<sp->chunk_count
- <<std::endl);
+ if (chunk_count != sp->chunk_count) {
+ errorstream << "IncomingSplitBuffer::insert(): chunk_count="
+ << chunk_count << " != sp->chunk_count=" << sp->chunk_count
+ << std::endl;
+ return SharedBuffer<u8>();
+ }
if (reliable != sp->reliable)
LOG(derr_con<<"Connection: WARNING: reliable="<<reliable
<<" != sp->reliable="<<sp->reliable
<<std::endl);
- // If chunk already exists, ignore it.
- // Sometimes two identical packets may arrive when there is network
- // lag and the server re-sends stuff.
- if (sp->chunks.find(chunk_num) != sp->chunks.end())
- return SharedBuffer<u8>();
-
// Cut chunk data out of packet
- u32 chunkdatasize = p.data.getSize() - headersize;
+ u32 chunkdatasize = p.size() - headersize;
SharedBuffer<u8> chunkdata(chunkdatasize);
memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
- // Set chunk data in buffer
- sp->chunks[chunk_num] = chunkdata;
+ if (!sp->insert(chunk_num, chunkdata))
+ return SharedBuffer<u8>();
// If not all chunks are received, return empty buffer
if (!sp->allReceived())
return SharedBuffer<u8>();
- // Calculate total size
- u32 totalsize = 0;
- for (const auto &chunk : sp->chunks) {
- totalsize += chunk.second.getSize();
- }
-
- SharedBuffer<u8> fulldata(totalsize);
-
- // Copy chunks to data buffer
- u32 start = 0;
- for (u32 chunk_i=0; chunk_i<sp->chunk_count; chunk_i++) {
- const SharedBuffer<u8> &buf = sp->chunks[chunk_i];
- u16 buf_chunkdatasize = buf.getSize();
- memcpy(&fulldata[start], *buf, buf_chunkdatasize);
- start += buf_chunkdatasize;
- }
+ SharedBuffer<u8> fulldata = sp->reassemble();
// Remove sp from buffer
m_buf.erase(seqnum);
return fulldata;
}
+
void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
{
std::deque<u16> remove_queue;
ConnectionCommand
*/
-void ConnectionCommand::send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt,
- bool reliable_)
+ConnectionCommandPtr ConnectionCommand::create(ConnectionCommandType type)
+{
+ return ConnectionCommandPtr(new ConnectionCommand(type));
+}
+
+ConnectionCommandPtr ConnectionCommand::serve(Address address)
+{
+ auto c = create(CONNCMD_SERVE);
+ c->address = address;
+ return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::connect(Address address)
+{
+ auto c = create(CONNCMD_CONNECT);
+ c->address = address;
+ return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::disconnect()
+{
+ return create(CONNCMD_DISCONNECT);
+}
+
+ConnectionCommandPtr ConnectionCommand::disconnect_peer(session_t peer_id)
+{
+ auto c = create(CONNCMD_DISCONNECT_PEER);
+ c->peer_id = peer_id;
+ return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::send(session_t peer_id, u8 channelnum,
+ NetworkPacket *pkt, bool reliable)
+{
+ auto c = create(CONNCMD_SEND);
+ c->peer_id = peer_id;
+ c->channelnum = channelnum;
+ c->reliable = reliable;
+ c->data = pkt->oldForgePacket();
+ return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::ack(session_t peer_id, u8 channelnum, const Buffer<u8> &data)
+{
+ auto c = create(CONCMD_ACK);
+ c->peer_id = peer_id;
+ c->channelnum = channelnum;
+ c->reliable = false;
+ data.copyTo(c->data);
+ return c;
+}
+
+ConnectionCommandPtr ConnectionCommand::createPeer(session_t peer_id, const Buffer<u8> &data)
{
- type = CONNCMD_SEND;
- peer_id = peer_id_;
- channelnum = channelnum_;
- data = pkt->oldForgePacket();
- reliable = reliable_;
+ auto c = create(CONCMD_CREATE_PEER);
+ c->peer_id = peer_id;
+ c->channelnum = 0;
+ c->reliable = true;
+ c->raw = true;
+ data.copyTo(c->data);
+ return c;
}
/*
u16 Channel::getOutgoingSequenceNumber(bool& successful)
{
MutexAutoLock internal(m_internal_mutex);
+
u16 retval = next_outgoing_seqnum;
- u16 lowest_unacked_seqnumber;
+ successful = false;
/* shortcut if there ain't any packet in outgoing list */
- if (outgoing_reliables_sent.empty())
- {
+ if (outgoing_reliables_sent.empty()) {
+ successful = true;
next_outgoing_seqnum++;
return retval;
}
- if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber))
- {
+ u16 lowest_unacked_seqnumber;
+ if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber)) {
if (lowest_unacked_seqnumber < next_outgoing_seqnum) {
// ugly cast but this one is required in order to tell compiler we
// know about difference of two unsigned may be negative in general
// but we already made sure it won't happen in this case
- if (((u16)(next_outgoing_seqnum - lowest_unacked_seqnumber)) > window_size) {
- successful = false;
+ if (((u16)(next_outgoing_seqnum - lowest_unacked_seqnumber)) > m_window_size) {
return 0;
}
- }
- else {
+ } else {
// ugly cast but this one is required in order to tell compiler we
// know about difference of two unsigned may be negative in general
// but we already made sure it won't happen in this case
if ((next_outgoing_seqnum + (u16)(SEQNUM_MAX - lowest_unacked_seqnumber)) >
- window_size) {
- successful = false;
+ m_window_size) {
return 0;
}
}
}
+ successful = true;
next_outgoing_seqnum++;
return retval;
}
//packet_too_late = current_packet_too_late;
packets_successful = current_packet_successful;
- if (current_bytes_transfered > (unsigned int) (window_size*512/2)) {
+ if (current_bytes_transfered > (unsigned int) (m_window_size*512/2)) {
reasonable_amount_of_data_transmitted = true;
}
current_packet_loss = 0;
if (packets_successful > 0) {
successful_to_lost_ratio = packet_loss/packets_successful;
} else if (packet_loss > 0) {
- window_size = std::max(
- (window_size - 10),
- MIN_RELIABLE_WINDOW_SIZE);
+ setWindowSize(m_window_size - 10);
done = true;
}
if (!done) {
- if ((successful_to_lost_ratio < 0.01f) &&
- (window_size < MAX_RELIABLE_WINDOW_SIZE)) {
+ if (successful_to_lost_ratio < 0.01f) {
/* don't even think about increasing if we didn't even
* use major parts of our window */
if (reasonable_amount_of_data_transmitted)
- window_size = std::min(
- (window_size + 100),
- MAX_RELIABLE_WINDOW_SIZE);
- } else if ((successful_to_lost_ratio < 0.05f) &&
- (window_size < MAX_RELIABLE_WINDOW_SIZE)) {
+ setWindowSize(m_window_size + 100);
+ } else if (successful_to_lost_ratio < 0.05f) {
/* don't even think about increasing if we didn't even
* use major parts of our window */
if (reasonable_amount_of_data_transmitted)
- window_size = std::min(
- (window_size + 50),
- MAX_RELIABLE_WINDOW_SIZE);
+ setWindowSize(m_window_size + 50);
} else if (successful_to_lost_ratio > 0.15f) {
- window_size = std::max(
- (window_size - 100),
- MIN_RELIABLE_WINDOW_SIZE);
+ setWindowSize(m_window_size - 100);
} else if (successful_to_lost_ratio > 0.1f) {
- window_size = std::max(
- (window_size - 50),
- MIN_RELIABLE_WINDOW_SIZE);
+ setWindowSize(m_window_size - 50);
}
}
}
jitter * (1/num_samples);
if (!profiler_id.empty()) {
- g_profiler->graphAdd(profiler_id + "_rtt", rtt);
- g_profiler->graphAdd(profiler_id + "_jitter", jitter);
+ g_profiler->graphAdd(profiler_id + " RTT [ms]", rtt * 1000.f);
+ g_profiler->graphAdd(profiler_id + " jitter [ms]", jitter * 1000.f);
}
}
/* save values required for next loop */
Peer(a_address,a_id,connection)
{
for (Channel &channel : channels)
- channel.setWindowSize(g_settings->getU16("max_packets_per_iteration"));
+ channel.setWindowSize(START_RELIABLE_WINDOW_SIZE);
}
bool UDPPeer::getAddress(MTProtocols type,Address& toset)
return false;
}
-void UDPPeer::PutReliableSendCommand(ConnectionCommand &c,
+void UDPPeer::PutReliableSendCommand(ConnectionCommandPtr &c,
unsigned int max_packet_size)
{
if (m_pending_disconnect)
return;
- if ( channels[c.channelnum].queued_commands.empty() &&
+ Channel &chan = channels[c->channelnum];
+
+ if (chan.queued_commands.empty() &&
/* don't queue more packets then window size */
- (channels[c.channelnum].queued_reliables.size()
- < (channels[c.channelnum].getWindowSize()/2))) {
+ (chan.queued_reliables.size() + 1 < chan.getWindowSize() / 2)) {
LOG(dout_con<<m_connection->getDesc()
- <<" processing reliable command for peer id: " << c.peer_id
- <<" data size: " << c.data.getSize() << std::endl);
- if (!processReliableSendCommand(c,max_packet_size)) {
- channels[c.channelnum].queued_commands.push_back(c);
- }
- }
- else {
+ <<" processing reliable command for peer id: " << c->peer_id
+ <<" data size: " << c->data.getSize() << std::endl);
+ if (processReliableSendCommand(c, max_packet_size))
+ return;
+ } else {
LOG(dout_con<<m_connection->getDesc()
- <<" Queueing reliable command for peer id: " << c.peer_id
- <<" data size: " << c.data.getSize() <<std::endl);
- channels[c.channelnum].queued_commands.push_back(c);
+ <<" Queueing reliable command for peer id: " << c->peer_id
+ <<" data size: " << c->data.getSize() <<std::endl);
+
+ if (chan.queued_commands.size() + 1 >= chan.getWindowSize() / 2) {
+ LOG(derr_con << m_connection->getDesc()
+ << "Possible packet stall to peer id: " << c->peer_id
+ << " queued_commands=" << chan.queued_commands.size()
+ << std::endl);
+ }
}
+ chan.queued_commands.push_back(c);
}
bool UDPPeer::processReliableSendCommand(
- ConnectionCommand &c,
+ ConnectionCommandPtr &c_ptr,
unsigned int max_packet_size)
{
if (m_pending_disconnect)
return true;
+ const auto &c = *c_ptr;
+ Channel &chan = channels[c.channelnum];
+
u32 chunksize_max = max_packet_size
- BASE_HEADER_SIZE
- RELIABLE_HEADER_SIZE;
sanity_check(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE*512);
std::list<SharedBuffer<u8>> originals;
- u16 split_sequence_number = channels[c.channelnum].readNextSplitSeqNum();
+ u16 split_sequence_number = chan.readNextSplitSeqNum();
if (c.raw) {
originals.emplace_back(c.data);
} else {
makeAutoSplitPacket(c.data, chunksize_max,split_sequence_number, &originals);
- channels[c.channelnum].setNextSplitSeqNum(split_sequence_number);
+ chan.setNextSplitSeqNum(split_sequence_number);
}
- bool have_sequence_number = true;
+ bool have_sequence_number = false;
bool have_initial_sequence_number = false;
- std::queue<BufferedPacket> toadd;
+ std::queue<BufferedPacketPtr> toadd;
volatile u16 initial_sequence_number = 0;
for (SharedBuffer<u8> &original : originals) {
- u16 seqnum = channels[c.channelnum].getOutgoingSequenceNumber(have_sequence_number);
+ u16 seqnum = chan.getOutgoingSequenceNumber(have_sequence_number);
/* oops, we don't have enough sequence numbers to send this packet */
if (!have_sequence_number)
SharedBuffer<u8> reliable = makeReliablePacket(original, seqnum);
// Add base headers and make a packet
- BufferedPacket p = con::makePacket(address, reliable,
+ BufferedPacketPtr p = con::makePacket(address, reliable,
m_connection->GetProtocolID(), m_connection->GetPeerID(),
c.channelnum);
}
if (have_sequence_number) {
- volatile u16 pcount = 0;
while (!toadd.empty()) {
- BufferedPacket p = toadd.front();
+ BufferedPacketPtr p = toadd.front();
toadd.pop();
// LOG(dout_con<<connection->getDesc()
// << " queuing reliable packet for peer_id: " << c.peer_id
// << " channel: " << (c.channelnum&0xFF)
// << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
// << std::endl)
- channels[c.channelnum].queued_reliables.push(p);
- pcount++;
+ chan.queued_reliables.push(p);
}
- sanity_check(channels[c.channelnum].queued_reliables.size() < 0xFFFF);
+ sanity_check(chan.queued_reliables.size() < 0xFFFF);
return true;
}
volatile u16 packets_available = toadd.size();
/* we didn't get a single sequence number no need to fill queue */
if (!have_initial_sequence_number) {
+ LOG(derr_con << m_connection->getDesc() << "Ran out of sequence numbers!" << std::endl);
return false;
}
toadd.pop();
bool successfully_put_back_sequence_number
- = channels[c.channelnum].putBackSequenceNumber(
+ = chan.putBackSequenceNumber(
(initial_sequence_number+toadd.size() % (SEQNUM_MAX+1)));
FATAL_ERROR_IF(!successfully_put_back_sequence_number, "error");
}
+ // DO NOT REMOVE n_queued! It avoids a deadlock of async locked
+ // 'log_message_mutex' and 'm_list_mutex'.
+ u32 n_queued = chan.outgoing_reliables_sent.size();
+
LOG(dout_con<<m_connection->getDesc()
<< " Windowsize exceeded on reliable sending "
<< c.data.getSize() << " bytes"
<< std::endl << "\t\tgot at most : "
<< packets_available << " packets"
<< std::endl << "\t\tpackets queued : "
- << channels[c.channelnum].outgoing_reliables_sent.size()
+ << n_queued
<< std::endl);
return false;
(channel.queued_reliables.size() < maxtransfer) &&
(commands_processed < maxcommands)) {
try {
- ConnectionCommand c = channel.queued_commands.front();
+ ConnectionCommandPtr c = channel.queued_commands.front();
LOG(dout_con << m_connection->getDesc()
<< " processing queued reliable command " << std::endl);
// Packet is processed, remove it from queue
- if (processReliableSendCommand(c,max_packet_size)) {
+ if (processReliableSendCommand(c, max_packet_size)) {
channel.queued_commands.pop_front();
} else {
LOG(dout_con << m_connection->getDesc()
- << " Failed to queue packets for peer_id: " << c.peer_id
- << ", delaying sending of " << c.data.getSize()
+ << " Failed to queue packets for peer_id: " << c->peer_id
+ << ", delaying sending of " << c->data.getSize()
<< " bytes" << std::endl);
}
}
channels[channel].setNextSplitSeqNum(seqnum);
}
-SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, const BufferedPacket &toadd,
+SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, BufferedPacketPtr &toadd,
bool reliable)
{
assert(channel < CHANNEL_COUNT); // Pre-condition
return channels[channel].incoming_splits.insert(toadd, reliable);
}
+/*
+ ConnectionEvent
+*/
+
+const char *ConnectionEvent::describe() const
+{
+ switch(type) {
+ case CONNEVENT_NONE:
+ return "CONNEVENT_NONE";
+ case CONNEVENT_DATA_RECEIVED:
+ return "CONNEVENT_DATA_RECEIVED";
+ case CONNEVENT_PEER_ADDED:
+ return "CONNEVENT_PEER_ADDED";
+ case CONNEVENT_PEER_REMOVED:
+ return "CONNEVENT_PEER_REMOVED";
+ case CONNEVENT_BIND_FAILED:
+ return "CONNEVENT_BIND_FAILED";
+ }
+ return "Invalid ConnectionEvent";
+}
+
+
+ConnectionEventPtr ConnectionEvent::create(ConnectionEventType type)
+{
+ return std::shared_ptr<ConnectionEvent>(new ConnectionEvent(type));
+}
+
+ConnectionEventPtr ConnectionEvent::dataReceived(session_t peer_id, const Buffer<u8> &data)
+{
+ auto e = create(CONNEVENT_DATA_RECEIVED);
+ e->peer_id = peer_id;
+ data.copyTo(e->data);
+ return e;
+}
+
+ConnectionEventPtr ConnectionEvent::peerAdded(session_t peer_id, Address address)
+{
+ auto e = create(CONNEVENT_PEER_ADDED);
+ e->peer_id = peer_id;
+ e->address = address;
+ return e;
+}
+
+ConnectionEventPtr ConnectionEvent::peerRemoved(session_t peer_id, bool is_timeout, Address address)
+{
+ auto e = create(CONNEVENT_PEER_REMOVED);
+ e->peer_id = peer_id;
+ e->timeout = is_timeout;
+ e->address = address;
+ return e;
+}
+
+ConnectionEventPtr ConnectionEvent::bindFailed()
+{
+ return create(CONNEVENT_BIND_FAILED);
+}
+
/*
Connection
*/
m_bc_peerhandler(peerhandler)
{
- m_udpSocket.setTimeoutMs(5);
+ /* Amount of time Receive() will wait for data, this is entirely different
+ * from the connection timeout */
+ m_udpSocket.setTimeoutMs(500);
m_sendThread->setParent(this);
m_receiveThread->setParent(this);
}
/* Internal stuff */
-void Connection::putEvent(ConnectionEvent &e)
+
+void Connection::putEvent(ConnectionEventPtr e)
{
- assert(e.type != CONNEVENT_NONE); // Pre-condition
+ assert(e->type != CONNEVENT_NONE); // Pre-condition
m_event_queue.push_back(e);
}
return false;
peer = m_peers[peer_id];
m_peers.erase(peer_id);
- m_peer_ids.remove(peer_id);
+ auto it = std::find(m_peer_ids.begin(), m_peer_ids.end(), peer_id);
+ m_peer_ids.erase(it);
}
Address peer_address;
//any peer has a primary address this never fails!
peer->getAddress(MTP_PRIMARY, peer_address);
- // Create event
- ConnectionEvent e;
- e.peerRemoved(peer_id, timeout, peer_address);
- putEvent(e);
+ // Create event
+ putEvent(ConnectionEvent::peerRemoved(peer_id, timeout, peer_address));
peer->Drop();
return true;
/* Interface */
-ConnectionEvent Connection::waitEvent(u32 timeout_ms)
+ConnectionEventPtr Connection::waitEvent(u32 timeout_ms)
{
try {
return m_event_queue.pop_front(timeout_ms);
} catch(ItemNotFoundException &ex) {
- ConnectionEvent e;
- e.type = CONNEVENT_NONE;
- return e;
+ return ConnectionEvent::create(CONNEVENT_NONE);
}
}
-void Connection::putCommand(ConnectionCommand &c)
+void Connection::putCommand(ConnectionCommandPtr c)
{
if (!m_shutting_down) {
m_command_queue.push_back(c);
void Connection::Serve(Address bind_addr)
{
- ConnectionCommand c;
- c.serve(bind_addr);
- putCommand(c);
+ putCommand(ConnectionCommand::serve(bind_addr));
}
void Connection::Connect(Address address)
{
- ConnectionCommand c;
- c.connect(address);
- putCommand(c);
+ putCommand(ConnectionCommand::connect(address));
}
bool Connection::Connected()
void Connection::Disconnect()
{
- ConnectionCommand c;
- c.disconnect();
- putCommand(c);
+ putCommand(ConnectionCommand::disconnect());
}
-void Connection::Receive(NetworkPacket* pkt)
+bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
{
+ /*
+ Note that this function can potentially wait infinitely if non-data
+ events keep happening before the timeout expires.
+ This is not considered to be a problem (is it?)
+ */
for(;;) {
- ConnectionEvent e = waitEvent(m_bc_receive_timeout);
- if (e.type != CONNEVENT_NONE)
+ ConnectionEventPtr e_ptr = waitEvent(timeout);
+ const ConnectionEvent &e = *e_ptr;
+
+ if (e.type != CONNEVENT_NONE) {
LOG(dout_con << getDesc() << ": Receive: got event: "
<< e.describe() << std::endl);
- switch(e.type) {
+ }
+
+ switch (e.type) {
case CONNEVENT_NONE:
- throw NoIncomingDataException("No incoming data");
+ return false;
case CONNEVENT_DATA_RECEIVED:
// Data size is lesser than command size, ignoring packet
if (e.data.getSize() < 2) {
}
pkt->putRawPacket(*e.data, e.data.getSize(), e.peer_id);
- return;
+ return true;
case CONNEVENT_PEER_ADDED: {
UDPPeer tmp(e.peer_id, e.address, this);
if (m_bc_peerhandler)
"(port already in use?)");
}
}
- throw NoIncomingDataException("No incoming data");
+ return false;
+}
+
+void Connection::Receive(NetworkPacket *pkt)
+{
+ bool any = Receive(pkt, m_bc_receive_timeout);
+ if (!any)
+ throw NoIncomingDataException("No incoming data");
+}
+
+bool Connection::TryReceive(NetworkPacket *pkt)
+{
+ return Receive(pkt, 0);
}
void Connection::Send(session_t peer_id, u8 channelnum,
{
assert(channelnum < CHANNEL_COUNT); // Pre-condition
- ConnectionCommand c;
-
- c.send(peer_id, channelnum, pkt, reliable);
- putCommand(c);
+ putCommand(ConnectionCommand::send(peer_id, channelnum, pkt, reliable));
}
Address Connection::GetPeerAddress(session_t peer_id)
LOG(dout_con << getDesc()
<< "createPeer(): giving peer_id=" << peer_id_new << std::endl);
- ConnectionCommand cmd;
- SharedBuffer<u8> reply(4);
- writeU8(&reply[0], PACKET_TYPE_CONTROL);
- writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
- writeU16(&reply[2], peer_id_new);
- cmd.createPeer(peer_id_new,reply);
- putCommand(cmd);
+ {
+ Buffer<u8> reply(4);
+ writeU8(&reply[0], PACKET_TYPE_CONTROL);
+ writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
+ writeU16(&reply[2], peer_id_new);
+ putCommand(ConnectionCommand::createPeer(peer_id_new, reply));
+ }
// Create peer addition event
- ConnectionEvent e;
- e.peerAdded(peer_id_new, sender);
- putEvent(e);
+ putEvent(ConnectionEvent::peerAdded(peer_id_new, sender));
// We're now talking to a valid peer_id
return peer_id_new;
}
-void Connection::PrintInfo(std::ostream &out)
-{
- m_info_mutex.lock();
- out<<getDesc()<<": ";
- m_info_mutex.unlock();
-}
-
const std::string Connection::getDesc()
{
+ MutexAutoLock _(m_info_mutex);
return std::string("con(")+
itos(m_udpSocket.GetHandle())+"/"+itos(m_peer_id)+")";
}
void Connection::DisconnectPeer(session_t peer_id)
{
- ConnectionCommand discon;
- discon.disconnect_peer(peer_id);
- putCommand(discon);
+ putCommand(ConnectionCommand::disconnect_peer(peer_id));
}
void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum)
" channel: " << (channelnum & 0xFF) <<
" seqnum: " << seqnum << std::endl);
- ConnectionCommand c;
SharedBuffer<u8> ack(4);
writeU8(&ack[0], PACKET_TYPE_CONTROL);
writeU8(&ack[1], CONTROLTYPE_ACK);
writeU16(&ack[2], seqnum);
- c.ack(peer_id, channelnum, ack);
- putCommand(c);
+ putCommand(ConnectionCommand::ack(peer_id, channelnum, ack));
m_sendThread->Trigger();
}
UDPPeer* Connection::createServerPeer(Address& address)
{
- if (getPeerNoEx(PEER_ID_SERVER) != 0)
+ if (ConnectedToServer())
{
throw ConnectionException("Already connected to a server");
}