3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU Lesser General Public License for more details.
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #include "connection.h"
22 #include "serialization.h"
29 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
30 u32 protocol_id, u16 sender_peer_id, u8 channel)
32 u32 packet_size = datasize + BASE_HEADER_SIZE;
33 BufferedPacket p(packet_size);
36 writeU32(&p.data[0], protocol_id);
37 writeU16(&p.data[4], sender_peer_id);
38 writeU8(&p.data[6], channel);
40 memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
45 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
46 u32 protocol_id, u16 sender_peer_id, u8 channel)
48 return makePacket(address, *data, data.getSize(),
49 protocol_id, sender_peer_id, channel);
52 SharedBuffer<u8> makeOriginalPacket(
53 SharedBuffer<u8> data)
56 u32 packet_size = data.getSize() + header_size;
57 SharedBuffer<u8> b(packet_size);
59 writeU8(&b[0], TYPE_ORIGINAL);
61 memcpy(&b[header_size], *data, data.getSize());
66 core::list<SharedBuffer<u8> > makeSplitPacket(
67 SharedBuffer<u8> data,
71 // Chunk packets, containing the TYPE_SPLIT header
72 core::list<SharedBuffer<u8> > chunks;
74 u32 chunk_header_size = 7;
75 u32 maximum_data_size = chunksize_max - chunk_header_size;
80 end = start + maximum_data_size - 1;
81 if(end > data.getSize() - 1)
82 end = data.getSize() - 1;
84 u32 payload_size = end - start + 1;
85 u32 packet_size = chunk_header_size + payload_size;
87 SharedBuffer<u8> chunk(packet_size);
89 writeU8(&chunk[0], TYPE_SPLIT);
90 writeU16(&chunk[1], seqnum);
91 // [3] u16 chunk_count is written at next stage
92 writeU16(&chunk[5], chunk_num);
93 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
95 chunks.push_back(chunk);
100 while(end != data.getSize() - 1);
102 u16 chunk_count = chunks.getSize();
104 core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
105 for(; i != chunks.end(); i++)
108 writeU16(&((*i)[3]), chunk_count);
114 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
115 SharedBuffer<u8> data,
119 u32 original_header_size = 1;
120 core::list<SharedBuffer<u8> > list;
121 if(data.getSize() + original_header_size > chunksize_max)
123 list = makeSplitPacket(data, chunksize_max, split_seqnum);
129 list.push_back(makeOriginalPacket(data));
134 SharedBuffer<u8> makeReliablePacket(
135 SharedBuffer<u8> data,
138 /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
139 dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
140 <<((unsigned int)data[0]&0xff)<<std::endl;*/
142 u32 packet_size = data.getSize() + header_size;
143 SharedBuffer<u8> b(packet_size);
145 writeU8(&b[0], TYPE_RELIABLE);
146 writeU16(&b[1], seqnum);
148 memcpy(&b[header_size], *data, data.getSize());
150 /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
151 <<((unsigned int)data[0]&0xff)<<std::endl;*/
152 //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
160 void ReliablePacketBuffer::print()
162 core::list<BufferedPacket>::Iterator i;
164 for(; i != m_list.end(); i++)
166 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
170 bool ReliablePacketBuffer::empty()
172 return m_list.empty();
174 u32 ReliablePacketBuffer::size()
176 return m_list.getSize();
178 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
180 core::list<BufferedPacket>::Iterator i;
182 for(; i != m_list.end(); i++)
184 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
185 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
186 <<", comparing to s="<<s<<std::endl;*/
192 RPBSearchResult ReliablePacketBuffer::notFound()
196 u16 ReliablePacketBuffer::getFirstSeqnum()
199 throw NotFoundException("Buffer is empty");
200 BufferedPacket p = *m_list.begin();
201 return readU16(&p.data[BASE_HEADER_SIZE+1]);
203 BufferedPacket ReliablePacketBuffer::popFirst()
206 throw NotFoundException("Buffer is empty");
207 BufferedPacket p = *m_list.begin();
208 core::list<BufferedPacket>::Iterator i = m_list.begin();
212 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
214 RPBSearchResult r = findPacket(seqnum);
216 dout_con<<"Not found"<<std::endl;
217 throw NotFoundException("seqnum not found in buffer");
219 BufferedPacket p = *r;
223 void ReliablePacketBuffer::insert(BufferedPacket &p)
225 assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
226 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
227 assert(type == TYPE_RELIABLE);
228 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
230 // Find the right place for the packet and insert it there
232 // If list is empty, just add it
239 // Otherwise find the right place
240 core::list<BufferedPacket>::Iterator i;
242 // Find the first packet in the list which has a higher seqnum
243 for(; i != m_list.end(); i++){
244 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
246 throw AlreadyExistsException("Same seqnum in list");
248 if(seqnum_higher(s, seqnum)){
252 // If we're at the end of the list, add the packet to the
254 if(i == m_list.end())
261 m_list.insert_before(i, p);
264 void ReliablePacketBuffer::incrementTimeouts(float dtime)
266 core::list<BufferedPacket>::Iterator i;
268 for(; i != m_list.end(); i++){
270 i->totaltime += dtime;
274 void ReliablePacketBuffer::resetTimedOuts(float timeout)
276 core::list<BufferedPacket>::Iterator i;
278 for(; i != m_list.end(); i++){
279 if(i->time >= timeout)
284 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
286 core::list<BufferedPacket>::Iterator i;
288 for(; i != m_list.end(); i++){
289 if(i->totaltime >= timeout)
295 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
297 core::list<BufferedPacket> timed_outs;
298 core::list<BufferedPacket>::Iterator i;
300 for(; i != m_list.end(); i++)
302 if(i->time >= timeout)
303 timed_outs.push_back(*i);
312 IncomingSplitBuffer::~IncomingSplitBuffer()
314 core::map<u16, IncomingSplitPacket*>::Iterator i;
315 i = m_buf.getIterator();
316 for(; i.atEnd() == false; i++)
318 delete i.getNode()->getValue();
322 This will throw a GotSplitPacketException when a full
323 split packet is constructed.
325 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
327 u32 headersize = BASE_HEADER_SIZE + 7;
328 assert(p.data.getSize() >= headersize);
329 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
330 assert(type == TYPE_SPLIT);
331 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
332 u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
333 u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
335 // Add if doesn't exist
336 if(m_buf.find(seqnum) == NULL)
338 IncomingSplitPacket *sp = new IncomingSplitPacket();
339 sp->chunk_count = chunk_count;
340 sp->reliable = reliable;
344 IncomingSplitPacket *sp = m_buf[seqnum];
346 // TODO: These errors should be thrown or something? Dunno.
347 if(chunk_count != sp->chunk_count)
348 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
349 <<" != sp->chunk_count="<<sp->chunk_count
351 if(reliable != sp->reliable)
352 derr_con<<"Connection: WARNING: reliable="<<reliable
353 <<" != sp->reliable="<<sp->reliable
356 // If chunk already exists, ignore it.
357 // Sometimes two identical packets may arrive when there is network
358 // lag and the server re-sends stuff.
359 if(sp->chunks.find(chunk_num) != NULL)
360 return SharedBuffer<u8>();
362 // Cut chunk data out of packet
363 u32 chunkdatasize = p.data.getSize() - headersize;
364 SharedBuffer<u8> chunkdata(chunkdatasize);
365 memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
367 // Set chunk data in buffer
368 sp->chunks[chunk_num] = chunkdata;
370 // If not all chunks are received, return empty buffer
371 if(sp->allReceived() == false)
372 return SharedBuffer<u8>();
374 // Calculate total size
376 core::map<u16, SharedBuffer<u8> >::Iterator i;
377 i = sp->chunks.getIterator();
378 for(; i.atEnd() == false; i++)
380 totalsize += i.getNode()->getValue().getSize();
383 SharedBuffer<u8> fulldata(totalsize);
385 // Copy chunks to data buffer
387 for(u32 chunk_i=0; chunk_i<sp->chunk_count;
390 SharedBuffer<u8> buf = sp->chunks[chunk_i];
391 u16 chunkdatasize = buf.getSize();
392 memcpy(&fulldata[start], *buf, chunkdatasize);
393 start += chunkdatasize;;
396 // Remove sp from buffer
397 m_buf.remove(seqnum);
402 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
404 core::list<u16> remove_queue;
405 core::map<u16, IncomingSplitPacket*>::Iterator i;
406 i = m_buf.getIterator();
407 for(; i.atEnd() == false; i++)
409 IncomingSplitPacket *p = i.getNode()->getValue();
410 // Reliable ones are not removed by timeout
411 if(p->reliable == true)
414 if(p->time >= timeout)
415 remove_queue.push_back(i.getNode()->getKey());
417 core::list<u16>::Iterator j;
418 j = remove_queue.begin();
419 for(; j != remove_queue.end(); j++)
421 dout_con<<"NOTE: Removing timed out unreliable split packet"
434 next_outgoing_seqnum = SEQNUM_INITIAL;
435 next_incoming_seqnum = SEQNUM_INITIAL;
436 next_outgoing_split_seqnum = SEQNUM_INITIAL;
446 Peer::Peer(u16 a_id, Address a_address):
449 timeout_counter(0.0),
453 has_sent_with_id(false),
455 m_max_packets_per_second(10),
464 void Peer::reportRTT(float rtt)
468 if(m_max_packets_per_second < 400)
469 m_max_packets_per_second += 10;
470 } else if(rtt < 0.2){
471 if(m_max_packets_per_second < 100)
472 m_max_packets_per_second += 2;
474 m_max_packets_per_second *= 0.8;
475 if(m_max_packets_per_second < 10)
476 m_max_packets_per_second = 10;
482 else if(avg_rtt < 0.0)
485 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
487 // Calculate resend_timeout
489 /*int reliable_count = 0;
490 for(int i=0; i<CHANNEL_COUNT; i++)
492 reliable_count += channels[i].outgoing_reliables.size();
494 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
495 * ((float)reliable_count * 1);*/
497 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
498 if(timeout < RESEND_TIMEOUT_MIN)
499 timeout = RESEND_TIMEOUT_MIN;
500 if(timeout > RESEND_TIMEOUT_MAX)
501 timeout = RESEND_TIMEOUT_MAX;
502 resend_timeout = timeout;
509 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout):
510 m_protocol_id(protocol_id),
511 m_max_packet_size(max_packet_size),
514 m_bc_peerhandler(NULL),
515 m_bc_receive_timeout(0),
518 m_socket.setTimeoutMs(5);
523 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
524 PeerHandler *peerhandler):
525 m_protocol_id(protocol_id),
526 m_max_packet_size(max_packet_size),
529 m_bc_peerhandler(peerhandler),
530 m_bc_receive_timeout(0),
533 m_socket.setTimeoutMs(5);
539 Connection::~Connection()
543 for(core::map<u16, Peer*>::Iterator
544 j = m_peers.getIterator();
545 j.atEnd() == false; j++)
547 Peer *peer = j.getNode()->getValue();
554 void * Connection::Thread()
557 log_register_thread("Connection");
559 dout_con<<"Connection thread started"<<std::endl;
561 u32 curtime = porting::getTimeMs();
562 u32 lasttime = curtime;
566 BEGIN_DEBUG_EXCEPTION_HANDLER
569 curtime = porting::getTimeMs();
570 float dtime = (float)(curtime - lasttime) / 1000.;
578 while(m_command_queue.size() != 0){
579 ConnectionCommand c = m_command_queue.pop_front();
587 END_DEBUG_EXCEPTION_HANDLER(derr_con);
593 void Connection::putEvent(ConnectionEvent &e)
595 assert(e.type != CONNEVENT_NONE);
596 m_event_queue.push_back(e);
599 void Connection::processCommand(ConnectionCommand &c)
603 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
606 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
610 case CONNCMD_CONNECT:
611 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
614 case CONNCMD_DISCONNECT:
615 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
619 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
620 send(c.peer_id, c.channelnum, c.data, c.reliable);
622 case CONNCMD_SEND_TO_ALL:
623 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
624 sendToAll(c.channelnum, c.data, c.reliable);
626 case CONNCMD_DELETE_PEER:
627 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
628 deletePeer(c.peer_id, false);
633 void Connection::send(float dtime)
635 for(core::map<u16, Peer*>::Iterator
636 j = m_peers.getIterator();
637 j.atEnd() == false; j++)
639 Peer *peer = j.getNode()->getValue();
640 peer->m_sendtime_accu += dtime;
641 peer->m_num_sent = 0;
642 peer->m_max_num_sent = peer->m_sendtime_accu *
643 peer->m_max_packets_per_second;
645 Queue<OutgoingPacket> postponed_packets;
646 while(m_outgoing_queue.size() != 0){
647 OutgoingPacket packet = m_outgoing_queue.pop_front();
648 Peer *peer = getPeerNoEx(packet.peer_id);
651 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
652 postponed_packets.push_back(packet);
653 } else if(peer->m_num_sent < peer->m_max_num_sent){
654 rawSendAsPacket(packet.peer_id, packet.channelnum,
655 packet.data, packet.reliable);
658 postponed_packets.push_back(packet);
661 while(postponed_packets.size() != 0){
662 m_outgoing_queue.push_back(postponed_packets.pop_front());
664 for(core::map<u16, Peer*>::Iterator
665 j = m_peers.getIterator();
666 j.atEnd() == false; j++)
668 Peer *peer = j.getNode()->getValue();
669 peer->m_sendtime_accu -= (float)peer->m_num_sent /
670 peer->m_max_packets_per_second;
671 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
672 peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
676 // Receive packets from the network and buffers and create ConnectionEvents
677 void Connection::receive()
679 u32 datasize = m_max_packet_size * 2; // Double it just to be safe
680 // TODO: We can not know how many layers of header there are.
681 // For now, just assume there are no other than the base headers.
682 u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
683 SharedBuffer<u8> packetdata(packet_maxsize);
685 bool single_wait_done = false;
690 /* Check if some buffer has relevant data */
693 SharedBuffer<u8> resultdata;
694 bool got = getFromBuffers(peer_id, resultdata);
697 e.dataReceived(peer_id, resultdata);
703 if(single_wait_done){
704 if(m_socket.WaitData(0) == false)
708 single_wait_done = true;
711 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
713 if(received_size < 0)
715 if(received_size < BASE_HEADER_SIZE)
717 if(readU32(&packetdata[0]) != m_protocol_id)
720 u16 peer_id = readPeerId(*packetdata);
721 u8 channelnum = readChannel(*packetdata);
722 if(channelnum > CHANNEL_COUNT-1){
724 derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
725 throw InvalidIncomingDataException("Channel doesn't exist");
728 if(peer_id == PEER_ID_INEXISTENT)
731 Somebody is trying to send stuff to us with no peer id.
733 Check if the same address and port was added to our peer
735 Allow only entries that have has_sent_with_id==false.
738 core::map<u16, Peer*>::Iterator j;
739 j = m_peers.getIterator();
740 for(; j.atEnd() == false; j++)
742 Peer *peer = j.getNode()->getValue();
743 if(peer->has_sent_with_id)
745 if(peer->address == sender)
750 If no peer was found with the same address and port,
751 we shall assume it is a new peer and create an entry.
755 // Pass on to adding the peer
757 // Else: A peer was found.
760 Peer *peer = j.getNode()->getValue();
763 derr_con<<"WARNING: Assuming unknown peer to be "
764 <<"peer_id="<<peer_id<<std::endl;
769 The peer was not found in our lists. Add it.
771 if(peer_id == PEER_ID_INEXISTENT)
773 // Somebody wants to make a new connection
775 // Get a unique peer id (2 or higher)
778 Find an unused peer id
780 bool out_of_ids = false;
784 if(m_peers.find(peer_id_new) == NULL)
786 // Check for overflow
787 if(peer_id_new == 65535){
794 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
799 dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
800 " giving peer_id="<<peer_id_new<<std::endl;
803 Peer *peer = new Peer(peer_id_new, sender);
804 m_peers.insert(peer->id, peer);
806 // Create peer addition event
808 e.peerAdded(peer_id_new, sender);
811 // Create CONTROL packet to tell the peer id to the new peer.
812 SharedBuffer<u8> reply(4);
813 writeU8(&reply[0], TYPE_CONTROL);
814 writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
815 writeU16(&reply[2], peer_id_new);
816 sendAsPacket(peer_id_new, 0, reply, true);
818 // We're now talking to a valid peer_id
819 peer_id = peer_id_new;
821 // Go on and process whatever it sent
824 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
829 // This means that the peer id of the sender is not PEER_ID_INEXISTENT
830 // and it is invalid.
832 derr_con<<"Receive(): Peer not found"<<std::endl;
833 throw InvalidIncomingDataException("Peer not found (possible timeout)");
836 Peer *peer = node->getValue();
838 // Validate peer address
839 if(peer->address != sender)
842 derr_con<<"Peer "<<peer_id<<" sending from different address."
843 " Ignoring."<<std::endl;
847 peer->timeout_counter = 0.0;
849 Channel *channel = &(peer->channels[channelnum]);
851 // Throw the received packet to channel->processPacket()
853 // Make a new SharedBuffer from the data without the base headers
854 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
855 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
856 strippeddata.getSize());
859 // Process it (the result is some data with no headers made by us)
860 SharedBuffer<u8> resultdata = processPacket
861 (channel, strippeddata, peer_id, channelnum, false);
864 dout_con<<"ProcessPacket returned data of size "
865 <<resultdata.getSize()<<std::endl;
868 e.dataReceived(peer_id, resultdata);
871 }catch(ProcessedSilentlyException &e){
873 }catch(InvalidIncomingDataException &e){
875 catch(ProcessedSilentlyException &e){
880 void Connection::runTimeouts(float dtime)
882 core::list<u16> timeouted_peers;
883 core::map<u16, Peer*>::Iterator j;
884 j = m_peers.getIterator();
885 for(; j.atEnd() == false; j++)
887 Peer *peer = j.getNode()->getValue();
892 peer->timeout_counter += dtime;
893 if(peer->timeout_counter > m_timeout)
896 derr_con<<"RunTimeouts(): Peer "<<peer->id
898 <<" (source=peer->timeout_counter)"
900 // Add peer to the list
901 timeouted_peers.push_back(peer->id);
902 // Don't bother going through the buffers of this one
906 float resend_timeout = peer->resend_timeout;
907 for(u16 i=0; i<CHANNEL_COUNT; i++)
909 core::list<BufferedPacket> timed_outs;
910 core::list<BufferedPacket>::Iterator j;
912 Channel *channel = &peer->channels[i];
914 // Remove timed out incomplete unreliable split packets
915 channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
917 // Increment reliable packet times
918 channel->outgoing_reliables.incrementTimeouts(dtime);
920 // Check reliable packet total times, remove peer if
922 if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
925 derr_con<<"RunTimeouts(): Peer "<<peer->id
927 <<" (source=reliable packet totaltime)"
929 // Add peer to the to-be-removed list
930 timeouted_peers.push_back(peer->id);
934 // Re-send timed out outgoing reliables
936 timed_outs = channel->
937 outgoing_reliables.getTimedOuts(resend_timeout);
939 channel->outgoing_reliables.resetTimedOuts(resend_timeout);
941 j = timed_outs.begin();
942 for(; j != timed_outs.end(); j++)
944 u16 peer_id = readPeerId(*(j->data));
945 u8 channel = readChannel(*(j->data));
946 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
949 derr_con<<"RE-SENDING timed-out RELIABLE to ";
950 j->address.print(&derr_con);
951 derr_con<<"(t/o="<<resend_timeout<<"): "
952 <<"from_peer_id="<<peer_id
953 <<", channel="<<((int)channel&0xff)
954 <<", seqnum="<<seqnum
959 // Enlarge avg_rtt and resend_timeout:
960 // The rtt will be at least the timeout.
961 // NOTE: This won't affect the timeout of the next
962 // checked channel because it was cached.
963 peer->reportRTT(resend_timeout);
970 peer->ping_timer += dtime;
971 if(peer->ping_timer >= 5.0)
973 // Create and send PING packet
974 SharedBuffer<u8> data(2);
975 writeU8(&data[0], TYPE_CONTROL);
976 writeU8(&data[1], CONTROLTYPE_PING);
977 rawSendAsPacket(peer->id, 0, data, true);
979 peer->ping_timer = 0.0;
986 // Remove timed out peers
987 core::list<u16>::Iterator i = timeouted_peers.begin();
988 for(; i != timeouted_peers.end(); i++)
991 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
992 deletePeer(*i, true);
996 void Connection::serve(u16 port)
998 dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
1000 m_socket.Bind(port);
1001 m_peer_id = PEER_ID_SERVER;
1003 catch(SocketException &e){
1011 void Connection::connect(Address address)
1013 dout_con<<getDesc()<<" connecting to "<<address.serializeString()
1014 <<":"<<address.getPort()<<std::endl;
1016 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1018 throw ConnectionException("Already connected to a server");
1021 Peer *peer = new Peer(PEER_ID_SERVER, address);
1022 m_peers.insert(peer->id, peer);
1026 e.peerAdded(peer->id, peer->address);
1031 // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1032 m_peer_id = PEER_ID_INEXISTENT;
1033 SharedBuffer<u8> data(0);
1034 Send(PEER_ID_SERVER, 0, data, true);
1037 void Connection::disconnect()
1039 dout_con<<getDesc()<<" disconnecting"<<std::endl;
1041 // Create and send DISCO packet
1042 SharedBuffer<u8> data(2);
1043 writeU8(&data[0], TYPE_CONTROL);
1044 writeU8(&data[1], CONTROLTYPE_DISCO);
1047 core::map<u16, Peer*>::Iterator j;
1048 j = m_peers.getIterator();
1049 for(; j.atEnd() == false; j++)
1051 Peer *peer = j.getNode()->getValue();
1052 rawSendAsPacket(peer->id, 0, data, false);
1056 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1058 core::map<u16, Peer*>::Iterator j;
1059 j = m_peers.getIterator();
1060 for(; j.atEnd() == false; j++)
1062 Peer *peer = j.getNode()->getValue();
1063 send(peer->id, channelnum, data, reliable);
1067 void Connection::send(u16 peer_id, u8 channelnum,
1068 SharedBuffer<u8> data, bool reliable)
1070 dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1072 assert(channelnum < CHANNEL_COUNT);
1074 Peer *peer = getPeerNoEx(peer_id);
1077 Channel *channel = &(peer->channels[channelnum]);
1079 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1081 chunksize_max -= RELIABLE_HEADER_SIZE;
1083 core::list<SharedBuffer<u8> > originals;
1084 originals = makeAutoSplitPacket(data, chunksize_max,
1085 channel->next_outgoing_split_seqnum);
1087 core::list<SharedBuffer<u8> >::Iterator i;
1088 i = originals.begin();
1089 for(; i != originals.end(); i++)
1091 SharedBuffer<u8> original = *i;
1093 sendAsPacket(peer_id, channelnum, original, reliable);
1097 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1098 SharedBuffer<u8> data, bool reliable)
1100 OutgoingPacket packet(peer_id, channelnum, data, reliable);
1101 m_outgoing_queue.push_back(packet);
1104 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1105 SharedBuffer<u8> data, bool reliable)
1107 Peer *peer = getPeerNoEx(peer_id);
1110 Channel *channel = &(peer->channels[channelnum]);
1114 u16 seqnum = channel->next_outgoing_seqnum;
1115 channel->next_outgoing_seqnum++;
1117 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1119 // Add base headers and make a packet
1120 BufferedPacket p = makePacket(peer->address, reliable,
1121 m_protocol_id, m_peer_id, channelnum);
1124 // Buffer the packet
1125 channel->outgoing_reliables.insert(p);
1127 catch(AlreadyExistsException &e)
1129 PrintInfo(derr_con);
1130 derr_con<<"WARNING: Going to send a reliable packet "
1131 "seqnum="<<seqnum<<" that is already "
1132 "in outgoing buffer"<<std::endl;
1141 // Add base headers and make a packet
1142 BufferedPacket p = makePacket(peer->address, data,
1143 m_protocol_id, m_peer_id, channelnum);
1150 void Connection::rawSend(const BufferedPacket &packet)
1153 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1154 } catch(SendFailedException &e){
1155 derr_con<<"Connection::rawSend(): SendFailedException: "
1156 <<packet.address.serializeString()<<std::endl;
1160 Peer* Connection::getPeer(u16 peer_id)
1162 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1165 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1169 assert(node->getValue()->id == peer_id);
1171 return node->getValue();
1174 Peer* Connection::getPeerNoEx(u16 peer_id)
1176 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1183 assert(node->getValue()->id == peer_id);
1185 return node->getValue();
1188 core::list<Peer*> Connection::getPeers()
1190 core::list<Peer*> list;
1191 core::map<u16, Peer*>::Iterator j;
1192 j = m_peers.getIterator();
1193 for(; j.atEnd() == false; j++)
1195 Peer *peer = j.getNode()->getValue();
1196 list.push_back(peer);
1201 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1203 core::map<u16, Peer*>::Iterator j;
1204 j = m_peers.getIterator();
1205 for(; j.atEnd() == false; j++)
1207 Peer *peer = j.getNode()->getValue();
1208 for(u16 i=0; i<CHANNEL_COUNT; i++)
1210 Channel *channel = &peer->channels[i];
1211 SharedBuffer<u8> resultdata;
1212 bool got = checkIncomingBuffers(channel, peer_id, resultdata);
1222 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1223 SharedBuffer<u8> &dst)
1225 u16 firstseqnum = 0;
1226 // Clear old packets from start of buffer
1229 firstseqnum = channel->incoming_reliables.getFirstSeqnum();
1230 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1231 channel->incoming_reliables.popFirst();
1235 // This happens if all packets are old
1236 }catch(con::NotFoundException)
1239 if(channel->incoming_reliables.empty() == false)
1241 if(firstseqnum == channel->next_incoming_seqnum)
1243 BufferedPacket p = channel->incoming_reliables.popFirst();
1245 peer_id = readPeerId(*p.data);
1246 u8 channelnum = readChannel(*p.data);
1247 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1250 dout_con<<"UNBUFFERING TYPE_RELIABLE"
1251 <<" seqnum="<<seqnum
1252 <<" peer_id="<<peer_id
1253 <<" channel="<<((int)channelnum&0xff)
1256 channel->next_incoming_seqnum++;
1258 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1259 // Get out the inside packet and re-process it
1260 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1261 memcpy(*payload, &p.data[headers_size], payload.getSize());
1263 dst = processPacket(channel, payload, peer_id, channelnum, true);
1270 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1271 SharedBuffer<u8> packetdata, u16 peer_id,
1272 u8 channelnum, bool reliable)
1274 IndentationRaiser iraiser(&(m_indentation));
1276 if(packetdata.getSize() < 1)
1277 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1279 u8 type = readU8(&packetdata[0]);
1281 if(type == TYPE_CONTROL)
1283 if(packetdata.getSize() < 2)
1284 throw InvalidIncomingDataException("packetdata.getSize() < 2");
1286 u8 controltype = readU8(&packetdata[1]);
1288 if(controltype == CONTROLTYPE_ACK)
1290 if(packetdata.getSize() < 4)
1291 throw InvalidIncomingDataException
1292 ("packetdata.getSize() < 4 (ACK header size)");
1294 u16 seqnum = readU16(&packetdata[2]);
1296 dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1297 <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1298 <<", seqnum="<<seqnum<<std::endl;
1301 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1302 // Get round trip time
1303 float rtt = p.totaltime;
1305 // Let peer calculate stuff according to it
1306 // (avg_rtt and resend_timeout)
1307 Peer *peer = getPeer(peer_id);
1308 peer->reportRTT(rtt);
1310 //PrintInfo(dout_con);
1311 //dout_con<<"RTT = "<<rtt<<std::endl;
1313 /*dout_con<<"OUTGOING: ";
1315 channel->outgoing_reliables.print();
1316 dout_con<<std::endl;*/
1318 catch(NotFoundException &e){
1319 PrintInfo(derr_con);
1320 derr_con<<"WARNING: ACKed packet not "
1325 throw ProcessedSilentlyException("Got an ACK");
1327 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1329 if(packetdata.getSize() < 4)
1330 throw InvalidIncomingDataException
1331 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1332 u16 peer_id_new = readU16(&packetdata[2]);
1334 dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1336 if(GetPeerID() != PEER_ID_INEXISTENT)
1338 PrintInfo(derr_con);
1339 derr_con<<"WARNING: Not changing"
1340 " existing peer id."<<std::endl;
1344 dout_con<<"changing."<<std::endl;
1345 SetPeerID(peer_id_new);
1347 throw ProcessedSilentlyException("Got a SET_PEER_ID");
1349 else if(controltype == CONTROLTYPE_PING)
1351 // Just ignore it, the incoming data already reset
1352 // the timeout counter
1354 dout_con<<"PING"<<std::endl;
1355 throw ProcessedSilentlyException("Got a PING");
1357 else if(controltype == CONTROLTYPE_DISCO)
1359 // Just ignore it, the incoming data already reset
1360 // the timeout counter
1362 dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1364 if(deletePeer(peer_id, false) == false)
1366 PrintInfo(derr_con);
1367 derr_con<<"DISCO: Peer not found"<<std::endl;
1370 throw ProcessedSilentlyException("Got a DISCO");
1373 PrintInfo(derr_con);
1374 derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1375 <<((int)controltype&0xff)<<std::endl;
1376 throw InvalidIncomingDataException("Invalid control type");
1379 else if(type == TYPE_ORIGINAL)
1381 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1382 throw InvalidIncomingDataException
1383 ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1385 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1387 // Get the inside packet out and return it
1388 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1389 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1392 else if(type == TYPE_SPLIT)
1394 // We have to create a packet again for buffering
1395 // This isn't actually too bad an idea.
1396 BufferedPacket packet = makePacket(
1397 getPeer(peer_id)->address,
1402 // Buffer the packet
1403 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1404 if(data.getSize() != 0)
1407 dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1408 <<"size="<<data.getSize()<<std::endl;
1412 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1413 throw ProcessedSilentlyException("Buffered a split packet chunk");
1415 else if(type == TYPE_RELIABLE)
1417 // Recursive reliable packets not allowed
1418 assert(reliable == false);
1420 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1421 throw InvalidIncomingDataException
1422 ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1424 u16 seqnum = readU16(&packetdata[1]);
1426 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1427 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1430 if(is_future_packet)
1431 dout_con<<"BUFFERING";
1432 else if(is_old_packet)
1436 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1437 <<" next="<<channel->next_incoming_seqnum;
1438 dout_con<<" [sending CONTROLTYPE_ACK"
1439 " to peer_id="<<peer_id<<"]";
1440 dout_con<<std::endl;
1443 //assert(channel->incoming_reliables.size() < 100);
1445 // Send a CONTROLTYPE_ACK
1446 SharedBuffer<u8> reply(4);
1447 writeU8(&reply[0], TYPE_CONTROL);
1448 writeU8(&reply[1], CONTROLTYPE_ACK);
1449 writeU16(&reply[2], seqnum);
1450 rawSendAsPacket(peer_id, channelnum, reply, false);
1452 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1453 if(is_future_packet)
1456 dout_con<<"Buffering reliable packet (seqnum="
1457 <<seqnum<<")"<<std::endl;*/
1459 // This one comes later, buffer it.
1460 // Actually we have to make a packet to buffer one.
1461 // Well, we have all the ingredients, so just do it.
1462 BufferedPacket packet = makePacket(
1463 getPeer(peer_id)->address,
1469 channel->incoming_reliables.insert(packet);
1472 dout_con<<"INCOMING: ";
1473 channel->incoming_reliables.print();
1474 dout_con<<std::endl;*/
1476 catch(AlreadyExistsException &e)
1480 throw ProcessedSilentlyException("Buffered future reliable packet");
1482 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1483 else if(is_old_packet)
1485 // An old packet, dump it
1486 throw InvalidIncomingDataException("Got an old reliable packet");
1489 channel->next_incoming_seqnum++;
1491 // Get out the inside packet and re-process it
1492 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1493 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1495 return processPacket(channel, payload, peer_id, channelnum, true);
1499 PrintInfo(derr_con);
1500 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1501 throw InvalidIncomingDataException("Invalid packet type");
1504 // We should never get here.
1505 // If you get here, add an exception or a return to some of the
1506 // above conditionals.
1508 throw BaseException("Error in Channel::ProcessPacket()");
1511 bool Connection::deletePeer(u16 peer_id, bool timeout)
1513 if(m_peers.find(peer_id) == NULL)
1516 Peer *peer = m_peers[peer_id];
1520 e.peerRemoved(peer_id, timeout, peer->address);
1523 delete m_peers[peer_id];
1524 m_peers.remove(peer_id);
1530 ConnectionEvent Connection::getEvent()
1532 if(m_event_queue.size() == 0){
1534 e.type = CONNEVENT_NONE;
1537 return m_event_queue.pop_front();
1540 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1543 return m_event_queue.pop_front(timeout_ms);
1544 } catch(ItemNotFoundException &ex){
1546 e.type = CONNEVENT_NONE;
1551 void Connection::putCommand(ConnectionCommand &c)
1553 m_command_queue.push_back(c);
1556 void Connection::Serve(unsigned short port)
1558 ConnectionCommand c;
1563 void Connection::Connect(Address address)
1565 ConnectionCommand c;
1570 bool Connection::Connected()
1572 JMutexAutoLock peerlock(m_peers_mutex);
1574 if(m_peers.size() != 1)
1577 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1581 if(m_peer_id == PEER_ID_INEXISTENT)
1587 void Connection::Disconnect()
1589 ConnectionCommand c;
1594 u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
1597 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
1598 if(e.type != CONNEVENT_NONE)
1599 dout_con<<getDesc()<<": Receive: got event: "
1600 <<e.describe()<<std::endl;
1602 case CONNEVENT_NONE:
1603 throw NoIncomingDataException("No incoming data");
1604 case CONNEVENT_DATA_RECEIVED:
1605 peer_id = e.peer_id;
1606 data = SharedBuffer<u8>(e.data);
1607 return e.data.getSize();
1608 case CONNEVENT_PEER_ADDED: {
1609 Peer tmp(e.peer_id, e.address);
1610 if(m_bc_peerhandler)
1611 m_bc_peerhandler->peerAdded(&tmp);
1613 case CONNEVENT_PEER_REMOVED: {
1614 Peer tmp(e.peer_id, e.address);
1615 if(m_bc_peerhandler)
1616 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1618 case CONNEVENT_BIND_FAILED:
1619 throw ConnectionBindFailed("Failed to bind socket "
1620 "(port already in use?)");
1623 throw NoIncomingDataException("No incoming data");
1626 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1628 assert(channelnum < CHANNEL_COUNT);
1630 ConnectionCommand c;
1631 c.sendToAll(channelnum, data, reliable);
1635 void Connection::Send(u16 peer_id, u8 channelnum,
1636 SharedBuffer<u8> data, bool reliable)
1638 assert(channelnum < CHANNEL_COUNT);
1640 ConnectionCommand c;
1641 c.send(peer_id, channelnum, data, reliable);
1645 void Connection::RunTimeouts(float dtime)
1650 Address Connection::GetPeerAddress(u16 peer_id)
1652 JMutexAutoLock peerlock(m_peers_mutex);
1653 return getPeer(peer_id)->address;
1656 float Connection::GetPeerAvgRTT(u16 peer_id)
1658 JMutexAutoLock peerlock(m_peers_mutex);
1659 return getPeer(peer_id)->avg_rtt;
1662 void Connection::DeletePeer(u16 peer_id)
1664 ConnectionCommand c;
1665 c.deletePeer(peer_id);
1669 void Connection::PrintInfo(std::ostream &out)
1671 out<<getDesc()<<": ";
1674 void Connection::PrintInfo()
1676 PrintInfo(dout_con);
1679 std::string Connection::getDesc()
1681 return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";