]> git.lizzy.rs Git - minetest-m13.git/blob - src/connection.cpp
Update code style to C++11
[minetest-m13.git] / src / connection.cpp
1 /*
2 Minetest-c55
3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "connection.h"
21 #include "main.h"
22 #include "serialization.h"
23 #include "log.h"
24 #include "porting.h"
25
26 namespace con
27 {
28
29 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
30                 u32 protocol_id, u16 sender_peer_id, u8 channel)
31 {
32         u32 packet_size = datasize + BASE_HEADER_SIZE;
33         BufferedPacket p(packet_size);
34         p.address = address;
35
36         writeU32(&p.data[0], protocol_id);
37         writeU16(&p.data[4], sender_peer_id);
38         writeU8(&p.data[6], channel);
39
40         memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
41
42         return p;
43 }
44
45 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
46                 u32 protocol_id, u16 sender_peer_id, u8 channel)
47 {
48         return makePacket(address, *data, data.getSize(),
49                         protocol_id, sender_peer_id, channel);
50 }
51
52 SharedBuffer<u8> makeOriginalPacket(
53                 SharedBuffer<u8> data)
54 {
55         u32 header_size = 1;
56         u32 packet_size = data.getSize() + header_size;
57         SharedBuffer<u8> b(packet_size);
58
59         writeU8(&b[0], TYPE_ORIGINAL);
60
61         memcpy(&b[header_size], *data, data.getSize());
62
63         return b;
64 }
65
66 core::list<SharedBuffer<u8> > makeSplitPacket(
67                 SharedBuffer<u8> data,
68                 u32 chunksize_max,
69                 u16 seqnum)
70 {
71         // Chunk packets, containing the TYPE_SPLIT header
72         core::list<SharedBuffer<u8> > chunks;
73
74         u32 chunk_header_size = 7;
75         u32 maximum_data_size = chunksize_max - chunk_header_size;
76         u32 start = 0;
77         u32 end = 0;
78         u32 chunk_num = 0;
79         do{
80                 end = start + maximum_data_size - 1;
81                 if(end > data.getSize() - 1)
82                         end = data.getSize() - 1;
83
84                 u32 payload_size = end - start + 1;
85                 u32 packet_size = chunk_header_size + payload_size;
86
87                 SharedBuffer<u8> chunk(packet_size);
88
89                 writeU8(&chunk[0], TYPE_SPLIT);
90                 writeU16(&chunk[1], seqnum);
91                 // [3] u16 chunk_count is written at next stage
92                 writeU16(&chunk[5], chunk_num);
93                 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
94
95                 chunks.push_back(chunk);
96
97                 start = end + 1;
98                 chunk_num++;
99         }
100         while(end != data.getSize() - 1);
101
102         u16 chunk_count = chunks.getSize();
103
104         core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
105         for(; i != chunks.end(); i++)
106         {
107                 // Write chunk_count
108                 writeU16(&((*i)[3]), chunk_count);
109         }
110
111         return chunks;
112 }
113
114 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
115                 SharedBuffer<u8> data,
116                 u32 chunksize_max,
117                 u16 &split_seqnum)
118 {
119         u32 original_header_size = 1;
120         core::list<SharedBuffer<u8> > list;
121         if(data.getSize() + original_header_size > chunksize_max)
122         {
123                 list = makeSplitPacket(data, chunksize_max, split_seqnum);
124                 split_seqnum++;
125                 return list;
126         }
127         else
128         {
129                 list.push_back(makeOriginalPacket(data));
130         }
131         return list;
132 }
133
134 SharedBuffer<u8> makeReliablePacket(
135                 SharedBuffer<u8> data,
136                 u16 seqnum)
137 {
138         /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
139         dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
140                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
141         u32 header_size = 3;
142         u32 packet_size = data.getSize() + header_size;
143         SharedBuffer<u8> b(packet_size);
144
145         writeU8(&b[0], TYPE_RELIABLE);
146         writeU16(&b[1], seqnum);
147
148         memcpy(&b[header_size], *data, data.getSize());
149
150         /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
151                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
152         //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
153         return b;
154 }
155
156 /*
157         ReliablePacketBuffer
158 */
159
160 void ReliablePacketBuffer::print()
161 {
162         core::list<BufferedPacket>::Iterator i;
163         i = m_list.begin();
164         for(; i != m_list.end(); i++)
165         {
166                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
167                 dout_con<<s<<" ";
168         }
169 }
170 bool ReliablePacketBuffer::empty()
171 {
172         return m_list.empty();
173 }
174 u32 ReliablePacketBuffer::size()
175 {
176         return m_list.getSize();
177 }
178 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
179 {
180         core::list<BufferedPacket>::Iterator i;
181         i = m_list.begin();
182         for(; i != m_list.end(); i++)
183         {
184                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
185                 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
186                                 <<", comparing to s="<<s<<std::endl;*/
187                 if(s == seqnum)
188                         break;
189         }
190         return i;
191 }
192 RPBSearchResult ReliablePacketBuffer::notFound()
193 {
194         return m_list.end();
195 }
196 u16 ReliablePacketBuffer::getFirstSeqnum()
197 {
198         if(empty())
199                 throw NotFoundException("Buffer is empty");
200         BufferedPacket p = *m_list.begin();
201         return readU16(&p.data[BASE_HEADER_SIZE+1]);
202 }
203 BufferedPacket ReliablePacketBuffer::popFirst()
204 {
205         if(empty())
206                 throw NotFoundException("Buffer is empty");
207         BufferedPacket p = *m_list.begin();
208         core::list<BufferedPacket>::Iterator i = m_list.begin();
209         m_list.erase(i);
210         return p;
211 }
212 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
213 {
214         RPBSearchResult r = findPacket(seqnum);
215         if(r == notFound()){
216                 dout_con<<"Not found"<<std::endl;
217                 throw NotFoundException("seqnum not found in buffer");
218         }
219         BufferedPacket p = *r;
220         m_list.erase(r);
221         return p;
222 }
223 void ReliablePacketBuffer::insert(BufferedPacket &p)
224 {
225         assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
226         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
227         assert(type == TYPE_RELIABLE);
228         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
229
230         // Find the right place for the packet and insert it there
231
232         // If list is empty, just add it
233         if(m_list.empty())
234         {
235                 m_list.push_back(p);
236                 // Done.
237                 return;
238         }
239         // Otherwise find the right place
240         core::list<BufferedPacket>::Iterator i;
241         i = m_list.begin();
242         // Find the first packet in the list which has a higher seqnum
243         for(; i != m_list.end(); i++){
244                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
245                 if(s == seqnum){
246                         throw AlreadyExistsException("Same seqnum in list");
247                 }
248                 if(seqnum_higher(s, seqnum)){
249                         break;
250                 }
251         }
252         // If we're at the end of the list, add the packet to the
253         // end of the list
254         if(i == m_list.end())
255         {
256                 m_list.push_back(p);
257                 // Done.
258                 return;
259         }
260         // Insert before i
261         m_list.insert_before(i, p);
262 }
263
264 void ReliablePacketBuffer::incrementTimeouts(float dtime)
265 {
266         core::list<BufferedPacket>::Iterator i;
267         i = m_list.begin();
268         for(; i != m_list.end(); i++){
269                 i->time += dtime;
270                 i->totaltime += dtime;
271         }
272 }
273
274 void ReliablePacketBuffer::resetTimedOuts(float timeout)
275 {
276         core::list<BufferedPacket>::Iterator i;
277         i = m_list.begin();
278         for(; i != m_list.end(); i++){
279                 if(i->time >= timeout)
280                         i->time = 0.0;
281         }
282 }
283
284 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
285 {
286         core::list<BufferedPacket>::Iterator i;
287         i = m_list.begin();
288         for(; i != m_list.end(); i++){
289                 if(i->totaltime >= timeout)
290                         return true;
291         }
292         return false;
293 }
294
295 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
296 {
297         core::list<BufferedPacket> timed_outs;
298         core::list<BufferedPacket>::Iterator i;
299         i = m_list.begin();
300         for(; i != m_list.end(); i++)
301         {
302                 if(i->time >= timeout)
303                         timed_outs.push_back(*i);
304         }
305         return timed_outs;
306 }
307
308 /*
309         IncomingSplitBuffer
310 */
311
312 IncomingSplitBuffer::~IncomingSplitBuffer()
313 {
314         core::map<u16, IncomingSplitPacket*>::Iterator i;
315         i = m_buf.getIterator();
316         for(; i.atEnd() == false; i++)
317         {
318                 delete i.getNode()->getValue();
319         }
320 }
321 /*
322         This will throw a GotSplitPacketException when a full
323         split packet is constructed.
324 */
325 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
326 {
327         u32 headersize = BASE_HEADER_SIZE + 7;
328         assert(p.data.getSize() >= headersize);
329         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
330         assert(type == TYPE_SPLIT);
331         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
332         u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
333         u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
334
335         // Add if doesn't exist
336         if(m_buf.find(seqnum) == NULL)
337         {
338                 IncomingSplitPacket *sp = new IncomingSplitPacket();
339                 sp->chunk_count = chunk_count;
340                 sp->reliable = reliable;
341                 m_buf[seqnum] = sp;
342         }
343
344         IncomingSplitPacket *sp = m_buf[seqnum];
345
346         // TODO: These errors should be thrown or something? Dunno.
347         if(chunk_count != sp->chunk_count)
348                 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
349                                 <<" != sp->chunk_count="<<sp->chunk_count
350                                 <<std::endl;
351         if(reliable != sp->reliable)
352                 derr_con<<"Connection: WARNING: reliable="<<reliable
353                                 <<" != sp->reliable="<<sp->reliable
354                                 <<std::endl;
355
356         // If chunk already exists, cancel
357         if(sp->chunks.find(chunk_num) != NULL)
358                 throw AlreadyExistsException("Chunk already in buffer");
359
360         // Cut chunk data out of packet
361         u32 chunkdatasize = p.data.getSize() - headersize;
362         SharedBuffer<u8> chunkdata(chunkdatasize);
363         memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
364
365         // Set chunk data in buffer
366         sp->chunks[chunk_num] = chunkdata;
367
368         // If not all chunks are received, return empty buffer
369         if(sp->allReceived() == false)
370                 return SharedBuffer<u8>();
371
372         // Calculate total size
373         u32 totalsize = 0;
374         core::map<u16, SharedBuffer<u8> >::Iterator i;
375         i = sp->chunks.getIterator();
376         for(; i.atEnd() == false; i++)
377         {
378                 totalsize += i.getNode()->getValue().getSize();
379         }
380
381         SharedBuffer<u8> fulldata(totalsize);
382
383         // Copy chunks to data buffer
384         u32 start = 0;
385         for(u32 chunk_i=0; chunk_i<sp->chunk_count;
386                         chunk_i++)
387         {
388                 SharedBuffer<u8> buf = sp->chunks[chunk_i];
389                 u16 chunkdatasize = buf.getSize();
390                 memcpy(&fulldata[start], *buf, chunkdatasize);
391                 start += chunkdatasize;;
392         }
393
394         // Remove sp from buffer
395         m_buf.remove(seqnum);
396         delete sp;
397
398         return fulldata;
399 }
400 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
401 {
402         core::list<u16> remove_queue;
403         core::map<u16, IncomingSplitPacket*>::Iterator i;
404         i = m_buf.getIterator();
405         for(; i.atEnd() == false; i++)
406         {
407                 IncomingSplitPacket *p = i.getNode()->getValue();
408                 // Reliable ones are not removed by timeout
409                 if(p->reliable == true)
410                         continue;
411                 p->time += dtime;
412                 if(p->time >= timeout)
413                         remove_queue.push_back(i.getNode()->getKey());
414         }
415         core::list<u16>::Iterator j;
416         j = remove_queue.begin();
417         for(; j != remove_queue.end(); j++)
418         {
419                 dout_con<<"NOTE: Removing timed out unreliable split packet"
420                                 <<std::endl;
421                 delete m_buf[*j];
422                 m_buf.remove(*j);
423         }
424 }
425
426 /*
427         Channel
428 */
429
430 Channel::Channel()
431 {
432         next_outgoing_seqnum = SEQNUM_INITIAL;
433         next_incoming_seqnum = SEQNUM_INITIAL;
434         next_outgoing_split_seqnum = SEQNUM_INITIAL;
435 }
436 Channel::~Channel()
437 {
438 }
439
440 /*
441         Peer
442 */
443
444 Peer::Peer(u16 a_id, Address a_address):
445         address(a_address),
446         id(a_id),
447         timeout_counter(0.0),
448         ping_timer(0.0),
449         resend_timeout(0.5),
450         avg_rtt(-1.0),
451         has_sent_with_id(false),
452         m_sendtime_accu(0),
453         m_max_packets_per_second(10),
454         m_num_sent(0),
455         m_max_num_sent(0)
456 {
457 }
458 Peer::~Peer()
459 {
460 }
461
462 void Peer::reportRTT(float rtt)
463 {
464         if(rtt >= 0.0){
465                 if(rtt < 0.01){
466                         if(m_max_packets_per_second < 100)
467                                 m_max_packets_per_second += 10;
468                 } else if(rtt < 0.2){
469                         if(m_max_packets_per_second < 100)
470                                 m_max_packets_per_second += 2;
471                 } else {
472                         m_max_packets_per_second *= 0.8;
473                         if(m_max_packets_per_second < 10)
474                                 m_max_packets_per_second = 10;
475                 }
476         }
477
478         if(rtt < -0.999)
479         {}
480         else if(avg_rtt < 0.0)
481                 avg_rtt = rtt;
482         else
483                 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
484
485         // Calculate resend_timeout
486
487         /*int reliable_count = 0;
488         for(int i=0; i<CHANNEL_COUNT; i++)
489         {
490                 reliable_count += channels[i].outgoing_reliables.size();
491         }
492         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
493                         * ((float)reliable_count * 1);*/
494
495         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
496         if(timeout < RESEND_TIMEOUT_MIN)
497                 timeout = RESEND_TIMEOUT_MIN;
498         if(timeout > RESEND_TIMEOUT_MAX)
499                 timeout = RESEND_TIMEOUT_MAX;
500         resend_timeout = timeout;
501 }
502
503 /*
504         Connection
505 */
506
507 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout):
508         m_protocol_id(protocol_id),
509         m_max_packet_size(max_packet_size),
510         m_timeout(timeout),
511         m_peer_id(0),
512         m_bc_peerhandler(NULL),
513         m_bc_receive_timeout(0),
514         m_indentation(0)
515 {
516         m_socket.setTimeoutMs(5);
517
518         Start();
519 }
520
521 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
522                 PeerHandler *peerhandler):
523         m_protocol_id(protocol_id),
524         m_max_packet_size(max_packet_size),
525         m_timeout(timeout),
526         m_peer_id(0),
527         m_bc_peerhandler(peerhandler),
528         m_bc_receive_timeout(0),
529         m_indentation(0)
530 {
531         m_socket.setTimeoutMs(5);
532
533         Start();
534 }
535
536
537 Connection::~Connection()
538 {
539         stop();
540 }
541
542 /* Internal stuff */
543
544 void * Connection::Thread()
545 {
546         ThreadStarted();
547         log_register_thread("Connection");
548
549         dout_con<<"Connection thread started"<<std::endl;
550
551         u32 curtime = porting::getTimeMs();
552         u32 lasttime = curtime;
553
554         while(getRun())
555         {
556                 BEGIN_DEBUG_EXCEPTION_HANDLER
557
558                 lasttime = curtime;
559                 curtime = porting::getTimeMs();
560                 float dtime = (float)(curtime - lasttime) / 1000.;
561                 if(dtime > 0.1)
562                         dtime = 0.1;
563                 if(dtime < 0.0)
564                         dtime = 0.0;
565
566                 runTimeouts(dtime);
567
568                 while(m_command_queue.size() != 0){
569                         ConnectionCommand c = m_command_queue.pop_front();
570                         processCommand(c);
571                 }
572
573                 send(dtime);
574
575                 receive();
576
577                 END_DEBUG_EXCEPTION_HANDLER(derr_con);
578         }
579
580         return NULL;
581 }
582
583 void Connection::putEvent(ConnectionEvent &e)
584 {
585         assert(e.type != CONNEVENT_NONE);
586         m_event_queue.push_back(e);
587 }
588
589 void Connection::processCommand(ConnectionCommand &c)
590 {
591         switch(c.type){
592         case CONNCMD_NONE:
593                 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
594                 return;
595         case CONNCMD_SERVE:
596                 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
597                                 <<c.port<<std::endl;
598                 serve(c.port);
599                 return;
600         case CONNCMD_CONNECT:
601                 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
602                 connect(c.address);
603                 return;
604         case CONNCMD_DISCONNECT:
605                 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
606                 disconnect();
607                 return;
608         case CONNCMD_SEND:
609                 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
610                 send(c.peer_id, c.channelnum, c.data, c.reliable);
611                 return;
612         case CONNCMD_SEND_TO_ALL:
613                 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
614                 sendToAll(c.channelnum, c.data, c.reliable);
615                 return;
616         case CONNCMD_DELETE_PEER:
617                 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
618                 deletePeer(c.peer_id, false);
619                 return;
620         }
621 }
622
623 void Connection::send(float dtime)
624 {
625         for(core::map<u16, Peer*>::Iterator
626                         j = m_peers.getIterator();
627                         j.atEnd() == false; j++)
628         {
629                 Peer *peer = j.getNode()->getValue();
630                 peer->m_sendtime_accu += dtime;
631                 peer->m_num_sent = 0;
632                 peer->m_max_num_sent = peer->m_sendtime_accu *
633                                 peer->m_max_packets_per_second;
634         }
635         Queue<OutgoingPacket> postponed_packets;
636         while(m_outgoing_queue.size() != 0){
637                 OutgoingPacket packet = m_outgoing_queue.pop_front();
638                 Peer *peer = getPeerNoEx(packet.peer_id);
639                 if(!peer)
640                         continue;
641                 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
642                         postponed_packets.push_back(packet);
643                 } else if(peer->m_num_sent < peer->m_max_num_sent){
644                         rawSendAsPacket(packet.peer_id, packet.channelnum,
645                                         packet.data, packet.reliable);
646                         peer->m_num_sent++;
647                 } else {
648                         postponed_packets.push_back(packet);
649                 }
650         }
651         while(postponed_packets.size() != 0){
652                 m_outgoing_queue.push_back(postponed_packets.pop_front());
653         }
654         for(core::map<u16, Peer*>::Iterator
655                         j = m_peers.getIterator();
656                         j.atEnd() == false; j++)
657         {
658                 Peer *peer = j.getNode()->getValue();
659                 peer->m_sendtime_accu -= (float)peer->m_num_sent /
660                                 peer->m_max_packets_per_second;
661                 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
662                         peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
663         }
664 }
665
666 // Receive packets from the network and buffers and create ConnectionEvents
667 void Connection::receive()
668 {
669         u32 datasize = m_max_packet_size * 2;  // Double it just to be safe
670         // TODO: We can not know how many layers of header there are.
671         // For now, just assume there are no other than the base headers.
672         u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
673         SharedBuffer<u8> packetdata(packet_maxsize);
674
675         bool single_wait_done = false;
676
677         for(;;)
678         {
679         try{
680                 /* Check if some buffer has relevant data */
681                 {
682                         u16 peer_id;
683                         SharedBuffer<u8> resultdata;
684                         bool got = getFromBuffers(peer_id, resultdata);
685                         if(got){
686                                 ConnectionEvent e;
687                                 e.dataReceived(peer_id, resultdata);
688                                 putEvent(e);
689                                 continue;
690                         }
691                 }
692
693                 if(single_wait_done){
694                         if(m_socket.WaitData(0) == false)
695                                 break;
696                 }
697
698                 single_wait_done = true;
699
700                 Address sender;
701                 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
702
703                 if(received_size < 0)
704                         break;
705                 if(received_size < BASE_HEADER_SIZE)
706                         continue;
707                 if(readU32(&packetdata[0]) != m_protocol_id)
708                         continue;
709
710                 u16 peer_id = readPeerId(*packetdata);
711                 u8 channelnum = readChannel(*packetdata);
712                 if(channelnum > CHANNEL_COUNT-1){
713                         PrintInfo(derr_con);
714                         derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
715                         throw InvalidIncomingDataException("Channel doesn't exist");
716                 }
717
718                 if(peer_id == PEER_ID_INEXISTENT)
719                 {
720                         /*
721                                 Somebody is trying to send stuff to us with no peer id.
722
723                                 Check if the same address and port was added to our peer
724                                 list before.
725                                 Allow only entries that have has_sent_with_id==false.
726                         */
727
728                         core::map<u16, Peer*>::Iterator j;
729                         j = m_peers.getIterator();
730                         for(; j.atEnd() == false; j++)
731                         {
732                                 Peer *peer = j.getNode()->getValue();
733                                 if(peer->has_sent_with_id)
734                                         continue;
735                                 if(peer->address == sender)
736                                         break;
737                         }
738
739                         /*
740                                 If no peer was found with the same address and port,
741                                 we shall assume it is a new peer and create an entry.
742                         */
743                         if(j.atEnd())
744                         {
745                                 // Pass on to adding the peer
746                         }
747                         // Else: A peer was found.
748                         else
749                         {
750                                 Peer *peer = j.getNode()->getValue();
751                                 peer_id = peer->id;
752                                 PrintInfo(derr_con);
753                                 derr_con<<"WARNING: Assuming unknown peer to be "
754                                                 <<"peer_id="<<peer_id<<std::endl;
755                         }
756                 }
757
758                 /*
759                         The peer was not found in our lists. Add it.
760                 */
761                 if(peer_id == PEER_ID_INEXISTENT)
762                 {
763                         // Somebody wants to make a new connection
764
765                         // Get a unique peer id (2 or higher)
766                         u16 peer_id_new = 2;
767                         /*
768                                 Find an unused peer id
769                         */
770                         bool out_of_ids = false;
771                         for(;;)
772                         {
773                                 // Check if exists
774                                 if(m_peers.find(peer_id_new) == NULL)
775                                         break;
776                                 // Check for overflow
777                                 if(peer_id_new == 65535){
778                                         out_of_ids = true;
779                                         break;
780                                 }
781                                 peer_id_new++;
782                         }
783                         if(out_of_ids){
784                                 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
785                                 continue;
786                         }
787
788                         PrintInfo();
789                         dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
790                                         " giving peer_id="<<peer_id_new<<std::endl;
791
792                         // Create a peer
793                         Peer *peer = new Peer(peer_id_new, sender);
794                         m_peers.insert(peer->id, peer);
795
796                         // Create peer addition event
797                         ConnectionEvent e;
798                         e.peerAdded(peer_id_new, sender);
799                         putEvent(e);
800
801                         // Create CONTROL packet to tell the peer id to the new peer.
802                         SharedBuffer<u8> reply(4);
803                         writeU8(&reply[0], TYPE_CONTROL);
804                         writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
805                         writeU16(&reply[2], peer_id_new);
806                         sendAsPacket(peer_id_new, 0, reply, true);
807
808                         // We're now talking to a valid peer_id
809                         peer_id = peer_id_new;
810
811                         // Go on and process whatever it sent
812                 }
813
814                 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
815
816                 if(node == NULL)
817                 {
818                         // Peer not found
819                         // This means that the peer id of the sender is not PEER_ID_INEXISTENT
820                         // and it is invalid.
821                         PrintInfo(derr_con);
822                         derr_con<<"Receive(): Peer not found"<<std::endl;
823                         throw InvalidIncomingDataException("Peer not found (possible timeout)");
824                 }
825
826                 Peer *peer = node->getValue();
827
828                 // Validate peer address
829                 if(peer->address != sender)
830                 {
831                         PrintInfo(derr_con);
832                         derr_con<<"Peer "<<peer_id<<" sending from different address."
833                                         " Ignoring."<<std::endl;
834                         continue;
835                 }
836
837                 peer->timeout_counter = 0.0;
838
839                 Channel *channel = &(peer->channels[channelnum]);
840
841                 // Throw the received packet to channel->processPacket()
842
843                 // Make a new SharedBuffer from the data without the base headers
844                 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
845                 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
846                                 strippeddata.getSize());
847
848                 try{
849                         // Process it (the result is some data with no headers made by us)
850                         SharedBuffer<u8> resultdata = processPacket
851                                         (channel, strippeddata, peer_id, channelnum, false);
852
853                         PrintInfo();
854                         dout_con<<"ProcessPacket returned data of size "
855                                         <<resultdata.getSize()<<std::endl;
856
857                         ConnectionEvent e;
858                         e.dataReceived(peer_id, resultdata);
859                         putEvent(e);
860                         continue;
861                 }catch(ProcessedSilentlyException &e){
862                 }
863         }catch(InvalidIncomingDataException &e){
864         }
865         catch(ProcessedSilentlyException &e){
866         }
867         } // for
868 }
869
870 void Connection::runTimeouts(float dtime)
871 {
872         core::list<u16> timeouted_peers;
873         core::map<u16, Peer*>::Iterator j;
874         j = m_peers.getIterator();
875         for(; j.atEnd() == false; j++)
876         {
877                 Peer *peer = j.getNode()->getValue();
878
879                 /*
880                         Check peer timeout
881                 */
882                 peer->timeout_counter += dtime;
883                 if(peer->timeout_counter > m_timeout)
884                 {
885                         PrintInfo(derr_con);
886                         derr_con<<"RunTimeouts(): Peer "<<peer->id
887                                         <<" has timed out."
888                                         <<" (source=peer->timeout_counter)"
889                                         <<std::endl;
890                         // Add peer to the list
891                         timeouted_peers.push_back(peer->id);
892                         // Don't bother going through the buffers of this one
893                         continue;
894                 }
895
896                 float resend_timeout = peer->resend_timeout;
897                 for(u16 i=0; i<CHANNEL_COUNT; i++)
898                 {
899                         core::list<BufferedPacket> timed_outs;
900                         core::list<BufferedPacket>::Iterator j;
901
902                         Channel *channel = &peer->channels[i];
903
904                         // Remove timed out incomplete unreliable split packets
905                         channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
906
907                         // Increment reliable packet times
908                         channel->outgoing_reliables.incrementTimeouts(dtime);
909
910                         // Check reliable packet total times, remove peer if
911                         // over timeout.
912                         if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
913                         {
914                                 PrintInfo(derr_con);
915                                 derr_con<<"RunTimeouts(): Peer "<<peer->id
916                                                 <<" has timed out."
917                                                 <<" (source=reliable packet totaltime)"
918                                                 <<std::endl;
919                                 // Add peer to the to-be-removed list
920                                 timeouted_peers.push_back(peer->id);
921                                 goto nextpeer;
922                         }
923
924                         // Re-send timed out outgoing reliables
925
926                         timed_outs = channel->
927                                         outgoing_reliables.getTimedOuts(resend_timeout);
928
929                         channel->outgoing_reliables.resetTimedOuts(resend_timeout);
930
931                         j = timed_outs.begin();
932                         for(; j != timed_outs.end(); j++)
933                         {
934                                 u16 peer_id = readPeerId(*(j->data));
935                                 u8 channel = readChannel(*(j->data));
936                                 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
937
938                                 PrintInfo(derr_con);
939                                 derr_con<<"RE-SENDING timed-out RELIABLE to ";
940                                 j->address.print(&derr_con);
941                                 derr_con<<"(t/o="<<resend_timeout<<"): "
942                                                 <<"from_peer_id="<<peer_id
943                                                 <<", channel="<<((int)channel&0xff)
944                                                 <<", seqnum="<<seqnum
945                                                 <<std::endl;
946
947                                 rawSend(*j);
948
949                                 // Enlarge avg_rtt and resend_timeout:
950                                 // The rtt will be at least the timeout.
951                                 // NOTE: This won't affect the timeout of the next
952                                 // checked channel because it was cached.
953                                 peer->reportRTT(resend_timeout);
954                         }
955                 }
956
957                 /*
958                         Send pings
959                 */
960                 peer->ping_timer += dtime;
961                 if(peer->ping_timer >= 5.0)
962                 {
963                         // Create and send PING packet
964                         SharedBuffer<u8> data(2);
965                         writeU8(&data[0], TYPE_CONTROL);
966                         writeU8(&data[1], CONTROLTYPE_PING);
967                         rawSendAsPacket(peer->id, 0, data, true);
968
969                         peer->ping_timer = 0.0;
970                 }
971
972 nextpeer:
973                 continue;
974         }
975
976         // Remove timed out peers
977         core::list<u16>::Iterator i = timeouted_peers.begin();
978         for(; i != timeouted_peers.end(); i++)
979         {
980                 PrintInfo(derr_con);
981                 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
982                 deletePeer(*i, true);
983         }
984 }
985
986 void Connection::serve(u16 port)
987 {
988         dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
989         m_socket.Bind(port);
990         m_peer_id = PEER_ID_SERVER;
991 }
992
993 void Connection::connect(Address address)
994 {
995         dout_con<<getDesc()<<" connecting to "<<address.serializeString()
996                         <<":"<<address.getPort()<<std::endl;
997
998         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
999         if(node != NULL){
1000                 throw ConnectionException("Already connected to a server");
1001         }
1002
1003         Peer *peer = new Peer(PEER_ID_SERVER, address);
1004         m_peers.insert(peer->id, peer);
1005
1006         // Create event
1007         ConnectionEvent e;
1008         e.peerAdded(peer->id, peer->address);
1009         putEvent(e);
1010
1011         m_socket.Bind(0);
1012
1013         // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1014         m_peer_id = PEER_ID_INEXISTENT;
1015         SharedBuffer<u8> data(0);
1016         Send(PEER_ID_SERVER, 0, data, true);
1017 }
1018
1019 void Connection::disconnect()
1020 {
1021         dout_con<<getDesc()<<" disconnecting"<<std::endl;
1022
1023         // Create and send DISCO packet
1024         SharedBuffer<u8> data(2);
1025         writeU8(&data[0], TYPE_CONTROL);
1026         writeU8(&data[1], CONTROLTYPE_DISCO);
1027
1028         // Send to all
1029         core::map<u16, Peer*>::Iterator j;
1030         j = m_peers.getIterator();
1031         for(; j.atEnd() == false; j++)
1032         {
1033                 Peer *peer = j.getNode()->getValue();
1034                 rawSendAsPacket(peer->id, 0, data, false);
1035         }
1036 }
1037
1038 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1039 {
1040         core::map<u16, Peer*>::Iterator j;
1041         j = m_peers.getIterator();
1042         for(; j.atEnd() == false; j++)
1043         {
1044                 Peer *peer = j.getNode()->getValue();
1045                 send(peer->id, channelnum, data, reliable);
1046         }
1047 }
1048
1049 void Connection::send(u16 peer_id, u8 channelnum,
1050                 SharedBuffer<u8> data, bool reliable)
1051 {
1052         dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1053
1054         assert(channelnum < CHANNEL_COUNT);
1055
1056         Peer *peer = getPeerNoEx(peer_id);
1057         if(peer == NULL)
1058                 return;
1059         Channel *channel = &(peer->channels[channelnum]);
1060
1061         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1062         if(reliable)
1063                 chunksize_max -= RELIABLE_HEADER_SIZE;
1064
1065         core::list<SharedBuffer<u8> > originals;
1066         originals = makeAutoSplitPacket(data, chunksize_max,
1067                         channel->next_outgoing_split_seqnum);
1068
1069         core::list<SharedBuffer<u8> >::Iterator i;
1070         i = originals.begin();
1071         for(; i != originals.end(); i++)
1072         {
1073                 SharedBuffer<u8> original = *i;
1074
1075                 sendAsPacket(peer_id, channelnum, original, reliable);
1076         }
1077 }
1078
1079 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1080                 SharedBuffer<u8> data, bool reliable)
1081 {
1082         OutgoingPacket packet(peer_id, channelnum, data, reliable);
1083         m_outgoing_queue.push_back(packet);
1084 }
1085
1086 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1087                 SharedBuffer<u8> data, bool reliable)
1088 {
1089         Peer *peer = getPeerNoEx(peer_id);
1090         if(!peer)
1091                 return;
1092         Channel *channel = &(peer->channels[channelnum]);
1093
1094         if(reliable)
1095         {
1096                 u16 seqnum = channel->next_outgoing_seqnum;
1097                 channel->next_outgoing_seqnum++;
1098
1099                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1100
1101                 // Add base headers and make a packet
1102                 BufferedPacket p = makePacket(peer->address, reliable,
1103                                 m_protocol_id, m_peer_id, channelnum);
1104
1105                 try{
1106                         // Buffer the packet
1107                         channel->outgoing_reliables.insert(p);
1108                 }
1109                 catch(AlreadyExistsException &e)
1110                 {
1111                         PrintInfo(derr_con);
1112                         derr_con<<"WARNING: Going to send a reliable packet "
1113                                         "seqnum="<<seqnum<<" that is already "
1114                                         "in outgoing buffer"<<std::endl;
1115                         //assert(0);
1116                 }
1117
1118                 // Send the packet
1119                 rawSend(p);
1120         }
1121         else
1122         {
1123                 // Add base headers and make a packet
1124                 BufferedPacket p = makePacket(peer->address, data,
1125                                 m_protocol_id, m_peer_id, channelnum);
1126
1127                 // Send the packet
1128                 rawSend(p);
1129         }
1130 }
1131
1132 void Connection::rawSend(const BufferedPacket &packet)
1133 {
1134         try{
1135                 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1136         } catch(SendFailedException &e){
1137                 derr_con<<"Connection::rawSend(): SendFailedException: "
1138                                 <<packet.address.serializeString()<<std::endl;
1139         }
1140 }
1141
1142 Peer* Connection::getPeer(u16 peer_id)
1143 {
1144         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1145
1146         if(node == NULL){
1147                 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1148         }
1149
1150         // Error checking
1151         assert(node->getValue()->id == peer_id);
1152
1153         return node->getValue();
1154 }
1155
1156 Peer* Connection::getPeerNoEx(u16 peer_id)
1157 {
1158         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1159
1160         if(node == NULL){
1161                 return NULL;
1162         }
1163
1164         // Error checking
1165         assert(node->getValue()->id == peer_id);
1166
1167         return node->getValue();
1168 }
1169
1170 core::list<Peer*> Connection::getPeers()
1171 {
1172         core::list<Peer*> list;
1173         core::map<u16, Peer*>::Iterator j;
1174         j = m_peers.getIterator();
1175         for(; j.atEnd() == false; j++)
1176         {
1177                 Peer *peer = j.getNode()->getValue();
1178                 list.push_back(peer);
1179         }
1180         return list;
1181 }
1182
1183 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1184 {
1185         core::map<u16, Peer*>::Iterator j;
1186         j = m_peers.getIterator();
1187         for(; j.atEnd() == false; j++)
1188         {
1189                 Peer *peer = j.getNode()->getValue();
1190                 for(u16 i=0; i<CHANNEL_COUNT; i++)
1191                 {
1192                         Channel *channel = &peer->channels[i];
1193                         SharedBuffer<u8> resultdata;
1194                         bool got = checkIncomingBuffers(channel, peer_id, resultdata);
1195                         if(got){
1196                                 dst = resultdata;
1197                                 return true;
1198                         }
1199                 }
1200         }
1201         return false;
1202 }
1203
1204 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1205                 SharedBuffer<u8> &dst)
1206 {
1207         u16 firstseqnum = 0;
1208         // Clear old packets from start of buffer
1209         try{
1210         for(;;){
1211                 firstseqnum = channel->incoming_reliables.getFirstSeqnum();
1212                 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1213                         channel->incoming_reliables.popFirst();
1214                 else
1215                         break;
1216         }
1217         // This happens if all packets are old
1218         }catch(con::NotFoundException &)
1219         {}
1220
1221         if(channel->incoming_reliables.empty() == false)
1222         {
1223                 if(firstseqnum == channel->next_incoming_seqnum)
1224                 {
1225                         BufferedPacket p = channel->incoming_reliables.popFirst();
1226
1227                         peer_id = readPeerId(*p.data);
1228                         u8 channelnum = readChannel(*p.data);
1229                         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1230
1231                         PrintInfo();
1232                         dout_con<<"UNBUFFERING TYPE_RELIABLE"
1233                                         <<" seqnum="<<seqnum
1234                                         <<" peer_id="<<peer_id
1235                                         <<" channel="<<((int)channelnum&0xff)
1236                                         <<std::endl;
1237
1238                         channel->next_incoming_seqnum++;
1239
1240                         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1241                         // Get out the inside packet and re-process it
1242                         SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1243                         memcpy(*payload, &p.data[headers_size], payload.getSize());
1244
1245                         dst = processPacket(channel, payload, peer_id, channelnum, true);
1246                         return true;
1247                 }
1248         }
1249         return false;
1250 }
1251
1252 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1253                 SharedBuffer<u8> packetdata, u16 peer_id,
1254                 u8 channelnum, bool reliable)
1255 {
1256         IndentationRaiser iraiser(&(m_indentation));
1257
1258         if(packetdata.getSize() < 1)
1259                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1260
1261         u8 type = readU8(&packetdata[0]);
1262
1263         if(type == TYPE_CONTROL)
1264         {
1265                 if(packetdata.getSize() < 2)
1266                         throw InvalidIncomingDataException("packetdata.getSize() < 2");
1267
1268                 u8 controltype = readU8(&packetdata[1]);
1269
1270                 if(controltype == CONTROLTYPE_ACK)
1271                 {
1272                         if(packetdata.getSize() < 4)
1273                                 throw InvalidIncomingDataException
1274                                                 ("packetdata.getSize() < 4 (ACK header size)");
1275
1276                         u16 seqnum = readU16(&packetdata[2]);
1277                         PrintInfo();
1278                         dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1279                                         <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1280                                         <<", seqnum="<<seqnum<<std::endl;
1281
1282                         try{
1283                                 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1284                                 // Get round trip time
1285                                 float rtt = p.totaltime;
1286
1287                                 // Let peer calculate stuff according to it
1288                                 // (avg_rtt and resend_timeout)
1289                                 Peer *peer = getPeer(peer_id);
1290                                 peer->reportRTT(rtt);
1291
1292                                 //PrintInfo(dout_con);
1293                                 //dout_con<<"RTT = "<<rtt<<std::endl;
1294
1295                                 /*dout_con<<"OUTGOING: ";
1296                                 PrintInfo();
1297                                 channel->outgoing_reliables.print();
1298                                 dout_con<<std::endl;*/
1299                         }
1300                         catch(NotFoundException &e){
1301                                 PrintInfo(derr_con);
1302                                 derr_con<<"WARNING: ACKed packet not "
1303                                                 "in outgoing queue"
1304                                                 <<std::endl;
1305                         }
1306
1307                         throw ProcessedSilentlyException("Got an ACK");
1308                 }
1309                 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1310                 {
1311                         if(packetdata.getSize() < 4)
1312                                 throw InvalidIncomingDataException
1313                                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1314                         u16 peer_id_new = readU16(&packetdata[2]);
1315                         PrintInfo();
1316                         dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1317
1318                         if(GetPeerID() != PEER_ID_INEXISTENT)
1319                         {
1320                                 PrintInfo(derr_con);
1321                                 derr_con<<"WARNING: Not changing"
1322                                                 " existing peer id."<<std::endl;
1323                         }
1324                         else
1325                         {
1326                                 dout_con<<"changing."<<std::endl;
1327                                 SetPeerID(peer_id_new);
1328                         }
1329                         throw ProcessedSilentlyException("Got a SET_PEER_ID");
1330                 }
1331                 else if(controltype == CONTROLTYPE_PING)
1332                 {
1333                         // Just ignore it, the incoming data already reset
1334                         // the timeout counter
1335                         PrintInfo();
1336                         dout_con<<"PING"<<std::endl;
1337                         throw ProcessedSilentlyException("Got a PING");
1338                 }
1339                 else if(controltype == CONTROLTYPE_DISCO)
1340                 {
1341                         // Just ignore it, the incoming data already reset
1342                         // the timeout counter
1343                         PrintInfo();
1344                         dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1345
1346                         if(deletePeer(peer_id, false) == false)
1347                         {
1348                                 PrintInfo(derr_con);
1349                                 derr_con<<"DISCO: Peer not found"<<std::endl;
1350                         }
1351
1352                         throw ProcessedSilentlyException("Got a DISCO");
1353                 }
1354                 else{
1355                         PrintInfo(derr_con);
1356                         derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1357                                         <<((int)controltype&0xff)<<std::endl;
1358                         throw InvalidIncomingDataException("Invalid control type");
1359                 }
1360         }
1361         else if(type == TYPE_ORIGINAL)
1362         {
1363                 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1364                         throw InvalidIncomingDataException
1365                                         ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1366                 PrintInfo();
1367                 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1368                                 <<std::endl;
1369                 // Get the inside packet out and return it
1370                 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1371                 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1372                 return payload;
1373         }
1374         else if(type == TYPE_SPLIT)
1375         {
1376                 // We have to create a packet again for buffering
1377                 // This isn't actually too bad an idea.
1378                 BufferedPacket packet = makePacket(
1379                                 getPeer(peer_id)->address,
1380                                 packetdata,
1381                                 GetProtocolID(),
1382                                 peer_id,
1383                                 channelnum);
1384                 // Buffer the packet
1385                 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1386                 if(data.getSize() != 0)
1387                 {
1388                         PrintInfo();
1389                         dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1390                                         <<"size="<<data.getSize()<<std::endl;
1391                         return data;
1392                 }
1393                 PrintInfo();
1394                 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1395                 throw ProcessedSilentlyException("Buffered a split packet chunk");
1396         }
1397         else if(type == TYPE_RELIABLE)
1398         {
1399                 // Recursive reliable packets not allowed
1400                 assert(reliable == false);
1401
1402                 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1403                         throw InvalidIncomingDataException
1404                                         ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1405
1406                 u16 seqnum = readU16(&packetdata[1]);
1407
1408                 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1409                 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1410
1411                 PrintInfo();
1412                 if(is_future_packet)
1413                         dout_con<<"BUFFERING";
1414                 else if(is_old_packet)
1415                         dout_con<<"OLD";
1416                 else
1417                         dout_con<<"RECUR";
1418                 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1419                                 <<" next="<<channel->next_incoming_seqnum;
1420                 dout_con<<" [sending CONTROLTYPE_ACK"
1421                                 " to peer_id="<<peer_id<<"]";
1422                 dout_con<<std::endl;
1423
1424                 //DEBUG
1425                 //assert(channel->incoming_reliables.size() < 100);
1426
1427                 // Send a CONTROLTYPE_ACK
1428                 SharedBuffer<u8> reply(4);
1429                 writeU8(&reply[0], TYPE_CONTROL);
1430                 writeU8(&reply[1], CONTROLTYPE_ACK);
1431                 writeU16(&reply[2], seqnum);
1432                 rawSendAsPacket(peer_id, channelnum, reply, false);
1433
1434                 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1435                 if(is_future_packet)
1436                 {
1437                         /*PrintInfo();
1438                         dout_con<<"Buffering reliable packet (seqnum="
1439                                         <<seqnum<<")"<<std::endl;*/
1440
1441                         // This one comes later, buffer it.
1442                         // Actually we have to make a packet to buffer one.
1443                         // Well, we have all the ingredients, so just do it.
1444                         BufferedPacket packet = makePacket(
1445                                         getPeer(peer_id)->address,
1446                                         packetdata,
1447                                         GetProtocolID(),
1448                                         peer_id,
1449                                         channelnum);
1450                         try{
1451                                 channel->incoming_reliables.insert(packet);
1452
1453                                 /*PrintInfo();
1454                                 dout_con<<"INCOMING: ";
1455                                 channel->incoming_reliables.print();
1456                                 dout_con<<std::endl;*/
1457                         }
1458                         catch(AlreadyExistsException &e)
1459                         {
1460                         }
1461
1462                         throw ProcessedSilentlyException("Buffered future reliable packet");
1463                 }
1464                 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1465                 else if(is_old_packet)
1466                 {
1467                         // An old packet, dump it
1468                         throw InvalidIncomingDataException("Got an old reliable packet");
1469                 }
1470
1471                 channel->next_incoming_seqnum++;
1472
1473                 // Get out the inside packet and re-process it
1474                 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1475                 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1476
1477                 return processPacket(channel, payload, peer_id, channelnum, true);
1478         }
1479         else
1480         {
1481                 PrintInfo(derr_con);
1482                 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1483                 throw InvalidIncomingDataException("Invalid packet type");
1484         }
1485
1486         // We should never get here.
1487         // If you get here, add an exception or a return to some of the
1488         // above conditionals.
1489         assert(0);
1490         throw BaseException("Error in Channel::ProcessPacket()");
1491 }
1492
1493 bool Connection::deletePeer(u16 peer_id, bool timeout)
1494 {
1495         if(m_peers.find(peer_id) == NULL)
1496                 return false;
1497
1498         Peer *peer = m_peers[peer_id];
1499
1500         // Create event
1501         ConnectionEvent e;
1502         e.peerRemoved(peer_id, timeout, peer->address);
1503         putEvent(e);
1504
1505         delete m_peers[peer_id];
1506         m_peers.remove(peer_id);
1507         return true;
1508 }
1509
1510 /* Interface */
1511
1512 ConnectionEvent Connection::getEvent()
1513 {
1514         if(m_event_queue.size() == 0){
1515                 ConnectionEvent e;
1516                 e.type = CONNEVENT_NONE;
1517                 return e;
1518         }
1519         return m_event_queue.pop_front();
1520 }
1521
1522 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1523 {
1524         try{
1525                 return m_event_queue.pop_front(timeout_ms);
1526         } catch(ItemNotFoundException &ex){
1527                 ConnectionEvent e;
1528                 e.type = CONNEVENT_NONE;
1529                 return e;
1530         }
1531 }
1532
1533 void Connection::putCommand(ConnectionCommand &c)
1534 {
1535         m_command_queue.push_back(c);
1536 }
1537
1538 void Connection::Serve(unsigned short port)
1539 {
1540         ConnectionCommand c;
1541         c.serve(port);
1542         putCommand(c);
1543 }
1544
1545 void Connection::Connect(Address address)
1546 {
1547         ConnectionCommand c;
1548         c.connect(address);
1549         putCommand(c);
1550 }
1551
1552 bool Connection::Connected()
1553 {
1554         JMutexAutoLock peerlock(m_peers_mutex);
1555
1556         if(m_peers.size() != 1)
1557                 return false;
1558
1559         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1560         if(node == NULL)
1561                 return false;
1562
1563         if(m_peer_id == PEER_ID_INEXISTENT)
1564                 return false;
1565
1566         return true;
1567 }
1568
1569 void Connection::Disconnect()
1570 {
1571         ConnectionCommand c;
1572         c.disconnect();
1573         putCommand(c);
1574 }
1575
1576 u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
1577 {
1578         for(;;){
1579                 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
1580                 if(e.type != CONNEVENT_NONE)
1581                         dout_con<<getDesc()<<": Receive: got event: "
1582                                         <<e.describe()<<std::endl;
1583                 switch(e.type){
1584                 case CONNEVENT_NONE:
1585                         throw NoIncomingDataException("No incoming data");
1586                 case CONNEVENT_DATA_RECEIVED:
1587                         peer_id = e.peer_id;
1588                         data = SharedBuffer<u8>(e.data);
1589                         return e.data.getSize();
1590                 case CONNEVENT_PEER_ADDED: {
1591                         Peer tmp(e.peer_id, e.address);
1592                         if(m_bc_peerhandler)
1593                                 m_bc_peerhandler->peerAdded(&tmp);
1594                         continue; }
1595                 case CONNEVENT_PEER_REMOVED: {
1596                         Peer tmp(e.peer_id, e.address);
1597                         if(m_bc_peerhandler)
1598                                 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1599                         continue; }
1600                 }
1601         }
1602         throw NoIncomingDataException("No incoming data");
1603 }
1604
1605 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1606 {
1607         assert(channelnum < CHANNEL_COUNT);
1608
1609         ConnectionCommand c;
1610         c.sendToAll(channelnum, data, reliable);
1611         putCommand(c);
1612 }
1613
1614 void Connection::Send(u16 peer_id, u8 channelnum,
1615                 SharedBuffer<u8> data, bool reliable)
1616 {
1617         assert(channelnum < CHANNEL_COUNT);
1618
1619         ConnectionCommand c;
1620         c.send(peer_id, channelnum, data, reliable);
1621         putCommand(c);
1622 }
1623
1624 void Connection::RunTimeouts(float dtime)
1625 {
1626         // No-op
1627 }
1628
1629 Address Connection::GetPeerAddress(u16 peer_id)
1630 {
1631         JMutexAutoLock peerlock(m_peers_mutex);
1632         return getPeer(peer_id)->address;
1633 }
1634
1635 float Connection::GetPeerAvgRTT(u16 peer_id)
1636 {
1637         JMutexAutoLock peerlock(m_peers_mutex);
1638         return getPeer(peer_id)->avg_rtt;
1639 }
1640
1641 void Connection::DeletePeer(u16 peer_id)
1642 {
1643         ConnectionCommand c;
1644         c.deletePeer(peer_id);
1645         putCommand(c);
1646 }
1647
1648 void Connection::PrintInfo(std::ostream &out)
1649 {
1650         out<<getDesc()<<": ";
1651 }
1652
1653 void Connection::PrintInfo()
1654 {
1655         PrintInfo(dout_con);
1656 }
1657
1658 std::string Connection::getDesc()
1659 {
1660         return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";
1661 }
1662
1663 } // namespace
1664