]> git.lizzy.rs Git - minetest.git/blob - src/connection.cpp
added dedicated server build without irrlicht
[minetest.git] / src / connection.cpp
1 /*
2 Minetest-c55
3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "connection.h"
21 #include "main.h"
22 #include "serialization.h"
23
24 namespace con
25 {
26
27 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
28                 u32 protocol_id, u16 sender_peer_id, u8 channel)
29 {
30         u32 packet_size = datasize + BASE_HEADER_SIZE;
31         BufferedPacket p(packet_size);
32         p.address = address;
33
34         writeU32(&p.data[0], protocol_id);
35         writeU16(&p.data[4], sender_peer_id);
36         writeU8(&p.data[6], channel);
37
38         memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
39
40         return p;
41 }
42
43 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
44                 u32 protocol_id, u16 sender_peer_id, u8 channel)
45 {
46         return makePacket(address, *data, data.getSize(),
47                         protocol_id, sender_peer_id, channel);
48 }
49
50 SharedBuffer<u8> makeOriginalPacket(
51                 SharedBuffer<u8> data)
52 {
53         u32 header_size = 1;
54         u32 packet_size = data.getSize() + header_size;
55         SharedBuffer<u8> b(packet_size);
56
57         writeU8(&b[0], TYPE_ORIGINAL);
58
59         memcpy(&b[header_size], *data, data.getSize());
60
61         return b;
62 }
63
64 core::list<SharedBuffer<u8> > makeSplitPacket(
65                 SharedBuffer<u8> data,
66                 u32 chunksize_max,
67                 u16 seqnum)
68 {
69         // Chunk packets, containing the TYPE_SPLIT header
70         core::list<SharedBuffer<u8> > chunks;
71         
72         u32 chunk_header_size = 7;
73         u32 maximum_data_size = chunksize_max - chunk_header_size;
74         u32 start = 0;
75         u32 end = 0;
76         u32 chunk_num = 0;
77         do{
78                 end = start + maximum_data_size - 1;
79                 if(end > data.getSize() - 1)
80                         end = data.getSize() - 1;
81                 
82                 u32 payload_size = end - start + 1;
83                 u32 packet_size = chunk_header_size + payload_size;
84
85                 SharedBuffer<u8> chunk(packet_size);
86                 
87                 writeU8(&chunk[0], TYPE_SPLIT);
88                 writeU16(&chunk[1], seqnum);
89                 // [3] u16 chunk_count is written at next stage
90                 writeU16(&chunk[5], chunk_num);
91                 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
92
93                 chunks.push_back(chunk);
94                 
95                 start = end + 1;
96                 chunk_num++;
97         }
98         while(end != data.getSize() - 1);
99
100         u16 chunk_count = chunks.getSize();
101
102         core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
103         for(; i != chunks.end(); i++)
104         {
105                 // Write chunk_count
106                 writeU16(&((*i)[3]), chunk_count);
107         }
108
109         return chunks;
110 }
111
112 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
113                 SharedBuffer<u8> data,
114                 u32 chunksize_max,
115                 u16 &split_seqnum)
116 {
117         u32 original_header_size = 1;
118         core::list<SharedBuffer<u8> > list;
119         if(data.getSize() + original_header_size > chunksize_max)
120         {
121                 list = makeSplitPacket(data, chunksize_max, split_seqnum);
122                 split_seqnum++;
123                 return list;
124         }
125         else
126         {
127                 list.push_back(makeOriginalPacket(data));
128         }
129         return list;
130 }
131
132 SharedBuffer<u8> makeReliablePacket(
133                 SharedBuffer<u8> data,
134                 u16 seqnum)
135 {
136         /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
137         dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
138                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
139         u32 header_size = 3;
140         u32 packet_size = data.getSize() + header_size;
141         SharedBuffer<u8> b(packet_size);
142
143         writeU8(&b[0], TYPE_RELIABLE);
144         writeU16(&b[1], seqnum);
145
146         memcpy(&b[header_size], *data, data.getSize());
147
148         /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
149                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
150         //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
151         return b;
152 }
153
154 /*
155         ReliablePacketBuffer
156 */
157
158 void ReliablePacketBuffer::print()
159 {
160         core::list<BufferedPacket>::Iterator i;
161         i = m_list.begin();
162         for(; i != m_list.end(); i++)
163         {
164                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
165                 dout_con<<s<<" ";
166         }
167 }
168 bool ReliablePacketBuffer::empty()
169 {
170         return m_list.empty();
171 }
172 u32 ReliablePacketBuffer::size()
173 {
174         return m_list.getSize();
175 }
176 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
177 {
178         core::list<BufferedPacket>::Iterator i;
179         i = m_list.begin();
180         for(; i != m_list.end(); i++)
181         {
182                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
183                 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
184                                 <<", comparing to s="<<s<<std::endl;*/
185                 if(s == seqnum)
186                         break;
187         }
188         return i;
189 }
190 RPBSearchResult ReliablePacketBuffer::notFound()
191 {
192         return m_list.end();
193 }
194 u16 ReliablePacketBuffer::getFirstSeqnum()
195 {
196         if(empty())
197                 throw NotFoundException("Buffer is empty");
198         BufferedPacket p = *m_list.begin();
199         return readU16(&p.data[BASE_HEADER_SIZE+1]);
200 }
201 BufferedPacket ReliablePacketBuffer::popFirst()
202 {
203         if(empty())
204                 throw NotFoundException("Buffer is empty");
205         BufferedPacket p = *m_list.begin();
206         core::list<BufferedPacket>::Iterator i = m_list.begin();
207         m_list.erase(i);
208         return p;
209 }
210 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
211 {
212         RPBSearchResult r = findPacket(seqnum);
213         if(r == notFound()){
214                 dout_con<<"Not found"<<std::endl;
215                 throw NotFoundException("seqnum not found in buffer");
216         }
217         BufferedPacket p = *r;
218         m_list.erase(r);
219         return p;
220 }
221 void ReliablePacketBuffer::insert(BufferedPacket &p)
222 {
223         assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
224         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
225         assert(type == TYPE_RELIABLE);
226         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
227
228         // Find the right place for the packet and insert it there
229
230         // If list is empty, just add it
231         if(m_list.empty())
232         {
233                 m_list.push_back(p);
234                 // Done.
235                 return;
236         }
237         // Otherwise find the right place
238         core::list<BufferedPacket>::Iterator i;
239         i = m_list.begin();
240         // Find the first packet in the list which has a higher seqnum
241         for(; i != m_list.end(); i++){
242                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
243                 if(s == seqnum){
244                         throw AlreadyExistsException("Same seqnum in list");
245                 }
246                 if(seqnum_higher(s, seqnum)){
247                         break;
248                 }
249         }
250         // If we're at the end of the list, add the packet to the
251         // end of the list
252         if(i == m_list.end())
253         {
254                 m_list.push_back(p);
255                 // Done.
256                 return;
257         }
258         // Insert before i
259         m_list.insert_before(i, p);
260 }
261
262 void ReliablePacketBuffer::incrementTimeouts(float dtime)
263 {
264         core::list<BufferedPacket>::Iterator i;
265         i = m_list.begin();
266         for(; i != m_list.end(); i++){
267                 i->time += dtime;
268                 i->totaltime += dtime;
269         }
270 }
271
272 void ReliablePacketBuffer::resetTimedOuts(float timeout)
273 {
274         core::list<BufferedPacket>::Iterator i;
275         i = m_list.begin();
276         for(; i != m_list.end(); i++){
277                 if(i->time >= timeout)
278                         i->time = 0.0;
279         }
280 }
281
282 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
283 {
284         core::list<BufferedPacket>::Iterator i;
285         i = m_list.begin();
286         for(; i != m_list.end(); i++){
287                 if(i->totaltime >= timeout)
288                         return true;
289         }
290         return false;
291 }
292
293 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
294 {
295         core::list<BufferedPacket> timed_outs;
296         core::list<BufferedPacket>::Iterator i;
297         i = m_list.begin();
298         for(; i != m_list.end(); i++)
299         {
300                 if(i->time >= timeout)
301                         timed_outs.push_back(*i);
302         }
303         return timed_outs;
304 }
305
306 /*
307         IncomingSplitBuffer
308 */
309
310 IncomingSplitBuffer::~IncomingSplitBuffer()
311 {
312         core::map<u16, IncomingSplitPacket*>::Iterator i;
313         i = m_buf.getIterator();
314         for(; i.atEnd() == false; i++)
315         {
316                 delete i.getNode()->getValue();
317         }
318 }
319 /*
320         This will throw a GotSplitPacketException when a full
321         split packet is constructed.
322 */
323 void IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
324 {
325         u32 headersize = BASE_HEADER_SIZE + 7;
326         assert(p.data.getSize() >= headersize);
327         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
328         assert(type == TYPE_SPLIT);
329         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
330         u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
331         u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
332
333         // Add if doesn't exist
334         if(m_buf.find(seqnum) == NULL)
335         {
336                 IncomingSplitPacket *sp = new IncomingSplitPacket();
337                 sp->chunk_count = chunk_count;
338                 sp->reliable = reliable;
339                 m_buf[seqnum] = sp;
340         }
341         
342         IncomingSplitPacket *sp = m_buf[seqnum];
343         
344         // TODO: These errors should be thrown or something? Dunno.
345         if(chunk_count != sp->chunk_count)
346                 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
347                                 <<" != sp->chunk_count="<<sp->chunk_count
348                                 <<std::endl;
349         if(reliable != sp->reliable)
350                 derr_con<<"Connection: WARNING: reliable="<<reliable
351                                 <<" != sp->reliable="<<sp->reliable
352                                 <<std::endl;
353
354         // If chunk already exists, cancel
355         if(sp->chunks.find(chunk_num) != NULL)
356                 throw AlreadyExistsException("Chunk already in buffer");
357         
358         // Cut chunk data out of packet
359         u32 chunkdatasize = p.data.getSize() - headersize;
360         SharedBuffer<u8> chunkdata(chunkdatasize);
361         memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
362         
363         // Set chunk data in buffer
364         sp->chunks[chunk_num] = chunkdata;
365         
366         // If not all chunks are received, return
367         if(sp->allReceived() == false)
368                 return;
369
370         // Calculate total size
371         u32 totalsize = 0;
372         core::map<u16, SharedBuffer<u8> >::Iterator i;
373         i = sp->chunks.getIterator();
374         for(; i.atEnd() == false; i++)
375         {
376                 totalsize += i.getNode()->getValue().getSize();
377         }
378         
379         SharedBuffer<u8> fulldata(totalsize);
380
381         // Copy chunks to data buffer
382         u32 start = 0;
383         for(u32 chunk_i=0; chunk_i<sp->chunk_count;
384                         chunk_i++)
385         {
386                 SharedBuffer<u8> buf = sp->chunks[chunk_i];
387                 u16 chunkdatasize = buf.getSize();
388                 memcpy(&fulldata[start], *buf, chunkdatasize);
389                 start += chunkdatasize;;
390         }
391
392         // Remove sp from buffer
393         m_buf.remove(seqnum);
394         delete sp;
395         
396         throw GotSplitPacketException(fulldata);
397 }
398 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
399 {
400         core::list<u16> remove_queue;
401         core::map<u16, IncomingSplitPacket*>::Iterator i;
402         i = m_buf.getIterator();
403         for(; i.atEnd() == false; i++)
404         {
405                 IncomingSplitPacket *p = i.getNode()->getValue();
406                 // Reliable ones are not removed by timeout
407                 if(p->reliable == true)
408                         continue;
409                 p->time += dtime;
410                 if(p->time >= timeout)
411                         remove_queue.push_back(i.getNode()->getKey());
412         }
413         core::list<u16>::Iterator j;
414         j = remove_queue.begin();
415         for(; j != remove_queue.end(); j++)
416         {
417                 dout_con<<"NOTE: Removing timed out unreliable split packet"
418                                 <<std::endl;
419                 delete m_buf[*j];
420                 m_buf.remove(*j);
421         }
422 }
423
424 /*
425         Channel
426 */
427
428 Channel::Channel()
429 {
430         next_outgoing_seqnum = SEQNUM_INITIAL;
431         next_incoming_seqnum = SEQNUM_INITIAL;
432         next_outgoing_split_seqnum = SEQNUM_INITIAL;
433 }
434 Channel::~Channel()
435 {
436 }
437
438 /*
439         Peer
440 */
441
442 Peer::Peer(u16 a_id, Address a_address)
443 {
444         id = a_id;
445         address = a_address;
446         timeout_counter = 0.0;
447         //resend_timeout = RESEND_TIMEOUT_MINIMUM;
448         ping_timer = 0.0;
449         resend_timeout = 0.5;
450         avg_rtt = -1.0;
451         has_sent_with_id = false;
452 }
453 Peer::~Peer()
454 {
455 }
456
457 void Peer::reportRTT(float rtt)
458 {
459         if(rtt < -0.999)
460         {}
461         else if(avg_rtt < 0.0)
462                 avg_rtt = rtt;
463         else
464                 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
465         
466         // Calculate resend_timeout
467
468         /*int reliable_count = 0;
469         for(int i=0; i<CHANNEL_COUNT; i++)
470         {
471                 reliable_count += channels[i].outgoing_reliables.size();
472         }
473         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
474                         * ((float)reliable_count * 1);*/
475         
476         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
477         if(timeout < RESEND_TIMEOUT_MIN)
478                 timeout = RESEND_TIMEOUT_MIN;
479         if(timeout > RESEND_TIMEOUT_MAX)
480                 timeout = RESEND_TIMEOUT_MAX;
481         resend_timeout = timeout;
482 }
483                                 
484 /*
485         Connection
486 */
487
488 Connection::Connection(
489         u32 protocol_id,
490         u32 max_packet_size,
491         float timeout,
492         PeerHandler *peerhandler
493 )
494 {
495         assert(peerhandler != NULL);
496
497         m_protocol_id = protocol_id;
498         m_max_packet_size = max_packet_size;
499         m_timeout = timeout;
500         m_peer_id = PEER_ID_NEW;
501         //m_waiting_new_peer_id = false;
502         m_indentation = 0;
503         m_peerhandler = peerhandler;
504 }
505
506 Connection::~Connection()
507 {
508         // Clear peers
509         core::map<u16, Peer*>::Iterator j;
510         j = m_peers.getIterator();
511         for(; j.atEnd() == false; j++)
512         {
513                 Peer *peer = j.getNode()->getValue();
514                 delete peer;
515         }
516 }
517
518 void Connection::Serve(unsigned short port)
519 {
520         m_socket.Bind(port);
521         m_peer_id = PEER_ID_SERVER;
522 }
523
524 void Connection::Connect(Address address)
525 {
526         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
527         if(node != NULL){
528                 throw ConnectionException("Already connected to a server");
529         }
530
531         Peer *peer = new Peer(PEER_ID_SERVER, address);
532         m_peers.insert(peer->id, peer);
533         m_peerhandler->peerAdded(peer);
534         
535         m_socket.Bind(0);
536         
537         // Send a dummy packet to server with peer_id = PEER_ID_NEW
538         m_peer_id = PEER_ID_NEW;
539         SharedBuffer<u8> data(0);
540         Send(PEER_ID_SERVER, 0, data, true);
541
542         //m_waiting_new_peer_id = true;
543 }
544
545 bool Connection::Connected()
546 {
547         if(m_peers.size() != 1)
548                 return false;
549                 
550         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
551         if(node == NULL)
552                 return false;
553         
554         if(m_peer_id == PEER_ID_NEW)
555                 return false;
556         
557         return true;
558 }
559
560 SharedBuffer<u8> Channel::ProcessPacket(
561                 SharedBuffer<u8> packetdata,
562                 Connection *con,
563                 u16 peer_id,
564                 u8 channelnum,
565                 bool reliable)
566 {
567         IndentationRaiser iraiser(&(con->m_indentation));
568
569         if(packetdata.getSize() < 1)
570                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
571
572         u8 type = readU8(&packetdata[0]);
573         
574         if(type == TYPE_CONTROL)
575         {
576                 if(packetdata.getSize() < 2)
577                         throw InvalidIncomingDataException("packetdata.getSize() < 2");
578
579                 u8 controltype = readU8(&packetdata[1]);
580
581                 if(controltype == CONTROLTYPE_ACK)
582                 {
583                         if(packetdata.getSize() < 4)
584                                 throw InvalidIncomingDataException
585                                                 ("packetdata.getSize() < 4 (ACK header size)");
586
587                         u16 seqnum = readU16(&packetdata[2]);
588                         con->PrintInfo();
589                         dout_con<<"Got CONTROLTYPE_ACK: channelnum="
590                                         <<((int)channelnum&0xff)<<", peer_id="<<peer_id
591                                         <<", seqnum="<<seqnum<<std::endl;
592
593                         try{
594                                 BufferedPacket p = outgoing_reliables.popSeqnum(seqnum);
595                                 // Get round trip time
596                                 float rtt = p.totaltime;
597
598                                 // Let peer calculate stuff according to it
599                                 // (avg_rtt and resend_timeout)
600                                 Peer *peer = con->GetPeer(peer_id);
601                                 peer->reportRTT(rtt);
602
603                                 //con->PrintInfo(dout_con);
604                                 //dout_con<<"RTT = "<<rtt<<std::endl;
605
606                                 /*dout_con<<"OUTGOING: ";
607                                 con->PrintInfo();
608                                 outgoing_reliables.print();
609                                 dout_con<<std::endl;*/
610                         }
611                         catch(NotFoundException &e){
612                                 con->PrintInfo(derr_con);
613                                 derr_con<<"WARNING: ACKed packet not "
614                                                 "in outgoing queue"
615                                                 <<std::endl;
616                         }
617
618                         throw ProcessedSilentlyException("Got an ACK");
619                 }
620                 else if(controltype == CONTROLTYPE_SET_PEER_ID)
621                 {
622                         if(packetdata.getSize() < 4)
623                                 throw InvalidIncomingDataException
624                                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
625                         u16 peer_id_new = readU16(&packetdata[2]);
626                         con->PrintInfo();
627                         dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
628
629                         if(con->GetPeerID() != PEER_ID_NEW)
630                         {
631                                 con->PrintInfo(derr_con);
632                                 derr_con<<"WARNING: Not changing"
633                                                 " existing peer id."<<std::endl;
634                         }
635                         else
636                         {
637                                 dout_con<<"changing."<<std::endl;
638                                 con->SetPeerID(peer_id_new);
639                         }
640                         throw ProcessedSilentlyException("Got a SET_PEER_ID");
641                 }
642                 else if(controltype == CONTROLTYPE_PING)
643                 {
644                         // Just ignore it, the incoming data already reset
645                         // the timeout counter
646                         con->PrintInfo();
647                         dout_con<<"PING"<<std::endl;
648                         throw ProcessedSilentlyException("Got a SET_PEER_ID");
649                 }
650                 else{
651                         con->PrintInfo(derr_con);
652                         derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
653                                         <<((int)controltype&0xff)<<std::endl;
654                         throw InvalidIncomingDataException("Invalid control type");
655                 }
656         }
657         else if(type == TYPE_ORIGINAL)
658         {
659                 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
660                         throw InvalidIncomingDataException
661                                         ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
662                 con->PrintInfo();
663                 dout_con<<"RETURNING TYPE_ORIGINAL to user"
664                                 <<std::endl;
665                 // Get the inside packet out and return it
666                 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
667                 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
668                 return payload;
669         }
670         else if(type == TYPE_SPLIT)
671         {
672                 // We have to create a packet again for buffering
673                 // This isn't actually too bad an idea.
674                 BufferedPacket packet = makePacket(
675                                 con->GetPeer(peer_id)->address,
676                                 packetdata,
677                                 con->GetProtocolID(),
678                                 peer_id,
679                                 channelnum);
680                 try{
681                         // Buffer the packet
682                         incoming_splits.insert(packet, reliable);
683                 }
684                 // This exception happens when all the pieces of a packet
685                 // are collected.
686                 catch(GotSplitPacketException &e)
687                 {
688                         con->PrintInfo();
689                         dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
690                                         <<"size="<<e.getData().getSize()<<std::endl;
691                         return e.getData();
692                 }
693                 con->PrintInfo();
694                 dout_con<<"BUFFERING TYPE_SPLIT"<<std::endl;
695                 throw ProcessedSilentlyException("Buffered a split packet chunk");
696         }
697         else if(type == TYPE_RELIABLE)
698         {
699                 // Recursive reliable packets not allowed
700                 assert(reliable == false);
701
702                 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
703                         throw InvalidIncomingDataException
704                                         ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
705
706                 u16 seqnum = readU16(&packetdata[1]);
707
708                 bool is_future_packet = seqnum_higher(seqnum, next_incoming_seqnum);
709                 bool is_old_packet = seqnum_higher(next_incoming_seqnum, seqnum);
710                 
711                 con->PrintInfo();
712                 if(is_future_packet)
713                         dout_con<<"BUFFERING";
714                 else if(is_old_packet)
715                         dout_con<<"OLD";
716                 else
717                         dout_con<<"RECUR";
718                 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
719                                 <<" next="<<next_incoming_seqnum;
720                 dout_con<<" [sending CONTROLTYPE_ACK"
721                                 " to peer_id="<<peer_id<<"]";
722                 dout_con<<std::endl;
723                 
724                 //DEBUG
725                 //assert(incoming_reliables.size() < 100);
726
727                 // Send a CONTROLTYPE_ACK
728                 SharedBuffer<u8> reply(4);
729                 writeU8(&reply[0], TYPE_CONTROL);
730                 writeU8(&reply[1], CONTROLTYPE_ACK);
731                 writeU16(&reply[2], seqnum);
732                 con->SendAsPacket(peer_id, channelnum, reply, false);
733
734                 //if(seqnum_higher(seqnum, next_incoming_seqnum))
735                 if(is_future_packet)
736                 {
737                         /*con->PrintInfo();
738                         dout_con<<"Buffering reliable packet (seqnum="
739                                         <<seqnum<<")"<<std::endl;*/
740                         
741                         // This one comes later, buffer it.
742                         // Actually we have to make a packet to buffer one.
743                         // Well, we have all the ingredients, so just do it.
744                         BufferedPacket packet = makePacket(
745                                         con->GetPeer(peer_id)->address,
746                                         packetdata,
747                                         con->GetProtocolID(),
748                                         peer_id,
749                                         channelnum);
750                         try{
751                                 incoming_reliables.insert(packet);
752                                 
753                                 /*con->PrintInfo();
754                                 dout_con<<"INCOMING: ";
755                                 incoming_reliables.print();
756                                 dout_con<<std::endl;*/
757                         }
758                         catch(AlreadyExistsException &e)
759                         {
760                         }
761
762                         throw ProcessedSilentlyException("Buffered future reliable packet");
763                 }
764                 //else if(seqnum_higher(next_incoming_seqnum, seqnum))
765                 else if(is_old_packet)
766                 {
767                         // An old packet, dump it
768                         throw InvalidIncomingDataException("Got an old reliable packet");
769                 }
770
771                 next_incoming_seqnum++;
772
773                 // Get out the inside packet and re-process it
774                 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
775                 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
776
777                 return ProcessPacket(payload, con, peer_id, channelnum, true);
778         }
779         else
780         {
781                 con->PrintInfo(derr_con);
782                 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
783                 throw InvalidIncomingDataException("Invalid packet type");
784         }
785         
786         // We should never get here.
787         // If you get here, add an exception or a return to some of the
788         // above conditionals.
789         assert(0);
790         throw BaseException("Error in Channel::ProcessPacket()");
791 }
792
793 SharedBuffer<u8> Channel::CheckIncomingBuffers(Connection *con,
794                 u16 &peer_id)
795 {
796         u16 firstseqnum = 0;
797         // Clear old packets from start of buffer
798         try{
799         for(;;){
800                 firstseqnum = incoming_reliables.getFirstSeqnum();
801                 if(seqnum_higher(next_incoming_seqnum, firstseqnum))
802                         incoming_reliables.popFirst();
803                 else
804                         break;
805         }
806         // This happens if all packets are old
807         }catch(con::NotFoundException)
808         {}
809         
810         if(incoming_reliables.empty() == false)
811         {
812                 if(firstseqnum == next_incoming_seqnum)
813                 {
814                         BufferedPacket p = incoming_reliables.popFirst();
815                         
816                         peer_id = readPeerId(*p.data);
817                         u8 channelnum = readChannel(*p.data);
818                         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
819
820                         con->PrintInfo();
821                         dout_con<<"UNBUFFERING TYPE_RELIABLE"
822                                         <<" seqnum="<<seqnum
823                                         <<" peer_id="<<peer_id
824                                         <<" channel="<<((int)channelnum&0xff)
825                                         <<std::endl;
826
827                         next_incoming_seqnum++;
828                         
829                         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
830                         // Get out the inside packet and re-process it
831                         SharedBuffer<u8> payload(p.data.getSize() - headers_size);
832                         memcpy(*payload, &p.data[headers_size], payload.getSize());
833
834                         return ProcessPacket(payload, con, peer_id, channelnum, true);
835                 }
836         }
837                 
838         throw NoIncomingDataException("No relevant data in buffers");
839 }
840
841 SharedBuffer<u8> Connection::GetFromBuffers(u16 &peer_id)
842 {
843         core::map<u16, Peer*>::Iterator j;
844         j = m_peers.getIterator();
845         for(; j.atEnd() == false; j++)
846         {
847                 Peer *peer = j.getNode()->getValue();
848                 for(u16 i=0; i<CHANNEL_COUNT; i++)
849                 {
850                         Channel *channel = &peer->channels[i];
851                         try{
852                                 SharedBuffer<u8> resultdata = channel->CheckIncomingBuffers
853                                                 (this, peer_id);
854
855                                 return resultdata;
856                         }
857                         catch(NoIncomingDataException &e)
858                         {
859                         }
860                         catch(InvalidIncomingDataException &e)
861                         {
862                         }
863                         catch(ProcessedSilentlyException &e)
864                         {
865                         }
866                 }
867         }
868         throw NoIncomingDataException("No relevant data in buffers");
869 }
870
871 u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
872 {
873         /*
874                 Receive a packet from the network
875         */
876         
877         // TODO: We can not know how many layers of header there are.
878         // For now, just assume there are no other than the base headers.
879         u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
880         Buffer<u8> packetdata(packet_maxsize);
881         
882         for(;;)
883         {
884         try
885         {
886                 /*
887                         Check if some buffer has relevant data
888                 */
889                 try{
890                         SharedBuffer<u8> resultdata = GetFromBuffers(peer_id);
891
892                         if(datasize < resultdata.getSize())
893                                 throw InvalidIncomingDataException
894                                                 ("Buffer too small for received data");
895                                 
896                         memcpy(data, *resultdata, resultdata.getSize());
897                         return resultdata.getSize();
898                 }
899                 catch(NoIncomingDataException &e)
900                 {
901                 }
902         
903                 Address sender;
904
905                 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
906
907                 if(received_size < 0)
908                         throw NoIncomingDataException("No incoming data");
909                 if(received_size < BASE_HEADER_SIZE)
910                         throw InvalidIncomingDataException("No full header received");
911                 if(readU32(&packetdata[0]) != m_protocol_id)
912                         throw InvalidIncomingDataException("Invalid protocol id");
913                 
914                 peer_id = readPeerId(*packetdata);
915                 u8 channelnum = readChannel(*packetdata);
916                 if(channelnum > CHANNEL_COUNT-1){
917                         PrintInfo(derr_con);
918                         derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
919                         throw InvalidIncomingDataException("Channel doesn't exist");
920                 }
921
922                 if(peer_id == PEER_ID_NEW)
923                 {
924                         /*
925                                 Somebody is trying to send stuff to us with no peer id.
926                                 
927                                 Check if the same address and port was added to our peer
928                                 list before.
929                                 Allow only entries that have has_sent_with_id==false.
930                         */
931
932                         core::map<u16, Peer*>::Iterator j;
933                         j = m_peers.getIterator();
934                         for(; j.atEnd() == false; j++)
935                         {
936                                 Peer *peer = j.getNode()->getValue();
937                                 if(peer->has_sent_with_id)
938                                         continue;
939                                 if(peer->address == sender)
940                                         break;
941                         }
942                         
943                         /*
944                                 If no peer was found with the same address and port,
945                                 we shall assume it is a new peer and create an entry.
946                         */
947                         if(j.atEnd())
948                         {
949                                 // Pass on to adding the peer
950                         }
951                         // Else: A peer was found.
952                         else
953                         {
954                                 Peer *peer = j.getNode()->getValue();
955                                 peer_id = peer->id;
956                                 PrintInfo(derr_con);
957                                 derr_con<<"WARNING: Assuming unknown peer to be "
958                                                 <<"peer_id="<<peer_id<<std::endl;
959                         }
960                 }
961                 
962                 /*
963                         The peer was not found in our lists. Add it.
964                 */
965                 if(peer_id == PEER_ID_NEW)
966                 {
967                         // Somebody wants to make a new connection
968
969                         // Get a unique peer id (2 or higher)
970                         u16 peer_id_new = 2;
971                         /*
972                                 Find an unused peer id
973                         */
974                         for(;;)
975                         {
976                                 // Check if exists
977                                 if(m_peers.find(peer_id_new) == NULL)
978                                         break;
979                                 // Check for overflow
980                                 if(peer_id_new == 65535)
981                                         throw ConnectionException
982                                                 ("Connection ran out of peer ids");
983                                 peer_id_new++;
984                         }
985
986                         PrintInfo();
987                         dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_NEW,"
988                                         " giving peer_id="<<peer_id_new<<std::endl;
989
990                         // Create a peer
991                         Peer *peer = new Peer(peer_id_new, sender);
992                         m_peers.insert(peer->id, peer);
993                         m_peerhandler->peerAdded(peer);
994                         
995                         // Create CONTROL packet to tell the peer id to the new peer.
996                         SharedBuffer<u8> reply(4);
997                         writeU8(&reply[0], TYPE_CONTROL);
998                         writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
999                         writeU16(&reply[2], peer_id_new);
1000                         SendAsPacket(peer_id_new, 0, reply, true);
1001                         
1002                         // We're now talking to a valid peer_id
1003                         peer_id = peer_id_new;
1004
1005                         // Go on and process whatever it sent
1006                 }
1007
1008                 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1009
1010                 if(node == NULL)
1011                 {
1012                         // Peer not found
1013                         // This means that the peer id of the sender is not PEER_ID_NEW
1014                         // and it is invalid.
1015                         PrintInfo(derr_con);
1016                         derr_con<<"Receive(): Peer not found"<<std::endl;
1017                         throw InvalidIncomingDataException("Peer not found (possible timeout)");
1018                 }
1019
1020                 Peer *peer = node->getValue();
1021
1022                 // Validate peer address
1023                 if(peer->address != sender)
1024                 {
1025                         PrintInfo(derr_con);
1026                         derr_con<<"Peer "<<peer_id<<" sending from different address."
1027                                         " Ignoring."<<std::endl;
1028                         throw InvalidIncomingDataException
1029                                         ("Peer sending from different address");
1030                         /*// If there is more data, receive again
1031                         if(m_socket.WaitData(0) == true)
1032                                 continue;
1033                         throw NoIncomingDataException("No incoming data (2)");*/
1034                 }
1035                 
1036                 peer->timeout_counter = 0.0;
1037
1038                 Channel *channel = &(peer->channels[channelnum]);
1039                 
1040                 // Throw the received packet to channel->processPacket()
1041
1042                 // Make a new SharedBuffer from the data without the base headers
1043                 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
1044                 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
1045                                 strippeddata.getSize());
1046                 
1047                 try{
1048                         // Process it (the result is some data with no headers made by us)
1049                         SharedBuffer<u8> resultdata = channel->ProcessPacket
1050                                         (strippeddata, this, peer_id, channelnum);
1051                         
1052                         PrintInfo();
1053                         dout_con<<"ProcessPacket returned data of size "
1054                                         <<resultdata.getSize()<<std::endl;
1055                         
1056                         if(datasize < resultdata.getSize())
1057                                 throw InvalidIncomingDataException
1058                                                 ("Buffer too small for received data");
1059                         
1060                         memcpy(data, *resultdata, resultdata.getSize());
1061                         return resultdata.getSize();
1062                 }
1063                 catch(ProcessedSilentlyException &e)
1064                 {
1065                         // If there is more data, receive again
1066                         if(m_socket.WaitData(0) == true)
1067                                 continue;
1068                 }
1069                 throw NoIncomingDataException("No incoming data (2)");
1070         } // try
1071         catch(InvalidIncomingDataException &e)
1072         {
1073                 // If there is more data, receive again
1074                 if(m_socket.WaitData(0) == true)
1075                         continue;
1076         }
1077         } // for
1078 }
1079
1080 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1081 {
1082         core::map<u16, Peer*>::Iterator j;
1083         j = m_peers.getIterator();
1084         for(; j.atEnd() == false; j++)
1085         {
1086                 Peer *peer = j.getNode()->getValue();
1087                 Send(peer->id, channelnum, data, reliable);
1088         }
1089 }
1090
1091 void Connection::Send(u16 peer_id, u8 channelnum,
1092                 SharedBuffer<u8> data, bool reliable)
1093 {
1094         assert(channelnum < CHANNEL_COUNT);
1095         
1096         Peer *peer = GetPeer(peer_id);
1097         Channel *channel = &(peer->channels[channelnum]);
1098
1099         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1100         if(reliable)
1101                 chunksize_max -= RELIABLE_HEADER_SIZE;
1102
1103         core::list<SharedBuffer<u8> > originals;
1104         originals = makeAutoSplitPacket(data, chunksize_max,
1105                         channel->next_outgoing_split_seqnum);
1106         
1107         core::list<SharedBuffer<u8> >::Iterator i;
1108         i = originals.begin();
1109         for(; i != originals.end(); i++)
1110         {
1111                 SharedBuffer<u8> original = *i;
1112                 
1113                 SendAsPacket(peer_id, channelnum, original, reliable);
1114         }
1115 }
1116
1117 void Connection::SendAsPacket(u16 peer_id, u8 channelnum,
1118                 SharedBuffer<u8> data, bool reliable)
1119 {
1120         Peer *peer = GetPeer(peer_id);
1121         Channel *channel = &(peer->channels[channelnum]);
1122
1123         if(reliable)
1124         {
1125                 u16 seqnum = channel->next_outgoing_seqnum;
1126                 channel->next_outgoing_seqnum++;
1127
1128                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1129
1130                 // Add base headers and make a packet
1131                 BufferedPacket p = makePacket(peer->address, reliable,
1132                                 m_protocol_id, m_peer_id, channelnum);
1133                 
1134                 try{
1135                         // Buffer the packet
1136                         channel->outgoing_reliables.insert(p);
1137                 }
1138                 catch(AlreadyExistsException &e)
1139                 {
1140                         PrintInfo(derr_con);
1141                         derr_con<<"WARNING: Going to send a reliable packet "
1142                                         "seqnum="<<seqnum<<" that is already "
1143                                         "in outgoing buffer"<<std::endl;
1144                         //assert(0);
1145                 }
1146                 
1147                 // Send the packet
1148                 RawSend(p);
1149         }
1150         else
1151         {
1152                 // Add base headers and make a packet
1153                 BufferedPacket p = makePacket(peer->address, data,
1154                                 m_protocol_id, m_peer_id, channelnum);
1155
1156                 // Send the packet
1157                 RawSend(p);
1158         }
1159 }
1160
1161 void Connection::RawSend(const BufferedPacket &packet)
1162 {
1163         m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1164 }
1165
1166 void Connection::RunTimeouts(float dtime)
1167 {
1168         core::list<u16> timeouted_peers;
1169         core::map<u16, Peer*>::Iterator j;
1170         j = m_peers.getIterator();
1171         for(; j.atEnd() == false; j++)
1172         {
1173                 Peer *peer = j.getNode()->getValue();
1174                 
1175                 /*
1176                         Check peer timeout
1177                 */
1178                 peer->timeout_counter += dtime;
1179                 if(peer->timeout_counter > m_timeout)
1180                 {
1181                         PrintInfo(derr_con);
1182                         derr_con<<"RunTimeouts(): Peer "<<peer->id
1183                                         <<" has timed out."
1184                                         <<" (source=peer->timeout_counter)"
1185                                         <<std::endl;
1186                         // Add peer to the list
1187                         timeouted_peers.push_back(peer->id);
1188                         // Don't bother going through the buffers of this one
1189                         continue;
1190                 }
1191
1192                 float resend_timeout = peer->resend_timeout;
1193                 for(u16 i=0; i<CHANNEL_COUNT; i++)
1194                 {
1195                         core::list<BufferedPacket> timed_outs;
1196                         core::list<BufferedPacket>::Iterator j;
1197                         
1198                         Channel *channel = &peer->channels[i];
1199
1200                         // Remove timed out incomplete unreliable split packets
1201                         channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
1202                         
1203                         // Increment reliable packet times
1204                         channel->outgoing_reliables.incrementTimeouts(dtime);
1205
1206                         // Check reliable packet total times, remove peer if
1207                         // over timeout.
1208                         if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
1209                         {
1210                                 PrintInfo(derr_con);
1211                                 derr_con<<"RunTimeouts(): Peer "<<peer->id
1212                                                 <<" has timed out."
1213                                                 <<" (source=reliable packet totaltime)"
1214                                                 <<std::endl;
1215                                 // Add peer to the to-be-removed list
1216                                 timeouted_peers.push_back(peer->id);
1217                                 goto nextpeer;
1218                         }
1219
1220                         // Re-send timed out outgoing reliables
1221                         
1222                         timed_outs = channel->
1223                                         outgoing_reliables.getTimedOuts(resend_timeout);
1224
1225                         channel->outgoing_reliables.resetTimedOuts(resend_timeout);
1226
1227                         j = timed_outs.begin();
1228                         for(; j != timed_outs.end(); j++)
1229                         {
1230                                 u16 peer_id = readPeerId(*(j->data));
1231                                 u8 channel = readChannel(*(j->data));
1232                                 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
1233
1234                                 PrintInfo(derr_con);
1235                                 derr_con<<"RE-SENDING timed-out RELIABLE to ";
1236                                 j->address.print(&derr_con);
1237                                 derr_con<<"(t/o="<<resend_timeout<<"): "
1238                                                 <<"from_peer_id="<<peer_id
1239                                                 <<", channel="<<((int)channel&0xff)
1240                                                 <<", seqnum="<<seqnum
1241                                                 <<std::endl;
1242
1243                                 RawSend(*j);
1244
1245                                 // Enlarge avg_rtt and resend_timeout:
1246                                 // The rtt will be at least the timeout.
1247                                 // NOTE: This won't affect the timeout of the next
1248                                 // checked channel because it was cached.
1249                                 peer->reportRTT(resend_timeout);
1250                         }
1251                 }
1252                 
1253                 /*
1254                         Send pings
1255                 */
1256                 peer->ping_timer += dtime;
1257                 if(peer->ping_timer >= 5.0)
1258                 {
1259                         // Create and send PING packet
1260                         SharedBuffer<u8> data(2);
1261                         writeU8(&data[0], TYPE_CONTROL);
1262                         writeU8(&data[1], CONTROLTYPE_PING);
1263                         SendAsPacket(peer->id, 0, data, true);
1264
1265                         peer->ping_timer = 0.0;
1266                 }
1267                 
1268 nextpeer:
1269                 continue;
1270         }
1271
1272         // Remove timeouted peers
1273         core::list<u16>::Iterator i = timeouted_peers.begin();
1274         for(; i != timeouted_peers.end(); i++)
1275         {
1276                 PrintInfo(derr_con);
1277                 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1278                 m_peerhandler->deletingPeer(m_peers[*i], true);
1279                 delete m_peers[*i];
1280                 m_peers.remove(*i);
1281         }
1282 }
1283
1284 Peer* Connection::GetPeer(u16 peer_id)
1285 {
1286         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1287
1288         if(node == NULL){
1289                 // Peer not found
1290                 throw PeerNotFoundException("Peer not found (possible timeout)");
1291         }
1292
1293         // Error checking
1294         assert(node->getValue()->id == peer_id);
1295
1296         return node->getValue();
1297 }
1298
1299 Peer* Connection::GetPeerNoEx(u16 peer_id)
1300 {
1301         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1302
1303         if(node == NULL){
1304                 return NULL;
1305         }
1306
1307         // Error checking
1308         assert(node->getValue()->id == peer_id);
1309
1310         return node->getValue();
1311 }
1312
1313 core::list<Peer*> Connection::GetPeers()
1314 {
1315         core::list<Peer*> list;
1316         core::map<u16, Peer*>::Iterator j;
1317         j = m_peers.getIterator();
1318         for(; j.atEnd() == false; j++)
1319         {
1320                 Peer *peer = j.getNode()->getValue();
1321                 list.push_back(peer);
1322         }
1323         return list;
1324 }
1325
1326 void Connection::PrintInfo(std::ostream &out)
1327 {
1328         out<<m_socket.GetHandle();
1329         out<<" ";
1330         out<<"con "<<m_peer_id<<": ";
1331         for(s16 i=0; i<(s16)m_indentation-1; i++)
1332                 out<<"  ";
1333 }
1334
1335 void Connection::PrintInfo()
1336 {
1337         PrintInfo(dout_con);
1338 }
1339
1340 } // namespace
1341