]> git.lizzy.rs Git - minetest.git/blob - src/connection.cpp
Update ContentFeatures serialization format now as PROTOCOL_VERSION was changed
[minetest.git] / src / connection.cpp
1 /*
2 Minetest-c55
3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "connection.h"
21 #include "main.h"
22 #include "serialization.h"
23 #include "log.h"
24 #include "porting.h"
25 #include "util/serialize.h"
26 #include "util/numeric.h"
27 #include "util/string.h"
28
29 namespace con
30 {
31
32 static u16 readPeerId(u8 *packetdata)
33 {
34         return readU16(&packetdata[4]);
35 }
36 static u8 readChannel(u8 *packetdata)
37 {
38         return readU8(&packetdata[6]);
39 }
40
41 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
42                 u32 protocol_id, u16 sender_peer_id, u8 channel)
43 {
44         u32 packet_size = datasize + BASE_HEADER_SIZE;
45         BufferedPacket p(packet_size);
46         p.address = address;
47
48         writeU32(&p.data[0], protocol_id);
49         writeU16(&p.data[4], sender_peer_id);
50         writeU8(&p.data[6], channel);
51
52         memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
53
54         return p;
55 }
56
57 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
58                 u32 protocol_id, u16 sender_peer_id, u8 channel)
59 {
60         return makePacket(address, *data, data.getSize(),
61                         protocol_id, sender_peer_id, channel);
62 }
63
64 SharedBuffer<u8> makeOriginalPacket(
65                 SharedBuffer<u8> data)
66 {
67         u32 header_size = 1;
68         u32 packet_size = data.getSize() + header_size;
69         SharedBuffer<u8> b(packet_size);
70
71         writeU8(&b[0], TYPE_ORIGINAL);
72
73         memcpy(&b[header_size], *data, data.getSize());
74
75         return b;
76 }
77
78 core::list<SharedBuffer<u8> > makeSplitPacket(
79                 SharedBuffer<u8> data,
80                 u32 chunksize_max,
81                 u16 seqnum)
82 {
83         // Chunk packets, containing the TYPE_SPLIT header
84         core::list<SharedBuffer<u8> > chunks;
85         
86         u32 chunk_header_size = 7;
87         u32 maximum_data_size = chunksize_max - chunk_header_size;
88         u32 start = 0;
89         u32 end = 0;
90         u32 chunk_num = 0;
91         do{
92                 end = start + maximum_data_size - 1;
93                 if(end > data.getSize() - 1)
94                         end = data.getSize() - 1;
95                 
96                 u32 payload_size = end - start + 1;
97                 u32 packet_size = chunk_header_size + payload_size;
98
99                 SharedBuffer<u8> chunk(packet_size);
100                 
101                 writeU8(&chunk[0], TYPE_SPLIT);
102                 writeU16(&chunk[1], seqnum);
103                 // [3] u16 chunk_count is written at next stage
104                 writeU16(&chunk[5], chunk_num);
105                 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
106
107                 chunks.push_back(chunk);
108                 
109                 start = end + 1;
110                 chunk_num++;
111         }
112         while(end != data.getSize() - 1);
113
114         u16 chunk_count = chunks.getSize();
115
116         core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
117         for(; i != chunks.end(); i++)
118         {
119                 // Write chunk_count
120                 writeU16(&((*i)[3]), chunk_count);
121         }
122
123         return chunks;
124 }
125
126 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
127                 SharedBuffer<u8> data,
128                 u32 chunksize_max,
129                 u16 &split_seqnum)
130 {
131         u32 original_header_size = 1;
132         core::list<SharedBuffer<u8> > list;
133         if(data.getSize() + original_header_size > chunksize_max)
134         {
135                 list = makeSplitPacket(data, chunksize_max, split_seqnum);
136                 split_seqnum++;
137                 return list;
138         }
139         else
140         {
141                 list.push_back(makeOriginalPacket(data));
142         }
143         return list;
144 }
145
146 SharedBuffer<u8> makeReliablePacket(
147                 SharedBuffer<u8> data,
148                 u16 seqnum)
149 {
150         /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
151         dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
152                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
153         u32 header_size = 3;
154         u32 packet_size = data.getSize() + header_size;
155         SharedBuffer<u8> b(packet_size);
156
157         writeU8(&b[0], TYPE_RELIABLE);
158         writeU16(&b[1], seqnum);
159
160         memcpy(&b[header_size], *data, data.getSize());
161
162         /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
163                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
164         //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
165         return b;
166 }
167
168 /*
169         ReliablePacketBuffer
170 */
171
172 void ReliablePacketBuffer::print()
173 {
174         core::list<BufferedPacket>::Iterator i;
175         i = m_list.begin();
176         for(; i != m_list.end(); i++)
177         {
178                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
179                 dout_con<<s<<" ";
180         }
181 }
182 bool ReliablePacketBuffer::empty()
183 {
184         return m_list.empty();
185 }
186 u32 ReliablePacketBuffer::size()
187 {
188         return m_list.getSize();
189 }
190 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
191 {
192         core::list<BufferedPacket>::Iterator i;
193         i = m_list.begin();
194         for(; i != m_list.end(); i++)
195         {
196                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
197                 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
198                                 <<", comparing to s="<<s<<std::endl;*/
199                 if(s == seqnum)
200                         break;
201         }
202         return i;
203 }
204 RPBSearchResult ReliablePacketBuffer::notFound()
205 {
206         return m_list.end();
207 }
208 u16 ReliablePacketBuffer::getFirstSeqnum()
209 {
210         if(empty())
211                 throw NotFoundException("Buffer is empty");
212         BufferedPacket p = *m_list.begin();
213         return readU16(&p.data[BASE_HEADER_SIZE+1]);
214 }
215 BufferedPacket ReliablePacketBuffer::popFirst()
216 {
217         if(empty())
218                 throw NotFoundException("Buffer is empty");
219         BufferedPacket p = *m_list.begin();
220         core::list<BufferedPacket>::Iterator i = m_list.begin();
221         m_list.erase(i);
222         return p;
223 }
224 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
225 {
226         RPBSearchResult r = findPacket(seqnum);
227         if(r == notFound()){
228                 dout_con<<"Not found"<<std::endl;
229                 throw NotFoundException("seqnum not found in buffer");
230         }
231         BufferedPacket p = *r;
232         m_list.erase(r);
233         return p;
234 }
235 void ReliablePacketBuffer::insert(BufferedPacket &p)
236 {
237         assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
238         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
239         assert(type == TYPE_RELIABLE);
240         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
241
242         // Find the right place for the packet and insert it there
243
244         // If list is empty, just add it
245         if(m_list.empty())
246         {
247                 m_list.push_back(p);
248                 // Done.
249                 return;
250         }
251         // Otherwise find the right place
252         core::list<BufferedPacket>::Iterator i;
253         i = m_list.begin();
254         // Find the first packet in the list which has a higher seqnum
255         for(; i != m_list.end(); i++){
256                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
257                 if(s == seqnum){
258                         throw AlreadyExistsException("Same seqnum in list");
259                 }
260                 if(seqnum_higher(s, seqnum)){
261                         break;
262                 }
263         }
264         // If we're at the end of the list, add the packet to the
265         // end of the list
266         if(i == m_list.end())
267         {
268                 m_list.push_back(p);
269                 // Done.
270                 return;
271         }
272         // Insert before i
273         m_list.insert_before(i, p);
274 }
275
276 void ReliablePacketBuffer::incrementTimeouts(float dtime)
277 {
278         core::list<BufferedPacket>::Iterator i;
279         i = m_list.begin();
280         for(; i != m_list.end(); i++){
281                 i->time += dtime;
282                 i->totaltime += dtime;
283         }
284 }
285
286 void ReliablePacketBuffer::resetTimedOuts(float timeout)
287 {
288         core::list<BufferedPacket>::Iterator i;
289         i = m_list.begin();
290         for(; i != m_list.end(); i++){
291                 if(i->time >= timeout)
292                         i->time = 0.0;
293         }
294 }
295
296 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
297 {
298         core::list<BufferedPacket>::Iterator i;
299         i = m_list.begin();
300         for(; i != m_list.end(); i++){
301                 if(i->totaltime >= timeout)
302                         return true;
303         }
304         return false;
305 }
306
307 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
308 {
309         core::list<BufferedPacket> timed_outs;
310         core::list<BufferedPacket>::Iterator i;
311         i = m_list.begin();
312         for(; i != m_list.end(); i++)
313         {
314                 if(i->time >= timeout)
315                         timed_outs.push_back(*i);
316         }
317         return timed_outs;
318 }
319
320 /*
321         IncomingSplitBuffer
322 */
323
324 IncomingSplitBuffer::~IncomingSplitBuffer()
325 {
326         core::map<u16, IncomingSplitPacket*>::Iterator i;
327         i = m_buf.getIterator();
328         for(; i.atEnd() == false; i++)
329         {
330                 delete i.getNode()->getValue();
331         }
332 }
333 /*
334         This will throw a GotSplitPacketException when a full
335         split packet is constructed.
336 */
337 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
338 {
339         u32 headersize = BASE_HEADER_SIZE + 7;
340         assert(p.data.getSize() >= headersize);
341         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
342         assert(type == TYPE_SPLIT);
343         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
344         u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
345         u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
346
347         // Add if doesn't exist
348         if(m_buf.find(seqnum) == NULL)
349         {
350                 IncomingSplitPacket *sp = new IncomingSplitPacket();
351                 sp->chunk_count = chunk_count;
352                 sp->reliable = reliable;
353                 m_buf[seqnum] = sp;
354         }
355         
356         IncomingSplitPacket *sp = m_buf[seqnum];
357         
358         // TODO: These errors should be thrown or something? Dunno.
359         if(chunk_count != sp->chunk_count)
360                 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
361                                 <<" != sp->chunk_count="<<sp->chunk_count
362                                 <<std::endl;
363         if(reliable != sp->reliable)
364                 derr_con<<"Connection: WARNING: reliable="<<reliable
365                                 <<" != sp->reliable="<<sp->reliable
366                                 <<std::endl;
367
368         // If chunk already exists, ignore it.
369         // Sometimes two identical packets may arrive when there is network
370         // lag and the server re-sends stuff.
371         if(sp->chunks.find(chunk_num) != NULL)
372                 return SharedBuffer<u8>();
373         
374         // Cut chunk data out of packet
375         u32 chunkdatasize = p.data.getSize() - headersize;
376         SharedBuffer<u8> chunkdata(chunkdatasize);
377         memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
378         
379         // Set chunk data in buffer
380         sp->chunks[chunk_num] = chunkdata;
381         
382         // If not all chunks are received, return empty buffer
383         if(sp->allReceived() == false)
384                 return SharedBuffer<u8>();
385
386         // Calculate total size
387         u32 totalsize = 0;
388         core::map<u16, SharedBuffer<u8> >::Iterator i;
389         i = sp->chunks.getIterator();
390         for(; i.atEnd() == false; i++)
391         {
392                 totalsize += i.getNode()->getValue().getSize();
393         }
394         
395         SharedBuffer<u8> fulldata(totalsize);
396
397         // Copy chunks to data buffer
398         u32 start = 0;
399         for(u32 chunk_i=0; chunk_i<sp->chunk_count;
400                         chunk_i++)
401         {
402                 SharedBuffer<u8> buf = sp->chunks[chunk_i];
403                 u16 chunkdatasize = buf.getSize();
404                 memcpy(&fulldata[start], *buf, chunkdatasize);
405                 start += chunkdatasize;;
406         }
407
408         // Remove sp from buffer
409         m_buf.remove(seqnum);
410         delete sp;
411
412         return fulldata;
413 }
414 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
415 {
416         core::list<u16> remove_queue;
417         core::map<u16, IncomingSplitPacket*>::Iterator i;
418         i = m_buf.getIterator();
419         for(; i.atEnd() == false; i++)
420         {
421                 IncomingSplitPacket *p = i.getNode()->getValue();
422                 // Reliable ones are not removed by timeout
423                 if(p->reliable == true)
424                         continue;
425                 p->time += dtime;
426                 if(p->time >= timeout)
427                         remove_queue.push_back(i.getNode()->getKey());
428         }
429         core::list<u16>::Iterator j;
430         j = remove_queue.begin();
431         for(; j != remove_queue.end(); j++)
432         {
433                 dout_con<<"NOTE: Removing timed out unreliable split packet"
434                                 <<std::endl;
435                 delete m_buf[*j];
436                 m_buf.remove(*j);
437         }
438 }
439
440 /*
441         Channel
442 */
443
444 Channel::Channel()
445 {
446         next_outgoing_seqnum = SEQNUM_INITIAL;
447         next_incoming_seqnum = SEQNUM_INITIAL;
448         next_outgoing_split_seqnum = SEQNUM_INITIAL;
449 }
450 Channel::~Channel()
451 {
452 }
453
454 /*
455         Peer
456 */
457
458 Peer::Peer(u16 a_id, Address a_address):
459         address(a_address),
460         id(a_id),
461         timeout_counter(0.0),
462         ping_timer(0.0),
463         resend_timeout(0.5),
464         avg_rtt(-1.0),
465         has_sent_with_id(false),
466         m_sendtime_accu(0),
467         m_max_packets_per_second(10),
468         m_num_sent(0),
469         m_max_num_sent(0)
470 {
471 }
472 Peer::~Peer()
473 {
474 }
475
476 void Peer::reportRTT(float rtt)
477 {
478         if(rtt >= 0.0){
479                 if(rtt < 0.01){
480                         if(m_max_packets_per_second < 400)
481                                 m_max_packets_per_second += 10;
482                 } else if(rtt < 0.2){
483                         if(m_max_packets_per_second < 100)
484                                 m_max_packets_per_second += 2;
485                 } else {
486                         m_max_packets_per_second *= 0.8;
487                         if(m_max_packets_per_second < 10)
488                                 m_max_packets_per_second = 10;
489                 }
490         }
491
492         if(rtt < -0.999)
493         {}
494         else if(avg_rtt < 0.0)
495                 avg_rtt = rtt;
496         else
497                 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
498         
499         // Calculate resend_timeout
500
501         /*int reliable_count = 0;
502         for(int i=0; i<CHANNEL_COUNT; i++)
503         {
504                 reliable_count += channels[i].outgoing_reliables.size();
505         }
506         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
507                         * ((float)reliable_count * 1);*/
508         
509         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
510         if(timeout < RESEND_TIMEOUT_MIN)
511                 timeout = RESEND_TIMEOUT_MIN;
512         if(timeout > RESEND_TIMEOUT_MAX)
513                 timeout = RESEND_TIMEOUT_MAX;
514         resend_timeout = timeout;
515 }
516                                 
517 /*
518         Connection
519 */
520
521 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout):
522         m_protocol_id(protocol_id),
523         m_max_packet_size(max_packet_size),
524         m_timeout(timeout),
525         m_peer_id(0),
526         m_bc_peerhandler(NULL),
527         m_bc_receive_timeout(0),
528         m_indentation(0)
529 {
530         m_socket.setTimeoutMs(5);
531
532         Start();
533 }
534
535 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
536                 PeerHandler *peerhandler):
537         m_protocol_id(protocol_id),
538         m_max_packet_size(max_packet_size),
539         m_timeout(timeout),
540         m_peer_id(0),
541         m_bc_peerhandler(peerhandler),
542         m_bc_receive_timeout(0),
543         m_indentation(0)
544 {
545         m_socket.setTimeoutMs(5);
546
547         Start();
548 }
549
550
551 Connection::~Connection()
552 {
553         stop();
554         // Delete peers
555         for(core::map<u16, Peer*>::Iterator
556                         j = m_peers.getIterator();
557                         j.atEnd() == false; j++)
558         {
559                 Peer *peer = j.getNode()->getValue();
560                 delete peer;
561         }
562 }
563
564 /* Internal stuff */
565
566 void * Connection::Thread()
567 {
568         ThreadStarted();
569         log_register_thread("Connection");
570
571         dout_con<<"Connection thread started"<<std::endl;
572         
573         u32 curtime = porting::getTimeMs();
574         u32 lasttime = curtime;
575
576         while(getRun())
577         {
578                 BEGIN_DEBUG_EXCEPTION_HANDLER
579                 
580                 lasttime = curtime;
581                 curtime = porting::getTimeMs();
582                 float dtime = (float)(curtime - lasttime) / 1000.;
583                 if(dtime > 0.1)
584                         dtime = 0.1;
585                 if(dtime < 0.0)
586                         dtime = 0.0;
587                 
588                 runTimeouts(dtime);
589
590                 while(m_command_queue.size() != 0){
591                         ConnectionCommand c = m_command_queue.pop_front();
592                         processCommand(c);
593                 }
594
595                 send(dtime);
596
597                 receive();
598                 
599                 END_DEBUG_EXCEPTION_HANDLER(derr_con);
600         }
601
602         return NULL;
603 }
604
605 void Connection::putEvent(ConnectionEvent &e)
606 {
607         assert(e.type != CONNEVENT_NONE);
608         m_event_queue.push_back(e);
609 }
610
611 void Connection::processCommand(ConnectionCommand &c)
612 {
613         switch(c.type){
614         case CONNCMD_NONE:
615                 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
616                 return;
617         case CONNCMD_SERVE:
618                 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
619                                 <<c.port<<std::endl;
620                 serve(c.port);
621                 return;
622         case CONNCMD_CONNECT:
623                 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
624                 connect(c.address);
625                 return;
626         case CONNCMD_DISCONNECT:
627                 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
628                 disconnect();
629                 return;
630         case CONNCMD_SEND:
631                 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
632                 send(c.peer_id, c.channelnum, c.data, c.reliable);
633                 return;
634         case CONNCMD_SEND_TO_ALL:
635                 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
636                 sendToAll(c.channelnum, c.data, c.reliable);
637                 return;
638         case CONNCMD_DELETE_PEER:
639                 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
640                 deletePeer(c.peer_id, false);
641                 return;
642         }
643 }
644
645 void Connection::send(float dtime)
646 {
647         for(core::map<u16, Peer*>::Iterator
648                         j = m_peers.getIterator();
649                         j.atEnd() == false; j++)
650         {
651                 Peer *peer = j.getNode()->getValue();
652                 peer->m_sendtime_accu += dtime;
653                 peer->m_num_sent = 0;
654                 peer->m_max_num_sent = peer->m_sendtime_accu *
655                                 peer->m_max_packets_per_second;
656         }
657         Queue<OutgoingPacket> postponed_packets;
658         while(m_outgoing_queue.size() != 0){
659                 OutgoingPacket packet = m_outgoing_queue.pop_front();
660                 Peer *peer = getPeerNoEx(packet.peer_id);
661                 if(!peer)
662                         continue;
663                 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
664                         postponed_packets.push_back(packet);
665                 } else if(peer->m_num_sent < peer->m_max_num_sent){
666                         rawSendAsPacket(packet.peer_id, packet.channelnum,
667                                         packet.data, packet.reliable);
668                         peer->m_num_sent++;
669                 } else {
670                         postponed_packets.push_back(packet);
671                 }
672         }
673         while(postponed_packets.size() != 0){
674                 m_outgoing_queue.push_back(postponed_packets.pop_front());
675         }
676         for(core::map<u16, Peer*>::Iterator
677                         j = m_peers.getIterator();
678                         j.atEnd() == false; j++)
679         {
680                 Peer *peer = j.getNode()->getValue();
681                 peer->m_sendtime_accu -= (float)peer->m_num_sent /
682                                 peer->m_max_packets_per_second;
683                 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
684                         peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
685         }
686 }
687
688 // Receive packets from the network and buffers and create ConnectionEvents
689 void Connection::receive()
690 {
691         u32 datasize = m_max_packet_size * 2;  // Double it just to be safe
692         // TODO: We can not know how many layers of header there are.
693         // For now, just assume there are no other than the base headers.
694         u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
695         SharedBuffer<u8> packetdata(packet_maxsize);
696
697         bool single_wait_done = false;
698         
699         for(;;)
700         {
701         try{
702                 /* Check if some buffer has relevant data */
703                 {
704                         u16 peer_id;
705                         SharedBuffer<u8> resultdata;
706                         bool got = getFromBuffers(peer_id, resultdata);
707                         if(got){
708                                 ConnectionEvent e;
709                                 e.dataReceived(peer_id, resultdata);
710                                 putEvent(e);
711                                 continue;
712                         }
713                 }
714                 
715                 if(single_wait_done){
716                         if(m_socket.WaitData(0) == false)
717                                 break;
718                 }
719                 
720                 single_wait_done = true;
721
722                 Address sender;
723                 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
724
725                 if(received_size < 0)
726                         break;
727                 if(received_size < BASE_HEADER_SIZE)
728                         continue;
729                 if(readU32(&packetdata[0]) != m_protocol_id)
730                         continue;
731                 
732                 u16 peer_id = readPeerId(*packetdata);
733                 u8 channelnum = readChannel(*packetdata);
734                 if(channelnum > CHANNEL_COUNT-1){
735                         PrintInfo(derr_con);
736                         derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
737                         throw InvalidIncomingDataException("Channel doesn't exist");
738                 }
739
740                 if(peer_id == PEER_ID_INEXISTENT)
741                 {
742                         /*
743                                 Somebody is trying to send stuff to us with no peer id.
744                                 
745                                 Check if the same address and port was added to our peer
746                                 list before.
747                                 Allow only entries that have has_sent_with_id==false.
748                         */
749
750                         core::map<u16, Peer*>::Iterator j;
751                         j = m_peers.getIterator();
752                         for(; j.atEnd() == false; j++)
753                         {
754                                 Peer *peer = j.getNode()->getValue();
755                                 if(peer->has_sent_with_id)
756                                         continue;
757                                 if(peer->address == sender)
758                                         break;
759                         }
760                         
761                         /*
762                                 If no peer was found with the same address and port,
763                                 we shall assume it is a new peer and create an entry.
764                         */
765                         if(j.atEnd())
766                         {
767                                 // Pass on to adding the peer
768                         }
769                         // Else: A peer was found.
770                         else
771                         {
772                                 Peer *peer = j.getNode()->getValue();
773                                 peer_id = peer->id;
774                                 PrintInfo(derr_con);
775                                 derr_con<<"WARNING: Assuming unknown peer to be "
776                                                 <<"peer_id="<<peer_id<<std::endl;
777                         }
778                 }
779                 
780                 /*
781                         The peer was not found in our lists. Add it.
782                 */
783                 if(peer_id == PEER_ID_INEXISTENT)
784                 {
785                         // Somebody wants to make a new connection
786
787                         // Get a unique peer id (2 or higher)
788                         u16 peer_id_new = 2;
789                         /*
790                                 Find an unused peer id
791                         */
792                         bool out_of_ids = false;
793                         for(;;)
794                         {
795                                 // Check if exists
796                                 if(m_peers.find(peer_id_new) == NULL)
797                                         break;
798                                 // Check for overflow
799                                 if(peer_id_new == 65535){
800                                         out_of_ids = true;
801                                         break;
802                                 }
803                                 peer_id_new++;
804                         }
805                         if(out_of_ids){
806                                 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
807                                 continue;
808                         }
809
810                         PrintInfo();
811                         dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
812                                         " giving peer_id="<<peer_id_new<<std::endl;
813
814                         // Create a peer
815                         Peer *peer = new Peer(peer_id_new, sender);
816                         m_peers.insert(peer->id, peer);
817                         
818                         // Create peer addition event
819                         ConnectionEvent e;
820                         e.peerAdded(peer_id_new, sender);
821                         putEvent(e);
822                         
823                         // Create CONTROL packet to tell the peer id to the new peer.
824                         SharedBuffer<u8> reply(4);
825                         writeU8(&reply[0], TYPE_CONTROL);
826                         writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
827                         writeU16(&reply[2], peer_id_new);
828                         sendAsPacket(peer_id_new, 0, reply, true);
829                         
830                         // We're now talking to a valid peer_id
831                         peer_id = peer_id_new;
832
833                         // Go on and process whatever it sent
834                 }
835
836                 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
837
838                 if(node == NULL)
839                 {
840                         // Peer not found
841                         // This means that the peer id of the sender is not PEER_ID_INEXISTENT
842                         // and it is invalid.
843                         PrintInfo(derr_con);
844                         derr_con<<"Receive(): Peer not found"<<std::endl;
845                         throw InvalidIncomingDataException("Peer not found (possible timeout)");
846                 }
847
848                 Peer *peer = node->getValue();
849
850                 // Validate peer address
851                 if(peer->address != sender)
852                 {
853                         PrintInfo(derr_con);
854                         derr_con<<"Peer "<<peer_id<<" sending from different address."
855                                         " Ignoring."<<std::endl;
856                         continue;
857                 }
858                 
859                 peer->timeout_counter = 0.0;
860
861                 Channel *channel = &(peer->channels[channelnum]);
862                 
863                 // Throw the received packet to channel->processPacket()
864
865                 // Make a new SharedBuffer from the data without the base headers
866                 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
867                 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
868                                 strippeddata.getSize());
869                 
870                 try{
871                         // Process it (the result is some data with no headers made by us)
872                         SharedBuffer<u8> resultdata = processPacket
873                                         (channel, strippeddata, peer_id, channelnum, false);
874                         
875                         PrintInfo();
876                         dout_con<<"ProcessPacket returned data of size "
877                                         <<resultdata.getSize()<<std::endl;
878                         
879                         ConnectionEvent e;
880                         e.dataReceived(peer_id, resultdata);
881                         putEvent(e);
882                         continue;
883                 }catch(ProcessedSilentlyException &e){
884                 }
885         }catch(InvalidIncomingDataException &e){
886         }
887         catch(ProcessedSilentlyException &e){
888         }
889         } // for
890 }
891
892 void Connection::runTimeouts(float dtime)
893 {
894         core::list<u16> timeouted_peers;
895         core::map<u16, Peer*>::Iterator j;
896         j = m_peers.getIterator();
897         for(; j.atEnd() == false; j++)
898         {
899                 Peer *peer = j.getNode()->getValue();
900                 
901                 /*
902                         Check peer timeout
903                 */
904                 peer->timeout_counter += dtime;
905                 if(peer->timeout_counter > m_timeout)
906                 {
907                         PrintInfo(derr_con);
908                         derr_con<<"RunTimeouts(): Peer "<<peer->id
909                                         <<" has timed out."
910                                         <<" (source=peer->timeout_counter)"
911                                         <<std::endl;
912                         // Add peer to the list
913                         timeouted_peers.push_back(peer->id);
914                         // Don't bother going through the buffers of this one
915                         continue;
916                 }
917
918                 float resend_timeout = peer->resend_timeout;
919                 for(u16 i=0; i<CHANNEL_COUNT; i++)
920                 {
921                         core::list<BufferedPacket> timed_outs;
922                         core::list<BufferedPacket>::Iterator j;
923                         
924                         Channel *channel = &peer->channels[i];
925
926                         // Remove timed out incomplete unreliable split packets
927                         channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
928                         
929                         // Increment reliable packet times
930                         channel->outgoing_reliables.incrementTimeouts(dtime);
931
932                         // Check reliable packet total times, remove peer if
933                         // over timeout.
934                         if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
935                         {
936                                 PrintInfo(derr_con);
937                                 derr_con<<"RunTimeouts(): Peer "<<peer->id
938                                                 <<" has timed out."
939                                                 <<" (source=reliable packet totaltime)"
940                                                 <<std::endl;
941                                 // Add peer to the to-be-removed list
942                                 timeouted_peers.push_back(peer->id);
943                                 goto nextpeer;
944                         }
945
946                         // Re-send timed out outgoing reliables
947                         
948                         timed_outs = channel->
949                                         outgoing_reliables.getTimedOuts(resend_timeout);
950
951                         channel->outgoing_reliables.resetTimedOuts(resend_timeout);
952
953                         j = timed_outs.begin();
954                         for(; j != timed_outs.end(); j++)
955                         {
956                                 u16 peer_id = readPeerId(*(j->data));
957                                 u8 channel = readChannel(*(j->data));
958                                 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
959
960                                 PrintInfo(derr_con);
961                                 derr_con<<"RE-SENDING timed-out RELIABLE to ";
962                                 j->address.print(&derr_con);
963                                 derr_con<<"(t/o="<<resend_timeout<<"): "
964                                                 <<"from_peer_id="<<peer_id
965                                                 <<", channel="<<((int)channel&0xff)
966                                                 <<", seqnum="<<seqnum
967                                                 <<std::endl;
968
969                                 rawSend(*j);
970
971                                 // Enlarge avg_rtt and resend_timeout:
972                                 // The rtt will be at least the timeout.
973                                 // NOTE: This won't affect the timeout of the next
974                                 // checked channel because it was cached.
975                                 peer->reportRTT(resend_timeout);
976                         }
977                 }
978                 
979                 /*
980                         Send pings
981                 */
982                 peer->ping_timer += dtime;
983                 if(peer->ping_timer >= 5.0)
984                 {
985                         // Create and send PING packet
986                         SharedBuffer<u8> data(2);
987                         writeU8(&data[0], TYPE_CONTROL);
988                         writeU8(&data[1], CONTROLTYPE_PING);
989                         rawSendAsPacket(peer->id, 0, data, true);
990
991                         peer->ping_timer = 0.0;
992                 }
993                 
994 nextpeer:
995                 continue;
996         }
997
998         // Remove timed out peers
999         core::list<u16>::Iterator i = timeouted_peers.begin();
1000         for(; i != timeouted_peers.end(); i++)
1001         {
1002                 PrintInfo(derr_con);
1003                 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1004                 deletePeer(*i, true);
1005         }
1006 }
1007
1008 void Connection::serve(u16 port)
1009 {
1010         dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
1011         try{
1012                 m_socket.Bind(port);
1013                 m_peer_id = PEER_ID_SERVER;
1014         }
1015         catch(SocketException &e){
1016                 // Create event
1017                 ConnectionEvent ce;
1018                 ce.bindFailed();
1019                 putEvent(ce);
1020         }
1021 }
1022
1023 void Connection::connect(Address address)
1024 {
1025         dout_con<<getDesc()<<" connecting to "<<address.serializeString()
1026                         <<":"<<address.getPort()<<std::endl;
1027
1028         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1029         if(node != NULL){
1030                 throw ConnectionException("Already connected to a server");
1031         }
1032
1033         Peer *peer = new Peer(PEER_ID_SERVER, address);
1034         m_peers.insert(peer->id, peer);
1035
1036         // Create event
1037         ConnectionEvent e;
1038         e.peerAdded(peer->id, peer->address);
1039         putEvent(e);
1040         
1041         m_socket.Bind(0);
1042         
1043         // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1044         m_peer_id = PEER_ID_INEXISTENT;
1045         SharedBuffer<u8> data(0);
1046         Send(PEER_ID_SERVER, 0, data, true);
1047 }
1048
1049 void Connection::disconnect()
1050 {
1051         dout_con<<getDesc()<<" disconnecting"<<std::endl;
1052
1053         // Create and send DISCO packet
1054         SharedBuffer<u8> data(2);
1055         writeU8(&data[0], TYPE_CONTROL);
1056         writeU8(&data[1], CONTROLTYPE_DISCO);
1057         
1058         // Send to all
1059         core::map<u16, Peer*>::Iterator j;
1060         j = m_peers.getIterator();
1061         for(; j.atEnd() == false; j++)
1062         {
1063                 Peer *peer = j.getNode()->getValue();
1064                 rawSendAsPacket(peer->id, 0, data, false);
1065         }
1066 }
1067
1068 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1069 {
1070         core::map<u16, Peer*>::Iterator j;
1071         j = m_peers.getIterator();
1072         for(; j.atEnd() == false; j++)
1073         {
1074                 Peer *peer = j.getNode()->getValue();
1075                 send(peer->id, channelnum, data, reliable);
1076         }
1077 }
1078
1079 void Connection::send(u16 peer_id, u8 channelnum,
1080                 SharedBuffer<u8> data, bool reliable)
1081 {
1082         dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1083
1084         assert(channelnum < CHANNEL_COUNT);
1085         
1086         Peer *peer = getPeerNoEx(peer_id);
1087         if(peer == NULL)
1088                 return;
1089         Channel *channel = &(peer->channels[channelnum]);
1090
1091         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1092         if(reliable)
1093                 chunksize_max -= RELIABLE_HEADER_SIZE;
1094
1095         core::list<SharedBuffer<u8> > originals;
1096         originals = makeAutoSplitPacket(data, chunksize_max,
1097                         channel->next_outgoing_split_seqnum);
1098         
1099         core::list<SharedBuffer<u8> >::Iterator i;
1100         i = originals.begin();
1101         for(; i != originals.end(); i++)
1102         {
1103                 SharedBuffer<u8> original = *i;
1104                 
1105                 sendAsPacket(peer_id, channelnum, original, reliable);
1106         }
1107 }
1108
1109 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1110                 SharedBuffer<u8> data, bool reliable)
1111 {
1112         OutgoingPacket packet(peer_id, channelnum, data, reliable);
1113         m_outgoing_queue.push_back(packet);
1114 }
1115
1116 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1117                 SharedBuffer<u8> data, bool reliable)
1118 {
1119         Peer *peer = getPeerNoEx(peer_id);
1120         if(!peer)
1121                 return;
1122         Channel *channel = &(peer->channels[channelnum]);
1123
1124         if(reliable)
1125         {
1126                 u16 seqnum = channel->next_outgoing_seqnum;
1127                 channel->next_outgoing_seqnum++;
1128
1129                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1130
1131                 // Add base headers and make a packet
1132                 BufferedPacket p = makePacket(peer->address, reliable,
1133                                 m_protocol_id, m_peer_id, channelnum);
1134                 
1135                 try{
1136                         // Buffer the packet
1137                         channel->outgoing_reliables.insert(p);
1138                 }
1139                 catch(AlreadyExistsException &e)
1140                 {
1141                         PrintInfo(derr_con);
1142                         derr_con<<"WARNING: Going to send a reliable packet "
1143                                         "seqnum="<<seqnum<<" that is already "
1144                                         "in outgoing buffer"<<std::endl;
1145                         //assert(0);
1146                 }
1147                 
1148                 // Send the packet
1149                 rawSend(p);
1150         }
1151         else
1152         {
1153                 // Add base headers and make a packet
1154                 BufferedPacket p = makePacket(peer->address, data,
1155                                 m_protocol_id, m_peer_id, channelnum);
1156
1157                 // Send the packet
1158                 rawSend(p);
1159         }
1160 }
1161
1162 void Connection::rawSend(const BufferedPacket &packet)
1163 {
1164         try{
1165                 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1166         } catch(SendFailedException &e){
1167                 derr_con<<"Connection::rawSend(): SendFailedException: "
1168                                 <<packet.address.serializeString()<<std::endl;
1169         }
1170 }
1171
1172 Peer* Connection::getPeer(u16 peer_id)
1173 {
1174         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1175
1176         if(node == NULL){
1177                 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1178         }
1179
1180         // Error checking
1181         assert(node->getValue()->id == peer_id);
1182
1183         return node->getValue();
1184 }
1185
1186 Peer* Connection::getPeerNoEx(u16 peer_id)
1187 {
1188         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1189
1190         if(node == NULL){
1191                 return NULL;
1192         }
1193
1194         // Error checking
1195         assert(node->getValue()->id == peer_id);
1196
1197         return node->getValue();
1198 }
1199
1200 core::list<Peer*> Connection::getPeers()
1201 {
1202         core::list<Peer*> list;
1203         core::map<u16, Peer*>::Iterator j;
1204         j = m_peers.getIterator();
1205         for(; j.atEnd() == false; j++)
1206         {
1207                 Peer *peer = j.getNode()->getValue();
1208                 list.push_back(peer);
1209         }
1210         return list;
1211 }
1212
1213 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1214 {
1215         core::map<u16, Peer*>::Iterator j;
1216         j = m_peers.getIterator();
1217         for(; j.atEnd() == false; j++)
1218         {
1219                 Peer *peer = j.getNode()->getValue();
1220                 for(u16 i=0; i<CHANNEL_COUNT; i++)
1221                 {
1222                         Channel *channel = &peer->channels[i];
1223                         SharedBuffer<u8> resultdata;
1224                         bool got = checkIncomingBuffers(channel, peer_id, resultdata);
1225                         if(got){
1226                                 dst = resultdata;
1227                                 return true;
1228                         }
1229                 }
1230         }
1231         return false;
1232 }
1233
1234 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1235                 SharedBuffer<u8> &dst)
1236 {
1237         u16 firstseqnum = 0;
1238         // Clear old packets from start of buffer
1239         try{
1240         for(;;){
1241                 firstseqnum = channel->incoming_reliables.getFirstSeqnum();
1242                 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1243                         channel->incoming_reliables.popFirst();
1244                 else
1245                         break;
1246         }
1247         // This happens if all packets are old
1248         }catch(con::NotFoundException)
1249         {}
1250         
1251         if(channel->incoming_reliables.empty() == false)
1252         {
1253                 if(firstseqnum == channel->next_incoming_seqnum)
1254                 {
1255                         BufferedPacket p = channel->incoming_reliables.popFirst();
1256                         
1257                         peer_id = readPeerId(*p.data);
1258                         u8 channelnum = readChannel(*p.data);
1259                         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1260
1261                         PrintInfo();
1262                         dout_con<<"UNBUFFERING TYPE_RELIABLE"
1263                                         <<" seqnum="<<seqnum
1264                                         <<" peer_id="<<peer_id
1265                                         <<" channel="<<((int)channelnum&0xff)
1266                                         <<std::endl;
1267
1268                         channel->next_incoming_seqnum++;
1269                         
1270                         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1271                         // Get out the inside packet and re-process it
1272                         SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1273                         memcpy(*payload, &p.data[headers_size], payload.getSize());
1274
1275                         dst = processPacket(channel, payload, peer_id, channelnum, true);
1276                         return true;
1277                 }
1278         }
1279         return false;
1280 }
1281
1282 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1283                 SharedBuffer<u8> packetdata, u16 peer_id,
1284                 u8 channelnum, bool reliable)
1285 {
1286         IndentationRaiser iraiser(&(m_indentation));
1287
1288         if(packetdata.getSize() < 1)
1289                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1290
1291         u8 type = readU8(&packetdata[0]);
1292         
1293         if(type == TYPE_CONTROL)
1294         {
1295                 if(packetdata.getSize() < 2)
1296                         throw InvalidIncomingDataException("packetdata.getSize() < 2");
1297
1298                 u8 controltype = readU8(&packetdata[1]);
1299
1300                 if(controltype == CONTROLTYPE_ACK)
1301                 {
1302                         if(packetdata.getSize() < 4)
1303                                 throw InvalidIncomingDataException
1304                                                 ("packetdata.getSize() < 4 (ACK header size)");
1305
1306                         u16 seqnum = readU16(&packetdata[2]);
1307                         PrintInfo();
1308                         dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1309                                         <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1310                                         <<", seqnum="<<seqnum<<std::endl;
1311
1312                         try{
1313                                 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1314                                 // Get round trip time
1315                                 float rtt = p.totaltime;
1316
1317                                 // Let peer calculate stuff according to it
1318                                 // (avg_rtt and resend_timeout)
1319                                 Peer *peer = getPeer(peer_id);
1320                                 peer->reportRTT(rtt);
1321
1322                                 //PrintInfo(dout_con);
1323                                 //dout_con<<"RTT = "<<rtt<<std::endl;
1324
1325                                 /*dout_con<<"OUTGOING: ";
1326                                 PrintInfo();
1327                                 channel->outgoing_reliables.print();
1328                                 dout_con<<std::endl;*/
1329                         }
1330                         catch(NotFoundException &e){
1331                                 PrintInfo(derr_con);
1332                                 derr_con<<"WARNING: ACKed packet not "
1333                                                 "in outgoing queue"
1334                                                 <<std::endl;
1335                         }
1336
1337                         throw ProcessedSilentlyException("Got an ACK");
1338                 }
1339                 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1340                 {
1341                         if(packetdata.getSize() < 4)
1342                                 throw InvalidIncomingDataException
1343                                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1344                         u16 peer_id_new = readU16(&packetdata[2]);
1345                         PrintInfo();
1346                         dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1347
1348                         if(GetPeerID() != PEER_ID_INEXISTENT)
1349                         {
1350                                 PrintInfo(derr_con);
1351                                 derr_con<<"WARNING: Not changing"
1352                                                 " existing peer id."<<std::endl;
1353                         }
1354                         else
1355                         {
1356                                 dout_con<<"changing."<<std::endl;
1357                                 SetPeerID(peer_id_new);
1358                         }
1359                         throw ProcessedSilentlyException("Got a SET_PEER_ID");
1360                 }
1361                 else if(controltype == CONTROLTYPE_PING)
1362                 {
1363                         // Just ignore it, the incoming data already reset
1364                         // the timeout counter
1365                         PrintInfo();
1366                         dout_con<<"PING"<<std::endl;
1367                         throw ProcessedSilentlyException("Got a PING");
1368                 }
1369                 else if(controltype == CONTROLTYPE_DISCO)
1370                 {
1371                         // Just ignore it, the incoming data already reset
1372                         // the timeout counter
1373                         PrintInfo();
1374                         dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1375                         
1376                         if(deletePeer(peer_id, false) == false)
1377                         {
1378                                 PrintInfo(derr_con);
1379                                 derr_con<<"DISCO: Peer not found"<<std::endl;
1380                         }
1381
1382                         throw ProcessedSilentlyException("Got a DISCO");
1383                 }
1384                 else{
1385                         PrintInfo(derr_con);
1386                         derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1387                                         <<((int)controltype&0xff)<<std::endl;
1388                         throw InvalidIncomingDataException("Invalid control type");
1389                 }
1390         }
1391         else if(type == TYPE_ORIGINAL)
1392         {
1393                 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1394                         throw InvalidIncomingDataException
1395                                         ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1396                 PrintInfo();
1397                 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1398                                 <<std::endl;
1399                 // Get the inside packet out and return it
1400                 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1401                 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1402                 return payload;
1403         }
1404         else if(type == TYPE_SPLIT)
1405         {
1406                 // We have to create a packet again for buffering
1407                 // This isn't actually too bad an idea.
1408                 BufferedPacket packet = makePacket(
1409                                 getPeer(peer_id)->address,
1410                                 packetdata,
1411                                 GetProtocolID(),
1412                                 peer_id,
1413                                 channelnum);
1414                 // Buffer the packet
1415                 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1416                 if(data.getSize() != 0)
1417                 {
1418                         PrintInfo();
1419                         dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1420                                         <<"size="<<data.getSize()<<std::endl;
1421                         return data;
1422                 }
1423                 PrintInfo();
1424                 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1425                 throw ProcessedSilentlyException("Buffered a split packet chunk");
1426         }
1427         else if(type == TYPE_RELIABLE)
1428         {
1429                 // Recursive reliable packets not allowed
1430                 assert(reliable == false);
1431
1432                 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1433                         throw InvalidIncomingDataException
1434                                         ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1435
1436                 u16 seqnum = readU16(&packetdata[1]);
1437
1438                 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1439                 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1440                 
1441                 PrintInfo();
1442                 if(is_future_packet)
1443                         dout_con<<"BUFFERING";
1444                 else if(is_old_packet)
1445                         dout_con<<"OLD";
1446                 else
1447                         dout_con<<"RECUR";
1448                 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1449                                 <<" next="<<channel->next_incoming_seqnum;
1450                 dout_con<<" [sending CONTROLTYPE_ACK"
1451                                 " to peer_id="<<peer_id<<"]";
1452                 dout_con<<std::endl;
1453                 
1454                 //DEBUG
1455                 //assert(channel->incoming_reliables.size() < 100);
1456
1457                 // Send a CONTROLTYPE_ACK
1458                 SharedBuffer<u8> reply(4);
1459                 writeU8(&reply[0], TYPE_CONTROL);
1460                 writeU8(&reply[1], CONTROLTYPE_ACK);
1461                 writeU16(&reply[2], seqnum);
1462                 rawSendAsPacket(peer_id, channelnum, reply, false);
1463
1464                 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1465                 if(is_future_packet)
1466                 {
1467                         /*PrintInfo();
1468                         dout_con<<"Buffering reliable packet (seqnum="
1469                                         <<seqnum<<")"<<std::endl;*/
1470                         
1471                         // This one comes later, buffer it.
1472                         // Actually we have to make a packet to buffer one.
1473                         // Well, we have all the ingredients, so just do it.
1474                         BufferedPacket packet = makePacket(
1475                                         getPeer(peer_id)->address,
1476                                         packetdata,
1477                                         GetProtocolID(),
1478                                         peer_id,
1479                                         channelnum);
1480                         try{
1481                                 channel->incoming_reliables.insert(packet);
1482                                 
1483                                 /*PrintInfo();
1484                                 dout_con<<"INCOMING: ";
1485                                 channel->incoming_reliables.print();
1486                                 dout_con<<std::endl;*/
1487                         }
1488                         catch(AlreadyExistsException &e)
1489                         {
1490                         }
1491
1492                         throw ProcessedSilentlyException("Buffered future reliable packet");
1493                 }
1494                 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1495                 else if(is_old_packet)
1496                 {
1497                         // An old packet, dump it
1498                         throw InvalidIncomingDataException("Got an old reliable packet");
1499                 }
1500
1501                 channel->next_incoming_seqnum++;
1502
1503                 // Get out the inside packet and re-process it
1504                 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1505                 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1506
1507                 return processPacket(channel, payload, peer_id, channelnum, true);
1508         }
1509         else
1510         {
1511                 PrintInfo(derr_con);
1512                 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1513                 throw InvalidIncomingDataException("Invalid packet type");
1514         }
1515         
1516         // We should never get here.
1517         // If you get here, add an exception or a return to some of the
1518         // above conditionals.
1519         assert(0);
1520         throw BaseException("Error in Channel::ProcessPacket()");
1521 }
1522
1523 bool Connection::deletePeer(u16 peer_id, bool timeout)
1524 {
1525         if(m_peers.find(peer_id) == NULL)
1526                 return false;
1527         
1528         Peer *peer = m_peers[peer_id];
1529
1530         // Create event
1531         ConnectionEvent e;
1532         e.peerRemoved(peer_id, timeout, peer->address);
1533         putEvent(e);
1534
1535         delete m_peers[peer_id];
1536         m_peers.remove(peer_id);
1537         return true;
1538 }
1539
1540 /* Interface */
1541
1542 ConnectionEvent Connection::getEvent()
1543 {
1544         if(m_event_queue.size() == 0){
1545                 ConnectionEvent e;
1546                 e.type = CONNEVENT_NONE;
1547                 return e;
1548         }
1549         return m_event_queue.pop_front();
1550 }
1551
1552 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1553 {
1554         try{
1555                 return m_event_queue.pop_front(timeout_ms);
1556         } catch(ItemNotFoundException &ex){
1557                 ConnectionEvent e;
1558                 e.type = CONNEVENT_NONE;
1559                 return e;
1560         }
1561 }
1562
1563 void Connection::putCommand(ConnectionCommand &c)
1564 {
1565         m_command_queue.push_back(c);
1566 }
1567
1568 void Connection::Serve(unsigned short port)
1569 {
1570         ConnectionCommand c;
1571         c.serve(port);
1572         putCommand(c);
1573 }
1574
1575 void Connection::Connect(Address address)
1576 {
1577         ConnectionCommand c;
1578         c.connect(address);
1579         putCommand(c);
1580 }
1581
1582 bool Connection::Connected()
1583 {
1584         JMutexAutoLock peerlock(m_peers_mutex);
1585
1586         if(m_peers.size() != 1)
1587                 return false;
1588                 
1589         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1590         if(node == NULL)
1591                 return false;
1592         
1593         if(m_peer_id == PEER_ID_INEXISTENT)
1594                 return false;
1595         
1596         return true;
1597 }
1598
1599 void Connection::Disconnect()
1600 {
1601         ConnectionCommand c;
1602         c.disconnect();
1603         putCommand(c);
1604 }
1605
1606 u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
1607 {
1608         for(;;){
1609                 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
1610                 if(e.type != CONNEVENT_NONE)
1611                         dout_con<<getDesc()<<": Receive: got event: "
1612                                         <<e.describe()<<std::endl;
1613                 switch(e.type){
1614                 case CONNEVENT_NONE:
1615                         throw NoIncomingDataException("No incoming data");
1616                 case CONNEVENT_DATA_RECEIVED:
1617                         peer_id = e.peer_id;
1618                         data = SharedBuffer<u8>(e.data);
1619                         return e.data.getSize();
1620                 case CONNEVENT_PEER_ADDED: {
1621                         Peer tmp(e.peer_id, e.address);
1622                         if(m_bc_peerhandler)
1623                                 m_bc_peerhandler->peerAdded(&tmp);
1624                         continue; }
1625                 case CONNEVENT_PEER_REMOVED: {
1626                         Peer tmp(e.peer_id, e.address);
1627                         if(m_bc_peerhandler)
1628                                 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1629                         continue; }
1630                 case CONNEVENT_BIND_FAILED:
1631                         throw ConnectionBindFailed("Failed to bind socket "
1632                                         "(port already in use?)");
1633                 }
1634         }
1635         throw NoIncomingDataException("No incoming data");
1636 }
1637
1638 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1639 {
1640         assert(channelnum < CHANNEL_COUNT);
1641
1642         ConnectionCommand c;
1643         c.sendToAll(channelnum, data, reliable);
1644         putCommand(c);
1645 }
1646
1647 void Connection::Send(u16 peer_id, u8 channelnum,
1648                 SharedBuffer<u8> data, bool reliable)
1649 {
1650         assert(channelnum < CHANNEL_COUNT);
1651
1652         ConnectionCommand c;
1653         c.send(peer_id, channelnum, data, reliable);
1654         putCommand(c);
1655 }
1656
1657 void Connection::RunTimeouts(float dtime)
1658 {
1659         // No-op
1660 }
1661
1662 Address Connection::GetPeerAddress(u16 peer_id)
1663 {
1664         JMutexAutoLock peerlock(m_peers_mutex);
1665         return getPeer(peer_id)->address;
1666 }
1667
1668 float Connection::GetPeerAvgRTT(u16 peer_id)
1669 {
1670         JMutexAutoLock peerlock(m_peers_mutex);
1671         return getPeer(peer_id)->avg_rtt;
1672 }
1673
1674 void Connection::DeletePeer(u16 peer_id)
1675 {
1676         ConnectionCommand c;
1677         c.deletePeer(peer_id);
1678         putCommand(c);
1679 }
1680
1681 void Connection::PrintInfo(std::ostream &out)
1682 {
1683         out<<getDesc()<<": ";
1684 }
1685
1686 void Connection::PrintInfo()
1687 {
1688         PrintInfo(dout_con);
1689 }
1690
1691 std::string Connection::getDesc()
1692 {
1693         return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";
1694 }
1695
1696 } // namespace
1697