]> git.lizzy.rs Git - minetest.git/blob - src/connection.cpp
Make MutexQueue use jsemaphore for signaling
[minetest.git] / src / connection.cpp
1 /*
2 Minetest
3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "connection.h"
21 #include "main.h"
22 #include "serialization.h"
23 #include "log.h"
24 #include "porting.h"
25 #include "util/serialize.h"
26 #include "util/numeric.h"
27 #include "util/string.h"
28 #include "settings.h"
29
30 namespace con
31 {
32
33 static u16 readPeerId(u8 *packetdata)
34 {
35         return readU16(&packetdata[4]);
36 }
37 static u8 readChannel(u8 *packetdata)
38 {
39         return readU8(&packetdata[6]);
40 }
41
42 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
43                 u32 protocol_id, u16 sender_peer_id, u8 channel)
44 {
45         u32 packet_size = datasize + BASE_HEADER_SIZE;
46         BufferedPacket p(packet_size);
47         p.address = address;
48
49         writeU32(&p.data[0], protocol_id);
50         writeU16(&p.data[4], sender_peer_id);
51         writeU8(&p.data[6], channel);
52
53         memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
54
55         return p;
56 }
57
58 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
59                 u32 protocol_id, u16 sender_peer_id, u8 channel)
60 {
61         return makePacket(address, *data, data.getSize(),
62                         protocol_id, sender_peer_id, channel);
63 }
64
65 SharedBuffer<u8> makeOriginalPacket(
66                 SharedBuffer<u8> data)
67 {
68         u32 header_size = 1;
69         u32 packet_size = data.getSize() + header_size;
70         SharedBuffer<u8> b(packet_size);
71
72         writeU8(&b[0], TYPE_ORIGINAL);
73
74         memcpy(&b[header_size], *data, data.getSize());
75
76         return b;
77 }
78
79 std::list<SharedBuffer<u8> > makeSplitPacket(
80                 SharedBuffer<u8> data,
81                 u32 chunksize_max,
82                 u16 seqnum)
83 {
84         // Chunk packets, containing the TYPE_SPLIT header
85         std::list<SharedBuffer<u8> > chunks;
86         
87         u32 chunk_header_size = 7;
88         u32 maximum_data_size = chunksize_max - chunk_header_size;
89         u32 start = 0;
90         u32 end = 0;
91         u32 chunk_num = 0;
92         u16 chunk_count = 0;
93         do{
94                 end = start + maximum_data_size - 1;
95                 if(end > data.getSize() - 1)
96                         end = data.getSize() - 1;
97                 
98                 u32 payload_size = end - start + 1;
99                 u32 packet_size = chunk_header_size + payload_size;
100
101                 SharedBuffer<u8> chunk(packet_size);
102                 
103                 writeU8(&chunk[0], TYPE_SPLIT);
104                 writeU16(&chunk[1], seqnum);
105                 // [3] u16 chunk_count is written at next stage
106                 writeU16(&chunk[5], chunk_num);
107                 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
108
109                 chunks.push_back(chunk);
110                 chunk_count++;
111                 
112                 start = end + 1;
113                 chunk_num++;
114         }
115         while(end != data.getSize() - 1);
116
117         for(std::list<SharedBuffer<u8> >::iterator i = chunks.begin();
118                 i != chunks.end(); ++i)
119         {
120                 // Write chunk_count
121                 writeU16(&((*i)[3]), chunk_count);
122         }
123
124         return chunks;
125 }
126
127 std::list<SharedBuffer<u8> > makeAutoSplitPacket(
128                 SharedBuffer<u8> data,
129                 u32 chunksize_max,
130                 u16 &split_seqnum)
131 {
132         u32 original_header_size = 1;
133         std::list<SharedBuffer<u8> > list;
134         if(data.getSize() + original_header_size > chunksize_max)
135         {
136                 list = makeSplitPacket(data, chunksize_max, split_seqnum);
137                 split_seqnum++;
138                 return list;
139         }
140         else
141         {
142                 list.push_back(makeOriginalPacket(data));
143         }
144         return list;
145 }
146
147 SharedBuffer<u8> makeReliablePacket(
148                 SharedBuffer<u8> data,
149                 u16 seqnum)
150 {
151         /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
152         dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
153                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
154         u32 header_size = 3;
155         u32 packet_size = data.getSize() + header_size;
156         SharedBuffer<u8> b(packet_size);
157
158         writeU8(&b[0], TYPE_RELIABLE);
159         writeU16(&b[1], seqnum);
160
161         memcpy(&b[header_size], *data, data.getSize());
162
163         /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
164                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
165         //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
166         return b;
167 }
168
169 /*
170         ReliablePacketBuffer
171 */
172
173 ReliablePacketBuffer::ReliablePacketBuffer(): m_list_size(0) {}
174
175 void ReliablePacketBuffer::print()
176 {
177         for(std::list<BufferedPacket>::iterator i = m_list.begin();
178                 i != m_list.end();
179                 ++i)
180         {
181                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
182                 dout_con<<s<<" ";
183         }
184 }
185 bool ReliablePacketBuffer::empty()
186 {
187         return m_list.empty();
188 }
189 u32 ReliablePacketBuffer::size()
190 {
191         return m_list_size;
192 }
193 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
194 {
195         std::list<BufferedPacket>::iterator i = m_list.begin();
196         for(; i != m_list.end(); ++i)
197         {
198                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
199                 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
200                                 <<", comparing to s="<<s<<std::endl;*/
201                 if(s == seqnum)
202                         break;
203         }
204         return i;
205 }
206 RPBSearchResult ReliablePacketBuffer::notFound()
207 {
208         return m_list.end();
209 }
210 bool ReliablePacketBuffer::getFirstSeqnum(u16 *result)
211 {
212         if(empty())
213                 return false;
214         BufferedPacket p = *m_list.begin();
215         *result = readU16(&p.data[BASE_HEADER_SIZE+1]);
216         return true;
217 }
218 BufferedPacket ReliablePacketBuffer::popFirst()
219 {
220         if(empty())
221                 throw NotFoundException("Buffer is empty");
222         BufferedPacket p = *m_list.begin();
223         m_list.erase(m_list.begin());
224         --m_list_size;
225         return p;
226 }
227 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
228 {
229         RPBSearchResult r = findPacket(seqnum);
230         if(r == notFound()){
231                 dout_con<<"Not found"<<std::endl;
232                 throw NotFoundException("seqnum not found in buffer");
233         }
234         BufferedPacket p = *r;
235         m_list.erase(r);
236         --m_list_size;
237         return p;
238 }
239 void ReliablePacketBuffer::insert(BufferedPacket &p)
240 {
241         assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
242         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
243         assert(type == TYPE_RELIABLE);
244         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
245
246         ++m_list_size;
247         // Find the right place for the packet and insert it there
248
249         // If list is empty, just add it
250         if(m_list.empty())
251         {
252                 m_list.push_back(p);
253                 // Done.
254                 return;
255         }
256         // Otherwise find the right place
257         std::list<BufferedPacket>::iterator i = m_list.begin();
258         // Find the first packet in the list which has a higher seqnum
259         for(; i != m_list.end(); ++i){
260                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
261                 if(s == seqnum){
262                         --m_list_size;
263                         throw AlreadyExistsException("Same seqnum in list");
264                 }
265                 if(seqnum_higher(s, seqnum)){
266                         break;
267                 }
268         }
269         // If we're at the end of the list, add the packet to the
270         // end of the list
271         if(i == m_list.end())
272         {
273                 m_list.push_back(p);
274                 // Done.
275                 return;
276         }
277         // Insert before i
278         m_list.insert(i, p);
279 }
280
281 void ReliablePacketBuffer::incrementTimeouts(float dtime)
282 {
283         for(std::list<BufferedPacket>::iterator i = m_list.begin();
284                 i != m_list.end(); ++i)
285         {
286                 i->time += dtime;
287                 i->totaltime += dtime;
288         }
289 }
290
291 void ReliablePacketBuffer::resetTimedOuts(float timeout)
292 {
293         for(std::list<BufferedPacket>::iterator i = m_list.begin();
294                 i != m_list.end(); ++i)
295         {
296                 if(i->time >= timeout)
297                         i->time = 0.0;
298         }
299 }
300
301 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
302 {
303         for(std::list<BufferedPacket>::iterator i = m_list.begin();
304                 i != m_list.end(); ++i)
305         {
306                 if(i->totaltime >= timeout)
307                         return true;
308         }
309         return false;
310 }
311
312 std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
313 {
314         std::list<BufferedPacket> timed_outs;
315         for(std::list<BufferedPacket>::iterator i = m_list.begin();
316                 i != m_list.end(); ++i)
317         {
318                 if(i->time >= timeout)
319                         timed_outs.push_back(*i);
320         }
321         return timed_outs;
322 }
323
324 /*
325         IncomingSplitBuffer
326 */
327
328 IncomingSplitBuffer::~IncomingSplitBuffer()
329 {
330         for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
331                 i != m_buf.end(); ++i)
332         {
333                 delete i->second;
334         }
335 }
336 /*
337         This will throw a GotSplitPacketException when a full
338         split packet is constructed.
339 */
340 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
341 {
342         u32 headersize = BASE_HEADER_SIZE + 7;
343         assert(p.data.getSize() >= headersize);
344         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
345         assert(type == TYPE_SPLIT);
346         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
347         u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
348         u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
349
350         // Add if doesn't exist
351         if(m_buf.find(seqnum) == m_buf.end())
352         {
353                 IncomingSplitPacket *sp = new IncomingSplitPacket();
354                 sp->chunk_count = chunk_count;
355                 sp->reliable = reliable;
356                 m_buf[seqnum] = sp;
357         }
358         
359         IncomingSplitPacket *sp = m_buf[seqnum];
360         
361         // TODO: These errors should be thrown or something? Dunno.
362         if(chunk_count != sp->chunk_count)
363                 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
364                                 <<" != sp->chunk_count="<<sp->chunk_count
365                                 <<std::endl;
366         if(reliable != sp->reliable)
367                 derr_con<<"Connection: WARNING: reliable="<<reliable
368                                 <<" != sp->reliable="<<sp->reliable
369                                 <<std::endl;
370
371         // If chunk already exists, ignore it.
372         // Sometimes two identical packets may arrive when there is network
373         // lag and the server re-sends stuff.
374         if(sp->chunks.find(chunk_num) != sp->chunks.end())
375                 return SharedBuffer<u8>();
376         
377         // Cut chunk data out of packet
378         u32 chunkdatasize = p.data.getSize() - headersize;
379         SharedBuffer<u8> chunkdata(chunkdatasize);
380         memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
381         
382         // Set chunk data in buffer
383         sp->chunks[chunk_num] = chunkdata;
384         
385         // If not all chunks are received, return empty buffer
386         if(sp->allReceived() == false)
387                 return SharedBuffer<u8>();
388
389         // Calculate total size
390         u32 totalsize = 0;
391         for(std::map<u16, SharedBuffer<u8> >::iterator i = sp->chunks.begin();
392                 i != sp->chunks.end(); ++i)
393         {
394                 totalsize += i->second.getSize();
395         }
396         
397         SharedBuffer<u8> fulldata(totalsize);
398
399         // Copy chunks to data buffer
400         u32 start = 0;
401         for(u32 chunk_i=0; chunk_i<sp->chunk_count;
402                         chunk_i++)
403         {
404                 SharedBuffer<u8> buf = sp->chunks[chunk_i];
405                 u16 chunkdatasize = buf.getSize();
406                 memcpy(&fulldata[start], *buf, chunkdatasize);
407                 start += chunkdatasize;;
408         }
409
410         // Remove sp from buffer
411         m_buf.erase(seqnum);
412         delete sp;
413
414         return fulldata;
415 }
416 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
417 {
418         std::list<u16> remove_queue;
419         for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
420                 i != m_buf.end(); ++i)
421         {
422                 IncomingSplitPacket *p = i->second;
423                 // Reliable ones are not removed by timeout
424                 if(p->reliable == true)
425                         continue;
426                 p->time += dtime;
427                 if(p->time >= timeout)
428                         remove_queue.push_back(i->first);
429         }
430         for(std::list<u16>::iterator j = remove_queue.begin();
431                 j != remove_queue.end(); ++j)
432         {
433                 dout_con<<"NOTE: Removing timed out unreliable split packet"
434                                 <<std::endl;
435                 delete m_buf[*j];
436                 m_buf.erase(*j);
437         }
438 }
439
440 /*
441         Channel
442 */
443
444 Channel::Channel()
445 {
446         next_outgoing_seqnum = SEQNUM_INITIAL;
447         next_incoming_seqnum = SEQNUM_INITIAL;
448         next_outgoing_split_seqnum = SEQNUM_INITIAL;
449 }
450 Channel::~Channel()
451 {
452 }
453
454 /*
455         Peer
456 */
457
458 Peer::Peer(u16 a_id, Address a_address):
459         address(a_address),
460         id(a_id),
461         timeout_counter(0.0),
462         ping_timer(0.0),
463         resend_timeout(0.5),
464         avg_rtt(-1.0),
465         has_sent_with_id(false),
466         m_sendtime_accu(0),
467         m_max_packets_per_second(10),
468         m_num_sent(0),
469         m_max_num_sent(0),
470         congestion_control_aim_rtt(0.2),
471         congestion_control_max_rate(400),
472         congestion_control_min_rate(10)
473 {
474 }
475 Peer::~Peer()
476 {
477 }
478
479 void Peer::reportRTT(float rtt)
480 {
481         if(rtt >= 0.0){
482                 if(rtt < 0.01){
483                         if(m_max_packets_per_second < congestion_control_max_rate)
484                                 m_max_packets_per_second += 10;
485                 } else if(rtt < congestion_control_aim_rtt){
486                         if(m_max_packets_per_second < congestion_control_max_rate)
487                                 m_max_packets_per_second += 2;
488                 } else {
489                         m_max_packets_per_second *= 0.8;
490                         if(m_max_packets_per_second < congestion_control_min_rate)
491                                 m_max_packets_per_second = congestion_control_min_rate;
492                 }
493         }
494
495         if(rtt < -0.999)
496         {}
497         else if(avg_rtt < 0.0)
498                 avg_rtt = rtt;
499         else
500                 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
501         
502         // Calculate resend_timeout
503
504         /*int reliable_count = 0;
505         for(int i=0; i<CHANNEL_COUNT; i++)
506         {
507                 reliable_count += channels[i].outgoing_reliables.size();
508         }
509         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
510                         * ((float)reliable_count * 1);*/
511         
512         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
513         if(timeout < RESEND_TIMEOUT_MIN)
514                 timeout = RESEND_TIMEOUT_MIN;
515         if(timeout > RESEND_TIMEOUT_MAX)
516                 timeout = RESEND_TIMEOUT_MAX;
517         resend_timeout = timeout;
518 }
519                                 
520 /*
521         Connection
522 */
523
524 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
525                 bool ipv6):
526         m_protocol_id(protocol_id),
527         m_max_packet_size(max_packet_size),
528         m_timeout(timeout),
529         m_socket(ipv6),
530         m_peer_id(0),
531         m_bc_peerhandler(NULL),
532         m_bc_receive_timeout(0),
533         m_indentation(0)
534 {
535         m_socket.setTimeoutMs(5);
536
537         Start();
538 }
539
540 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
541                 bool ipv6, PeerHandler *peerhandler):
542         m_protocol_id(protocol_id),
543         m_max_packet_size(max_packet_size),
544         m_timeout(timeout),
545         m_socket(ipv6),
546         m_peer_id(0),
547         m_bc_peerhandler(peerhandler),
548         m_bc_receive_timeout(0),
549         m_indentation(0)
550 {
551         m_socket.setTimeoutMs(5);
552
553         Start();
554 }
555
556
557 Connection::~Connection()
558 {
559         Stop();
560         // Delete peers
561         for(std::map<u16, Peer*>::iterator
562                         j = m_peers.begin();
563                         j != m_peers.end(); ++j)
564         {
565                 delete j->second;
566         }
567 }
568
569 /* Internal stuff */
570
571 void * Connection::Thread()
572 {
573         ThreadStarted();
574         log_register_thread("Connection");
575
576         dout_con<<"Connection thread started"<<std::endl;
577         
578         u32 curtime = porting::getTimeMs();
579         u32 lasttime = curtime;
580
581         while(!StopRequested())
582         {
583                 BEGIN_DEBUG_EXCEPTION_HANDLER
584                 
585                 lasttime = curtime;
586                 curtime = porting::getTimeMs();
587                 float dtime = (float)(curtime - lasttime) / 1000.;
588                 if(dtime > 0.1)
589                         dtime = 0.1;
590                 if(dtime < 0.0)
591                         dtime = 0.0;
592                 
593                 runTimeouts(dtime);
594
595                 //NOTE this is only thread safe for ONE consumer thread!
596                 while(!m_command_queue.empty()){
597                         ConnectionCommand c = m_command_queue.pop_frontNoEx();
598                         processCommand(c);
599                 }
600
601                 send(dtime);
602
603                 receive();
604                 
605                 END_DEBUG_EXCEPTION_HANDLER(derr_con);
606         }
607
608         return NULL;
609 }
610
611 void Connection::putEvent(ConnectionEvent &e)
612 {
613         assert(e.type != CONNEVENT_NONE);
614         m_event_queue.push_back(e);
615 }
616
617 void Connection::processCommand(ConnectionCommand &c)
618 {
619         switch(c.type){
620         case CONNCMD_NONE:
621                 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
622                 return;
623         case CONNCMD_SERVE:
624                 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
625                                 <<c.port<<std::endl;
626                 serve(c.port);
627                 return;
628         case CONNCMD_CONNECT:
629                 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
630                 connect(c.address);
631                 return;
632         case CONNCMD_DISCONNECT:
633                 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
634                 disconnect();
635                 return;
636         case CONNCMD_SEND:
637                 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
638                 send(c.peer_id, c.channelnum, c.data, c.reliable);
639                 return;
640         case CONNCMD_SEND_TO_ALL:
641                 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
642                 sendToAll(c.channelnum, c.data, c.reliable);
643                 return;
644         case CONNCMD_DELETE_PEER:
645                 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
646                 deletePeer(c.peer_id, false);
647                 return;
648         }
649 }
650
651 void Connection::send(float dtime)
652 {
653         for(std::map<u16, Peer*>::iterator
654                         j = m_peers.begin();
655                         j != m_peers.end(); ++j)
656         {
657                 Peer *peer = j->second;
658                 peer->m_sendtime_accu += dtime;
659                 peer->m_num_sent = 0;
660                 peer->m_max_num_sent = peer->m_sendtime_accu *
661                                 peer->m_max_packets_per_second;
662         }
663         Queue<OutgoingPacket> postponed_packets;
664         while(!m_outgoing_queue.empty()){
665                 OutgoingPacket packet = m_outgoing_queue.pop_front();
666                 Peer *peer = getPeerNoEx(packet.peer_id);
667                 if(!peer)
668                         continue;
669                 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
670                         postponed_packets.push_back(packet);
671                 } else if(peer->m_num_sent < peer->m_max_num_sent){
672                         rawSendAsPacket(packet.peer_id, packet.channelnum,
673                                         packet.data, packet.reliable);
674                         peer->m_num_sent++;
675                 } else {
676                         postponed_packets.push_back(packet);
677                 }
678         }
679         while(!postponed_packets.empty()){
680                 m_outgoing_queue.push_back(postponed_packets.pop_front());
681         }
682         for(std::map<u16, Peer*>::iterator
683                         j = m_peers.begin();
684                         j != m_peers.end(); ++j)
685         {
686                 Peer *peer = j->second;
687                 peer->m_sendtime_accu -= (float)peer->m_num_sent /
688                                 peer->m_max_packets_per_second;
689                 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
690                         peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
691         }
692 }
693
694 // Receive packets from the network and buffers and create ConnectionEvents
695 void Connection::receive()
696 {
697         u32 datasize = m_max_packet_size * 2;  // Double it just to be safe
698         // TODO: We can not know how many layers of header there are.
699         // For now, just assume there are no other than the base headers.
700         u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
701         SharedBuffer<u8> packetdata(packet_maxsize);
702
703         bool single_wait_done = false;
704         
705         for(u32 loop_i=0; loop_i<1000; loop_i++) // Limit in case of DoS
706         {
707         try{
708                 /* Check if some buffer has relevant data */
709                 {
710                         u16 peer_id;
711                         SharedBuffer<u8> resultdata;
712                         bool got = getFromBuffers(peer_id, resultdata);
713                         if(got){
714                                 ConnectionEvent e;
715                                 e.dataReceived(peer_id, resultdata);
716                                 putEvent(e);
717                                 continue;
718                         }
719                 }
720                 
721                 if(single_wait_done){
722                         if(m_socket.WaitData(0) == false)
723                                 break;
724                 }
725                 
726                 single_wait_done = true;
727
728                 Address sender;
729                 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
730
731                 if(received_size < 0)
732                         break;
733                 if(received_size < BASE_HEADER_SIZE)
734                         continue;
735                 if(readU32(&packetdata[0]) != m_protocol_id)
736                         continue;
737                 
738                 u16 peer_id = readPeerId(*packetdata);
739                 u8 channelnum = readChannel(*packetdata);
740                 if(channelnum > CHANNEL_COUNT-1){
741                         PrintInfo(derr_con);
742                         derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
743                         throw InvalidIncomingDataException("Channel doesn't exist");
744                 }
745
746                 if(peer_id == PEER_ID_INEXISTENT)
747                 {
748                         /*
749                                 Somebody is trying to send stuff to us with no peer id.
750                                 
751                                 Check if the same address and port was added to our peer
752                                 list before.
753                                 Allow only entries that have has_sent_with_id==false.
754                         */
755
756                         std::map<u16, Peer*>::iterator j;
757                         j = m_peers.begin();
758                         for(; j != m_peers.end(); ++j)
759                         {
760                                 Peer *peer = j->second;
761                                 if(peer->has_sent_with_id)
762                                         continue;
763                                 if(peer->address == sender)
764                                         break;
765                         }
766                         
767                         /*
768                                 If no peer was found with the same address and port,
769                                 we shall assume it is a new peer and create an entry.
770                         */
771                         if(j == m_peers.end())
772                         {
773                                 // Pass on to adding the peer
774                         }
775                         // Else: A peer was found.
776                         else
777                         {
778                                 Peer *peer = j->second;
779                                 peer_id = peer->id;
780                                 PrintInfo(derr_con);
781                                 derr_con<<"WARNING: Assuming unknown peer to be "
782                                                 <<"peer_id="<<peer_id<<std::endl;
783                         }
784                 }
785                 
786                 /*
787                         The peer was not found in our lists. Add it.
788                 */
789                 if(peer_id == PEER_ID_INEXISTENT)
790                 {
791                         // Somebody wants to make a new connection
792
793                         // Get a unique peer id (2 or higher)
794                         u16 peer_id_new = 2;
795                         /*
796                                 Find an unused peer id
797                         */
798                         bool out_of_ids = false;
799                         for(;;)
800                         {
801                                 // Check if exists
802                                 if(m_peers.find(peer_id_new) == m_peers.end())
803                                         break;
804                                 // Check for overflow
805                                 if(peer_id_new == 65535){
806                                         out_of_ids = true;
807                                         break;
808                                 }
809                                 peer_id_new++;
810                         }
811                         if(out_of_ids){
812                                 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
813                                 continue;
814                         }
815
816                         PrintInfo();
817                         dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
818                                         " giving peer_id="<<peer_id_new<<std::endl;
819
820                         // Create a peer
821                         Peer *peer = new Peer(peer_id_new, sender);
822                         m_peers[peer->id] = peer;
823                         
824                         // Create peer addition event
825                         ConnectionEvent e;
826                         e.peerAdded(peer_id_new, sender);
827                         putEvent(e);
828                         
829                         // Create CONTROL packet to tell the peer id to the new peer.
830                         SharedBuffer<u8> reply(4);
831                         writeU8(&reply[0], TYPE_CONTROL);
832                         writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
833                         writeU16(&reply[2], peer_id_new);
834                         sendAsPacket(peer_id_new, 0, reply, true);
835                         
836                         // We're now talking to a valid peer_id
837                         peer_id = peer_id_new;
838
839                         // Go on and process whatever it sent
840                 }
841
842                 std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
843
844                 if(node == m_peers.end())
845                 {
846                         // Peer not found
847                         // This means that the peer id of the sender is not PEER_ID_INEXISTENT
848                         // and it is invalid.
849                         PrintInfo(derr_con);
850                         derr_con<<"Receive(): Peer not found"<<std::endl;
851                         throw InvalidIncomingDataException("Peer not found (possible timeout)");
852                 }
853
854                 Peer *peer = node->second;
855
856                 // Validate peer address
857                 if(peer->address != sender)
858                 {
859                         PrintInfo(derr_con);
860                         derr_con<<"Peer "<<peer_id<<" sending from different address."
861                                         " Ignoring."<<std::endl;
862                         continue;
863                 }
864                 
865                 peer->timeout_counter = 0.0;
866
867                 Channel *channel = &(peer->channels[channelnum]);
868                 
869                 // Throw the received packet to channel->processPacket()
870
871                 // Make a new SharedBuffer from the data without the base headers
872                 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
873                 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
874                                 strippeddata.getSize());
875                 
876                 try{
877                         // Process it (the result is some data with no headers made by us)
878                         SharedBuffer<u8> resultdata = processPacket
879                                         (channel, strippeddata, peer_id, channelnum, false);
880                         
881                         PrintInfo();
882                         dout_con<<"ProcessPacket returned data of size "
883                                         <<resultdata.getSize()<<std::endl;
884                         
885                         ConnectionEvent e;
886                         e.dataReceived(peer_id, resultdata);
887                         putEvent(e);
888                         continue;
889                 }catch(ProcessedSilentlyException &e){
890                 }
891         }catch(InvalidIncomingDataException &e){
892         }
893         catch(ProcessedSilentlyException &e){
894         }
895         } // for
896 }
897
898 void Connection::runTimeouts(float dtime)
899 {
900         float congestion_control_aim_rtt
901                         = g_settings->getFloat("congestion_control_aim_rtt");
902         float congestion_control_max_rate
903                         = g_settings->getFloat("congestion_control_max_rate");
904         float congestion_control_min_rate
905                         = g_settings->getFloat("congestion_control_min_rate");
906
907         std::list<u16> timeouted_peers;
908         for(std::map<u16, Peer*>::iterator j = m_peers.begin();
909                 j != m_peers.end(); ++j)
910         {
911                 Peer *peer = j->second;
912
913                 // Update congestion control values
914                 peer->congestion_control_aim_rtt = congestion_control_aim_rtt;
915                 peer->congestion_control_max_rate = congestion_control_max_rate;
916                 peer->congestion_control_min_rate = congestion_control_min_rate;
917                 
918                 /*
919                         Check peer timeout
920                 */
921                 peer->timeout_counter += dtime;
922                 if(peer->timeout_counter > m_timeout)
923                 {
924                         PrintInfo(derr_con);
925                         derr_con<<"RunTimeouts(): Peer "<<peer->id
926                                         <<" has timed out."
927                                         <<" (source=peer->timeout_counter)"
928                                         <<std::endl;
929                         // Add peer to the list
930                         timeouted_peers.push_back(peer->id);
931                         // Don't bother going through the buffers of this one
932                         continue;
933                 }
934
935                 float resend_timeout = peer->resend_timeout;
936                 for(u16 i=0; i<CHANNEL_COUNT; i++)
937                 {
938                         std::list<BufferedPacket> timed_outs;
939                         
940                         Channel *channel = &peer->channels[i];
941
942                         // Remove timed out incomplete unreliable split packets
943                         channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
944                         
945                         // Increment reliable packet times
946                         channel->outgoing_reliables.incrementTimeouts(dtime);
947
948                         // Check reliable packet total times, remove peer if
949                         // over timeout.
950                         if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
951                         {
952                                 PrintInfo(derr_con);
953                                 derr_con<<"RunTimeouts(): Peer "<<peer->id
954                                                 <<" has timed out."
955                                                 <<" (source=reliable packet totaltime)"
956                                                 <<std::endl;
957                                 // Add peer to the to-be-removed list
958                                 timeouted_peers.push_back(peer->id);
959                                 goto nextpeer;
960                         }
961
962                         // Re-send timed out outgoing reliables
963                         
964                         timed_outs = channel->
965                                         outgoing_reliables.getTimedOuts(resend_timeout);
966
967                         channel->outgoing_reliables.resetTimedOuts(resend_timeout);
968
969                         for(std::list<BufferedPacket>::iterator j = timed_outs.begin();
970                                 j != timed_outs.end(); ++j)
971                         {
972                                 u16 peer_id = readPeerId(*(j->data));
973                                 u8 channel = readChannel(*(j->data));
974                                 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
975
976                                 PrintInfo(derr_con);
977                                 derr_con<<"RE-SENDING timed-out RELIABLE to ";
978                                 j->address.print(&derr_con);
979                                 derr_con<<"(t/o="<<resend_timeout<<"): "
980                                                 <<"from_peer_id="<<peer_id
981                                                 <<", channel="<<((int)channel&0xff)
982                                                 <<", seqnum="<<seqnum
983                                                 <<std::endl;
984
985                                 rawSend(*j);
986
987                                 // Enlarge avg_rtt and resend_timeout:
988                                 // The rtt will be at least the timeout.
989                                 // NOTE: This won't affect the timeout of the next
990                                 // checked channel because it was cached.
991                                 peer->reportRTT(resend_timeout);
992                         }
993                 }
994                 
995                 /*
996                         Send pings
997                 */
998                 peer->ping_timer += dtime;
999                 if(peer->ping_timer >= 5.0)
1000                 {
1001                         // Create and send PING packet
1002                         SharedBuffer<u8> data(2);
1003                         writeU8(&data[0], TYPE_CONTROL);
1004                         writeU8(&data[1], CONTROLTYPE_PING);
1005                         rawSendAsPacket(peer->id, 0, data, true);
1006
1007                         peer->ping_timer = 0.0;
1008                 }
1009                 
1010 nextpeer:
1011                 continue;
1012         }
1013
1014         // Remove timed out peers
1015         for(std::list<u16>::iterator i = timeouted_peers.begin();
1016                 i != timeouted_peers.end(); ++i)
1017         {
1018                 PrintInfo(derr_con);
1019                 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1020                 deletePeer(*i, true);
1021         }
1022 }
1023
1024 void Connection::serve(u16 port)
1025 {
1026         dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
1027         try{
1028                 m_socket.Bind(port);
1029                 m_peer_id = PEER_ID_SERVER;
1030         }
1031         catch(SocketException &e){
1032                 // Create event
1033                 ConnectionEvent ce;
1034                 ce.bindFailed();
1035                 putEvent(ce);
1036         }
1037 }
1038
1039 void Connection::connect(Address address)
1040 {
1041         dout_con<<getDesc()<<" connecting to "<<address.serializeString()
1042                         <<":"<<address.getPort()<<std::endl;
1043
1044         std::map<u16, Peer*>::iterator node = m_peers.find(PEER_ID_SERVER);
1045         if(node != m_peers.end()){
1046                 throw ConnectionException("Already connected to a server");
1047         }
1048
1049         Peer *peer = new Peer(PEER_ID_SERVER, address);
1050         m_peers[peer->id] = peer;
1051
1052         // Create event
1053         ConnectionEvent e;
1054         e.peerAdded(peer->id, peer->address);
1055         putEvent(e);
1056         
1057         m_socket.Bind(0);
1058         
1059         // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1060         m_peer_id = PEER_ID_INEXISTENT;
1061         SharedBuffer<u8> data(0);
1062         Send(PEER_ID_SERVER, 0, data, true);
1063 }
1064
1065 void Connection::disconnect()
1066 {
1067         dout_con<<getDesc()<<" disconnecting"<<std::endl;
1068
1069         // Create and send DISCO packet
1070         SharedBuffer<u8> data(2);
1071         writeU8(&data[0], TYPE_CONTROL);
1072         writeU8(&data[1], CONTROLTYPE_DISCO);
1073         
1074         // Send to all
1075         for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1076                 j != m_peers.end(); ++j)
1077         {
1078                 Peer *peer = j->second;
1079                 rawSendAsPacket(peer->id, 0, data, false);
1080         }
1081 }
1082
1083 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1084 {
1085         for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1086                 j != m_peers.end(); ++j)
1087         {
1088                 Peer *peer = j->second;
1089                 send(peer->id, channelnum, data, reliable);
1090         }
1091 }
1092
1093 void Connection::send(u16 peer_id, u8 channelnum,
1094                 SharedBuffer<u8> data, bool reliable)
1095 {
1096         dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1097
1098         assert(channelnum < CHANNEL_COUNT);
1099         
1100         Peer *peer = getPeerNoEx(peer_id);
1101         if(peer == NULL)
1102                 return;
1103         Channel *channel = &(peer->channels[channelnum]);
1104
1105         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1106         if(reliable)
1107                 chunksize_max -= RELIABLE_HEADER_SIZE;
1108
1109         std::list<SharedBuffer<u8> > originals;
1110         originals = makeAutoSplitPacket(data, chunksize_max,
1111                         channel->next_outgoing_split_seqnum);
1112         
1113         for(std::list<SharedBuffer<u8> >::iterator i = originals.begin();
1114                 i != originals.end(); ++i)
1115         {
1116                 SharedBuffer<u8> original = *i;
1117                 
1118                 sendAsPacket(peer_id, channelnum, original, reliable);
1119         }
1120 }
1121
1122 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1123                 SharedBuffer<u8> data, bool reliable)
1124 {
1125         OutgoingPacket packet(peer_id, channelnum, data, reliable);
1126         m_outgoing_queue.push_back(packet);
1127 }
1128
1129 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1130                 SharedBuffer<u8> data, bool reliable)
1131 {
1132         Peer *peer = getPeerNoEx(peer_id);
1133         if(!peer)
1134                 return;
1135         Channel *channel = &(peer->channels[channelnum]);
1136
1137         if(reliable)
1138         {
1139                 u16 seqnum = channel->next_outgoing_seqnum;
1140                 channel->next_outgoing_seqnum++;
1141
1142                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1143
1144                 // Add base headers and make a packet
1145                 BufferedPacket p = makePacket(peer->address, reliable,
1146                                 m_protocol_id, m_peer_id, channelnum);
1147                 
1148                 try{
1149                         // Buffer the packet
1150                         channel->outgoing_reliables.insert(p);
1151                 }
1152                 catch(AlreadyExistsException &e)
1153                 {
1154                         PrintInfo(derr_con);
1155                         derr_con<<"WARNING: Going to send a reliable packet "
1156                                         "seqnum="<<seqnum<<" that is already "
1157                                         "in outgoing buffer"<<std::endl;
1158                         //assert(0);
1159                 }
1160                 
1161                 // Send the packet
1162                 rawSend(p);
1163         }
1164         else
1165         {
1166                 // Add base headers and make a packet
1167                 BufferedPacket p = makePacket(peer->address, data,
1168                                 m_protocol_id, m_peer_id, channelnum);
1169
1170                 // Send the packet
1171                 rawSend(p);
1172         }
1173 }
1174
1175 void Connection::rawSend(const BufferedPacket &packet)
1176 {
1177         try{
1178                 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1179         } catch(SendFailedException &e){
1180                 derr_con<<"Connection::rawSend(): SendFailedException: "
1181                                 <<packet.address.serializeString()<<std::endl;
1182         }
1183 }
1184
1185 Peer* Connection::getPeer(u16 peer_id)
1186 {
1187         std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
1188
1189         if(node == m_peers.end()){
1190                 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1191         }
1192
1193         // Error checking
1194         assert(node->second->id == peer_id);
1195
1196         return node->second;
1197 }
1198
1199 Peer* Connection::getPeerNoEx(u16 peer_id)
1200 {
1201         std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
1202
1203         if(node == m_peers.end()){
1204                 return NULL;
1205         }
1206
1207         // Error checking
1208         assert(node->second->id == peer_id);
1209
1210         return node->second;
1211 }
1212
1213 std::list<Peer*> Connection::getPeers()
1214 {
1215         std::list<Peer*> list;
1216         for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1217                 j != m_peers.end(); ++j)
1218         {
1219                 Peer *peer = j->second;
1220                 list.push_back(peer);
1221         }
1222         return list;
1223 }
1224
1225 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1226 {
1227         for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1228                 j != m_peers.end(); ++j)
1229         {
1230                 Peer *peer = j->second;
1231                 for(u16 i=0; i<CHANNEL_COUNT; i++)
1232                 {
1233                         Channel *channel = &peer->channels[i];
1234                         SharedBuffer<u8> resultdata;
1235                         bool got = checkIncomingBuffers(channel, peer_id, resultdata);
1236                         if(got){
1237                                 dst = resultdata;
1238                                 return true;
1239                         }
1240                 }
1241         }
1242         return false;
1243 }
1244
1245 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1246                 SharedBuffer<u8> &dst)
1247 {
1248         u16 firstseqnum = 0;
1249         // Clear old packets from start of buffer
1250         for(;;){
1251                 bool found = channel->incoming_reliables.getFirstSeqnum(&firstseqnum);
1252                 if(!found)
1253                         break;
1254                 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1255                         channel->incoming_reliables.popFirst();
1256                 else
1257                         break;
1258         }
1259         // This happens if all packets are old
1260         
1261         if(channel->incoming_reliables.empty() == false)
1262         {
1263                 if(firstseqnum == channel->next_incoming_seqnum)
1264                 {
1265                         BufferedPacket p = channel->incoming_reliables.popFirst();
1266                         
1267                         peer_id = readPeerId(*p.data);
1268                         u8 channelnum = readChannel(*p.data);
1269                         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1270
1271                         PrintInfo();
1272                         dout_con<<"UNBUFFERING TYPE_RELIABLE"
1273                                         <<" seqnum="<<seqnum
1274                                         <<" peer_id="<<peer_id
1275                                         <<" channel="<<((int)channelnum&0xff)
1276                                         <<std::endl;
1277
1278                         channel->next_incoming_seqnum++;
1279                         
1280                         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1281                         // Get out the inside packet and re-process it
1282                         SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1283                         memcpy(*payload, &p.data[headers_size], payload.getSize());
1284
1285                         dst = processPacket(channel, payload, peer_id, channelnum, true);
1286                         return true;
1287                 }
1288         }
1289         return false;
1290 }
1291
1292 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1293                 SharedBuffer<u8> packetdata, u16 peer_id,
1294                 u8 channelnum, bool reliable)
1295 {
1296         IndentationRaiser iraiser(&(m_indentation));
1297
1298         if(packetdata.getSize() < 1)
1299                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1300
1301         u8 type = readU8(&packetdata[0]);
1302         
1303         if(type == TYPE_CONTROL)
1304         {
1305                 if(packetdata.getSize() < 2)
1306                         throw InvalidIncomingDataException("packetdata.getSize() < 2");
1307
1308                 u8 controltype = readU8(&packetdata[1]);
1309
1310                 if(controltype == CONTROLTYPE_ACK)
1311                 {
1312                         if(packetdata.getSize() < 4)
1313                                 throw InvalidIncomingDataException
1314                                                 ("packetdata.getSize() < 4 (ACK header size)");
1315
1316                         u16 seqnum = readU16(&packetdata[2]);
1317                         PrintInfo();
1318                         dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1319                                         <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1320                                         <<", seqnum="<<seqnum<<std::endl;
1321
1322                         try{
1323                                 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1324                                 // Get round trip time
1325                                 float rtt = p.totaltime;
1326
1327                                 // Let peer calculate stuff according to it
1328                                 // (avg_rtt and resend_timeout)
1329                                 Peer *peer = getPeer(peer_id);
1330                                 peer->reportRTT(rtt);
1331
1332                                 //PrintInfo(dout_con);
1333                                 //dout_con<<"RTT = "<<rtt<<std::endl;
1334
1335                                 /*dout_con<<"OUTGOING: ";
1336                                 PrintInfo();
1337                                 channel->outgoing_reliables.print();
1338                                 dout_con<<std::endl;*/
1339                         }
1340                         catch(NotFoundException &e){
1341                                 PrintInfo(derr_con);
1342                                 derr_con<<"WARNING: ACKed packet not "
1343                                                 "in outgoing queue"
1344                                                 <<std::endl;
1345                         }
1346
1347                         throw ProcessedSilentlyException("Got an ACK");
1348                 }
1349                 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1350                 {
1351                         if(packetdata.getSize() < 4)
1352                                 throw InvalidIncomingDataException
1353                                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1354                         u16 peer_id_new = readU16(&packetdata[2]);
1355                         PrintInfo();
1356                         dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1357
1358                         if(GetPeerID() != PEER_ID_INEXISTENT)
1359                         {
1360                                 PrintInfo(derr_con);
1361                                 derr_con<<"WARNING: Not changing"
1362                                                 " existing peer id."<<std::endl;
1363                         }
1364                         else
1365                         {
1366                                 dout_con<<"changing."<<std::endl;
1367                                 SetPeerID(peer_id_new);
1368                         }
1369                         throw ProcessedSilentlyException("Got a SET_PEER_ID");
1370                 }
1371                 else if(controltype == CONTROLTYPE_PING)
1372                 {
1373                         // Just ignore it, the incoming data already reset
1374                         // the timeout counter
1375                         PrintInfo();
1376                         dout_con<<"PING"<<std::endl;
1377                         throw ProcessedSilentlyException("Got a PING");
1378                 }
1379                 else if(controltype == CONTROLTYPE_DISCO)
1380                 {
1381                         // Just ignore it, the incoming data already reset
1382                         // the timeout counter
1383                         PrintInfo();
1384                         dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1385                         
1386                         if(deletePeer(peer_id, false) == false)
1387                         {
1388                                 PrintInfo(derr_con);
1389                                 derr_con<<"DISCO: Peer not found"<<std::endl;
1390                         }
1391
1392                         throw ProcessedSilentlyException("Got a DISCO");
1393                 }
1394                 else{
1395                         PrintInfo(derr_con);
1396                         derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1397                                         <<((int)controltype&0xff)<<std::endl;
1398                         throw InvalidIncomingDataException("Invalid control type");
1399                 }
1400         }
1401         else if(type == TYPE_ORIGINAL)
1402         {
1403                 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1404                         throw InvalidIncomingDataException
1405                                         ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1406                 PrintInfo();
1407                 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1408                                 <<std::endl;
1409                 // Get the inside packet out and return it
1410                 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1411                 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1412                 return payload;
1413         }
1414         else if(type == TYPE_SPLIT)
1415         {
1416                 // We have to create a packet again for buffering
1417                 // This isn't actually too bad an idea.
1418                 BufferedPacket packet = makePacket(
1419                                 getPeer(peer_id)->address,
1420                                 packetdata,
1421                                 GetProtocolID(),
1422                                 peer_id,
1423                                 channelnum);
1424                 // Buffer the packet
1425                 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1426                 if(data.getSize() != 0)
1427                 {
1428                         PrintInfo();
1429                         dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1430                                         <<"size="<<data.getSize()<<std::endl;
1431                         return data;
1432                 }
1433                 PrintInfo();
1434                 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1435                 throw ProcessedSilentlyException("Buffered a split packet chunk");
1436         }
1437         else if(type == TYPE_RELIABLE)
1438         {
1439                 // Recursive reliable packets not allowed
1440                 if(reliable)
1441                         throw InvalidIncomingDataException("Found nested reliable packets");
1442
1443                 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1444                         throw InvalidIncomingDataException
1445                                         ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1446
1447                 u16 seqnum = readU16(&packetdata[1]);
1448
1449                 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1450                 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1451                 
1452                 PrintInfo();
1453                 if(is_future_packet)
1454                         dout_con<<"BUFFERING";
1455                 else if(is_old_packet)
1456                         dout_con<<"OLD";
1457                 else
1458                         dout_con<<"RECUR";
1459                 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1460                                 <<" next="<<channel->next_incoming_seqnum;
1461                 dout_con<<" [sending CONTROLTYPE_ACK"
1462                                 " to peer_id="<<peer_id<<"]";
1463                 dout_con<<std::endl;
1464                 
1465                 //DEBUG
1466                 //assert(channel->incoming_reliables.size() < 100);
1467
1468                 // Send a CONTROLTYPE_ACK
1469                 SharedBuffer<u8> reply(4);
1470                 writeU8(&reply[0], TYPE_CONTROL);
1471                 writeU8(&reply[1], CONTROLTYPE_ACK);
1472                 writeU16(&reply[2], seqnum);
1473                 rawSendAsPacket(peer_id, channelnum, reply, false);
1474
1475                 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1476                 if(is_future_packet)
1477                 {
1478                         /*PrintInfo();
1479                         dout_con<<"Buffering reliable packet (seqnum="
1480                                         <<seqnum<<")"<<std::endl;*/
1481                         
1482                         // This one comes later, buffer it.
1483                         // Actually we have to make a packet to buffer one.
1484                         // Well, we have all the ingredients, so just do it.
1485                         BufferedPacket packet = makePacket(
1486                                         getPeer(peer_id)->address,
1487                                         packetdata,
1488                                         GetProtocolID(),
1489                                         peer_id,
1490                                         channelnum);
1491                         try{
1492                                 channel->incoming_reliables.insert(packet);
1493                                 
1494                                 /*PrintInfo();
1495                                 dout_con<<"INCOMING: ";
1496                                 channel->incoming_reliables.print();
1497                                 dout_con<<std::endl;*/
1498                         }
1499                         catch(AlreadyExistsException &e)
1500                         {
1501                         }
1502
1503                         throw ProcessedSilentlyException("Buffered future reliable packet");
1504                 }
1505                 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1506                 else if(is_old_packet)
1507                 {
1508                         // An old packet, dump it
1509                         throw InvalidIncomingDataException("Got an old reliable packet");
1510                 }
1511
1512                 channel->next_incoming_seqnum++;
1513
1514                 // Get out the inside packet and re-process it
1515                 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1516                 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1517
1518                 return processPacket(channel, payload, peer_id, channelnum, true);
1519         }
1520         else
1521         {
1522                 PrintInfo(derr_con);
1523                 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1524                 throw InvalidIncomingDataException("Invalid packet type");
1525         }
1526         
1527         // We should never get here.
1528         // If you get here, add an exception or a return to some of the
1529         // above conditionals.
1530         assert(0);
1531         throw BaseException("Error in Channel::ProcessPacket()");
1532 }
1533
1534 bool Connection::deletePeer(u16 peer_id, bool timeout)
1535 {
1536         if(m_peers.find(peer_id) == m_peers.end())
1537                 return false;
1538         
1539         Peer *peer = m_peers[peer_id];
1540
1541         // Create event
1542         ConnectionEvent e;
1543         e.peerRemoved(peer_id, timeout, peer->address);
1544         putEvent(e);
1545
1546         delete m_peers[peer_id];
1547         m_peers.erase(peer_id);
1548         return true;
1549 }
1550
1551 /* Interface */
1552
1553 ConnectionEvent Connection::getEvent()
1554 {
1555         if(m_event_queue.empty()){
1556                 ConnectionEvent e;
1557                 e.type = CONNEVENT_NONE;
1558                 return e;
1559         }
1560         return m_event_queue.pop_frontNoEx();
1561 }
1562
1563 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1564 {
1565         try{
1566                 return m_event_queue.pop_front(timeout_ms);
1567         } catch(ItemNotFoundException &ex){
1568                 ConnectionEvent e;
1569                 e.type = CONNEVENT_NONE;
1570                 return e;
1571         }
1572 }
1573
1574 void Connection::putCommand(ConnectionCommand &c)
1575 {
1576         m_command_queue.push_back(c);
1577 }
1578
1579 void Connection::Serve(unsigned short port)
1580 {
1581         ConnectionCommand c;
1582         c.serve(port);
1583         putCommand(c);
1584 }
1585
1586 void Connection::Connect(Address address)
1587 {
1588         ConnectionCommand c;
1589         c.connect(address);
1590         putCommand(c);
1591 }
1592
1593 bool Connection::Connected()
1594 {
1595         JMutexAutoLock peerlock(m_peers_mutex);
1596
1597         if(m_peers.size() != 1)
1598                 return false;
1599                 
1600         std::map<u16, Peer*>::iterator node = m_peers.find(PEER_ID_SERVER);
1601         if(node == m_peers.end())
1602                 return false;
1603         
1604         if(m_peer_id == PEER_ID_INEXISTENT)
1605                 return false;
1606         
1607         return true;
1608 }
1609
1610 void Connection::Disconnect()
1611 {
1612         ConnectionCommand c;
1613         c.disconnect();
1614         putCommand(c);
1615 }
1616
1617 u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
1618 {
1619         for(;;){
1620                 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
1621                 if(e.type != CONNEVENT_NONE)
1622                         dout_con<<getDesc()<<": Receive: got event: "
1623                                         <<e.describe()<<std::endl;
1624                 switch(e.type){
1625                 case CONNEVENT_NONE:
1626                         throw NoIncomingDataException("No incoming data");
1627                 case CONNEVENT_DATA_RECEIVED:
1628                         peer_id = e.peer_id;
1629                         data = SharedBuffer<u8>(e.data);
1630                         return e.data.getSize();
1631                 case CONNEVENT_PEER_ADDED: {
1632                         Peer tmp(e.peer_id, e.address);
1633                         if(m_bc_peerhandler)
1634                                 m_bc_peerhandler->peerAdded(&tmp);
1635                         continue; }
1636                 case CONNEVENT_PEER_REMOVED: {
1637                         Peer tmp(e.peer_id, e.address);
1638                         if(m_bc_peerhandler)
1639                                 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1640                         continue; }
1641                 case CONNEVENT_BIND_FAILED:
1642                         throw ConnectionBindFailed("Failed to bind socket "
1643                                         "(port already in use?)");
1644                 }
1645         }
1646         throw NoIncomingDataException("No incoming data");
1647 }
1648
1649 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1650 {
1651         assert(channelnum < CHANNEL_COUNT);
1652
1653         ConnectionCommand c;
1654         c.sendToAll(channelnum, data, reliable);
1655         putCommand(c);
1656 }
1657
1658 void Connection::Send(u16 peer_id, u8 channelnum,
1659                 SharedBuffer<u8> data, bool reliable)
1660 {
1661         assert(channelnum < CHANNEL_COUNT);
1662
1663         ConnectionCommand c;
1664         c.send(peer_id, channelnum, data, reliable);
1665         putCommand(c);
1666 }
1667
1668 void Connection::RunTimeouts(float dtime)
1669 {
1670         // No-op
1671 }
1672
1673 Address Connection::GetPeerAddress(u16 peer_id)
1674 {
1675         JMutexAutoLock peerlock(m_peers_mutex);
1676         return getPeer(peer_id)->address;
1677 }
1678
1679 float Connection::GetPeerAvgRTT(u16 peer_id)
1680 {
1681         JMutexAutoLock peerlock(m_peers_mutex);
1682         return getPeer(peer_id)->avg_rtt;
1683 }
1684
1685 void Connection::DeletePeer(u16 peer_id)
1686 {
1687         ConnectionCommand c;
1688         c.deletePeer(peer_id);
1689         putCommand(c);
1690 }
1691
1692 void Connection::PrintInfo(std::ostream &out)
1693 {
1694         out<<getDesc()<<": ";
1695 }
1696
1697 void Connection::PrintInfo()
1698 {
1699         PrintInfo(dout_con);
1700 }
1701
1702 std::string Connection::getDesc()
1703 {
1704         return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";
1705 }
1706
1707 } // namespace
1708