]> git.lizzy.rs Git - minetest.git/blob - src/connection.cpp
6cb655f2f090834d7a9553148353ccc23b1f351a
[minetest.git] / src / connection.cpp
1 /*
2 Minetest-c55
3 Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "connection.h"
21 #include "main.h"
22 #include "serialization.h"
23 #include "log.h"
24 #include "porting.h"
25
26 namespace con
27 {
28
29 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
30                 u32 protocol_id, u16 sender_peer_id, u8 channel)
31 {
32         u32 packet_size = datasize + BASE_HEADER_SIZE;
33         BufferedPacket p(packet_size);
34         p.address = address;
35
36         writeU32(&p.data[0], protocol_id);
37         writeU16(&p.data[4], sender_peer_id);
38         writeU8(&p.data[6], channel);
39
40         memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
41
42         return p;
43 }
44
45 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
46                 u32 protocol_id, u16 sender_peer_id, u8 channel)
47 {
48         return makePacket(address, *data, data.getSize(),
49                         protocol_id, sender_peer_id, channel);
50 }
51
52 SharedBuffer<u8> makeOriginalPacket(
53                 SharedBuffer<u8> data)
54 {
55         u32 header_size = 1;
56         u32 packet_size = data.getSize() + header_size;
57         SharedBuffer<u8> b(packet_size);
58
59         writeU8(&b[0], TYPE_ORIGINAL);
60
61         memcpy(&b[header_size], *data, data.getSize());
62
63         return b;
64 }
65
66 core::list<SharedBuffer<u8> > makeSplitPacket(
67                 SharedBuffer<u8> data,
68                 u32 chunksize_max,
69                 u16 seqnum)
70 {
71         // Chunk packets, containing the TYPE_SPLIT header
72         core::list<SharedBuffer<u8> > chunks;
73         
74         u32 chunk_header_size = 7;
75         u32 maximum_data_size = chunksize_max - chunk_header_size;
76         u32 start = 0;
77         u32 end = 0;
78         u32 chunk_num = 0;
79         do{
80                 end = start + maximum_data_size - 1;
81                 if(end > data.getSize() - 1)
82                         end = data.getSize() - 1;
83                 
84                 u32 payload_size = end - start + 1;
85                 u32 packet_size = chunk_header_size + payload_size;
86
87                 SharedBuffer<u8> chunk(packet_size);
88                 
89                 writeU8(&chunk[0], TYPE_SPLIT);
90                 writeU16(&chunk[1], seqnum);
91                 // [3] u16 chunk_count is written at next stage
92                 writeU16(&chunk[5], chunk_num);
93                 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
94
95                 chunks.push_back(chunk);
96                 
97                 start = end + 1;
98                 chunk_num++;
99         }
100         while(end != data.getSize() - 1);
101
102         u16 chunk_count = chunks.getSize();
103
104         core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
105         for(; i != chunks.end(); i++)
106         {
107                 // Write chunk_count
108                 writeU16(&((*i)[3]), chunk_count);
109         }
110
111         return chunks;
112 }
113
114 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
115                 SharedBuffer<u8> data,
116                 u32 chunksize_max,
117                 u16 &split_seqnum)
118 {
119         u32 original_header_size = 1;
120         core::list<SharedBuffer<u8> > list;
121         if(data.getSize() + original_header_size > chunksize_max)
122         {
123                 list = makeSplitPacket(data, chunksize_max, split_seqnum);
124                 split_seqnum++;
125                 return list;
126         }
127         else
128         {
129                 list.push_back(makeOriginalPacket(data));
130         }
131         return list;
132 }
133
134 SharedBuffer<u8> makeReliablePacket(
135                 SharedBuffer<u8> data,
136                 u16 seqnum)
137 {
138         /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
139         dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
140                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
141         u32 header_size = 3;
142         u32 packet_size = data.getSize() + header_size;
143         SharedBuffer<u8> b(packet_size);
144
145         writeU8(&b[0], TYPE_RELIABLE);
146         writeU16(&b[1], seqnum);
147
148         memcpy(&b[header_size], *data, data.getSize());
149
150         /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
151                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
152         //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
153         return b;
154 }
155
156 /*
157         ReliablePacketBuffer
158 */
159
160 void ReliablePacketBuffer::print()
161 {
162         core::list<BufferedPacket>::Iterator i;
163         i = m_list.begin();
164         for(; i != m_list.end(); i++)
165         {
166                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
167                 dout_con<<s<<" ";
168         }
169 }
170 bool ReliablePacketBuffer::empty()
171 {
172         return m_list.empty();
173 }
174 u32 ReliablePacketBuffer::size()
175 {
176         return m_list.getSize();
177 }
178 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
179 {
180         core::list<BufferedPacket>::Iterator i;
181         i = m_list.begin();
182         for(; i != m_list.end(); i++)
183         {
184                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
185                 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
186                                 <<", comparing to s="<<s<<std::endl;*/
187                 if(s == seqnum)
188                         break;
189         }
190         return i;
191 }
192 RPBSearchResult ReliablePacketBuffer::notFound()
193 {
194         return m_list.end();
195 }
196 u16 ReliablePacketBuffer::getFirstSeqnum()
197 {
198         if(empty())
199                 throw NotFoundException("Buffer is empty");
200         BufferedPacket p = *m_list.begin();
201         return readU16(&p.data[BASE_HEADER_SIZE+1]);
202 }
203 BufferedPacket ReliablePacketBuffer::popFirst()
204 {
205         if(empty())
206                 throw NotFoundException("Buffer is empty");
207         BufferedPacket p = *m_list.begin();
208         core::list<BufferedPacket>::Iterator i = m_list.begin();
209         m_list.erase(i);
210         return p;
211 }
212 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
213 {
214         RPBSearchResult r = findPacket(seqnum);
215         if(r == notFound()){
216                 dout_con<<"Not found"<<std::endl;
217                 throw NotFoundException("seqnum not found in buffer");
218         }
219         BufferedPacket p = *r;
220         m_list.erase(r);
221         return p;
222 }
223 void ReliablePacketBuffer::insert(BufferedPacket &p)
224 {
225         assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
226         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
227         assert(type == TYPE_RELIABLE);
228         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
229
230         // Find the right place for the packet and insert it there
231
232         // If list is empty, just add it
233         if(m_list.empty())
234         {
235                 m_list.push_back(p);
236                 // Done.
237                 return;
238         }
239         // Otherwise find the right place
240         core::list<BufferedPacket>::Iterator i;
241         i = m_list.begin();
242         // Find the first packet in the list which has a higher seqnum
243         for(; i != m_list.end(); i++){
244                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
245                 if(s == seqnum){
246                         throw AlreadyExistsException("Same seqnum in list");
247                 }
248                 if(seqnum_higher(s, seqnum)){
249                         break;
250                 }
251         }
252         // If we're at the end of the list, add the packet to the
253         // end of the list
254         if(i == m_list.end())
255         {
256                 m_list.push_back(p);
257                 // Done.
258                 return;
259         }
260         // Insert before i
261         m_list.insert_before(i, p);
262 }
263
264 void ReliablePacketBuffer::incrementTimeouts(float dtime)
265 {
266         core::list<BufferedPacket>::Iterator i;
267         i = m_list.begin();
268         for(; i != m_list.end(); i++){
269                 i->time += dtime;
270                 i->totaltime += dtime;
271         }
272 }
273
274 void ReliablePacketBuffer::resetTimedOuts(float timeout)
275 {
276         core::list<BufferedPacket>::Iterator i;
277         i = m_list.begin();
278         for(; i != m_list.end(); i++){
279                 if(i->time >= timeout)
280                         i->time = 0.0;
281         }
282 }
283
284 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
285 {
286         core::list<BufferedPacket>::Iterator i;
287         i = m_list.begin();
288         for(; i != m_list.end(); i++){
289                 if(i->totaltime >= timeout)
290                         return true;
291         }
292         return false;
293 }
294
295 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
296 {
297         core::list<BufferedPacket> timed_outs;
298         core::list<BufferedPacket>::Iterator i;
299         i = m_list.begin();
300         for(; i != m_list.end(); i++)
301         {
302                 if(i->time >= timeout)
303                         timed_outs.push_back(*i);
304         }
305         return timed_outs;
306 }
307
308 /*
309         IncomingSplitBuffer
310 */
311
312 IncomingSplitBuffer::~IncomingSplitBuffer()
313 {
314         core::map<u16, IncomingSplitPacket*>::Iterator i;
315         i = m_buf.getIterator();
316         for(; i.atEnd() == false; i++)
317         {
318                 delete i.getNode()->getValue();
319         }
320 }
321 /*
322         This will throw a GotSplitPacketException when a full
323         split packet is constructed.
324 */
325 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
326 {
327         u32 headersize = BASE_HEADER_SIZE + 7;
328         assert(p.data.getSize() >= headersize);
329         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
330         assert(type == TYPE_SPLIT);
331         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
332         u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
333         u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
334
335         // Add if doesn't exist
336         if(m_buf.find(seqnum) == NULL)
337         {
338                 IncomingSplitPacket *sp = new IncomingSplitPacket();
339                 sp->chunk_count = chunk_count;
340                 sp->reliable = reliable;
341                 m_buf[seqnum] = sp;
342         }
343         
344         IncomingSplitPacket *sp = m_buf[seqnum];
345         
346         // TODO: These errors should be thrown or something? Dunno.
347         if(chunk_count != sp->chunk_count)
348                 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
349                                 <<" != sp->chunk_count="<<sp->chunk_count
350                                 <<std::endl;
351         if(reliable != sp->reliable)
352                 derr_con<<"Connection: WARNING: reliable="<<reliable
353                                 <<" != sp->reliable="<<sp->reliable
354                                 <<std::endl;
355
356         // If chunk already exists, ignore it.
357         // Sometimes two identical packets may arrive when there is network
358         // lag and the server re-sends stuff.
359         if(sp->chunks.find(chunk_num) != NULL)
360                 return SharedBuffer<u8>();
361         
362         // Cut chunk data out of packet
363         u32 chunkdatasize = p.data.getSize() - headersize;
364         SharedBuffer<u8> chunkdata(chunkdatasize);
365         memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
366         
367         // Set chunk data in buffer
368         sp->chunks[chunk_num] = chunkdata;
369         
370         // If not all chunks are received, return empty buffer
371         if(sp->allReceived() == false)
372                 return SharedBuffer<u8>();
373
374         // Calculate total size
375         u32 totalsize = 0;
376         core::map<u16, SharedBuffer<u8> >::Iterator i;
377         i = sp->chunks.getIterator();
378         for(; i.atEnd() == false; i++)
379         {
380                 totalsize += i.getNode()->getValue().getSize();
381         }
382         
383         SharedBuffer<u8> fulldata(totalsize);
384
385         // Copy chunks to data buffer
386         u32 start = 0;
387         for(u32 chunk_i=0; chunk_i<sp->chunk_count;
388                         chunk_i++)
389         {
390                 SharedBuffer<u8> buf = sp->chunks[chunk_i];
391                 u16 chunkdatasize = buf.getSize();
392                 memcpy(&fulldata[start], *buf, chunkdatasize);
393                 start += chunkdatasize;;
394         }
395
396         // Remove sp from buffer
397         m_buf.remove(seqnum);
398         delete sp;
399
400         return fulldata;
401 }
402 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
403 {
404         core::list<u16> remove_queue;
405         core::map<u16, IncomingSplitPacket*>::Iterator i;
406         i = m_buf.getIterator();
407         for(; i.atEnd() == false; i++)
408         {
409                 IncomingSplitPacket *p = i.getNode()->getValue();
410                 // Reliable ones are not removed by timeout
411                 if(p->reliable == true)
412                         continue;
413                 p->time += dtime;
414                 if(p->time >= timeout)
415                         remove_queue.push_back(i.getNode()->getKey());
416         }
417         core::list<u16>::Iterator j;
418         j = remove_queue.begin();
419         for(; j != remove_queue.end(); j++)
420         {
421                 dout_con<<"NOTE: Removing timed out unreliable split packet"
422                                 <<std::endl;
423                 delete m_buf[*j];
424                 m_buf.remove(*j);
425         }
426 }
427
428 /*
429         Channel
430 */
431
432 Channel::Channel()
433 {
434         next_outgoing_seqnum = SEQNUM_INITIAL;
435         next_incoming_seqnum = SEQNUM_INITIAL;
436         next_outgoing_split_seqnum = SEQNUM_INITIAL;
437 }
438 Channel::~Channel()
439 {
440 }
441
442 /*
443         Peer
444 */
445
446 Peer::Peer(u16 a_id, Address a_address):
447         address(a_address),
448         id(a_id),
449         timeout_counter(0.0),
450         ping_timer(0.0),
451         resend_timeout(0.5),
452         avg_rtt(-1.0),
453         has_sent_with_id(false),
454         m_sendtime_accu(0),
455         m_max_packets_per_second(10),
456         m_num_sent(0),
457         m_max_num_sent(0)
458 {
459 }
460 Peer::~Peer()
461 {
462 }
463
464 void Peer::reportRTT(float rtt)
465 {
466         if(rtt >= 0.0){
467                 if(rtt < 0.01){
468                         if(m_max_packets_per_second < 400)
469                                 m_max_packets_per_second += 10;
470                 } else if(rtt < 0.2){
471                         if(m_max_packets_per_second < 100)
472                                 m_max_packets_per_second += 2;
473                 } else {
474                         m_max_packets_per_second *= 0.8;
475                         if(m_max_packets_per_second < 10)
476                                 m_max_packets_per_second = 10;
477                 }
478         }
479
480         if(rtt < -0.999)
481         {}
482         else if(avg_rtt < 0.0)
483                 avg_rtt = rtt;
484         else
485                 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
486         
487         // Calculate resend_timeout
488
489         /*int reliable_count = 0;
490         for(int i=0; i<CHANNEL_COUNT; i++)
491         {
492                 reliable_count += channels[i].outgoing_reliables.size();
493         }
494         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
495                         * ((float)reliable_count * 1);*/
496         
497         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
498         if(timeout < RESEND_TIMEOUT_MIN)
499                 timeout = RESEND_TIMEOUT_MIN;
500         if(timeout > RESEND_TIMEOUT_MAX)
501                 timeout = RESEND_TIMEOUT_MAX;
502         resend_timeout = timeout;
503 }
504                                 
505 /*
506         Connection
507 */
508
509 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout):
510         m_protocol_id(protocol_id),
511         m_max_packet_size(max_packet_size),
512         m_timeout(timeout),
513         m_peer_id(0),
514         m_bc_peerhandler(NULL),
515         m_bc_receive_timeout(0),
516         m_indentation(0)
517 {
518         m_socket.setTimeoutMs(5);
519
520         Start();
521 }
522
523 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
524                 PeerHandler *peerhandler):
525         m_protocol_id(protocol_id),
526         m_max_packet_size(max_packet_size),
527         m_timeout(timeout),
528         m_peer_id(0),
529         m_bc_peerhandler(peerhandler),
530         m_bc_receive_timeout(0),
531         m_indentation(0)
532 {
533         m_socket.setTimeoutMs(5);
534
535         Start();
536 }
537
538
539 Connection::~Connection()
540 {
541         stop();
542         // Delete peers
543         for(core::map<u16, Peer*>::Iterator
544                         j = m_peers.getIterator();
545                         j.atEnd() == false; j++)
546         {
547                 Peer *peer = j.getNode()->getValue();
548                 delete peer;
549         }
550 }
551
552 /* Internal stuff */
553
554 void * Connection::Thread()
555 {
556         ThreadStarted();
557         log_register_thread("Connection");
558
559         dout_con<<"Connection thread started"<<std::endl;
560         
561         u32 curtime = porting::getTimeMs();
562         u32 lasttime = curtime;
563
564         while(getRun())
565         {
566                 BEGIN_DEBUG_EXCEPTION_HANDLER
567                 
568                 lasttime = curtime;
569                 curtime = porting::getTimeMs();
570                 float dtime = (float)(curtime - lasttime) / 1000.;
571                 if(dtime > 0.1)
572                         dtime = 0.1;
573                 if(dtime < 0.0)
574                         dtime = 0.0;
575                 
576                 runTimeouts(dtime);
577
578                 while(m_command_queue.size() != 0){
579                         ConnectionCommand c = m_command_queue.pop_front();
580                         processCommand(c);
581                 }
582
583                 send(dtime);
584
585                 receive();
586                 
587                 END_DEBUG_EXCEPTION_HANDLER(derr_con);
588         }
589
590         return NULL;
591 }
592
593 void Connection::putEvent(ConnectionEvent &e)
594 {
595         assert(e.type != CONNEVENT_NONE);
596         m_event_queue.push_back(e);
597 }
598
599 void Connection::processCommand(ConnectionCommand &c)
600 {
601         switch(c.type){
602         case CONNCMD_NONE:
603                 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
604                 return;
605         case CONNCMD_SERVE:
606                 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
607                                 <<c.port<<std::endl;
608                 serve(c.port);
609                 return;
610         case CONNCMD_CONNECT:
611                 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
612                 connect(c.address);
613                 return;
614         case CONNCMD_DISCONNECT:
615                 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
616                 disconnect();
617                 return;
618         case CONNCMD_SEND:
619                 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
620                 send(c.peer_id, c.channelnum, c.data, c.reliable);
621                 return;
622         case CONNCMD_SEND_TO_ALL:
623                 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
624                 sendToAll(c.channelnum, c.data, c.reliable);
625                 return;
626         case CONNCMD_DELETE_PEER:
627                 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
628                 deletePeer(c.peer_id, false);
629                 return;
630         }
631 }
632
633 void Connection::send(float dtime)
634 {
635         for(core::map<u16, Peer*>::Iterator
636                         j = m_peers.getIterator();
637                         j.atEnd() == false; j++)
638         {
639                 Peer *peer = j.getNode()->getValue();
640                 peer->m_sendtime_accu += dtime;
641                 peer->m_num_sent = 0;
642                 peer->m_max_num_sent = peer->m_sendtime_accu *
643                                 peer->m_max_packets_per_second;
644         }
645         Queue<OutgoingPacket> postponed_packets;
646         while(m_outgoing_queue.size() != 0){
647                 OutgoingPacket packet = m_outgoing_queue.pop_front();
648                 Peer *peer = getPeerNoEx(packet.peer_id);
649                 if(!peer)
650                         continue;
651                 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
652                         postponed_packets.push_back(packet);
653                 } else if(peer->m_num_sent < peer->m_max_num_sent){
654                         rawSendAsPacket(packet.peer_id, packet.channelnum,
655                                         packet.data, packet.reliable);
656                         peer->m_num_sent++;
657                 } else {
658                         postponed_packets.push_back(packet);
659                 }
660         }
661         while(postponed_packets.size() != 0){
662                 m_outgoing_queue.push_back(postponed_packets.pop_front());
663         }
664         for(core::map<u16, Peer*>::Iterator
665                         j = m_peers.getIterator();
666                         j.atEnd() == false; j++)
667         {
668                 Peer *peer = j.getNode()->getValue();
669                 peer->m_sendtime_accu -= (float)peer->m_num_sent /
670                                 peer->m_max_packets_per_second;
671                 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
672                         peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
673         }
674 }
675
676 // Receive packets from the network and buffers and create ConnectionEvents
677 void Connection::receive()
678 {
679         u32 datasize = m_max_packet_size * 2;  // Double it just to be safe
680         // TODO: We can not know how many layers of header there are.
681         // For now, just assume there are no other than the base headers.
682         u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
683         SharedBuffer<u8> packetdata(packet_maxsize);
684
685         bool single_wait_done = false;
686         
687         for(;;)
688         {
689         try{
690                 /* Check if some buffer has relevant data */
691                 {
692                         u16 peer_id;
693                         SharedBuffer<u8> resultdata;
694                         bool got = getFromBuffers(peer_id, resultdata);
695                         if(got){
696                                 ConnectionEvent e;
697                                 e.dataReceived(peer_id, resultdata);
698                                 putEvent(e);
699                                 continue;
700                         }
701                 }
702                 
703                 if(single_wait_done){
704                         if(m_socket.WaitData(0) == false)
705                                 break;
706                 }
707                 
708                 single_wait_done = true;
709
710                 Address sender;
711                 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
712
713                 if(received_size < 0)
714                         break;
715                 if(received_size < BASE_HEADER_SIZE)
716                         continue;
717                 if(readU32(&packetdata[0]) != m_protocol_id)
718                         continue;
719                 
720                 u16 peer_id = readPeerId(*packetdata);
721                 u8 channelnum = readChannel(*packetdata);
722                 if(channelnum > CHANNEL_COUNT-1){
723                         PrintInfo(derr_con);
724                         derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
725                         throw InvalidIncomingDataException("Channel doesn't exist");
726                 }
727
728                 if(peer_id == PEER_ID_INEXISTENT)
729                 {
730                         /*
731                                 Somebody is trying to send stuff to us with no peer id.
732                                 
733                                 Check if the same address and port was added to our peer
734                                 list before.
735                                 Allow only entries that have has_sent_with_id==false.
736                         */
737
738                         core::map<u16, Peer*>::Iterator j;
739                         j = m_peers.getIterator();
740                         for(; j.atEnd() == false; j++)
741                         {
742                                 Peer *peer = j.getNode()->getValue();
743                                 if(peer->has_sent_with_id)
744                                         continue;
745                                 if(peer->address == sender)
746                                         break;
747                         }
748                         
749                         /*
750                                 If no peer was found with the same address and port,
751                                 we shall assume it is a new peer and create an entry.
752                         */
753                         if(j.atEnd())
754                         {
755                                 // Pass on to adding the peer
756                         }
757                         // Else: A peer was found.
758                         else
759                         {
760                                 Peer *peer = j.getNode()->getValue();
761                                 peer_id = peer->id;
762                                 PrintInfo(derr_con);
763                                 derr_con<<"WARNING: Assuming unknown peer to be "
764                                                 <<"peer_id="<<peer_id<<std::endl;
765                         }
766                 }
767                 
768                 /*
769                         The peer was not found in our lists. Add it.
770                 */
771                 if(peer_id == PEER_ID_INEXISTENT)
772                 {
773                         // Somebody wants to make a new connection
774
775                         // Get a unique peer id (2 or higher)
776                         u16 peer_id_new = 2;
777                         /*
778                                 Find an unused peer id
779                         */
780                         bool out_of_ids = false;
781                         for(;;)
782                         {
783                                 // Check if exists
784                                 if(m_peers.find(peer_id_new) == NULL)
785                                         break;
786                                 // Check for overflow
787                                 if(peer_id_new == 65535){
788                                         out_of_ids = true;
789                                         break;
790                                 }
791                                 peer_id_new++;
792                         }
793                         if(out_of_ids){
794                                 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
795                                 continue;
796                         }
797
798                         PrintInfo();
799                         dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
800                                         " giving peer_id="<<peer_id_new<<std::endl;
801
802                         // Create a peer
803                         Peer *peer = new Peer(peer_id_new, sender);
804                         m_peers.insert(peer->id, peer);
805                         
806                         // Create peer addition event
807                         ConnectionEvent e;
808                         e.peerAdded(peer_id_new, sender);
809                         putEvent(e);
810                         
811                         // Create CONTROL packet to tell the peer id to the new peer.
812                         SharedBuffer<u8> reply(4);
813                         writeU8(&reply[0], TYPE_CONTROL);
814                         writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
815                         writeU16(&reply[2], peer_id_new);
816                         sendAsPacket(peer_id_new, 0, reply, true);
817                         
818                         // We're now talking to a valid peer_id
819                         peer_id = peer_id_new;
820
821                         // Go on and process whatever it sent
822                 }
823
824                 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
825
826                 if(node == NULL)
827                 {
828                         // Peer not found
829                         // This means that the peer id of the sender is not PEER_ID_INEXISTENT
830                         // and it is invalid.
831                         PrintInfo(derr_con);
832                         derr_con<<"Receive(): Peer not found"<<std::endl;
833                         throw InvalidIncomingDataException("Peer not found (possible timeout)");
834                 }
835
836                 Peer *peer = node->getValue();
837
838                 // Validate peer address
839                 if(peer->address != sender)
840                 {
841                         PrintInfo(derr_con);
842                         derr_con<<"Peer "<<peer_id<<" sending from different address."
843                                         " Ignoring."<<std::endl;
844                         continue;
845                 }
846                 
847                 peer->timeout_counter = 0.0;
848
849                 Channel *channel = &(peer->channels[channelnum]);
850                 
851                 // Throw the received packet to channel->processPacket()
852
853                 // Make a new SharedBuffer from the data without the base headers
854                 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
855                 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
856                                 strippeddata.getSize());
857                 
858                 try{
859                         // Process it (the result is some data with no headers made by us)
860                         SharedBuffer<u8> resultdata = processPacket
861                                         (channel, strippeddata, peer_id, channelnum, false);
862                         
863                         PrintInfo();
864                         dout_con<<"ProcessPacket returned data of size "
865                                         <<resultdata.getSize()<<std::endl;
866                         
867                         ConnectionEvent e;
868                         e.dataReceived(peer_id, resultdata);
869                         putEvent(e);
870                         continue;
871                 }catch(ProcessedSilentlyException &e){
872                 }
873         }catch(InvalidIncomingDataException &e){
874         }
875         catch(ProcessedSilentlyException &e){
876         }
877         } // for
878 }
879
880 void Connection::runTimeouts(float dtime)
881 {
882         core::list<u16> timeouted_peers;
883         core::map<u16, Peer*>::Iterator j;
884         j = m_peers.getIterator();
885         for(; j.atEnd() == false; j++)
886         {
887                 Peer *peer = j.getNode()->getValue();
888                 
889                 /*
890                         Check peer timeout
891                 */
892                 peer->timeout_counter += dtime;
893                 if(peer->timeout_counter > m_timeout)
894                 {
895                         PrintInfo(derr_con);
896                         derr_con<<"RunTimeouts(): Peer "<<peer->id
897                                         <<" has timed out."
898                                         <<" (source=peer->timeout_counter)"
899                                         <<std::endl;
900                         // Add peer to the list
901                         timeouted_peers.push_back(peer->id);
902                         // Don't bother going through the buffers of this one
903                         continue;
904                 }
905
906                 float resend_timeout = peer->resend_timeout;
907                 for(u16 i=0; i<CHANNEL_COUNT; i++)
908                 {
909                         core::list<BufferedPacket> timed_outs;
910                         core::list<BufferedPacket>::Iterator j;
911                         
912                         Channel *channel = &peer->channels[i];
913
914                         // Remove timed out incomplete unreliable split packets
915                         channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
916                         
917                         // Increment reliable packet times
918                         channel->outgoing_reliables.incrementTimeouts(dtime);
919
920                         // Check reliable packet total times, remove peer if
921                         // over timeout.
922                         if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
923                         {
924                                 PrintInfo(derr_con);
925                                 derr_con<<"RunTimeouts(): Peer "<<peer->id
926                                                 <<" has timed out."
927                                                 <<" (source=reliable packet totaltime)"
928                                                 <<std::endl;
929                                 // Add peer to the to-be-removed list
930                                 timeouted_peers.push_back(peer->id);
931                                 goto nextpeer;
932                         }
933
934                         // Re-send timed out outgoing reliables
935                         
936                         timed_outs = channel->
937                                         outgoing_reliables.getTimedOuts(resend_timeout);
938
939                         channel->outgoing_reliables.resetTimedOuts(resend_timeout);
940
941                         j = timed_outs.begin();
942                         for(; j != timed_outs.end(); j++)
943                         {
944                                 u16 peer_id = readPeerId(*(j->data));
945                                 u8 channel = readChannel(*(j->data));
946                                 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
947
948                                 PrintInfo(derr_con);
949                                 derr_con<<"RE-SENDING timed-out RELIABLE to ";
950                                 j->address.print(&derr_con);
951                                 derr_con<<"(t/o="<<resend_timeout<<"): "
952                                                 <<"from_peer_id="<<peer_id
953                                                 <<", channel="<<((int)channel&0xff)
954                                                 <<", seqnum="<<seqnum
955                                                 <<std::endl;
956
957                                 rawSend(*j);
958
959                                 // Enlarge avg_rtt and resend_timeout:
960                                 // The rtt will be at least the timeout.
961                                 // NOTE: This won't affect the timeout of the next
962                                 // checked channel because it was cached.
963                                 peer->reportRTT(resend_timeout);
964                         }
965                 }
966                 
967                 /*
968                         Send pings
969                 */
970                 peer->ping_timer += dtime;
971                 if(peer->ping_timer >= 5.0)
972                 {
973                         // Create and send PING packet
974                         SharedBuffer<u8> data(2);
975                         writeU8(&data[0], TYPE_CONTROL);
976                         writeU8(&data[1], CONTROLTYPE_PING);
977                         rawSendAsPacket(peer->id, 0, data, true);
978
979                         peer->ping_timer = 0.0;
980                 }
981                 
982 nextpeer:
983                 continue;
984         }
985
986         // Remove timed out peers
987         core::list<u16>::Iterator i = timeouted_peers.begin();
988         for(; i != timeouted_peers.end(); i++)
989         {
990                 PrintInfo(derr_con);
991                 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
992                 deletePeer(*i, true);
993         }
994 }
995
996 void Connection::serve(u16 port)
997 {
998         dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
999         try{
1000                 m_socket.Bind(port);
1001                 m_peer_id = PEER_ID_SERVER;
1002         }
1003         catch(SocketException &e){
1004                 // Create event
1005                 ConnectionEvent ce;
1006                 ce.bindFailed();
1007                 putEvent(ce);
1008         }
1009 }
1010
1011 void Connection::connect(Address address)
1012 {
1013         dout_con<<getDesc()<<" connecting to "<<address.serializeString()
1014                         <<":"<<address.getPort()<<std::endl;
1015
1016         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1017         if(node != NULL){
1018                 throw ConnectionException("Already connected to a server");
1019         }
1020
1021         Peer *peer = new Peer(PEER_ID_SERVER, address);
1022         m_peers.insert(peer->id, peer);
1023
1024         // Create event
1025         ConnectionEvent e;
1026         e.peerAdded(peer->id, peer->address);
1027         putEvent(e);
1028         
1029         m_socket.Bind(0);
1030         
1031         // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1032         m_peer_id = PEER_ID_INEXISTENT;
1033         SharedBuffer<u8> data(0);
1034         Send(PEER_ID_SERVER, 0, data, true);
1035 }
1036
1037 void Connection::disconnect()
1038 {
1039         dout_con<<getDesc()<<" disconnecting"<<std::endl;
1040
1041         // Create and send DISCO packet
1042         SharedBuffer<u8> data(2);
1043         writeU8(&data[0], TYPE_CONTROL);
1044         writeU8(&data[1], CONTROLTYPE_DISCO);
1045         
1046         // Send to all
1047         core::map<u16, Peer*>::Iterator j;
1048         j = m_peers.getIterator();
1049         for(; j.atEnd() == false; j++)
1050         {
1051                 Peer *peer = j.getNode()->getValue();
1052                 rawSendAsPacket(peer->id, 0, data, false);
1053         }
1054 }
1055
1056 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1057 {
1058         core::map<u16, Peer*>::Iterator j;
1059         j = m_peers.getIterator();
1060         for(; j.atEnd() == false; j++)
1061         {
1062                 Peer *peer = j.getNode()->getValue();
1063                 send(peer->id, channelnum, data, reliable);
1064         }
1065 }
1066
1067 void Connection::send(u16 peer_id, u8 channelnum,
1068                 SharedBuffer<u8> data, bool reliable)
1069 {
1070         dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1071
1072         assert(channelnum < CHANNEL_COUNT);
1073         
1074         Peer *peer = getPeerNoEx(peer_id);
1075         if(peer == NULL)
1076                 return;
1077         Channel *channel = &(peer->channels[channelnum]);
1078
1079         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1080         if(reliable)
1081                 chunksize_max -= RELIABLE_HEADER_SIZE;
1082
1083         core::list<SharedBuffer<u8> > originals;
1084         originals = makeAutoSplitPacket(data, chunksize_max,
1085                         channel->next_outgoing_split_seqnum);
1086         
1087         core::list<SharedBuffer<u8> >::Iterator i;
1088         i = originals.begin();
1089         for(; i != originals.end(); i++)
1090         {
1091                 SharedBuffer<u8> original = *i;
1092                 
1093                 sendAsPacket(peer_id, channelnum, original, reliable);
1094         }
1095 }
1096
1097 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1098                 SharedBuffer<u8> data, bool reliable)
1099 {
1100         OutgoingPacket packet(peer_id, channelnum, data, reliable);
1101         m_outgoing_queue.push_back(packet);
1102 }
1103
1104 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1105                 SharedBuffer<u8> data, bool reliable)
1106 {
1107         Peer *peer = getPeerNoEx(peer_id);
1108         if(!peer)
1109                 return;
1110         Channel *channel = &(peer->channels[channelnum]);
1111
1112         if(reliable)
1113         {
1114                 u16 seqnum = channel->next_outgoing_seqnum;
1115                 channel->next_outgoing_seqnum++;
1116
1117                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1118
1119                 // Add base headers and make a packet
1120                 BufferedPacket p = makePacket(peer->address, reliable,
1121                                 m_protocol_id, m_peer_id, channelnum);
1122                 
1123                 try{
1124                         // Buffer the packet
1125                         channel->outgoing_reliables.insert(p);
1126                 }
1127                 catch(AlreadyExistsException &e)
1128                 {
1129                         PrintInfo(derr_con);
1130                         derr_con<<"WARNING: Going to send a reliable packet "
1131                                         "seqnum="<<seqnum<<" that is already "
1132                                         "in outgoing buffer"<<std::endl;
1133                         //assert(0);
1134                 }
1135                 
1136                 // Send the packet
1137                 rawSend(p);
1138         }
1139         else
1140         {
1141                 // Add base headers and make a packet
1142                 BufferedPacket p = makePacket(peer->address, data,
1143                                 m_protocol_id, m_peer_id, channelnum);
1144
1145                 // Send the packet
1146                 rawSend(p);
1147         }
1148 }
1149
1150 void Connection::rawSend(const BufferedPacket &packet)
1151 {
1152         try{
1153                 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1154         } catch(SendFailedException &e){
1155                 derr_con<<"Connection::rawSend(): SendFailedException: "
1156                                 <<packet.address.serializeString()<<std::endl;
1157         }
1158 }
1159
1160 Peer* Connection::getPeer(u16 peer_id)
1161 {
1162         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1163
1164         if(node == NULL){
1165                 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1166         }
1167
1168         // Error checking
1169         assert(node->getValue()->id == peer_id);
1170
1171         return node->getValue();
1172 }
1173
1174 Peer* Connection::getPeerNoEx(u16 peer_id)
1175 {
1176         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1177
1178         if(node == NULL){
1179                 return NULL;
1180         }
1181
1182         // Error checking
1183         assert(node->getValue()->id == peer_id);
1184
1185         return node->getValue();
1186 }
1187
1188 core::list<Peer*> Connection::getPeers()
1189 {
1190         core::list<Peer*> list;
1191         core::map<u16, Peer*>::Iterator j;
1192         j = m_peers.getIterator();
1193         for(; j.atEnd() == false; j++)
1194         {
1195                 Peer *peer = j.getNode()->getValue();
1196                 list.push_back(peer);
1197         }
1198         return list;
1199 }
1200
1201 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1202 {
1203         core::map<u16, Peer*>::Iterator j;
1204         j = m_peers.getIterator();
1205         for(; j.atEnd() == false; j++)
1206         {
1207                 Peer *peer = j.getNode()->getValue();
1208                 for(u16 i=0; i<CHANNEL_COUNT; i++)
1209                 {
1210                         Channel *channel = &peer->channels[i];
1211                         SharedBuffer<u8> resultdata;
1212                         bool got = checkIncomingBuffers(channel, peer_id, resultdata);
1213                         if(got){
1214                                 dst = resultdata;
1215                                 return true;
1216                         }
1217                 }
1218         }
1219         return false;
1220 }
1221
1222 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1223                 SharedBuffer<u8> &dst)
1224 {
1225         u16 firstseqnum = 0;
1226         // Clear old packets from start of buffer
1227         try{
1228         for(;;){
1229                 firstseqnum = channel->incoming_reliables.getFirstSeqnum();
1230                 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1231                         channel->incoming_reliables.popFirst();
1232                 else
1233                         break;
1234         }
1235         // This happens if all packets are old
1236         }catch(con::NotFoundException)
1237         {}
1238         
1239         if(channel->incoming_reliables.empty() == false)
1240         {
1241                 if(firstseqnum == channel->next_incoming_seqnum)
1242                 {
1243                         BufferedPacket p = channel->incoming_reliables.popFirst();
1244                         
1245                         peer_id = readPeerId(*p.data);
1246                         u8 channelnum = readChannel(*p.data);
1247                         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1248
1249                         PrintInfo();
1250                         dout_con<<"UNBUFFERING TYPE_RELIABLE"
1251                                         <<" seqnum="<<seqnum
1252                                         <<" peer_id="<<peer_id
1253                                         <<" channel="<<((int)channelnum&0xff)
1254                                         <<std::endl;
1255
1256                         channel->next_incoming_seqnum++;
1257                         
1258                         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1259                         // Get out the inside packet and re-process it
1260                         SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1261                         memcpy(*payload, &p.data[headers_size], payload.getSize());
1262
1263                         dst = processPacket(channel, payload, peer_id, channelnum, true);
1264                         return true;
1265                 }
1266         }
1267         return false;
1268 }
1269
1270 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1271                 SharedBuffer<u8> packetdata, u16 peer_id,
1272                 u8 channelnum, bool reliable)
1273 {
1274         IndentationRaiser iraiser(&(m_indentation));
1275
1276         if(packetdata.getSize() < 1)
1277                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1278
1279         u8 type = readU8(&packetdata[0]);
1280         
1281         if(type == TYPE_CONTROL)
1282         {
1283                 if(packetdata.getSize() < 2)
1284                         throw InvalidIncomingDataException("packetdata.getSize() < 2");
1285
1286                 u8 controltype = readU8(&packetdata[1]);
1287
1288                 if(controltype == CONTROLTYPE_ACK)
1289                 {
1290                         if(packetdata.getSize() < 4)
1291                                 throw InvalidIncomingDataException
1292                                                 ("packetdata.getSize() < 4 (ACK header size)");
1293
1294                         u16 seqnum = readU16(&packetdata[2]);
1295                         PrintInfo();
1296                         dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1297                                         <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1298                                         <<", seqnum="<<seqnum<<std::endl;
1299
1300                         try{
1301                                 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1302                                 // Get round trip time
1303                                 float rtt = p.totaltime;
1304
1305                                 // Let peer calculate stuff according to it
1306                                 // (avg_rtt and resend_timeout)
1307                                 Peer *peer = getPeer(peer_id);
1308                                 peer->reportRTT(rtt);
1309
1310                                 //PrintInfo(dout_con);
1311                                 //dout_con<<"RTT = "<<rtt<<std::endl;
1312
1313                                 /*dout_con<<"OUTGOING: ";
1314                                 PrintInfo();
1315                                 channel->outgoing_reliables.print();
1316                                 dout_con<<std::endl;*/
1317                         }
1318                         catch(NotFoundException &e){
1319                                 PrintInfo(derr_con);
1320                                 derr_con<<"WARNING: ACKed packet not "
1321                                                 "in outgoing queue"
1322                                                 <<std::endl;
1323                         }
1324
1325                         throw ProcessedSilentlyException("Got an ACK");
1326                 }
1327                 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1328                 {
1329                         if(packetdata.getSize() < 4)
1330                                 throw InvalidIncomingDataException
1331                                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1332                         u16 peer_id_new = readU16(&packetdata[2]);
1333                         PrintInfo();
1334                         dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1335
1336                         if(GetPeerID() != PEER_ID_INEXISTENT)
1337                         {
1338                                 PrintInfo(derr_con);
1339                                 derr_con<<"WARNING: Not changing"
1340                                                 " existing peer id."<<std::endl;
1341                         }
1342                         else
1343                         {
1344                                 dout_con<<"changing."<<std::endl;
1345                                 SetPeerID(peer_id_new);
1346                         }
1347                         throw ProcessedSilentlyException("Got a SET_PEER_ID");
1348                 }
1349                 else if(controltype == CONTROLTYPE_PING)
1350                 {
1351                         // Just ignore it, the incoming data already reset
1352                         // the timeout counter
1353                         PrintInfo();
1354                         dout_con<<"PING"<<std::endl;
1355                         throw ProcessedSilentlyException("Got a PING");
1356                 }
1357                 else if(controltype == CONTROLTYPE_DISCO)
1358                 {
1359                         // Just ignore it, the incoming data already reset
1360                         // the timeout counter
1361                         PrintInfo();
1362                         dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1363                         
1364                         if(deletePeer(peer_id, false) == false)
1365                         {
1366                                 PrintInfo(derr_con);
1367                                 derr_con<<"DISCO: Peer not found"<<std::endl;
1368                         }
1369
1370                         throw ProcessedSilentlyException("Got a DISCO");
1371                 }
1372                 else{
1373                         PrintInfo(derr_con);
1374                         derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1375                                         <<((int)controltype&0xff)<<std::endl;
1376                         throw InvalidIncomingDataException("Invalid control type");
1377                 }
1378         }
1379         else if(type == TYPE_ORIGINAL)
1380         {
1381                 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1382                         throw InvalidIncomingDataException
1383                                         ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1384                 PrintInfo();
1385                 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1386                                 <<std::endl;
1387                 // Get the inside packet out and return it
1388                 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1389                 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1390                 return payload;
1391         }
1392         else if(type == TYPE_SPLIT)
1393         {
1394                 // We have to create a packet again for buffering
1395                 // This isn't actually too bad an idea.
1396                 BufferedPacket packet = makePacket(
1397                                 getPeer(peer_id)->address,
1398                                 packetdata,
1399                                 GetProtocolID(),
1400                                 peer_id,
1401                                 channelnum);
1402                 // Buffer the packet
1403                 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1404                 if(data.getSize() != 0)
1405                 {
1406                         PrintInfo();
1407                         dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1408                                         <<"size="<<data.getSize()<<std::endl;
1409                         return data;
1410                 }
1411                 PrintInfo();
1412                 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1413                 throw ProcessedSilentlyException("Buffered a split packet chunk");
1414         }
1415         else if(type == TYPE_RELIABLE)
1416         {
1417                 // Recursive reliable packets not allowed
1418                 assert(reliable == false);
1419
1420                 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1421                         throw InvalidIncomingDataException
1422                                         ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1423
1424                 u16 seqnum = readU16(&packetdata[1]);
1425
1426                 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1427                 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1428                 
1429                 PrintInfo();
1430                 if(is_future_packet)
1431                         dout_con<<"BUFFERING";
1432                 else if(is_old_packet)
1433                         dout_con<<"OLD";
1434                 else
1435                         dout_con<<"RECUR";
1436                 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1437                                 <<" next="<<channel->next_incoming_seqnum;
1438                 dout_con<<" [sending CONTROLTYPE_ACK"
1439                                 " to peer_id="<<peer_id<<"]";
1440                 dout_con<<std::endl;
1441                 
1442                 //DEBUG
1443                 //assert(channel->incoming_reliables.size() < 100);
1444
1445                 // Send a CONTROLTYPE_ACK
1446                 SharedBuffer<u8> reply(4);
1447                 writeU8(&reply[0], TYPE_CONTROL);
1448                 writeU8(&reply[1], CONTROLTYPE_ACK);
1449                 writeU16(&reply[2], seqnum);
1450                 rawSendAsPacket(peer_id, channelnum, reply, false);
1451
1452                 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1453                 if(is_future_packet)
1454                 {
1455                         /*PrintInfo();
1456                         dout_con<<"Buffering reliable packet (seqnum="
1457                                         <<seqnum<<")"<<std::endl;*/
1458                         
1459                         // This one comes later, buffer it.
1460                         // Actually we have to make a packet to buffer one.
1461                         // Well, we have all the ingredients, so just do it.
1462                         BufferedPacket packet = makePacket(
1463                                         getPeer(peer_id)->address,
1464                                         packetdata,
1465                                         GetProtocolID(),
1466                                         peer_id,
1467                                         channelnum);
1468                         try{
1469                                 channel->incoming_reliables.insert(packet);
1470                                 
1471                                 /*PrintInfo();
1472                                 dout_con<<"INCOMING: ";
1473                                 channel->incoming_reliables.print();
1474                                 dout_con<<std::endl;*/
1475                         }
1476                         catch(AlreadyExistsException &e)
1477                         {
1478                         }
1479
1480                         throw ProcessedSilentlyException("Buffered future reliable packet");
1481                 }
1482                 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1483                 else if(is_old_packet)
1484                 {
1485                         // An old packet, dump it
1486                         throw InvalidIncomingDataException("Got an old reliable packet");
1487                 }
1488
1489                 channel->next_incoming_seqnum++;
1490
1491                 // Get out the inside packet and re-process it
1492                 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1493                 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1494
1495                 return processPacket(channel, payload, peer_id, channelnum, true);
1496         }
1497         else
1498         {
1499                 PrintInfo(derr_con);
1500                 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1501                 throw InvalidIncomingDataException("Invalid packet type");
1502         }
1503         
1504         // We should never get here.
1505         // If you get here, add an exception or a return to some of the
1506         // above conditionals.
1507         assert(0);
1508         throw BaseException("Error in Channel::ProcessPacket()");
1509 }
1510
1511 bool Connection::deletePeer(u16 peer_id, bool timeout)
1512 {
1513         if(m_peers.find(peer_id) == NULL)
1514                 return false;
1515         
1516         Peer *peer = m_peers[peer_id];
1517
1518         // Create event
1519         ConnectionEvent e;
1520         e.peerRemoved(peer_id, timeout, peer->address);
1521         putEvent(e);
1522
1523         delete m_peers[peer_id];
1524         m_peers.remove(peer_id);
1525         return true;
1526 }
1527
1528 /* Interface */
1529
1530 ConnectionEvent Connection::getEvent()
1531 {
1532         if(m_event_queue.size() == 0){
1533                 ConnectionEvent e;
1534                 e.type = CONNEVENT_NONE;
1535                 return e;
1536         }
1537         return m_event_queue.pop_front();
1538 }
1539
1540 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1541 {
1542         try{
1543                 return m_event_queue.pop_front(timeout_ms);
1544         } catch(ItemNotFoundException &ex){
1545                 ConnectionEvent e;
1546                 e.type = CONNEVENT_NONE;
1547                 return e;
1548         }
1549 }
1550
1551 void Connection::putCommand(ConnectionCommand &c)
1552 {
1553         m_command_queue.push_back(c);
1554 }
1555
1556 void Connection::Serve(unsigned short port)
1557 {
1558         ConnectionCommand c;
1559         c.serve(port);
1560         putCommand(c);
1561 }
1562
1563 void Connection::Connect(Address address)
1564 {
1565         ConnectionCommand c;
1566         c.connect(address);
1567         putCommand(c);
1568 }
1569
1570 bool Connection::Connected()
1571 {
1572         JMutexAutoLock peerlock(m_peers_mutex);
1573
1574         if(m_peers.size() != 1)
1575                 return false;
1576                 
1577         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
1578         if(node == NULL)
1579                 return false;
1580         
1581         if(m_peer_id == PEER_ID_INEXISTENT)
1582                 return false;
1583         
1584         return true;
1585 }
1586
1587 void Connection::Disconnect()
1588 {
1589         ConnectionCommand c;
1590         c.disconnect();
1591         putCommand(c);
1592 }
1593
1594 u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
1595 {
1596         for(;;){
1597                 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
1598                 if(e.type != CONNEVENT_NONE)
1599                         dout_con<<getDesc()<<": Receive: got event: "
1600                                         <<e.describe()<<std::endl;
1601                 switch(e.type){
1602                 case CONNEVENT_NONE:
1603                         throw NoIncomingDataException("No incoming data");
1604                 case CONNEVENT_DATA_RECEIVED:
1605                         peer_id = e.peer_id;
1606                         data = SharedBuffer<u8>(e.data);
1607                         return e.data.getSize();
1608                 case CONNEVENT_PEER_ADDED: {
1609                         Peer tmp(e.peer_id, e.address);
1610                         if(m_bc_peerhandler)
1611                                 m_bc_peerhandler->peerAdded(&tmp);
1612                         continue; }
1613                 case CONNEVENT_PEER_REMOVED: {
1614                         Peer tmp(e.peer_id, e.address);
1615                         if(m_bc_peerhandler)
1616                                 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1617                         continue; }
1618                 case CONNEVENT_BIND_FAILED:
1619                         throw ConnectionBindFailed("Failed to bind socket "
1620                                         "(port already in use?)");
1621                 }
1622         }
1623         throw NoIncomingDataException("No incoming data");
1624 }
1625
1626 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1627 {
1628         assert(channelnum < CHANNEL_COUNT);
1629
1630         ConnectionCommand c;
1631         c.sendToAll(channelnum, data, reliable);
1632         putCommand(c);
1633 }
1634
1635 void Connection::Send(u16 peer_id, u8 channelnum,
1636                 SharedBuffer<u8> data, bool reliable)
1637 {
1638         assert(channelnum < CHANNEL_COUNT);
1639
1640         ConnectionCommand c;
1641         c.send(peer_id, channelnum, data, reliable);
1642         putCommand(c);
1643 }
1644
1645 void Connection::RunTimeouts(float dtime)
1646 {
1647         // No-op
1648 }
1649
1650 Address Connection::GetPeerAddress(u16 peer_id)
1651 {
1652         JMutexAutoLock peerlock(m_peers_mutex);
1653         return getPeer(peer_id)->address;
1654 }
1655
1656 float Connection::GetPeerAvgRTT(u16 peer_id)
1657 {
1658         JMutexAutoLock peerlock(m_peers_mutex);
1659         return getPeer(peer_id)->avg_rtt;
1660 }
1661
1662 void Connection::DeletePeer(u16 peer_id)
1663 {
1664         ConnectionCommand c;
1665         c.deletePeer(peer_id);
1666         putCommand(c);
1667 }
1668
1669 void Connection::PrintInfo(std::ostream &out)
1670 {
1671         out<<getDesc()<<": ";
1672 }
1673
1674 void Connection::PrintInfo()
1675 {
1676         PrintInfo(dout_con);
1677 }
1678
1679 std::string Connection::getDesc()
1680 {
1681         return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";
1682 }
1683
1684 } // namespace
1685