X-Git-Url: https://git.lizzy.rs/?a=blobdiff_plain;f=src%2Fconnection.cpp;h=42262846fbb789806071adba7ff4194c1d1c7f4c;hb=b38afc9311e235d2b90d61dbbcf1a18e549073a7;hp=e890a12a11a582162704d3f1bbce4f71bc2431fc;hpb=3fb0d2fb65c968f91c333a1d31d2d7a1a02ab7d1;p=dragonfireclient.git diff --git a/src/connection.cpp b/src/connection.cpp index e890a12a1..42262846f 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -1,18 +1,18 @@ /* -Minetest-c55 -Copyright (C) 2010 celeron55, Perttu Ahola +Minetest +Copyright (C) 2013 celeron55, Perttu Ahola This program is free software; you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation; either version 2 of the License, or +it under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. +GNU Lesser General Public License for more details. -You should have received a copy of the GNU General Public License along +You should have received a copy of the GNU Lesser General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ @@ -20,10 +20,25 @@ with this program; if not, write to the Free Software Foundation, Inc., #include "connection.h" #include "main.h" #include "serialization.h" +#include "log.h" +#include "porting.h" +#include "util/serialize.h" +#include "util/numeric.h" +#include "util/string.h" +#include "settings.h" namespace con { +static u16 readPeerId(u8 *packetdata) +{ + return readU16(&packetdata[4]); +} +static u8 readChannel(u8 *packetdata) +{ + return readU8(&packetdata[6]); +} + BufferedPacket makePacket(Address &address, u8 *data, u32 datasize, u32 protocol_id, u16 sender_peer_id, u8 channel) { @@ -61,19 +76,20 @@ SharedBuffer makeOriginalPacket( return b; } -core::list > makeSplitPacket( +std::list > makeSplitPacket( SharedBuffer data, u32 chunksize_max, u16 seqnum) { // Chunk packets, containing the TYPE_SPLIT header - core::list > chunks; + std::list > chunks; u32 chunk_header_size = 7; u32 maximum_data_size = chunksize_max - chunk_header_size; u32 start = 0; u32 end = 0; u32 chunk_num = 0; + u16 chunk_count = 0; do{ end = start + maximum_data_size - 1; if(end > data.getSize() - 1) @@ -91,16 +107,15 @@ core::list > makeSplitPacket( memcpy(&chunk[chunk_header_size], &data[start], payload_size); chunks.push_back(chunk); + chunk_count++; start = end + 1; chunk_num++; } while(end != data.getSize() - 1); - u16 chunk_count = chunks.getSize(); - - core::list >::Iterator i = chunks.begin(); - for(; i != chunks.end(); i++) + for(std::list >::iterator i = chunks.begin(); + i != chunks.end(); ++i) { // Write chunk_count writeU16(&((*i)[3]), chunk_count); @@ -109,13 +124,13 @@ core::list > makeSplitPacket( return chunks; } -core::list > makeAutoSplitPacket( +std::list > makeAutoSplitPacket( SharedBuffer data, u32 chunksize_max, u16 &split_seqnum) { u32 original_header_size = 1; - core::list > list; + std::list > list; if(data.getSize() + original_header_size > chunksize_max) { list = makeSplitPacket(data, chunksize_max, split_seqnum); @@ -155,11 +170,13 @@ SharedBuffer makeReliablePacket( ReliablePacketBuffer */ +ReliablePacketBuffer::ReliablePacketBuffer(): m_list_size(0) {} + void ReliablePacketBuffer::print() { - core::list::Iterator i; - i = m_list.begin(); - for(; i != m_list.end(); i++) + for(std::list::iterator i = m_list.begin(); + i != m_list.end(); + ++i) { u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1])); dout_con<::Iterator i; - i = m_list.begin(); - for(; i != m_list.end(); i++) + std::list::iterator i = m_list.begin(); + for(; i != m_list.end(); ++i) { u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1])); /*dout_con<<"findPacket(): finding seqnum="<::Iterator i = m_list.begin(); - m_list.erase(i); + m_list.erase(m_list.begin()); + --m_list_size; return p; } BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum) @@ -216,6 +233,7 @@ BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum) } BufferedPacket p = *r; m_list.erase(r); + --m_list_size; return p; } void ReliablePacketBuffer::insert(BufferedPacket &p) @@ -225,6 +243,7 @@ void ReliablePacketBuffer::insert(BufferedPacket &p) assert(type == TYPE_RELIABLE); u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); + ++m_list_size; // Find the right place for the packet and insert it there // If list is empty, just add it @@ -235,12 +254,12 @@ void ReliablePacketBuffer::insert(BufferedPacket &p) return; } // Otherwise find the right place - core::list::Iterator i; - i = m_list.begin(); + std::list::iterator i = m_list.begin(); // Find the first packet in the list which has a higher seqnum - for(; i != m_list.end(); i++){ + for(; i != m_list.end(); ++i){ u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1])); if(s == seqnum){ + --m_list_size; throw AlreadyExistsException("Same seqnum in list"); } if(seqnum_higher(s, seqnum)){ @@ -256,14 +275,14 @@ void ReliablePacketBuffer::insert(BufferedPacket &p) return; } // Insert before i - m_list.insert_before(i, p); + m_list.insert(i, p); } void ReliablePacketBuffer::incrementTimeouts(float dtime) { - core::list::Iterator i; - i = m_list.begin(); - for(; i != m_list.end(); i++){ + for(std::list::iterator i = m_list.begin(); + i != m_list.end(); ++i) + { i->time += dtime; i->totaltime += dtime; } @@ -271,9 +290,9 @@ void ReliablePacketBuffer::incrementTimeouts(float dtime) void ReliablePacketBuffer::resetTimedOuts(float timeout) { - core::list::Iterator i; - i = m_list.begin(); - for(; i != m_list.end(); i++){ + for(std::list::iterator i = m_list.begin(); + i != m_list.end(); ++i) + { if(i->time >= timeout) i->time = 0.0; } @@ -281,21 +300,20 @@ void ReliablePacketBuffer::resetTimedOuts(float timeout) bool ReliablePacketBuffer::anyTotaltimeReached(float timeout) { - core::list::Iterator i; - i = m_list.begin(); - for(; i != m_list.end(); i++){ + for(std::list::iterator i = m_list.begin(); + i != m_list.end(); ++i) + { if(i->totaltime >= timeout) return true; } return false; } -core::list ReliablePacketBuffer::getTimedOuts(float timeout) +std::list ReliablePacketBuffer::getTimedOuts(float timeout) { - core::list timed_outs; - core::list::Iterator i; - i = m_list.begin(); - for(; i != m_list.end(); i++) + std::list timed_outs; + for(std::list::iterator i = m_list.begin(); + i != m_list.end(); ++i) { if(i->time >= timeout) timed_outs.push_back(*i); @@ -309,18 +327,17 @@ core::list ReliablePacketBuffer::getTimedOuts(float timeout) IncomingSplitBuffer::~IncomingSplitBuffer() { - core::map::Iterator i; - i = m_buf.getIterator(); - for(; i.atEnd() == false; i++) + for(std::map::iterator i = m_buf.begin(); + i != m_buf.end(); ++i) { - delete i.getNode()->getValue(); + delete i->second; } } /* This will throw a GotSplitPacketException when a full split packet is constructed. */ -void IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable) +SharedBuffer IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable) { u32 headersize = BASE_HEADER_SIZE + 7; assert(p.data.getSize() >= headersize); @@ -331,7 +348,7 @@ void IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable) u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]); // Add if doesn't exist - if(m_buf.find(seqnum) == NULL) + if(m_buf.find(seqnum) == m_buf.end()) { IncomingSplitPacket *sp = new IncomingSplitPacket(); sp->chunk_count = chunk_count; @@ -351,9 +368,11 @@ void IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable) <<" != sp->reliable="<reliable <chunks.find(chunk_num) != NULL) - throw AlreadyExistsException("Chunk already in buffer"); + // If chunk already exists, ignore it. + // Sometimes two identical packets may arrive when there is network + // lag and the server re-sends stuff. + if(sp->chunks.find(chunk_num) != sp->chunks.end()) + return SharedBuffer(); // Cut chunk data out of packet u32 chunkdatasize = p.data.getSize() - headersize; @@ -363,17 +382,16 @@ void IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable) // Set chunk data in buffer sp->chunks[chunk_num] = chunkdata; - // If not all chunks are received, return + // If not all chunks are received, return empty buffer if(sp->allReceived() == false) - return; + return SharedBuffer(); // Calculate total size u32 totalsize = 0; - core::map >::Iterator i; - i = sp->chunks.getIterator(); - for(; i.atEnd() == false; i++) + for(std::map >::iterator i = sp->chunks.begin(); + i != sp->chunks.end(); ++i) { - totalsize += i.getNode()->getValue().getSize(); + totalsize += i->second.getSize(); } SharedBuffer fulldata(totalsize); @@ -390,34 +408,32 @@ void IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable) } // Remove sp from buffer - m_buf.remove(seqnum); + m_buf.erase(seqnum); delete sp; - - throw GotSplitPacketException(fulldata); + + return fulldata; } void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout) { - core::list remove_queue; - core::map::Iterator i; - i = m_buf.getIterator(); - for(; i.atEnd() == false; i++) + std::list remove_queue; + for(std::map::iterator i = m_buf.begin(); + i != m_buf.end(); ++i) { - IncomingSplitPacket *p = i.getNode()->getValue(); + IncomingSplitPacket *p = i->second; // Reliable ones are not removed by timeout if(p->reliable == true) continue; p->time += dtime; if(p->time >= timeout) - remove_queue.push_back(i.getNode()->getKey()); + remove_queue.push_back(i->first); } - core::list::Iterator j; - j = remove_queue.begin(); - for(; j != remove_queue.end(); j++) + for(std::list::iterator j = remove_queue.begin(); + j != remove_queue.end(); ++j) { dout_con<<"NOTE: Removing timed out unreliable split packet" <= 0.0){ + if(rtt < 0.01){ + if(m_max_packets_per_second < congestion_control_max_rate) + m_max_packets_per_second += 10; + } else if(rtt < congestion_control_aim_rtt){ + if(m_max_packets_per_second < congestion_control_max_rate) + m_max_packets_per_second += 2; + } else { + m_max_packets_per_second *= 0.8; + if(m_max_packets_per_second < congestion_control_min_rate) + m_max_packets_per_second = congestion_control_min_rate; + } + } + if(rtt < -0.999) {} else if(avg_rtt < 0.0) @@ -485,216 +521,891 @@ void Peer::reportRTT(float rtt) Connection */ -Connection::Connection( - u32 protocol_id, - u32 max_packet_size, - float timeout, - PeerHandler *peerhandler -) +Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, + bool ipv6): + m_protocol_id(protocol_id), + m_max_packet_size(max_packet_size), + m_timeout(timeout), + m_socket(ipv6), + m_peer_id(0), + m_bc_peerhandler(NULL), + m_bc_receive_timeout(0), + m_indentation(0) { - assert(peerhandler != NULL); + m_socket.setTimeoutMs(5); - m_protocol_id = protocol_id; - m_max_packet_size = max_packet_size; - m_timeout = timeout; - m_peer_id = PEER_ID_INEXISTENT; - //m_waiting_new_peer_id = false; - m_indentation = 0; - m_peerhandler = peerhandler; + Start(); +} + +Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout, + bool ipv6, PeerHandler *peerhandler): + m_protocol_id(protocol_id), + m_max_packet_size(max_packet_size), + m_timeout(timeout), + m_socket(ipv6), + m_peer_id(0), + m_bc_peerhandler(peerhandler), + m_bc_receive_timeout(0), + m_indentation(0) +{ + m_socket.setTimeoutMs(5); + + Start(); } + Connection::~Connection() { - // Clear peers - core::map::Iterator j; - j = m_peers.getIterator(); - for(; j.atEnd() == false; j++) + stop(); + // Delete peers + for(std::map::iterator + j = m_peers.begin(); + j != m_peers.end(); ++j) { - Peer *peer = j.getNode()->getValue(); - delete peer; + delete j->second; } } -void Connection::Serve(unsigned short port) -{ - m_socket.Bind(port); - m_peer_id = PEER_ID_SERVER; -} +/* Internal stuff */ -void Connection::Connect(Address address) +void * Connection::Thread() { - core::map::Node *node = m_peers.find(PEER_ID_SERVER); - if(node != NULL){ - throw ConnectionException("Already connected to a server"); - } + ThreadStarted(); + log_register_thread("Connection"); - Peer *peer = new Peer(PEER_ID_SERVER, address); - m_peers.insert(peer->id, peer); - m_peerhandler->peerAdded(peer); + dout_con<<"Connection thread started"< data(0); - Send(PEER_ID_SERVER, 0, data, true); + u32 curtime = porting::getTimeMs(); + u32 lasttime = curtime; + + while(getRun()) + { + BEGIN_DEBUG_EXCEPTION_HANDLER + + lasttime = curtime; + curtime = porting::getTimeMs(); + float dtime = (float)(curtime - lasttime) / 1000.; + if(dtime > 0.1) + dtime = 0.1; + if(dtime < 0.0) + dtime = 0.0; + + runTimeouts(dtime); + + while(!m_command_queue.empty()){ + ConnectionCommand c = m_command_queue.pop_front(); + processCommand(c); + } + + send(dtime); + + receive(); + + END_DEBUG_EXCEPTION_HANDLER(derr_con); + } - //m_waiting_new_peer_id = true; + return NULL; } -void Connection::Disconnect() +void Connection::putEvent(ConnectionEvent &e) { - // Create and send DISCO packet - SharedBuffer data(2); - writeU8(&data[0], TYPE_CONTROL); - writeU8(&data[1], CONTROLTYPE_DISCO); - - // Send to all - core::map::Iterator j; - j = m_peers.getIterator(); - for(; j.atEnd() == false; j++) - { - Peer *peer = j.getNode()->getValue(); - SendAsPacket(peer->id, 0, data, false); - } + assert(e.type != CONNEVENT_NONE); + m_event_queue.push_back(e); } -bool Connection::Connected() +void Connection::processCommand(ConnectionCommand &c) { - if(m_peers.size() != 1) - return false; - - core::map::Node *node = m_peers.find(PEER_ID_SERVER); - if(node == NULL) - return false; - - if(m_peer_id == PEER_ID_INEXISTENT) - return false; - - return true; + switch(c.type){ + case CONNCMD_NONE: + dout_con< Channel::ProcessPacket( - SharedBuffer packetdata, - Connection *con, - u16 peer_id, - u8 channelnum, - bool reliable) +void Connection::send(float dtime) { - IndentationRaiser iraiser(&(con->m_indentation)); + for(std::map::iterator + j = m_peers.begin(); + j != m_peers.end(); ++j) + { + Peer *peer = j->second; + peer->m_sendtime_accu += dtime; + peer->m_num_sent = 0; + peer->m_max_num_sent = peer->m_sendtime_accu * + peer->m_max_packets_per_second; + } + Queue postponed_packets; + while(!m_outgoing_queue.empty()){ + OutgoingPacket packet = m_outgoing_queue.pop_front(); + Peer *peer = getPeerNoEx(packet.peer_id); + if(!peer) + continue; + if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){ + postponed_packets.push_back(packet); + } else if(peer->m_num_sent < peer->m_max_num_sent){ + rawSendAsPacket(packet.peer_id, packet.channelnum, + packet.data, packet.reliable); + peer->m_num_sent++; + } else { + postponed_packets.push_back(packet); + } + } + while(!postponed_packets.empty()){ + m_outgoing_queue.push_back(postponed_packets.pop_front()); + } + for(std::map::iterator + j = m_peers.begin(); + j != m_peers.end(); ++j) + { + Peer *peer = j->second; + peer->m_sendtime_accu -= (float)peer->m_num_sent / + peer->m_max_packets_per_second; + if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second) + peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second; + } +} - if(packetdata.getSize() < 1) - throw InvalidIncomingDataException("packetdata.getSize() < 1"); +// Receive packets from the network and buffers and create ConnectionEvents +void Connection::receive() +{ + u32 datasize = m_max_packet_size * 2; // Double it just to be safe + // TODO: We can not know how many layers of header there are. + // For now, just assume there are no other than the base headers. + u32 packet_maxsize = datasize + BASE_HEADER_SIZE; + SharedBuffer packetdata(packet_maxsize); - u8 type = readU8(&packetdata[0]); + bool single_wait_done = false; - if(type == TYPE_CONTROL) + for(u32 loop_i=0; loop_i<1000; loop_i++) // Limit in case of DoS { - if(packetdata.getSize() < 2) - throw InvalidIncomingDataException("packetdata.getSize() < 2"); - - u8 controltype = readU8(&packetdata[1]); - - if(controltype == CONTROLTYPE_ACK) + try{ + /* Check if some buffer has relevant data */ { - if(packetdata.getSize() < 4) - throw InvalidIncomingDataException - ("packetdata.getSize() < 4 (ACK header size)"); - - u16 seqnum = readU16(&packetdata[2]); - con->PrintInfo(); - dout_con<<"Got CONTROLTYPE_ACK: channelnum=" - <<((int)channelnum&0xff)<<", peer_id="<PrintInfo(); - outgoing_reliables.print(); - dout_con<PrintInfo(derr_con); - derr_con<<"WARNING: ACKed packet not " - "in outgoing queue" - < resultdata; + bool got = getFromBuffers(peer_id, resultdata); + if(got){ + ConnectionEvent e; + e.dataReceived(peer_id, resultdata); + putEvent(e); + continue; } + } + + if(single_wait_done){ + if(m_socket.WaitData(0) == false) + break; + } + + single_wait_done = true; - throw ProcessedSilentlyException("Got an ACK"); + Address sender; + s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize); + + if(received_size < 0) + break; + if(received_size < BASE_HEADER_SIZE) + continue; + if(readU32(&packetdata[0]) != m_protocol_id) + continue; + + u16 peer_id = readPeerId(*packetdata); + u8 channelnum = readChannel(*packetdata); + if(channelnum > CHANNEL_COUNT-1){ + PrintInfo(derr_con); + derr_con<<"Receive(): Invalid channel "<PrintInfo(); - dout_con<<"Got new peer id: "<GetPeerID() != PEER_ID_INEXISTENT) + std::map::iterator j; + j = m_peers.begin(); + for(; j != m_peers.end(); ++j) { - con->PrintInfo(derr_con); - derr_con<<"WARNING: Not changing" - " existing peer id."<second; + if(peer->has_sent_with_id) + continue; + if(peer->address == sender) + break; + } + + /* + If no peer was found with the same address and port, + we shall assume it is a new peer and create an entry. + */ + if(j == m_peers.end()) + { + // Pass on to adding the peer } + // Else: A peer was found. else { - dout_con<<"changing."<SetPeerID(peer_id_new); + Peer *peer = j->second; + peer_id = peer->id; + PrintInfo(derr_con); + derr_con<<"WARNING: Assuming unknown peer to be " + <<"peer_id="<PrintInfo(); - dout_con<<"PING"<PrintInfo(); - dout_con<<"DISCO: Removing peer "<<(peer_id)<deletePeer(peer_id, false) == false) + // Somebody wants to make a new connection + + // Get a unique peer id (2 or higher) + u16 peer_id_new = 2; + /* + Find an unused peer id + */ + bool out_of_ids = false; + for(;;) { - con->PrintInfo(derr_con); - derr_con<<"DISCO: Peer not found"<::iterator node = m_peers.find(peer_id); + + if(node == m_peers.end()) + { + // Peer not found + // This means that the peer id of the sender is not PEER_ID_INEXISTENT + // and it is invalid. + PrintInfo(derr_con); + derr_con<<"Receive(): Peer not found"<PrintInfo(); - dout_con<<"RETURNING TYPE_ORIGINAL to user" - <second; + + // Validate peer address + if(peer->address != sender) + { + PrintInfo(derr_con); + derr_con<<"Peer "<timeout_counter = 0.0; + + Channel *channel = &(peer->channels[channelnum]); + + // Throw the received packet to channel->processPacket() + + // Make a new SharedBuffer from the data without the base headers + SharedBuffer strippeddata(received_size - BASE_HEADER_SIZE); + memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE], + strippeddata.getSize()); + + try{ + // Process it (the result is some data with no headers made by us) + SharedBuffer resultdata = processPacket + (channel, strippeddata, peer_id, channelnum, false); + + PrintInfo(); + dout_con<<"ProcessPacket returned data of size " + <getFloat("congestion_control_aim_rtt"); + float congestion_control_max_rate + = g_settings->getFloat("congestion_control_max_rate"); + float congestion_control_min_rate + = g_settings->getFloat("congestion_control_min_rate"); + + std::list timeouted_peers; + for(std::map::iterator j = m_peers.begin(); + j != m_peers.end(); ++j) + { + Peer *peer = j->second; + + // Update congestion control values + peer->congestion_control_aim_rtt = congestion_control_aim_rtt; + peer->congestion_control_max_rate = congestion_control_max_rate; + peer->congestion_control_min_rate = congestion_control_min_rate; + + /* + Check peer timeout + */ + peer->timeout_counter += dtime; + if(peer->timeout_counter > m_timeout) + { + PrintInfo(derr_con); + derr_con<<"RunTimeouts(): Peer "<id + <<" has timed out." + <<" (source=peer->timeout_counter)" + <id); + // Don't bother going through the buffers of this one + continue; + } + + float resend_timeout = peer->resend_timeout; + for(u16 i=0; i timed_outs; + + Channel *channel = &peer->channels[i]; + + // Remove timed out incomplete unreliable split packets + channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout); + + // Increment reliable packet times + channel->outgoing_reliables.incrementTimeouts(dtime); + + // Check reliable packet total times, remove peer if + // over timeout. + if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout)) + { + PrintInfo(derr_con); + derr_con<<"RunTimeouts(): Peer "<id + <<" has timed out." + <<" (source=reliable packet totaltime)" + <id); + goto nextpeer; + } + + // Re-send timed out outgoing reliables + + timed_outs = channel-> + outgoing_reliables.getTimedOuts(resend_timeout); + + channel->outgoing_reliables.resetTimedOuts(resend_timeout); + + for(std::list::iterator j = timed_outs.begin(); + j != timed_outs.end(); ++j) + { + u16 peer_id = readPeerId(*(j->data)); + u8 channel = readChannel(*(j->data)); + u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1])); + + PrintInfo(derr_con); + derr_con<<"RE-SENDING timed-out RELIABLE to "; + j->address.print(&derr_con); + derr_con<<"(t/o="<::iterator node = m_peers.find(PEER_ID_SERVER); + if(node != m_peers.end()){ + throw ConnectionException("Already connected to a server"); + } + + Peer *peer = new Peer(PEER_ID_SERVER, address); + m_peers[peer->id] = peer; + + // Create event + ConnectionEvent e; + e.peerAdded(peer->id, peer->address); + putEvent(e); + + m_socket.Bind(0); + + // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT + m_peer_id = PEER_ID_INEXISTENT; + SharedBuffer data(0); + Send(PEER_ID_SERVER, 0, data, true); +} + +void Connection::disconnect() +{ + dout_con< data(2); + writeU8(&data[0], TYPE_CONTROL); + writeU8(&data[1], CONTROLTYPE_DISCO); + + // Send to all + for(std::map::iterator j = m_peers.begin(); + j != m_peers.end(); ++j) + { + Peer *peer = j->second; + rawSendAsPacket(peer->id, 0, data, false); + } +} + +void Connection::sendToAll(u8 channelnum, SharedBuffer data, bool reliable) +{ + for(std::map::iterator j = m_peers.begin(); + j != m_peers.end(); ++j) + { + Peer *peer = j->second; + send(peer->id, channelnum, data, reliable); + } +} + +void Connection::send(u16 peer_id, u8 channelnum, + SharedBuffer data, bool reliable) +{ + dout_con<address, data, + m_protocol_id, m_peer_id, channelnum); + + // Send the packet + rawSend(p); + } +} + +void Connection::rawSend(const BufferedPacket &packet) +{ + try{ + m_socket.Send(packet.address, *packet.data, packet.data.getSize()); + } catch(SendFailedException &e){ + derr_con<<"Connection::rawSend(): SendFailedException: " + <::iterator node = m_peers.find(peer_id); + + if(node == m_peers.end()){ + throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)"); + } + + // Error checking + assert(node->second->id == peer_id); + + return node->second; +} + +Peer* Connection::getPeerNoEx(u16 peer_id) +{ + std::map::iterator node = m_peers.find(peer_id); + + if(node == m_peers.end()){ + return NULL; + } + + // Error checking + assert(node->second->id == peer_id); + + return node->second; +} + +std::list Connection::getPeers() +{ + std::list list; + for(std::map::iterator j = m_peers.begin(); + j != m_peers.end(); ++j) + { + Peer *peer = j->second; + list.push_back(peer); + } + return list; +} + +bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer &dst) +{ + for(std::map::iterator j = m_peers.begin(); + j != m_peers.end(); ++j) + { + Peer *peer = j->second; + for(u16 i=0; ichannels[i]; + SharedBuffer resultdata; + bool got = checkIncomingBuffers(channel, peer_id, resultdata); + if(got){ + dst = resultdata; + return true; + } + } + } + return false; +} + +bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id, + SharedBuffer &dst) +{ + u16 firstseqnum = 0; + // Clear old packets from start of buffer + for(;;){ + bool found = channel->incoming_reliables.getFirstSeqnum(&firstseqnum); + if(!found) + break; + if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum)) + channel->incoming_reliables.popFirst(); + else + break; + } + // This happens if all packets are old + + if(channel->incoming_reliables.empty() == false) + { + if(firstseqnum == channel->next_incoming_seqnum) + { + BufferedPacket p = channel->incoming_reliables.popFirst(); + + peer_id = readPeerId(*p.data); + u8 channelnum = readChannel(*p.data); + u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); + + PrintInfo(); + dout_con<<"UNBUFFERING TYPE_RELIABLE" + <<" seqnum="<outgoing_reliables.print(); + dout_con< payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE); memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize()); return payload; @@ -704,32 +1415,29 @@ SharedBuffer Channel::ProcessPacket( // We have to create a packet again for buffering // This isn't actually too bad an idea. BufferedPacket packet = makePacket( - con->GetPeer(peer_id)->address, + getPeer(peer_id)->address, packetdata, - con->GetProtocolID(), + GetProtocolID(), peer_id, channelnum); - try{ - // Buffer the packet - incoming_splits.insert(packet, reliable); - } - // This exception happens when all the pieces of a packet - // are collected. - catch(GotSplitPacketException &e) + // Buffer the packet + SharedBuffer data = channel->incoming_splits.insert(packet, reliable); + if(data.getSize() != 0) { - con->PrintInfo(); + PrintInfo(); dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, " - <<"size="< Channel::ProcessPacket( u16 seqnum = readU16(&packetdata[1]); - bool is_future_packet = seqnum_higher(seqnum, next_incoming_seqnum); - bool is_old_packet = seqnum_higher(next_incoming_seqnum, seqnum); + bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum); + bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum); - con->PrintInfo(); + PrintInfo(); if(is_future_packet) dout_con<<"BUFFERING"; else if(is_old_packet) @@ -748,25 +1456,25 @@ SharedBuffer Channel::ProcessPacket( else dout_con<<"RECUR"; dout_con<<" TYPE_RELIABLE seqnum="<incoming_reliables.size() < 100); // Send a CONTROLTYPE_ACK SharedBuffer reply(4); writeU8(&reply[0], TYPE_CONTROL); writeU8(&reply[1], CONTROLTYPE_ACK); writeU16(&reply[2], seqnum); - con->SendAsPacket(peer_id, channelnum, reply, false); + rawSendAsPacket(peer_id, channelnum, reply, false); - //if(seqnum_higher(seqnum, next_incoming_seqnum)) + //if(seqnum_higher(seqnum, channel->next_incoming_seqnum)) if(is_future_packet) { - /*con->PrintInfo(); + /*PrintInfo(); dout_con<<"Buffering reliable packet (seqnum=" < Channel::ProcessPacket( // Actually we have to make a packet to buffer one. // Well, we have all the ingredients, so just do it. BufferedPacket packet = makePacket( - con->GetPeer(peer_id)->address, + getPeer(peer_id)->address, packetdata, - con->GetProtocolID(), + GetProtocolID(), peer_id, channelnum); try{ - incoming_reliables.insert(packet); + channel->incoming_reliables.insert(packet); - /*con->PrintInfo(); + /*PrintInfo(); dout_con<<"INCOMING: "; - incoming_reliables.print(); + channel->incoming_reliables.print(); dout_con< payload(packetdata.getSize() - RELIABLE_HEADER_SIZE); - memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize()); - - return ProcessPacket(payload, con, peer_id, channelnum, true); - } - else - { - con->PrintInfo(derr_con); - derr_con<<"Got invalid type="<<((int)type&0xff)< Channel::CheckIncomingBuffers(Connection *con, - u16 &peer_id) -{ - u16 firstseqnum = 0; - // Clear old packets from start of buffer - try{ - for(;;){ - firstseqnum = incoming_reliables.getFirstSeqnum(); - if(seqnum_higher(next_incoming_seqnum, firstseqnum)) - incoming_reliables.popFirst(); - else - break; - } - // This happens if all packets are old - }catch(con::NotFoundException) - {} - - if(incoming_reliables.empty() == false) - { - if(firstseqnum == next_incoming_seqnum) - { - BufferedPacket p = incoming_reliables.popFirst(); - - peer_id = readPeerId(*p.data); - u8 channelnum = readChannel(*p.data); - u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]); - - con->PrintInfo(); - dout_con<<"UNBUFFERING TYPE_RELIABLE" - <<" seqnum="< Connection::GetFromBuffers(u16 &peer_id) -{ - core::map::Iterator j; - j = m_peers.getIterator(); - for(; j.atEnd() == false; j++) - { - Peer *peer = j.getNode()->getValue(); - for(u16 i=0; ichannels[i]; - try{ - SharedBuffer resultdata = channel->CheckIncomingBuffers - (this, peer_id); - - return resultdata; - } - catch(NoIncomingDataException &e) - { - } - catch(InvalidIncomingDataException &e) - { - } - catch(ProcessedSilentlyException &e) - { - } - } - } - throw NoIncomingDataException("No relevant data in buffers"); -} - -u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize) -{ - /* - Receive a packet from the network - */ - - // TODO: We can not know how many layers of header there are. - // For now, just assume there are no other than the base headers. - u32 packet_maxsize = datasize + BASE_HEADER_SIZE; - Buffer packetdata(packet_maxsize); - - for(;;) - { - try - { - /* - Check if some buffer has relevant data - */ - try{ - SharedBuffer resultdata = GetFromBuffers(peer_id); - - if(datasize < resultdata.getSize()) - throw InvalidIncomingDataException - ("Buffer too small for received data"); - - memcpy(data, *resultdata, resultdata.getSize()); - return resultdata.getSize(); - } - catch(NoIncomingDataException &e) - { - } - - Address sender; - - s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize); - - if(received_size < 0) - throw NoIncomingDataException("No incoming data"); - if(received_size < BASE_HEADER_SIZE) - throw InvalidIncomingDataException("No full header received"); - if(readU32(&packetdata[0]) != m_protocol_id) - throw InvalidIncomingDataException("Invalid protocol id"); - - peer_id = readPeerId(*packetdata); - u8 channelnum = readChannel(*packetdata); - if(channelnum > CHANNEL_COUNT-1){ - PrintInfo(derr_con); - derr_con<<"Receive(): Invalid channel "<::Iterator j; - j = m_peers.getIterator(); - for(; j.atEnd() == false; j++) - { - Peer *peer = j.getNode()->getValue(); - if(peer->has_sent_with_id) - continue; - if(peer->address == sender) - break; - } - - /* - If no peer was found with the same address and port, - we shall assume it is a new peer and create an entry. - */ - if(j.atEnd()) - { - // Pass on to adding the peer - } - // Else: A peer was found. - else - { - Peer *peer = j.getNode()->getValue(); - peer_id = peer->id; - PrintInfo(derr_con); - derr_con<<"WARNING: Assuming unknown peer to be " - <<"peer_id="<getValue(); - - // Validate peer address - if(peer->address != sender) + throw ProcessedSilentlyException("Buffered future reliable packet"); + } + //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum)) + else if(is_old_packet) { - PrintInfo(derr_con); - derr_con<<"Peer "<timeout_counter = 0.0; - Channel *channel = &(peer->channels[channelnum]); - - // Throw the received packet to channel->processPacket() + channel->next_incoming_seqnum++; - // Make a new SharedBuffer from the data without the base headers - SharedBuffer strippeddata(received_size - BASE_HEADER_SIZE); - memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE], - strippeddata.getSize()); - - try{ - // Process it (the result is some data with no headers made by us) - SharedBuffer resultdata = channel->ProcessPacket - (strippeddata, this, peer_id, channelnum); - - PrintInfo(); - dout_con<<"ProcessPacket returned data of size " - < payload(packetdata.getSize() - RELIABLE_HEADER_SIZE); + memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize()); -void Connection::SendToAll(u8 channelnum, SharedBuffer data, bool reliable) -{ - core::map::Iterator j; - j = m_peers.getIterator(); - for(; j.atEnd() == false; j++) + return processPacket(channel, payload, peer_id, channelnum, true); + } + else { - Peer *peer = j.getNode()->getValue(); - Send(peer->id, channelnum, data, reliable); + PrintInfo(derr_con); + derr_con<<"Got invalid type="<<((int)type&0xff)< data, bool reliable) +bool Connection::deletePeer(u16 peer_id, bool timeout) { - assert(channelnum < CHANNEL_COUNT); + if(m_peers.find(peer_id) == m_peers.end()) + return false; - Peer *peer = GetPeer(peer_id); - Channel *channel = &(peer->channels[channelnum]); + Peer *peer = m_peers[peer_id]; - u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE; - if(reliable) - chunksize_max -= RELIABLE_HEADER_SIZE; + // Create event + ConnectionEvent e; + e.peerRemoved(peer_id, timeout, peer->address); + putEvent(e); - core::list > originals; - originals = makeAutoSplitPacket(data, chunksize_max, - channel->next_outgoing_split_seqnum); - - core::list >::Iterator i; - i = originals.begin(); - for(; i != originals.end(); i++) - { - SharedBuffer original = *i; - - SendAsPacket(peer_id, channelnum, original, reliable); - } + delete m_peers[peer_id]; + m_peers.erase(peer_id); + return true; } -void Connection::SendAsPacket(u16 peer_id, u8 channelnum, - SharedBuffer data, bool reliable) -{ - Peer *peer = GetPeer(peer_id); - Channel *channel = &(peer->channels[channelnum]); - - if(reliable) - { - u16 seqnum = channel->next_outgoing_seqnum; - channel->next_outgoing_seqnum++; - - SharedBuffer reliable = makeReliablePacket(data, seqnum); +/* Interface */ - // Add base headers and make a packet - BufferedPacket p = makePacket(peer->address, reliable, - m_protocol_id, m_peer_id, channelnum); - - try{ - // Buffer the packet - channel->outgoing_reliables.insert(p); - } - catch(AlreadyExistsException &e) - { - PrintInfo(derr_con); - derr_con<<"WARNING: Going to send a reliable packet " - "seqnum="<address, data, - m_protocol_id, m_peer_id, channelnum); + return m_event_queue.pop_front(); +} - // Send the packet - RawSend(p); +ConnectionEvent Connection::waitEvent(u32 timeout_ms) +{ + try{ + return m_event_queue.pop_front(timeout_ms); + } catch(ItemNotFoundException &ex){ + ConnectionEvent e; + e.type = CONNEVENT_NONE; + return e; } } -void Connection::RawSend(const BufferedPacket &packet) +void Connection::putCommand(ConnectionCommand &c) { - m_socket.Send(packet.address, *packet.data, packet.data.getSize()); + m_command_queue.push_back(c); } -void Connection::RunTimeouts(float dtime) +void Connection::Serve(unsigned short port) { - core::list timeouted_peers; - core::map::Iterator j; - j = m_peers.getIterator(); - for(; j.atEnd() == false; j++) - { - Peer *peer = j.getNode()->getValue(); - - /* - Check peer timeout - */ - peer->timeout_counter += dtime; - if(peer->timeout_counter > m_timeout) - { - PrintInfo(derr_con); - derr_con<<"RunTimeouts(): Peer "<id - <<" has timed out." - <<" (source=peer->timeout_counter)" - <id); - // Don't bother going through the buffers of this one - continue; - } - - float resend_timeout = peer->resend_timeout; - for(u16 i=0; i timed_outs; - core::list::Iterator j; - - Channel *channel = &peer->channels[i]; - - // Remove timed out incomplete unreliable split packets - channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout); - - // Increment reliable packet times - channel->outgoing_reliables.incrementTimeouts(dtime); - - // Check reliable packet total times, remove peer if - // over timeout. - if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout)) - { - PrintInfo(derr_con); - derr_con<<"RunTimeouts(): Peer "<id - <<" has timed out." - <<" (source=reliable packet totaltime)" - <id); - goto nextpeer; - } - - // Re-send timed out outgoing reliables - - timed_outs = channel-> - outgoing_reliables.getTimedOuts(resend_timeout); - - channel->outgoing_reliables.resetTimedOuts(resend_timeout); - - j = timed_outs.begin(); - for(; j != timed_outs.end(); j++) - { - u16 peer_id = readPeerId(*(j->data)); - u8 channel = readChannel(*(j->data)); - u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1])); - - PrintInfo(derr_con); - derr_con<<"RE-SENDING timed-out RELIABLE to "; - j->address.print(&derr_con); - derr_con<<"(t/o="<::iterator node = m_peers.find(PEER_ID_SERVER); + if(node == m_peers.end()) + return false; + + if(m_peer_id == PEER_ID_INEXISTENT) + return false; + + return true; } -Peer* Connection::GetPeer(u16 peer_id) +void Connection::Disconnect() { - core::map::Node *node = m_peers.find(peer_id); + ConnectionCommand c; + c.disconnect(); + putCommand(c); +} - if(node == NULL){ - // Peer not found - throw PeerNotFoundException("Peer not found (possible timeout)"); +u32 Connection::Receive(u16 &peer_id, SharedBuffer &data) +{ + for(;;){ + ConnectionEvent e = waitEvent(m_bc_receive_timeout); + if(e.type != CONNEVENT_NONE) + dout_con<(e.data); + return e.data.getSize(); + case CONNEVENT_PEER_ADDED: { + Peer tmp(e.peer_id, e.address); + if(m_bc_peerhandler) + m_bc_peerhandler->peerAdded(&tmp); + continue; } + case CONNEVENT_PEER_REMOVED: { + Peer tmp(e.peer_id, e.address); + if(m_bc_peerhandler) + m_bc_peerhandler->deletingPeer(&tmp, e.timeout); + continue; } + case CONNEVENT_BIND_FAILED: + throw ConnectionBindFailed("Failed to bind socket " + "(port already in use?)"); + } } + throw NoIncomingDataException("No incoming data"); +} - // Error checking - assert(node->getValue()->id == peer_id); +void Connection::SendToAll(u8 channelnum, SharedBuffer data, bool reliable) +{ + assert(channelnum < CHANNEL_COUNT); - return node->getValue(); + ConnectionCommand c; + c.sendToAll(channelnum, data, reliable); + putCommand(c); } -Peer* Connection::GetPeerNoEx(u16 peer_id) +void Connection::Send(u16 peer_id, u8 channelnum, + SharedBuffer data, bool reliable) { - core::map::Node *node = m_peers.find(peer_id); + assert(channelnum < CHANNEL_COUNT); - if(node == NULL){ - return NULL; - } + ConnectionCommand c; + c.send(peer_id, channelnum, data, reliable); + putCommand(c); +} - // Error checking - assert(node->getValue()->id == peer_id); +void Connection::RunTimeouts(float dtime) +{ + // No-op +} - return node->getValue(); +Address Connection::GetPeerAddress(u16 peer_id) +{ + JMutexAutoLock peerlock(m_peers_mutex); + return getPeer(peer_id)->address; } -core::list Connection::GetPeers() +float Connection::GetPeerAvgRTT(u16 peer_id) { - core::list list; - core::map::Iterator j; - j = m_peers.getIterator(); - for(; j.atEnd() == false; j++) - { - Peer *peer = j.getNode()->getValue(); - list.push_back(peer); - } - return list; + JMutexAutoLock peerlock(m_peers_mutex); + return getPeer(peer_id)->avg_rtt; } -bool Connection::deletePeer(u16 peer_id, bool timeout) +void Connection::DeletePeer(u16 peer_id) { - if(m_peers.find(peer_id) == NULL) - return false; - m_peerhandler->deletingPeer(m_peers[peer_id], timeout); - delete m_peers[peer_id]; - m_peers.remove(peer_id); - return true; + ConnectionCommand c; + c.deletePeer(peer_id); + putCommand(c); } void Connection::PrintInfo(std::ostream &out) { - out<