]> git.lizzy.rs Git - dragonfireclient.git/blob - src/connection.cpp
Add one more curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
[dragonfireclient.git] / src / connection.cpp
1 /*
2 Minetest
3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "connection.h"
21 #include "main.h"
22 #include "serialization.h"
23 #include "log.h"
24 #include "porting.h"
25 #include "util/serialize.h"
26 #include "util/numeric.h"
27 #include "util/string.h"
28 #include "settings.h"
29
30 namespace con
31 {
32
33 static u16 readPeerId(u8 *packetdata)
34 {
35         return readU16(&packetdata[4]);
36 }
37 static u8 readChannel(u8 *packetdata)
38 {
39         return readU8(&packetdata[6]);
40 }
41
42 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
43                 u32 protocol_id, u16 sender_peer_id, u8 channel)
44 {
45         u32 packet_size = datasize + BASE_HEADER_SIZE;
46         BufferedPacket p(packet_size);
47         p.address = address;
48
49         writeU32(&p.data[0], protocol_id);
50         writeU16(&p.data[4], sender_peer_id);
51         writeU8(&p.data[6], channel);
52
53         memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
54
55         return p;
56 }
57
58 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
59                 u32 protocol_id, u16 sender_peer_id, u8 channel)
60 {
61         return makePacket(address, *data, data.getSize(),
62                         protocol_id, sender_peer_id, channel);
63 }
64
65 SharedBuffer<u8> makeOriginalPacket(
66                 SharedBuffer<u8> data)
67 {
68         u32 header_size = 1;
69         u32 packet_size = data.getSize() + header_size;
70         SharedBuffer<u8> b(packet_size);
71
72         writeU8(&b[0], TYPE_ORIGINAL);
73
74         memcpy(&b[header_size], *data, data.getSize());
75
76         return b;
77 }
78
79 core::list<SharedBuffer<u8> > makeSplitPacket(
80                 SharedBuffer<u8> data,
81                 u32 chunksize_max,
82                 u16 seqnum)
83 {
84         // Chunk packets, containing the TYPE_SPLIT header
85         core::list<SharedBuffer<u8> > chunks;
86         
87         u32 chunk_header_size = 7;
88         u32 maximum_data_size = chunksize_max - chunk_header_size;
89         u32 start = 0;
90         u32 end = 0;
91         u32 chunk_num = 0;
92         do{
93                 end = start + maximum_data_size - 1;
94                 if(end > data.getSize() - 1)
95                         end = data.getSize() - 1;
96                 
97                 u32 payload_size = end - start + 1;
98                 u32 packet_size = chunk_header_size + payload_size;
99
100                 SharedBuffer<u8> chunk(packet_size);
101                 
102                 writeU8(&chunk[0], TYPE_SPLIT);
103                 writeU16(&chunk[1], seqnum);
104                 // [3] u16 chunk_count is written at next stage
105                 writeU16(&chunk[5], chunk_num);
106                 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
107
108                 chunks.push_back(chunk);
109                 
110                 start = end + 1;
111                 chunk_num++;
112         }
113         while(end != data.getSize() - 1);
114
115         u16 chunk_count = chunks.getSize();
116
117         core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
118         for(; i != chunks.end(); i++)
119         {
120                 // Write chunk_count
121                 writeU16(&((*i)[3]), chunk_count);
122         }
123
124         return chunks;
125 }
126
127 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
128                 SharedBuffer<u8> data,
129                 u32 chunksize_max,
130                 u16 &split_seqnum)
131 {
132         u32 original_header_size = 1;
133         core::list<SharedBuffer<u8> > list;
134         if(data.getSize() + original_header_size > chunksize_max)
135         {
136                 list = makeSplitPacket(data, chunksize_max, split_seqnum);
137                 split_seqnum++;
138                 return list;
139         }
140         else
141         {
142                 list.push_back(makeOriginalPacket(data));
143         }
144         return list;
145 }
146
147 SharedBuffer<u8> makeReliablePacket(
148                 SharedBuffer<u8> data,
149                 u16 seqnum)
150 {
151         /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
152         dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
153                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
154         u32 header_size = 3;
155         u32 packet_size = data.getSize() + header_size;
156         SharedBuffer<u8> b(packet_size);
157
158         writeU8(&b[0], TYPE_RELIABLE);
159         writeU16(&b[1], seqnum);
160
161         memcpy(&b[header_size], *data, data.getSize());
162
163         /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
164                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
165         //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
166         return b;
167 }
168
169 /*
170         ReliablePacketBuffer
171 */
172
173 void ReliablePacketBuffer::print()
174 {
175         core::list<BufferedPacket>::Iterator i;
176         i = m_list.begin();
177         for(; i != m_list.end(); i++)
178         {
179                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
180                 dout_con<<s<<" ";
181         }
182 }
183 bool ReliablePacketBuffer::empty()
184 {
185         return m_list.empty();
186 }
187 u32 ReliablePacketBuffer::size()
188 {
189         return m_list.getSize();
190 }
191 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
192 {
193         core::list<BufferedPacket>::Iterator i;
194         i = m_list.begin();
195         for(; i != m_list.end(); i++)
196         {
197                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
198                 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
199                                 <<", comparing to s="<<s<<std::endl;*/
200                 if(s == seqnum)
201                         break;
202         }
203         return i;
204 }
205 RPBSearchResult ReliablePacketBuffer::notFound()
206 {
207         return m_list.end();
208 }
209 u16 ReliablePacketBuffer::getFirstSeqnum()
210 {
211         if(empty())
212                 throw NotFoundException("Buffer is empty");
213         BufferedPacket p = *m_list.begin();
214         return readU16(&p.data[BASE_HEADER_SIZE+1]);
215 }
216 BufferedPacket ReliablePacketBuffer::popFirst()
217 {
218         if(empty())
219                 throw NotFoundException("Buffer is empty");
220         BufferedPacket p = *m_list.begin();
221         core::list<BufferedPacket>::Iterator i = m_list.begin();
222         m_list.erase(i);
223         return p;
224 }
225 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
226 {
227         RPBSearchResult r = findPacket(seqnum);
228         if(r == notFound()){
229                 dout_con<<"Not found"<<std::endl;
230                 throw NotFoundException("seqnum not found in buffer");
231         }
232         BufferedPacket p = *r;
233         m_list.erase(r);
234         return p;
235 }
236 void ReliablePacketBuffer::insert(BufferedPacket &p)
237 {
238         assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
239         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
240         assert(type == TYPE_RELIABLE);
241         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
242
243         // Find the right place for the packet and insert it there
244
245         // If list is empty, just add it
246         if(m_list.empty())
247         {
248                 m_list.push_back(p);
249                 // Done.
250                 return;
251         }
252         // Otherwise find the right place
253         core::list<BufferedPacket>::Iterator i;
254         i = m_list.begin();
255         // Find the first packet in the list which has a higher seqnum
256         for(; i != m_list.end(); i++){
257                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
258                 if(s == seqnum){
259                         throw AlreadyExistsException("Same seqnum in list");
260                 }
261                 if(seqnum_higher(s, seqnum)){
262                         break;
263                 }
264         }
265         // If we're at the end of the list, add the packet to the
266         // end of the list
267         if(i == m_list.end())
268         {
269                 m_list.push_back(p);
270                 // Done.
271                 return;
272         }
273         // Insert before i
274         m_list.insert_before(i, p);
275 }
276
277 void ReliablePacketBuffer::incrementTimeouts(float dtime)
278 {
279         core::list<BufferedPacket>::Iterator i;
280         i = m_list.begin();
281         for(; i != m_list.end(); i++){
282                 i->time += dtime;
283                 i->totaltime += dtime;
284         }
285 }
286
287 void ReliablePacketBuffer::resetTimedOuts(float timeout)
288 {
289         core::list<BufferedPacket>::Iterator i;
290         i = m_list.begin();
291         for(; i != m_list.end(); i++){
292                 if(i->time >= timeout)
293                         i->time = 0.0;
294         }
295 }
296
297 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
298 {
299         core::list<BufferedPacket>::Iterator i;
300         i = m_list.begin();
301         for(; i != m_list.end(); i++){
302                 if(i->totaltime >= timeout)
303                         return true;
304         }
305         return false;
306 }
307
308 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
309 {
310         core::list<BufferedPacket> timed_outs;
311         core::list<BufferedPacket>::Iterator i;
312         i = m_list.begin();
313         for(; i != m_list.end(); i++)
314         {
315                 if(i->time >= timeout)
316                         timed_outs.push_back(*i);
317         }
318         return timed_outs;
319 }
320
321 /*
322         IncomingSplitBuffer
323 */
324
325 IncomingSplitBuffer::~IncomingSplitBuffer()
326 {
327         core::map<u16, IncomingSplitPacket*>::Iterator i;
328         i = m_buf.getIterator();
329         for(; i.atEnd() == false; i++)
330         {
331                 delete i.getNode()->getValue();
332         }
333 }
334 /*
335         This will throw a GotSplitPacketException when a full
336         split packet is constructed.
337 */
338 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
339 {
340         u32 headersize = BASE_HEADER_SIZE + 7;
341         assert(p.data.getSize() >= headersize);
342         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
343         assert(type == TYPE_SPLIT);
344         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
345         u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
346         u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
347
348         // Add if doesn't exist
349         if(m_buf.find(seqnum) == NULL)
350         {
351                 IncomingSplitPacket *sp = new IncomingSplitPacket();
352                 sp->chunk_count = chunk_count;
353                 sp->reliable = reliable;
354                 m_buf[seqnum] = sp;
355         }
356         
357         IncomingSplitPacket *sp = m_buf[seqnum];
358         
359         // TODO: These errors should be thrown or something? Dunno.
360         if(chunk_count != sp->chunk_count)
361                 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
362                                 <<" != sp->chunk_count="<<sp->chunk_count
363                                 <<std::endl;
364         if(reliable != sp->reliable)
365                 derr_con<<"Connection: WARNING: reliable="<<reliable
366                                 <<" != sp->reliable="<<sp->reliable
367                                 <<std::endl;
368
369         // If chunk already exists, ignore it.
370         // Sometimes two identical packets may arrive when there is network
371         // lag and the server re-sends stuff.
372         if(sp->chunks.find(chunk_num) != NULL)
373                 return SharedBuffer<u8>();
374         
375         // Cut chunk data out of packet
376         u32 chunkdatasize = p.data.getSize() - headersize;
377         SharedBuffer<u8> chunkdata(chunkdatasize);
378         memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
379         
380         // Set chunk data in buffer
381         sp->chunks[chunk_num] = chunkdata;
382         
383         // If not all chunks are received, return empty buffer
384         if(sp->allReceived() == false)
385                 return SharedBuffer<u8>();
386
387         // Calculate total size
388         u32 totalsize = 0;
389         core::map<u16, SharedBuffer<u8> >::Iterator i;
390         i = sp->chunks.getIterator();
391         for(; i.atEnd() == false; i++)
392         {
393                 totalsize += i.getNode()->getValue().getSize();
394         }
395         
396         SharedBuffer<u8> fulldata(totalsize);
397
398         // Copy chunks to data buffer
399         u32 start = 0;
400         for(u32 chunk_i=0; chunk_i<sp->chunk_count;
401                         chunk_i++)
402         {
403                 SharedBuffer<u8> buf = sp->chunks[chunk_i];
404                 u16 chunkdatasize = buf.getSize();
405                 memcpy(&fulldata[start], *buf, chunkdatasize);
406                 start += chunkdatasize;;
407         }
408
409         // Remove sp from buffer
410         m_buf.remove(seqnum);
411         delete sp;
412
413         return fulldata;
414 }
415 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
416 {
417         core::list<u16> remove_queue;
418         core::map<u16, IncomingSplitPacket*>::Iterator i;
419         i = m_buf.getIterator();
420         for(; i.atEnd() == false; i++)
421         {
422                 IncomingSplitPacket *p = i.getNode()->getValue();
423                 // Reliable ones are not removed by timeout
424                 if(p->reliable == true)
425                         continue;
426                 p->time += dtime;
427                 if(p->time >= timeout)
428                         remove_queue.push_back(i.getNode()->getKey());
429         }
430         core::list<u16>::Iterator j;
431         j = remove_queue.begin();
432         for(; j != remove_queue.end(); j++)
433         {
434                 dout_con<<"NOTE: Removing timed out unreliable split packet"
435                                 <<std::endl;
436                 delete m_buf[*j];
437                 m_buf.remove(*j);
438         }
439 }
440
441 /*
442         Channel
443 */
444
445 Channel::Channel()
446 {
447         next_outgoing_seqnum = SEQNUM_INITIAL;
448         next_incoming_seqnum = SEQNUM_INITIAL;
449         next_outgoing_split_seqnum = SEQNUM_INITIAL;
450 }
451 Channel::~Channel()
452 {
453 }
454
455 /*
456         Peer
457 */
458
459 Peer::Peer(u16 a_id, Address a_address):
460         address(a_address),
461         id(a_id),
462         timeout_counter(0.0),
463         ping_timer(0.0),
464         resend_timeout(0.5),
465         avg_rtt(-1.0),
466         has_sent_with_id(false),
467         m_sendtime_accu(0),
468         m_max_packets_per_second(10),
469         m_num_sent(0),
470         m_max_num_sent(0),
471         congestion_control_aim_rtt(0.2),
472         congestion_control_max_rate(400),
473         congestion_control_min_rate(10)
474 {
475 }
476 Peer::~Peer()
477 {
478 }
479
480 void Peer::reportRTT(float rtt)
481 {
482         if(rtt >= 0.0){
483                 if(rtt < 0.01){
484                         if(m_max_packets_per_second < congestion_control_max_rate)
485                                 m_max_packets_per_second += 10;
486                 } else if(rtt < congestion_control_aim_rtt){
487                         if(m_max_packets_per_second < congestion_control_max_rate)
488                                 m_max_packets_per_second += 2;
489                 } else {
490                         m_max_packets_per_second *= 0.8;
491                         if(m_max_packets_per_second < congestion_control_min_rate)
492                                 m_max_packets_per_second = congestion_control_min_rate;
493                 }
494         }
495
496         if(rtt < -0.999)
497         {}
498         else if(avg_rtt < 0.0)
499                 avg_rtt = rtt;
500         else
501                 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
502         
503         // Calculate resend_timeout
504
505         /*int reliable_count = 0;
506         for(int i=0; i<CHANNEL_COUNT; i++)
507         {
508                 reliable_count += channels[i].outgoing_reliables.size();
509         }
510         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
511                         * ((float)reliable_count * 1);*/
512         
513         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
514         if(timeout < RESEND_TIMEOUT_MIN)
515                 timeout = RESEND_TIMEOUT_MIN;
516         if(timeout > RESEND_TIMEOUT_MAX)
517                 timeout = RESEND_TIMEOUT_MAX;
518         resend_timeout = timeout;
519 }
520                                 
521 /*
522         Connection
523 */
524
525 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout):
526         m_protocol_id(protocol_id),
527         m_max_packet_size(max_packet_size),
528         m_timeout(timeout),
529         m_peer_id(0),
530         m_bc_peerhandler(NULL),
531         m_bc_receive_timeout(0),
532         m_indentation(0)
533 {
534         m_socket.setTimeoutMs(5);
535
536         Start();
537 }
538
539 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
540                 PeerHandler *peerhandler):
541         m_protocol_id(protocol_id),
542         m_max_packet_size(max_packet_size),
543         m_timeout(timeout),
544         m_peer_id(0),
545         m_bc_peerhandler(peerhandler),
546         m_bc_receive_timeout(0),
547         m_indentation(0)
548 {
549         m_socket.setTimeoutMs(5);
550
551         Start();
552 }
553
554
555 Connection::~Connection()
556 {
557         stop();
558         // Delete peers
559         for(core::map<u16, Peer*>::Iterator
560                         j = m_peers.getIterator();
561                         j.atEnd() == false; j++)
562         {
563                 Peer *peer = j.getNode()->getValue();
564                 delete peer;
565         }
566 }
567
568 /* Internal stuff */
569
570 void * Connection::Thread()
571 {
572         ThreadStarted();
573         log_register_thread("Connection");
574
575         dout_con<<"Connection thread started"<<std::endl;
576         
577         u32 curtime = porting::getTimeMs();
578         u32 lasttime = curtime;
579
580         while(getRun())
581         {
582                 BEGIN_DEBUG_EXCEPTION_HANDLER
583                 
584                 lasttime = curtime;
585                 curtime = porting::getTimeMs();
586                 float dtime = (float)(curtime - lasttime) / 1000.;
587                 if(dtime > 0.1)
588                         dtime = 0.1;
589                 if(dtime < 0.0)
590                         dtime = 0.0;
591                 
592                 runTimeouts(dtime);
593
594                 while(m_command_queue.size() != 0){
595                         ConnectionCommand c = m_command_queue.pop_front();
596                         processCommand(c);
597                 }
598
599                 send(dtime);
600
601                 receive();
602                 
603                 END_DEBUG_EXCEPTION_HANDLER(derr_con);
604         }
605
606         return NULL;
607 }
608
609 void Connection::putEvent(ConnectionEvent &e)
610 {
611         assert(e.type != CONNEVENT_NONE);
612         m_event_queue.push_back(e);
613 }
614
615 void Connection::processCommand(ConnectionCommand &c)
616 {
617         switch(c.type){
618         case CONNCMD_NONE:
619                 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
620                 return;
621         case CONNCMD_SERVE:
622                 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
623                                 <<c.port<<std::endl;
624                 serve(c.port);
625                 return;
626         case CONNCMD_CONNECT:
627                 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
628                 connect(c.address);
629                 return;
630         case CONNCMD_DISCONNECT:
631                 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
632                 disconnect();
633                 return;
634         case CONNCMD_SEND:
635                 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
636                 send(c.peer_id, c.channelnum, c.data, c.reliable);
637                 return;
638         case CONNCMD_SEND_TO_ALL:
639                 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
640                 sendToAll(c.channelnum, c.data, c.reliable);
641                 return;
642         case CONNCMD_DELETE_PEER:
643                 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
644                 deletePeer(c.peer_id, false);
645                 return;
646         }
647 }
648
649 void Connection::send(float dtime)
650 {
651         for(core::map<u16, Peer*>::Iterator
652                         j = m_peers.getIterator();
653                         j.atEnd() == false; j++)
654         {
655                 Peer *peer = j.getNode()->getValue();
656                 peer->m_sendtime_accu += dtime;
657                 peer->m_num_sent = 0;
658                 peer->m_max_num_sent = peer->m_sendtime_accu *
659                                 peer->m_max_packets_per_second;
660         }
661         Queue<OutgoingPacket> postponed_packets;
662         while(m_outgoing_queue.size() != 0){
663                 OutgoingPacket packet = m_outgoing_queue.pop_front();
664                 Peer *peer = getPeerNoEx(packet.peer_id);
665                 if(!peer)
666                         continue;
667                 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
668                         postponed_packets.push_back(packet);
669                 } else if(peer->m_num_sent < peer->m_max_num_sent){
670                         rawSendAsPacket(packet.peer_id, packet.channelnum,
671                                         packet.data, packet.reliable);
672                         peer->m_num_sent++;
673                 } else {
674                         postponed_packets.push_back(packet);
675                 }
676         }
677         while(postponed_packets.size() != 0){
678                 m_outgoing_queue.push_back(postponed_packets.pop_front());
679         }
680         for(core::map<u16, Peer*>::Iterator
681                         j = m_peers.getIterator();
682                         j.atEnd() == false; j++)
683         {
684                 Peer *peer = j.getNode()->getValue();
685                 peer->m_sendtime_accu -= (float)peer->m_num_sent /
686                                 peer->m_max_packets_per_second;
687                 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
688                         peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
689         }
690 }
691
692 // Receive packets from the network and buffers and create ConnectionEvents
693 void Connection::receive()
694 {
695         u32 datasize = m_max_packet_size * 2;  // Double it just to be safe
696         // TODO: We can not know how many layers of header there are.
697         // For now, just assume there are no other than the base headers.
698         u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
699         SharedBuffer<u8> packetdata(packet_maxsize);
700
701         bool single_wait_done = false;
702         
703         for(;;)
704         {
705         try{
706                 /* Check if some buffer has relevant data */
707                 {
708                         u16 peer_id;
709                         SharedBuffer<u8> resultdata;
710                         bool got = getFromBuffers(peer_id, resultdata);
711                         if(got){
712                                 ConnectionEvent e;
713                                 e.dataReceived(peer_id, resultdata);
714                                 putEvent(e);
715                                 continue;
716                         }
717                 }
718                 
719                 if(single_wait_done){
720                         if(m_socket.WaitData(0) == false)
721                                 break;
722                 }
723                 
724                 single_wait_done = true;
725
726                 Address sender;
727                 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
728
729                 if(received_size < 0)
730                         break;
731                 if(received_size < BASE_HEADER_SIZE)
732                         continue;
733                 if(readU32(&packetdata[0]) != m_protocol_id)
734                         continue;
735                 
736                 u16 peer_id = readPeerId(*packetdata);
737                 u8 channelnum = readChannel(*packetdata);
738                 if(channelnum > CHANNEL_COUNT-1){
739                         PrintInfo(derr_con);
740                         derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
741                         throw InvalidIncomingDataException("Channel doesn't exist");
742                 }
743
744                 if(peer_id == PEER_ID_INEXISTENT)
745                 {
746                         /*
747                                 Somebody is trying to send stuff to us with no peer id.
748                                 
749                                 Check if the same address and port was added to our peer
750                                 list before.
751                                 Allow only entries that have has_sent_with_id==false.
752                         */
753
754                         core::map<u16, Peer*>::Iterator j;
755                         j = m_peers.getIterator();
756                         for(; j.atEnd() == false; j++)
757                         {
758                                 Peer *peer = j.getNode()->getValue();
759                                 if(peer->has_sent_with_id)
760                                         continue;
761                                 if(peer->address == sender)
762                                         break;
763                         }
764                         
765                         /*
766                                 If no peer was found with the same address and port,
767                                 we shall assume it is a new peer and create an entry.
768                         */
769                         if(j.atEnd())
770                         {
771                                 // Pass on to adding the peer
772                         }
773                         // Else: A peer was found.
774                         else
775                         {
776                                 Peer *peer = j.getNode()->getValue();
777                                 peer_id = peer->id;
778                                 PrintInfo(derr_con);
779                                 derr_con<<"WARNING: Assuming unknown peer to be "
780                                                 <<"peer_id="<<peer_id<<std::endl;
781                         }
782                 }
783                 
784                 /*
785                         The peer was not found in our lists. Add it.
786                 */
787                 if(peer_id == PEER_ID_INEXISTENT)
788                 {
789                         // Somebody wants to make a new connection
790
791                         // Get a unique peer id (2 or higher)
792                         u16 peer_id_new = 2;
793                         /*
794                                 Find an unused peer id
795                         */
796                         bool out_of_ids = false;
797                         for(;;)
798                         {
799                                 // Check if exists
800                                 if(m_peers.find(peer_id_new) == NULL)
801                                         break;
802                                 // Check for overflow
803                                 if(peer_id_new == 65535){
804                                         out_of_ids = true;
805                                         break;
806                                 }
807                                 peer_id_new++;
808                         }
809                         if(out_of_ids){
810                                 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
811                                 continue;
812                         }
813
814                         PrintInfo();
815                         dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
816                                         " giving peer_id="<<peer_id_new<<std::endl;
817
818                         // Create a peer
819                         Peer *peer = new Peer(peer_id_new, sender);
820                         m_peers.insert(peer->id, peer);
821                         
822                         // Create peer addition event
823                         ConnectionEvent e;
824                         e.peerAdded(peer_id_new, sender);
825                         putEvent(e);
826                         
827                         // Create CONTROL packet to tell the peer id to the new peer.
828                         SharedBuffer<u8> reply(4);
829                         writeU8(&reply[0], TYPE_CONTROL);
830                         writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
831                         writeU16(&reply[2], peer_id_new);
832                         sendAsPacket(peer_id_new, 0, reply, true);
833                         
834                         // We're now talking to a valid peer_id
835                         peer_id = peer_id_new;
836
837                         // Go on and process whatever it sent
838                 }
839
840                 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
841
842                 if(node == NULL)
843                 {
844                         // Peer not found
845                         // This means that the peer id of the sender is not PEER_ID_INEXISTENT
846                         // and it is invalid.
847                         PrintInfo(derr_con);
848                         derr_con<<"Receive(): Peer not found"<<std::endl;
849                         throw InvalidIncomingDataException("Peer not found (possible timeout)");
850                 }
851
852                 Peer *peer = node->getValue();
853
854                 // Validate peer address
855                 if(peer->address != sender)
856                 {
857                         PrintInfo(derr_con);
858                         derr_con<<"Peer "<<peer_id<<" sending from different address."
859                                         " Ignoring."<<std::endl;
860                         continue;
861                 }
862                 
863                 peer->timeout_counter = 0.0;
864
865                 Channel *channel = &(peer->channels[channelnum]);
866                 
867                 // Throw the received packet to channel->processPacket()
868
869                 // Make a new SharedBuffer from the data without the base headers
870                 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
871                 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
872                                 strippeddata.getSize());
873                 
874                 try{
875                         // Process it (the result is some data with no headers made by us)
876                         SharedBuffer<u8> resultdata = processPacket
877                                         (channel, strippeddata, peer_id, channelnum, false);
878                         
879                         PrintInfo();
880                         dout_con<<"ProcessPacket returned data of size "
881                                         <<resultdata.getSize()<<std::endl;
882                         
883                         ConnectionEvent e;
884                         e.dataReceived(peer_id, resultdata);
885                         putEvent(e);
886                         continue;
887                 }catch(ProcessedSilentlyException &e){
888                 }
889         }catch(InvalidIncomingDataException &e){
890         }
891         catch(ProcessedSilentlyException &e){
892         }
893         } // for
894 }
895
896 void Connection::runTimeouts(float dtime)
897 {
898         float congestion_control_aim_rtt
899                         = g_settings->getFloat("congestion_control_aim_rtt");
900         float congestion_control_max_rate
901                         = g_settings->getFloat("congestion_control_max_rate");
902         float congestion_control_min_rate
903                         = g_settings->getFloat("congestion_control_min_rate");
904
905         core::list<u16> timeouted_peers;
906         core::map<u16, Peer*>::Iterator j;
907         j = m_peers.getIterator();
908         for(; j.atEnd() == false; j++)
909         {
910                 Peer *peer = j.getNode()->getValue();
911
912                 // Update congestion control values
913                 peer->congestion_control_aim_rtt = congestion_control_aim_rtt;
914                 peer->congestion_control_max_rate = congestion_control_max_rate;
915                 peer->congestion_control_min_rate = congestion_control_min_rate;
916                 
917                 /*
918                         Check peer timeout
919                 */
920                 peer->timeout_counter += dtime;
921                 if(peer->timeout_counter > m_timeout)
922                 {
923                         PrintInfo(derr_con);
924                         derr_con<<"RunTimeouts(): Peer "<<peer->id
925                                         <<" has timed out."
926                                         <<" (source=peer->timeout_counter)"
927                                         <<std::endl;
928                         // Add peer to the list
929                         timeouted_peers.push_back(peer->id);
930                         // Don't bother going through the buffers of this one
931                         continue;
932                 }
933
934                 float resend_timeout = peer->resend_timeout;
935                 for(u16 i=0; i<CHANNEL_COUNT; i++)
936                 {
937                         core::list<BufferedPacket> timed_outs;
938                         core::list<BufferedPacket>::Iterator j;
939                         
940                         Channel *channel = &peer->channels[i];
941
942                         // Remove timed out incomplete unreliable split packets
943                         channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
944                         
945                         // Increment reliable packet times
946                         channel->outgoing_reliables.incrementTimeouts(dtime);
947
948                         // Check reliable packet total times, remove peer if
949                         // over timeout.
950                         if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
951                         {
952                                 PrintInfo(derr_con);
953                                 derr_con<<"RunTimeouts(): Peer "<<peer->id
954                                                 <<" has timed out."
955                                                 <<" (source=reliable packet totaltime)"
956                                                 <<std::endl;
957                                 // Add peer to the to-be-removed list
958                                 timeouted_peers.push_back(peer->id);
959                                 goto nextpeer;
960                         }
961
962                         // Re-send timed out outgoing reliables
963                         
964                         timed_outs = channel->
965                                         outgoing_reliables.getTimedOuts(resend_timeout);
966
967                         channel->outgoing_reliables.resetTimedOuts(resend_timeout);
968
969                         j = timed_outs.begin();
970                         for(; j != timed_outs.end(); j++)
971                         {
972                                 u16 peer_id = readPeerId(*(j->data));
973                                 u8 channel = readChannel(*(j->data));
974                                 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
975
976                                 PrintInfo(derr_con);
977                                 derr_con<<"RE-SENDING timed-out RELIABLE to ";
978                                 j->address.print(&derr_con);
979                                 derr_con<<"(t/o="<<resend_timeout<<"): "
980                                                 <<"from_peer_id="<<peer_id
981                                                 <<", channel="<<((int)channel&0xff)
982                                                 <<", seqnum="<<seqnum
983                                                 <<std::endl;
984
985                                 rawSend(*j);
986
987                                 // Enlarge avg_rtt and resend_timeout:
988                                 // The rtt will be at least the timeout.
989                                 // NOTE: This won't affect the timeout of the next
990                                 // checked channel because it was cached.
991                                 peer->reportRTT(resend_timeout);
992                         }
993                 }
994                 
995                 /*
996                         Send pings
997                 */
998                 peer->ping_timer += dtime;
999                 if(peer->ping_timer >= 5.0)
1000                 {
1001                         // Create and send PING packet
1002                         SharedBuffer<u8> data(2);
1003                         writeU8(&data[0], TYPE_CONTROL);
1004                         writeU8(&data[1], CONTROLTYPE_PING);
1005                         rawSendAsPacket(peer->id, 0, data, true);
1006
1007                         peer->ping_timer = 0.0;
1008                 }
1009                 
1010 nextpeer:
1011                 continue;
1012         }
1013
1014         // Remove timed out peers
1015         core::list<u16>::Iterator i = timeouted_peers.begin();
1016         for(; i != timeouted_peers.end(); i++)
1017         {
1018                 PrintInfo(derr_con);
1019                 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1020                 deletePeer(*i, true);
1021         }
1022 }
1023
1024 void Connection::serve(u16 port)
1025 {
1026         dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
1027         try{
1028                 m_socket.Bind(port);
1029                 m_peer_id = PEER_ID_SERVER;
1030         }
1031         catch(SocketException &e){
1032                 // Create event
1033                 ConnectionEvent ce;
1034                 ce.bindFailed();
1035                 putEvent(ce);
1036         }
1037 }
1038
1039 void Connection::connect(Address address)
1040 {
1041         dout_con<<getDesc()<<" connecting to "<<address.serializeString()
1042                         <<":"<<address.getPort()<<std::endl;
1043
1044         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1045         if(node != NULL){
1046                 throw ConnectionException("Already connected to a server");
1047         }
1048
1049         Peer *peer = new Peer(PEER_ID_SERVER, address);
1050         m_peers.insert(peer->id, peer);
1051
1052         // Create event
1053         ConnectionEvent e;
1054         e.peerAdded(peer->id, peer->address);
1055         putEvent(e);
1056         
1057         m_socket.Bind(0);
1058         
1059         // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1060         m_peer_id = PEER_ID_INEXISTENT;
1061         SharedBuffer<u8> data(0);
1062         Send(PEER_ID_SERVER, 0, data, true);
1063 }
1064
1065 void Connection::disconnect()
1066 {
1067         dout_con<<getDesc()<<" disconnecting"<<std::endl;
1068
1069         // Create and send DISCO packet
1070         SharedBuffer<u8> data(2);
1071         writeU8(&data[0], TYPE_CONTROL);
1072         writeU8(&data[1], CONTROLTYPE_DISCO);
1073         
1074         // Send to all
1075         core::map<u16, Peer*>::Iterator j;
1076         j = m_peers.getIterator();
1077         for(; j.atEnd() == false; j++)
1078         {
1079                 Peer *peer = j.getNode()->getValue();
1080                 rawSendAsPacket(peer->id, 0, data, false);
1081         }
1082 }
1083
1084 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1085 {
1086         core::map<u16, Peer*>::Iterator j;
1087         j = m_peers.getIterator();
1088         for(; j.atEnd() == false; j++)
1089         {
1090                 Peer *peer = j.getNode()->getValue();
1091                 send(peer->id, channelnum, data, reliable);
1092         }
1093 }
1094
1095 void Connection::send(u16 peer_id, u8 channelnum,
1096                 SharedBuffer<u8> data, bool reliable)
1097 {
1098         dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1099
1100         assert(channelnum < CHANNEL_COUNT);
1101         
1102         Peer *peer = getPeerNoEx(peer_id);
1103         if(peer == NULL)
1104                 return;
1105         Channel *channel = &(peer->channels[channelnum]);
1106
1107         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1108         if(reliable)
1109                 chunksize_max -= RELIABLE_HEADER_SIZE;
1110
1111         core::list<SharedBuffer<u8> > originals;
1112         originals = makeAutoSplitPacket(data, chunksize_max,
1113                         channel->next_outgoing_split_seqnum);
1114         
1115         core::list<SharedBuffer<u8> >::Iterator i;
1116         i = originals.begin();
1117         for(; i != originals.end(); i++)
1118         {
1119                 SharedBuffer<u8> original = *i;
1120                 
1121                 sendAsPacket(peer_id, channelnum, original, reliable);
1122         }
1123 }
1124
1125 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1126                 SharedBuffer<u8> data, bool reliable)
1127 {
1128         OutgoingPacket packet(peer_id, channelnum, data, reliable);
1129         m_outgoing_queue.push_back(packet);
1130 }
1131
1132 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1133                 SharedBuffer<u8> data, bool reliable)
1134 {
1135         Peer *peer = getPeerNoEx(peer_id);
1136         if(!peer)
1137                 return;
1138         Channel *channel = &(peer->channels[channelnum]);
1139
1140         if(reliable)
1141         {
1142                 u16 seqnum = channel->next_outgoing_seqnum;
1143                 channel->next_outgoing_seqnum++;
1144
1145                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1146
1147                 // Add base headers and make a packet
1148                 BufferedPacket p = makePacket(peer->address, reliable,
1149                                 m_protocol_id, m_peer_id, channelnum);
1150                 
1151                 try{
1152                         // Buffer the packet
1153                         channel->outgoing_reliables.insert(p);
1154                 }
1155                 catch(AlreadyExistsException &e)
1156                 {
1157                         PrintInfo(derr_con);
1158                         derr_con<<"WARNING: Going to send a reliable packet "
1159                                         "seqnum="<<seqnum<<" that is already "
1160                                         "in outgoing buffer"<<std::endl;
1161                         //assert(0);
1162                 }
1163                 
1164                 // Send the packet
1165                 rawSend(p);
1166         }
1167         else
1168         {
1169                 // Add base headers and make a packet
1170                 BufferedPacket p = makePacket(peer->address, data,
1171                                 m_protocol_id, m_peer_id, channelnum);
1172
1173                 // Send the packet
1174                 rawSend(p);
1175         }
1176 }
1177
1178 void Connection::rawSend(const BufferedPacket &packet)
1179 {
1180         try{
1181                 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1182         } catch(SendFailedException &e){
1183                 derr_con<<"Connection::rawSend(): SendFailedException: "
1184                                 <<packet.address.serializeString()<<std::endl;
1185         }
1186 }
1187
1188 Peer* Connection::getPeer(u16 peer_id)
1189 {
1190         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1191
1192         if(node == NULL){
1193                 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1194         }
1195
1196         // Error checking
1197         assert(node->getValue()->id == peer_id);
1198
1199         return node->getValue();
1200 }
1201
1202 Peer* Connection::getPeerNoEx(u16 peer_id)
1203 {
1204         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1205
1206         if(node == NULL){
1207                 return NULL;
1208         }
1209
1210         // Error checking
1211         assert(node->getValue()->id == peer_id);
1212
1213         return node->getValue();
1214 }
1215
1216 core::list<Peer*> Connection::getPeers()
1217 {
1218         core::list<Peer*> list;
1219         core::map<u16, Peer*>::Iterator j;
1220         j = m_peers.getIterator();
1221         for(; j.atEnd() == false; j++)
1222         {
1223                 Peer *peer = j.getNode()->getValue();
1224                 list.push_back(peer);
1225         }
1226         return list;
1227 }
1228
1229 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1230 {
1231         core::map<u16, Peer*>::Iterator j;
1232         j = m_peers.getIterator();
1233         for(; j.atEnd() == false; j++)
1234         {
1235                 Peer *peer = j.getNode()->getValue();
1236                 for(u16 i=0; i<CHANNEL_COUNT; i++)
1237                 {
1238                         Channel *channel = &peer->channels[i];
1239                         SharedBuffer<u8> resultdata;
1240                         bool got = checkIncomingBuffers(channel, peer_id, resultdata);
1241                         if(got){
1242                                 dst = resultdata;
1243                                 return true;
1244                         }
1245                 }
1246         }
1247         return false;
1248 }
1249
1250 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1251                 SharedBuffer<u8> &dst)
1252 {
1253         u16 firstseqnum = 0;
1254         // Clear old packets from start of buffer
1255         try{
1256         for(;;){
1257                 firstseqnum = channel->incoming_reliables.getFirstSeqnum();
1258                 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1259                         channel->incoming_reliables.popFirst();
1260                 else
1261                         break;
1262         }
1263         // This happens if all packets are old
1264         }catch(con::NotFoundException)
1265         {}
1266         
1267         if(channel->incoming_reliables.empty() == false)
1268         {
1269                 if(firstseqnum == channel->next_incoming_seqnum)
1270                 {
1271                         BufferedPacket p = channel->incoming_reliables.popFirst();
1272                         
1273                         peer_id = readPeerId(*p.data);
1274                         u8 channelnum = readChannel(*p.data);
1275                         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1276
1277                         PrintInfo();
1278                         dout_con<<"UNBUFFERING TYPE_RELIABLE"
1279                                         <<" seqnum="<<seqnum
1280                                         <<" peer_id="<<peer_id
1281                                         <<" channel="<<((int)channelnum&0xff)
1282                                         <<std::endl;
1283
1284                         channel->next_incoming_seqnum++;
1285                         
1286                         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1287                         // Get out the inside packet and re-process it
1288                         SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1289                         memcpy(*payload, &p.data[headers_size], payload.getSize());
1290
1291                         dst = processPacket(channel, payload, peer_id, channelnum, true);
1292                         return true;
1293                 }
1294         }
1295         return false;
1296 }
1297
1298 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1299                 SharedBuffer<u8> packetdata, u16 peer_id,
1300                 u8 channelnum, bool reliable)
1301 {
1302         IndentationRaiser iraiser(&(m_indentation));
1303
1304         if(packetdata.getSize() < 1)
1305                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1306
1307         u8 type = readU8(&packetdata[0]);
1308         
1309         if(type == TYPE_CONTROL)
1310         {
1311                 if(packetdata.getSize() < 2)
1312                         throw InvalidIncomingDataException("packetdata.getSize() < 2");
1313
1314                 u8 controltype = readU8(&packetdata[1]);
1315
1316                 if(controltype == CONTROLTYPE_ACK)
1317                 {
1318                         if(packetdata.getSize() < 4)
1319                                 throw InvalidIncomingDataException
1320                                                 ("packetdata.getSize() < 4 (ACK header size)");
1321
1322                         u16 seqnum = readU16(&packetdata[2]);
1323                         PrintInfo();
1324                         dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1325                                         <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1326                                         <<", seqnum="<<seqnum<<std::endl;
1327
1328                         try{
1329                                 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1330                                 // Get round trip time
1331                                 float rtt = p.totaltime;
1332
1333                                 // Let peer calculate stuff according to it
1334                                 // (avg_rtt and resend_timeout)
1335                                 Peer *peer = getPeer(peer_id);
1336                                 peer->reportRTT(rtt);
1337
1338                                 //PrintInfo(dout_con);
1339                                 //dout_con<<"RTT = "<<rtt<<std::endl;
1340
1341                                 /*dout_con<<"OUTGOING: ";
1342                                 PrintInfo();
1343                                 channel->outgoing_reliables.print();
1344                                 dout_con<<std::endl;*/
1345                         }
1346                         catch(NotFoundException &e){
1347                                 PrintInfo(derr_con);
1348                                 derr_con<<"WARNING: ACKed packet not "
1349                                                 "in outgoing queue"
1350                                                 <<std::endl;
1351                         }
1352
1353                         throw ProcessedSilentlyException("Got an ACK");
1354                 }
1355                 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1356                 {
1357                         if(packetdata.getSize() < 4)
1358                                 throw InvalidIncomingDataException
1359                                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1360                         u16 peer_id_new = readU16(&packetdata[2]);
1361                         PrintInfo();
1362                         dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1363
1364                         if(GetPeerID() != PEER_ID_INEXISTENT)
1365                         {
1366                                 PrintInfo(derr_con);
1367                                 derr_con<<"WARNING: Not changing"
1368                                                 " existing peer id."<<std::endl;
1369                         }
1370                         else
1371                         {
1372                                 dout_con<<"changing."<<std::endl;
1373                                 SetPeerID(peer_id_new);
1374                         }
1375                         throw ProcessedSilentlyException("Got a SET_PEER_ID");
1376                 }
1377                 else if(controltype == CONTROLTYPE_PING)
1378                 {
1379                         // Just ignore it, the incoming data already reset
1380                         // the timeout counter
1381                         PrintInfo();
1382                         dout_con<<"PING"<<std::endl;
1383                         throw ProcessedSilentlyException("Got a PING");
1384                 }
1385                 else if(controltype == CONTROLTYPE_DISCO)
1386                 {
1387                         // Just ignore it, the incoming data already reset
1388                         // the timeout counter
1389                         PrintInfo();
1390                         dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1391                         
1392                         if(deletePeer(peer_id, false) == false)
1393                         {
1394                                 PrintInfo(derr_con);
1395                                 derr_con<<"DISCO: Peer not found"<<std::endl;
1396                         }
1397
1398                         throw ProcessedSilentlyException("Got a DISCO");
1399                 }
1400                 else{
1401                         PrintInfo(derr_con);
1402                         derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1403                                         <<((int)controltype&0xff)<<std::endl;
1404                         throw InvalidIncomingDataException("Invalid control type");
1405                 }
1406         }
1407         else if(type == TYPE_ORIGINAL)
1408         {
1409                 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1410                         throw InvalidIncomingDataException
1411                                         ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1412                 PrintInfo();
1413                 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1414                                 <<std::endl;
1415                 // Get the inside packet out and return it
1416                 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1417                 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1418                 return payload;
1419         }
1420         else if(type == TYPE_SPLIT)
1421         {
1422                 // We have to create a packet again for buffering
1423                 // This isn't actually too bad an idea.
1424                 BufferedPacket packet = makePacket(
1425                                 getPeer(peer_id)->address,
1426                                 packetdata,
1427                                 GetProtocolID(),
1428                                 peer_id,
1429                                 channelnum);
1430                 // Buffer the packet
1431                 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1432                 if(data.getSize() != 0)
1433                 {
1434                         PrintInfo();
1435                         dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1436                                         <<"size="<<data.getSize()<<std::endl;
1437                         return data;
1438                 }
1439                 PrintInfo();
1440                 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1441                 throw ProcessedSilentlyException("Buffered a split packet chunk");
1442         }
1443         else if(type == TYPE_RELIABLE)
1444         {
1445                 // Recursive reliable packets not allowed
1446                 assert(reliable == false);
1447
1448                 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1449                         throw InvalidIncomingDataException
1450                                         ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1451
1452                 u16 seqnum = readU16(&packetdata[1]);
1453
1454                 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1455                 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1456                 
1457                 PrintInfo();
1458                 if(is_future_packet)
1459                         dout_con<<"BUFFERING";
1460                 else if(is_old_packet)
1461                         dout_con<<"OLD";
1462                 else
1463                         dout_con<<"RECUR";
1464                 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1465                                 <<" next="<<channel->next_incoming_seqnum;
1466                 dout_con<<" [sending CONTROLTYPE_ACK"
1467                                 " to peer_id="<<peer_id<<"]";
1468                 dout_con<<std::endl;
1469                 
1470                 //DEBUG
1471                 //assert(channel->incoming_reliables.size() < 100);
1472
1473                 // Send a CONTROLTYPE_ACK
1474                 SharedBuffer<u8> reply(4);
1475                 writeU8(&reply[0], TYPE_CONTROL);
1476                 writeU8(&reply[1], CONTROLTYPE_ACK);
1477                 writeU16(&reply[2], seqnum);
1478                 rawSendAsPacket(peer_id, channelnum, reply, false);
1479
1480                 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1481                 if(is_future_packet)
1482                 {
1483                         /*PrintInfo();
1484                         dout_con<<"Buffering reliable packet (seqnum="
1485                                         <<seqnum<<")"<<std::endl;*/
1486                         
1487                         // This one comes later, buffer it.
1488                         // Actually we have to make a packet to buffer one.
1489                         // Well, we have all the ingredients, so just do it.
1490                         BufferedPacket packet = makePacket(
1491                                         getPeer(peer_id)->address,
1492                                         packetdata,
1493                                         GetProtocolID(),
1494                                         peer_id,
1495                                         channelnum);
1496                         try{
1497                                 channel->incoming_reliables.insert(packet);
1498                                 
1499                                 /*PrintInfo();
1500                                 dout_con<<"INCOMING: ";
1501                                 channel->incoming_reliables.print();
1502                                 dout_con<<std::endl;*/
1503                         }
1504                         catch(AlreadyExistsException &e)
1505                         {
1506                         }
1507
1508                         throw ProcessedSilentlyException("Buffered future reliable packet");
1509                 }
1510                 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1511                 else if(is_old_packet)
1512                 {
1513                         // An old packet, dump it
1514                         throw InvalidIncomingDataException("Got an old reliable packet");
1515                 }
1516
1517                 channel->next_incoming_seqnum++;
1518
1519                 // Get out the inside packet and re-process it
1520                 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1521                 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1522
1523                 return processPacket(channel, payload, peer_id, channelnum, true);
1524         }
1525         else
1526         {
1527                 PrintInfo(derr_con);
1528                 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1529                 throw InvalidIncomingDataException("Invalid packet type");
1530         }
1531         
1532         // We should never get here.
1533         // If you get here, add an exception or a return to some of the
1534         // above conditionals.
1535         assert(0);
1536         throw BaseException("Error in Channel::ProcessPacket()");
1537 }
1538
1539 bool Connection::deletePeer(u16 peer_id, bool timeout)
1540 {
1541         if(m_peers.find(peer_id) == NULL)
1542                 return false;
1543         
1544         Peer *peer = m_peers[peer_id];
1545
1546         // Create event
1547         ConnectionEvent e;
1548         e.peerRemoved(peer_id, timeout, peer->address);
1549         putEvent(e);
1550
1551         delete m_peers[peer_id];
1552         m_peers.remove(peer_id);
1553         return true;
1554 }
1555
1556 /* Interface */
1557
1558 ConnectionEvent Connection::getEvent()
1559 {
1560         if(m_event_queue.size() == 0){
1561                 ConnectionEvent e;
1562                 e.type = CONNEVENT_NONE;
1563                 return e;
1564         }
1565         return m_event_queue.pop_front();
1566 }
1567
1568 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1569 {
1570         try{
1571                 return m_event_queue.pop_front(timeout_ms);
1572         } catch(ItemNotFoundException &ex){
1573                 ConnectionEvent e;
1574                 e.type = CONNEVENT_NONE;
1575                 return e;
1576         }
1577 }
1578
1579 void Connection::putCommand(ConnectionCommand &c)
1580 {
1581         m_command_queue.push_back(c);
1582 }
1583
1584 void Connection::Serve(unsigned short port)
1585 {
1586         ConnectionCommand c;
1587         c.serve(port);
1588         putCommand(c);
1589 }
1590
1591 void Connection::Connect(Address address)
1592 {
1593         ConnectionCommand c;
1594         c.connect(address);
1595         putCommand(c);
1596 }
1597
1598 bool Connection::Connected()
1599 {
1600         JMutexAutoLock peerlock(m_peers_mutex);
1601
1602         if(m_peers.size() != 1)
1603                 return false;
1604                 
1605         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1606         if(node == NULL)
1607                 return false;
1608         
1609         if(m_peer_id == PEER_ID_INEXISTENT)
1610                 return false;
1611         
1612         return true;
1613 }
1614
1615 void Connection::Disconnect()
1616 {
1617         ConnectionCommand c;
1618         c.disconnect();
1619         putCommand(c);
1620 }
1621
1622 u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
1623 {
1624         for(;;){
1625                 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
1626                 if(e.type != CONNEVENT_NONE)
1627                         dout_con<<getDesc()<<": Receive: got event: "
1628                                         <<e.describe()<<std::endl;
1629                 switch(e.type){
1630                 case CONNEVENT_NONE:
1631                         throw NoIncomingDataException("No incoming data");
1632                 case CONNEVENT_DATA_RECEIVED:
1633                         peer_id = e.peer_id;
1634                         data = SharedBuffer<u8>(e.data);
1635                         return e.data.getSize();
1636                 case CONNEVENT_PEER_ADDED: {
1637                         Peer tmp(e.peer_id, e.address);
1638                         if(m_bc_peerhandler)
1639                                 m_bc_peerhandler->peerAdded(&tmp);
1640                         continue; }
1641                 case CONNEVENT_PEER_REMOVED: {
1642                         Peer tmp(e.peer_id, e.address);
1643                         if(m_bc_peerhandler)
1644                                 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1645                         continue; }
1646                 case CONNEVENT_BIND_FAILED:
1647                         throw ConnectionBindFailed("Failed to bind socket "
1648                                         "(port already in use?)");
1649                 }
1650         }
1651         throw NoIncomingDataException("No incoming data");
1652 }
1653
1654 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1655 {
1656         assert(channelnum < CHANNEL_COUNT);
1657
1658         ConnectionCommand c;
1659         c.sendToAll(channelnum, data, reliable);
1660         putCommand(c);
1661 }
1662
1663 void Connection::Send(u16 peer_id, u8 channelnum,
1664                 SharedBuffer<u8> data, bool reliable)
1665 {
1666         assert(channelnum < CHANNEL_COUNT);
1667
1668         ConnectionCommand c;
1669         c.send(peer_id, channelnum, data, reliable);
1670         putCommand(c);
1671 }
1672
1673 void Connection::RunTimeouts(float dtime)
1674 {
1675         // No-op
1676 }
1677
1678 Address Connection::GetPeerAddress(u16 peer_id)
1679 {
1680         JMutexAutoLock peerlock(m_peers_mutex);
1681         return getPeer(peer_id)->address;
1682 }
1683
1684 float Connection::GetPeerAvgRTT(u16 peer_id)
1685 {
1686         JMutexAutoLock peerlock(m_peers_mutex);
1687         return getPeer(peer_id)->avg_rtt;
1688 }
1689
1690 void Connection::DeletePeer(u16 peer_id)
1691 {
1692         ConnectionCommand c;
1693         c.deletePeer(peer_id);
1694         putCommand(c);
1695 }
1696
1697 void Connection::PrintInfo(std::ostream &out)
1698 {
1699         out<<getDesc()<<": ";
1700 }
1701
1702 void Connection::PrintInfo()
1703 {
1704         PrintInfo(dout_con);
1705 }
1706
1707 std::string Connection::getDesc()
1708 {
1709         return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";
1710 }
1711
1712 } // namespace
1713