]> git.lizzy.rs Git - dragonfireclient.git/blob - src/connection.cpp
Fix typo in lua_api.txt
[dragonfireclient.git] / src / connection.cpp
1 /*
2 Minetest
3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "connection.h"
21 #include "main.h"
22 #include "serialization.h"
23 #include "log.h"
24 #include "porting.h"
25 #include "util/serialize.h"
26 #include "util/numeric.h"
27 #include "util/string.h"
28 #include "settings.h"
29
30 namespace con
31 {
32
33 static u16 readPeerId(u8 *packetdata)
34 {
35         return readU16(&packetdata[4]);
36 }
37 static u8 readChannel(u8 *packetdata)
38 {
39         return readU8(&packetdata[6]);
40 }
41
42 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
43                 u32 protocol_id, u16 sender_peer_id, u8 channel)
44 {
45         u32 packet_size = datasize + BASE_HEADER_SIZE;
46         BufferedPacket p(packet_size);
47         p.address = address;
48
49         writeU32(&p.data[0], protocol_id);
50         writeU16(&p.data[4], sender_peer_id);
51         writeU8(&p.data[6], channel);
52
53         memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
54
55         return p;
56 }
57
58 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
59                 u32 protocol_id, u16 sender_peer_id, u8 channel)
60 {
61         return makePacket(address, *data, data.getSize(),
62                         protocol_id, sender_peer_id, channel);
63 }
64
65 SharedBuffer<u8> makeOriginalPacket(
66                 SharedBuffer<u8> data)
67 {
68         u32 header_size = 1;
69         u32 packet_size = data.getSize() + header_size;
70         SharedBuffer<u8> b(packet_size);
71
72         writeU8(&b[0], TYPE_ORIGINAL);
73
74         memcpy(&b[header_size], *data, data.getSize());
75
76         return b;
77 }
78
79 std::list<SharedBuffer<u8> > makeSplitPacket(
80                 SharedBuffer<u8> data,
81                 u32 chunksize_max,
82                 u16 seqnum)
83 {
84         // Chunk packets, containing the TYPE_SPLIT header
85         std::list<SharedBuffer<u8> > chunks;
86         
87         u32 chunk_header_size = 7;
88         u32 maximum_data_size = chunksize_max - chunk_header_size;
89         u32 start = 0;
90         u32 end = 0;
91         u32 chunk_num = 0;
92         u16 chunk_count = 0;
93         do{
94                 end = start + maximum_data_size - 1;
95                 if(end > data.getSize() - 1)
96                         end = data.getSize() - 1;
97                 
98                 u32 payload_size = end - start + 1;
99                 u32 packet_size = chunk_header_size + payload_size;
100
101                 SharedBuffer<u8> chunk(packet_size);
102                 
103                 writeU8(&chunk[0], TYPE_SPLIT);
104                 writeU16(&chunk[1], seqnum);
105                 // [3] u16 chunk_count is written at next stage
106                 writeU16(&chunk[5], chunk_num);
107                 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
108
109                 chunks.push_back(chunk);
110                 chunk_count++;
111                 
112                 start = end + 1;
113                 chunk_num++;
114         }
115         while(end != data.getSize() - 1);
116
117         for(std::list<SharedBuffer<u8> >::iterator i = chunks.begin();
118                 i != chunks.end(); ++i)
119         {
120                 // Write chunk_count
121                 writeU16(&((*i)[3]), chunk_count);
122         }
123
124         return chunks;
125 }
126
127 std::list<SharedBuffer<u8> > makeAutoSplitPacket(
128                 SharedBuffer<u8> data,
129                 u32 chunksize_max,
130                 u16 &split_seqnum)
131 {
132         u32 original_header_size = 1;
133         std::list<SharedBuffer<u8> > list;
134         if(data.getSize() + original_header_size > chunksize_max)
135         {
136                 list = makeSplitPacket(data, chunksize_max, split_seqnum);
137                 split_seqnum++;
138                 return list;
139         }
140         else
141         {
142                 list.push_back(makeOriginalPacket(data));
143         }
144         return list;
145 }
146
147 SharedBuffer<u8> makeReliablePacket(
148                 SharedBuffer<u8> data,
149                 u16 seqnum)
150 {
151         /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
152         dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
153                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
154         u32 header_size = 3;
155         u32 packet_size = data.getSize() + header_size;
156         SharedBuffer<u8> b(packet_size);
157
158         writeU8(&b[0], TYPE_RELIABLE);
159         writeU16(&b[1], seqnum);
160
161         memcpy(&b[header_size], *data, data.getSize());
162
163         /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
164                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
165         //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
166         return b;
167 }
168
169 /*
170         ReliablePacketBuffer
171 */
172
173 ReliablePacketBuffer::ReliablePacketBuffer(): m_list_size(0) {}
174
175 void ReliablePacketBuffer::print()
176 {
177         for(std::list<BufferedPacket>::iterator i = m_list.begin();
178                 i != m_list.end();
179                 ++i)
180         {
181                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
182                 dout_con<<s<<" ";
183         }
184 }
185 bool ReliablePacketBuffer::empty()
186 {
187         return m_list.empty();
188 }
189 u32 ReliablePacketBuffer::size()
190 {
191         return m_list_size;
192 }
193 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
194 {
195         std::list<BufferedPacket>::iterator i = m_list.begin();
196         for(; i != m_list.end(); ++i)
197         {
198                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
199                 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
200                                 <<", comparing to s="<<s<<std::endl;*/
201                 if(s == seqnum)
202                         break;
203         }
204         return i;
205 }
206 RPBSearchResult ReliablePacketBuffer::notFound()
207 {
208         return m_list.end();
209 }
210 bool ReliablePacketBuffer::getFirstSeqnum(u16 *result)
211 {
212         if(empty())
213                 return false;
214         BufferedPacket p = *m_list.begin();
215         *result = readU16(&p.data[BASE_HEADER_SIZE+1]);
216         return true;
217 }
218 BufferedPacket ReliablePacketBuffer::popFirst()
219 {
220         if(empty())
221                 throw NotFoundException("Buffer is empty");
222         BufferedPacket p = *m_list.begin();
223         m_list.erase(m_list.begin());
224         --m_list_size;
225         return p;
226 }
227 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
228 {
229         RPBSearchResult r = findPacket(seqnum);
230         if(r == notFound()){
231                 dout_con<<"Not found"<<std::endl;
232                 throw NotFoundException("seqnum not found in buffer");
233         }
234         BufferedPacket p = *r;
235         m_list.erase(r);
236         --m_list_size;
237         return p;
238 }
239 void ReliablePacketBuffer::insert(BufferedPacket &p)
240 {
241         assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
242         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
243         assert(type == TYPE_RELIABLE);
244         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
245
246         ++m_list_size;
247         // Find the right place for the packet and insert it there
248
249         // If list is empty, just add it
250         if(m_list.empty())
251         {
252                 m_list.push_back(p);
253                 // Done.
254                 return;
255         }
256         // Otherwise find the right place
257         std::list<BufferedPacket>::iterator i = m_list.begin();
258         // Find the first packet in the list which has a higher seqnum
259         for(; i != m_list.end(); ++i){
260                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
261                 if(s == seqnum){
262                         --m_list_size;
263                         throw AlreadyExistsException("Same seqnum in list");
264                 }
265                 if(seqnum_higher(s, seqnum)){
266                         break;
267                 }
268         }
269         // If we're at the end of the list, add the packet to the
270         // end of the list
271         if(i == m_list.end())
272         {
273                 m_list.push_back(p);
274                 // Done.
275                 return;
276         }
277         // Insert before i
278         m_list.insert(i, p);
279 }
280
281 void ReliablePacketBuffer::incrementTimeouts(float dtime)
282 {
283         for(std::list<BufferedPacket>::iterator i = m_list.begin();
284                 i != m_list.end(); ++i)
285         {
286                 i->time += dtime;
287                 i->totaltime += dtime;
288         }
289 }
290
291 void ReliablePacketBuffer::resetTimedOuts(float timeout)
292 {
293         for(std::list<BufferedPacket>::iterator i = m_list.begin();
294                 i != m_list.end(); ++i)
295         {
296                 if(i->time >= timeout)
297                         i->time = 0.0;
298         }
299 }
300
301 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
302 {
303         for(std::list<BufferedPacket>::iterator i = m_list.begin();
304                 i != m_list.end(); ++i)
305         {
306                 if(i->totaltime >= timeout)
307                         return true;
308         }
309         return false;
310 }
311
312 std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
313 {
314         std::list<BufferedPacket> timed_outs;
315         for(std::list<BufferedPacket>::iterator i = m_list.begin();
316                 i != m_list.end(); ++i)
317         {
318                 if(i->time >= timeout)
319                         timed_outs.push_back(*i);
320         }
321         return timed_outs;
322 }
323
324 /*
325         IncomingSplitBuffer
326 */
327
328 IncomingSplitBuffer::~IncomingSplitBuffer()
329 {
330         for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
331                 i != m_buf.end(); ++i)
332         {
333                 delete i->second;
334         }
335 }
336 /*
337         This will throw a GotSplitPacketException when a full
338         split packet is constructed.
339 */
340 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
341 {
342         u32 headersize = BASE_HEADER_SIZE + 7;
343         assert(p.data.getSize() >= headersize);
344         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
345         assert(type == TYPE_SPLIT);
346         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
347         u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
348         u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
349
350         // Add if doesn't exist
351         if(m_buf.find(seqnum) == m_buf.end())
352         {
353                 IncomingSplitPacket *sp = new IncomingSplitPacket();
354                 sp->chunk_count = chunk_count;
355                 sp->reliable = reliable;
356                 m_buf[seqnum] = sp;
357         }
358         
359         IncomingSplitPacket *sp = m_buf[seqnum];
360         
361         // TODO: These errors should be thrown or something? Dunno.
362         if(chunk_count != sp->chunk_count)
363                 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
364                                 <<" != sp->chunk_count="<<sp->chunk_count
365                                 <<std::endl;
366         if(reliable != sp->reliable)
367                 derr_con<<"Connection: WARNING: reliable="<<reliable
368                                 <<" != sp->reliable="<<sp->reliable
369                                 <<std::endl;
370
371         // If chunk already exists, ignore it.
372         // Sometimes two identical packets may arrive when there is network
373         // lag and the server re-sends stuff.
374         if(sp->chunks.find(chunk_num) != sp->chunks.end())
375                 return SharedBuffer<u8>();
376         
377         // Cut chunk data out of packet
378         u32 chunkdatasize = p.data.getSize() - headersize;
379         SharedBuffer<u8> chunkdata(chunkdatasize);
380         memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
381         
382         // Set chunk data in buffer
383         sp->chunks[chunk_num] = chunkdata;
384         
385         // If not all chunks are received, return empty buffer
386         if(sp->allReceived() == false)
387                 return SharedBuffer<u8>();
388
389         // Calculate total size
390         u32 totalsize = 0;
391         for(std::map<u16, SharedBuffer<u8> >::iterator i = sp->chunks.begin();
392                 i != sp->chunks.end(); ++i)
393         {
394                 totalsize += i->second.getSize();
395         }
396         
397         SharedBuffer<u8> fulldata(totalsize);
398
399         // Copy chunks to data buffer
400         u32 start = 0;
401         for(u32 chunk_i=0; chunk_i<sp->chunk_count;
402                         chunk_i++)
403         {
404                 SharedBuffer<u8> buf = sp->chunks[chunk_i];
405                 u16 chunkdatasize = buf.getSize();
406                 memcpy(&fulldata[start], *buf, chunkdatasize);
407                 start += chunkdatasize;;
408         }
409
410         // Remove sp from buffer
411         m_buf.erase(seqnum);
412         delete sp;
413
414         return fulldata;
415 }
416 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
417 {
418         std::list<u16> remove_queue;
419         for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
420                 i != m_buf.end(); ++i)
421         {
422                 IncomingSplitPacket *p = i->second;
423                 // Reliable ones are not removed by timeout
424                 if(p->reliable == true)
425                         continue;
426                 p->time += dtime;
427                 if(p->time >= timeout)
428                         remove_queue.push_back(i->first);
429         }
430         for(std::list<u16>::iterator j = remove_queue.begin();
431                 j != remove_queue.end(); ++j)
432         {
433                 dout_con<<"NOTE: Removing timed out unreliable split packet"
434                                 <<std::endl;
435                 delete m_buf[*j];
436                 m_buf.erase(*j);
437         }
438 }
439
440 /*
441         Channel
442 */
443
444 Channel::Channel()
445 {
446         next_outgoing_seqnum = SEQNUM_INITIAL;
447         next_incoming_seqnum = SEQNUM_INITIAL;
448         next_outgoing_split_seqnum = SEQNUM_INITIAL;
449 }
450 Channel::~Channel()
451 {
452 }
453
454 /*
455         Peer
456 */
457
458 Peer::Peer(u16 a_id, Address a_address):
459         address(a_address),
460         id(a_id),
461         timeout_counter(0.0),
462         ping_timer(0.0),
463         resend_timeout(0.5),
464         avg_rtt(-1.0),
465         has_sent_with_id(false),
466         m_sendtime_accu(0),
467         m_max_packets_per_second(10),
468         m_num_sent(0),
469         m_max_num_sent(0),
470         congestion_control_aim_rtt(0.2),
471         congestion_control_max_rate(400),
472         congestion_control_min_rate(10)
473 {
474 }
475 Peer::~Peer()
476 {
477 }
478
479 void Peer::reportRTT(float rtt)
480 {
481         if(rtt >= 0.0){
482                 if(rtt < 0.01){
483                         if(m_max_packets_per_second < congestion_control_max_rate)
484                                 m_max_packets_per_second += 10;
485                 } else if(rtt < congestion_control_aim_rtt){
486                         if(m_max_packets_per_second < congestion_control_max_rate)
487                                 m_max_packets_per_second += 2;
488                 } else {
489                         m_max_packets_per_second *= 0.8;
490                         if(m_max_packets_per_second < congestion_control_min_rate)
491                                 m_max_packets_per_second = congestion_control_min_rate;
492                 }
493         }
494
495         if(rtt < -0.999)
496         {}
497         else if(avg_rtt < 0.0)
498                 avg_rtt = rtt;
499         else
500                 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
501         
502         // Calculate resend_timeout
503
504         /*int reliable_count = 0;
505         for(int i=0; i<CHANNEL_COUNT; i++)
506         {
507                 reliable_count += channels[i].outgoing_reliables.size();
508         }
509         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
510                         * ((float)reliable_count * 1);*/
511         
512         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
513         if(timeout < RESEND_TIMEOUT_MIN)
514                 timeout = RESEND_TIMEOUT_MIN;
515         if(timeout > RESEND_TIMEOUT_MAX)
516                 timeout = RESEND_TIMEOUT_MAX;
517         resend_timeout = timeout;
518 }
519                                 
520 /*
521         Connection
522 */
523
524 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
525                 bool ipv6):
526         m_protocol_id(protocol_id),
527         m_max_packet_size(max_packet_size),
528         m_timeout(timeout),
529         m_socket(ipv6),
530         m_peer_id(0),
531         m_bc_peerhandler(NULL),
532         m_bc_receive_timeout(0),
533         m_indentation(0)
534 {
535         m_socket.setTimeoutMs(5);
536
537         Start();
538 }
539
540 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
541                 bool ipv6, PeerHandler *peerhandler):
542         m_protocol_id(protocol_id),
543         m_max_packet_size(max_packet_size),
544         m_timeout(timeout),
545         m_socket(ipv6),
546         m_peer_id(0),
547         m_bc_peerhandler(peerhandler),
548         m_bc_receive_timeout(0),
549         m_indentation(0)
550 {
551         m_socket.setTimeoutMs(5);
552
553         Start();
554 }
555
556
557 Connection::~Connection()
558 {
559         stop();
560         // Delete peers
561         for(std::map<u16, Peer*>::iterator
562                         j = m_peers.begin();
563                         j != m_peers.end(); ++j)
564         {
565                 delete j->second;
566         }
567 }
568
569 /* Internal stuff */
570
571 void * Connection::Thread()
572 {
573         ThreadStarted();
574         log_register_thread("Connection");
575
576         dout_con<<"Connection thread started"<<std::endl;
577         
578         u32 curtime = porting::getTimeMs();
579         u32 lasttime = curtime;
580
581         while(getRun())
582         {
583                 BEGIN_DEBUG_EXCEPTION_HANDLER
584                 
585                 lasttime = curtime;
586                 curtime = porting::getTimeMs();
587                 float dtime = (float)(curtime - lasttime) / 1000.;
588                 if(dtime > 0.1)
589                         dtime = 0.1;
590                 if(dtime < 0.0)
591                         dtime = 0.0;
592                 
593                 runTimeouts(dtime);
594
595                 while(!m_command_queue.empty()){
596                         ConnectionCommand c = m_command_queue.pop_front();
597                         processCommand(c);
598                 }
599
600                 send(dtime);
601
602                 receive();
603                 
604                 END_DEBUG_EXCEPTION_HANDLER(derr_con);
605         }
606
607         return NULL;
608 }
609
610 void Connection::putEvent(ConnectionEvent &e)
611 {
612         assert(e.type != CONNEVENT_NONE);
613         m_event_queue.push_back(e);
614 }
615
616 void Connection::processCommand(ConnectionCommand &c)
617 {
618         switch(c.type){
619         case CONNCMD_NONE:
620                 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
621                 return;
622         case CONNCMD_SERVE:
623                 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
624                                 <<c.port<<std::endl;
625                 serve(c.port);
626                 return;
627         case CONNCMD_CONNECT:
628                 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
629                 connect(c.address);
630                 return;
631         case CONNCMD_DISCONNECT:
632                 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
633                 disconnect();
634                 return;
635         case CONNCMD_SEND:
636                 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
637                 send(c.peer_id, c.channelnum, c.data, c.reliable);
638                 return;
639         case CONNCMD_SEND_TO_ALL:
640                 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
641                 sendToAll(c.channelnum, c.data, c.reliable);
642                 return;
643         case CONNCMD_DELETE_PEER:
644                 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
645                 deletePeer(c.peer_id, false);
646                 return;
647         }
648 }
649
650 void Connection::send(float dtime)
651 {
652         for(std::map<u16, Peer*>::iterator
653                         j = m_peers.begin();
654                         j != m_peers.end(); ++j)
655         {
656                 Peer *peer = j->second;
657                 peer->m_sendtime_accu += dtime;
658                 peer->m_num_sent = 0;
659                 peer->m_max_num_sent = peer->m_sendtime_accu *
660                                 peer->m_max_packets_per_second;
661         }
662         Queue<OutgoingPacket> postponed_packets;
663         while(!m_outgoing_queue.empty()){
664                 OutgoingPacket packet = m_outgoing_queue.pop_front();
665                 Peer *peer = getPeerNoEx(packet.peer_id);
666                 if(!peer)
667                         continue;
668                 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
669                         postponed_packets.push_back(packet);
670                 } else if(peer->m_num_sent < peer->m_max_num_sent){
671                         rawSendAsPacket(packet.peer_id, packet.channelnum,
672                                         packet.data, packet.reliable);
673                         peer->m_num_sent++;
674                 } else {
675                         postponed_packets.push_back(packet);
676                 }
677         }
678         while(!postponed_packets.empty()){
679                 m_outgoing_queue.push_back(postponed_packets.pop_front());
680         }
681         for(std::map<u16, Peer*>::iterator
682                         j = m_peers.begin();
683                         j != m_peers.end(); ++j)
684         {
685                 Peer *peer = j->second;
686                 peer->m_sendtime_accu -= (float)peer->m_num_sent /
687                                 peer->m_max_packets_per_second;
688                 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
689                         peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
690         }
691 }
692
693 // Receive packets from the network and buffers and create ConnectionEvents
694 void Connection::receive()
695 {
696         u32 datasize = m_max_packet_size * 2;  // Double it just to be safe
697         // TODO: We can not know how many layers of header there are.
698         // For now, just assume there are no other than the base headers.
699         u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
700         SharedBuffer<u8> packetdata(packet_maxsize);
701
702         bool single_wait_done = false;
703         
704         for(u32 loop_i=0; loop_i<1000; loop_i++) // Limit in case of DoS
705         {
706         try{
707                 /* Check if some buffer has relevant data */
708                 {
709                         u16 peer_id;
710                         SharedBuffer<u8> resultdata;
711                         bool got = getFromBuffers(peer_id, resultdata);
712                         if(got){
713                                 ConnectionEvent e;
714                                 e.dataReceived(peer_id, resultdata);
715                                 putEvent(e);
716                                 continue;
717                         }
718                 }
719                 
720                 if(single_wait_done){
721                         if(m_socket.WaitData(0) == false)
722                                 break;
723                 }
724                 
725                 single_wait_done = true;
726
727                 Address sender;
728                 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
729
730                 if(received_size < 0)
731                         break;
732                 if(received_size < BASE_HEADER_SIZE)
733                         continue;
734                 if(readU32(&packetdata[0]) != m_protocol_id)
735                         continue;
736                 
737                 u16 peer_id = readPeerId(*packetdata);
738                 u8 channelnum = readChannel(*packetdata);
739                 if(channelnum > CHANNEL_COUNT-1){
740                         PrintInfo(derr_con);
741                         derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
742                         throw InvalidIncomingDataException("Channel doesn't exist");
743                 }
744
745                 if(peer_id == PEER_ID_INEXISTENT)
746                 {
747                         /*
748                                 Somebody is trying to send stuff to us with no peer id.
749                                 
750                                 Check if the same address and port was added to our peer
751                                 list before.
752                                 Allow only entries that have has_sent_with_id==false.
753                         */
754
755                         std::map<u16, Peer*>::iterator j;
756                         j = m_peers.begin();
757                         for(; j != m_peers.end(); ++j)
758                         {
759                                 Peer *peer = j->second;
760                                 if(peer->has_sent_with_id)
761                                         continue;
762                                 if(peer->address == sender)
763                                         break;
764                         }
765                         
766                         /*
767                                 If no peer was found with the same address and port,
768                                 we shall assume it is a new peer and create an entry.
769                         */
770                         if(j == m_peers.end())
771                         {
772                                 // Pass on to adding the peer
773                         }
774                         // Else: A peer was found.
775                         else
776                         {
777                                 Peer *peer = j->second;
778                                 peer_id = peer->id;
779                                 PrintInfo(derr_con);
780                                 derr_con<<"WARNING: Assuming unknown peer to be "
781                                                 <<"peer_id="<<peer_id<<std::endl;
782                         }
783                 }
784                 
785                 /*
786                         The peer was not found in our lists. Add it.
787                 */
788                 if(peer_id == PEER_ID_INEXISTENT)
789                 {
790                         // Somebody wants to make a new connection
791
792                         // Get a unique peer id (2 or higher)
793                         u16 peer_id_new = 2;
794                         /*
795                                 Find an unused peer id
796                         */
797                         bool out_of_ids = false;
798                         for(;;)
799                         {
800                                 // Check if exists
801                                 if(m_peers.find(peer_id_new) == m_peers.end())
802                                         break;
803                                 // Check for overflow
804                                 if(peer_id_new == 65535){
805                                         out_of_ids = true;
806                                         break;
807                                 }
808                                 peer_id_new++;
809                         }
810                         if(out_of_ids){
811                                 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
812                                 continue;
813                         }
814
815                         PrintInfo();
816                         dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
817                                         " giving peer_id="<<peer_id_new<<std::endl;
818
819                         // Create a peer
820                         Peer *peer = new Peer(peer_id_new, sender);
821                         m_peers[peer->id] = peer;
822                         
823                         // Create peer addition event
824                         ConnectionEvent e;
825                         e.peerAdded(peer_id_new, sender);
826                         putEvent(e);
827                         
828                         // Create CONTROL packet to tell the peer id to the new peer.
829                         SharedBuffer<u8> reply(4);
830                         writeU8(&reply[0], TYPE_CONTROL);
831                         writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
832                         writeU16(&reply[2], peer_id_new);
833                         sendAsPacket(peer_id_new, 0, reply, true);
834                         
835                         // We're now talking to a valid peer_id
836                         peer_id = peer_id_new;
837
838                         // Go on and process whatever it sent
839                 }
840
841                 std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
842
843                 if(node == m_peers.end())
844                 {
845                         // Peer not found
846                         // This means that the peer id of the sender is not PEER_ID_INEXISTENT
847                         // and it is invalid.
848                         PrintInfo(derr_con);
849                         derr_con<<"Receive(): Peer not found"<<std::endl;
850                         throw InvalidIncomingDataException("Peer not found (possible timeout)");
851                 }
852
853                 Peer *peer = node->second;
854
855                 // Validate peer address
856                 if(peer->address != sender)
857                 {
858                         PrintInfo(derr_con);
859                         derr_con<<"Peer "<<peer_id<<" sending from different address."
860                                         " Ignoring."<<std::endl;
861                         continue;
862                 }
863                 
864                 peer->timeout_counter = 0.0;
865
866                 Channel *channel = &(peer->channels[channelnum]);
867                 
868                 // Throw the received packet to channel->processPacket()
869
870                 // Make a new SharedBuffer from the data without the base headers
871                 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
872                 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
873                                 strippeddata.getSize());
874                 
875                 try{
876                         // Process it (the result is some data with no headers made by us)
877                         SharedBuffer<u8> resultdata = processPacket
878                                         (channel, strippeddata, peer_id, channelnum, false);
879                         
880                         PrintInfo();
881                         dout_con<<"ProcessPacket returned data of size "
882                                         <<resultdata.getSize()<<std::endl;
883                         
884                         ConnectionEvent e;
885                         e.dataReceived(peer_id, resultdata);
886                         putEvent(e);
887                         continue;
888                 }catch(ProcessedSilentlyException &e){
889                 }
890         }catch(InvalidIncomingDataException &e){
891         }
892         catch(ProcessedSilentlyException &e){
893         }
894         } // for
895 }
896
897 void Connection::runTimeouts(float dtime)
898 {
899         float congestion_control_aim_rtt
900                         = g_settings->getFloat("congestion_control_aim_rtt");
901         float congestion_control_max_rate
902                         = g_settings->getFloat("congestion_control_max_rate");
903         float congestion_control_min_rate
904                         = g_settings->getFloat("congestion_control_min_rate");
905
906         std::list<u16> timeouted_peers;
907         for(std::map<u16, Peer*>::iterator j = m_peers.begin();
908                 j != m_peers.end(); ++j)
909         {
910                 Peer *peer = j->second;
911
912                 // Update congestion control values
913                 peer->congestion_control_aim_rtt = congestion_control_aim_rtt;
914                 peer->congestion_control_max_rate = congestion_control_max_rate;
915                 peer->congestion_control_min_rate = congestion_control_min_rate;
916                 
917                 /*
918                         Check peer timeout
919                 */
920                 peer->timeout_counter += dtime;
921                 if(peer->timeout_counter > m_timeout)
922                 {
923                         PrintInfo(derr_con);
924                         derr_con<<"RunTimeouts(): Peer "<<peer->id
925                                         <<" has timed out."
926                                         <<" (source=peer->timeout_counter)"
927                                         <<std::endl;
928                         // Add peer to the list
929                         timeouted_peers.push_back(peer->id);
930                         // Don't bother going through the buffers of this one
931                         continue;
932                 }
933
934                 float resend_timeout = peer->resend_timeout;
935                 for(u16 i=0; i<CHANNEL_COUNT; i++)
936                 {
937                         std::list<BufferedPacket> timed_outs;
938                         
939                         Channel *channel = &peer->channels[i];
940
941                         // Remove timed out incomplete unreliable split packets
942                         channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
943                         
944                         // Increment reliable packet times
945                         channel->outgoing_reliables.incrementTimeouts(dtime);
946
947                         // Check reliable packet total times, remove peer if
948                         // over timeout.
949                         if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
950                         {
951                                 PrintInfo(derr_con);
952                                 derr_con<<"RunTimeouts(): Peer "<<peer->id
953                                                 <<" has timed out."
954                                                 <<" (source=reliable packet totaltime)"
955                                                 <<std::endl;
956                                 // Add peer to the to-be-removed list
957                                 timeouted_peers.push_back(peer->id);
958                                 goto nextpeer;
959                         }
960
961                         // Re-send timed out outgoing reliables
962                         
963                         timed_outs = channel->
964                                         outgoing_reliables.getTimedOuts(resend_timeout);
965
966                         channel->outgoing_reliables.resetTimedOuts(resend_timeout);
967
968                         for(std::list<BufferedPacket>::iterator j = timed_outs.begin();
969                                 j != timed_outs.end(); ++j)
970                         {
971                                 u16 peer_id = readPeerId(*(j->data));
972                                 u8 channel = readChannel(*(j->data));
973                                 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
974
975                                 PrintInfo(derr_con);
976                                 derr_con<<"RE-SENDING timed-out RELIABLE to ";
977                                 j->address.print(&derr_con);
978                                 derr_con<<"(t/o="<<resend_timeout<<"): "
979                                                 <<"from_peer_id="<<peer_id
980                                                 <<", channel="<<((int)channel&0xff)
981                                                 <<", seqnum="<<seqnum
982                                                 <<std::endl;
983
984                                 rawSend(*j);
985
986                                 // Enlarge avg_rtt and resend_timeout:
987                                 // The rtt will be at least the timeout.
988                                 // NOTE: This won't affect the timeout of the next
989                                 // checked channel because it was cached.
990                                 peer->reportRTT(resend_timeout);
991                         }
992                 }
993                 
994                 /*
995                         Send pings
996                 */
997                 peer->ping_timer += dtime;
998                 if(peer->ping_timer >= 5.0)
999                 {
1000                         // Create and send PING packet
1001                         SharedBuffer<u8> data(2);
1002                         writeU8(&data[0], TYPE_CONTROL);
1003                         writeU8(&data[1], CONTROLTYPE_PING);
1004                         rawSendAsPacket(peer->id, 0, data, true);
1005
1006                         peer->ping_timer = 0.0;
1007                 }
1008                 
1009 nextpeer:
1010                 continue;
1011         }
1012
1013         // Remove timed out peers
1014         for(std::list<u16>::iterator i = timeouted_peers.begin();
1015                 i != timeouted_peers.end(); ++i)
1016         {
1017                 PrintInfo(derr_con);
1018                 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1019                 deletePeer(*i, true);
1020         }
1021 }
1022
1023 void Connection::serve(u16 port)
1024 {
1025         dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
1026         try{
1027                 m_socket.Bind(port);
1028                 m_peer_id = PEER_ID_SERVER;
1029         }
1030         catch(SocketException &e){
1031                 // Create event
1032                 ConnectionEvent ce;
1033                 ce.bindFailed();
1034                 putEvent(ce);
1035         }
1036 }
1037
1038 void Connection::connect(Address address)
1039 {
1040         dout_con<<getDesc()<<" connecting to "<<address.serializeString()
1041                         <<":"<<address.getPort()<<std::endl;
1042
1043         std::map<u16, Peer*>::iterator node = m_peers.find(PEER_ID_SERVER);
1044         if(node != m_peers.end()){
1045                 throw ConnectionException("Already connected to a server");
1046         }
1047
1048         Peer *peer = new Peer(PEER_ID_SERVER, address);
1049         m_peers[peer->id] = peer;
1050
1051         // Create event
1052         ConnectionEvent e;
1053         e.peerAdded(peer->id, peer->address);
1054         putEvent(e);
1055         
1056         m_socket.Bind(0);
1057         
1058         // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1059         m_peer_id = PEER_ID_INEXISTENT;
1060         SharedBuffer<u8> data(0);
1061         Send(PEER_ID_SERVER, 0, data, true);
1062 }
1063
1064 void Connection::disconnect()
1065 {
1066         dout_con<<getDesc()<<" disconnecting"<<std::endl;
1067
1068         // Create and send DISCO packet
1069         SharedBuffer<u8> data(2);
1070         writeU8(&data[0], TYPE_CONTROL);
1071         writeU8(&data[1], CONTROLTYPE_DISCO);
1072         
1073         // Send to all
1074         for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1075                 j != m_peers.end(); ++j)
1076         {
1077                 Peer *peer = j->second;
1078                 rawSendAsPacket(peer->id, 0, data, false);
1079         }
1080 }
1081
1082 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1083 {
1084         for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1085                 j != m_peers.end(); ++j)
1086         {
1087                 Peer *peer = j->second;
1088                 send(peer->id, channelnum, data, reliable);
1089         }
1090 }
1091
1092 void Connection::send(u16 peer_id, u8 channelnum,
1093                 SharedBuffer<u8> data, bool reliable)
1094 {
1095         dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1096
1097         assert(channelnum < CHANNEL_COUNT);
1098         
1099         Peer *peer = getPeerNoEx(peer_id);
1100         if(peer == NULL)
1101                 return;
1102         Channel *channel = &(peer->channels[channelnum]);
1103
1104         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1105         if(reliable)
1106                 chunksize_max -= RELIABLE_HEADER_SIZE;
1107
1108         std::list<SharedBuffer<u8> > originals;
1109         originals = makeAutoSplitPacket(data, chunksize_max,
1110                         channel->next_outgoing_split_seqnum);
1111         
1112         for(std::list<SharedBuffer<u8> >::iterator i = originals.begin();
1113                 i != originals.end(); ++i)
1114         {
1115                 SharedBuffer<u8> original = *i;
1116                 
1117                 sendAsPacket(peer_id, channelnum, original, reliable);
1118         }
1119 }
1120
1121 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1122                 SharedBuffer<u8> data, bool reliable)
1123 {
1124         OutgoingPacket packet(peer_id, channelnum, data, reliable);
1125         m_outgoing_queue.push_back(packet);
1126 }
1127
1128 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1129                 SharedBuffer<u8> data, bool reliable)
1130 {
1131         Peer *peer = getPeerNoEx(peer_id);
1132         if(!peer)
1133                 return;
1134         Channel *channel = &(peer->channels[channelnum]);
1135
1136         if(reliable)
1137         {
1138                 u16 seqnum = channel->next_outgoing_seqnum;
1139                 channel->next_outgoing_seqnum++;
1140
1141                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1142
1143                 // Add base headers and make a packet
1144                 BufferedPacket p = makePacket(peer->address, reliable,
1145                                 m_protocol_id, m_peer_id, channelnum);
1146                 
1147                 try{
1148                         // Buffer the packet
1149                         channel->outgoing_reliables.insert(p);
1150                 }
1151                 catch(AlreadyExistsException &e)
1152                 {
1153                         PrintInfo(derr_con);
1154                         derr_con<<"WARNING: Going to send a reliable packet "
1155                                         "seqnum="<<seqnum<<" that is already "
1156                                         "in outgoing buffer"<<std::endl;
1157                         //assert(0);
1158                 }
1159                 
1160                 // Send the packet
1161                 rawSend(p);
1162         }
1163         else
1164         {
1165                 // Add base headers and make a packet
1166                 BufferedPacket p = makePacket(peer->address, data,
1167                                 m_protocol_id, m_peer_id, channelnum);
1168
1169                 // Send the packet
1170                 rawSend(p);
1171         }
1172 }
1173
1174 void Connection::rawSend(const BufferedPacket &packet)
1175 {
1176         try{
1177                 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1178         } catch(SendFailedException &e){
1179                 derr_con<<"Connection::rawSend(): SendFailedException: "
1180                                 <<packet.address.serializeString()<<std::endl;
1181         }
1182 }
1183
1184 Peer* Connection::getPeer(u16 peer_id)
1185 {
1186         std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
1187
1188         if(node == m_peers.end()){
1189                 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1190         }
1191
1192         // Error checking
1193         assert(node->second->id == peer_id);
1194
1195         return node->second;
1196 }
1197
1198 Peer* Connection::getPeerNoEx(u16 peer_id)
1199 {
1200         std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
1201
1202         if(node == m_peers.end()){
1203                 return NULL;
1204         }
1205
1206         // Error checking
1207         assert(node->second->id == peer_id);
1208
1209         return node->second;
1210 }
1211
1212 std::list<Peer*> Connection::getPeers()
1213 {
1214         std::list<Peer*> list;
1215         for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1216                 j != m_peers.end(); ++j)
1217         {
1218                 Peer *peer = j->second;
1219                 list.push_back(peer);
1220         }
1221         return list;
1222 }
1223
1224 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1225 {
1226         for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1227                 j != m_peers.end(); ++j)
1228         {
1229                 Peer *peer = j->second;
1230                 for(u16 i=0; i<CHANNEL_COUNT; i++)
1231                 {
1232                         Channel *channel = &peer->channels[i];
1233                         SharedBuffer<u8> resultdata;
1234                         bool got = checkIncomingBuffers(channel, peer_id, resultdata);
1235                         if(got){
1236                                 dst = resultdata;
1237                                 return true;
1238                         }
1239                 }
1240         }
1241         return false;
1242 }
1243
1244 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1245                 SharedBuffer<u8> &dst)
1246 {
1247         u16 firstseqnum = 0;
1248         // Clear old packets from start of buffer
1249         for(;;){
1250                 bool found = channel->incoming_reliables.getFirstSeqnum(&firstseqnum);
1251                 if(!found)
1252                         break;
1253                 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1254                         channel->incoming_reliables.popFirst();
1255                 else
1256                         break;
1257         }
1258         // This happens if all packets are old
1259         
1260         if(channel->incoming_reliables.empty() == false)
1261         {
1262                 if(firstseqnum == channel->next_incoming_seqnum)
1263                 {
1264                         BufferedPacket p = channel->incoming_reliables.popFirst();
1265                         
1266                         peer_id = readPeerId(*p.data);
1267                         u8 channelnum = readChannel(*p.data);
1268                         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1269
1270                         PrintInfo();
1271                         dout_con<<"UNBUFFERING TYPE_RELIABLE"
1272                                         <<" seqnum="<<seqnum
1273                                         <<" peer_id="<<peer_id
1274                                         <<" channel="<<((int)channelnum&0xff)
1275                                         <<std::endl;
1276
1277                         channel->next_incoming_seqnum++;
1278                         
1279                         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1280                         // Get out the inside packet and re-process it
1281                         SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1282                         memcpy(*payload, &p.data[headers_size], payload.getSize());
1283
1284                         dst = processPacket(channel, payload, peer_id, channelnum, true);
1285                         return true;
1286                 }
1287         }
1288         return false;
1289 }
1290
1291 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1292                 SharedBuffer<u8> packetdata, u16 peer_id,
1293                 u8 channelnum, bool reliable)
1294 {
1295         IndentationRaiser iraiser(&(m_indentation));
1296
1297         if(packetdata.getSize() < 1)
1298                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1299
1300         u8 type = readU8(&packetdata[0]);
1301         
1302         if(type == TYPE_CONTROL)
1303         {
1304                 if(packetdata.getSize() < 2)
1305                         throw InvalidIncomingDataException("packetdata.getSize() < 2");
1306
1307                 u8 controltype = readU8(&packetdata[1]);
1308
1309                 if(controltype == CONTROLTYPE_ACK)
1310                 {
1311                         if(packetdata.getSize() < 4)
1312                                 throw InvalidIncomingDataException
1313                                                 ("packetdata.getSize() < 4 (ACK header size)");
1314
1315                         u16 seqnum = readU16(&packetdata[2]);
1316                         PrintInfo();
1317                         dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1318                                         <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1319                                         <<", seqnum="<<seqnum<<std::endl;
1320
1321                         try{
1322                                 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1323                                 // Get round trip time
1324                                 float rtt = p.totaltime;
1325
1326                                 // Let peer calculate stuff according to it
1327                                 // (avg_rtt and resend_timeout)
1328                                 Peer *peer = getPeer(peer_id);
1329                                 peer->reportRTT(rtt);
1330
1331                                 //PrintInfo(dout_con);
1332                                 //dout_con<<"RTT = "<<rtt<<std::endl;
1333
1334                                 /*dout_con<<"OUTGOING: ";
1335                                 PrintInfo();
1336                                 channel->outgoing_reliables.print();
1337                                 dout_con<<std::endl;*/
1338                         }
1339                         catch(NotFoundException &e){
1340                                 PrintInfo(derr_con);
1341                                 derr_con<<"WARNING: ACKed packet not "
1342                                                 "in outgoing queue"
1343                                                 <<std::endl;
1344                         }
1345
1346                         throw ProcessedSilentlyException("Got an ACK");
1347                 }
1348                 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1349                 {
1350                         if(packetdata.getSize() < 4)
1351                                 throw InvalidIncomingDataException
1352                                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1353                         u16 peer_id_new = readU16(&packetdata[2]);
1354                         PrintInfo();
1355                         dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1356
1357                         if(GetPeerID() != PEER_ID_INEXISTENT)
1358                         {
1359                                 PrintInfo(derr_con);
1360                                 derr_con<<"WARNING: Not changing"
1361                                                 " existing peer id."<<std::endl;
1362                         }
1363                         else
1364                         {
1365                                 dout_con<<"changing."<<std::endl;
1366                                 SetPeerID(peer_id_new);
1367                         }
1368                         throw ProcessedSilentlyException("Got a SET_PEER_ID");
1369                 }
1370                 else if(controltype == CONTROLTYPE_PING)
1371                 {
1372                         // Just ignore it, the incoming data already reset
1373                         // the timeout counter
1374                         PrintInfo();
1375                         dout_con<<"PING"<<std::endl;
1376                         throw ProcessedSilentlyException("Got a PING");
1377                 }
1378                 else if(controltype == CONTROLTYPE_DISCO)
1379                 {
1380                         // Just ignore it, the incoming data already reset
1381                         // the timeout counter
1382                         PrintInfo();
1383                         dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1384                         
1385                         if(deletePeer(peer_id, false) == false)
1386                         {
1387                                 PrintInfo(derr_con);
1388                                 derr_con<<"DISCO: Peer not found"<<std::endl;
1389                         }
1390
1391                         throw ProcessedSilentlyException("Got a DISCO");
1392                 }
1393                 else{
1394                         PrintInfo(derr_con);
1395                         derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1396                                         <<((int)controltype&0xff)<<std::endl;
1397                         throw InvalidIncomingDataException("Invalid control type");
1398                 }
1399         }
1400         else if(type == TYPE_ORIGINAL)
1401         {
1402                 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1403                         throw InvalidIncomingDataException
1404                                         ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1405                 PrintInfo();
1406                 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1407                                 <<std::endl;
1408                 // Get the inside packet out and return it
1409                 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1410                 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1411                 return payload;
1412         }
1413         else if(type == TYPE_SPLIT)
1414         {
1415                 // We have to create a packet again for buffering
1416                 // This isn't actually too bad an idea.
1417                 BufferedPacket packet = makePacket(
1418                                 getPeer(peer_id)->address,
1419                                 packetdata,
1420                                 GetProtocolID(),
1421                                 peer_id,
1422                                 channelnum);
1423                 // Buffer the packet
1424                 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1425                 if(data.getSize() != 0)
1426                 {
1427                         PrintInfo();
1428                         dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1429                                         <<"size="<<data.getSize()<<std::endl;
1430                         return data;
1431                 }
1432                 PrintInfo();
1433                 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1434                 throw ProcessedSilentlyException("Buffered a split packet chunk");
1435         }
1436         else if(type == TYPE_RELIABLE)
1437         {
1438                 // Recursive reliable packets not allowed
1439                 if(reliable)
1440                         throw InvalidIncomingDataException("Found nested reliable packets");
1441
1442                 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1443                         throw InvalidIncomingDataException
1444                                         ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1445
1446                 u16 seqnum = readU16(&packetdata[1]);
1447
1448                 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1449                 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1450                 
1451                 PrintInfo();
1452                 if(is_future_packet)
1453                         dout_con<<"BUFFERING";
1454                 else if(is_old_packet)
1455                         dout_con<<"OLD";
1456                 else
1457                         dout_con<<"RECUR";
1458                 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1459                                 <<" next="<<channel->next_incoming_seqnum;
1460                 dout_con<<" [sending CONTROLTYPE_ACK"
1461                                 " to peer_id="<<peer_id<<"]";
1462                 dout_con<<std::endl;
1463                 
1464                 //DEBUG
1465                 //assert(channel->incoming_reliables.size() < 100);
1466
1467                 // Send a CONTROLTYPE_ACK
1468                 SharedBuffer<u8> reply(4);
1469                 writeU8(&reply[0], TYPE_CONTROL);
1470                 writeU8(&reply[1], CONTROLTYPE_ACK);
1471                 writeU16(&reply[2], seqnum);
1472                 rawSendAsPacket(peer_id, channelnum, reply, false);
1473
1474                 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1475                 if(is_future_packet)
1476                 {
1477                         /*PrintInfo();
1478                         dout_con<<"Buffering reliable packet (seqnum="
1479                                         <<seqnum<<")"<<std::endl;*/
1480                         
1481                         // This one comes later, buffer it.
1482                         // Actually we have to make a packet to buffer one.
1483                         // Well, we have all the ingredients, so just do it.
1484                         BufferedPacket packet = makePacket(
1485                                         getPeer(peer_id)->address,
1486                                         packetdata,
1487                                         GetProtocolID(),
1488                                         peer_id,
1489                                         channelnum);
1490                         try{
1491                                 channel->incoming_reliables.insert(packet);
1492                                 
1493                                 /*PrintInfo();
1494                                 dout_con<<"INCOMING: ";
1495                                 channel->incoming_reliables.print();
1496                                 dout_con<<std::endl;*/
1497                         }
1498                         catch(AlreadyExistsException &e)
1499                         {
1500                         }
1501
1502                         throw ProcessedSilentlyException("Buffered future reliable packet");
1503                 }
1504                 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1505                 else if(is_old_packet)
1506                 {
1507                         // An old packet, dump it
1508                         throw InvalidIncomingDataException("Got an old reliable packet");
1509                 }
1510
1511                 channel->next_incoming_seqnum++;
1512
1513                 // Get out the inside packet and re-process it
1514                 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1515                 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1516
1517                 return processPacket(channel, payload, peer_id, channelnum, true);
1518         }
1519         else
1520         {
1521                 PrintInfo(derr_con);
1522                 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1523                 throw InvalidIncomingDataException("Invalid packet type");
1524         }
1525         
1526         // We should never get here.
1527         // If you get here, add an exception or a return to some of the
1528         // above conditionals.
1529         assert(0);
1530         throw BaseException("Error in Channel::ProcessPacket()");
1531 }
1532
1533 bool Connection::deletePeer(u16 peer_id, bool timeout)
1534 {
1535         if(m_peers.find(peer_id) == m_peers.end())
1536                 return false;
1537         
1538         Peer *peer = m_peers[peer_id];
1539
1540         // Create event
1541         ConnectionEvent e;
1542         e.peerRemoved(peer_id, timeout, peer->address);
1543         putEvent(e);
1544
1545         delete m_peers[peer_id];
1546         m_peers.erase(peer_id);
1547         return true;
1548 }
1549
1550 /* Interface */
1551
1552 ConnectionEvent Connection::getEvent()
1553 {
1554         if(m_event_queue.empty()){
1555                 ConnectionEvent e;
1556                 e.type = CONNEVENT_NONE;
1557                 return e;
1558         }
1559         return m_event_queue.pop_front();
1560 }
1561
1562 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1563 {
1564         try{
1565                 return m_event_queue.pop_front(timeout_ms);
1566         } catch(ItemNotFoundException &ex){
1567                 ConnectionEvent e;
1568                 e.type = CONNEVENT_NONE;
1569                 return e;
1570         }
1571 }
1572
1573 void Connection::putCommand(ConnectionCommand &c)
1574 {
1575         m_command_queue.push_back(c);
1576 }
1577
1578 void Connection::Serve(unsigned short port)
1579 {
1580         ConnectionCommand c;
1581         c.serve(port);
1582         putCommand(c);
1583 }
1584
1585 void Connection::Connect(Address address)
1586 {
1587         ConnectionCommand c;
1588         c.connect(address);
1589         putCommand(c);
1590 }
1591
1592 bool Connection::Connected()
1593 {
1594         JMutexAutoLock peerlock(m_peers_mutex);
1595
1596         if(m_peers.size() != 1)
1597                 return false;
1598                 
1599         std::map<u16, Peer*>::iterator node = m_peers.find(PEER_ID_SERVER);
1600         if(node == m_peers.end())
1601                 return false;
1602         
1603         if(m_peer_id == PEER_ID_INEXISTENT)
1604                 return false;
1605         
1606         return true;
1607 }
1608
1609 void Connection::Disconnect()
1610 {
1611         ConnectionCommand c;
1612         c.disconnect();
1613         putCommand(c);
1614 }
1615
1616 u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
1617 {
1618         for(;;){
1619                 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
1620                 if(e.type != CONNEVENT_NONE)
1621                         dout_con<<getDesc()<<": Receive: got event: "
1622                                         <<e.describe()<<std::endl;
1623                 switch(e.type){
1624                 case CONNEVENT_NONE:
1625                         throw NoIncomingDataException("No incoming data");
1626                 case CONNEVENT_DATA_RECEIVED:
1627                         peer_id = e.peer_id;
1628                         data = SharedBuffer<u8>(e.data);
1629                         return e.data.getSize();
1630                 case CONNEVENT_PEER_ADDED: {
1631                         Peer tmp(e.peer_id, e.address);
1632                         if(m_bc_peerhandler)
1633                                 m_bc_peerhandler->peerAdded(&tmp);
1634                         continue; }
1635                 case CONNEVENT_PEER_REMOVED: {
1636                         Peer tmp(e.peer_id, e.address);
1637                         if(m_bc_peerhandler)
1638                                 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1639                         continue; }
1640                 case CONNEVENT_BIND_FAILED:
1641                         throw ConnectionBindFailed("Failed to bind socket "
1642                                         "(port already in use?)");
1643                 }
1644         }
1645         throw NoIncomingDataException("No incoming data");
1646 }
1647
1648 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1649 {
1650         assert(channelnum < CHANNEL_COUNT);
1651
1652         ConnectionCommand c;
1653         c.sendToAll(channelnum, data, reliable);
1654         putCommand(c);
1655 }
1656
1657 void Connection::Send(u16 peer_id, u8 channelnum,
1658                 SharedBuffer<u8> data, bool reliable)
1659 {
1660         assert(channelnum < CHANNEL_COUNT);
1661
1662         ConnectionCommand c;
1663         c.send(peer_id, channelnum, data, reliable);
1664         putCommand(c);
1665 }
1666
1667 void Connection::RunTimeouts(float dtime)
1668 {
1669         // No-op
1670 }
1671
1672 Address Connection::GetPeerAddress(u16 peer_id)
1673 {
1674         JMutexAutoLock peerlock(m_peers_mutex);
1675         return getPeer(peer_id)->address;
1676 }
1677
1678 float Connection::GetPeerAvgRTT(u16 peer_id)
1679 {
1680         JMutexAutoLock peerlock(m_peers_mutex);
1681         return getPeer(peer_id)->avg_rtt;
1682 }
1683
1684 void Connection::DeletePeer(u16 peer_id)
1685 {
1686         ConnectionCommand c;
1687         c.deletePeer(peer_id);
1688         putCommand(c);
1689 }
1690
1691 void Connection::PrintInfo(std::ostream &out)
1692 {
1693         out<<getDesc()<<": ";
1694 }
1695
1696 void Connection::PrintInfo()
1697 {
1698         PrintInfo(dout_con);
1699 }
1700
1701 std::string Connection::getDesc()
1702 {
1703         return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";
1704 }
1705
1706 } // namespace
1707