3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #include "connection.h"
22 #include "serialization.h"
27 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
28 u32 protocol_id, u16 sender_peer_id, u8 channel)
30 u32 packet_size = datasize + BASE_HEADER_SIZE;
31 BufferedPacket p(packet_size);
34 writeU32(&p.data[0], protocol_id);
35 writeU16(&p.data[4], sender_peer_id);
36 writeU8(&p.data[6], channel);
38 memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
43 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
44 u32 protocol_id, u16 sender_peer_id, u8 channel)
46 return makePacket(address, *data, data.getSize(),
47 protocol_id, sender_peer_id, channel);
50 SharedBuffer<u8> makeOriginalPacket(
51 SharedBuffer<u8> data)
54 u32 packet_size = data.getSize() + header_size;
55 SharedBuffer<u8> b(packet_size);
57 writeU8(&b[0], TYPE_ORIGINAL);
59 memcpy(&b[header_size], *data, data.getSize());
64 core::list<SharedBuffer<u8> > makeSplitPacket(
65 SharedBuffer<u8> data,
69 // Chunk packets, containing the TYPE_SPLIT header
70 core::list<SharedBuffer<u8> > chunks;
72 u32 chunk_header_size = 7;
73 u32 maximum_data_size = chunksize_max - chunk_header_size;
78 end = start + maximum_data_size - 1;
79 if(end > data.getSize() - 1)
80 end = data.getSize() - 1;
82 u32 payload_size = end - start + 1;
83 u32 packet_size = chunk_header_size + payload_size;
85 SharedBuffer<u8> chunk(packet_size);
87 writeU8(&chunk[0], TYPE_SPLIT);
88 writeU16(&chunk[1], seqnum);
89 // [3] u16 chunk_count is written at next stage
90 writeU16(&chunk[5], chunk_num);
91 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
93 chunks.push_back(chunk);
98 while(end != data.getSize() - 1);
100 u16 chunk_count = chunks.getSize();
102 core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
103 for(; i != chunks.end(); i++)
106 writeU16(&((*i)[3]), chunk_count);
112 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
113 SharedBuffer<u8> data,
117 u32 original_header_size = 1;
118 core::list<SharedBuffer<u8> > list;
119 if(data.getSize() + original_header_size > chunksize_max)
121 list = makeSplitPacket(data, chunksize_max, split_seqnum);
127 list.push_back(makeOriginalPacket(data));
132 SharedBuffer<u8> makeReliablePacket(
133 SharedBuffer<u8> data,
136 /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
137 dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
138 <<((unsigned int)data[0]&0xff)<<std::endl;*/
140 u32 packet_size = data.getSize() + header_size;
141 SharedBuffer<u8> b(packet_size);
143 writeU8(&b[0], TYPE_RELIABLE);
144 writeU16(&b[1], seqnum);
146 memcpy(&b[header_size], *data, data.getSize());
148 /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
149 <<((unsigned int)data[0]&0xff)<<std::endl;*/
150 //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
158 void ReliablePacketBuffer::print()
160 core::list<BufferedPacket>::Iterator i;
162 for(; i != m_list.end(); i++)
164 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
168 bool ReliablePacketBuffer::empty()
170 return m_list.empty();
172 u32 ReliablePacketBuffer::size()
174 return m_list.getSize();
176 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
178 core::list<BufferedPacket>::Iterator i;
180 for(; i != m_list.end(); i++)
182 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
183 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
184 <<", comparing to s="<<s<<std::endl;*/
190 RPBSearchResult ReliablePacketBuffer::notFound()
194 u16 ReliablePacketBuffer::getFirstSeqnum()
197 throw NotFoundException("Buffer is empty");
198 BufferedPacket p = *m_list.begin();
199 return readU16(&p.data[BASE_HEADER_SIZE+1]);
201 BufferedPacket ReliablePacketBuffer::popFirst()
204 throw NotFoundException("Buffer is empty");
205 BufferedPacket p = *m_list.begin();
206 core::list<BufferedPacket>::Iterator i = m_list.begin();
210 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
212 RPBSearchResult r = findPacket(seqnum);
214 dout_con<<"Not found"<<std::endl;
215 throw NotFoundException("seqnum not found in buffer");
217 BufferedPacket p = *r;
221 void ReliablePacketBuffer::insert(BufferedPacket &p)
223 assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
224 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
225 assert(type == TYPE_RELIABLE);
226 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
228 // Find the right place for the packet and insert it there
230 // If list is empty, just add it
237 // Otherwise find the right place
238 core::list<BufferedPacket>::Iterator i;
240 // Find the first packet in the list which has a higher seqnum
241 for(; i != m_list.end(); i++){
242 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
244 throw AlreadyExistsException("Same seqnum in list");
246 if(seqnum_higher(s, seqnum)){
250 // If we're at the end of the list, add the packet to the
252 if(i == m_list.end())
259 m_list.insert_before(i, p);
262 void ReliablePacketBuffer::incrementTimeouts(float dtime)
264 core::list<BufferedPacket>::Iterator i;
266 for(; i != m_list.end(); i++){
268 i->totaltime += dtime;
272 void ReliablePacketBuffer::resetTimedOuts(float timeout)
274 core::list<BufferedPacket>::Iterator i;
276 for(; i != m_list.end(); i++){
277 if(i->time >= timeout)
282 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
284 core::list<BufferedPacket>::Iterator i;
286 for(; i != m_list.end(); i++){
287 if(i->totaltime >= timeout)
293 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
295 core::list<BufferedPacket> timed_outs;
296 core::list<BufferedPacket>::Iterator i;
298 for(; i != m_list.end(); i++)
300 if(i->time >= timeout)
301 timed_outs.push_back(*i);
310 IncomingSplitBuffer::~IncomingSplitBuffer()
312 core::map<u16, IncomingSplitPacket*>::Iterator i;
313 i = m_buf.getIterator();
314 for(; i.atEnd() == false; i++)
316 delete i.getNode()->getValue();
320 This will throw a GotSplitPacketException when a full
321 split packet is constructed.
323 void IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
325 u32 headersize = BASE_HEADER_SIZE + 7;
326 assert(p.data.getSize() >= headersize);
327 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
328 assert(type == TYPE_SPLIT);
329 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
330 u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
331 u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
333 // Add if doesn't exist
334 if(m_buf.find(seqnum) == NULL)
336 IncomingSplitPacket *sp = new IncomingSplitPacket();
337 sp->chunk_count = chunk_count;
338 sp->reliable = reliable;
342 IncomingSplitPacket *sp = m_buf[seqnum];
344 // TODO: These errors should be thrown or something? Dunno.
345 if(chunk_count != sp->chunk_count)
346 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
347 <<" != sp->chunk_count="<<sp->chunk_count
349 if(reliable != sp->reliable)
350 derr_con<<"Connection: WARNING: reliable="<<reliable
351 <<" != sp->reliable="<<sp->reliable
354 // If chunk already exists, cancel
355 if(sp->chunks.find(chunk_num) != NULL)
356 throw AlreadyExistsException("Chunk already in buffer");
358 // Cut chunk data out of packet
359 u32 chunkdatasize = p.data.getSize() - headersize;
360 SharedBuffer<u8> chunkdata(chunkdatasize);
361 memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
363 // Set chunk data in buffer
364 sp->chunks[chunk_num] = chunkdata;
366 // If not all chunks are received, return
367 if(sp->allReceived() == false)
370 // Calculate total size
372 core::map<u16, SharedBuffer<u8> >::Iterator i;
373 i = sp->chunks.getIterator();
374 for(; i.atEnd() == false; i++)
376 totalsize += i.getNode()->getValue().getSize();
379 SharedBuffer<u8> fulldata(totalsize);
381 // Copy chunks to data buffer
383 for(u32 chunk_i=0; chunk_i<sp->chunk_count;
386 SharedBuffer<u8> buf = sp->chunks[chunk_i];
387 u16 chunkdatasize = buf.getSize();
388 memcpy(&fulldata[start], *buf, chunkdatasize);
389 start += chunkdatasize;;
392 // Remove sp from buffer
393 m_buf.remove(seqnum);
396 throw GotSplitPacketException(fulldata);
398 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
400 core::list<u16> remove_queue;
401 core::map<u16, IncomingSplitPacket*>::Iterator i;
402 i = m_buf.getIterator();
403 for(; i.atEnd() == false; i++)
405 IncomingSplitPacket *p = i.getNode()->getValue();
406 // Reliable ones are not removed by timeout
407 if(p->reliable == true)
410 if(p->time >= timeout)
411 remove_queue.push_back(i.getNode()->getKey());
413 core::list<u16>::Iterator j;
414 j = remove_queue.begin();
415 for(; j != remove_queue.end(); j++)
417 dout_con<<"NOTE: Removing timed out unreliable split packet"
430 next_outgoing_seqnum = SEQNUM_INITIAL;
431 next_incoming_seqnum = SEQNUM_INITIAL;
432 next_outgoing_split_seqnum = SEQNUM_INITIAL;
442 Peer::Peer(u16 a_id, Address a_address)
446 timeout_counter = 0.0;
447 //resend_timeout = RESEND_TIMEOUT_MINIMUM;
449 resend_timeout = 0.5;
451 has_sent_with_id = false;
457 void Peer::reportRTT(float rtt)
461 else if(avg_rtt < 0.0)
464 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
466 // Calculate resend_timeout
468 /*int reliable_count = 0;
469 for(int i=0; i<CHANNEL_COUNT; i++)
471 reliable_count += channels[i].outgoing_reliables.size();
473 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
474 * ((float)reliable_count * 1);*/
476 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
477 if(timeout < RESEND_TIMEOUT_MIN)
478 timeout = RESEND_TIMEOUT_MIN;
479 if(timeout > RESEND_TIMEOUT_MAX)
480 timeout = RESEND_TIMEOUT_MAX;
481 resend_timeout = timeout;
488 Connection::Connection(
492 PeerHandler *peerhandler
495 assert(peerhandler != NULL);
497 m_protocol_id = protocol_id;
498 m_max_packet_size = max_packet_size;
500 m_peer_id = PEER_ID_NEW;
501 //m_waiting_new_peer_id = false;
503 m_peerhandler = peerhandler;
506 Connection::~Connection()
509 core::map<u16, Peer*>::Iterator j;
510 j = m_peers.getIterator();
511 for(; j.atEnd() == false; j++)
513 Peer *peer = j.getNode()->getValue();
518 void Connection::Serve(unsigned short port)
521 m_peer_id = PEER_ID_SERVER;
524 void Connection::Connect(Address address)
526 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
528 throw ConnectionException("Already connected to a server");
531 Peer *peer = new Peer(PEER_ID_SERVER, address);
532 m_peers.insert(peer->id, peer);
533 m_peerhandler->peerAdded(peer);
537 // Send a dummy packet to server with peer_id = PEER_ID_NEW
538 m_peer_id = PEER_ID_NEW;
539 SharedBuffer<u8> data(0);
540 Send(PEER_ID_SERVER, 0, data, true);
542 //m_waiting_new_peer_id = true;
545 bool Connection::Connected()
547 if(m_peers.size() != 1)
550 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
554 if(m_peer_id == PEER_ID_NEW)
560 SharedBuffer<u8> Channel::ProcessPacket(
561 SharedBuffer<u8> packetdata,
567 IndentationRaiser iraiser(&(con->m_indentation));
569 if(packetdata.getSize() < 1)
570 throw InvalidIncomingDataException("packetdata.getSize() < 1");
572 u8 type = readU8(&packetdata[0]);
574 if(type == TYPE_CONTROL)
576 if(packetdata.getSize() < 2)
577 throw InvalidIncomingDataException("packetdata.getSize() < 2");
579 u8 controltype = readU8(&packetdata[1]);
581 if(controltype == CONTROLTYPE_ACK)
583 if(packetdata.getSize() < 4)
584 throw InvalidIncomingDataException
585 ("packetdata.getSize() < 4 (ACK header size)");
587 u16 seqnum = readU16(&packetdata[2]);
589 dout_con<<"Got CONTROLTYPE_ACK: channelnum="
590 <<((int)channelnum&0xff)<<", peer_id="<<peer_id
591 <<", seqnum="<<seqnum<<std::endl;
594 BufferedPacket p = outgoing_reliables.popSeqnum(seqnum);
595 // Get round trip time
596 float rtt = p.totaltime;
598 // Let peer calculate stuff according to it
599 // (avg_rtt and resend_timeout)
600 Peer *peer = con->GetPeer(peer_id);
601 peer->reportRTT(rtt);
603 //con->PrintInfo(dout_con);
604 //dout_con<<"RTT = "<<rtt<<std::endl;
606 /*dout_con<<"OUTGOING: ";
608 outgoing_reliables.print();
609 dout_con<<std::endl;*/
611 catch(NotFoundException &e){
612 con->PrintInfo(derr_con);
613 derr_con<<"WARNING: ACKed packet not "
618 throw ProcessedSilentlyException("Got an ACK");
620 else if(controltype == CONTROLTYPE_SET_PEER_ID)
622 if(packetdata.getSize() < 4)
623 throw InvalidIncomingDataException
624 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
625 u16 peer_id_new = readU16(&packetdata[2]);
627 dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
629 if(con->GetPeerID() != PEER_ID_NEW)
631 con->PrintInfo(derr_con);
632 derr_con<<"WARNING: Not changing"
633 " existing peer id."<<std::endl;
637 dout_con<<"changing."<<std::endl;
638 con->SetPeerID(peer_id_new);
640 throw ProcessedSilentlyException("Got a SET_PEER_ID");
642 else if(controltype == CONTROLTYPE_PING)
644 // Just ignore it, the incoming data already reset
645 // the timeout counter
647 dout_con<<"PING"<<std::endl;
648 throw ProcessedSilentlyException("Got a SET_PEER_ID");
651 con->PrintInfo(derr_con);
652 derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
653 <<((int)controltype&0xff)<<std::endl;
654 throw InvalidIncomingDataException("Invalid control type");
657 else if(type == TYPE_ORIGINAL)
659 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
660 throw InvalidIncomingDataException
661 ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
663 dout_con<<"RETURNING TYPE_ORIGINAL to user"
665 // Get the inside packet out and return it
666 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
667 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
670 else if(type == TYPE_SPLIT)
672 // We have to create a packet again for buffering
673 // This isn't actually too bad an idea.
674 BufferedPacket packet = makePacket(
675 con->GetPeer(peer_id)->address,
677 con->GetProtocolID(),
682 incoming_splits.insert(packet, reliable);
684 // This exception happens when all the pieces of a packet
686 catch(GotSplitPacketException &e)
689 dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
690 <<"size="<<e.getData().getSize()<<std::endl;
694 dout_con<<"BUFFERING TYPE_SPLIT"<<std::endl;
695 throw ProcessedSilentlyException("Buffered a split packet chunk");
697 else if(type == TYPE_RELIABLE)
699 // Recursive reliable packets not allowed
700 assert(reliable == false);
702 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
703 throw InvalidIncomingDataException
704 ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
706 u16 seqnum = readU16(&packetdata[1]);
708 bool is_future_packet = seqnum_higher(seqnum, next_incoming_seqnum);
709 bool is_old_packet = seqnum_higher(next_incoming_seqnum, seqnum);
713 dout_con<<"BUFFERING";
714 else if(is_old_packet)
718 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
719 <<" next="<<next_incoming_seqnum;
720 dout_con<<" [sending CONTROLTYPE_ACK"
721 " to peer_id="<<peer_id<<"]";
725 //assert(incoming_reliables.size() < 100);
727 // Send a CONTROLTYPE_ACK
728 SharedBuffer<u8> reply(4);
729 writeU8(&reply[0], TYPE_CONTROL);
730 writeU8(&reply[1], CONTROLTYPE_ACK);
731 writeU16(&reply[2], seqnum);
732 con->SendAsPacket(peer_id, channelnum, reply, false);
734 //if(seqnum_higher(seqnum, next_incoming_seqnum))
738 dout_con<<"Buffering reliable packet (seqnum="
739 <<seqnum<<")"<<std::endl;*/
741 // This one comes later, buffer it.
742 // Actually we have to make a packet to buffer one.
743 // Well, we have all the ingredients, so just do it.
744 BufferedPacket packet = makePacket(
745 con->GetPeer(peer_id)->address,
747 con->GetProtocolID(),
751 incoming_reliables.insert(packet);
754 dout_con<<"INCOMING: ";
755 incoming_reliables.print();
756 dout_con<<std::endl;*/
758 catch(AlreadyExistsException &e)
762 throw ProcessedSilentlyException("Buffered future reliable packet");
764 //else if(seqnum_higher(next_incoming_seqnum, seqnum))
765 else if(is_old_packet)
767 // An old packet, dump it
768 throw InvalidIncomingDataException("Got an old reliable packet");
771 next_incoming_seqnum++;
773 // Get out the inside packet and re-process it
774 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
775 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
777 return ProcessPacket(payload, con, peer_id, channelnum, true);
781 con->PrintInfo(derr_con);
782 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
783 throw InvalidIncomingDataException("Invalid packet type");
786 // We should never get here.
787 // If you get here, add an exception or a return to some of the
788 // above conditionals.
790 throw BaseException("Error in Channel::ProcessPacket()");
793 SharedBuffer<u8> Channel::CheckIncomingBuffers(Connection *con,
797 // Clear old packets from start of buffer
800 firstseqnum = incoming_reliables.getFirstSeqnum();
801 if(seqnum_higher(next_incoming_seqnum, firstseqnum))
802 incoming_reliables.popFirst();
806 // This happens if all packets are old
807 }catch(con::NotFoundException)
810 if(incoming_reliables.empty() == false)
812 if(firstseqnum == next_incoming_seqnum)
814 BufferedPacket p = incoming_reliables.popFirst();
816 peer_id = readPeerId(*p.data);
817 u8 channelnum = readChannel(*p.data);
818 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
821 dout_con<<"UNBUFFERING TYPE_RELIABLE"
823 <<" peer_id="<<peer_id
824 <<" channel="<<((int)channelnum&0xff)
827 next_incoming_seqnum++;
829 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
830 // Get out the inside packet and re-process it
831 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
832 memcpy(*payload, &p.data[headers_size], payload.getSize());
834 return ProcessPacket(payload, con, peer_id, channelnum, true);
838 throw NoIncomingDataException("No relevant data in buffers");
841 SharedBuffer<u8> Connection::GetFromBuffers(u16 &peer_id)
843 core::map<u16, Peer*>::Iterator j;
844 j = m_peers.getIterator();
845 for(; j.atEnd() == false; j++)
847 Peer *peer = j.getNode()->getValue();
848 for(u16 i=0; i<CHANNEL_COUNT; i++)
850 Channel *channel = &peer->channels[i];
852 SharedBuffer<u8> resultdata = channel->CheckIncomingBuffers
857 catch(NoIncomingDataException &e)
860 catch(InvalidIncomingDataException &e)
863 catch(ProcessedSilentlyException &e)
868 throw NoIncomingDataException("No relevant data in buffers");
871 u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
874 Receive a packet from the network
877 // TODO: We can not know how many layers of header there are.
878 // For now, just assume there are no other than the base headers.
879 u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
880 Buffer<u8> packetdata(packet_maxsize);
887 Check if some buffer has relevant data
890 SharedBuffer<u8> resultdata = GetFromBuffers(peer_id);
892 if(datasize < resultdata.getSize())
893 throw InvalidIncomingDataException
894 ("Buffer too small for received data");
896 memcpy(data, *resultdata, resultdata.getSize());
897 return resultdata.getSize();
899 catch(NoIncomingDataException &e)
905 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
907 if(received_size < 0)
908 throw NoIncomingDataException("No incoming data");
909 if(received_size < BASE_HEADER_SIZE)
910 throw InvalidIncomingDataException("No full header received");
911 if(readU32(&packetdata[0]) != m_protocol_id)
912 throw InvalidIncomingDataException("Invalid protocol id");
914 peer_id = readPeerId(*packetdata);
915 u8 channelnum = readChannel(*packetdata);
916 if(channelnum > CHANNEL_COUNT-1){
918 derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
919 throw InvalidIncomingDataException("Channel doesn't exist");
922 if(peer_id == PEER_ID_NEW)
925 Somebody is trying to send stuff to us with no peer id.
927 Check if the same address and port was added to our peer
929 Allow only entries that have has_sent_with_id==false.
932 core::map<u16, Peer*>::Iterator j;
933 j = m_peers.getIterator();
934 for(; j.atEnd() == false; j++)
936 Peer *peer = j.getNode()->getValue();
937 if(peer->has_sent_with_id)
939 if(peer->address == sender)
944 If no peer was found with the same address and port,
945 we shall assume it is a new peer and create an entry.
949 // Pass on to adding the peer
951 // Else: A peer was found.
954 Peer *peer = j.getNode()->getValue();
957 derr_con<<"WARNING: Assuming unknown peer to be "
958 <<"peer_id="<<peer_id<<std::endl;
963 The peer was not found in our lists. Add it.
965 if(peer_id == PEER_ID_NEW)
967 // Somebody wants to make a new connection
969 // Get a unique peer id (2 or higher)
972 Find an unused peer id
977 if(m_peers.find(peer_id_new) == NULL)
979 // Check for overflow
980 if(peer_id_new == 65535)
981 throw ConnectionException
982 ("Connection ran out of peer ids");
987 dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_NEW,"
988 " giving peer_id="<<peer_id_new<<std::endl;
991 Peer *peer = new Peer(peer_id_new, sender);
992 m_peers.insert(peer->id, peer);
993 m_peerhandler->peerAdded(peer);
995 // Create CONTROL packet to tell the peer id to the new peer.
996 SharedBuffer<u8> reply(4);
997 writeU8(&reply[0], TYPE_CONTROL);
998 writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
999 writeU16(&reply[2], peer_id_new);
1000 SendAsPacket(peer_id_new, 0, reply, true);
1002 // We're now talking to a valid peer_id
1003 peer_id = peer_id_new;
1005 // Go on and process whatever it sent
1008 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1013 // This means that the peer id of the sender is not PEER_ID_NEW
1014 // and it is invalid.
1015 PrintInfo(derr_con);
1016 derr_con<<"Receive(): Peer not found"<<std::endl;
1017 throw InvalidIncomingDataException("Peer not found (possible timeout)");
1020 Peer *peer = node->getValue();
1022 // Validate peer address
1023 if(peer->address != sender)
1025 PrintInfo(derr_con);
1026 derr_con<<"Peer "<<peer_id<<" sending from different address."
1027 " Ignoring."<<std::endl;
1028 throw InvalidIncomingDataException
1029 ("Peer sending from different address");
1030 /*// If there is more data, receive again
1031 if(m_socket.WaitData(0) == true)
1033 throw NoIncomingDataException("No incoming data (2)");*/
1036 peer->timeout_counter = 0.0;
1038 Channel *channel = &(peer->channels[channelnum]);
1040 // Throw the received packet to channel->processPacket()
1042 // Make a new SharedBuffer from the data without the base headers
1043 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
1044 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
1045 strippeddata.getSize());
1048 // Process it (the result is some data with no headers made by us)
1049 SharedBuffer<u8> resultdata = channel->ProcessPacket
1050 (strippeddata, this, peer_id, channelnum);
1053 dout_con<<"ProcessPacket returned data of size "
1054 <<resultdata.getSize()<<std::endl;
1056 if(datasize < resultdata.getSize())
1057 throw InvalidIncomingDataException
1058 ("Buffer too small for received data");
1060 memcpy(data, *resultdata, resultdata.getSize());
1061 return resultdata.getSize();
1063 catch(ProcessedSilentlyException &e)
1065 // If there is more data, receive again
1066 if(m_socket.WaitData(0) == true)
1069 throw NoIncomingDataException("No incoming data (2)");
1071 catch(InvalidIncomingDataException &e)
1073 // If there is more data, receive again
1074 if(m_socket.WaitData(0) == true)
1080 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1082 core::map<u16, Peer*>::Iterator j;
1083 j = m_peers.getIterator();
1084 for(; j.atEnd() == false; j++)
1086 Peer *peer = j.getNode()->getValue();
1087 Send(peer->id, channelnum, data, reliable);
1091 void Connection::Send(u16 peer_id, u8 channelnum,
1092 SharedBuffer<u8> data, bool reliable)
1094 assert(channelnum < CHANNEL_COUNT);
1096 Peer *peer = GetPeer(peer_id);
1097 Channel *channel = &(peer->channels[channelnum]);
1099 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1101 chunksize_max -= RELIABLE_HEADER_SIZE;
1103 core::list<SharedBuffer<u8> > originals;
1104 originals = makeAutoSplitPacket(data, chunksize_max,
1105 channel->next_outgoing_split_seqnum);
1107 core::list<SharedBuffer<u8> >::Iterator i;
1108 i = originals.begin();
1109 for(; i != originals.end(); i++)
1111 SharedBuffer<u8> original = *i;
1113 SendAsPacket(peer_id, channelnum, original, reliable);
1117 void Connection::SendAsPacket(u16 peer_id, u8 channelnum,
1118 SharedBuffer<u8> data, bool reliable)
1120 Peer *peer = GetPeer(peer_id);
1121 Channel *channel = &(peer->channels[channelnum]);
1125 u16 seqnum = channel->next_outgoing_seqnum;
1126 channel->next_outgoing_seqnum++;
1128 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1130 // Add base headers and make a packet
1131 BufferedPacket p = makePacket(peer->address, reliable,
1132 m_protocol_id, m_peer_id, channelnum);
1135 // Buffer the packet
1136 channel->outgoing_reliables.insert(p);
1138 catch(AlreadyExistsException &e)
1140 PrintInfo(derr_con);
1141 derr_con<<"WARNING: Going to send a reliable packet "
1142 "seqnum="<<seqnum<<" that is already "
1143 "in outgoing buffer"<<std::endl;
1152 // Add base headers and make a packet
1153 BufferedPacket p = makePacket(peer->address, data,
1154 m_protocol_id, m_peer_id, channelnum);
1161 void Connection::RawSend(const BufferedPacket &packet)
1163 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1166 void Connection::RunTimeouts(float dtime)
1168 core::list<u16> timeouted_peers;
1169 core::map<u16, Peer*>::Iterator j;
1170 j = m_peers.getIterator();
1171 for(; j.atEnd() == false; j++)
1173 Peer *peer = j.getNode()->getValue();
1178 peer->timeout_counter += dtime;
1179 if(peer->timeout_counter > m_timeout)
1181 PrintInfo(derr_con);
1182 derr_con<<"RunTimeouts(): Peer "<<peer->id
1184 <<" (source=peer->timeout_counter)"
1186 // Add peer to the list
1187 timeouted_peers.push_back(peer->id);
1188 // Don't bother going through the buffers of this one
1192 float resend_timeout = peer->resend_timeout;
1193 for(u16 i=0; i<CHANNEL_COUNT; i++)
1195 core::list<BufferedPacket> timed_outs;
1196 core::list<BufferedPacket>::Iterator j;
1198 Channel *channel = &peer->channels[i];
1200 // Remove timed out incomplete unreliable split packets
1201 channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
1203 // Increment reliable packet times
1204 channel->outgoing_reliables.incrementTimeouts(dtime);
1206 // Check reliable packet total times, remove peer if
1208 if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
1210 PrintInfo(derr_con);
1211 derr_con<<"RunTimeouts(): Peer "<<peer->id
1213 <<" (source=reliable packet totaltime)"
1215 // Add peer to the to-be-removed list
1216 timeouted_peers.push_back(peer->id);
1220 // Re-send timed out outgoing reliables
1222 timed_outs = channel->
1223 outgoing_reliables.getTimedOuts(resend_timeout);
1225 channel->outgoing_reliables.resetTimedOuts(resend_timeout);
1227 j = timed_outs.begin();
1228 for(; j != timed_outs.end(); j++)
1230 u16 peer_id = readPeerId(*(j->data));
1231 u8 channel = readChannel(*(j->data));
1232 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
1234 PrintInfo(derr_con);
1235 derr_con<<"RE-SENDING timed-out RELIABLE to ";
1236 j->address.print(&derr_con);
1237 derr_con<<"(t/o="<<resend_timeout<<"): "
1238 <<"from_peer_id="<<peer_id
1239 <<", channel="<<((int)channel&0xff)
1240 <<", seqnum="<<seqnum
1245 // Enlarge avg_rtt and resend_timeout:
1246 // The rtt will be at least the timeout.
1247 // NOTE: This won't affect the timeout of the next
1248 // checked channel because it was cached.
1249 peer->reportRTT(resend_timeout);
1256 peer->ping_timer += dtime;
1257 if(peer->ping_timer >= 5.0)
1259 // Create and send PING packet
1260 SharedBuffer<u8> data(2);
1261 writeU8(&data[0], TYPE_CONTROL);
1262 writeU8(&data[1], CONTROLTYPE_PING);
1263 SendAsPacket(peer->id, 0, data, true);
1265 peer->ping_timer = 0.0;
1272 // Remove timeouted peers
1273 core::list<u16>::Iterator i = timeouted_peers.begin();
1274 for(; i != timeouted_peers.end(); i++)
1276 PrintInfo(derr_con);
1277 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1278 m_peerhandler->deletingPeer(m_peers[*i], true);
1284 Peer* Connection::GetPeer(u16 peer_id)
1286 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1290 throw PeerNotFoundException("Peer not found (possible timeout)");
1294 assert(node->getValue()->id == peer_id);
1296 return node->getValue();
1299 Peer* Connection::GetPeerNoEx(u16 peer_id)
1301 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1308 assert(node->getValue()->id == peer_id);
1310 return node->getValue();
1313 core::list<Peer*> Connection::GetPeers()
1315 core::list<Peer*> list;
1316 core::map<u16, Peer*>::Iterator j;
1317 j = m_peers.getIterator();
1318 for(; j.atEnd() == false; j++)
1320 Peer *peer = j.getNode()->getValue();
1321 list.push_back(peer);
1326 void Connection::PrintInfo(std::ostream &out)
1328 out<<m_socket.GetHandle();
1330 out<<"con "<<m_peer_id<<": ";
1331 for(s16 i=0; i<(s16)m_indentation-1; i++)
1335 void Connection::PrintInfo()
1337 PrintInfo(dout_con);