]> git.lizzy.rs Git - dragonfireclient.git/blob - src/connection.cpp
Added tag working for changeset c37bcfd89dd6
[dragonfireclient.git] / src / connection.cpp
1 /*
2 Minetest-c55
3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "connection.h"
21 #include "main.h"
22 #include "serialization.h"
23
24 namespace con
25 {
26
27 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
28                 u32 protocol_id, u16 sender_peer_id, u8 channel)
29 {
30         u32 packet_size = datasize + BASE_HEADER_SIZE;
31         BufferedPacket p(packet_size);
32         p.address = address;
33
34         writeU32(&p.data[0], protocol_id);
35         writeU16(&p.data[4], sender_peer_id);
36         writeU8(&p.data[6], channel);
37
38         memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
39
40         return p;
41 }
42
43 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
44                 u32 protocol_id, u16 sender_peer_id, u8 channel)
45 {
46         return makePacket(address, *data, data.getSize(),
47                         protocol_id, sender_peer_id, channel);
48 }
49
50 SharedBuffer<u8> makeOriginalPacket(
51                 SharedBuffer<u8> data)
52 {
53         u32 header_size = 1;
54         u32 packet_size = data.getSize() + header_size;
55         SharedBuffer<u8> b(packet_size);
56
57         writeU8(&b[0], TYPE_ORIGINAL);
58
59         memcpy(&b[header_size], *data, data.getSize());
60
61         return b;
62 }
63
64 core::list<SharedBuffer<u8> > makeSplitPacket(
65                 SharedBuffer<u8> data,
66                 u32 chunksize_max,
67                 u16 seqnum)
68 {
69         // Chunk packets, containing the TYPE_SPLIT header
70         core::list<SharedBuffer<u8> > chunks;
71         
72         u32 chunk_header_size = 7;
73         u32 maximum_data_size = chunksize_max - chunk_header_size;
74         u32 start = 0;
75         u32 end = 0;
76         u32 chunk_num = 0;
77         do{
78                 end = start + maximum_data_size - 1;
79                 if(end > data.getSize() - 1)
80                         end = data.getSize() - 1;
81                 
82                 u32 payload_size = end - start + 1;
83                 u32 packet_size = chunk_header_size + payload_size;
84
85                 SharedBuffer<u8> chunk(packet_size);
86                 
87                 writeU8(&chunk[0], TYPE_SPLIT);
88                 writeU16(&chunk[1], seqnum);
89                 // [3] u16 chunk_count is written at next stage
90                 writeU16(&chunk[5], chunk_num);
91                 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
92
93                 chunks.push_back(chunk);
94                 
95                 start = end + 1;
96                 chunk_num++;
97         }
98         while(end != data.getSize() - 1);
99
100         u16 chunk_count = chunks.getSize();
101
102         core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
103         for(; i != chunks.end(); i++)
104         {
105                 // Write chunk_count
106                 writeU16(&((*i)[3]), chunk_count);
107         }
108
109         return chunks;
110 }
111
112 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
113                 SharedBuffer<u8> data,
114                 u32 chunksize_max,
115                 u16 &split_seqnum)
116 {
117         u32 original_header_size = 1;
118         core::list<SharedBuffer<u8> > list;
119         if(data.getSize() + original_header_size > chunksize_max)
120         {
121                 list = makeSplitPacket(data, chunksize_max, split_seqnum);
122                 split_seqnum++;
123                 return list;
124         }
125         else
126         {
127                 list.push_back(makeOriginalPacket(data));
128         }
129         return list;
130 }
131
132 SharedBuffer<u8> makeReliablePacket(
133                 SharedBuffer<u8> data,
134                 u16 seqnum)
135 {
136         /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
137         dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
138                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
139         u32 header_size = 3;
140         u32 packet_size = data.getSize() + header_size;
141         SharedBuffer<u8> b(packet_size);
142
143         writeU8(&b[0], TYPE_RELIABLE);
144         writeU16(&b[1], seqnum);
145
146         memcpy(&b[header_size], *data, data.getSize());
147
148         /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
149                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
150         //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
151         return b;
152 }
153
154 /*
155         ReliablePacketBuffer
156 */
157
158 void ReliablePacketBuffer::print()
159 {
160         core::list<BufferedPacket>::Iterator i;
161         i = m_list.begin();
162         for(; i != m_list.end(); i++)
163         {
164                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
165                 dout_con<<s<<" ";
166         }
167 }
168 bool ReliablePacketBuffer::empty()
169 {
170         return m_list.empty();
171 }
172 u32 ReliablePacketBuffer::size()
173 {
174         return m_list.getSize();
175 }
176 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
177 {
178         core::list<BufferedPacket>::Iterator i;
179         i = m_list.begin();
180         for(; i != m_list.end(); i++)
181         {
182                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
183                 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
184                                 <<", comparing to s="<<s<<std::endl;*/
185                 if(s == seqnum)
186                         break;
187         }
188         return i;
189 }
190 RPBSearchResult ReliablePacketBuffer::notFound()
191 {
192         return m_list.end();
193 }
194 u16 ReliablePacketBuffer::getFirstSeqnum()
195 {
196         if(empty())
197                 throw NotFoundException("Buffer is empty");
198         BufferedPacket p = *m_list.begin();
199         return readU16(&p.data[BASE_HEADER_SIZE+1]);
200 }
201 BufferedPacket ReliablePacketBuffer::popFirst()
202 {
203         if(empty())
204                 throw NotFoundException("Buffer is empty");
205         BufferedPacket p = *m_list.begin();
206         core::list<BufferedPacket>::Iterator i = m_list.begin();
207         m_list.erase(i);
208         return p;
209 }
210 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
211 {
212         RPBSearchResult r = findPacket(seqnum);
213         if(r == notFound()){
214                 dout_con<<"Not found"<<std::endl;
215                 throw NotFoundException("seqnum not found in buffer");
216         }
217         BufferedPacket p = *r;
218         m_list.erase(r);
219         return p;
220 }
221 void ReliablePacketBuffer::insert(BufferedPacket &p)
222 {
223         assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
224         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
225         assert(type == TYPE_RELIABLE);
226         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
227
228         // Find the right place for the packet and insert it there
229
230         // If list is empty, just add it
231         if(m_list.empty())
232         {
233                 m_list.push_back(p);
234                 // Done.
235                 return;
236         }
237         // Otherwise find the right place
238         core::list<BufferedPacket>::Iterator i;
239         i = m_list.begin();
240         // Find the first packet in the list which has a higher seqnum
241         for(; i != m_list.end(); i++){
242                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
243                 if(s == seqnum){
244                         throw AlreadyExistsException("Same seqnum in list");
245                 }
246                 if(seqnum_higher(s, seqnum)){
247                         break;
248                 }
249         }
250         // If we're at the end of the list, add the packet to the
251         // end of the list
252         if(i == m_list.end())
253         {
254                 m_list.push_back(p);
255                 // Done.
256                 return;
257         }
258         // Insert before i
259         m_list.insert_before(i, p);
260 }
261
262 void ReliablePacketBuffer::incrementTimeouts(float dtime)
263 {
264         core::list<BufferedPacket>::Iterator i;
265         i = m_list.begin();
266         for(; i != m_list.end(); i++){
267                 i->time += dtime;
268                 i->totaltime += dtime;
269         }
270 }
271
272 void ReliablePacketBuffer::resetTimedOuts(float timeout)
273 {
274         core::list<BufferedPacket>::Iterator i;
275         i = m_list.begin();
276         for(; i != m_list.end(); i++){
277                 if(i->time >= timeout)
278                         i->time = 0.0;
279         }
280 }
281
282 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
283 {
284         core::list<BufferedPacket>::Iterator i;
285         i = m_list.begin();
286         for(; i != m_list.end(); i++){
287                 if(i->totaltime >= timeout)
288                         return true;
289         }
290         return false;
291 }
292
293 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
294 {
295         core::list<BufferedPacket> timed_outs;
296         core::list<BufferedPacket>::Iterator i;
297         i = m_list.begin();
298         for(; i != m_list.end(); i++)
299         {
300                 if(i->time >= timeout)
301                         timed_outs.push_back(*i);
302         }
303         return timed_outs;
304 }
305
306 /*
307         IncomingSplitBuffer
308 */
309
310 IncomingSplitBuffer::~IncomingSplitBuffer()
311 {
312         core::map<u16, IncomingSplitPacket*>::Iterator i;
313         i = m_buf.getIterator();
314         for(; i.atEnd() == false; i++)
315         {
316                 delete i.getNode()->getValue();
317         }
318 }
319 /*
320         This will throw a GotSplitPacketException when a full
321         split packet is constructed.
322 */
323 void IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
324 {
325         u32 headersize = BASE_HEADER_SIZE + 7;
326         assert(p.data.getSize() >= headersize);
327         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
328         assert(type == TYPE_SPLIT);
329         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
330         u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
331         u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
332
333         // Add if doesn't exist
334         if(m_buf.find(seqnum) == NULL)
335         {
336                 IncomingSplitPacket *sp = new IncomingSplitPacket();
337                 sp->chunk_count = chunk_count;
338                 sp->reliable = reliable;
339                 m_buf[seqnum] = sp;
340         }
341         
342         IncomingSplitPacket *sp = m_buf[seqnum];
343         
344         // TODO: These errors should be thrown or something? Dunno.
345         if(chunk_count != sp->chunk_count)
346                 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
347                                 <<" != sp->chunk_count="<<sp->chunk_count
348                                 <<std::endl;
349         if(reliable != sp->reliable)
350                 derr_con<<"Connection: WARNING: reliable="<<reliable
351                                 <<" != sp->reliable="<<sp->reliable
352                                 <<std::endl;
353
354         // If chunk already exists, cancel
355         if(sp->chunks.find(chunk_num) != NULL)
356                 throw AlreadyExistsException("Chunk already in buffer");
357         
358         // Cut chunk data out of packet
359         u32 chunkdatasize = p.data.getSize() - headersize;
360         SharedBuffer<u8> chunkdata(chunkdatasize);
361         memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
362         
363         // Set chunk data in buffer
364         sp->chunks[chunk_num] = chunkdata;
365         
366         // If not all chunks are received, return
367         if(sp->allReceived() == false)
368                 return;
369
370         // Calculate total size
371         u32 totalsize = 0;
372         core::map<u16, SharedBuffer<u8> >::Iterator i;
373         i = sp->chunks.getIterator();
374         for(; i.atEnd() == false; i++)
375         {
376                 totalsize += i.getNode()->getValue().getSize();
377         }
378         
379         SharedBuffer<u8> fulldata(totalsize);
380
381         // Copy chunks to data buffer
382         u32 start = 0;
383         for(u32 chunk_i=0; chunk_i<sp->chunk_count;
384                         chunk_i++)
385         {
386                 SharedBuffer<u8> buf = sp->chunks[chunk_i];
387                 u16 chunkdatasize = buf.getSize();
388                 memcpy(&fulldata[start], *buf, chunkdatasize);
389                 start += chunkdatasize;;
390         }
391
392         // Remove sp from buffer
393         m_buf.remove(seqnum);
394         delete sp;
395         
396         throw GotSplitPacketException(fulldata);
397 }
398 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
399 {
400         core::list<u16> remove_queue;
401         core::map<u16, IncomingSplitPacket*>::Iterator i;
402         i = m_buf.getIterator();
403         for(; i.atEnd() == false; i++)
404         {
405                 IncomingSplitPacket *p = i.getNode()->getValue();
406                 // Reliable ones are not removed by timeout
407                 if(p->reliable == true)
408                         continue;
409                 p->time += dtime;
410                 if(p->time >= timeout)
411                         remove_queue.push_back(i.getNode()->getKey());
412         }
413         core::list<u16>::Iterator j;
414         j = remove_queue.begin();
415         for(; j != remove_queue.end(); j++)
416         {
417                 dout_con<<"NOTE: Removing timed out unreliable split packet"
418                                 <<std::endl;
419                 delete m_buf[*j];
420                 m_buf.remove(*j);
421         }
422 }
423
424 /*
425         Channel
426 */
427
428 Channel::Channel()
429 {
430         next_outgoing_seqnum = SEQNUM_INITIAL;
431         next_incoming_seqnum = SEQNUM_INITIAL;
432         next_outgoing_split_seqnum = SEQNUM_INITIAL;
433 }
434 Channel::~Channel()
435 {
436 }
437
438 /*
439         Peer
440 */
441
442 Peer::Peer(u16 a_id, Address a_address)
443 {
444         id = a_id;
445         address = a_address;
446         timeout_counter = 0.0;
447         //resend_timeout = RESEND_TIMEOUT_MINIMUM;
448         resend_timeout = 0.5;
449         avg_rtt = -1.0;
450         has_sent_with_id = false;
451 }
452 Peer::~Peer()
453 {
454 }
455
456 void Peer::reportRTT(float rtt)
457 {
458         if(rtt < -0.999)
459         {}
460         else if(avg_rtt < 0.0)
461                 avg_rtt = rtt;
462         else
463                 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
464         
465         // Calculate resend_timeout
466
467         /*int reliable_count = 0;
468         for(int i=0; i<CHANNEL_COUNT; i++)
469         {
470                 reliable_count += channels[i].outgoing_reliables.size();
471         }
472         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
473                         * ((float)reliable_count * 1);*/
474         
475         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
476         if(timeout < RESEND_TIMEOUT_MIN)
477                 timeout = RESEND_TIMEOUT_MIN;
478         if(timeout > RESEND_TIMEOUT_MAX)
479                 timeout = RESEND_TIMEOUT_MAX;
480         resend_timeout = timeout;
481 }
482                                 
483 /*
484         Connection
485 */
486
487 Connection::Connection(
488         u32 protocol_id,
489         u32 max_packet_size,
490         float timeout,
491         PeerHandler *peerhandler
492 )
493 {
494         assert(peerhandler != NULL);
495
496         m_protocol_id = protocol_id;
497         m_max_packet_size = max_packet_size;
498         m_timeout = timeout;
499         m_peer_id = PEER_ID_NEW;
500         //m_waiting_new_peer_id = false;
501         m_indentation = 0;
502         m_peerhandler = peerhandler;
503 }
504
505 Connection::~Connection()
506 {
507         // Clear peers
508         core::map<u16, Peer*>::Iterator j;
509         j = m_peers.getIterator();
510         for(; j.atEnd() == false; j++)
511         {
512                 Peer *peer = j.getNode()->getValue();
513                 delete peer;
514         }
515 }
516
517 void Connection::Serve(unsigned short port)
518 {
519         m_socket.Bind(port);
520         m_peer_id = PEER_ID_SERVER;
521 }
522
523 void Connection::Connect(Address address)
524 {
525         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
526         if(node != NULL){
527                 throw ConnectionException("Already connected to a server");
528         }
529
530         Peer *peer = new Peer(PEER_ID_SERVER, address);
531         m_peers.insert(peer->id, peer);
532         m_peerhandler->peerAdded(peer);
533         
534         m_socket.Bind(0);
535         
536         // Send a dummy packet to server with peer_id = PEER_ID_NEW
537         m_peer_id = PEER_ID_NEW;
538         SharedBuffer<u8> data(0);
539         Send(PEER_ID_SERVER, 0, data, true);
540
541         //m_waiting_new_peer_id = true;
542 }
543
544 bool Connection::Connected()
545 {
546         if(m_peers.size() != 1)
547                 return false;
548                 
549         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
550         if(node == NULL)
551                 return false;
552         
553         if(m_peer_id == PEER_ID_NEW)
554                 return false;
555         
556         return true;
557 }
558
559 SharedBuffer<u8> Channel::ProcessPacket(
560                 SharedBuffer<u8> packetdata,
561                 Connection *con,
562                 u16 peer_id,
563                 u8 channelnum,
564                 bool reliable)
565 {
566         IndentationRaiser iraiser(&(con->m_indentation));
567
568         if(packetdata.getSize() < 1)
569                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
570
571         u8 type = readU8(&packetdata[0]);
572         
573         if(type == TYPE_CONTROL)
574         {
575                 if(packetdata.getSize() < 2)
576                         throw InvalidIncomingDataException("packetdata.getSize() < 2");
577
578                 u8 controltype = readU8(&packetdata[1]);
579
580                 if(controltype == CONTROLTYPE_ACK)
581                 {
582                         if(packetdata.getSize() < 4)
583                                 throw InvalidIncomingDataException
584                                                 ("packetdata.getSize() < 4 (ACK header size)");
585
586                         u16 seqnum = readU16(&packetdata[2]);
587                         con->PrintInfo();
588                         dout_con<<"Got CONTROLTYPE_ACK: channelnum="
589                                         <<((int)channelnum&0xff)<<", peer_id="<<peer_id
590                                         <<", seqnum="<<seqnum<<std::endl;
591
592                         try{
593                                 BufferedPacket p = outgoing_reliables.popSeqnum(seqnum);
594                                 // Get round trip time
595                                 float rtt = p.totaltime;
596
597                                 // Let peer calculate stuff according to it
598                                 // (avg_rtt and resend_timeout)
599                                 Peer *peer = con->GetPeer(peer_id);
600                                 peer->reportRTT(rtt);
601
602                                 //con->PrintInfo(dout_con);
603                                 //dout_con<<"RTT = "<<rtt<<std::endl;
604
605                                 /*dout_con<<"OUTGOING: ";
606                                 con->PrintInfo();
607                                 outgoing_reliables.print();
608                                 dout_con<<std::endl;*/
609                         }
610                         catch(NotFoundException &e){
611                                 con->PrintInfo(derr_con);
612                                 derr_con<<"WARNING: ACKed packet not "
613                                                 "in outgoing queue"
614                                                 <<std::endl;
615                         }
616
617                         throw ProcessedSilentlyException("Got an ACK");
618                 }
619                 else if(controltype == CONTROLTYPE_SET_PEER_ID)
620                 {
621                         if(packetdata.getSize() < 4)
622                                 throw InvalidIncomingDataException
623                                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
624                         u16 peer_id_new = readU16(&packetdata[2]);
625                         con->PrintInfo();
626                         dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
627
628                         if(con->GetPeerID() != PEER_ID_NEW)
629                         {
630                                 con->PrintInfo(derr_con);
631                                 derr_con<<"WARNING: Not changing"
632                                                 " existing peer id."<<std::endl;
633                         }
634                         else
635                         {
636                                 dout_con<<"changing."<<std::endl;
637                                 con->SetPeerID(peer_id_new);
638                         }
639                         throw ProcessedSilentlyException("Got a SET_PEER_ID");
640                 }
641                 else if(controltype == CONTROLTYPE_PING)
642                 {
643                         // Just ignore it, the incoming data already reset
644                         // the timeout counter
645                         con->PrintInfo();
646                         dout_con<<"PING"<<std::endl;
647                         throw ProcessedSilentlyException("Got a SET_PEER_ID");
648                 }
649                 else{
650                         con->PrintInfo(derr_con);
651                         derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
652                                         <<((int)controltype&0xff)<<std::endl;
653                         throw InvalidIncomingDataException("Invalid control type");
654                 }
655         }
656         else if(type == TYPE_ORIGINAL)
657         {
658                 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
659                         throw InvalidIncomingDataException
660                                         ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
661                 con->PrintInfo();
662                 dout_con<<"RETURNING TYPE_ORIGINAL to user"
663                                 <<std::endl;
664                 // Get the inside packet out and return it
665                 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
666                 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
667                 return payload;
668         }
669         else if(type == TYPE_SPLIT)
670         {
671                 // We have to create a packet again for buffering
672                 // This isn't actually too bad an idea.
673                 BufferedPacket packet = makePacket(
674                                 con->GetPeer(peer_id)->address,
675                                 packetdata,
676                                 con->GetProtocolID(),
677                                 peer_id,
678                                 channelnum);
679                 try{
680                         // Buffer the packet
681                         incoming_splits.insert(packet, reliable);
682                 }
683                 // This exception happens when all the pieces of a packet
684                 // are collected.
685                 catch(GotSplitPacketException &e)
686                 {
687                         con->PrintInfo();
688                         dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
689                                         <<"size="<<e.getData().getSize()<<std::endl;
690                         return e.getData();
691                 }
692                 con->PrintInfo();
693                 dout_con<<"BUFFERING TYPE_SPLIT"<<std::endl;
694                 throw ProcessedSilentlyException("Buffered a split packet chunk");
695         }
696         else if(type == TYPE_RELIABLE)
697         {
698                 // Recursive reliable packets not allowed
699                 assert(reliable == false);
700
701                 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
702                         throw InvalidIncomingDataException
703                                         ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
704
705                 u16 seqnum = readU16(&packetdata[1]);
706
707                 bool is_future_packet = seqnum_higher(seqnum, next_incoming_seqnum);
708                 bool is_old_packet = seqnum_higher(next_incoming_seqnum, seqnum);
709                 
710                 con->PrintInfo();
711                 if(is_future_packet)
712                         dout_con<<"BUFFERING";
713                 else if(is_old_packet)
714                         dout_con<<"OLD";
715                 else
716                         dout_con<<"RECUR";
717                 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
718                                 <<" next="<<next_incoming_seqnum;
719                 dout_con<<" [sending CONTROLTYPE_ACK"
720                                 " to peer_id="<<peer_id<<"]";
721                 dout_con<<std::endl;
722                 
723                 //DEBUG
724                 //assert(incoming_reliables.size() < 100);
725
726                 // Send a CONTROLTYPE_ACK
727                 SharedBuffer<u8> reply(4);
728                 writeU8(&reply[0], TYPE_CONTROL);
729                 writeU8(&reply[1], CONTROLTYPE_ACK);
730                 writeU16(&reply[2], seqnum);
731                 con->SendAsPacket(peer_id, channelnum, reply, false);
732
733                 //if(seqnum_higher(seqnum, next_incoming_seqnum))
734                 if(is_future_packet)
735                 {
736                         /*con->PrintInfo();
737                         dout_con<<"Buffering reliable packet (seqnum="
738                                         <<seqnum<<")"<<std::endl;*/
739                         
740                         // This one comes later, buffer it.
741                         // Actually we have to make a packet to buffer one.
742                         // Well, we have all the ingredients, so just do it.
743                         BufferedPacket packet = makePacket(
744                                         con->GetPeer(peer_id)->address,
745                                         packetdata,
746                                         con->GetProtocolID(),
747                                         peer_id,
748                                         channelnum);
749                         try{
750                                 incoming_reliables.insert(packet);
751                                 
752                                 /*con->PrintInfo();
753                                 dout_con<<"INCOMING: ";
754                                 incoming_reliables.print();
755                                 dout_con<<std::endl;*/
756                         }
757                         catch(AlreadyExistsException &e)
758                         {
759                         }
760
761                         throw ProcessedSilentlyException("Buffered future reliable packet");
762                 }
763                 //else if(seqnum_higher(next_incoming_seqnum, seqnum))
764                 else if(is_old_packet)
765                 {
766                         // An old packet, dump it
767                         throw InvalidIncomingDataException("Got an old reliable packet");
768                 }
769
770                 next_incoming_seqnum++;
771
772                 // Get out the inside packet and re-process it
773                 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
774                 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
775
776                 return ProcessPacket(payload, con, peer_id, channelnum, true);
777         }
778         else
779         {
780                 con->PrintInfo(derr_con);
781                 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
782                 throw InvalidIncomingDataException("Invalid packet type");
783         }
784         
785         // We should never get here.
786         // If you get here, add an exception or a return to some of the
787         // above conditionals.
788         assert(0);
789         throw BaseException("Error in Channel::ProcessPacket()");
790 }
791
792 SharedBuffer<u8> Channel::CheckIncomingBuffers(Connection *con,
793                 u16 &peer_id)
794 {
795         u16 firstseqnum = 0;
796         // Clear old packets from start of buffer
797         try{
798         for(;;){
799                 firstseqnum = incoming_reliables.getFirstSeqnum();
800                 if(seqnum_higher(next_incoming_seqnum, firstseqnum))
801                         incoming_reliables.popFirst();
802                 else
803                         break;
804         }
805         // This happens if all packets are old
806         }catch(con::NotFoundException)
807         {}
808         
809         if(incoming_reliables.empty() == false)
810         {
811                 if(firstseqnum == next_incoming_seqnum)
812                 {
813                         BufferedPacket p = incoming_reliables.popFirst();
814                         
815                         peer_id = readPeerId(*p.data);
816                         u8 channelnum = readChannel(*p.data);
817                         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
818
819                         con->PrintInfo();
820                         dout_con<<"UNBUFFERING TYPE_RELIABLE"
821                                         <<" seqnum="<<seqnum
822                                         <<" peer_id="<<peer_id
823                                         <<" channel="<<((int)channelnum&0xff)
824                                         <<std::endl;
825
826                         next_incoming_seqnum++;
827                         
828                         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
829                         // Get out the inside packet and re-process it
830                         SharedBuffer<u8> payload(p.data.getSize() - headers_size);
831                         memcpy(*payload, &p.data[headers_size], payload.getSize());
832
833                         return ProcessPacket(payload, con, peer_id, channelnum, true);
834                 }
835         }
836                 
837         throw NoIncomingDataException("No relevant data in buffers");
838 }
839
840 SharedBuffer<u8> Connection::GetFromBuffers(u16 &peer_id)
841 {
842         core::map<u16, Peer*>::Iterator j;
843         j = m_peers.getIterator();
844         for(; j.atEnd() == false; j++)
845         {
846                 Peer *peer = j.getNode()->getValue();
847                 for(u16 i=0; i<CHANNEL_COUNT; i++)
848                 {
849                         Channel *channel = &peer->channels[i];
850                         try{
851                                 SharedBuffer<u8> resultdata = channel->CheckIncomingBuffers
852                                                 (this, peer_id);
853
854                                 return resultdata;
855                         }
856                         catch(NoIncomingDataException &e)
857                         {
858                         }
859                         catch(InvalidIncomingDataException &e)
860                         {
861                         }
862                         catch(ProcessedSilentlyException &e)
863                         {
864                         }
865                 }
866         }
867         throw NoIncomingDataException("No relevant data in buffers");
868 }
869
870 u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
871 {
872         /*
873                 Receive a packet from the network
874         */
875         
876         // TODO: We can not know how many layers of header there are.
877         // For now, just assume there are no other than the base headers.
878         u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
879         Buffer<u8> packetdata(packet_maxsize);
880         
881         for(;;)
882         {
883         try
884         {
885                 /*
886                         Check if some buffer has relevant data
887                 */
888                 try{
889                         SharedBuffer<u8> resultdata = GetFromBuffers(peer_id);
890
891                         if(datasize < resultdata.getSize())
892                                 throw InvalidIncomingDataException
893                                                 ("Buffer too small for received data");
894                                 
895                         memcpy(data, *resultdata, resultdata.getSize());
896                         return resultdata.getSize();
897                 }
898                 catch(NoIncomingDataException &e)
899                 {
900                 }
901         
902                 Address sender;
903
904                 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
905
906                 if(received_size < 0)
907                         throw NoIncomingDataException("No incoming data");
908                 if(received_size < BASE_HEADER_SIZE)
909                         throw InvalidIncomingDataException("No full header received");
910                 if(readU32(&packetdata[0]) != m_protocol_id)
911                         throw InvalidIncomingDataException("Invalid protocol id");
912                 
913                 peer_id = readPeerId(*packetdata);
914                 u8 channelnum = readChannel(*packetdata);
915                 if(channelnum > CHANNEL_COUNT-1){
916                         PrintInfo(derr_con);
917                         derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
918                         throw InvalidIncomingDataException("Channel doesn't exist");
919                 }
920
921                 if(peer_id == PEER_ID_NEW)
922                 {
923                         /*
924                                 Somebody is trying to send stuff to us with no peer id.
925                                 
926                                 Check if the same address and port was added to our peer
927                                 list before.
928                                 Allow only entries that have has_sent_with_id==false.
929                         */
930
931                         core::map<u16, Peer*>::Iterator j;
932                         j = m_peers.getIterator();
933                         for(; j.atEnd() == false; j++)
934                         {
935                                 Peer *peer = j.getNode()->getValue();
936                                 if(peer->has_sent_with_id)
937                                         continue;
938                                 if(peer->address == sender)
939                                         break;
940                         }
941                         
942                         /*
943                                 If no peer was found with the same address and port,
944                                 we shall assume it is a new peer and create an entry.
945                         */
946                         if(j.atEnd())
947                         {
948                                 // Pass on to adding the peer
949                         }
950                         // Else: A peer was found.
951                         else
952                         {
953                                 Peer *peer = j.getNode()->getValue();
954                                 peer_id = peer->id;
955                                 PrintInfo(derr_con);
956                                 derr_con<<"WARNING: Assuming unknown peer to be "
957                                                 <<"peer_id="<<peer_id<<std::endl;
958                         }
959                 }
960                 
961                 /*
962                         The peer was not found in our lists. Add it.
963                 */
964                 if(peer_id == PEER_ID_NEW)
965                 {
966                         // Somebody wants to make a new connection
967
968                         // Get a unique peer id (2 or higher)
969                         u16 peer_id_new = 2;
970                         /*
971                                 Find an unused peer id
972                         */
973                         for(;;)
974                         {
975                                 // Check if exists
976                                 if(m_peers.find(peer_id_new) == NULL)
977                                         break;
978                                 // Check for overflow
979                                 if(peer_id_new == 65535)
980                                         throw ConnectionException
981                                                 ("Connection ran out of peer ids");
982                                 peer_id_new++;
983                         }
984
985                         PrintInfo();
986                         dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_NEW,"
987                                         " giving peer_id="<<peer_id_new<<std::endl;
988
989                         // Create a peer
990                         Peer *peer = new Peer(peer_id_new, sender);
991                         m_peers.insert(peer->id, peer);
992                         m_peerhandler->peerAdded(peer);
993                         
994                         // Create CONTROL packet to tell the peer id to the new peer.
995                         SharedBuffer<u8> reply(4);
996                         writeU8(&reply[0], TYPE_CONTROL);
997                         writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
998                         writeU16(&reply[2], peer_id_new);
999                         SendAsPacket(peer_id_new, 0, reply, true);
1000                         
1001                         // We're now talking to a valid peer_id
1002                         peer_id = peer_id_new;
1003
1004                         // Go on and process whatever it sent
1005                 }
1006
1007                 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1008
1009                 if(node == NULL)
1010                 {
1011                         // Peer not found
1012                         // This means that the peer id of the sender is not PEER_ID_NEW
1013                         // and it is invalid.
1014                         PrintInfo(derr_con);
1015                         derr_con<<"Receive(): Peer not found"<<std::endl;
1016                         throw InvalidIncomingDataException("Peer not found (possible timeout)");
1017                 }
1018
1019                 Peer *peer = node->getValue();
1020
1021                 // Validate peer address
1022                 if(peer->address != sender)
1023                 {
1024                         PrintInfo(derr_con);
1025                         derr_con<<"Peer "<<peer_id<<" sending from different address."
1026                                         " Ignoring."<<std::endl;
1027                         throw InvalidIncomingDataException
1028                                         ("Peer sending from different address");
1029                         /*// If there is more data, receive again
1030                         if(m_socket.WaitData(0) == true)
1031                                 continue;
1032                         throw NoIncomingDataException("No incoming data (2)");*/
1033                 }
1034                 
1035                 peer->timeout_counter = 0.0;
1036
1037                 Channel *channel = &(peer->channels[channelnum]);
1038                 
1039                 // Throw the received packet to channel->processPacket()
1040
1041                 // Make a new SharedBuffer from the data without the base headers
1042                 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
1043                 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
1044                                 strippeddata.getSize());
1045                 
1046                 try{
1047                         // Process it (the result is some data with no headers made by us)
1048                         SharedBuffer<u8> resultdata = channel->ProcessPacket
1049                                         (strippeddata, this, peer_id, channelnum);
1050                         
1051                         PrintInfo();
1052                         dout_con<<"ProcessPacket returned data of size "
1053                                         <<resultdata.getSize()<<std::endl;
1054                         
1055                         if(datasize < resultdata.getSize())
1056                                 throw InvalidIncomingDataException
1057                                                 ("Buffer too small for received data");
1058                         
1059                         memcpy(data, *resultdata, resultdata.getSize());
1060                         return resultdata.getSize();
1061                 }
1062                 catch(ProcessedSilentlyException &e)
1063                 {
1064                         // If there is more data, receive again
1065                         if(m_socket.WaitData(0) == true)
1066                                 continue;
1067                 }
1068                 throw NoIncomingDataException("No incoming data (2)");
1069         } // try
1070         catch(InvalidIncomingDataException &e)
1071         {
1072                 // If there is more data, receive again
1073                 if(m_socket.WaitData(0) == true)
1074                         continue;
1075         }
1076         } // for
1077 }
1078
1079 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1080 {
1081         core::map<u16, Peer*>::Iterator j;
1082         j = m_peers.getIterator();
1083         for(; j.atEnd() == false; j++)
1084         {
1085                 Peer *peer = j.getNode()->getValue();
1086                 Send(peer->id, channelnum, data, reliable);
1087         }
1088 }
1089
1090 void Connection::Send(u16 peer_id, u8 channelnum,
1091                 SharedBuffer<u8> data, bool reliable)
1092 {
1093         assert(channelnum < CHANNEL_COUNT);
1094         
1095         Peer *peer = GetPeer(peer_id);
1096         Channel *channel = &(peer->channels[channelnum]);
1097
1098         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1099         if(reliable)
1100                 chunksize_max -= RELIABLE_HEADER_SIZE;
1101
1102         core::list<SharedBuffer<u8> > originals;
1103         originals = makeAutoSplitPacket(data, chunksize_max,
1104                         channel->next_outgoing_split_seqnum);
1105         
1106         core::list<SharedBuffer<u8> >::Iterator i;
1107         i = originals.begin();
1108         for(; i != originals.end(); i++)
1109         {
1110                 SharedBuffer<u8> original = *i;
1111                 
1112                 SendAsPacket(peer_id, channelnum, original, reliable);
1113         }
1114 }
1115
1116 void Connection::SendAsPacket(u16 peer_id, u8 channelnum,
1117                 SharedBuffer<u8> data, bool reliable)
1118 {
1119         Peer *peer = GetPeer(peer_id);
1120         Channel *channel = &(peer->channels[channelnum]);
1121
1122         if(reliable)
1123         {
1124                 u16 seqnum = channel->next_outgoing_seqnum;
1125                 channel->next_outgoing_seqnum++;
1126
1127                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1128
1129                 // Add base headers and make a packet
1130                 BufferedPacket p = makePacket(peer->address, reliable,
1131                                 m_protocol_id, m_peer_id, channelnum);
1132                 
1133                 try{
1134                         // Buffer the packet
1135                         channel->outgoing_reliables.insert(p);
1136                 }
1137                 catch(AlreadyExistsException &e)
1138                 {
1139                         PrintInfo(derr_con);
1140                         derr_con<<"WARNING: Going to send a reliable packet "
1141                                         "seqnum="<<seqnum<<" that is already "
1142                                         "in outgoing buffer"<<std::endl;
1143                         //assert(0);
1144                 }
1145                 
1146                 // Send the packet
1147                 RawSend(p);
1148         }
1149         else
1150         {
1151                 // Add base headers and make a packet
1152                 BufferedPacket p = makePacket(peer->address, data,
1153                                 m_protocol_id, m_peer_id, channelnum);
1154
1155                 // Send the packet
1156                 RawSend(p);
1157         }
1158 }
1159
1160 void Connection::RawSend(const BufferedPacket &packet)
1161 {
1162         m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1163 }
1164
1165 void Connection::RunTimeouts(float dtime)
1166 {
1167         core::list<u16> timeouted_peers;
1168         core::map<u16, Peer*>::Iterator j;
1169         j = m_peers.getIterator();
1170         for(; j.atEnd() == false; j++)
1171         {
1172                 Peer *peer = j.getNode()->getValue();
1173                 
1174                 /*
1175                         Check peer timeout
1176                 */
1177                 peer->timeout_counter += dtime;
1178                 if(peer->timeout_counter > m_timeout)
1179                 {
1180                         PrintInfo(derr_con);
1181                         derr_con<<"RunTimeouts(): Peer "<<peer->id
1182                                         <<" has timed out."
1183                                         <<" (source=peer->timeout_counter)"
1184                                         <<std::endl;
1185                         // Add peer to the list
1186                         timeouted_peers.push_back(peer->id);
1187                         // Don't bother going through the buffers of this one
1188                         continue;
1189                 }
1190
1191                 float resend_timeout = peer->resend_timeout;
1192                 for(u16 i=0; i<CHANNEL_COUNT; i++)
1193                 {
1194                         core::list<BufferedPacket> timed_outs;
1195                         core::list<BufferedPacket>::Iterator j;
1196                         
1197                         Channel *channel = &peer->channels[i];
1198
1199                         // Remove timed out incomplete unreliable split packets
1200                         channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
1201                         
1202                         // Increment reliable packet times
1203                         channel->outgoing_reliables.incrementTimeouts(dtime);
1204
1205                         // Check reliable packet total times, remove peer if
1206                         // over timeout.
1207                         if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
1208                         {
1209                                 PrintInfo(derr_con);
1210                                 derr_con<<"RunTimeouts(): Peer "<<peer->id
1211                                                 <<" has timed out."
1212                                                 <<" (source=reliable packet totaltime)"
1213                                                 <<std::endl;
1214                                 // Add peer to the to-be-removed list
1215                                 timeouted_peers.push_back(peer->id);
1216                                 goto nextpeer;
1217                         }
1218
1219                         // Re-send timed out outgoing reliables
1220                         
1221                         timed_outs = channel->
1222                                         outgoing_reliables.getTimedOuts(resend_timeout);
1223
1224                         channel->outgoing_reliables.resetTimedOuts(resend_timeout);
1225
1226                         j = timed_outs.begin();
1227                         for(; j != timed_outs.end(); j++)
1228                         {
1229                                 u16 peer_id = readPeerId(*(j->data));
1230                                 u8 channel = readChannel(*(j->data));
1231                                 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
1232
1233                                 PrintInfo(derr_con);
1234                                 derr_con<<"RE-SENDING timed-out RELIABLE to ";
1235                                 j->address.print(&derr_con);
1236                                 derr_con<<"(t/o="<<resend_timeout<<"): "
1237                                                 <<"from_peer_id="<<peer_id
1238                                                 <<", channel="<<((int)channel&0xff)
1239                                                 <<", seqnum="<<seqnum
1240                                                 <<std::endl;
1241
1242                                 RawSend(*j);
1243
1244                                 // Enlarge avg_rtt and resend_timeout:
1245                                 // The rtt will be at least the timeout.
1246                                 // NOTE: This won't affect the timeout of the next
1247                                 // checked channel because it was cached.
1248                                 peer->reportRTT(resend_timeout);
1249                         }
1250                 }
1251                 
1252                 /*
1253                         Send pings
1254                 */
1255                 peer->ping_timer += dtime;
1256                 if(peer->ping_timer >= 5.0)
1257                 {
1258                         // Create and send PING packet
1259                         SharedBuffer<u8> data(2);
1260                         writeU8(&data[0], TYPE_CONTROL);
1261                         writeU8(&data[1], CONTROLTYPE_PING);
1262                         SendAsPacket(peer->id, 0, data, true);
1263
1264                         peer->ping_timer = 0.0;
1265                 }
1266                 
1267 nextpeer:
1268                 continue;
1269         }
1270
1271         // Remove timeouted peers
1272         core::list<u16>::Iterator i = timeouted_peers.begin();
1273         for(; i != timeouted_peers.end(); i++)
1274         {
1275                 PrintInfo(derr_con);
1276                 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1277                 m_peerhandler->deletingPeer(m_peers[*i], true);
1278                 delete m_peers[*i];
1279                 m_peers.remove(*i);
1280         }
1281 }
1282
1283 Peer* Connection::GetPeer(u16 peer_id)
1284 {
1285         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1286
1287         if(node == NULL){
1288                 // Peer not found
1289                 throw PeerNotFoundException("Peer not found (possible timeout)");
1290         }
1291
1292         // Error checking
1293         assert(node->getValue()->id == peer_id);
1294
1295         return node->getValue();
1296 }
1297
1298 Peer* Connection::GetPeerNoEx(u16 peer_id)
1299 {
1300         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1301
1302         if(node == NULL){
1303                 return NULL;
1304         }
1305
1306         // Error checking
1307         assert(node->getValue()->id == peer_id);
1308
1309         return node->getValue();
1310 }
1311
1312 core::list<Peer*> Connection::GetPeers()
1313 {
1314         core::list<Peer*> list;
1315         core::map<u16, Peer*>::Iterator j;
1316         j = m_peers.getIterator();
1317         for(; j.atEnd() == false; j++)
1318         {
1319                 Peer *peer = j.getNode()->getValue();
1320                 list.push_back(peer);
1321         }
1322         return list;
1323 }
1324
1325 void Connection::PrintInfo(std::ostream &out)
1326 {
1327         out<<m_socket.GetHandle();
1328         out<<" ";
1329         out<<"con "<<m_peer_id<<": ";
1330         for(s16 i=0; i<(s16)m_indentation-1; i++)
1331                 out<<"  ";
1332 }
1333
1334 void Connection::PrintInfo()
1335 {
1336         PrintInfo(dout_con);
1337 }
1338
1339 } // namespace
1340