/* defines used for debugging and profiling */
/******************************************************************************/
#ifdef NDEBUG
-#define LOG(a) a
-#define PROFILE(a)
+ #define LOG(a) a
+ #define PROFILE(a)
#else
-/* this mutex is used to achieve log message consistency */
-std::mutex log_message_mutex;
-#define LOG(a) \
- { \
- MutexAutoLock loglock(log_message_mutex); \
- a; \
- }
-#define PROFILE(a) a
+ #if 0
+ /* this mutex is used to achieve log message consistency */
+ std::mutex log_message_mutex;
+ #define LOG(a) \
+ { \
+ MutexAutoLock loglock(log_message_mutex); \
+ a; \
+ }
+ #else
+ // Prevent deadlocks until a solution is found after 5.2.0 (TODO)
+ #define LOG(a) a
+ #endif
+
+ #define PROFILE(a) a
#endif
#define PING_TIMEOUT 5.0
Peer(a_address,a_id,connection)
{
for (Channel &channel : channels)
- channel.setWindowSize(g_settings->getU16("max_packets_per_iteration"));
+ channel.setWindowSize(START_RELIABLE_WINDOW_SIZE);
}
bool UDPPeer::getAddress(MTProtocols type,Address& toset)
if (m_pending_disconnect)
return;
- if ( channels[c.channelnum].queued_commands.empty() &&
+ Channel &chan = channels[c.channelnum];
+
+ if (chan.queued_commands.empty() &&
/* don't queue more packets then window size */
- (channels[c.channelnum].queued_reliables.size()
- < (channels[c.channelnum].getWindowSize()/2))) {
+ (chan.queued_reliables.size() < chan.getWindowSize() / 2)) {
LOG(dout_con<<m_connection->getDesc()
<<" processing reliable command for peer id: " << c.peer_id
<<" data size: " << c.data.getSize() << std::endl);
if (!processReliableSendCommand(c,max_packet_size)) {
- channels[c.channelnum].queued_commands.push_back(c);
+ chan.queued_commands.push_back(c);
}
}
else {
LOG(dout_con<<m_connection->getDesc()
<<" Queueing reliable command for peer id: " << c.peer_id
<<" data size: " << c.data.getSize() <<std::endl);
- channels[c.channelnum].queued_commands.push_back(c);
+ chan.queued_commands.push_back(c);
+ if (chan.queued_commands.size() >= chan.getWindowSize() / 2) {
+ LOG(derr_con << m_connection->getDesc()
+ << "Possible packet stall to peer id: " << c.peer_id
+ << " queued_commands=" << chan.queued_commands.size()
+ << std::endl);
+ }
}
}
if (m_pending_disconnect)
return true;
+ Channel &chan = channels[c.channelnum];
+
u32 chunksize_max = max_packet_size
- BASE_HEADER_SIZE
- RELIABLE_HEADER_SIZE;
sanity_check(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE*512);
std::list<SharedBuffer<u8>> originals;
- u16 split_sequence_number = channels[c.channelnum].readNextSplitSeqNum();
+ u16 split_sequence_number = chan.readNextSplitSeqNum();
if (c.raw) {
originals.emplace_back(c.data);
} else {
makeAutoSplitPacket(c.data, chunksize_max,split_sequence_number, &originals);
- channels[c.channelnum].setNextSplitSeqNum(split_sequence_number);
+ chan.setNextSplitSeqNum(split_sequence_number);
}
bool have_sequence_number = true;
volatile u16 initial_sequence_number = 0;
for (SharedBuffer<u8> &original : originals) {
- u16 seqnum = channels[c.channelnum].getOutgoingSequenceNumber(have_sequence_number);
+ u16 seqnum = chan.getOutgoingSequenceNumber(have_sequence_number);
/* oops, we don't have enough sequence numbers to send this packet */
if (!have_sequence_number)
// << " channel: " << (c.channelnum&0xFF)
// << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
// << std::endl)
- channels[c.channelnum].queued_reliables.push(p);
+ chan.queued_reliables.push(p);
pcount++;
}
- sanity_check(channels[c.channelnum].queued_reliables.size() < 0xFFFF);
+ sanity_check(chan.queued_reliables.size() < 0xFFFF);
return true;
}
toadd.pop();
bool successfully_put_back_sequence_number
- = channels[c.channelnum].putBackSequenceNumber(
+ = chan.putBackSequenceNumber(
(initial_sequence_number+toadd.size() % (SEQNUM_MAX+1)));
FATAL_ERROR_IF(!successfully_put_back_sequence_number, "error");
}
+ // DO NOT REMOVE n_queued! It avoids a deadlock of async locked
+ // 'log_message_mutex' and 'm_list_mutex'.
+ u32 n_queued = chan.outgoing_reliables_sent.size();
+
LOG(dout_con<<m_connection->getDesc()
<< " Windowsize exceeded on reliable sending "
<< c.data.getSize() << " bytes"
<< std::endl << "\t\tgot at most : "
<< packets_available << " packets"
<< std::endl << "\t\tpackets queued : "
- << channels[c.channelnum].outgoing_reliables_sent.size()
+ << n_queued
<< std::endl);
return false;
m_bc_peerhandler(peerhandler)
{
- m_udpSocket.setTimeoutMs(5);
+ /* Amount of time Receive() will wait for data, this is entirely different
+ * from the connection timeout */
+ m_udpSocket.setTimeoutMs(500);
m_sendThread->setParent(this);
m_receiveThread->setParent(this);
return false;
peer = m_peers[peer_id];
m_peers.erase(peer_id);
- m_peer_ids.remove(peer_id);
+ auto it = std::find(m_peer_ids.begin(), m_peer_ids.end(), peer_id);
+ m_peer_ids.erase(it);
}
Address peer_address;
putCommand(c);
}
-void Connection::Receive(NetworkPacket* pkt)
+bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
{
+ /*
+ Note that this function can potentially wait infinitely if non-data
+ events keep happening before the timeout expires.
+ This is not considered to be a problem (is it?)
+ */
for(;;) {
- ConnectionEvent e = waitEvent(m_bc_receive_timeout);
+ ConnectionEvent e = waitEvent(timeout);
if (e.type != CONNEVENT_NONE)
LOG(dout_con << getDesc() << ": Receive: got event: "
<< e.describe() << std::endl);
switch(e.type) {
case CONNEVENT_NONE:
- throw NoIncomingDataException("No incoming data");
+ return false;
case CONNEVENT_DATA_RECEIVED:
// Data size is lesser than command size, ignoring packet
if (e.data.getSize() < 2) {
}
pkt->putRawPacket(*e.data, e.data.getSize(), e.peer_id);
- return;
+ return true;
case CONNEVENT_PEER_ADDED: {
UDPPeer tmp(e.peer_id, e.address, this);
if (m_bc_peerhandler)
"(port already in use?)");
}
}
- throw NoIncomingDataException("No incoming data");
+ return false;
+}
+
+void Connection::Receive(NetworkPacket *pkt)
+{
+ bool any = Receive(pkt, m_bc_receive_timeout);
+ if (!any)
+ throw NoIncomingDataException("No incoming data");
+}
+
+bool Connection::TryReceive(NetworkPacket *pkt)
+{
+ return Receive(pkt, 0);
}
void Connection::Send(session_t peer_id, u8 channelnum,
UDPPeer* Connection::createServerPeer(Address& address)
{
- if (getPeerNoEx(PEER_ID_SERVER) != 0)
+ if (ConnectedToServer())
{
throw ConnectionException("Already connected to a server");
}