]> git.lizzy.rs Git - minetest.git/blob - src/connection.cpp
Use hexadecimal RRGGBB instead of colorkeys, rename getColor to parseColor
[minetest.git] / src / connection.cpp
1 /*
2 Minetest
3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include "connection.h"
21 #include "main.h"
22 #include "serialization.h"
23 #include "log.h"
24 #include "porting.h"
25 #include "util/serialize.h"
26 #include "util/numeric.h"
27 #include "util/string.h"
28 #include "settings.h"
29
30 namespace con
31 {
32
33 static u16 readPeerId(u8 *packetdata)
34 {
35         return readU16(&packetdata[4]);
36 }
37 static u8 readChannel(u8 *packetdata)
38 {
39         return readU8(&packetdata[6]);
40 }
41
42 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
43                 u32 protocol_id, u16 sender_peer_id, u8 channel)
44 {
45         u32 packet_size = datasize + BASE_HEADER_SIZE;
46         BufferedPacket p(packet_size);
47         p.address = address;
48
49         writeU32(&p.data[0], protocol_id);
50         writeU16(&p.data[4], sender_peer_id);
51         writeU8(&p.data[6], channel);
52
53         memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
54
55         return p;
56 }
57
58 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
59                 u32 protocol_id, u16 sender_peer_id, u8 channel)
60 {
61         return makePacket(address, *data, data.getSize(),
62                         protocol_id, sender_peer_id, channel);
63 }
64
65 SharedBuffer<u8> makeOriginalPacket(
66                 SharedBuffer<u8> data)
67 {
68         u32 header_size = 1;
69         u32 packet_size = data.getSize() + header_size;
70         SharedBuffer<u8> b(packet_size);
71
72         writeU8(&b[0], TYPE_ORIGINAL);
73
74         memcpy(&b[header_size], *data, data.getSize());
75
76         return b;
77 }
78
79 std::list<SharedBuffer<u8> > makeSplitPacket(
80                 SharedBuffer<u8> data,
81                 u32 chunksize_max,
82                 u16 seqnum)
83 {
84         // Chunk packets, containing the TYPE_SPLIT header
85         std::list<SharedBuffer<u8> > chunks;
86         
87         u32 chunk_header_size = 7;
88         u32 maximum_data_size = chunksize_max - chunk_header_size;
89         u32 start = 0;
90         u32 end = 0;
91         u32 chunk_num = 0;
92         u16 chunk_count = 0;
93         do{
94                 end = start + maximum_data_size - 1;
95                 if(end > data.getSize() - 1)
96                         end = data.getSize() - 1;
97                 
98                 u32 payload_size = end - start + 1;
99                 u32 packet_size = chunk_header_size + payload_size;
100
101                 SharedBuffer<u8> chunk(packet_size);
102                 
103                 writeU8(&chunk[0], TYPE_SPLIT);
104                 writeU16(&chunk[1], seqnum);
105                 // [3] u16 chunk_count is written at next stage
106                 writeU16(&chunk[5], chunk_num);
107                 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
108
109                 chunks.push_back(chunk);
110                 chunk_count++;
111                 
112                 start = end + 1;
113                 chunk_num++;
114         }
115         while(end != data.getSize() - 1);
116
117         for(std::list<SharedBuffer<u8> >::iterator i = chunks.begin();
118                 i != chunks.end(); ++i)
119         {
120                 // Write chunk_count
121                 writeU16(&((*i)[3]), chunk_count);
122         }
123
124         return chunks;
125 }
126
127 std::list<SharedBuffer<u8> > makeAutoSplitPacket(
128                 SharedBuffer<u8> data,
129                 u32 chunksize_max,
130                 u16 &split_seqnum)
131 {
132         u32 original_header_size = 1;
133         std::list<SharedBuffer<u8> > list;
134         if(data.getSize() + original_header_size > chunksize_max)
135         {
136                 list = makeSplitPacket(data, chunksize_max, split_seqnum);
137                 split_seqnum++;
138                 return list;
139         }
140         else
141         {
142                 list.push_back(makeOriginalPacket(data));
143         }
144         return list;
145 }
146
147 SharedBuffer<u8> makeReliablePacket(
148                 SharedBuffer<u8> data,
149                 u16 seqnum)
150 {
151         /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
152         dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
153                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
154         u32 header_size = 3;
155         u32 packet_size = data.getSize() + header_size;
156         SharedBuffer<u8> b(packet_size);
157
158         writeU8(&b[0], TYPE_RELIABLE);
159         writeU16(&b[1], seqnum);
160
161         memcpy(&b[header_size], *data, data.getSize());
162
163         /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
164                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
165         //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
166         return b;
167 }
168
169 /*
170         ReliablePacketBuffer
171 */
172
173 ReliablePacketBuffer::ReliablePacketBuffer(): m_list_size(0) {}
174
175 void ReliablePacketBuffer::print()
176 {
177         for(std::list<BufferedPacket>::iterator i = m_list.begin();
178                 i != m_list.end();
179                 ++i)
180         {
181                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
182                 dout_con<<s<<" ";
183         }
184 }
185 bool ReliablePacketBuffer::empty()
186 {
187         return m_list.empty();
188 }
189 u32 ReliablePacketBuffer::size()
190 {
191         return m_list_size;
192 }
193 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
194 {
195         std::list<BufferedPacket>::iterator i = m_list.begin();
196         for(; i != m_list.end(); ++i)
197         {
198                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
199                 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
200                                 <<", comparing to s="<<s<<std::endl;*/
201                 if(s == seqnum)
202                         break;
203         }
204         return i;
205 }
206 RPBSearchResult ReliablePacketBuffer::notFound()
207 {
208         return m_list.end();
209 }
210 u16 ReliablePacketBuffer::getFirstSeqnum()
211 {
212         if(empty())
213                 throw NotFoundException("Buffer is empty");
214         BufferedPacket p = *m_list.begin();
215         return readU16(&p.data[BASE_HEADER_SIZE+1]);
216 }
217 BufferedPacket ReliablePacketBuffer::popFirst()
218 {
219         if(empty())
220                 throw NotFoundException("Buffer is empty");
221         BufferedPacket p = *m_list.begin();
222         m_list.erase(m_list.begin());
223         --m_list_size;
224         return p;
225 }
226 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
227 {
228         RPBSearchResult r = findPacket(seqnum);
229         if(r == notFound()){
230                 dout_con<<"Not found"<<std::endl;
231                 throw NotFoundException("seqnum not found in buffer");
232         }
233         BufferedPacket p = *r;
234         m_list.erase(r);
235         --m_list_size;
236         return p;
237 }
238 void ReliablePacketBuffer::insert(BufferedPacket &p)
239 {
240         assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
241         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
242         assert(type == TYPE_RELIABLE);
243         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
244
245         ++m_list_size;
246         // Find the right place for the packet and insert it there
247
248         // If list is empty, just add it
249         if(m_list.empty())
250         {
251                 m_list.push_back(p);
252                 // Done.
253                 return;
254         }
255         // Otherwise find the right place
256         std::list<BufferedPacket>::iterator i = m_list.begin();
257         // Find the first packet in the list which has a higher seqnum
258         for(; i != m_list.end(); ++i){
259                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
260                 if(s == seqnum){
261                         --m_list_size;
262                         throw AlreadyExistsException("Same seqnum in list");
263                 }
264                 if(seqnum_higher(s, seqnum)){
265                         break;
266                 }
267         }
268         // If we're at the end of the list, add the packet to the
269         // end of the list
270         if(i == m_list.end())
271         {
272                 m_list.push_back(p);
273                 // Done.
274                 return;
275         }
276         // Insert before i
277         m_list.insert(i, p);
278 }
279
280 void ReliablePacketBuffer::incrementTimeouts(float dtime)
281 {
282         for(std::list<BufferedPacket>::iterator i = m_list.begin();
283                 i != m_list.end(); ++i)
284         {
285                 i->time += dtime;
286                 i->totaltime += dtime;
287         }
288 }
289
290 void ReliablePacketBuffer::resetTimedOuts(float timeout)
291 {
292         for(std::list<BufferedPacket>::iterator i = m_list.begin();
293                 i != m_list.end(); ++i)
294         {
295                 if(i->time >= timeout)
296                         i->time = 0.0;
297         }
298 }
299
300 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
301 {
302         for(std::list<BufferedPacket>::iterator i = m_list.begin();
303                 i != m_list.end(); ++i)
304         {
305                 if(i->totaltime >= timeout)
306                         return true;
307         }
308         return false;
309 }
310
311 std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
312 {
313         std::list<BufferedPacket> timed_outs;
314         for(std::list<BufferedPacket>::iterator i = m_list.begin();
315                 i != m_list.end(); ++i)
316         {
317                 if(i->time >= timeout)
318                         timed_outs.push_back(*i);
319         }
320         return timed_outs;
321 }
322
323 /*
324         IncomingSplitBuffer
325 */
326
327 IncomingSplitBuffer::~IncomingSplitBuffer()
328 {
329         for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
330                 i != m_buf.end(); ++i)
331         {
332                 delete i->second;
333         }
334 }
335 /*
336         This will throw a GotSplitPacketException when a full
337         split packet is constructed.
338 */
339 SharedBuffer<u8> IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
340 {
341         u32 headersize = BASE_HEADER_SIZE + 7;
342         assert(p.data.getSize() >= headersize);
343         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
344         assert(type == TYPE_SPLIT);
345         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
346         u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
347         u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
348
349         // Add if doesn't exist
350         if(m_buf.find(seqnum) == m_buf.end())
351         {
352                 IncomingSplitPacket *sp = new IncomingSplitPacket();
353                 sp->chunk_count = chunk_count;
354                 sp->reliable = reliable;
355                 m_buf[seqnum] = sp;
356         }
357         
358         IncomingSplitPacket *sp = m_buf[seqnum];
359         
360         // TODO: These errors should be thrown or something? Dunno.
361         if(chunk_count != sp->chunk_count)
362                 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
363                                 <<" != sp->chunk_count="<<sp->chunk_count
364                                 <<std::endl;
365         if(reliable != sp->reliable)
366                 derr_con<<"Connection: WARNING: reliable="<<reliable
367                                 <<" != sp->reliable="<<sp->reliable
368                                 <<std::endl;
369
370         // If chunk already exists, ignore it.
371         // Sometimes two identical packets may arrive when there is network
372         // lag and the server re-sends stuff.
373         if(sp->chunks.find(chunk_num) != sp->chunks.end())
374                 return SharedBuffer<u8>();
375         
376         // Cut chunk data out of packet
377         u32 chunkdatasize = p.data.getSize() - headersize;
378         SharedBuffer<u8> chunkdata(chunkdatasize);
379         memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
380         
381         // Set chunk data in buffer
382         sp->chunks[chunk_num] = chunkdata;
383         
384         // If not all chunks are received, return empty buffer
385         if(sp->allReceived() == false)
386                 return SharedBuffer<u8>();
387
388         // Calculate total size
389         u32 totalsize = 0;
390         for(std::map<u16, SharedBuffer<u8> >::iterator i = sp->chunks.begin();
391                 i != sp->chunks.end(); ++i)
392         {
393                 totalsize += i->second.getSize();
394         }
395         
396         SharedBuffer<u8> fulldata(totalsize);
397
398         // Copy chunks to data buffer
399         u32 start = 0;
400         for(u32 chunk_i=0; chunk_i<sp->chunk_count;
401                         chunk_i++)
402         {
403                 SharedBuffer<u8> buf = sp->chunks[chunk_i];
404                 u16 chunkdatasize = buf.getSize();
405                 memcpy(&fulldata[start], *buf, chunkdatasize);
406                 start += chunkdatasize;;
407         }
408
409         // Remove sp from buffer
410         m_buf.erase(seqnum);
411         delete sp;
412
413         return fulldata;
414 }
415 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
416 {
417         std::list<u16> remove_queue;
418         for(std::map<u16, IncomingSplitPacket*>::iterator i = m_buf.begin();
419                 i != m_buf.end(); ++i)
420         {
421                 IncomingSplitPacket *p = i->second;
422                 // Reliable ones are not removed by timeout
423                 if(p->reliable == true)
424                         continue;
425                 p->time += dtime;
426                 if(p->time >= timeout)
427                         remove_queue.push_back(i->first);
428         }
429         for(std::list<u16>::iterator j = remove_queue.begin();
430                 j != remove_queue.end(); ++j)
431         {
432                 dout_con<<"NOTE: Removing timed out unreliable split packet"
433                                 <<std::endl;
434                 delete m_buf[*j];
435                 m_buf.erase(*j);
436         }
437 }
438
439 /*
440         Channel
441 */
442
443 Channel::Channel()
444 {
445         next_outgoing_seqnum = SEQNUM_INITIAL;
446         next_incoming_seqnum = SEQNUM_INITIAL;
447         next_outgoing_split_seqnum = SEQNUM_INITIAL;
448 }
449 Channel::~Channel()
450 {
451 }
452
453 /*
454         Peer
455 */
456
457 Peer::Peer(u16 a_id, Address a_address):
458         address(a_address),
459         id(a_id),
460         timeout_counter(0.0),
461         ping_timer(0.0),
462         resend_timeout(0.5),
463         avg_rtt(-1.0),
464         has_sent_with_id(false),
465         m_sendtime_accu(0),
466         m_max_packets_per_second(10),
467         m_num_sent(0),
468         m_max_num_sent(0),
469         congestion_control_aim_rtt(0.2),
470         congestion_control_max_rate(400),
471         congestion_control_min_rate(10)
472 {
473 }
474 Peer::~Peer()
475 {
476 }
477
478 void Peer::reportRTT(float rtt)
479 {
480         if(rtt >= 0.0){
481                 if(rtt < 0.01){
482                         if(m_max_packets_per_second < congestion_control_max_rate)
483                                 m_max_packets_per_second += 10;
484                 } else if(rtt < congestion_control_aim_rtt){
485                         if(m_max_packets_per_second < congestion_control_max_rate)
486                                 m_max_packets_per_second += 2;
487                 } else {
488                         m_max_packets_per_second *= 0.8;
489                         if(m_max_packets_per_second < congestion_control_min_rate)
490                                 m_max_packets_per_second = congestion_control_min_rate;
491                 }
492         }
493
494         if(rtt < -0.999)
495         {}
496         else if(avg_rtt < 0.0)
497                 avg_rtt = rtt;
498         else
499                 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
500         
501         // Calculate resend_timeout
502
503         /*int reliable_count = 0;
504         for(int i=0; i<CHANNEL_COUNT; i++)
505         {
506                 reliable_count += channels[i].outgoing_reliables.size();
507         }
508         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
509                         * ((float)reliable_count * 1);*/
510         
511         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
512         if(timeout < RESEND_TIMEOUT_MIN)
513                 timeout = RESEND_TIMEOUT_MIN;
514         if(timeout > RESEND_TIMEOUT_MAX)
515                 timeout = RESEND_TIMEOUT_MAX;
516         resend_timeout = timeout;
517 }
518                                 
519 /*
520         Connection
521 */
522
523 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
524                 bool ipv6):
525         m_protocol_id(protocol_id),
526         m_max_packet_size(max_packet_size),
527         m_timeout(timeout),
528         m_socket(ipv6),
529         m_peer_id(0),
530         m_bc_peerhandler(NULL),
531         m_bc_receive_timeout(0),
532         m_indentation(0)
533 {
534         m_socket.setTimeoutMs(5);
535
536         Start();
537 }
538
539 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
540                 bool ipv6, PeerHandler *peerhandler):
541         m_protocol_id(protocol_id),
542         m_max_packet_size(max_packet_size),
543         m_timeout(timeout),
544         m_socket(ipv6),
545         m_peer_id(0),
546         m_bc_peerhandler(peerhandler),
547         m_bc_receive_timeout(0),
548         m_indentation(0)
549 {
550         m_socket.setTimeoutMs(5);
551
552         Start();
553 }
554
555
556 Connection::~Connection()
557 {
558         stop();
559         // Delete peers
560         for(std::map<u16, Peer*>::iterator
561                         j = m_peers.begin();
562                         j != m_peers.end(); ++j)
563         {
564                 delete j->second;
565         }
566 }
567
568 /* Internal stuff */
569
570 void * Connection::Thread()
571 {
572         ThreadStarted();
573         log_register_thread("Connection");
574
575         dout_con<<"Connection thread started"<<std::endl;
576         
577         u32 curtime = porting::getTimeMs();
578         u32 lasttime = curtime;
579
580         while(getRun())
581         {
582                 BEGIN_DEBUG_EXCEPTION_HANDLER
583                 
584                 lasttime = curtime;
585                 curtime = porting::getTimeMs();
586                 float dtime = (float)(curtime - lasttime) / 1000.;
587                 if(dtime > 0.1)
588                         dtime = 0.1;
589                 if(dtime < 0.0)
590                         dtime = 0.0;
591                 
592                 runTimeouts(dtime);
593
594                 while(!m_command_queue.empty()){
595                         ConnectionCommand c = m_command_queue.pop_front();
596                         processCommand(c);
597                 }
598
599                 send(dtime);
600
601                 receive();
602                 
603                 END_DEBUG_EXCEPTION_HANDLER(derr_con);
604         }
605
606         return NULL;
607 }
608
609 void Connection::putEvent(ConnectionEvent &e)
610 {
611         assert(e.type != CONNEVENT_NONE);
612         m_event_queue.push_back(e);
613 }
614
615 void Connection::processCommand(ConnectionCommand &c)
616 {
617         switch(c.type){
618         case CONNCMD_NONE:
619                 dout_con<<getDesc()<<" processing CONNCMD_NONE"<<std::endl;
620                 return;
621         case CONNCMD_SERVE:
622                 dout_con<<getDesc()<<" processing CONNCMD_SERVE port="
623                                 <<c.port<<std::endl;
624                 serve(c.port);
625                 return;
626         case CONNCMD_CONNECT:
627                 dout_con<<getDesc()<<" processing CONNCMD_CONNECT"<<std::endl;
628                 connect(c.address);
629                 return;
630         case CONNCMD_DISCONNECT:
631                 dout_con<<getDesc()<<" processing CONNCMD_DISCONNECT"<<std::endl;
632                 disconnect();
633                 return;
634         case CONNCMD_SEND:
635                 dout_con<<getDesc()<<" processing CONNCMD_SEND"<<std::endl;
636                 send(c.peer_id, c.channelnum, c.data, c.reliable);
637                 return;
638         case CONNCMD_SEND_TO_ALL:
639                 dout_con<<getDesc()<<" processing CONNCMD_SEND_TO_ALL"<<std::endl;
640                 sendToAll(c.channelnum, c.data, c.reliable);
641                 return;
642         case CONNCMD_DELETE_PEER:
643                 dout_con<<getDesc()<<" processing CONNCMD_DELETE_PEER"<<std::endl;
644                 deletePeer(c.peer_id, false);
645                 return;
646         }
647 }
648
649 void Connection::send(float dtime)
650 {
651         for(std::map<u16, Peer*>::iterator
652                         j = m_peers.begin();
653                         j != m_peers.end(); ++j)
654         {
655                 Peer *peer = j->second;
656                 peer->m_sendtime_accu += dtime;
657                 peer->m_num_sent = 0;
658                 peer->m_max_num_sent = peer->m_sendtime_accu *
659                                 peer->m_max_packets_per_second;
660         }
661         Queue<OutgoingPacket> postponed_packets;
662         while(!m_outgoing_queue.empty()){
663                 OutgoingPacket packet = m_outgoing_queue.pop_front();
664                 Peer *peer = getPeerNoEx(packet.peer_id);
665                 if(!peer)
666                         continue;
667                 if(peer->channels[packet.channelnum].outgoing_reliables.size() >= 5){
668                         postponed_packets.push_back(packet);
669                 } else if(peer->m_num_sent < peer->m_max_num_sent){
670                         rawSendAsPacket(packet.peer_id, packet.channelnum,
671                                         packet.data, packet.reliable);
672                         peer->m_num_sent++;
673                 } else {
674                         postponed_packets.push_back(packet);
675                 }
676         }
677         while(!postponed_packets.empty()){
678                 m_outgoing_queue.push_back(postponed_packets.pop_front());
679         }
680         for(std::map<u16, Peer*>::iterator
681                         j = m_peers.begin();
682                         j != m_peers.end(); ++j)
683         {
684                 Peer *peer = j->second;
685                 peer->m_sendtime_accu -= (float)peer->m_num_sent /
686                                 peer->m_max_packets_per_second;
687                 if(peer->m_sendtime_accu > 10. / peer->m_max_packets_per_second)
688                         peer->m_sendtime_accu = 10. / peer->m_max_packets_per_second;
689         }
690 }
691
692 // Receive packets from the network and buffers and create ConnectionEvents
693 void Connection::receive()
694 {
695         u32 datasize = m_max_packet_size * 2;  // Double it just to be safe
696         // TODO: We can not know how many layers of header there are.
697         // For now, just assume there are no other than the base headers.
698         u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
699         SharedBuffer<u8> packetdata(packet_maxsize);
700
701         bool single_wait_done = false;
702         
703         for(;;)
704         {
705         try{
706                 /* Check if some buffer has relevant data */
707                 {
708                         u16 peer_id;
709                         SharedBuffer<u8> resultdata;
710                         bool got = getFromBuffers(peer_id, resultdata);
711                         if(got){
712                                 ConnectionEvent e;
713                                 e.dataReceived(peer_id, resultdata);
714                                 putEvent(e);
715                                 continue;
716                         }
717                 }
718                 
719                 if(single_wait_done){
720                         if(m_socket.WaitData(0) == false)
721                                 break;
722                 }
723                 
724                 single_wait_done = true;
725
726                 Address sender;
727                 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
728
729                 if(received_size < 0)
730                         break;
731                 if(received_size < BASE_HEADER_SIZE)
732                         continue;
733                 if(readU32(&packetdata[0]) != m_protocol_id)
734                         continue;
735                 
736                 u16 peer_id = readPeerId(*packetdata);
737                 u8 channelnum = readChannel(*packetdata);
738                 if(channelnum > CHANNEL_COUNT-1){
739                         PrintInfo(derr_con);
740                         derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
741                         throw InvalidIncomingDataException("Channel doesn't exist");
742                 }
743
744                 if(peer_id == PEER_ID_INEXISTENT)
745                 {
746                         /*
747                                 Somebody is trying to send stuff to us with no peer id.
748                                 
749                                 Check if the same address and port was added to our peer
750                                 list before.
751                                 Allow only entries that have has_sent_with_id==false.
752                         */
753
754                         std::map<u16, Peer*>::iterator j;
755                         j = m_peers.begin();
756                         for(; j != m_peers.end(); ++j)
757                         {
758                                 Peer *peer = j->second;
759                                 if(peer->has_sent_with_id)
760                                         continue;
761                                 if(peer->address == sender)
762                                         break;
763                         }
764                         
765                         /*
766                                 If no peer was found with the same address and port,
767                                 we shall assume it is a new peer and create an entry.
768                         */
769                         if(j == m_peers.end())
770                         {
771                                 // Pass on to adding the peer
772                         }
773                         // Else: A peer was found.
774                         else
775                         {
776                                 Peer *peer = j->second;
777                                 peer_id = peer->id;
778                                 PrintInfo(derr_con);
779                                 derr_con<<"WARNING: Assuming unknown peer to be "
780                                                 <<"peer_id="<<peer_id<<std::endl;
781                         }
782                 }
783                 
784                 /*
785                         The peer was not found in our lists. Add it.
786                 */
787                 if(peer_id == PEER_ID_INEXISTENT)
788                 {
789                         // Somebody wants to make a new connection
790
791                         // Get a unique peer id (2 or higher)
792                         u16 peer_id_new = 2;
793                         /*
794                                 Find an unused peer id
795                         */
796                         bool out_of_ids = false;
797                         for(;;)
798                         {
799                                 // Check if exists
800                                 if(m_peers.find(peer_id_new) == m_peers.end())
801                                         break;
802                                 // Check for overflow
803                                 if(peer_id_new == 65535){
804                                         out_of_ids = true;
805                                         break;
806                                 }
807                                 peer_id_new++;
808                         }
809                         if(out_of_ids){
810                                 errorstream<<getDesc()<<" ran out of peer ids"<<std::endl;
811                                 continue;
812                         }
813
814                         PrintInfo();
815                         dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_INEXISTENT,"
816                                         " giving peer_id="<<peer_id_new<<std::endl;
817
818                         // Create a peer
819                         Peer *peer = new Peer(peer_id_new, sender);
820                         m_peers[peer->id] = peer;
821                         
822                         // Create peer addition event
823                         ConnectionEvent e;
824                         e.peerAdded(peer_id_new, sender);
825                         putEvent(e);
826                         
827                         // Create CONTROL packet to tell the peer id to the new peer.
828                         SharedBuffer<u8> reply(4);
829                         writeU8(&reply[0], TYPE_CONTROL);
830                         writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
831                         writeU16(&reply[2], peer_id_new);
832                         sendAsPacket(peer_id_new, 0, reply, true);
833                         
834                         // We're now talking to a valid peer_id
835                         peer_id = peer_id_new;
836
837                         // Go on and process whatever it sent
838                 }
839
840                 std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
841
842                 if(node == m_peers.end())
843                 {
844                         // Peer not found
845                         // This means that the peer id of the sender is not PEER_ID_INEXISTENT
846                         // and it is invalid.
847                         PrintInfo(derr_con);
848                         derr_con<<"Receive(): Peer not found"<<std::endl;
849                         throw InvalidIncomingDataException("Peer not found (possible timeout)");
850                 }
851
852                 Peer *peer = node->second;
853
854                 // Validate peer address
855                 if(peer->address != sender)
856                 {
857                         PrintInfo(derr_con);
858                         derr_con<<"Peer "<<peer_id<<" sending from different address."
859                                         " Ignoring."<<std::endl;
860                         continue;
861                 }
862                 
863                 peer->timeout_counter = 0.0;
864
865                 Channel *channel = &(peer->channels[channelnum]);
866                 
867                 // Throw the received packet to channel->processPacket()
868
869                 // Make a new SharedBuffer from the data without the base headers
870                 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
871                 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
872                                 strippeddata.getSize());
873                 
874                 try{
875                         // Process it (the result is some data with no headers made by us)
876                         SharedBuffer<u8> resultdata = processPacket
877                                         (channel, strippeddata, peer_id, channelnum, false);
878                         
879                         PrintInfo();
880                         dout_con<<"ProcessPacket returned data of size "
881                                         <<resultdata.getSize()<<std::endl;
882                         
883                         ConnectionEvent e;
884                         e.dataReceived(peer_id, resultdata);
885                         putEvent(e);
886                         continue;
887                 }catch(ProcessedSilentlyException &e){
888                 }
889         }catch(InvalidIncomingDataException &e){
890         }
891         catch(ProcessedSilentlyException &e){
892         }
893         } // for
894 }
895
896 void Connection::runTimeouts(float dtime)
897 {
898         float congestion_control_aim_rtt
899                         = g_settings->getFloat("congestion_control_aim_rtt");
900         float congestion_control_max_rate
901                         = g_settings->getFloat("congestion_control_max_rate");
902         float congestion_control_min_rate
903                         = g_settings->getFloat("congestion_control_min_rate");
904
905         std::list<u16> timeouted_peers;
906         for(std::map<u16, Peer*>::iterator j = m_peers.begin();
907                 j != m_peers.end(); ++j)
908         {
909                 Peer *peer = j->second;
910
911                 // Update congestion control values
912                 peer->congestion_control_aim_rtt = congestion_control_aim_rtt;
913                 peer->congestion_control_max_rate = congestion_control_max_rate;
914                 peer->congestion_control_min_rate = congestion_control_min_rate;
915                 
916                 /*
917                         Check peer timeout
918                 */
919                 peer->timeout_counter += dtime;
920                 if(peer->timeout_counter > m_timeout)
921                 {
922                         PrintInfo(derr_con);
923                         derr_con<<"RunTimeouts(): Peer "<<peer->id
924                                         <<" has timed out."
925                                         <<" (source=peer->timeout_counter)"
926                                         <<std::endl;
927                         // Add peer to the list
928                         timeouted_peers.push_back(peer->id);
929                         // Don't bother going through the buffers of this one
930                         continue;
931                 }
932
933                 float resend_timeout = peer->resend_timeout;
934                 for(u16 i=0; i<CHANNEL_COUNT; i++)
935                 {
936                         std::list<BufferedPacket> timed_outs;
937                         
938                         Channel *channel = &peer->channels[i];
939
940                         // Remove timed out incomplete unreliable split packets
941                         channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
942                         
943                         // Increment reliable packet times
944                         channel->outgoing_reliables.incrementTimeouts(dtime);
945
946                         // Check reliable packet total times, remove peer if
947                         // over timeout.
948                         if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
949                         {
950                                 PrintInfo(derr_con);
951                                 derr_con<<"RunTimeouts(): Peer "<<peer->id
952                                                 <<" has timed out."
953                                                 <<" (source=reliable packet totaltime)"
954                                                 <<std::endl;
955                                 // Add peer to the to-be-removed list
956                                 timeouted_peers.push_back(peer->id);
957                                 goto nextpeer;
958                         }
959
960                         // Re-send timed out outgoing reliables
961                         
962                         timed_outs = channel->
963                                         outgoing_reliables.getTimedOuts(resend_timeout);
964
965                         channel->outgoing_reliables.resetTimedOuts(resend_timeout);
966
967                         for(std::list<BufferedPacket>::iterator j = timed_outs.begin();
968                                 j != timed_outs.end(); ++j)
969                         {
970                                 u16 peer_id = readPeerId(*(j->data));
971                                 u8 channel = readChannel(*(j->data));
972                                 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
973
974                                 PrintInfo(derr_con);
975                                 derr_con<<"RE-SENDING timed-out RELIABLE to ";
976                                 j->address.print(&derr_con);
977                                 derr_con<<"(t/o="<<resend_timeout<<"): "
978                                                 <<"from_peer_id="<<peer_id
979                                                 <<", channel="<<((int)channel&0xff)
980                                                 <<", seqnum="<<seqnum
981                                                 <<std::endl;
982
983                                 rawSend(*j);
984
985                                 // Enlarge avg_rtt and resend_timeout:
986                                 // The rtt will be at least the timeout.
987                                 // NOTE: This won't affect the timeout of the next
988                                 // checked channel because it was cached.
989                                 peer->reportRTT(resend_timeout);
990                         }
991                 }
992                 
993                 /*
994                         Send pings
995                 */
996                 peer->ping_timer += dtime;
997                 if(peer->ping_timer >= 5.0)
998                 {
999                         // Create and send PING packet
1000                         SharedBuffer<u8> data(2);
1001                         writeU8(&data[0], TYPE_CONTROL);
1002                         writeU8(&data[1], CONTROLTYPE_PING);
1003                         rawSendAsPacket(peer->id, 0, data, true);
1004
1005                         peer->ping_timer = 0.0;
1006                 }
1007                 
1008 nextpeer:
1009                 continue;
1010         }
1011
1012         // Remove timed out peers
1013         for(std::list<u16>::iterator i = timeouted_peers.begin();
1014                 i != timeouted_peers.end(); ++i)
1015         {
1016                 PrintInfo(derr_con);
1017                 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1018                 deletePeer(*i, true);
1019         }
1020 }
1021
1022 void Connection::serve(u16 port)
1023 {
1024         dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
1025         try{
1026                 m_socket.Bind(port);
1027                 m_peer_id = PEER_ID_SERVER;
1028         }
1029         catch(SocketException &e){
1030                 // Create event
1031                 ConnectionEvent ce;
1032                 ce.bindFailed();
1033                 putEvent(ce);
1034         }
1035 }
1036
1037 void Connection::connect(Address address)
1038 {
1039         dout_con<<getDesc()<<" connecting to "<<address.serializeString()
1040                         <<":"<<address.getPort()<<std::endl;
1041
1042         std::map<u16, Peer*>::iterator node = m_peers.find(PEER_ID_SERVER);
1043         if(node != m_peers.end()){
1044                 throw ConnectionException("Already connected to a server");
1045         }
1046
1047         Peer *peer = new Peer(PEER_ID_SERVER, address);
1048         m_peers[peer->id] = peer;
1049
1050         // Create event
1051         ConnectionEvent e;
1052         e.peerAdded(peer->id, peer->address);
1053         putEvent(e);
1054         
1055         m_socket.Bind(0);
1056         
1057         // Send a dummy packet to server with peer_id = PEER_ID_INEXISTENT
1058         m_peer_id = PEER_ID_INEXISTENT;
1059         SharedBuffer<u8> data(0);
1060         Send(PEER_ID_SERVER, 0, data, true);
1061 }
1062
1063 void Connection::disconnect()
1064 {
1065         dout_con<<getDesc()<<" disconnecting"<<std::endl;
1066
1067         // Create and send DISCO packet
1068         SharedBuffer<u8> data(2);
1069         writeU8(&data[0], TYPE_CONTROL);
1070         writeU8(&data[1], CONTROLTYPE_DISCO);
1071         
1072         // Send to all
1073         for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1074                 j != m_peers.end(); ++j)
1075         {
1076                 Peer *peer = j->second;
1077                 rawSendAsPacket(peer->id, 0, data, false);
1078         }
1079 }
1080
1081 void Connection::sendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1082 {
1083         for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1084                 j != m_peers.end(); ++j)
1085         {
1086                 Peer *peer = j->second;
1087                 send(peer->id, channelnum, data, reliable);
1088         }
1089 }
1090
1091 void Connection::send(u16 peer_id, u8 channelnum,
1092                 SharedBuffer<u8> data, bool reliable)
1093 {
1094         dout_con<<getDesc()<<" sending to peer_id="<<peer_id<<std::endl;
1095
1096         assert(channelnum < CHANNEL_COUNT);
1097         
1098         Peer *peer = getPeerNoEx(peer_id);
1099         if(peer == NULL)
1100                 return;
1101         Channel *channel = &(peer->channels[channelnum]);
1102
1103         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1104         if(reliable)
1105                 chunksize_max -= RELIABLE_HEADER_SIZE;
1106
1107         std::list<SharedBuffer<u8> > originals;
1108         originals = makeAutoSplitPacket(data, chunksize_max,
1109                         channel->next_outgoing_split_seqnum);
1110         
1111         for(std::list<SharedBuffer<u8> >::iterator i = originals.begin();
1112                 i != originals.end(); ++i)
1113         {
1114                 SharedBuffer<u8> original = *i;
1115                 
1116                 sendAsPacket(peer_id, channelnum, original, reliable);
1117         }
1118 }
1119
1120 void Connection::sendAsPacket(u16 peer_id, u8 channelnum,
1121                 SharedBuffer<u8> data, bool reliable)
1122 {
1123         OutgoingPacket packet(peer_id, channelnum, data, reliable);
1124         m_outgoing_queue.push_back(packet);
1125 }
1126
1127 void Connection::rawSendAsPacket(u16 peer_id, u8 channelnum,
1128                 SharedBuffer<u8> data, bool reliable)
1129 {
1130         Peer *peer = getPeerNoEx(peer_id);
1131         if(!peer)
1132                 return;
1133         Channel *channel = &(peer->channels[channelnum]);
1134
1135         if(reliable)
1136         {
1137                 u16 seqnum = channel->next_outgoing_seqnum;
1138                 channel->next_outgoing_seqnum++;
1139
1140                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1141
1142                 // Add base headers and make a packet
1143                 BufferedPacket p = makePacket(peer->address, reliable,
1144                                 m_protocol_id, m_peer_id, channelnum);
1145                 
1146                 try{
1147                         // Buffer the packet
1148                         channel->outgoing_reliables.insert(p);
1149                 }
1150                 catch(AlreadyExistsException &e)
1151                 {
1152                         PrintInfo(derr_con);
1153                         derr_con<<"WARNING: Going to send a reliable packet "
1154                                         "seqnum="<<seqnum<<" that is already "
1155                                         "in outgoing buffer"<<std::endl;
1156                         //assert(0);
1157                 }
1158                 
1159                 // Send the packet
1160                 rawSend(p);
1161         }
1162         else
1163         {
1164                 // Add base headers and make a packet
1165                 BufferedPacket p = makePacket(peer->address, data,
1166                                 m_protocol_id, m_peer_id, channelnum);
1167
1168                 // Send the packet
1169                 rawSend(p);
1170         }
1171 }
1172
1173 void Connection::rawSend(const BufferedPacket &packet)
1174 {
1175         try{
1176                 m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1177         } catch(SendFailedException &e){
1178                 derr_con<<"Connection::rawSend(): SendFailedException: "
1179                                 <<packet.address.serializeString()<<std::endl;
1180         }
1181 }
1182
1183 Peer* Connection::getPeer(u16 peer_id)
1184 {
1185         std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
1186
1187         if(node == m_peers.end()){
1188                 throw PeerNotFoundException("GetPeer: Peer not found (possible timeout)");
1189         }
1190
1191         // Error checking
1192         assert(node->second->id == peer_id);
1193
1194         return node->second;
1195 }
1196
1197 Peer* Connection::getPeerNoEx(u16 peer_id)
1198 {
1199         std::map<u16, Peer*>::iterator node = m_peers.find(peer_id);
1200
1201         if(node == m_peers.end()){
1202                 return NULL;
1203         }
1204
1205         // Error checking
1206         assert(node->second->id == peer_id);
1207
1208         return node->second;
1209 }
1210
1211 std::list<Peer*> Connection::getPeers()
1212 {
1213         std::list<Peer*> list;
1214         for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1215                 j != m_peers.end(); ++j)
1216         {
1217                 Peer *peer = j->second;
1218                 list.push_back(peer);
1219         }
1220         return list;
1221 }
1222
1223 bool Connection::getFromBuffers(u16 &peer_id, SharedBuffer<u8> &dst)
1224 {
1225         for(std::map<u16, Peer*>::iterator j = m_peers.begin();
1226                 j != m_peers.end(); ++j)
1227         {
1228                 Peer *peer = j->second;
1229                 for(u16 i=0; i<CHANNEL_COUNT; i++)
1230                 {
1231                         Channel *channel = &peer->channels[i];
1232                         SharedBuffer<u8> resultdata;
1233                         bool got = checkIncomingBuffers(channel, peer_id, resultdata);
1234                         if(got){
1235                                 dst = resultdata;
1236                                 return true;
1237                         }
1238                 }
1239         }
1240         return false;
1241 }
1242
1243 bool Connection::checkIncomingBuffers(Channel *channel, u16 &peer_id,
1244                 SharedBuffer<u8> &dst)
1245 {
1246         u16 firstseqnum = 0;
1247         // Clear old packets from start of buffer
1248         try{
1249         for(;;){
1250                 firstseqnum = channel->incoming_reliables.getFirstSeqnum();
1251                 if(seqnum_higher(channel->next_incoming_seqnum, firstseqnum))
1252                         channel->incoming_reliables.popFirst();
1253                 else
1254                         break;
1255         }
1256         // This happens if all packets are old
1257         }catch(con::NotFoundException)
1258         {}
1259         
1260         if(channel->incoming_reliables.empty() == false)
1261         {
1262                 if(firstseqnum == channel->next_incoming_seqnum)
1263                 {
1264                         BufferedPacket p = channel->incoming_reliables.popFirst();
1265                         
1266                         peer_id = readPeerId(*p.data);
1267                         u8 channelnum = readChannel(*p.data);
1268                         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
1269
1270                         PrintInfo();
1271                         dout_con<<"UNBUFFERING TYPE_RELIABLE"
1272                                         <<" seqnum="<<seqnum
1273                                         <<" peer_id="<<peer_id
1274                                         <<" channel="<<((int)channelnum&0xff)
1275                                         <<std::endl;
1276
1277                         channel->next_incoming_seqnum++;
1278                         
1279                         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
1280                         // Get out the inside packet and re-process it
1281                         SharedBuffer<u8> payload(p.data.getSize() - headers_size);
1282                         memcpy(*payload, &p.data[headers_size], payload.getSize());
1283
1284                         dst = processPacket(channel, payload, peer_id, channelnum, true);
1285                         return true;
1286                 }
1287         }
1288         return false;
1289 }
1290
1291 SharedBuffer<u8> Connection::processPacket(Channel *channel,
1292                 SharedBuffer<u8> packetdata, u16 peer_id,
1293                 u8 channelnum, bool reliable)
1294 {
1295         IndentationRaiser iraiser(&(m_indentation));
1296
1297         if(packetdata.getSize() < 1)
1298                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
1299
1300         u8 type = readU8(&packetdata[0]);
1301         
1302         if(type == TYPE_CONTROL)
1303         {
1304                 if(packetdata.getSize() < 2)
1305                         throw InvalidIncomingDataException("packetdata.getSize() < 2");
1306
1307                 u8 controltype = readU8(&packetdata[1]);
1308
1309                 if(controltype == CONTROLTYPE_ACK)
1310                 {
1311                         if(packetdata.getSize() < 4)
1312                                 throw InvalidIncomingDataException
1313                                                 ("packetdata.getSize() < 4 (ACK header size)");
1314
1315                         u16 seqnum = readU16(&packetdata[2]);
1316                         PrintInfo();
1317                         dout_con<<"Got CONTROLTYPE_ACK: channelnum="
1318                                         <<((int)channelnum&0xff)<<", peer_id="<<peer_id
1319                                         <<", seqnum="<<seqnum<<std::endl;
1320
1321                         try{
1322                                 BufferedPacket p = channel->outgoing_reliables.popSeqnum(seqnum);
1323                                 // Get round trip time
1324                                 float rtt = p.totaltime;
1325
1326                                 // Let peer calculate stuff according to it
1327                                 // (avg_rtt and resend_timeout)
1328                                 Peer *peer = getPeer(peer_id);
1329                                 peer->reportRTT(rtt);
1330
1331                                 //PrintInfo(dout_con);
1332                                 //dout_con<<"RTT = "<<rtt<<std::endl;
1333
1334                                 /*dout_con<<"OUTGOING: ";
1335                                 PrintInfo();
1336                                 channel->outgoing_reliables.print();
1337                                 dout_con<<std::endl;*/
1338                         }
1339                         catch(NotFoundException &e){
1340                                 PrintInfo(derr_con);
1341                                 derr_con<<"WARNING: ACKed packet not "
1342                                                 "in outgoing queue"
1343                                                 <<std::endl;
1344                         }
1345
1346                         throw ProcessedSilentlyException("Got an ACK");
1347                 }
1348                 else if(controltype == CONTROLTYPE_SET_PEER_ID)
1349                 {
1350                         if(packetdata.getSize() < 4)
1351                                 throw InvalidIncomingDataException
1352                                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
1353                         u16 peer_id_new = readU16(&packetdata[2]);
1354                         PrintInfo();
1355                         dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
1356
1357                         if(GetPeerID() != PEER_ID_INEXISTENT)
1358                         {
1359                                 PrintInfo(derr_con);
1360                                 derr_con<<"WARNING: Not changing"
1361                                                 " existing peer id."<<std::endl;
1362                         }
1363                         else
1364                         {
1365                                 dout_con<<"changing."<<std::endl;
1366                                 SetPeerID(peer_id_new);
1367                         }
1368                         throw ProcessedSilentlyException("Got a SET_PEER_ID");
1369                 }
1370                 else if(controltype == CONTROLTYPE_PING)
1371                 {
1372                         // Just ignore it, the incoming data already reset
1373                         // the timeout counter
1374                         PrintInfo();
1375                         dout_con<<"PING"<<std::endl;
1376                         throw ProcessedSilentlyException("Got a PING");
1377                 }
1378                 else if(controltype == CONTROLTYPE_DISCO)
1379                 {
1380                         // Just ignore it, the incoming data already reset
1381                         // the timeout counter
1382                         PrintInfo();
1383                         dout_con<<"DISCO: Removing peer "<<(peer_id)<<std::endl;
1384                         
1385                         if(deletePeer(peer_id, false) == false)
1386                         {
1387                                 PrintInfo(derr_con);
1388                                 derr_con<<"DISCO: Peer not found"<<std::endl;
1389                         }
1390
1391                         throw ProcessedSilentlyException("Got a DISCO");
1392                 }
1393                 else{
1394                         PrintInfo(derr_con);
1395                         derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
1396                                         <<((int)controltype&0xff)<<std::endl;
1397                         throw InvalidIncomingDataException("Invalid control type");
1398                 }
1399         }
1400         else if(type == TYPE_ORIGINAL)
1401         {
1402                 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
1403                         throw InvalidIncomingDataException
1404                                         ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
1405                 PrintInfo();
1406                 dout_con<<"RETURNING TYPE_ORIGINAL to user"
1407                                 <<std::endl;
1408                 // Get the inside packet out and return it
1409                 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
1410                 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
1411                 return payload;
1412         }
1413         else if(type == TYPE_SPLIT)
1414         {
1415                 // We have to create a packet again for buffering
1416                 // This isn't actually too bad an idea.
1417                 BufferedPacket packet = makePacket(
1418                                 getPeer(peer_id)->address,
1419                                 packetdata,
1420                                 GetProtocolID(),
1421                                 peer_id,
1422                                 channelnum);
1423                 // Buffer the packet
1424                 SharedBuffer<u8> data = channel->incoming_splits.insert(packet, reliable);
1425                 if(data.getSize() != 0)
1426                 {
1427                         PrintInfo();
1428                         dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
1429                                         <<"size="<<data.getSize()<<std::endl;
1430                         return data;
1431                 }
1432                 PrintInfo();
1433                 dout_con<<"BUFFERED TYPE_SPLIT"<<std::endl;
1434                 throw ProcessedSilentlyException("Buffered a split packet chunk");
1435         }
1436         else if(type == TYPE_RELIABLE)
1437         {
1438                 // Recursive reliable packets not allowed
1439                 if(reliable)
1440                         throw InvalidIncomingDataException("Found nested reliable packets");
1441
1442                 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
1443                         throw InvalidIncomingDataException
1444                                         ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
1445
1446                 u16 seqnum = readU16(&packetdata[1]);
1447
1448                 bool is_future_packet = seqnum_higher(seqnum, channel->next_incoming_seqnum);
1449                 bool is_old_packet = seqnum_higher(channel->next_incoming_seqnum, seqnum);
1450                 
1451                 PrintInfo();
1452                 if(is_future_packet)
1453                         dout_con<<"BUFFERING";
1454                 else if(is_old_packet)
1455                         dout_con<<"OLD";
1456                 else
1457                         dout_con<<"RECUR";
1458                 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
1459                                 <<" next="<<channel->next_incoming_seqnum;
1460                 dout_con<<" [sending CONTROLTYPE_ACK"
1461                                 " to peer_id="<<peer_id<<"]";
1462                 dout_con<<std::endl;
1463                 
1464                 //DEBUG
1465                 //assert(channel->incoming_reliables.size() < 100);
1466
1467                 // Send a CONTROLTYPE_ACK
1468                 SharedBuffer<u8> reply(4);
1469                 writeU8(&reply[0], TYPE_CONTROL);
1470                 writeU8(&reply[1], CONTROLTYPE_ACK);
1471                 writeU16(&reply[2], seqnum);
1472                 rawSendAsPacket(peer_id, channelnum, reply, false);
1473
1474                 //if(seqnum_higher(seqnum, channel->next_incoming_seqnum))
1475                 if(is_future_packet)
1476                 {
1477                         /*PrintInfo();
1478                         dout_con<<"Buffering reliable packet (seqnum="
1479                                         <<seqnum<<")"<<std::endl;*/
1480                         
1481                         // This one comes later, buffer it.
1482                         // Actually we have to make a packet to buffer one.
1483                         // Well, we have all the ingredients, so just do it.
1484                         BufferedPacket packet = makePacket(
1485                                         getPeer(peer_id)->address,
1486                                         packetdata,
1487                                         GetProtocolID(),
1488                                         peer_id,
1489                                         channelnum);
1490                         try{
1491                                 channel->incoming_reliables.insert(packet);
1492                                 
1493                                 /*PrintInfo();
1494                                 dout_con<<"INCOMING: ";
1495                                 channel->incoming_reliables.print();
1496                                 dout_con<<std::endl;*/
1497                         }
1498                         catch(AlreadyExistsException &e)
1499                         {
1500                         }
1501
1502                         throw ProcessedSilentlyException("Buffered future reliable packet");
1503                 }
1504                 //else if(seqnum_higher(channel->next_incoming_seqnum, seqnum))
1505                 else if(is_old_packet)
1506                 {
1507                         // An old packet, dump it
1508                         throw InvalidIncomingDataException("Got an old reliable packet");
1509                 }
1510
1511                 channel->next_incoming_seqnum++;
1512
1513                 // Get out the inside packet and re-process it
1514                 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
1515                 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
1516
1517                 return processPacket(channel, payload, peer_id, channelnum, true);
1518         }
1519         else
1520         {
1521                 PrintInfo(derr_con);
1522                 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
1523                 throw InvalidIncomingDataException("Invalid packet type");
1524         }
1525         
1526         // We should never get here.
1527         // If you get here, add an exception or a return to some of the
1528         // above conditionals.
1529         assert(0);
1530         throw BaseException("Error in Channel::ProcessPacket()");
1531 }
1532
1533 bool Connection::deletePeer(u16 peer_id, bool timeout)
1534 {
1535         if(m_peers.find(peer_id) == m_peers.end())
1536                 return false;
1537         
1538         Peer *peer = m_peers[peer_id];
1539
1540         // Create event
1541         ConnectionEvent e;
1542         e.peerRemoved(peer_id, timeout, peer->address);
1543         putEvent(e);
1544
1545         delete m_peers[peer_id];
1546         m_peers.erase(peer_id);
1547         return true;
1548 }
1549
1550 /* Interface */
1551
1552 ConnectionEvent Connection::getEvent()
1553 {
1554         if(m_event_queue.empty()){
1555                 ConnectionEvent e;
1556                 e.type = CONNEVENT_NONE;
1557                 return e;
1558         }
1559         return m_event_queue.pop_front();
1560 }
1561
1562 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1563 {
1564         try{
1565                 return m_event_queue.pop_front(timeout_ms);
1566         } catch(ItemNotFoundException &ex){
1567                 ConnectionEvent e;
1568                 e.type = CONNEVENT_NONE;
1569                 return e;
1570         }
1571 }
1572
1573 void Connection::putCommand(ConnectionCommand &c)
1574 {
1575         m_command_queue.push_back(c);
1576 }
1577
1578 void Connection::Serve(unsigned short port)
1579 {
1580         ConnectionCommand c;
1581         c.serve(port);
1582         putCommand(c);
1583 }
1584
1585 void Connection::Connect(Address address)
1586 {
1587         ConnectionCommand c;
1588         c.connect(address);
1589         putCommand(c);
1590 }
1591
1592 bool Connection::Connected()
1593 {
1594         JMutexAutoLock peerlock(m_peers_mutex);
1595
1596         if(m_peers.size() != 1)
1597                 return false;
1598                 
1599         std::map<u16, Peer*>::iterator node = m_peers.find(PEER_ID_SERVER);
1600         if(node == m_peers.end())
1601                 return false;
1602         
1603         if(m_peer_id == PEER_ID_INEXISTENT)
1604                 return false;
1605         
1606         return true;
1607 }
1608
1609 void Connection::Disconnect()
1610 {
1611         ConnectionCommand c;
1612         c.disconnect();
1613         putCommand(c);
1614 }
1615
1616 u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
1617 {
1618         for(;;){
1619                 ConnectionEvent e = waitEvent(m_bc_receive_timeout);
1620                 if(e.type != CONNEVENT_NONE)
1621                         dout_con<<getDesc()<<": Receive: got event: "
1622                                         <<e.describe()<<std::endl;
1623                 switch(e.type){
1624                 case CONNEVENT_NONE:
1625                         throw NoIncomingDataException("No incoming data");
1626                 case CONNEVENT_DATA_RECEIVED:
1627                         peer_id = e.peer_id;
1628                         data = SharedBuffer<u8>(e.data);
1629                         return e.data.getSize();
1630                 case CONNEVENT_PEER_ADDED: {
1631                         Peer tmp(e.peer_id, e.address);
1632                         if(m_bc_peerhandler)
1633                                 m_bc_peerhandler->peerAdded(&tmp);
1634                         continue; }
1635                 case CONNEVENT_PEER_REMOVED: {
1636                         Peer tmp(e.peer_id, e.address);
1637                         if(m_bc_peerhandler)
1638                                 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1639                         continue; }
1640                 case CONNEVENT_BIND_FAILED:
1641                         throw ConnectionBindFailed("Failed to bind socket "
1642                                         "(port already in use?)");
1643                 }
1644         }
1645         throw NoIncomingDataException("No incoming data");
1646 }
1647
1648 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1649 {
1650         assert(channelnum < CHANNEL_COUNT);
1651
1652         ConnectionCommand c;
1653         c.sendToAll(channelnum, data, reliable);
1654         putCommand(c);
1655 }
1656
1657 void Connection::Send(u16 peer_id, u8 channelnum,
1658                 SharedBuffer<u8> data, bool reliable)
1659 {
1660         assert(channelnum < CHANNEL_COUNT);
1661
1662         ConnectionCommand c;
1663         c.send(peer_id, channelnum, data, reliable);
1664         putCommand(c);
1665 }
1666
1667 void Connection::RunTimeouts(float dtime)
1668 {
1669         // No-op
1670 }
1671
1672 Address Connection::GetPeerAddress(u16 peer_id)
1673 {
1674         JMutexAutoLock peerlock(m_peers_mutex);
1675         return getPeer(peer_id)->address;
1676 }
1677
1678 float Connection::GetPeerAvgRTT(u16 peer_id)
1679 {
1680         JMutexAutoLock peerlock(m_peers_mutex);
1681         return getPeer(peer_id)->avg_rtt;
1682 }
1683
1684 void Connection::DeletePeer(u16 peer_id)
1685 {
1686         ConnectionCommand c;
1687         c.deletePeer(peer_id);
1688         putCommand(c);
1689 }
1690
1691 void Connection::PrintInfo(std::ostream &out)
1692 {
1693         out<<getDesc()<<": ";
1694 }
1695
1696 void Connection::PrintInfo()
1697 {
1698         PrintInfo(dout_con);
1699 }
1700
1701 std::string Connection::getDesc()
1702 {
1703         return std::string("con(")+itos(m_socket.GetHandle())+"/"+itos(m_peer_id)+")";
1704 }
1705
1706 } // namespace
1707