]> git.lizzy.rs Git - dragonfireclient.git/blob - src/connection.cpp
Disable word wrap in vertical texts in main menu
[dragonfireclient.git] / src / connection.cpp
1 /*
2 Minetest-c55
3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "connection.h"
21 #include "main.h"
22 #include "serialization.h"
23 #include "log.h"
24 #include "porting.h"
25
26 namespace con
27 {
28
29 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
30                 u32 protocol_id, u16 sender_peer_id, u8 channel)
31 {
32         u32 packet_size = datasize + BASE_HEADER_SIZE;
33         BufferedPacket p(packet_size);
34         p.address = address;
35
36         writeU32(&p.data[0], protocol_id);
37         writeU16(&p.data[4], sender_peer_id);
38         writeU8(&p.data[6], channel);
39
40         memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
41
42         return p;
43 }
44
45 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
46                 u32 protocol_id, u16 sender_peer_id, u8 channel)
47 {
48         return makePacket(address, *data, data.getSize(),
49                         protocol_id, sender_peer_id, channel);
50 }
51
52 SharedBuffer<u8> makeOriginalPacket(
53                 SharedBuffer<u8> data)
54 {
55         u32 header_size = 1;
56         u32 packet_size = data.getSize() + header_size;
57         SharedBuffer<u8> b(packet_size);
58
59         writeU8(&b[0], TYPE_ORIGINAL);
60
61         memcpy(&b[header_size], *data, data.getSize());
62
63         return b;
64 }
65
66 core::list<SharedBuffer<u8> > makeSplitPacket(
67                 SharedBuffer<u8> data,
68                 u32 chunksize_max,
69                 u16 seqnum)
70 {
71         // Chunk packets, containing the TYPE_SPLIT header
72         core::list<SharedBuffer<u8> > chunks;
73         
74         u32 chunk_header_size = 7;
75         u32 maximum_data_size = chunksize_max - chunk_header_size;
76         u32 start = 0;
77         u32 end = 0;
78         u32 chunk_num = 0;
79         do{
80                 end = start + maximum_data_size - 1;
81                 if(end > data.getSize() - 1)
82                         end = data.getSize() - 1;
83                 
84                 u32 payload_size = end - start + 1;
85                 u32 packet_size = chunk_header_size + payload_size;
86
87                 SharedBuffer<u8> chunk(packet_size);
88                 
89                 writeU8(&chunk[0], TYPE_SPLIT);
90                 writeU16(&chunk[1], seqnum);
91                 // [3] u16 chunk_count is written at next stage
92                 writeU16(&chunk[5], chunk_num);
93                 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
94
95                 chunks.push_back(chunk);
96                 
97                 start = end + 1;
98                 chunk_num++;
99         }
100         while(end != data.getSize() - 1);
101
102         u16 chunk_count = chunks.getSize();
103
104         core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
105         for(; i != chunks.end(); i++)
106         {
107                 // Write chunk_count
108                 writeU16(&((*i)[3]), chunk_count);
109         }
110
111         return chunks;
112 }
113
114 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
115                 SharedBuffer<u8> data,
116                 u32 chunksize_max,
117                 u16 &split_seqnum)
118 {
119         u32 original_header_size = 1;
120         core::list<SharedBuffer<u8> > list;
121         if(data.getSize() + original_header_size > chunksize_max)
122         {
123                 list = makeSplitPacket(data, chunksize_max, split_seqnum);
124                 split_seqnum++;
125                 return list;
126         }
127         else
128         {
129                 list.push_back(makeOriginalPacket(data));
130         }
131         return list;
132 }
133
134 SharedBuffer<u8> makeReliablePacket(
135                 SharedBuffer<u8> data,
136                 u16 seqnum)
137 {
138         /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
139         dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
140                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
141         u32 header_size = 3;
142         u32 packet_size = data.getSize() + header_size;
143         SharedBuffer<u8> b(packet_size);
144
145         writeU8(&b[0], TYPE_RELIABLE);
146         writeU16(&b[1], seqnum);
147
148         memcpy(&b[header_size], *data, data.getSize());
149
150         /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
151                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
152         //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
153         return b;
154 }
155
156 /*
157         ReliablePacketBuffer
158 */
159
160 void ReliablePacketBuffer::print()
161 {
162         core::list<BufferedPacket>::Iterator i;
163         i = m_list.begin();
164         for(; i != m_list.end(); i++)
165         {
166                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
167                 dout_con<<s<<" ";
168         }
169 }
170 bool ReliablePacketBuffer::empty()
171 {
172         return m_list.empty();
173 }
174 u32 ReliablePacketBuffer::size()
175 {
176         return m_list.getSize();
177 }
178 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
179 {
180         core::list<BufferedPacket>::Iterator i;
181         i = m_list.begin();
182         for(; i != m_list.end(); i++)
183         {
184                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
185                 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
186                                 <<", comparing to s="<<s<<std::endl;*/
187                 if(s == seqnum)
188                         break;
189         }
190         return i;
191 }
192 RPBSearchResult ReliablePacketBuffer::notFound()
193 {
194         return m_list.end();
195 }
196 u16 ReliablePacketBuffer::getFirstSeqnum()
197 {
198         if(empty())
199                 throw NotFoundException("Buffer is empty");
200         BufferedPacket p = *m_list.begin();
201         return readU16(&p.data[BASE_HEADER_SIZE+1]);
202 }
203 BufferedPacket ReliablePacketBuffer::popFirst()
204 {
205         if(empty())
206                 throw NotFoundException("Buffer is empty");
207         BufferedPacket p = *m_list.begin();
208         core::list<BufferedPacket>::Iterator i = m_list.begin();
209         m_list.erase(i);
210         return p;
211 }
212 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
213 {
214         RPBSearchResult r = findPacket(seqnum);
215         if(r == notFound()){
216                 dout_con<<"Not found"<<std::endl;
217                 throw NotFoundException("seqnum not found in buffer");
218         }
219         BufferedPacket p = *r;
220         m_list.erase(r);
221         return p;
222 }
223 void ReliablePacketBuffer::insert(BufferedPacket &p)
224 {
225         assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
226         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
227         assert(type == TYPE_RELIABLE);
228         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
229
230         // Find the right place for the packet and insert it there
231
232         // If list is empty, just add it
233         if(m_list.empty())
234         {
235                 m_list.push_back(p);
236                 // Done.
237                 return;
238         }
239         // Otherwise find the right place
240         core::list<BufferedPacket>::Iterator i;
241         i = m_list.begin();
242         // Find the first packet in the list which has a higher seqnum
243         for(; i != m_list.end(); i++){
244                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
245                 if(s == seqnum){
246                         throw AlreadyExistsException("Same seqnum in list");
247                 }
248                 if(seqnum_higher(s, seqnum)){
249                         break;
250                 }
251         }
252         // If we're at the end of the list, add the packet to the
253         // end of the list
254         if(i == m_list.end())
255         {
256                 m_list.push_back(p);
257                 // Done.
258                 return;
259         }
260         // Insert before i
261         m_list.insert_before(i, p);
262 }
263
264 void ReliablePacketBuffer::incrementTimeouts(float dtime)
265 {
266         core::list<BufferedPacket>::Iterator i;
267         i = m_list.begin();
268         for(; i != m_list.end(); i++){
269                 i->time += dtime;
270                 i->totaltime += dtime;
271         }
272 }
273
274 void ReliablePacketBuffer::resetTimedOuts(float timeout)
275 {
276         core::list<BufferedPacket>::Iterator i;
277         i = m_list.begin();
278         for(; i != m_list.end(); i++){
279                 if(i->time >= timeout)
280                         i->time = 0.0;
281         }
282 }
283
284 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
285 {
286         core::list<BufferedPacket>::Iterator i;
287         i = m_list.begin();
288         for(; i != m_list.end(); i++){
289                 if(i->totaltime >= timeout)
290                         return true;
291         }
292         return false;
293 }
294
295 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
296 {
297         core::list<BufferedPacket> timed_outs;
298         core::list<BufferedPacket>::Iterator i;
299         i = m_list.begin();
300         for(; i != m_list.end(); i++)
301         {
302                 if(i->time >= timeout)
303                         timed_outs.push_back(*i);
304         }
305         return timed_outs;
306 }
307
308 /*
309         IncomingSplitBuffer
310 */
311
312 IncomingSplitBuffer::~IncomingSplitBuffer()
313 {
314         core::map<u16, IncomingSplitPacket*>::Iterator i;
315         i = m_buf.getIterator();
316         for(; i.atEnd() == false; i++)
317         {
318                 delete i.getNode()->getValue();
319         }
320 }
321 /*
322         This will throw a GotSplitPacketException when a full
323         split packet is constructed.
324 */
325 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
326 {
327         u32 headersize = BASE_HEADER_SIZE + 7;
328         assert(p.data.getSize() >= headersize);
329         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
330         assert(type == TYPE_SPLIT);
331         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
332         u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
333         u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
334
335         // Add if doesn't exist
336         if(m_buf.find(seqnum) == NULL)
337         {
338                 IncomingSplitPacket *sp = new IncomingSplitPacket();
339                 sp->chunk_count = chunk_count;
340                 sp->reliable = reliable;
341                 m_buf[seqnum] = sp;
342         }
343         
344         IncomingSplitPacket *sp = m_buf[seqnum];
345         
346         // TODO: These errors should be thrown or something? Dunno.
347         if(chunk_count != sp->chunk_count)
348                 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
349                                 <<" != sp->chunk_count="<<sp->chunk_count
350                                 <<std::endl;
351         if(reliable != sp->reliable)
352                 derr_con<<"Connection: WARNING: reliable="<<reliable
353                                 <<" != sp->reliable="<<sp->reliable
354                                 <<std::endl;
355
356         // If chunk already exists, cancel
357         if(sp->chunks.find(chunk_num) != NULL)
358                 throw AlreadyExistsException("Chunk already in buffer");
359         
360         // Cut chunk data out of packet
361         u32 chunkdatasize = p.data.getSize() - headersize;
362         SharedBuffer<u8> chunkdata(chunkdatasize);
363         memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
364         
365         // Set chunk data in buffer
366         sp->chunks[chunk_num] = chunkdata;
367         
368         // If not all chunks are received, return empty buffer
369         if(sp->allReceived() == false)
370                 return SharedBuffer<u8>();
371
372         // Calculate total size
373         u32 totalsize = 0;
374         core::map<u16, SharedBuffer<u8> >::Iterator i;
375         i = sp->chunks.getIterator();
376         for(; i.atEnd() == false; i++)
377         {
378                 totalsize += i.getNode()->getValue().getSize();
379         }
380         
381         SharedBuffer<u8> fulldata(totalsize);
382
383         // Copy chunks to data buffer
384         u32 start = 0;
385         for(u32 chunk_i=0; chunk_i<sp->chunk_count;
386                         chunk_i++)
387         {
388                 SharedBuffer<u8> buf = sp->chunks[chunk_i];
389                 u16 chunkdatasize = buf.getSize();
390                 memcpy(&fulldata[start], *buf, chunkdatasize);
391                 start += chunkdatasize;;
392         }
393
394         // Remove sp from buffer
395         m_buf.remove(seqnum);
396         delete sp;
397
398         return fulldata;
399 }
400 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
401 {
402         core::list<u16> remove_queue;
403         core::map<u16, IncomingSplitPacket*>::Iterator i;
404         i = m_buf.getIterator();
405         for(; i.atEnd() == false; i++)
406         {
407                 IncomingSplitPacket *p = i.getNode()->getValue();
408                 // Reliable ones are not removed by timeout
409                 if(p->reliable == true)
410                         continue;
411                 p->time += dtime;
412                 if(p->time >= timeout)
413                         remove_queue.push_back(i.getNode()->getKey());
414         }
415         core::list<u16>::Iterator j;
416         j = remove_queue.begin();
417         for(; j != remove_queue.end(); j++)
418         {
419                 dout_con<<"NOTE: Removing timed out unreliable split packet"
420                                 <<std::endl;
421                 delete m_buf[*j];
422                 m_buf.remove(*j);
423         }
424 }
425
426 /*
427         Channel
428 */
429
430 Channel::Channel()
431 {
432         next_outgoing_seqnum = SEQNUM_INITIAL;
433         next_incoming_seqnum = SEQNUM_INITIAL;
434         next_outgoing_split_seqnum = SEQNUM_INITIAL;
435 }
436 Channel::~Channel()
437 {
438 }
439
440 /*
441         Peer
442 */
443
444 Peer::Peer(u16 a_id, Address a_address):
445         address(a_address),
446         id(a_id),
447         timeout_counter(0.0),
448         ping_timer(0.0),
449         resend_timeout(0.5),
450         avg_rtt(-1.0),
451         has_sent_with_id(false),
452         m_sendtime_accu(0),
453         m_max_packets_per_second(10),
454         m_num_sent(0),
455         m_max_num_sent(0)
456 {
457 }
458 Peer::~Peer()
459 {
460 }
461
462 void Peer::reportRTT(float rtt)
463 {
464         if(rtt >= 0.0){
465                 if(rtt < 0.01){
466                         if(m_max_packets_per_second < 400)
467                                 m_max_packets_per_second += 10;
468                 } else if(rtt < 0.2){
469                         if(m_max_packets_per_second < 100)
470                                 m_max_packets_per_second += 2;
471                 } else {
472                         m_max_packets_per_second *= 0.8;
473                         if(m_max_packets_per_second < 10)
474                                 m_max_packets_per_second = 10;
475                 }
476         }
477
478         if(rtt < -0.999)
479         {}
480         else if(avg_rtt < 0.0)
481                 avg_rtt = rtt;
482         else
483                 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
484         
485         // Calculate resend_timeout
486
487         /*int reliable_count = 0;
488         for(int i=0; i<CHANNEL_COUNT; i++)
489         {
490                 reliable_count += channels[i].outgoing_reliables.size();
491         }
492         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
493                         * ((float)reliable_count * 1);*/
494         
495         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
496         if(timeout < RESEND_TIMEOUT_MIN)
497                 timeout = RESEND_TIMEOUT_MIN;
498         if(timeout > RESEND_TIMEOUT_MAX)
499                 timeout = RESEND_TIMEOUT_MAX;
500         resend_timeout = timeout;
501 }
502                                 
503 /*
504         Connection
505 */
506
507 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout):
508         m_protocol_id(protocol_id),
509         m_max_packet_size(max_packet_size),
510         m_timeout(timeout),
511         m_peer_id(0),
512         m_bc_peerhandler(NULL),
513         m_bc_receive_timeout(0),
514         m_indentation(0)
515 {
516         m_socket.setTimeoutMs(5);
517
518         Start();
519 }
520
521 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
522                 PeerHandler *peerhandler):
523         m_protocol_id(protocol_id),
524         m_max_packet_size(max_packet_size),
525         m_timeout(timeout),
526         m_peer_id(0),
527         m_bc_peerhandler(peerhandler),
528         m_bc_receive_timeout(0),
529         m_indentation(0)
530 {
531         m_socket.setTimeoutMs(5);
532
533         Start();
534 }
535
536
537 Connection::~Connection()
538 {
539         stop();
540 }
541
542 /* Internal stuff */
543
544 void * Connection::Thread()
545 {
546         ThreadStarted();
547         log_register_thread("Connection");
548
549         dout_con<<"Connection thread started"<<std::endl;
550         
551         u32 curtime = porting::getTimeMs();
552         u32 lasttime = curtime;
553
554         while(getRun())
555         {
556                 BEGIN_DEBUG_EXCEPTION_HANDLER
557                 
558                 lasttime = curtime;
559                 curtime = porting::getTimeMs();
560                 float dtime = (float)(curtime - lasttime) / 1000.;
561                 if(dtime > 0.1)
562                         dtime = 0.1;
563                 if(dtime < 0.0)
564                         dtime = 0.0;
565                 
566                 runTimeouts(dtime);
567
568                 while(m_command_queue.size() != 0){
569                         ConnectionCommand c = m_command_queue.pop_front();
570                         processCommand(c);
571                 }
572
573                 send(dtime);
574
575                 receive();
576                 
577                 END_DEBUG_EXCEPTION_HANDLER(derr_con);
578         }
579
580         return NULL;
581 }
582
583 void Connection::putEvent(ConnectionEvent &e)
584 {
585         assert(e.type != CONNEVENT_NONE);
586         m_event_queue.push_back(e);
587 }
588
589 void Connection::processCommand(ConnectionCommand &c)
590 {
591         switch(c.type){
592         case CONNCMD_NONE:
593                 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
594                 return;
595         case CONNCMD_SERVE:
596                 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
597                                 <<c.port<<std::endl;
598                 serve(c.port);
599                 return;
600         case CONNCMD_CONNECT:
601                 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
602                 connect(c.address);
603                 return;
604         case CONNCMD_DISCONNECT:
605                 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
606                 disconnect();
607                 return;
608         case CONNCMD_SEND:
609                 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
610                 send(c.peer_id, c.channelnum, c.data, c.reliable);
611                 return;
612         case CONNCMD_SEND_TO_ALL:
613                 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
614                 sendToAll(c.channelnum, c.data, c.reliable);
615                 return;
616         case CONNCMD_DELETE_PEER:
617                 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
618                 deletePeer(c.peer_id, false);
619                 return;
620         }
621 }
622
623 void Connection::send(float dtime)
624 {
625         for(core::map<u16, Peer*>::Iterator
626                         j = m_peers.getIterator();
627                         j.atEnd() == false; j++)
628         {
629                 Peer *peer = j.getNode()->getValue();
630                 peer->m_sendtime_accu += dtime;
631                 peer->m_num_sent = 0;
632                 peer->m_max_num_sent = peer->m_sendtime_accu *
633                                 peer->m_max_packets_per_second;
634         }
635         Queue<OutgoingPacket> postponed_packets;
636         while(m_outgoing_queue.size() != 0){
637                 OutgoingPacket packet = m_outgoing_queue.pop_front();
638                 Peer *peer = getPeerNoEx(packet.peer_id);
639                 if(!peer)
640                         continue;
641                 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
642                         postponed_packets.push_back(packet);
643                 } else if(peer->m_num_sent < peer->m_max_num_sent){
644                         rawSendAsPacket(packet.peer_id, packet.channelnum,
645                                         packet.data, packet.reliable);
646                         peer->m_num_sent++;
647                 } else {
648                         postponed_packets.push_back(packet);
649                 }
650         }
651         while(postponed_packets.size() != 0){
652                 m_outgoing_queue.push_back(postponed_packets.pop_front());
653         }
654         for(core::map<u16, Peer*>::Iterator
655                         j = m_peers.getIterator();
656                         j.atEnd() == false; j++)
657         {
658                 Peer *peer = j.getNode()->getValue();
659                 peer->m_sendtime_accu -= (float)peer->m_num_sent /
660                                 peer->m_max_packets_per_second;
661                 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
662                         peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
663         }
664 }
665
666 // Receive packets from the network and buffers and create ConnectionEvents
667 void Connection::receive()
668 {
669         u32 datasize = m_max_packet_size * 2;  // Double it just to be safe
670         // TODO: We can not know how many layers of header there are.
671         // For now, just assume there are no other than the base headers.
672         u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
673         SharedBuffer<u8> packetdata(packet_maxsize);
674
675         bool single_wait_done = false;
676         
677         for(;;)
678         {
679         try{
680                 /* Check if some buffer has relevant data */
681                 {
682                         u16 peer_id;
683                         SharedBuffer<u8> resultdata;
684                         bool got = getFromBuffers(peer_id, resultdata);
685                         if(got){
686                                 ConnectionEvent e;
687                                 e.dataReceived(peer_id, resultdata);
688                                 putEvent(e);
689                                 continue;
690                         }
691                 }
692                 
693                 if(single_wait_done){
694                         if(m_socket.WaitData(0) == false)
695                                 break;
696                 }
697                 
698                 single_wait_done = true;
699
700                 Address sender;
701                 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
702
703                 if(received_size < 0)
704                         break;
705                 if(received_size < BASE_HEADER_SIZE)
706                         continue;
707                 if(readU32(&packetdata[0]) != m_protocol_id)
708                         continue;
709                 
710                 u16 peer_id = readPeerId(*packetdata);
711                 u8 channelnum = readChannel(*packetdata);
712                 if(channelnum > CHANNEL_COUNT-1){
713                         PrintInfo(derr_con);
714                         derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
715                         throw InvalidIncomingDataException("Channel doesn't exist");
716                 }
717
718                 if(peer_id == PEER_ID_INEXISTENT)
719                 {
720                         /*
721                                 Somebody is trying to send stuff to us with no peer id.
722                                 
723                                 Check if the same address and port was added to our peer
724                                 list before.
725                                 Allow only entries that have has_sent_with_id==false.
726                         */
727
728                         core::map<u16, Peer*>::Iterator j;
729                         j = m_peers.getIterator();
730                         for(; j.atEnd() == false; j++)
731                         {
732                                 Peer *peer = j.getNode()->getValue();
733                                 if(peer->has_sent_with_id)
734                                         continue;
735                                 if(peer->address == sender)
736                                         break;
737                         }
738                         
739                         /*
740                                 If no peer was found with the same address and port,
741                                 we shall assume it is a new peer and create an entry.
742                         */
743                         if(j.atEnd())
744                         {
745                                 // Pass on to adding the peer
746                         }
747                         // Else: A peer was found.
748                         else
749                         {
750                                 Peer *peer = j.getNode()->getValue();
751                                 peer_id = peer->id;
752                                 PrintInfo(derr_con);
753                                 derr_con<<"WARNING: Assuming unknown peer to be "
754                                                 <<"peer_id="<<peer_id<<std::endl;
755                         }
756                 }
757                 
758                 /*
759                         The peer was not found in our lists. Add it.
760                 */
761                 if(peer_id == PEER_ID_INEXISTENT)
762                 {
763                         // Somebody wants to make a new connection
764
765                         // Get a unique peer id (2 or higher)
766                         u16 peer_id_new = 2;
767                         /*
768                                 Find an unused peer id
769                         */
770                         bool out_of_ids = false;
771                         for(;;)
772                         {
773                                 // Check if exists
774                                 if(m_peers.find(peer_id_new) == NULL)
775                                         break;
776                                 // Check for overflow
777                                 if(peer_id_new == 65535){
778                                         out_of_ids = true;
779                                         break;
780                                 }
781                                 peer_id_new++;
782                         }
783                         if(out_of_ids){
784                                 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
785                                 continue;
786                         }
787
788                         PrintInfo();
789                         dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
790                                         " giving peer_id="<<peer_id_new<<std::endl;
791
792                         // Create a peer
793                         Peer *peer = new Peer(peer_id_new, sender);
794                         m_peers.insert(peer->id, peer);
795                         
796                         // Create peer addition event
797                         ConnectionEvent e;
798                         e.peerAdded(peer_id_new, sender);
799                         putEvent(e);
800                         
801                         // Create CONTROL packet to tell the peer id to the new peer.
802                         SharedBuffer<u8> reply(4);
803                         writeU8(&reply[0], TYPE_CONTROL);
804                         writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
805                         writeU16(&reply[2], peer_id_new);
806                         sendAsPacket(peer_id_new, 0, reply, true);
807                         
808                         // We're now talking to a valid peer_id
809                         peer_id = peer_id_new;
810
811                         // Go on and process whatever it sent
812                 }
813
814                 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
815
816                 if(node == NULL)
817                 {
818                         // Peer not found
819                         // This means that the peer id of the sender is not PEER_ID_INEXISTENT
820                         // and it is invalid.
821                         PrintInfo(derr_con);
822                         derr_con<<"Receive(): Peer not found"<<std::endl;
823                         throw InvalidIncomingDataException("Peer not found (possible timeout)");
824                 }
825
826                 Peer *peer = node->getValue();
827
828                 // Validate peer address
829                 if(peer->address != sender)
830                 {
831                         PrintInfo(derr_con);
832                         derr_con<<"Peer "<<peer_id<<" sending from different address."
833                                         " Ignoring."<<std::endl;
834                         continue;
835                 }
836                 
837                 peer->timeout_counter = 0.0;
838
839                 Channel *channel = &(peer->channels[channelnum]);
840                 
841                 // Throw the received packet to channel->processPacket()
842
843                 // Make a new SharedBuffer from the data without the base headers
844                 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
845                 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
846                                 strippeddata.getSize());
847                 
848                 try{
849                         // Process it (the result is some data with no headers made by us)
850                         SharedBuffer<u8> resultdata = processPacket
851                                         (channel, strippeddata, peer_id, channelnum, false);
852                         
853                         PrintInfo();
854                         dout_con<<"ProcessPacket returned data of size "
855                                         <<resultdata.getSize()<<std::endl;
856                         
857                         ConnectionEvent e;
858                         e.dataReceived(peer_id, resultdata);
859                         putEvent(e);
860                         continue;
861                 }catch(ProcessedSilentlyException &e){
862                 }
863         }catch(InvalidIncomingDataException &e){
864         }
865         catch(ProcessedSilentlyException &e){
866         }
867         } // for
868 }
869
870 void Connection::runTimeouts(float dtime)
871 {
872         core::list<u16> timeouted_peers;
873         core::map<u16, Peer*>::Iterator j;
874         j = m_peers.getIterator();
875         for(; j.atEnd() == false; j++)
876         {
877                 Peer *peer = j.getNode()->getValue();
878                 
879                 /*
880                         Check peer timeout
881                 */
882                 peer->timeout_counter += dtime;
883                 if(peer->timeout_counter > m_timeout)
884                 {
885                         PrintInfo(derr_con);
886                         derr_con<<"RunTimeouts(): Peer "<<peer->id
887                                         <<" has timed out."
888                                         <<" (source=peer->timeout_counter)"
889                                         <<std::endl;
890                         // Add peer to the list
891                         timeouted_peers.push_back(peer->id);
892                         // Don't bother going through the buffers of this one
893                         continue;
894                 }
895
896                 float resend_timeout = peer->resend_timeout;
897                 for(u16 i=0; i<CHANNEL_COUNT; i++)
898                 {
899                         core::list<BufferedPacket> timed_outs;
900                         core::list<BufferedPacket>::Iterator j;
901                         
902                         Channel *channel = &peer->channels[i];
903
904                         // Remove timed out incomplete unreliable split packets
905                         channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
906                         
907                         // Increment reliable packet times
908                         channel->outgoing_reliables.incrementTimeouts(dtime);
909
910                         // Check reliable packet total times, remove peer if
911                         // over timeout.
912                         if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
913                         {
914                                 PrintInfo(derr_con);
915                                 derr_con<<"RunTimeouts(): Peer "<<peer->id
916                                                 <<" has timed out."
917                                                 <<" (source=reliable packet totaltime)"
918                                                 <<std::endl;
919                                 // Add peer to the to-be-removed list
920                                 timeouted_peers.push_back(peer->id);
921                                 goto nextpeer;
922                         }
923
924                         // Re-send timed out outgoing reliables
925                         
926                         timed_outs = channel->
927                                         outgoing_reliables.getTimedOuts(resend_timeout);
928
929                         channel->outgoing_reliables.resetTimedOuts(resend_timeout);
930
931                         j = timed_outs.begin();
932                         for(; j != timed_outs.end(); j++)
933                         {
934                                 u16 peer_id = readPeerId(*(j->data));
935                                 u8 channel = readChannel(*(j->data));
936                                 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
937
938                                 PrintInfo(derr_con);
939                                 derr_con<<"RE-SENDING timed-out RELIABLE to ";
940                                 j->address.print(&derr_con);
941                                 derr_con<<"(t/o="<<resend_timeout<<"): "
942                                                 <<"from_peer_id="<<peer_id
943                                                 <<", channel="<<((int)channel&0xff)
944                                                 <<", seqnum="<<seqnum
945                                                 <<std::endl;
946
947                                 rawSend(*j);
948
949                                 // Enlarge avg_rtt and resend_timeout:
950                                 // The rtt will be at least the timeout.
951                                 // NOTE: This won't affect the timeout of the next
952                                 // checked channel because it was cached.
953                                 peer->reportRTT(resend_timeout);
954                         }
955                 }
956                 
957                 /*
958                         Send pings
959                 */
960                 peer->ping_timer += dtime;
961                 if(peer->ping_timer >= 5.0)
962                 {
963                         // Create and send PING packet
964                         SharedBuffer<u8> data(2);
965                         writeU8(&data[0], TYPE_CONTROL);
966                         writeU8(&data[1], CONTROLTYPE_PING);
967                         rawSendAsPacket(peer->id, 0, data, true);
968
969                         peer->ping_timer = 0.0;
970                 }
971                 
972 nextpeer:
973                 continue;
974         }
975
976         // Remove timed out peers
977         core::list<u16>::Iterator i = timeouted_peers.begin();
978         for(; i != timeouted_peers.end(); i++)
979         {
980                 PrintInfo(derr_con);
981                 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
982                 deletePeer(*i, true);
983         }
984 }
985
986 void Connection::serve(u16 port)
987 {
988         dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
989         try{
990                 m_socket.Bind(port);
991                 m_peer_id = PEER_ID_SERVER;
992         }
993         catch(SocketException &e){
994                 // Create event
995                 ConnectionEvent e;
996                 e.bindFailed();
997                 putEvent(e);
998         }
999 }
1000
1001 void Connection::connect(Address address)
1002 {
1003         dout_con<<getDesc()<<" connecting to "<<address.serializeString()
1004                         <<":"<<address.getPort()<<std::endl;
1005
1006         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1007         if(node != NULL){
1008                 throw ConnectionException("Already connected to a server");
1009         }
1010
1011         Peer *peer = new Peer(PEER_ID_SERVER, address);
1012         m_peers.insert(peer->id, peer);
1013
1014         // Create event
1015         ConnectionEvent e;
1016         e.peerAdded(peer->id, peer->address);
1017         putEvent(e);
1018         
1019         m_socket.Bind(0);
1020         
1021         // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1022         m_peer_id = PEER_ID_INEXISTENT;
1023         SharedBuffer<u8> data(0);
1024         Send(PEER_ID_SERVER, 0, data, true);
1025 }
1026
1027 void Connection::disconnect()
1028 {
1029         dout_con<<getDesc()<<" disconnecting"<<std::endl;
1030
1031         // Create and send DISCO packet
1032         SharedBuffer<u8> data(2);
1033         writeU8(&data[0], TYPE_CONTROL);
1034         writeU8(&data[1], CONTROLTYPE_DISCO);
1035         
1036         // Send to all
1037         core::map<u16, Peer*>::Iterator j;
1038         j = m_peers.getIterator();
1039         for(; j.atEnd() == false; j++)
1040         {
1041                 Peer *peer = j.getNode()->getValue();
1042                 rawSendAsPacket(peer->id, 0, data, false);
1043         }
1044 }
1045
1046 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1047 {
1048         core::map<u16, Peer*>::Iterator j;
1049         j = m_peers.getIterator();
1050         for(; j.atEnd() == false; j++)
1051         {
1052                 Peer *peer = j.getNode()->getValue();
1053                 send(peer->id, channelnum, data, reliable);
1054         }
1055 }
1056
1057 void Connection::send(u16 peer_id, u8 channelnum,
1058                 SharedBuffer<u8> data, bool reliable)
1059 {
1060         dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1061
1062         assert(channelnum < CHANNEL_COUNT);
1063         
1064         Peer *peer = getPeerNoEx(peer_id);
1065         if(peer == NULL)
1066                 return;
1067         Channel *channel = &(peer->channels[channelnum]);
1068
1069         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1070         if(reliable)
1071                 chunksize_max -= RELIABLE_HEADER_SIZE;
1072
1073         core::list<SharedBuffer<u8> > originals;
1074         originals = makeAutoSplitPacket(data, chunksize_max,
1075                         channel->next_outgoing_split_seqnum);
1076         
1077         core::list<SharedBuffer<u8> >::Iterator i;
1078         i = originals.begin();
1079         for(; i != originals.end(); i++)
1080         {
1081                 SharedBuffer<u8> original = *i;
1082                 
1083                 sendAsPacket(peer_id, channelnum, original, reliable);
1084         }
1085 }
1086
1087 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1088                 SharedBuffer<u8> data, bool reliable)
1089 {
1090         OutgoingPacket packet(peer_id, channelnum, data, reliable);
1091         m_outgoing_queue.push_back(packet);
1092 }
1093
1094 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1095                 SharedBuffer<u8> data, bool reliable)
1096 {
1097         Peer *peer = getPeerNoEx(peer_id);
1098         if(!peer)
1099                 return;
1100         Channel *channel = &(peer->channels[channelnum]);
1101
1102         if(reliable)
1103         {
1104                 u16 seqnum = channel->next_outgoing_seqnum;
1105                 channel->next_outgoing_seqnum++;
1106
1107                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1108
1109                 // Add base headers and make a packet
1110                 BufferedPacket p = makePacket(peer->address, reliable,
1111                                 m_protocol_id, m_peer_id, channelnum);
1112                 
1113                 try{
1114                         // Buffer the packet
1115                         channel->outgoing_reliables.insert(p);
1116                 }
1117                 catch(AlreadyExistsException &e)
1118                 {
1119                         PrintInfo(derr_con);
1120                         derr_con<<"WARNING: Going to send a reliable packet "
1121                                         "seqnum="<<seqnum<<" that is already "
1122                                         "in outgoing buffer"<<std::endl;
1123                         //assert(0);
1124                 }
1125                 
1126                 // Send the packet
1127                 rawSend(p);
1128         }
1129         else
1130         {
1131                 // Add base headers and make a packet
1132                 BufferedPacket p = makePacket(peer->address, data,
1133                                 m_protocol_id, m_peer_id, channelnum);
1134
1135                 // Send the packet
1136                 rawSend(p);
1137         }
1138 }
1139
1140 void Connection::rawSend(const BufferedPacket &packet)
1141 {
1142         try{
1143                 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1144         } catch(SendFailedException &e){
1145                 derr_con<<"Connection::rawSend(): SendFailedException: "
1146                                 <<packet.address.serializeString()<<std::endl;
1147         }
1148 }
1149
1150 Peer* Connection::getPeer(u16 peer_id)
1151 {
1152         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1153
1154         if(node == NULL){
1155                 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1156         }
1157
1158         // Error checking
1159         assert(node->getValue()->id == peer_id);
1160
1161         return node->getValue();
1162 }
1163
1164 Peer* Connection::getPeerNoEx(u16 peer_id)
1165 {
1166         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1167
1168         if(node == NULL){
1169                 return NULL;
1170         }
1171
1172         // Error checking
1173         assert(node->getValue()->id == peer_id);
1174
1175         return node->getValue();
1176 }
1177
1178 core::list<Peer*> Connection::getPeers()
1179 {
1180         core::list<Peer*> list;
1181         core::map<u16, Peer*>::Iterator j;
1182         j = m_peers.getIterator();
1183         for(; j.atEnd() == false; j++)
1184         {
1185                 Peer *peer = j.getNode()->getValue();
1186                 list.push_back(peer);
1187         }
1188         return list;
1189 }
1190
1191 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1192 {
1193         core::map<u16, Peer*>::Iterator j;
1194         j = m_peers.getIterator();
1195         for(; j.atEnd() == false; j++)
1196         {
1197                 Peer *peer = j.getNode()->getValue();
1198                 for(u16 i=0; i<CHANNEL_COUNT; i++)
1199                 {
1200                         Channel *channel = &peer->channels[i];
1201                         SharedBuffer<u8> resultdata;
1202                         bool got = checkIncomingBuffers(channel, peer_id, resultdata);
1203                         if(got){
1204                                 dst = resultdata;
1205                                 return true;
1206                         }
1207                 }
1208         }
1209         return false;
1210 }
1211
1212 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1213                 SharedBuffer<u8> &dst)
1214 {
1215         u16 firstseqnum = 0;
1216         // Clear old packets from start of buffer
1217         try{
1218         for(;;){
1219                 firstseqnum = channel->incoming_reliables.getFirstSeqnum();
1220                 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1221                         channel->incoming_reliables.popFirst();
1222                 else
1223                         break;
1224         }
1225         // This happens if all packets are old
1226         }catch(con::NotFoundException)
1227         {}
1228         
1229         if(channel->incoming_reliables.empty() == false)
1230         {
1231                 if(firstseqnum == channel->next_incoming_seqnum)
1232                 {
1233                         BufferedPacket p = channel->incoming_reliables.popFirst();
1234                         
1235                         peer_id = readPeerId(*p.data);
1236                         u8 channelnum = readChannel(*p.data);
1237                         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1238
1239                         PrintInfo();
1240                         dout_con<<"UNBUFFERING TYPE_RELIABLE"
1241                                         <<" seqnum="<<seqnum
1242                                         <<" peer_id="<<peer_id
1243                                         <<" channel="<<((int)channelnum&0xff)
1244                                         <<std::endl;
1245
1246                         channel->next_incoming_seqnum++;
1247                         
1248                         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1249                         // Get out the inside packet and re-process it
1250                         SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1251                         memcpy(*payload, &p.data[headers_size], payload.getSize());
1252
1253                         dst = processPacket(channel, payload, peer_id, channelnum, true);
1254                         return true;
1255                 }
1256         }
1257         return false;
1258 }
1259
1260 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1261                 SharedBuffer<u8> packetdata, u16 peer_id,
1262                 u8 channelnum, bool reliable)
1263 {
1264         IndentationRaiser iraiser(&(m_indentation));
1265
1266         if(packetdata.getSize() < 1)
1267                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1268
1269         u8 type = readU8(&packetdata[0]);
1270         
1271         if(type == TYPE_CONTROL)
1272         {
1273                 if(packetdata.getSize() < 2)
1274                         throw InvalidIncomingDataException("packetdata.getSize() < 2");
1275
1276                 u8 controltype = readU8(&packetdata[1]);
1277
1278                 if(controltype == CONTROLTYPE_ACK)
1279                 {
1280                         if(packetdata.getSize() < 4)
1281                                 throw InvalidIncomingDataException
1282                                                 ("packetdata.getSize() < 4 (ACK header size)");
1283
1284                         u16 seqnum = readU16(&packetdata[2]);
1285                         PrintInfo();
1286                         dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1287                                         <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1288                                         <<", seqnum="<<seqnum<<std::endl;
1289
1290                         try{
1291                                 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1292                                 // Get round trip time
1293                                 float rtt = p.totaltime;
1294
1295                                 // Let peer calculate stuff according to it
1296                                 // (avg_rtt and resend_timeout)
1297                                 Peer *peer = getPeer(peer_id);
1298                                 peer->reportRTT(rtt);
1299
1300                                 //PrintInfo(dout_con);
1301                                 //dout_con<<"RTT = "<<rtt<<std::endl;
1302
1303                                 /*dout_con<<"OUTGOING: ";
1304                                 PrintInfo();
1305                                 channel->outgoing_reliables.print();
1306                                 dout_con<<std::endl;*/
1307                         }
1308                         catch(NotFoundException &e){
1309                                 PrintInfo(derr_con);
1310                                 derr_con<<"WARNING: ACKed packet not "
1311                                                 "in outgoing queue"
1312                                                 <<std::endl;
1313                         }
1314
1315                         throw ProcessedSilentlyException("Got an ACK");
1316                 }
1317                 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1318                 {
1319                         if(packetdata.getSize() < 4)
1320                                 throw InvalidIncomingDataException
1321                                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1322                         u16 peer_id_new = readU16(&packetdata[2]);
1323                         PrintInfo();
1324                         dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1325
1326                         if(GetPeerID() != PEER_ID_INEXISTENT)
1327                         {
1328                                 PrintInfo(derr_con);
1329                                 derr_con<<"WARNING: Not changing"
1330                                                 " existing peer id."<<std::endl;
1331                         }
1332                         else
1333                         {
1334                                 dout_con<<"changing."<<std::endl;
1335                                 SetPeerID(peer_id_new);
1336                         }
1337                         throw ProcessedSilentlyException("Got a SET_PEER_ID");
1338                 }
1339                 else if(controltype == CONTROLTYPE_PING)
1340                 {
1341                         // Just ignore it, the incoming data already reset
1342                         // the timeout counter
1343                         PrintInfo();
1344                         dout_con<<"PING"<<std::endl;
1345                         throw ProcessedSilentlyException("Got a PING");
1346                 }
1347                 else if(controltype == CONTROLTYPE_DISCO)
1348                 {
1349                         // Just ignore it, the incoming data already reset
1350                         // the timeout counter
1351                         PrintInfo();
1352                         dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1353                         
1354                         if(deletePeer(peer_id, false) == false)
1355                         {
1356                                 PrintInfo(derr_con);
1357                                 derr_con<<"DISCO: Peer not found"<<std::endl;
1358                         }
1359
1360                         throw ProcessedSilentlyException("Got a DISCO");
1361                 }
1362                 else{
1363                         PrintInfo(derr_con);
1364                         derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1365                                         <<((int)controltype&0xff)<<std::endl;
1366                         throw InvalidIncomingDataException("Invalid control type");
1367                 }
1368         }
1369         else if(type == TYPE_ORIGINAL)
1370         {
1371                 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1372                         throw InvalidIncomingDataException
1373                                         ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1374                 PrintInfo();
1375                 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1376                                 <<std::endl;
1377                 // Get the inside packet out and return it
1378                 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1379                 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1380                 return payload;
1381         }
1382         else if(type == TYPE_SPLIT)
1383         {
1384                 // We have to create a packet again for buffering
1385                 // This isn't actually too bad an idea.
1386                 BufferedPacket packet = makePacket(
1387                                 getPeer(peer_id)->address,
1388                                 packetdata,
1389                                 GetProtocolID(),
1390                                 peer_id,
1391                                 channelnum);
1392                 // Buffer the packet
1393                 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1394                 if(data.getSize() != 0)
1395                 {
1396                         PrintInfo();
1397                         dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1398                                         <<"size="<<data.getSize()<<std::endl;
1399                         return data;
1400                 }
1401                 PrintInfo();
1402                 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1403                 throw ProcessedSilentlyException("Buffered a split packet chunk");
1404         }
1405         else if(type == TYPE_RELIABLE)
1406         {
1407                 // Recursive reliable packets not allowed
1408                 assert(reliable == false);
1409
1410                 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1411                         throw InvalidIncomingDataException
1412                                         ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1413
1414                 u16 seqnum = readU16(&packetdata[1]);
1415
1416                 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1417                 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1418                 
1419                 PrintInfo();
1420                 if(is_future_packet)
1421                         dout_con<<"BUFFERING";
1422                 else if(is_old_packet)
1423                         dout_con<<"OLD";
1424                 else
1425                         dout_con<<"RECUR";
1426                 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1427                                 <<" next="<<channel->next_incoming_seqnum;
1428                 dout_con<<" [sending CONTROLTYPE_ACK"
1429                                 " to peer_id="<<peer_id<<"]";
1430                 dout_con<<std::endl;
1431                 
1432                 //DEBUG
1433                 //assert(channel->incoming_reliables.size() < 100);
1434
1435                 // Send a CONTROLTYPE_ACK
1436                 SharedBuffer<u8> reply(4);
1437                 writeU8(&reply[0], TYPE_CONTROL);
1438                 writeU8(&reply[1], CONTROLTYPE_ACK);
1439                 writeU16(&reply[2], seqnum);
1440                 rawSendAsPacket(peer_id, channelnum, reply, false);
1441
1442                 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1443                 if(is_future_packet)
1444                 {
1445                         /*PrintInfo();
1446                         dout_con<<"Buffering reliable packet (seqnum="
1447                                         <<seqnum<<")"<<std::endl;*/
1448                         
1449                         // This one comes later, buffer it.
1450                         // Actually we have to make a packet to buffer one.
1451                         // Well, we have all the ingredients, so just do it.
1452                         BufferedPacket packet = makePacket(
1453                                         getPeer(peer_id)->address,
1454                                         packetdata,
1455                                         GetProtocolID(),
1456                                         peer_id,
1457                                         channelnum);
1458                         try{
1459                                 channel->incoming_reliables.insert(packet);
1460                                 
1461                                 /*PrintInfo();
1462                                 dout_con<<"INCOMING: ";
1463                                 channel->incoming_reliables.print();
1464                                 dout_con<<std::endl;*/
1465                         }
1466                         catch(AlreadyExistsException &e)
1467                         {
1468                         }
1469
1470                         throw ProcessedSilentlyException("Buffered future reliable packet");
1471                 }
1472                 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1473                 else if(is_old_packet)
1474                 {
1475                         // An old packet, dump it
1476                         throw InvalidIncomingDataException("Got an old reliable packet");
1477                 }
1478
1479                 channel->next_incoming_seqnum++;
1480
1481                 // Get out the inside packet and re-process it
1482                 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1483                 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1484
1485                 return processPacket(channel, payload, peer_id, channelnum, true);
1486         }
1487         else
1488         {
1489                 PrintInfo(derr_con);
1490                 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1491                 throw InvalidIncomingDataException("Invalid packet type");
1492         }
1493         
1494         // We should never get here.
1495         // If you get here, add an exception or a return to some of the
1496         // above conditionals.
1497         assert(0);
1498         throw BaseException("Error in Channel::ProcessPacket()");
1499 }
1500
1501 bool Connection::deletePeer(u16 peer_id, bool timeout)
1502 {
1503         if(m_peers.find(peer_id) == NULL)
1504                 return false;
1505         
1506         Peer *peer = m_peers[peer_id];
1507
1508         // Create event
1509         ConnectionEvent e;
1510         e.peerRemoved(peer_id, timeout, peer->address);
1511         putEvent(e);
1512
1513         delete m_peers[peer_id];
1514         m_peers.remove(peer_id);
1515         return true;
1516 }
1517
1518 /* Interface */
1519
1520 ConnectionEvent Connection::getEvent()
1521 {
1522         if(m_event_queue.size() == 0){
1523                 ConnectionEvent e;
1524                 e.type = CONNEVENT_NONE;
1525                 return e;
1526         }
1527         return m_event_queue.pop_front();
1528 }
1529
1530 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1531 {
1532         try{
1533                 return m_event_queue.pop_front(timeout_ms);
1534         } catch(ItemNotFoundException &ex){
1535                 ConnectionEvent e;
1536                 e.type = CONNEVENT_NONE;
1537                 return e;
1538         }
1539 }
1540
1541 void Connection::putCommand(ConnectionCommand &c)
1542 {
1543         m_command_queue.push_back(c);
1544 }
1545
1546 void Connection::Serve(unsigned short port)
1547 {
1548         ConnectionCommand c;
1549         c.serve(port);
1550         putCommand(c);
1551 }
1552
1553 void Connection::Connect(Address address)
1554 {
1555         ConnectionCommand c;
1556         c.connect(address);
1557         putCommand(c);
1558 }
1559
1560 bool Connection::Connected()
1561 {
1562         JMutexAutoLock peerlock(m_peers_mutex);
1563
1564         if(m_peers.size() != 1)
1565                 return false;
1566                 
1567         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1568         if(node == NULL)
1569                 return false;
1570         
1571         if(m_peer_id == PEER_ID_INEXISTENT)
1572                 return false;
1573         
1574         return true;
1575 }
1576
1577 void Connection::Disconnect()
1578 {
1579         ConnectionCommand c;
1580         c.disconnect();
1581         putCommand(c);
1582 }
1583
1584 u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
1585 {
1586         for(;;){
1587                 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
1588                 if(e.type != CONNEVENT_NONE)
1589                         dout_con<<getDesc()<<": Receive: got event: "
1590                                         <<e.describe()<<std::endl;
1591                 switch(e.type){
1592                 case CONNEVENT_NONE:
1593                         throw NoIncomingDataException("No incoming data");
1594                 case CONNEVENT_DATA_RECEIVED:
1595                         peer_id = e.peer_id;
1596                         data = SharedBuffer<u8>(e.data);
1597                         return e.data.getSize();
1598                 case CONNEVENT_PEER_ADDED: {
1599                         Peer tmp(e.peer_id, e.address);
1600                         if(m_bc_peerhandler)
1601                                 m_bc_peerhandler->peerAdded(&tmp);
1602                         continue; }
1603                 case CONNEVENT_PEER_REMOVED: {
1604                         Peer tmp(e.peer_id, e.address);
1605                         if(m_bc_peerhandler)
1606                                 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1607                         continue; }
1608                 case CONNEVENT_BIND_FAILED:
1609                         throw ConnectionBindFailed("Failed to bind socket "
1610                                         "(port already in use?)");
1611                 }
1612         }
1613         throw NoIncomingDataException("No incoming data");
1614 }
1615
1616 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1617 {
1618         assert(channelnum < CHANNEL_COUNT);
1619
1620         ConnectionCommand c;
1621         c.sendToAll(channelnum, data, reliable);
1622         putCommand(c);
1623 }
1624
1625 void Connection::Send(u16 peer_id, u8 channelnum,
1626                 SharedBuffer<u8> data, bool reliable)
1627 {
1628         assert(channelnum < CHANNEL_COUNT);
1629
1630         ConnectionCommand c;
1631         c.send(peer_id, channelnum, data, reliable);
1632         putCommand(c);
1633 }
1634
1635 void Connection::RunTimeouts(float dtime)
1636 {
1637         // No-op
1638 }
1639
1640 Address Connection::GetPeerAddress(u16 peer_id)
1641 {
1642         JMutexAutoLock peerlock(m_peers_mutex);
1643         return getPeer(peer_id)->address;
1644 }
1645
1646 float Connection::GetPeerAvgRTT(u16 peer_id)
1647 {
1648         JMutexAutoLock peerlock(m_peers_mutex);
1649         return getPeer(peer_id)->avg_rtt;
1650 }
1651
1652 void Connection::DeletePeer(u16 peer_id)
1653 {
1654         ConnectionCommand c;
1655         c.deletePeer(peer_id);
1656         putCommand(c);
1657 }
1658
1659 void Connection::PrintInfo(std::ostream &out)
1660 {
1661         out<<getDesc()<<": ";
1662 }
1663
1664 void Connection::PrintInfo()
1665 {
1666         PrintInfo(dout_con);
1667 }
1668
1669 std::string Connection::getDesc()
1670 {
1671         return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";
1672 }
1673
1674 } // namespace
1675