3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 #include "connection.h"
22 #include "serialization.h"
// Builds a BufferedPacket of datasize + BASE_HEADER_SIZE bytes.
// Base header layout written here: u32 protocol_id at offset 0,
// u16 sender_peer_id at offset 4, u8 channel at offset 6. The datasize
// bytes pointed to by data are copied in right after the base header.
// NOTE(review): confirm where `address` is attached to the packet; it is
// not referenced by the statements below.
27 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
28 u32 protocol_id, u16 sender_peer_id, u8 channel)
30 u32 packet_size = datasize + BASE_HEADER_SIZE;
31 BufferedPacket p(packet_size);
34 writeU32(&p.data[0], protocol_id);
35 writeU16(&p.data[4], sender_peer_id);
36 writeU8(&p.data[6], channel);
38 memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
// Convenience overload: forwards the buffer's pointer (*data) and
// data.getSize() to the raw-pointer makePacket() with the same header fields.
43 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
44 u32 protocol_id, u16 sender_peer_id, u8 channel)
46 return makePacket(address, *data, data.getSize(),
47 protocol_id, sender_peer_id, channel);
// Wraps data as a TYPE_ORIGINAL packet: allocates a buffer of
// data.getSize() + header_size bytes, writes the type byte at offset 0 and
// copies data in at offset header_size.
50 SharedBuffer<u8> makeOriginalPacket(
51 SharedBuffer<u8> data)
54 u32 packet_size = data.getSize() + header_size;
55 SharedBuffer<u8> b(packet_size);
57 writeU8(&b[0], TYPE_ORIGINAL);
59 memcpy(&b[header_size], *data, data.getSize());
// Splits data into a list of TYPE_SPLIT chunk packets.
// Each chunk starts with a 7-byte header:
//   [0] u8  TYPE_SPLIT
//   [1] u16 seqnum
//   [3] u16 chunk_count (filled in by the second loop, once known)
//   [5] u16 chunk_num
// followed by at most chunksize_max - 7 bytes of payload taken from
// data[start..end] (both inclusive).
// NOTE(review): `data.getSize() - 1` and `chunksize_max - chunk_header_size`
// look like unsigned arithmetic (results are stored in u32); an empty data
// buffer or chunksize_max <= 7 would wrap around — confirm callers never
// pass those.
// NOTE(review): chunk_count is narrowed to u16 from chunks.getSize().
64 core::list<SharedBuffer<u8> > makeSplitPacket(
65 SharedBuffer<u8> data,
69 // Chunk packets, containing the TYPE_SPLIT header
70 core::list<SharedBuffer<u8> > chunks;
72 u32 chunk_header_size = 7;
73 u32 maximum_data_size = chunksize_max - chunk_header_size;
78 end = start + maximum_data_size - 1;
// Clamp the last chunk to the end of the data
79 if(end > data.getSize() - 1)
80 end = data.getSize() - 1;
82 u32 payload_size = end - start + 1;
83 u32 packet_size = chunk_header_size + payload_size;
85 SharedBuffer<u8> chunk(packet_size);
87 writeU8(&chunk[0], TYPE_SPLIT);
88 writeU16(&chunk[1], seqnum);
89 // [3] u16 chunk_count is written at next stage
90 writeU16(&chunk[5], chunk_num);
91 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
93 chunks.push_back(chunk);
// Loop until the chunk that ends on the last data byte has been emitted
98 while(end != data.getSize() - 1);
100 u16 chunk_count = chunks.getSize();
// Second pass: patch the now-known chunk count into every chunk header
102 core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
103 for(; i != chunks.end(); i++)
106 writeU16(&((*i)[3]), chunk_count);
// Produces the list of packets needed to carry data within chunksize_max.
// When data plus the 1-byte original header is larger than chunksize_max,
// the data is split with makeSplitPacket() using split_seqnum;
// makeOriginalPacket() provides the single-packet form that is pushed to
// the list.
// NOTE(review): confirm where split_seqnum is advanced after a split.
112 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
113 SharedBuffer<u8> data,
117 u32 original_header_size = 1;
118 core::list<SharedBuffer<u8> > list;
119 if(data.getSize() + original_header_size > chunksize_max)
121 list = makeSplitPacket(data, chunksize_max, split_seqnum);
127 list.push_back(makeOriginalPacket(data));
// Wraps data in a reliable header: [0] u8 TYPE_RELIABLE, [1] u16 seqnum,
// then a copy of data starting at offset header_size. The new buffer is
// data.getSize() + header_size bytes long.
132 SharedBuffer<u8> makeReliablePacket(
133 SharedBuffer<u8> data,
136 /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
137 dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
138 <<((unsigned int)data[0]&0xff)<<std::endl;*/
140 u32 packet_size = data.getSize() + header_size;
141 SharedBuffer<u8> b(packet_size);
143 writeU8(&b[0], TYPE_RELIABLE);
144 writeU16(&b[1], seqnum);
146 memcpy(&b[header_size], *data, data.getSize());
148 /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
149 <<((unsigned int)data[0]&0xff)<<std::endl;*/
150 //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
// Debug helper: walks m_list and reads each packet's u16 sequence number,
// which sits one byte past the base header (after the type byte).
158 void ReliablePacketBuffer::print()
160 core::list<BufferedPacket>::Iterator i;
162 for(; i != m_list.end(); i++)
164 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
// Returns true when no packets are buffered in m_list.
168 bool ReliablePacketBuffer::empty()
170 return m_list.empty();
// Returns the number of packets currently buffered in m_list.
172 u32 ReliablePacketBuffer::size()
174 return m_list.getSize();
// Linear search through m_list: reads each buffered packet's u16 sequence
// number (at BASE_HEADER_SIZE+1) for comparison against seqnum.
// NOTE(review): callers compare the result against notFound() and
// dereference it, so RPBSearchResult is presumably a list iterator — confirm.
176 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
178 core::list<BufferedPacket>::Iterator i;
180 for(; i != m_list.end(); i++)
182 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
183 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
184 <<", comparing to s="<<s<<std::endl;*/
// Returns the RPBSearchResult value that denotes "no matching packet".
// NOTE(review): presumably the end iterator of m_list — confirm.
190 RPBSearchResult ReliablePacketBuffer::notFound()
// Returns the u16 sequence number of the first packet in m_list.
// Throws NotFoundException("Buffer is empty") for an empty buffer.
// NOTE(review): the first packet is copied into `p` just to read two bytes.
194 u16 ReliablePacketBuffer::getFirstSeqnum()
197 throw NotFoundException("Buffer is empty");
198 BufferedPacket p = *m_list.begin();
199 return readU16(&p.data[BASE_HEADER_SIZE+1]);
// Takes a copy of the first packet in m_list and obtains an iterator to it
// (for removal). Throws NotFoundException("Buffer is empty") for an empty
// buffer.
201 BufferedPacket ReliablePacketBuffer::popFirst()
204 throw NotFoundException("Buffer is empty");
205 BufferedPacket p = *m_list.begin();
206 core::list<BufferedPacket>::Iterator i = m_list.begin();
// Looks up the packet carrying seqnum via findPacket() and copies it out.
// Logs "Not found" and throws NotFoundException when the sequence number is
// not in the buffer.
// NOTE(review): the name implies the packet is also erased from m_list —
// confirm.
210 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
212 RPBSearchResult r = findPacket(seqnum);
214 dout_con<<"Not found"<<std::endl;
215 throw NotFoundException("seqnum not found in buffer");
217 BufferedPacket p = *r;
// Inserts a TYPE_RELIABLE packet into m_list at the position given by its
// sequence number.
// Preconditions (asserted): the packet holds at least the base header plus
// 3 bytes, and its type byte is TYPE_RELIABLE.
// The list is scanned for the first packet whose seqnum is higher (per
// seqnum_higher()); the new packet is inserted before it, or handled
// separately when the scan reaches the end of the list.
// Throws AlreadyExistsException("Same seqnum in list") on a duplicate.
221 void ReliablePacketBuffer::insert(BufferedPacket &p)
223 assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
224 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
225 assert(type == TYPE_RELIABLE);
226 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
228 // Find the right place for the packet and insert it there
230 // If list is empty, just add it
237 // Otherwise find the right place
238 core::list<BufferedPacket>::Iterator i;
240 // Find the first packet in the list which has a higher seqnum
241 for(; i != m_list.end(); i++){
242 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
244 throw AlreadyExistsException("Same seqnum in list");
246 if(seqnum_higher(s, seqnum)){
250 // If we're at the end of the list, add the packet to the
252 if(i == m_list.end())
259 m_list.insert_before(i, p);
// Adds dtime to the totaltime of every buffered packet.
// NOTE(review): resetTimedOuts()/getTimedOuts() compare a separate `time`
// field; confirm it is advanced here as well.
262 void ReliablePacketBuffer::incrementTimeouts(float dtime)
264 core::list<BufferedPacket>::Iterator i;
266 for(; i != m_list.end(); i++){
268 i->totaltime += dtime;
// Visits every buffered packet whose `time` has reached timeout.
// NOTE(review): presumably resets that packet's `time` so it can time out
// again after a re-send — confirm.
272 void ReliablePacketBuffer::resetTimedOuts(float timeout)
274 core::list<BufferedPacket>::Iterator i;
276 for(; i != m_list.end(); i++){
277 if(i->time >= timeout)
// Checks the buffered packets for one whose totaltime has reached timeout.
282 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
284 core::list<BufferedPacket>::Iterator i;
286 for(; i != m_list.end(); i++){
287 if(i->totaltime >= timeout)
// Collects copies of all buffered packets whose `time` has reached timeout.
// The packets stay in m_list; only copies go into the returned list.
293 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
295 core::list<BufferedPacket> timed_outs;
296 core::list<BufferedPacket>::Iterator i;
298 for(; i != m_list.end(); i++)
300 if(i->time >= timeout)
301 timed_outs.push_back(*i);
// Destructor: m_buf holds raw IncomingSplitPacket pointers, so every
// remaining entry is deleted here.
310 IncomingSplitBuffer::~IncomingSplitBuffer()
312 core::map<u16, IncomingSplitPacket*>::Iterator i;
313 i = m_buf.getIterator();
314 for(; i.atEnd() == false; i++)
316 delete i.getNode()->getValue();
320 This will throw a GotSplitPacketException when a full
321 split packet is constructed.
// Stores one TYPE_SPLIT chunk and, once every chunk of its split packet has
// arrived, reassembles the full payload.
// Split header (after the base header): [0] u8 type, [1] u16 seqnum,
// [3] u16 chunk_count, [5] u16 chunk_num; the payload follows at
// BASE_HEADER_SIZE + 7.
// Outcomes:
//   - duplicate chunk: throws AlreadyExistsException
//   - not yet complete: checked via sp->allReceived()
//   - complete: chunks 0..chunk_count-1 are concatenated into one buffer,
//     the entry is removed from m_buf and GotSplitPacketException carrying
//     the full data is thrown
// NOTE(review): size and type are only asserted, although the packet comes
// from the network; chunk_num is not range-checked against chunk_count
// here, and sp->chunks[chunk_i] would default-insert an empty buffer for a
// missing index.
// NOTE(review): confirm that sp is deleted when it is removed from m_buf.
323 void IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
325 u32 headersize = BASE_HEADER_SIZE + 7;
326 assert(p.data.getSize() >= headersize);
327 u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
328 assert(type == TYPE_SPLIT);
329 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
330 u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
331 u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
333 // Add if doesn't exist
334 if(m_buf.find(seqnum) == NULL)
336 IncomingSplitPacket *sp = new IncomingSplitPacket();
337 sp->chunk_count = chunk_count;
338 sp->reliable = reliable;
342 IncomingSplitPacket *sp = m_buf[seqnum];
344 // TODO: These errors should be thrown or something? Dunno.
345 if(chunk_count != sp->chunk_count)
346 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
347 <<" != sp->chunk_count="<<sp->chunk_count
349 if(reliable != sp->reliable)
350 derr_con<<"Connection: WARNING: reliable="<<reliable
351 <<" != sp->reliable="<<sp->reliable
354 // If chunk already exists, cancel
355 if(sp->chunks.find(chunk_num) != NULL)
356 throw AlreadyExistsException("Chunk already in buffer");
358 // Cut chunk data out of packet
359 u32 chunkdatasize = p.data.getSize() - headersize;
360 SharedBuffer<u8> chunkdata(chunkdatasize);
361 memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
363 // Set chunk data in buffer
364 sp->chunks[chunk_num] = chunkdata;
366 // If not all chunks are received, return
367 if(sp->allReceived() == false)
370 // Calculate total size
372 core::map<u16, SharedBuffer<u8> >::Iterator i;
373 i = sp->chunks.getIterator();
374 for(; i.atEnd() == false; i++)
376 totalsize += i.getNode()->getValue().getSize();
379 SharedBuffer<u8> fulldata(totalsize);
381 // Copy chunks to data buffer
383 for(u32 chunk_i=0; chunk_i<sp->chunk_count;
386 SharedBuffer<u8> buf = sp->chunks[chunk_i];
// NOTE(review): narrows getSize() to u16, unlike the u32 used above
387 u16 chunkdatasize = buf.getSize();
388 memcpy(&fulldata[start], *buf, chunkdatasize);
389 start += chunkdatasize;;
392 // Remove sp from buffer
393 m_buf.remove(seqnum);
// The completed payload is handed to the caller through an exception
396 throw GotSplitPacketException(fulldata);
// Drops incomplete unreliable split packets whose `time` has reached
// timeout. Reliable split packets are never removed by timeout.
// Keys to remove are first collected in remove_queue and processed in a
// second loop, so m_buf is not modified while it is being iterated.
// NOTE(review): confirm that dtime is added to p->time in this function.
398 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
400 core::list<u16> remove_queue;
401 core::map<u16, IncomingSplitPacket*>::Iterator i;
402 i = m_buf.getIterator();
403 for(; i.atEnd() == false; i++)
405 IncomingSplitPacket *p = i.getNode()->getValue();
406 // Reliable ones are not removed by timeout
407 if(p->reliable == true)
410 if(p->time >= timeout)
411 remove_queue.push_back(i.getNode()->getKey());
413 core::list<u16>::Iterator j;
414 j = remove_queue.begin();
415 for(; j != remove_queue.end(); j++)
417 dout_con<<"NOTE: Removing timed out unreliable split packet"
// All three sequence counters start at SEQNUM_INITIAL.
// NOTE(review): presumably the body of the Channel constructor (these
// members are accessed through Channel pointers elsewhere) — confirm.
430 next_outgoing_seqnum = SEQNUM_INITIAL;
431 next_incoming_seqnum = SEQNUM_INITIAL;
432 next_outgoing_split_seqnum = SEQNUM_INITIAL;
// Constructs a peer record for the given id and address.
// Starts with a zero timeout counter, an initial resend timeout of 0.5 and
// has_sent_with_id cleared.
// NOTE(review): resend_timeout is presumably in seconds (it is derived from
// RTT values and compared to dtime-accumulated timers) — confirm.
442 Peer::Peer(u16 a_id, Address a_address)
446 timeout_counter = 0.0;
447 //resend_timeout = RESEND_TIMEOUT_MINIMUM;
448 resend_timeout = 0.5;
450 has_sent_with_id = false;
// Feeds one round-trip-time sample into the peer's statistics.
// avg_rtt is updated as a moving average weighting the new sample 0.1 and
// the previous average 0.9; a negative avg_rtt is handled as a separate
// case first (presumably "no sample yet" — confirm).
// resend_timeout is then set to avg_rtt * RESEND_TIMEOUT_FACTOR, clamped to
// [RESEND_TIMEOUT_MIN, RESEND_TIMEOUT_MAX].
456 void Peer::reportRTT(float rtt)
460 else if(avg_rtt < 0.0)
463 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
465 // Calculate resend_timeout
467 /*int reliable_count = 0;
468 for(int i=0; i<CHANNEL_COUNT; i++)
470 reliable_count += channels[i].outgoing_reliables.size();
472 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
473 * ((float)reliable_count * 1);*/
475 float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
476 if(timeout < RESEND_TIMEOUT_MIN)
477 timeout = RESEND_TIMEOUT_MIN;
478 if(timeout > RESEND_TIMEOUT_MAX)
479 timeout = RESEND_TIMEOUT_MAX;
480 resend_timeout = timeout;
// Constructs a connection endpoint.
// peerhandler must not be NULL (asserted); it is stored and later notified
// when peers are added or removed. The protocol id and maximum packet size
// are stored, and the local peer id starts as PEER_ID_NEW.
487 Connection::Connection(
491 PeerHandler *peerhandler
494 assert(peerhandler != NULL);
496 m_protocol_id = protocol_id;
497 m_max_packet_size = max_packet_size;
499 m_peer_id = PEER_ID_NEW;
500 //m_waiting_new_peer_id = false;
502 m_peerhandler = peerhandler;
// Destructor: walks every Peer* stored in m_peers.
// NOTE(review): presumably each peer is deleted here, since m_peers holds
// raw pointers created with new — confirm.
505 Connection::~Connection()
508 core::map<u16, Peer*>::Iterator j;
509 j = m_peers.getIterator();
510 for(; j.atEnd() == false; j++)
512 Peer *peer = j.getNode()->getValue();
// Puts this connection into server mode: the local peer id becomes
// PEER_ID_SERVER.
// NOTE(review): presumably also binds the socket to `port` — confirm.
517 void Connection::Serve(unsigned short port)
520 m_peer_id = PEER_ID_SERVER;
// Starts a client connection to the server at `address`.
// Throws ConnectionException("Already connected to a server") if a peer
// with PEER_ID_SERVER is already registered.
// Otherwise registers the server as a peer, notifies the peer handler and
// sends an empty reliable packet on channel 0 with our peer id set to
// PEER_ID_NEW.
523 void Connection::Connect(Address address)
525 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
527 throw ConnectionException("Already connected to a server");
530 Peer *peer = new Peer(PEER_ID_SERVER, address);
531 m_peers.insert(peer->id, peer);
532 m_peerhandler->peerAdded(peer);
536 // Send a dummy packet to server with peer_id = PEER_ID_NEW
537 m_peer_id = PEER_ID_NEW;
538 SharedBuffer<u8> data(0);
539 Send(PEER_ID_SERVER, 0, data, true);
541 //m_waiting_new_peer_id = true;
// Reports whether this client is connected to a server. Three conditions
// are examined: exactly one peer is known, that peer is PEER_ID_SERVER, and
// our own peer id is no longer PEER_ID_NEW.
544 bool Connection::Connected()
546 if(m_peers.size() != 1)
549 core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
553 if(m_peer_id == PEER_ID_NEW)
// Processes one packet whose base header has already been stripped and
// dispatches on its type byte (packetdata[0]):
//   TYPE_CONTROL  - ACK: pops the acknowledged packet from
//                   outgoing_reliables and reports its totaltime as RTT;
//                   SET_PEER_ID: adopts the new id if ours is PEER_ID_NEW;
//                   PING: ignored. All of these throw
//                   ProcessedSilentlyException.
//   TYPE_ORIGINAL - copies out the payload after ORIGINAL_HEADER_SIZE.
//   TYPE_SPLIT    - buffers the chunk in incoming_splits;
//                   GotSplitPacketException signals a completed packet,
//                   otherwise ProcessedSilentlyException is thrown.
//   TYPE_RELIABLE - sends an ACK, buffers future packets, rejects old ones,
//                   and for the expected seqnum strips the reliable header
//                   and recurses with reliable=true.
// Throws InvalidIncomingDataException for short or unknown packets.
// `reliable` is true only for the payload of a reliable packet; nested
// reliable packets are not allowed (asserted).
559 SharedBuffer<u8> Channel::ProcessPacket(
560 SharedBuffer<u8> packetdata,
566 IndentationRaiser iraiser(&(con->m_indentation));
568 if(packetdata.getSize() < 1)
569 throw InvalidIncomingDataException("packetdata.getSize() < 1");
571 u8 type = readU8(&packetdata[0]);
573 if(type == TYPE_CONTROL)
575 if(packetdata.getSize() < 2)
576 throw InvalidIncomingDataException("packetdata.getSize() < 2");
578 u8 controltype = readU8(&packetdata[1]);
580 if(controltype == CONTROLTYPE_ACK)
582 if(packetdata.getSize() < 4)
583 throw InvalidIncomingDataException
584 ("packetdata.getSize() < 4 (ACK header size)");
586 u16 seqnum = readU16(&packetdata[2]);
588 dout_con<<"Got CONTROLTYPE_ACK: channelnum="
589 <<((int)channelnum&0xff)<<", peer_id="<<peer_id
590 <<", seqnum="<<seqnum<<std::endl;
593 BufferedPacket p = outgoing_reliables.popSeqnum(seqnum);
594 // Get round trip time
595 float rtt = p.totaltime;
597 // Let peer calculate stuff according to it
598 // (avg_rtt and resend_timeout)
599 Peer *peer = con->GetPeer(peer_id);
600 peer->reportRTT(rtt);
602 //con->PrintInfo(dout_con);
603 //dout_con<<"RTT = "<<rtt<<std::endl;
605 /*dout_con<<"OUTGOING: ";
607 outgoing_reliables.print();
608 dout_con<<std::endl;*/
610 catch(NotFoundException &e){
611 con->PrintInfo(derr_con);
612 derr_con<<"WARNING: ACKed packet not "
617 throw ProcessedSilentlyException("Got an ACK");
619 else if(controltype == CONTROLTYPE_SET_PEER_ID)
621 if(packetdata.getSize() < 4)
622 throw InvalidIncomingDataException
623 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
624 u16 peer_id_new = readU16(&packetdata[2]);
626 dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
628 if(con->GetPeerID() != PEER_ID_NEW)
630 con->PrintInfo(derr_con);
631 derr_con<<"WARNING: Not changing"
632 " existing peer id."<<std::endl;
636 dout_con<<"changing."<<std::endl;
637 con->SetPeerID(peer_id_new);
639 throw ProcessedSilentlyException("Got a SET_PEER_ID");
641 else if(controltype == CONTROLTYPE_PING)
643 // Just ignore it, the incoming data already reset
644 // the timeout counter
646 dout_con<<"PING"<<std::endl;
// The message names the PING that was actually handled (it used to repeat
// the SET_PEER_ID text from the branch above)
647 throw ProcessedSilentlyException("Got a PING");
650 con->PrintInfo(derr_con);
651 derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
652 <<((int)controltype&0xff)<<std::endl;
653 throw InvalidIncomingDataException("Invalid control type");
656 else if(type == TYPE_ORIGINAL)
658 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
659 throw InvalidIncomingDataException
660 ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
662 dout_con<<"RETURNING TYPE_ORIGINAL to user"
664 // Get the inside packet out and return it
665 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
666 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
669 else if(type == TYPE_SPLIT)
671 // We have to create a packet again for buffering
672 // This isn't actually too bad an idea.
673 BufferedPacket packet = makePacket(
674 con->GetPeer(peer_id)->address,
676 con->GetProtocolID(),
681 incoming_splits.insert(packet, reliable);
683 // This exception happens when all the pieces of a packet
685 catch(GotSplitPacketException &e)
688 dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
689 <<"size="<<e.getData().getSize()<<std::endl;
693 dout_con<<"BUFFERING TYPE_SPLIT"<<std::endl;
694 throw ProcessedSilentlyException("Buffered a split packet chunk");
696 else if(type == TYPE_RELIABLE)
698 // Recursive reliable packets not allowed
699 assert(reliable == false);
701 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
702 throw InvalidIncomingDataException
703 ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
705 u16 seqnum = readU16(&packetdata[1]);
707 bool is_future_packet = seqnum_higher(seqnum, next_incoming_seqnum);
708 bool is_old_packet = seqnum_higher(next_incoming_seqnum, seqnum);
712 dout_con<<"BUFFERING";
713 else if(is_old_packet)
717 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
718 <<" next="<<next_incoming_seqnum;
719 dout_con<<" [sending CONTROLTYPE_ACK"
720 " to peer_id="<<peer_id<<"]";
724 //assert(incoming_reliables.size() < 100);
726 // Send a CONTROLTYPE_ACK
727 SharedBuffer<u8> reply(4);
728 writeU8(&reply[0], TYPE_CONTROL);
729 writeU8(&reply[1], CONTROLTYPE_ACK);
730 writeU16(&reply[2], seqnum);
731 con->SendAsPacket(peer_id, channelnum, reply, false);
733 //if(seqnum_higher(seqnum, next_incoming_seqnum))
737 dout_con<<"Buffering reliable packet (seqnum="
738 <<seqnum<<")"<<std::endl;*/
740 // This one comes later, buffer it.
741 // Actually we have to make a packet to buffer one.
742 // Well, we have all the ingredients, so just do it.
743 BufferedPacket packet = makePacket(
744 con->GetPeer(peer_id)->address,
746 con->GetProtocolID(),
750 incoming_reliables.insert(packet);
753 dout_con<<"INCOMING: ";
754 incoming_reliables.print();
755 dout_con<<std::endl;*/
757 catch(AlreadyExistsException &e)
761 throw ProcessedSilentlyException("Buffered future reliable packet");
763 //else if(seqnum_higher(next_incoming_seqnum, seqnum))
764 else if(is_old_packet)
766 // An old packet, dump it
767 throw InvalidIncomingDataException("Got an old reliable packet");
770 next_incoming_seqnum++;
772 // Get out the inside packet and re-process it
773 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
774 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
776 return ProcessPacket(payload, con, peer_id, channelnum, true);
780 con->PrintInfo(derr_con);
781 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
782 throw InvalidIncomingDataException("Invalid packet type");
785 // We should never get here.
786 // If you get here, add an exception or a return to some of the
787 // above conditionals.
789 throw BaseException("Error in Channel::ProcessPacket()");
// Tries to deliver the next in-order reliable packet from
// incoming_reliables.
// Buffered packets older than next_incoming_seqnum are popped from the
// front first. If the first remaining packet carries exactly
// next_incoming_seqnum, it is popped, peer_id and the channel number are
// read from its base header, next_incoming_seqnum is advanced, and the
// payload behind the base and reliable headers is handed to
// ProcessPacket() with reliable=true.
// Throws NoIncomingDataException("No relevant data in buffers") when
// nothing can be delivered.
792 SharedBuffer<u8> Channel::CheckIncomingBuffers(Connection *con,
796 // Clear old packets from start of buffer
799 firstseqnum = incoming_reliables.getFirstSeqnum();
800 if(seqnum_higher(next_incoming_seqnum, firstseqnum))
801 incoming_reliables.popFirst();
805 // This happens if all packets are old
806 }catch(con::NotFoundException)
809 if(incoming_reliables.empty() == false)
811 if(firstseqnum == next_incoming_seqnum)
813 BufferedPacket p = incoming_reliables.popFirst();
815 peer_id = readPeerId(*p.data);
816 u8 channelnum = readChannel(*p.data);
817 u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
820 dout_con<<"UNBUFFERING TYPE_RELIABLE"
822 <<" peer_id="<<peer_id
823 <<" channel="<<((int)channelnum&0xff)
826 next_incoming_seqnum++;
828 u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
829 // Get out the inside packet and re-process it
830 SharedBuffer<u8> payload(p.data.getSize() - headers_size);
831 memcpy(*payload, &p.data[headers_size], payload.getSize());
833 return ProcessPacket(payload, con, peer_id, channelnum, true);
837 throw NoIncomingDataException("No relevant data in buffers");
// Polls every channel of every peer for buffered data that is ready for
// delivery, via Channel::CheckIncomingBuffers().
// NoIncomingDataException, InvalidIncomingDataException and
// ProcessedSilentlyException raised for a single channel are caught here.
// Throws NoIncomingDataException("No relevant data in buffers") when no
// channel produced data.
840 SharedBuffer<u8> Connection::GetFromBuffers(u16 &peer_id)
842 core::map<u16, Peer*>::Iterator j;
843 j = m_peers.getIterator();
844 for(; j.atEnd() == false; j++)
846 Peer *peer = j.getNode()->getValue();
847 for(u16 i=0; i<CHANNEL_COUNT; i++)
849 Channel *channel = &peer->channels[i];
851 SharedBuffer<u8> resultdata = channel->CheckIncomingBuffers
856 catch(NoIncomingDataException &e)
859 catch(InvalidIncomingDataException &e)
862 catch(ProcessedSilentlyException &e)
867 throw NoIncomingDataException("No relevant data in buffers");
// Receives one user-level packet into data (capacity datasize) and returns
// the number of bytes written; peer_id is set to the sender's peer id.
// Order of work:
//   1. Deliver already-buffered data via GetFromBuffers(), if any.
//   2. Otherwise read a datagram from the socket and validate the base
//      header (size, protocol id, channel number).
//   3. Resolve the sender: a PEER_ID_NEW sender is matched by address
//      against known peers that have not yet sent with an id, or is
//      registered as a new peer and told its id with a reliable
//      CONTROLTYPE_SET_PEER_ID packet.
//   4. Strip the base header and hand the rest to Channel::ProcessPacket().
// Throws NoIncomingDataException when nothing is available,
// InvalidIncomingDataException for malformed or unexpected packets or when
// data is too small, and ConnectionException when peer ids run out.
870 u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
873 Receive a packet from the network
876 // TODO: We can not know how many layers of header there are.
877 // For now, just assume there are no other than the base headers.
878 u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
879 Buffer<u8> packetdata(packet_maxsize);
886 Check if some buffer has relevant data
889 SharedBuffer<u8> resultdata = GetFromBuffers(peer_id);
891 if(datasize < resultdata.getSize())
892 throw InvalidIncomingDataException
893 ("Buffer too small for received data");
895 memcpy(data, *resultdata, resultdata.getSize());
896 return resultdata.getSize();
898 catch(NoIncomingDataException &e)
904 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
906 if(received_size < 0)
907 throw NoIncomingDataException("No incoming data");
908 if(received_size < BASE_HEADER_SIZE)
909 throw InvalidIncomingDataException("No full header received");
910 if(readU32(&packetdata[0]) != m_protocol_id)
911 throw InvalidIncomingDataException("Invalid protocol id");
913 peer_id = readPeerId(*packetdata);
914 u8 channelnum = readChannel(*packetdata);
915 if(channelnum > CHANNEL_COUNT-1){
// Cast to int so the channel is logged as a number, not as a raw character
917 derr_con<<"Receive(): Invalid channel "<<((int)channelnum&0xff)<<std::endl;
918 throw InvalidIncomingDataException("Channel doesn't exist");
921 if(peer_id == PEER_ID_NEW)
924 Somebody is trying to send stuff to us with no peer id.
926 Check if the same address and port was added to our peer
928 Allow only entries that have has_sent_with_id==false.
931 core::map<u16, Peer*>::Iterator j;
932 j = m_peers.getIterator();
933 for(; j.atEnd() == false; j++)
935 Peer *peer = j.getNode()->getValue();
936 if(peer->has_sent_with_id)
938 if(peer->address == sender)
943 If no peer was found with the same address and port,
944 we shall assume it is a new peer and create an entry.
948 // Pass on to adding the peer
950 // Else: A peer was found.
953 Peer *peer = j.getNode()->getValue();
956 derr_con<<"WARNING: Assuming unknown peer to be "
957 <<"peer_id="<<peer_id<<std::endl;
962 The peer was not found in our lists. Add it.
964 if(peer_id == PEER_ID_NEW)
966 // Somebody wants to make a new connection
968 // Get a unique peer id (2 or higher)
971 Find an unused peer id
976 if(m_peers.find(peer_id_new) == NULL)
978 // Check for overflow
979 if(peer_id_new == 65535)
980 throw ConnectionException
981 ("Connection ran out of peer ids");
986 dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_NEW,"
987 " giving peer_id="<<peer_id_new<<std::endl;
990 Peer *peer = new Peer(peer_id_new, sender);
991 m_peers.insert(peer->id, peer);
992 m_peerhandler->peerAdded(peer);
994 // Create CONTROL packet to tell the peer id to the new peer.
995 SharedBuffer<u8> reply(4);
996 writeU8(&reply[0], TYPE_CONTROL);
997 writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
998 writeU16(&reply[2], peer_id_new);
999 SendAsPacket(peer_id_new, 0, reply, true);
1001 // We're now talking to a valid peer_id
1002 peer_id = peer_id_new;
1004 // Go on and process whatever it sent
1007 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1012 // This means that the peer id of the sender is not PEER_ID_NEW
1013 // and it is invalid.
1014 PrintInfo(derr_con);
1015 derr_con<<"Receive(): Peer not found"<<std::endl;
1016 throw InvalidIncomingDataException("Peer not found (possible timeout)");
1019 Peer *peer = node->getValue();
1021 // Validate peer address
1022 if(peer->address != sender)
1024 PrintInfo(derr_con);
1025 derr_con<<"Peer "<<peer_id<<" sending from different address."
1026 " Ignoring."<<std::endl;
1027 throw InvalidIncomingDataException
1028 ("Peer sending from different address");
1029 /*// If there is more data, receive again
1030 if(m_socket.WaitData(0) == true)
1032 throw NoIncomingDataException("No incoming data (2)");*/
// Any valid traffic from the peer resets its inactivity timer
1035 peer->timeout_counter = 0.0;
1037 Channel *channel = &(peer->channels[channelnum]);
1039 // Throw the received packet to channel->processPacket()
1041 // Make a new SharedBuffer from the data without the base headers
1042 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
1043 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
1044 strippeddata.getSize());
1047 // Process it (the result is some data with no headers made by us)
1048 SharedBuffer<u8> resultdata = channel->ProcessPacket
1049 (strippeddata, this, peer_id, channelnum);
1052 dout_con<<"ProcessPacket returned data of size "
1053 <<resultdata.getSize()<<std::endl;
1055 if(datasize < resultdata.getSize())
1056 throw InvalidIncomingDataException
1057 ("Buffer too small for received data");
1059 memcpy(data, *resultdata, resultdata.getSize());
1060 return resultdata.getSize();
1062 catch(ProcessedSilentlyException &e)
1064 // If there is more data, receive again
1065 if(m_socket.WaitData(0) == true)
1068 throw NoIncomingDataException("No incoming data (2)");
1070 catch(InvalidIncomingDataException &e)
1072 // If there is more data, receive again
1073 if(m_socket.WaitData(0) == true)
// Sends the same data to every known peer on the given channel by calling
// Send() once per entry of m_peers.
1079 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1081 core::map<u16, Peer*>::Iterator j;
1082 j = m_peers.getIterator();
1083 for(; j.atEnd() == false; j++)
1085 Peer *peer = j.getNode()->getValue();
1086 Send(peer->id, channelnum, data, reliable);
// Sends user data to one peer, splitting it when it does not fit into a
// single packet.
// channelnum must be below CHANNEL_COUNT (asserted). GetPeer() throws if
// the peer is unknown.
// The usable chunk size is m_max_packet_size minus the base header, and
// RELIABLE_HEADER_SIZE is subtracted as well (presumably only for reliable
// sends — confirm). makeAutoSplitPacket() yields one original packet or
// several split chunks, and each is sent with SendAsPacket().
1090 void Connection::Send(u16 peer_id, u8 channelnum,
1091 SharedBuffer<u8> data, bool reliable)
1093 assert(channelnum < CHANNEL_COUNT);
1095 Peer *peer = GetPeer(peer_id);
1096 Channel *channel = &(peer->channels[channelnum]);
1098 u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1100 chunksize_max -= RELIABLE_HEADER_SIZE;
1102 core::list<SharedBuffer<u8> > originals;
1103 originals = makeAutoSplitPacket(data, chunksize_max,
1104 channel->next_outgoing_split_seqnum);
1106 core::list<SharedBuffer<u8> >::Iterator i;
1107 i = originals.begin();
1108 for(; i != originals.end(); i++)
1110 SharedBuffer<u8> original = *i;
1112 SendAsPacket(peer_id, channelnum, original, reliable);
// Sends already-typed packet data (original, split chunk or control) to a
// peer, adding the base header.
// Reliable path: takes the channel's next outgoing seqnum (and increments
// it), wraps data with makeReliablePacket(), adds the base header and
// buffers the packet in outgoing_reliables; AlreadyExistsException from the
// buffer is logged as a warning.
// Unreliable path: only the base header is added.
// GetPeer() throws if the peer is unknown.
1116 void Connection::SendAsPacket(u16 peer_id, u8 channelnum,
1117 SharedBuffer<u8> data, bool reliable)
1119 Peer *peer = GetPeer(peer_id);
1120 Channel *channel = &(peer->channels[channelnum]);
1124 u16 seqnum = channel->next_outgoing_seqnum;
1125 channel->next_outgoing_seqnum++;
1127 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1129 // Add base headers and make a packet
1130 BufferedPacket p = makePacket(peer->address, reliable,
1131 m_protocol_id, m_peer_id, channelnum);
1134 // Buffer the packet
1135 channel->outgoing_reliables.insert(p);
1137 catch(AlreadyExistsException &e)
1139 PrintInfo(derr_con);
1140 derr_con<<"WARNING: Going to send a reliable packet "
1141 "seqnum="<<seqnum<<" that is already "
1142 "in outgoing buffer"<<std::endl;
1151 // Add base headers and make a packet
1152 BufferedPacket p = makePacket(peer->address, data,
1153 m_protocol_id, m_peer_id, channelnum);
// Writes a fully built packet to the socket, addressed to packet.address.
1160 void Connection::RawSend(const BufferedPacket &packet)
1162 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
// Periodic housekeeping; dtime is the time elapsed since the previous call.
// For every peer:
//   - advances the inactivity counter and queues the peer for removal once
//     it exceeds m_timeout
//   - per channel: drops timed-out unreliable split packets, advances the
//     reliable packet timers, queues the peer for removal if any reliable
//     packet's totaltime reached m_timeout, and re-sends reliable packets
//     whose time reached the peer's resend_timeout (cached before the
//     channel loop)
//   - sends a reliable PING on channel 0 once ping_timer reaches 5.0
// Afterwards the queued peers are reported to the peer handler through
// deletingPeer(peer, true) and removed.
// NOTE(review): the inner iterator `j` and the u8 `channel` shadow the
// outer `j` and the `Channel *channel`; legal, but easy to misread.
// NOTE(review): confirm a peer cannot be queued in timeouted_peers more
// than once (the totaltime check runs once per channel).
1165 void Connection::RunTimeouts(float dtime)
1167 core::list<u16> timeouted_peers;
1168 core::map<u16, Peer*>::Iterator j;
1169 j = m_peers.getIterator();
1170 for(; j.atEnd() == false; j++)
1172 Peer *peer = j.getNode()->getValue();
1177 peer->timeout_counter += dtime;
1178 if(peer->timeout_counter > m_timeout)
1180 PrintInfo(derr_con);
1181 derr_con<<"RunTimeouts(): Peer "<<peer->id
1183 <<" (source=peer->timeout_counter)"
1185 // Add peer to the list
1186 timeouted_peers.push_back(peer->id);
1187 // Don't bother going through the buffers of this one
1191 float resend_timeout = peer->resend_timeout;
1192 for(u16 i=0; i<CHANNEL_COUNT; i++)
1194 core::list<BufferedPacket> timed_outs;
1195 core::list<BufferedPacket>::Iterator j;
1197 Channel *channel = &peer->channels[i];
1199 // Remove timed out incomplete unreliable split packets
1200 channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
1202 // Increment reliable packet times
1203 channel->outgoing_reliables.incrementTimeouts(dtime);
1205 // Check reliable packet total times, remove peer if
1207 if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
1209 PrintInfo(derr_con);
1210 derr_con<<"RunTimeouts(): Peer "<<peer->id
1212 <<" (source=reliable packet totaltime)"
1214 // Add peer to the to-be-removed list
1215 timeouted_peers.push_back(peer->id);
1219 // Re-send timed out outgoing reliables
1221 timed_outs = channel->
1222 outgoing_reliables.getTimedOuts(resend_timeout);
1224 channel->outgoing_reliables.resetTimedOuts(resend_timeout);
1226 j = timed_outs.begin();
1227 for(; j != timed_outs.end(); j++)
1229 u16 peer_id = readPeerId(*(j->data));
1230 u8 channel = readChannel(*(j->data));
1231 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
1233 PrintInfo(derr_con);
1234 derr_con<<"RE-SENDING timed-out RELIABLE to ";
1235 j->address.print(&derr_con);
1236 derr_con<<"(t/o="<<resend_timeout<<"): "
1237 <<"from_peer_id="<<peer_id
1238 <<", channel="<<((int)channel&0xff)
1239 <<", seqnum="<<seqnum
1244 // Enlarge avg_rtt and resend_timeout:
1245 // The rtt will be at least the timeout.
1246 // NOTE: This won't affect the timeout of the next
1247 // checked channel because it was cached.
1248 peer->reportRTT(resend_timeout);
1255 peer->ping_timer += dtime;
1256 if(peer->ping_timer >= 5.0)
1258 // Create and send PING packet
1259 SharedBuffer<u8> data(2);
1260 writeU8(&data[0], TYPE_CONTROL);
1261 writeU8(&data[1], CONTROLTYPE_PING);
1262 SendAsPacket(peer->id, 0, data, true);
1264 peer->ping_timer = 0.0;
1271 // Remove timeouted peers
1272 core::list<u16>::Iterator i = timeouted_peers.begin();
1273 for(; i != timeouted_peers.end(); i++)
1275 PrintInfo(derr_con);
1276 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1277 m_peerhandler->deletingPeer(m_peers[*i], true);
// Looks up a peer by id in m_peers and returns it.
// Throws PeerNotFoundException("Peer not found (possible timeout)") when
// the id is unknown. The stored peer's id is asserted to match the key.
1283 Peer* Connection::GetPeer(u16 peer_id)
1285 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1289 throw PeerNotFoundException("Peer not found (possible timeout)");
1293 assert(node->getValue()->id == peer_id);
1295 return node->getValue();
// Non-throwing variant of GetPeer(): looks up a peer by id in m_peers and
// returns it; the stored peer's id is asserted to match the key.
// NOTE(review): presumably returns NULL for an unknown id — confirm.
1298 Peer* Connection::GetPeerNoEx(u16 peer_id)
1300 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1307 assert(node->getValue()->id == peer_id);
1309 return node->getValue();
// Builds a list of the Peer pointers currently stored in m_peers. The
// pointers are copied, not the peers, so they still refer to objects held
// by this connection.
1312 core::list<Peer*> Connection::GetPeers()
1314 core::list<Peer*> list;
1315 core::map<u16, Peer*>::Iterator j;
1316 j = m_peers.getIterator();
1317 for(; j.atEnd() == false; j++)
1319 Peer *peer = j.getNode()->getValue();
1320 list.push_back(peer);
// Writes a log-line prefix to out: the socket handle, then "con <peer id>: ",
// followed by a loop that runs m_indentation - 1 times (presumably emitting
// indentation — confirm).
1325 void Connection::PrintInfo(std::ostream &out)
1327 out<<m_socket.GetHandle();
1329 out<<"con "<<m_peer_id<<": ";
1330 for(s16 i=0; i<(s16)m_indentation-1; i++)
// Convenience overload: writes the log-line prefix to dout_con.
1334 void Connection::PrintInfo()
1336 PrintInfo(dout_con);