]> git.lizzy.rs Git - minetest.git/blob - src/connection.cpp
42bfdfb9f24e77621e5f50e5355f4eecb64e65b1
[minetest.git] / src / connection.cpp
1 #include "connection.h"
2 #include "main.h"
3 #include "serialization.h"
4
5 namespace con
6 {
7
8 BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
9                 u32 protocol_id, u16 sender_peer_id, u8 channel)
10 {
11         u32 packet_size = datasize + BASE_HEADER_SIZE;
12         BufferedPacket p(packet_size);
13         p.address = address;
14
15         writeU32(&p.data[0], protocol_id);
16         writeU16(&p.data[4], sender_peer_id);
17         writeU8(&p.data[6], channel);
18
19         memcpy(&p.data[BASE_HEADER_SIZE], data, datasize);
20
21         return p;
22 }
23
24 BufferedPacket makePacket(Address &address, SharedBuffer<u8> &data,
25                 u32 protocol_id, u16 sender_peer_id, u8 channel)
26 {
27         return makePacket(address, *data, data.getSize(),
28                         protocol_id, sender_peer_id, channel);
29 }
30
31 SharedBuffer<u8> makeOriginalPacket(
32                 SharedBuffer<u8> data)
33 {
34         u32 header_size = 1;
35         u32 packet_size = data.getSize() + header_size;
36         SharedBuffer<u8> b(packet_size);
37
38         writeU8(&b[0], TYPE_ORIGINAL);
39
40         memcpy(&b[header_size], *data, data.getSize());
41
42         return b;
43 }
44
45 core::list<SharedBuffer<u8> > makeSplitPacket(
46                 SharedBuffer<u8> data,
47                 u32 chunksize_max,
48                 u16 seqnum)
49 {
50         // Chunk packets, containing the TYPE_SPLIT header
51         core::list<SharedBuffer<u8> > chunks;
52         
53         u32 chunk_header_size = 7;
54         u32 maximum_data_size = chunksize_max - chunk_header_size;
55         u32 start = 0;
56         u32 end = 0;
57         u32 chunk_num = 0;
58         do{
59                 end = start + maximum_data_size - 1;
60                 if(end > data.getSize() - 1)
61                         end = data.getSize() - 1;
62                 
63                 u32 payload_size = end - start + 1;
64                 u32 packet_size = chunk_header_size + payload_size;
65
66                 SharedBuffer<u8> chunk(packet_size);
67                 
68                 writeU8(&chunk[0], TYPE_SPLIT);
69                 writeU16(&chunk[1], seqnum);
70                 // [3] u16 chunk_count is written at next stage
71                 writeU16(&chunk[5], chunk_num);
72                 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
73
74                 chunks.push_back(chunk);
75                 
76                 start = end + 1;
77                 chunk_num++;
78         }
79         while(end != data.getSize() - 1);
80
81         u16 chunk_count = chunks.getSize();
82
83         core::list<SharedBuffer<u8> >::Iterator i = chunks.begin();
84         for(; i != chunks.end(); i++)
85         {
86                 // Write chunk_count
87                 writeU16(&((*i)[3]), chunk_count);
88         }
89
90         return chunks;
91 }
92
93 core::list<SharedBuffer<u8> > makeAutoSplitPacket(
94                 SharedBuffer<u8> data,
95                 u32 chunksize_max,
96                 u16 &split_seqnum)
97 {
98         u32 original_header_size = 1;
99         core::list<SharedBuffer<u8> > list;
100         if(data.getSize() + original_header_size > chunksize_max)
101         {
102                 list = makeSplitPacket(data, chunksize_max, split_seqnum);
103                 split_seqnum++;
104                 return list;
105         }
106         else
107         {
108                 list.push_back(makeOriginalPacket(data));
109         }
110         return list;
111 }
112
113 SharedBuffer<u8> makeReliablePacket(
114                 SharedBuffer<u8> data,
115                 u16 seqnum)
116 {
117         /*dstream<<"BEGIN SharedBuffer<u8> makeReliablePacket()"<<std::endl;
118         dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
119                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
120         u32 header_size = 3;
121         u32 packet_size = data.getSize() + header_size;
122         SharedBuffer<u8> b(packet_size);
123
124         writeU8(&b[0], TYPE_RELIABLE);
125         writeU16(&b[1], seqnum);
126
127         memcpy(&b[header_size], *data, data.getSize());
128
129         /*dstream<<"data.getSize()="<<data.getSize()<<", data[0]="
130                         <<((unsigned int)data[0]&0xff)<<std::endl;*/
131         //dstream<<"END SharedBuffer<u8> makeReliablePacket()"<<std::endl;
132         return b;
133 }
134
135 /*
136         ReliablePacketBuffer
137 */
138
139 void ReliablePacketBuffer::print()
140 {
141         core::list<BufferedPacket>::Iterator i;
142         i = m_list.begin();
143         for(; i != m_list.end(); i++)
144         {
145                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
146                 dout_con<<s<<" ";
147         }
148 }
149 bool ReliablePacketBuffer::empty()
150 {
151         return m_list.empty();
152 }
153 u32 ReliablePacketBuffer::size()
154 {
155         return m_list.getSize();
156 }
157 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
158 {
159         core::list<BufferedPacket>::Iterator i;
160         i = m_list.begin();
161         for(; i != m_list.end(); i++)
162         {
163                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
164                 /*dout_con<<"findPacket(): finding seqnum="<<seqnum
165                                 <<", comparing to s="<<s<<std::endl;*/
166                 if(s == seqnum)
167                         break;
168         }
169         return i;
170 }
171 RPBSearchResult ReliablePacketBuffer::notFound()
172 {
173         return m_list.end();
174 }
175 u16 ReliablePacketBuffer::getFirstSeqnum()
176 {
177         if(empty())
178                 throw NotFoundException("Buffer is empty");
179         BufferedPacket p = *m_list.begin();
180         return readU16(&p.data[BASE_HEADER_SIZE+1]);
181 }
182 BufferedPacket ReliablePacketBuffer::popFirst()
183 {
184         if(empty())
185                 throw NotFoundException("Buffer is empty");
186         BufferedPacket p = *m_list.begin();
187         core::list<BufferedPacket>::Iterator i = m_list.begin();
188         m_list.erase(i);
189         return p;
190 }
191 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
192 {
193         RPBSearchResult r = findPacket(seqnum);
194         if(r == notFound()){
195                 dout_con<<"Not found"<<std::endl;
196                 throw NotFoundException("seqnum not found in buffer");
197         }
198         BufferedPacket p = *r;
199         m_list.erase(r);
200         return p;
201 }
202 void ReliablePacketBuffer::insert(BufferedPacket &p)
203 {
204         assert(p.data.getSize() >= BASE_HEADER_SIZE+3);
205         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
206         assert(type == TYPE_RELIABLE);
207         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
208
209         // Find the right place for the packet and insert it there
210
211         // If list is empty, just add it
212         if(m_list.empty())
213         {
214                 m_list.push_back(p);
215                 // Done.
216                 return;
217         }
218         // Otherwise find the right place
219         core::list<BufferedPacket>::Iterator i;
220         i = m_list.begin();
221         // Find the first packet in the list which has a higher seqnum
222         for(; i != m_list.end(); i++){
223                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
224                 if(s == seqnum){
225                         throw AlreadyExistsException("Same seqnum in list");
226                 }
227                 if(seqnum_higher(s, seqnum)){
228                         break;
229                 }
230         }
231         // If we're at the end of the list, add the packet to the
232         // end of the list
233         if(i == m_list.end())
234         {
235                 m_list.push_back(p);
236                 // Done.
237                 return;
238         }
239         // Insert before i
240         m_list.insert_before(i, p);
241 }
242
243 void ReliablePacketBuffer::incrementTimeouts(float dtime)
244 {
245         core::list<BufferedPacket>::Iterator i;
246         i = m_list.begin();
247         for(; i != m_list.end(); i++){
248                 i->time += dtime;
249                 i->totaltime += dtime;
250         }
251 }
252
253 void ReliablePacketBuffer::resetTimedOuts(float timeout)
254 {
255         core::list<BufferedPacket>::Iterator i;
256         i = m_list.begin();
257         for(; i != m_list.end(); i++){
258                 if(i->time >= timeout)
259                         i->time = 0.0;
260         }
261 }
262
263 bool ReliablePacketBuffer::anyTotaltimeReached(float timeout)
264 {
265         core::list<BufferedPacket>::Iterator i;
266         i = m_list.begin();
267         for(; i != m_list.end(); i++){
268                 if(i->totaltime >= timeout)
269                         return true;
270         }
271         return false;
272 }
273
274 core::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout)
275 {
276         core::list<BufferedPacket> timed_outs;
277         core::list<BufferedPacket>::Iterator i;
278         i = m_list.begin();
279         for(; i != m_list.end(); i++)
280         {
281                 if(i->time >= timeout)
282                         timed_outs.push_back(*i);
283         }
284         return timed_outs;
285 }
286
287 /*
288         IncomingSplitBuffer
289 */
290
291 IncomingSplitBuffer::~IncomingSplitBuffer()
292 {
293         core::map<u16, IncomingSplitPacket*>::Iterator i;
294         i = m_buf.getIterator();
295         for(; i.atEnd() == false; i++)
296         {
297                 delete i.getNode()->getValue();
298         }
299 }
300 /*
301         This will throw a GotSplitPacketException when a full
302         split packet is constructed.
303 */
304 void IncomingSplitBuffer::insert(BufferedPacket &p, bool reliable)
305 {
306         u32 headersize = BASE_HEADER_SIZE + 7;
307         assert(p.data.getSize() >= headersize);
308         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
309         assert(type == TYPE_SPLIT);
310         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
311         u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
312         u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
313
314         // Add if doesn't exist
315         if(m_buf.find(seqnum) == NULL)
316         {
317                 IncomingSplitPacket *sp = new IncomingSplitPacket();
318                 sp->chunk_count = chunk_count;
319                 sp->reliable = reliable;
320                 m_buf[seqnum] = sp;
321         }
322         
323         IncomingSplitPacket *sp = m_buf[seqnum];
324         
325         // TODO: These errors should be thrown or something? Dunno.
326         if(chunk_count != sp->chunk_count)
327                 derr_con<<"Connection: WARNING: chunk_count="<<chunk_count
328                                 <<" != sp->chunk_count="<<sp->chunk_count
329                                 <<std::endl;
330         if(reliable != sp->reliable)
331                 derr_con<<"Connection: WARNING: reliable="<<reliable
332                                 <<" != sp->reliable="<<sp->reliable
333                                 <<std::endl;
334
335         // If chunk already exists, cancel
336         if(sp->chunks.find(chunk_num) != NULL)
337                 throw AlreadyExistsException("Chunk already in buffer");
338         
339         // Cut chunk data out of packet
340         u32 chunkdatasize = p.data.getSize() - headersize;
341         SharedBuffer<u8> chunkdata(chunkdatasize);
342         memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
343         
344         // Set chunk data in buffer
345         sp->chunks[chunk_num] = chunkdata;
346         
347         // If not all chunks are received, return
348         if(sp->allReceived() == false)
349                 return;
350
351         // Calculate total size
352         u32 totalsize = 0;
353         core::map<u16, SharedBuffer<u8> >::Iterator i;
354         i = sp->chunks.getIterator();
355         for(; i.atEnd() == false; i++)
356         {
357                 totalsize += i.getNode()->getValue().getSize();
358         }
359         
360         SharedBuffer<u8> fulldata(totalsize);
361
362         // Copy chunks to data buffer
363         u32 start = 0;
364         for(u32 chunk_i=0; chunk_i<sp->chunk_count;
365                         chunk_i++)
366         {
367                 SharedBuffer<u8> buf = sp->chunks[chunk_i];
368                 u16 chunkdatasize = buf.getSize();
369                 memcpy(&fulldata[start], *buf, chunkdatasize);
370                 start += chunkdatasize;;
371         }
372
373         // Remove sp from buffer
374         m_buf.remove(seqnum);
375         delete sp;
376         
377         throw GotSplitPacketException(fulldata);
378 }
379 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
380 {
381         core::list<u16> remove_queue;
382         core::map<u16, IncomingSplitPacket*>::Iterator i;
383         i = m_buf.getIterator();
384         for(; i.atEnd() == false; i++)
385         {
386                 IncomingSplitPacket *p = i.getNode()->getValue();
387                 // Reliable ones are not removed by timeout
388                 if(p->reliable == true)
389                         continue;
390                 p->time += dtime;
391                 if(p->time >= timeout)
392                         remove_queue.push_back(i.getNode()->getKey());
393         }
394         core::list<u16>::Iterator j;
395         j = remove_queue.begin();
396         for(; j != remove_queue.end(); j++)
397         {
398                 dout_con<<"NOTE: Removing timed out unreliable split packet"
399                                 <<std::endl;
400                 delete m_buf[*j];
401                 m_buf.remove(*j);
402         }
403 }
404
405 /*
406         Channel
407 */
408
409 Channel::Channel()
410 {
411         next_outgoing_seqnum = SEQNUM_INITIAL;
412         next_incoming_seqnum = SEQNUM_INITIAL;
413         next_outgoing_split_seqnum = SEQNUM_INITIAL;
414 }
415 Channel::~Channel()
416 {
417 }
418
419 /*
420         Peer
421 */
422
423 Peer::Peer(u16 a_id, Address a_address)
424 {
425         id = a_id;
426         address = a_address;
427         timeout_counter = 0.0;
428         //resend_timeout = RESEND_TIMEOUT_MINIMUM;
429         resend_timeout = 0.5;
430         avg_rtt = -1.0;
431         has_sent_with_id = false;
432 }
433 Peer::~Peer()
434 {
435 }
436
437 void Peer::reportRTT(float rtt)
438 {
439         if(rtt < -0.999)
440         {}
441         else if(avg_rtt < 0.0)
442                 avg_rtt = rtt;
443         else
444                 avg_rtt = rtt * 0.1 + avg_rtt * 0.9;
445         
446         // Calculate resend_timeout
447
448         /*int reliable_count = 0;
449         for(int i=0; i<CHANNEL_COUNT; i++)
450         {
451                 reliable_count += channels[i].outgoing_reliables.size();
452         }
453         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR
454                         * ((float)reliable_count * 1);*/
455         
456         float timeout = avg_rtt * RESEND_TIMEOUT_FACTOR;
457         if(timeout < RESEND_TIMEOUT_MIN)
458                 timeout = RESEND_TIMEOUT_MIN;
459         if(timeout > RESEND_TIMEOUT_MAX)
460                 timeout = RESEND_TIMEOUT_MAX;
461         resend_timeout = timeout;
462 }
463                                 
464 /*
465         Connection
466 */
467
468 Connection::Connection(
469         u32 protocol_id,
470         u32 max_packet_size,
471         float timeout,
472         PeerHandler *peerhandler
473 )
474 {
475         assert(peerhandler != NULL);
476
477         m_protocol_id = protocol_id;
478         m_max_packet_size = max_packet_size;
479         m_timeout = timeout;
480         m_peer_id = PEER_ID_NEW;
481         //m_waiting_new_peer_id = false;
482         m_indentation = 0;
483         m_peerhandler = peerhandler;
484 }
485
486 Connection::~Connection()
487 {
488         // Clear peers
489         core::map<u16, Peer*>::Iterator j;
490         j = m_peers.getIterator();
491         for(; j.atEnd() == false; j++)
492         {
493                 Peer *peer = j.getNode()->getValue();
494                 delete peer;
495         }
496 }
497
498 void Connection::Serve(unsigned short port)
499 {
500         m_socket.Bind(port);
501         m_peer_id = PEER_ID_SERVER;
502 }
503
504 void Connection::Connect(Address address)
505 {
506         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
507         if(node != NULL){
508                 throw ConnectionException("Already connected to a server");
509         }
510
511         Peer *peer = new Peer(PEER_ID_SERVER, address);
512         m_peers.insert(peer->id, peer);
513         m_peerhandler->peerAdded(peer);
514         
515         m_socket.Bind(0);
516         
517         // Send a dummy packet to server with peer_id = PEER_ID_NEW
518         m_peer_id = PEER_ID_NEW;
519         SharedBuffer<u8> data(0);
520         Send(PEER_ID_SERVER, 0, data, true);
521
522         //m_waiting_new_peer_id = true;
523 }
524
525 bool Connection::Connected()
526 {
527         if(m_peers.size() != 1)
528                 return false;
529                 
530         core::map<u16, Peer*>::Node *node = m_peers.find(PEER_ID_SERVER);
531         if(node == NULL)
532                 return false;
533         
534         if(m_peer_id == PEER_ID_NEW)
535                 return false;
536         
537         return true;
538 }
539
540 SharedBuffer<u8> Channel::ProcessPacket(
541                 SharedBuffer<u8> packetdata,
542                 Connection *con,
543                 u16 peer_id,
544                 u8 channelnum,
545                 bool reliable)
546 {
547         IndentationRaiser iraiser(&(con->m_indentation));
548
549         if(packetdata.getSize() < 1)
550                 throw InvalidIncomingDataException("packetdata.getSize() < 1");
551
552         u8 type = readU8(&packetdata[0]);
553         
554         if(type == TYPE_CONTROL)
555         {
556                 if(packetdata.getSize() < 2)
557                         throw InvalidIncomingDataException("packetdata.getSize() < 2");
558
559                 u8 controltype = readU8(&packetdata[1]);
560
561                 if(controltype == CONTROLTYPE_ACK)
562                 {
563                         if(packetdata.getSize() < 4)
564                                 throw InvalidIncomingDataException
565                                                 ("packetdata.getSize() < 4 (ACK header size)");
566
567                         u16 seqnum = readU16(&packetdata[2]);
568                         con->PrintInfo();
569                         dout_con<<"Got CONTROLTYPE_ACK: channelnum="
570                                         <<((int)channelnum&0xff)<<", peer_id="<<peer_id
571                                         <<", seqnum="<<seqnum<<std::endl;
572
573                         try{
574                                 BufferedPacket p = outgoing_reliables.popSeqnum(seqnum);
575                                 // Get round trip time
576                                 float rtt = p.totaltime;
577
578                                 // Let peer calculate stuff according to it
579                                 // (avg_rtt and resend_timeout)
580                                 Peer *peer = con->GetPeer(peer_id);
581                                 peer->reportRTT(rtt);
582
583                                 //con->PrintInfo(dout_con);
584                                 //dout_con<<"RTT = "<<rtt<<std::endl;
585
586                                 /*dout_con<<"OUTGOING: ";
587                                 con->PrintInfo();
588                                 outgoing_reliables.print();
589                                 dout_con<<std::endl;*/
590                         }
591                         catch(NotFoundException &e){
592                                 con->PrintInfo(derr_con);
593                                 derr_con<<"WARNING: ACKed packet not "
594                                                 "in outgoing queue"
595                                                 <<std::endl;
596                         }
597
598                         throw ProcessedSilentlyException("Got an ACK");
599                 }
600                 else if(controltype == CONTROLTYPE_SET_PEER_ID)
601                 {
602                         if(packetdata.getSize() < 4)
603                                 throw InvalidIncomingDataException
604                                                 ("packetdata.getSize() < 4 (SET_PEER_ID header size)");
605                         u16 peer_id_new = readU16(&packetdata[2]);
606                         con->PrintInfo();
607                         dout_con<<"Got new peer id: "<<peer_id_new<<"... "<<std::endl;
608
609                         if(con->GetPeerID() != PEER_ID_NEW)
610                         {
611                                 con->PrintInfo(derr_con);
612                                 derr_con<<"WARNING: Not changing"
613                                                 " existing peer id."<<std::endl;
614                         }
615                         else
616                         {
617                                 dout_con<<"changing."<<std::endl;
618                                 con->SetPeerID(peer_id_new);
619                         }
620                         throw ProcessedSilentlyException("Got a SET_PEER_ID");
621                 }
622                 else if(controltype == CONTROLTYPE_PING)
623                 {
624                         // Just ignore it, the incoming data already reset
625                         // the timeout counter
626                         con->PrintInfo();
627                         dout_con<<"PING"<<std::endl;
628                         throw ProcessedSilentlyException("Got a SET_PEER_ID");
629                 }
630                 else{
631                         con->PrintInfo(derr_con);
632                         derr_con<<"INVALID TYPE_CONTROL: invalid controltype="
633                                         <<((int)controltype&0xff)<<std::endl;
634                         throw InvalidIncomingDataException("Invalid control type");
635                 }
636         }
637         else if(type == TYPE_ORIGINAL)
638         {
639                 if(packetdata.getSize() < ORIGINAL_HEADER_SIZE)
640                         throw InvalidIncomingDataException
641                                         ("packetdata.getSize() < ORIGINAL_HEADER_SIZE");
642                 con->PrintInfo();
643                 dout_con<<"RETURNING TYPE_ORIGINAL to user"
644                                 <<std::endl;
645                 // Get the inside packet out and return it
646                 SharedBuffer<u8> payload(packetdata.getSize() - ORIGINAL_HEADER_SIZE);
647                 memcpy(*payload, &packetdata[ORIGINAL_HEADER_SIZE], payload.getSize());
648                 return payload;
649         }
650         else if(type == TYPE_SPLIT)
651         {
652                 // We have to create a packet again for buffering
653                 // This isn't actually too bad an idea.
654                 BufferedPacket packet = makePacket(
655                                 con->GetPeer(peer_id)->address,
656                                 packetdata,
657                                 con->GetProtocolID(),
658                                 peer_id,
659                                 channelnum);
660                 try{
661                         // Buffer the packet
662                         incoming_splits.insert(packet, reliable);
663                 }
664                 // This exception happens when all the pieces of a packet
665                 // are collected.
666                 catch(GotSplitPacketException &e)
667                 {
668                         con->PrintInfo();
669                         dout_con<<"RETURNING TYPE_SPLIT: Constructed full data, "
670                                         <<"size="<<e.getData().getSize()<<std::endl;
671                         return e.getData();
672                 }
673                 con->PrintInfo();
674                 dout_con<<"BUFFERING TYPE_SPLIT"<<std::endl;
675                 throw ProcessedSilentlyException("Buffered a split packet chunk");
676         }
677         else if(type == TYPE_RELIABLE)
678         {
679                 // Recursive reliable packets not allowed
680                 assert(reliable == false);
681
682                 if(packetdata.getSize() < RELIABLE_HEADER_SIZE)
683                         throw InvalidIncomingDataException
684                                         ("packetdata.getSize() < RELIABLE_HEADER_SIZE");
685
686                 u16 seqnum = readU16(&packetdata[1]);
687
688                 bool is_future_packet = seqnum_higher(seqnum, next_incoming_seqnum);
689                 bool is_old_packet = seqnum_higher(next_incoming_seqnum, seqnum);
690                 
691                 con->PrintInfo();
692                 if(is_future_packet)
693                         dout_con<<"BUFFERING";
694                 else if(is_old_packet)
695                         dout_con<<"OLD";
696                 else
697                         dout_con<<"RECUR";
698                 dout_con<<" TYPE_RELIABLE seqnum="<<seqnum
699                                 <<" next="<<next_incoming_seqnum;
700                 dout_con<<" [sending CONTROLTYPE_ACK"
701                                 " to peer_id="<<peer_id<<"]";
702                 dout_con<<std::endl;
703                 
704                 //DEBUG
705                 //assert(incoming_reliables.size() < 100);
706
707                 // Send a CONTROLTYPE_ACK
708                 SharedBuffer<u8> reply(4);
709                 writeU8(&reply[0], TYPE_CONTROL);
710                 writeU8(&reply[1], CONTROLTYPE_ACK);
711                 writeU16(&reply[2], seqnum);
712                 con->SendAsPacket(peer_id, channelnum, reply, false);
713
714                 //if(seqnum_higher(seqnum, next_incoming_seqnum))
715                 if(is_future_packet)
716                 {
717                         /*con->PrintInfo();
718                         dout_con<<"Buffering reliable packet (seqnum="
719                                         <<seqnum<<")"<<std::endl;*/
720                         
721                         // This one comes later, buffer it.
722                         // Actually we have to make a packet to buffer one.
723                         // Well, we have all the ingredients, so just do it.
724                         BufferedPacket packet = makePacket(
725                                         con->GetPeer(peer_id)->address,
726                                         packetdata,
727                                         con->GetProtocolID(),
728                                         peer_id,
729                                         channelnum);
730                         try{
731                                 incoming_reliables.insert(packet);
732                                 
733                                 /*con->PrintInfo();
734                                 dout_con<<"INCOMING: ";
735                                 incoming_reliables.print();
736                                 dout_con<<std::endl;*/
737                         }
738                         catch(AlreadyExistsException &e)
739                         {
740                         }
741
742                         throw ProcessedSilentlyException("Buffered future reliable packet");
743                 }
744                 //else if(seqnum_higher(next_incoming_seqnum, seqnum))
745                 else if(is_old_packet)
746                 {
747                         // An old packet, dump it
748                         throw InvalidIncomingDataException("Got an old reliable packet");
749                 }
750
751                 next_incoming_seqnum++;
752
753                 // Get out the inside packet and re-process it
754                 SharedBuffer<u8> payload(packetdata.getSize() - RELIABLE_HEADER_SIZE);
755                 memcpy(*payload, &packetdata[RELIABLE_HEADER_SIZE], payload.getSize());
756
757                 return ProcessPacket(payload, con, peer_id, channelnum, true);
758         }
759         else
760         {
761                 con->PrintInfo(derr_con);
762                 derr_con<<"Got invalid type="<<((int)type&0xff)<<std::endl;
763                 throw InvalidIncomingDataException("Invalid packet type");
764         }
765         
766         // We should never get here.
767         // If you get here, add an exception or a return to some of the
768         // above conditionals.
769         assert(0);
770         throw BaseException("Error in Channel::ProcessPacket()");
771 }
772
773 SharedBuffer<u8> Channel::CheckIncomingBuffers(Connection *con,
774                 u16 &peer_id)
775 {
776         u16 firstseqnum = 0;
777         // Clear old packets from start of buffer
778         try{
779         for(;;){
780                 firstseqnum = incoming_reliables.getFirstSeqnum();
781                 if(seqnum_higher(next_incoming_seqnum, firstseqnum))
782                         incoming_reliables.popFirst();
783                 else
784                         break;
785         }
786         // This happens if all packets are old
787         }catch(con::NotFoundException)
788         {}
789         
790         if(incoming_reliables.empty() == false)
791         {
792                 if(firstseqnum == next_incoming_seqnum)
793                 {
794                         BufferedPacket p = incoming_reliables.popFirst();
795                         
796                         peer_id = readPeerId(*p.data);
797                         u8 channelnum = readChannel(*p.data);
798                         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
799
800                         con->PrintInfo();
801                         dout_con<<"UNBUFFERING TYPE_RELIABLE"
802                                         <<" seqnum="<<seqnum
803                                         <<" peer_id="<<peer_id
804                                         <<" channel="<<((int)channelnum&0xff)
805                                         <<std::endl;
806
807                         next_incoming_seqnum++;
808                         
809                         u32 headers_size = BASE_HEADER_SIZE + RELIABLE_HEADER_SIZE;
810                         // Get out the inside packet and re-process it
811                         SharedBuffer<u8> payload(p.data.getSize() - headers_size);
812                         memcpy(*payload, &p.data[headers_size], payload.getSize());
813
814                         return ProcessPacket(payload, con, peer_id, channelnum, true);
815                 }
816         }
817                 
818         throw NoIncomingDataException("No relevant data in buffers");
819 }
820
821 SharedBuffer<u8> Connection::GetFromBuffers(u16 &peer_id)
822 {
823         core::map<u16, Peer*>::Iterator j;
824         j = m_peers.getIterator();
825         for(; j.atEnd() == false; j++)
826         {
827                 Peer *peer = j.getNode()->getValue();
828                 for(u16 i=0; i<CHANNEL_COUNT; i++)
829                 {
830                         Channel *channel = &peer->channels[i];
831                         try{
832                                 SharedBuffer<u8> resultdata = channel->CheckIncomingBuffers
833                                                 (this, peer_id);
834
835                                 return resultdata;
836                         }
837                         catch(NoIncomingDataException &e)
838                         {
839                         }
840                         catch(InvalidIncomingDataException &e)
841                         {
842                         }
843                         catch(ProcessedSilentlyException &e)
844                         {
845                         }
846                 }
847         }
848         throw NoIncomingDataException("No relevant data in buffers");
849 }
850
851 u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
852 {
853         /*
854                 Receive a packet from the network
855         */
856         
857         // TODO: We can not know how many layers of header there are.
858         // For now, just assume there are no other than the base headers.
859         u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
860         Buffer<u8> packetdata(packet_maxsize);
861         
862         for(;;)
863         {
864         try
865         {
866                 /*
867                         Check if some buffer has relevant data
868                 */
869                 try{
870                         SharedBuffer<u8> resultdata = GetFromBuffers(peer_id);
871
872                         if(datasize < resultdata.getSize())
873                                 throw InvalidIncomingDataException
874                                                 ("Buffer too small for received data");
875                                 
876                         memcpy(data, *resultdata, resultdata.getSize());
877                         return resultdata.getSize();
878                 }
879                 catch(NoIncomingDataException &e)
880                 {
881                 }
882         
883                 Address sender;
884
885                 s32 received_size = m_socket.Receive(sender, *packetdata, packet_maxsize);
886
887                 if(received_size < 0)
888                         throw NoIncomingDataException("No incoming data");
889                 if(received_size < BASE_HEADER_SIZE)
890                         throw InvalidIncomingDataException("No full header received");
891                 if(readU32(&packetdata[0]) != m_protocol_id)
892                         throw InvalidIncomingDataException("Invalid protocol id");
893                 
894                 peer_id = readPeerId(*packetdata);
895                 u8 channelnum = readChannel(*packetdata);
896                 if(channelnum > CHANNEL_COUNT-1){
897                         PrintInfo(derr_con);
898                         derr_con<<"Receive(): Invalid channel "<<channelnum<<std::endl;
899                         throw InvalidIncomingDataException("Channel doesn't exist");
900                 }
901
902                 if(peer_id == PEER_ID_NEW)
903                 {
904                         /*
905                                 Somebody is trying to send stuff to us with no peer id.
906                                 
907                                 Check if the same address and port was added to our peer
908                                 list before.
909                                 Allow only entries that have has_sent_with_id==false.
910                         */
911
912                         core::map<u16, Peer*>::Iterator j;
913                         j = m_peers.getIterator();
914                         for(; j.atEnd() == false; j++)
915                         {
916                                 Peer *peer = j.getNode()->getValue();
917                                 if(peer->has_sent_with_id)
918                                         continue;
919                                 if(peer->address == sender)
920                                         break;
921                         }
922                         
923                         /*
924                                 If no peer was found with the same address and port,
925                                 we shall assume it is a new peer and create an entry.
926                         */
927                         if(j.atEnd())
928                         {
929                                 // Pass on to adding the peer
930                         }
931                         // Else: A peer was found.
932                         else
933                         {
934                                 Peer *peer = j.getNode()->getValue();
935                                 peer_id = peer->id;
936                                 PrintInfo(derr_con);
937                                 derr_con<<"WARNING: Assuming unknown peer to be "
938                                                 <<"peer_id="<<peer_id<<std::endl;
939                         }
940                 }
941                 
942                 /*
943                         The peer was not found in our lists. Add it.
944                 */
945                 if(peer_id == PEER_ID_NEW)
946                 {
947                         // Somebody wants to make a new connection
948
949                         // Get a unique peer id (2 or higher)
950                         u16 peer_id_new = 2;
951                         /*
952                                 Find an unused peer id
953                         */
954                         for(;;)
955                         {
956                                 // Check if exists
957                                 if(m_peers.find(peer_id_new) == NULL)
958                                         break;
959                                 // Check for overflow
960                                 if(peer_id_new == 65535)
961                                         throw ConnectionException
962                                                 ("Connection ran out of peer ids");
963                                 peer_id_new++;
964                         }
965
966                         PrintInfo();
967                         dout_con<<"Receive(): Got a packet with peer_id=PEER_ID_NEW,"
968                                         " giving peer_id="<<peer_id_new<<std::endl;
969
970                         // Create a peer
971                         Peer *peer = new Peer(peer_id_new, sender);
972                         m_peers.insert(peer->id, peer);
973                         m_peerhandler->peerAdded(peer);
974                         
975                         // Create CONTROL packet to tell the peer id to the new peer.
976                         SharedBuffer<u8> reply(4);
977                         writeU8(&reply[0], TYPE_CONTROL);
978                         writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
979                         writeU16(&reply[2], peer_id_new);
980                         SendAsPacket(peer_id_new, 0, reply, true);
981                         
982                         // We're now talking to a valid peer_id
983                         peer_id = peer_id_new;
984
985                         // Go on and process whatever it sent
986                 }
987
988                 core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
989
990                 if(node == NULL)
991                 {
992                         // Peer not found
993                         // This means that the peer id of the sender is not PEER_ID_NEW
994                         // and it is invalid.
995                         PrintInfo(derr_con);
996                         derr_con<<"Receive(): Peer not found"<<std::endl;
997                         throw InvalidIncomingDataException("Peer not found (possible timeout)");
998                 }
999
1000                 Peer *peer = node->getValue();
1001
1002                 // Validate peer address
1003                 if(peer->address != sender)
1004                 {
1005                         PrintInfo(derr_con);
1006                         derr_con<<"Peer "<<peer_id<<" sending from different address."
1007                                         " Ignoring."<<std::endl;
1008                         throw InvalidIncomingDataException
1009                                         ("Peer sending from different address");
1010                         /*// If there is more data, receive again
1011                         if(m_socket.WaitData(0) == true)
1012                                 continue;
1013                         throw NoIncomingDataException("No incoming data (2)");*/
1014                 }
1015                 
1016                 peer->timeout_counter = 0.0;
1017
1018                 Channel *channel = &(peer->channels[channelnum]);
1019                 
1020                 // Throw the received packet to channel->processPacket()
1021
1022                 // Make a new SharedBuffer from the data without the base headers
1023                 SharedBuffer<u8> strippeddata(received_size - BASE_HEADER_SIZE);
1024                 memcpy(*strippeddata, &packetdata[BASE_HEADER_SIZE],
1025                                 strippeddata.getSize());
1026                 
1027                 try{
1028                         // Process it (the result is some data with no headers made by us)
1029                         SharedBuffer<u8> resultdata = channel->ProcessPacket
1030                                         (strippeddata, this, peer_id, channelnum);
1031                         
1032                         PrintInfo();
1033                         dout_con<<"ProcessPacket returned data of size "
1034                                         <<resultdata.getSize()<<std::endl;
1035                         
1036                         if(datasize < resultdata.getSize())
1037                                 throw InvalidIncomingDataException
1038                                                 ("Buffer too small for received data");
1039                         
1040                         memcpy(data, *resultdata, resultdata.getSize());
1041                         return resultdata.getSize();
1042                 }
1043                 catch(ProcessedSilentlyException &e)
1044                 {
1045                         // If there is more data, receive again
1046                         if(m_socket.WaitData(0) == true)
1047                                 continue;
1048                 }
1049                 throw NoIncomingDataException("No incoming data (2)");
1050         } // try
1051         catch(InvalidIncomingDataException &e)
1052         {
1053                 // If there is more data, receive again
1054                 if(m_socket.WaitData(0) == true)
1055                         continue;
1056         }
1057         } // for
1058 }
1059
1060 void Connection::SendToAll(u8 channelnum, SharedBuffer<u8> data, bool reliable)
1061 {
1062         core::map<u16, Peer*>::Iterator j;
1063         j = m_peers.getIterator();
1064         for(; j.atEnd() == false; j++)
1065         {
1066                 Peer *peer = j.getNode()->getValue();
1067                 Send(peer->id, channelnum, data, reliable);
1068         }
1069 }
1070
1071 void Connection::Send(u16 peer_id, u8 channelnum,
1072                 SharedBuffer<u8> data, bool reliable)
1073 {
1074         assert(channelnum < CHANNEL_COUNT);
1075         
1076         Peer *peer = GetPeer(peer_id);
1077         Channel *channel = &(peer->channels[channelnum]);
1078
1079         u32 chunksize_max = m_max_packet_size - BASE_HEADER_SIZE;
1080         if(reliable)
1081                 chunksize_max -= RELIABLE_HEADER_SIZE;
1082
1083         core::list<SharedBuffer<u8> > originals;
1084         originals = makeAutoSplitPacket(data, chunksize_max,
1085                         channel->next_outgoing_split_seqnum);
1086         
1087         core::list<SharedBuffer<u8> >::Iterator i;
1088         i = originals.begin();
1089         for(; i != originals.end(); i++)
1090         {
1091                 SharedBuffer<u8> original = *i;
1092                 
1093                 SendAsPacket(peer_id, channelnum, original, reliable);
1094         }
1095 }
1096
1097 void Connection::SendAsPacket(u16 peer_id, u8 channelnum,
1098                 SharedBuffer<u8> data, bool reliable)
1099 {
1100         Peer *peer = GetPeer(peer_id);
1101         Channel *channel = &(peer->channels[channelnum]);
1102
1103         if(reliable)
1104         {
1105                 u16 seqnum = channel->next_outgoing_seqnum;
1106                 channel->next_outgoing_seqnum++;
1107
1108                 SharedBuffer<u8> reliable = makeReliablePacket(data, seqnum);
1109
1110                 // Add base headers and make a packet
1111                 BufferedPacket p = makePacket(peer->address, reliable,
1112                                 m_protocol_id, m_peer_id, channelnum);
1113                 
1114                 try{
1115                         // Buffer the packet
1116                         channel->outgoing_reliables.insert(p);
1117                 }
1118                 catch(AlreadyExistsException &e)
1119                 {
1120                         PrintInfo(derr_con);
1121                         derr_con<<"WARNING: Going to send a reliable packet "
1122                                         "seqnum="<<seqnum<<" that is already "
1123                                         "in outgoing buffer"<<std::endl;
1124                         //assert(0);
1125                 }
1126                 
1127                 // Send the packet
1128                 RawSend(p);
1129         }
1130         else
1131         {
1132                 // Add base headers and make a packet
1133                 BufferedPacket p = makePacket(peer->address, data,
1134                                 m_protocol_id, m_peer_id, channelnum);
1135
1136                 // Send the packet
1137                 RawSend(p);
1138         }
1139 }
1140
1141 void Connection::RawSend(const BufferedPacket &packet)
1142 {
1143         m_socket.Send(packet.address, *packet.data, packet.data.getSize());
1144 }
1145
1146 void Connection::RunTimeouts(float dtime)
1147 {
1148         core::list<u16> timeouted_peers;
1149         core::map<u16, Peer*>::Iterator j;
1150         j = m_peers.getIterator();
1151         for(; j.atEnd() == false; j++)
1152         {
1153                 Peer *peer = j.getNode()->getValue();
1154                 
1155                 /*
1156                         Check peer timeout
1157                 */
1158                 peer->timeout_counter += dtime;
1159                 if(peer->timeout_counter > m_timeout)
1160                 {
1161                         PrintInfo(derr_con);
1162                         derr_con<<"RunTimeouts(): Peer "<<peer->id
1163                                         <<" has timed out."
1164                                         <<" (source=peer->timeout_counter)"
1165                                         <<std::endl;
1166                         // Add peer to the list
1167                         timeouted_peers.push_back(peer->id);
1168                         // Don't bother going through the buffers of this one
1169                         continue;
1170                 }
1171
1172                 float resend_timeout = peer->resend_timeout;
1173                 for(u16 i=0; i<CHANNEL_COUNT; i++)
1174                 {
1175                         core::list<BufferedPacket> timed_outs;
1176                         core::list<BufferedPacket>::Iterator j;
1177                         
1178                         Channel *channel = &peer->channels[i];
1179
1180                         // Remove timed out incomplete unreliable split packets
1181                         channel->incoming_splits.removeUnreliableTimedOuts(dtime, m_timeout);
1182                         
1183                         // Increment reliable packet times
1184                         channel->outgoing_reliables.incrementTimeouts(dtime);
1185
1186                         // Check reliable packet total times, remove peer if
1187                         // over timeout.
1188                         if(channel->outgoing_reliables.anyTotaltimeReached(m_timeout))
1189                         {
1190                                 PrintInfo(derr_con);
1191                                 derr_con<<"RunTimeouts(): Peer "<<peer->id
1192                                                 <<" has timed out."
1193                                                 <<" (source=reliable packet totaltime)"
1194                                                 <<std::endl;
1195                                 // Add peer to the to-be-removed list
1196                                 timeouted_peers.push_back(peer->id);
1197                                 goto nextpeer;
1198                         }
1199
1200                         // Re-send timed out outgoing reliables
1201                         
1202                         timed_outs = channel->
1203                                         outgoing_reliables.getTimedOuts(resend_timeout);
1204
1205                         channel->outgoing_reliables.resetTimedOuts(resend_timeout);
1206
1207                         j = timed_outs.begin();
1208                         for(; j != timed_outs.end(); j++)
1209                         {
1210                                 u16 peer_id = readPeerId(*(j->data));
1211                                 u8 channel = readChannel(*(j->data));
1212                                 u16 seqnum = readU16(&(j->data[BASE_HEADER_SIZE+1]));
1213
1214                                 PrintInfo(derr_con);
1215                                 derr_con<<"RE-SENDING timed-out RELIABLE to ";
1216                                 j->address.print(&derr_con);
1217                                 derr_con<<"(t/o="<<resend_timeout<<"): "
1218                                                 <<"from_peer_id="<<peer_id
1219                                                 <<", channel="<<((int)channel&0xff)
1220                                                 <<", seqnum="<<seqnum
1221                                                 <<std::endl;
1222
1223                                 RawSend(*j);
1224
1225                                 // Enlarge avg_rtt and resend_timeout:
1226                                 // The rtt will be at least the timeout.
1227                                 // NOTE: This won't affect the timeout of the next
1228                                 // checked channel because it was cached.
1229                                 peer->reportRTT(resend_timeout);
1230                         }
1231                 }
1232                 
1233                 /*
1234                         Send pings
1235                 */
1236                 peer->ping_timer += dtime;
1237                 if(peer->ping_timer >= 5.0)
1238                 {
1239                         // Create and send PING packet
1240                         SharedBuffer<u8> data(2);
1241                         writeU8(&data[0], TYPE_CONTROL);
1242                         writeU8(&data[1], CONTROLTYPE_PING);
1243                         SendAsPacket(peer->id, 0, data, true);
1244
1245                         peer->ping_timer = 0.0;
1246                 }
1247                 
1248 nextpeer:
1249                 continue;
1250         }
1251
1252         // Remove timeouted peers
1253         core::list<u16>::Iterator i = timeouted_peers.begin();
1254         for(; i != timeouted_peers.end(); i++)
1255         {
1256                 PrintInfo(derr_con);
1257                 derr_con<<"RunTimeouts(): Removing peer "<<(*i)<<std::endl;
1258                 m_peerhandler->deletingPeer(m_peers[*i], true);
1259                 delete m_peers[*i];
1260                 m_peers.remove(*i);
1261         }
1262 }
1263
1264 Peer* Connection::GetPeer(u16 peer_id)
1265 {
1266         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1267
1268         if(node == NULL){
1269                 // Peer not found
1270                 throw PeerNotFoundException("Peer not found (possible timeout)");
1271         }
1272
1273         // Error checking
1274         assert(node->getValue()->id == peer_id);
1275
1276         return node->getValue();
1277 }
1278
1279 Peer* Connection::GetPeerNoEx(u16 peer_id)
1280 {
1281         core::map<u16, Peer*>::Node *node = m_peers.find(peer_id);
1282
1283         if(node == NULL){
1284                 return NULL;
1285         }
1286
1287         // Error checking
1288         assert(node->getValue()->id == peer_id);
1289
1290         return node->getValue();
1291 }
1292
1293 core::list<Peer*> Connection::GetPeers()
1294 {
1295         core::list<Peer*> list;
1296         core::map<u16, Peer*>::Iterator j;
1297         j = m_peers.getIterator();
1298         for(; j.atEnd() == false; j++)
1299         {
1300                 Peer *peer = j.getNode()->getValue();
1301                 list.push_back(peer);
1302         }
1303         return list;
1304 }
1305
1306 void Connection::PrintInfo(std::ostream &out)
1307 {
1308         out<<m_socket.GetHandle();
1309         out<<" ";
1310         out<<"con "<<m_peer_id<<": ";
1311         for(s16 i=0; i<(s16)m_indentation-1; i++)
1312                 out<<"  ";
1313 }
1314
1315 void Connection::PrintInfo()
1316 {
1317         PrintInfo(dout_con);
1318 }
1319
1320 } // namespace
1321