]> git.lizzy.rs Git - dragonfireclient.git/blob - src/network/connection.cpp
Miscellaneous networking improvements (#9611)
[dragonfireclient.git] / src / network / connection.cpp
1 /*
2 Minetest
3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include <iomanip>
21 #include <cerrno>
22 #include <algorithm>
23 #include <cmath>
24 #include "connection.h"
25 #include "serialization.h"
26 #include "log.h"
27 #include "porting.h"
28 #include "network/connectionthreads.h"
29 #include "network/networkpacket.h"
30 #include "network/peerhandler.h"
31 #include "util/serialize.h"
32 #include "util/numeric.h"
33 #include "util/string.h"
34 #include "settings.h"
35 #include "profiler.h"
36
37 namespace con
38 {
39
40 /******************************************************************************/
41 /* defines used for debugging and profiling                                   */
42 /******************************************************************************/
// LOG(a) expands to the bare statement in every build configuration.
// The mutex-guarded variant below is kept for reference but stays disabled:
// Prevent deadlocks until a solution is found after 5.2.0 (TODO)
#if 0
	/* this mutex is used to achieve log message consistency */
	std::mutex log_message_mutex;
	#define LOG(a)                                                                 \
		{                                                                          \
		MutexAutoLock loglock(log_message_mutex);                                 \
		a;                                                                         \
		}
#else
	#define LOG(a) a
#endif

// PROFILE(a) is compiled out when NDEBUG is defined and evaluates its
// argument otherwise.
#ifdef NDEBUG
	#define PROFILE(a)
#else
	#define PROFILE(a) a
#endif

#define PING_TIMEOUT 5.0
64
65 BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
66                 u32 protocol_id, session_t sender_peer_id, u8 channel)
67 {
68         u32 packet_size = data.getSize() + BASE_HEADER_SIZE;
69         BufferedPacket p(packet_size);
70         p.address = address;
71
72         writeU32(&p.data[0], protocol_id);
73         writeU16(&p.data[4], sender_peer_id);
74         writeU8(&p.data[6], channel);
75
76         memcpy(&p.data[BASE_HEADER_SIZE], *data, data.getSize());
77
78         return p;
79 }
80
81 SharedBuffer<u8> makeOriginalPacket(const SharedBuffer<u8> &data)
82 {
83         u32 header_size = 1;
84         u32 packet_size = data.getSize() + header_size;
85         SharedBuffer<u8> b(packet_size);
86
87         writeU8(&(b[0]), PACKET_TYPE_ORIGINAL);
88         if (data.getSize() > 0) {
89                 memcpy(&(b[header_size]), *data, data.getSize());
90         }
91         return b;
92 }
93
94 // Split data in chunks and add TYPE_SPLIT headers to them
95 void makeSplitPacket(const SharedBuffer<u8> &data, u32 chunksize_max, u16 seqnum,
96                 std::list<SharedBuffer<u8>> *chunks)
97 {
98         // Chunk packets, containing the TYPE_SPLIT header
99         u32 chunk_header_size = 7;
100         u32 maximum_data_size = chunksize_max - chunk_header_size;
101         u32 start = 0;
102         u32 end = 0;
103         u32 chunk_num = 0;
104         u16 chunk_count = 0;
105         do {
106                 end = start + maximum_data_size - 1;
107                 if (end > data.getSize() - 1)
108                         end = data.getSize() - 1;
109
110                 u32 payload_size = end - start + 1;
111                 u32 packet_size = chunk_header_size + payload_size;
112
113                 SharedBuffer<u8> chunk(packet_size);
114
115                 writeU8(&chunk[0], PACKET_TYPE_SPLIT);
116                 writeU16(&chunk[1], seqnum);
117                 // [3] u16 chunk_count is written at next stage
118                 writeU16(&chunk[5], chunk_num);
119                 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
120
121                 chunks->push_back(chunk);
122                 chunk_count++;
123
124                 start = end + 1;
125                 chunk_num++;
126         }
127         while (end != data.getSize() - 1);
128
129         for (SharedBuffer<u8> &chunk : *chunks) {
130                 // Write chunk_count
131                 writeU16(&(chunk[3]), chunk_count);
132         }
133 }
134
135 void makeAutoSplitPacket(const SharedBuffer<u8> &data, u32 chunksize_max,
136                 u16 &split_seqnum, std::list<SharedBuffer<u8>> *list)
137 {
138         u32 original_header_size = 1;
139
140         if (data.getSize() + original_header_size > chunksize_max) {
141                 makeSplitPacket(data, chunksize_max, split_seqnum, list);
142                 split_seqnum++;
143                 return;
144         }
145
146         list->push_back(makeOriginalPacket(data));
147 }
148
149 SharedBuffer<u8> makeReliablePacket(const SharedBuffer<u8> &data, u16 seqnum)
150 {
151         u32 header_size = 3;
152         u32 packet_size = data.getSize() + header_size;
153         SharedBuffer<u8> b(packet_size);
154
155         writeU8(&b[0], PACKET_TYPE_RELIABLE);
156         writeU16(&b[1], seqnum);
157
158         memcpy(&b[header_size], *data, data.getSize());
159
160         return b;
161 }
162
163 /*
164         ReliablePacketBuffer
165 */
166
167 void ReliablePacketBuffer::print()
168 {
169         MutexAutoLock listlock(m_list_mutex);
170         LOG(dout_con<<"Dump of ReliablePacketBuffer:" << std::endl);
171         unsigned int index = 0;
172         for (BufferedPacket &bufferedPacket : m_list) {
173                 u16 s = readU16(&(bufferedPacket.data[BASE_HEADER_SIZE+1]));
174                 LOG(dout_con<<index<< ":" << s << std::endl);
175                 index++;
176         }
177 }
178
179 bool ReliablePacketBuffer::empty()
180 {
181         MutexAutoLock listlock(m_list_mutex);
182         return m_list.empty();
183 }
184
185 u32 ReliablePacketBuffer::size()
186 {
187         MutexAutoLock listlock(m_list_mutex);
188         return m_list.size();
189 }
190
191 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
192 {
193         std::list<BufferedPacket>::iterator i = m_list.begin();
194         for(; i != m_list.end(); ++i)
195         {
196                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
197                 if (s == seqnum)
198                         break;
199         }
200         return i;
201 }
202
203 RPBSearchResult ReliablePacketBuffer::notFound()
204 {
205         return m_list.end();
206 }
207
208 bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
209 {
210         MutexAutoLock listlock(m_list_mutex);
211         if (m_list.empty())
212                 return false;
213         const BufferedPacket &p = *m_list.begin();
214         result = readU16(&p.data[BASE_HEADER_SIZE + 1]);
215         return true;
216 }
217
218 BufferedPacket ReliablePacketBuffer::popFirst()
219 {
220         MutexAutoLock listlock(m_list_mutex);
221         if (m_list.empty())
222                 throw NotFoundException("Buffer is empty");
223         BufferedPacket p = *m_list.begin();
224         m_list.erase(m_list.begin());
225
226         if (m_list.empty()) {
227                 m_oldest_non_answered_ack = 0;
228         } else {
229                 m_oldest_non_answered_ack =
230                                 readU16(&m_list.begin()->data[BASE_HEADER_SIZE + 1]);
231         }
232         return p;
233 }
234
235 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
236 {
237         MutexAutoLock listlock(m_list_mutex);
238         RPBSearchResult r = findPacket(seqnum);
239         if (r == notFound()) {
240                 LOG(dout_con<<"Sequence number: " << seqnum
241                                 << " not found in reliable buffer"<<std::endl);
242                 throw NotFoundException("seqnum not found in buffer");
243         }
244         BufferedPacket p = *r;
245
246
247         RPBSearchResult next = r;
248         ++next;
249         if (next != notFound()) {
250                 u16 s = readU16(&(next->data[BASE_HEADER_SIZE+1]));
251                 m_oldest_non_answered_ack = s;
252         }
253
254         m_list.erase(r);
255
256         if (m_list.empty()) {
257                 m_oldest_non_answered_ack = 0;
258         } else {
259                 m_oldest_non_answered_ack =
260                                 readU16(&m_list.begin()->data[BASE_HEADER_SIZE + 1]);
261         }
262         return p;
263 }
264
265 void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected)
266 {
267         MutexAutoLock listlock(m_list_mutex);
268         if (p.data.getSize() < BASE_HEADER_SIZE + 3) {
269                 errorstream << "ReliablePacketBuffer::insert(): Invalid data size for "
270                         "reliable packet" << std::endl;
271                 return;
272         }
273         u8 type = readU8(&p.data[BASE_HEADER_SIZE + 0]);
274         if (type != PACKET_TYPE_RELIABLE) {
275                 errorstream << "ReliablePacketBuffer::insert(): type is not reliable"
276                         << std::endl;
277                 return;
278         }
279         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
280
281         if (!seqnum_in_window(seqnum, next_expected, MAX_RELIABLE_WINDOW_SIZE)) {
282                 errorstream << "ReliablePacketBuffer::insert(): seqnum is outside of "
283                         "expected window " << std::endl;
284                 return;
285         }
286         if (seqnum == next_expected) {
287                 errorstream << "ReliablePacketBuffer::insert(): seqnum is next expected"
288                         << std::endl;
289                 return;
290         }
291
292         sanity_check(m_list.size() <= SEQNUM_MAX); // FIXME: Handle the error?
293
294         // Find the right place for the packet and insert it there
295         // If list is empty, just add it
296         if (m_list.empty())
297         {
298                 m_list.push_back(p);
299                 m_oldest_non_answered_ack = seqnum;
300                 // Done.
301                 return;
302         }
303
304         // Otherwise find the right place
305         std::list<BufferedPacket>::iterator i = m_list.begin();
306         // Find the first packet in the list which has a higher seqnum
307         u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
308
309         /* case seqnum is smaller then next_expected seqnum */
310         /* this is true e.g. on wrap around */
311         if (seqnum < next_expected) {
312                 while(((s < seqnum) || (s >= next_expected)) && (i != m_list.end())) {
313                         ++i;
314                         if (i != m_list.end())
315                                 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
316                 }
317         }
318         /* non wrap around case (at least for incoming and next_expected */
319         else
320         {
321                 while(((s < seqnum) && (s >= next_expected)) && (i != m_list.end())) {
322                         ++i;
323                         if (i != m_list.end())
324                                 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
325                 }
326         }
327
328         if (s == seqnum) {
329                 /* nothing to do this seems to be a resent packet */
330                 /* for paranoia reason data should be compared */
331                 if (
332                         (readU16(&(i->data[BASE_HEADER_SIZE+1])) != seqnum) ||
333                         (i->data.getSize() != p.data.getSize()) ||
334                         (i->address != p.address)
335                         )
336                 {
337                         /* if this happens your maximum transfer window may be to big */
338                         fprintf(stderr,
339                                         "Duplicated seqnum %d non matching packet detected:\n",
340                                         seqnum);
341                         fprintf(stderr, "Old: seqnum: %05d size: %04d, address: %s\n",
342                                         readU16(&(i->data[BASE_HEADER_SIZE+1])),i->data.getSize(),
343                                         i->address.serializeString().c_str());
344                         fprintf(stderr, "New: seqnum: %05d size: %04u, address: %s\n",
345                                         readU16(&(p.data[BASE_HEADER_SIZE+1])),p.data.getSize(),
346                                         p.address.serializeString().c_str());
347                         throw IncomingDataCorruption("duplicated packet isn't same as original one");
348                 }
349         }
350         /* insert or push back */
351         else if (i != m_list.end()) {
352                 m_list.insert(i, p);
353         } else {
354                 m_list.push_back(p);
355         }
356
357         /* update last packet number */
358         m_oldest_non_answered_ack = readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]);
359 }
360
361 void ReliablePacketBuffer::incrementTimeouts(float dtime)
362 {
363         MutexAutoLock listlock(m_list_mutex);
364         for (BufferedPacket &bufferedPacket : m_list) {
365                 bufferedPacket.time += dtime;
366                 bufferedPacket.totaltime += dtime;
367         }
368 }
369
370 std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout,
371                                                                                                         unsigned int max_packets)
372 {
373         MutexAutoLock listlock(m_list_mutex);
374         std::list<BufferedPacket> timed_outs;
375         for (BufferedPacket &bufferedPacket : m_list) {
376                 if (bufferedPacket.time >= timeout) {
377                         timed_outs.push_back(bufferedPacket);
378
379                         //this packet will be sent right afterwards reset timeout here
380                         bufferedPacket.time = 0.0f;
381                         if (timed_outs.size() >= max_packets)
382                                 break;
383                 }
384         }
385         return timed_outs;
386 }
387
388 /*
389         IncomingSplitPacket
390 */
391
392 bool IncomingSplitPacket::insert(u32 chunk_num, SharedBuffer<u8> &chunkdata)
393 {
394         sanity_check(chunk_num < chunk_count);
395
396         // If chunk already exists, ignore it.
397         // Sometimes two identical packets may arrive when there is network
398         // lag and the server re-sends stuff.
399         if (chunks.find(chunk_num) != chunks.end())
400                 return false;
401
402         // Set chunk data in buffer
403         chunks[chunk_num] = chunkdata;
404
405         return true;
406 }
407
408 SharedBuffer<u8> IncomingSplitPacket::reassemble()
409 {
410         sanity_check(allReceived());
411
412         // Calculate total size
413         u32 totalsize = 0;
414         for (const auto &chunk : chunks)
415                 totalsize += chunk.second.getSize();
416
417         SharedBuffer<u8> fulldata(totalsize);
418
419         // Copy chunks to data buffer
420         u32 start = 0;
421         for (u32 chunk_i = 0; chunk_i < chunk_count; chunk_i++) {
422                 const SharedBuffer<u8> &buf = chunks[chunk_i];
423                 memcpy(&fulldata[start], *buf, buf.getSize());
424                 start += buf.getSize();
425         }
426
427         return fulldata;
428 }
429
430 /*
431         IncomingSplitBuffer
432 */
433
434 IncomingSplitBuffer::~IncomingSplitBuffer()
435 {
436         MutexAutoLock listlock(m_map_mutex);
437         for (auto &i : m_buf) {
438                 delete i.second;
439         }
440 }
441
442 SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool reliable)
443 {
444         MutexAutoLock listlock(m_map_mutex);
445         u32 headersize = BASE_HEADER_SIZE + 7;
446         if (p.data.getSize() < headersize) {
447                 errorstream << "Invalid data size for split packet" << std::endl;
448                 return SharedBuffer<u8>();
449         }
450         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
451         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
452         u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
453         u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
454
455         if (type != PACKET_TYPE_SPLIT) {
456                 errorstream << "IncomingSplitBuffer::insert(): type is not split"
457                         << std::endl;
458                 return SharedBuffer<u8>();
459         }
460         if (chunk_num >= chunk_count) {
461                 errorstream << "IncomingSplitBuffer::insert(): chunk_num=" << chunk_num
462                                 << " >= chunk_count=" << chunk_count << std::endl;
463                 return SharedBuffer<u8>();
464         }
465
466         // Add if doesn't exist
467         IncomingSplitPacket *sp;
468         if (m_buf.find(seqnum) == m_buf.end()) {
469                 sp = new IncomingSplitPacket(chunk_count, reliable);
470                 m_buf[seqnum] = sp;
471         } else {
472                 sp = m_buf[seqnum];
473         }
474
475         if (chunk_count != sp->chunk_count) {
476                 errorstream << "IncomingSplitBuffer::insert(): chunk_count="
477                                 << chunk_count << " != sp->chunk_count=" << sp->chunk_count
478                                 << std::endl;
479                 return SharedBuffer<u8>();
480         }
481         if (reliable != sp->reliable)
482                 LOG(derr_con<<"Connection: WARNING: reliable="<<reliable
483                                 <<" != sp->reliable="<<sp->reliable
484                                 <<std::endl);
485
486         // Cut chunk data out of packet
487         u32 chunkdatasize = p.data.getSize() - headersize;
488         SharedBuffer<u8> chunkdata(chunkdatasize);
489         memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
490
491         if (!sp->insert(chunk_num, chunkdata))
492                 return SharedBuffer<u8>();
493
494         // If not all chunks are received, return empty buffer
495         if (!sp->allReceived())
496                 return SharedBuffer<u8>();
497
498         SharedBuffer<u8> fulldata = sp->reassemble();
499
500         // Remove sp from buffer
501         m_buf.erase(seqnum);
502         delete sp;
503
504         return fulldata;
505 }
506
507 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
508 {
509         std::deque<u16> remove_queue;
510         {
511                 MutexAutoLock listlock(m_map_mutex);
512                 for (auto &i : m_buf) {
513                         IncomingSplitPacket *p = i.second;
514                         // Reliable ones are not removed by timeout
515                         if (p->reliable)
516                                 continue;
517                         p->time += dtime;
518                         if (p->time >= timeout)
519                                 remove_queue.push_back(i.first);
520                 }
521         }
522         for (u16 j : remove_queue) {
523                 MutexAutoLock listlock(m_map_mutex);
524                 LOG(dout_con<<"NOTE: Removing timed out unreliable split packet"<<std::endl);
525                 delete m_buf[j];
526                 m_buf.erase(j);
527         }
528 }
529
530 /*
531         ConnectionCommand
532  */
533
534 void ConnectionCommand::send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt,
535         bool reliable_)
536 {
537         type = CONNCMD_SEND;
538         peer_id = peer_id_;
539         channelnum = channelnum_;
540         data = pkt->oldForgePacket();
541         reliable = reliable_;
542 }
543
544 /*
545         Channel
546 */
547
548 u16 Channel::readNextIncomingSeqNum()
549 {
550         MutexAutoLock internal(m_internal_mutex);
551         return next_incoming_seqnum;
552 }
553
554 u16 Channel::incNextIncomingSeqNum()
555 {
556         MutexAutoLock internal(m_internal_mutex);
557         u16 retval = next_incoming_seqnum;
558         next_incoming_seqnum++;
559         return retval;
560 }
561
562 u16 Channel::readNextSplitSeqNum()
563 {
564         MutexAutoLock internal(m_internal_mutex);
565         return next_outgoing_split_seqnum;
566 }
567 void Channel::setNextSplitSeqNum(u16 seqnum)
568 {
569         MutexAutoLock internal(m_internal_mutex);
570         next_outgoing_split_seqnum = seqnum;
571 }
572
573 u16 Channel::getOutgoingSequenceNumber(bool& successful)
574 {
575         MutexAutoLock internal(m_internal_mutex);
576         u16 retval = next_outgoing_seqnum;
577         u16 lowest_unacked_seqnumber;
578
579         /* shortcut if there ain't any packet in outgoing list */
580         if (outgoing_reliables_sent.empty())
581         {
582                 next_outgoing_seqnum++;
583                 return retval;
584         }
585
586         if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber))
587         {
588                 if (lowest_unacked_seqnumber < next_outgoing_seqnum) {
589                         // ugly cast but this one is required in order to tell compiler we
590                         // know about difference of two unsigned may be negative in general
591                         // but we already made sure it won't happen in this case
592                         if (((u16)(next_outgoing_seqnum - lowest_unacked_seqnumber)) > window_size) {
593                                 successful = false;
594                                 return 0;
595                         }
596                 }
597                 else {
598                         // ugly cast but this one is required in order to tell compiler we
599                         // know about difference of two unsigned may be negative in general
600                         // but we already made sure it won't happen in this case
601                         if ((next_outgoing_seqnum + (u16)(SEQNUM_MAX - lowest_unacked_seqnumber)) >
602                                 window_size) {
603                                 successful = false;
604                                 return 0;
605                         }
606                 }
607         }
608
609         next_outgoing_seqnum++;
610         return retval;
611 }
612
613 u16 Channel::readOutgoingSequenceNumber()
614 {
615         MutexAutoLock internal(m_internal_mutex);
616         return next_outgoing_seqnum;
617 }
618
619 bool Channel::putBackSequenceNumber(u16 seqnum)
620 {
621         if (((seqnum + 1) % (SEQNUM_MAX+1)) == next_outgoing_seqnum) {
622
623                 next_outgoing_seqnum = seqnum;
624                 return true;
625         }
626         return false;
627 }
628
629 void Channel::UpdateBytesSent(unsigned int bytes, unsigned int packets)
630 {
631         MutexAutoLock internal(m_internal_mutex);
632         current_bytes_transfered += bytes;
633         current_packet_successful += packets;
634 }
635
636 void Channel::UpdateBytesReceived(unsigned int bytes) {
637         MutexAutoLock internal(m_internal_mutex);
638         current_bytes_received += bytes;
639 }
640
641 void Channel::UpdateBytesLost(unsigned int bytes)
642 {
643         MutexAutoLock internal(m_internal_mutex);
644         current_bytes_lost += bytes;
645 }
646
647
648 void Channel::UpdatePacketLossCounter(unsigned int count)
649 {
650         MutexAutoLock internal(m_internal_mutex);
651         current_packet_loss += count;
652 }
653
654 void Channel::UpdatePacketTooLateCounter()
655 {
656         MutexAutoLock internal(m_internal_mutex);
657         current_packet_too_late++;
658 }
659
660 void Channel::UpdateTimers(float dtime)
661 {
662         bpm_counter += dtime;
663         packet_loss_counter += dtime;
664
665         if (packet_loss_counter > 1.0f) {
666                 packet_loss_counter -= 1.0f;
667
668                 unsigned int packet_loss = 11; /* use a neutral value for initialization */
669                 unsigned int packets_successful = 0;
670                 //unsigned int packet_too_late = 0;
671
672                 bool reasonable_amount_of_data_transmitted = false;
673
674                 {
675                         MutexAutoLock internal(m_internal_mutex);
676                         packet_loss = current_packet_loss;
677                         //packet_too_late = current_packet_too_late;
678                         packets_successful = current_packet_successful;
679
680                         if (current_bytes_transfered > (unsigned int) (window_size*512/2)) {
681                                 reasonable_amount_of_data_transmitted = true;
682                         }
683                         current_packet_loss = 0;
684                         current_packet_too_late = 0;
685                         current_packet_successful = 0;
686                 }
687
688                 /* dynamic window size */
689                 float successful_to_lost_ratio = 0.0f;
690                 bool done = false;
691
692                 if (packets_successful > 0) {
693                         successful_to_lost_ratio = packet_loss/packets_successful;
694                 } else if (packet_loss > 0) {
695                         window_size = std::max(
696                                         (window_size - 10),
697                                         MIN_RELIABLE_WINDOW_SIZE);
698                         done = true;
699                 }
700
701                 if (!done) {
702                         if ((successful_to_lost_ratio < 0.01f) &&
703                                 (window_size < MAX_RELIABLE_WINDOW_SIZE)) {
704                                 /* don't even think about increasing if we didn't even
705                                  * use major parts of our window */
706                                 if (reasonable_amount_of_data_transmitted)
707                                         window_size = std::min(
708                                                         (window_size + 100),
709                                                         MAX_RELIABLE_WINDOW_SIZE);
710                         } else if ((successful_to_lost_ratio < 0.05f) &&
711                                         (window_size < MAX_RELIABLE_WINDOW_SIZE)) {
712                                 /* don't even think about increasing if we didn't even
713                                  * use major parts of our window */
714                                 if (reasonable_amount_of_data_transmitted)
715                                         window_size = std::min(
716                                                         (window_size + 50),
717                                                         MAX_RELIABLE_WINDOW_SIZE);
718                         } else if (successful_to_lost_ratio > 0.15f) {
719                                 window_size = std::max(
720                                                 (window_size - 100),
721                                                 MIN_RELIABLE_WINDOW_SIZE);
722                         } else if (successful_to_lost_ratio > 0.1f) {
723                                 window_size = std::max(
724                                                 (window_size - 50),
725                                                 MIN_RELIABLE_WINDOW_SIZE);
726                         }
727                 }
728         }
729
730         if (bpm_counter > 10.0f) {
731                 {
732                         MutexAutoLock internal(m_internal_mutex);
733                         cur_kbps                 =
734                                         (((float) current_bytes_transfered)/bpm_counter)/1024.0f;
735                         current_bytes_transfered = 0;
736                         cur_kbps_lost            =
737                                         (((float) current_bytes_lost)/bpm_counter)/1024.0f;
738                         current_bytes_lost       = 0;
739                         cur_incoming_kbps        =
740                                         (((float) current_bytes_received)/bpm_counter)/1024.0f;
741                         current_bytes_received   = 0;
742                         bpm_counter              = 0.0f;
743                 }
744
745                 if (cur_kbps > max_kbps) {
746                         max_kbps = cur_kbps;
747                 }
748
749                 if (cur_kbps_lost > max_kbps_lost) {
750                         max_kbps_lost = cur_kbps_lost;
751                 }
752
753                 if (cur_incoming_kbps > max_incoming_kbps) {
754                         max_incoming_kbps = cur_incoming_kbps;
755                 }
756
757                 rate_samples       = MYMIN(rate_samples+1,10);
758                 float old_fraction = ((float) (rate_samples-1) )/( (float) rate_samples);
759                 avg_kbps           = avg_kbps * old_fraction +
760                                 cur_kbps * (1.0 - old_fraction);
761                 avg_kbps_lost      = avg_kbps_lost * old_fraction +
762                                 cur_kbps_lost * (1.0 - old_fraction);
763                 avg_incoming_kbps  = avg_incoming_kbps * old_fraction +
764                                 cur_incoming_kbps * (1.0 - old_fraction);
765         }
766 }
767
768
769 /*
770         Peer
771 */
772
773 PeerHelper::PeerHelper(Peer* peer) :
774         m_peer(peer)
775 {
776         if (peer && !peer->IncUseCount())
777                 m_peer = nullptr;
778 }
779
780 PeerHelper::~PeerHelper()
781 {
782         if (m_peer)
783                 m_peer->DecUseCount();
784
785         m_peer = nullptr;
786 }
787
788 PeerHelper& PeerHelper::operator=(Peer* peer)
789 {
790         m_peer = peer;
791         if (peer && !peer->IncUseCount())
792                 m_peer = nullptr;
793         return *this;
794 }
795
796 Peer* PeerHelper::operator->() const
797 {
798         return m_peer;
799 }
800
801 Peer* PeerHelper::operator&() const
802 {
803         return m_peer;
804 }
805
806 bool PeerHelper::operator!()
807 {
808         return ! m_peer;
809 }
810
811 bool PeerHelper::operator!=(void* ptr)
812 {
813         return ((void*) m_peer != ptr);
814 }
815
816 bool Peer::IncUseCount()
817 {
818         MutexAutoLock lock(m_exclusive_access_mutex);
819
820         if (!m_pending_deletion) {
821                 this->m_usage++;
822                 return true;
823         }
824
825         return false;
826 }
827
828 void Peer::DecUseCount()
829 {
830         {
831                 MutexAutoLock lock(m_exclusive_access_mutex);
832                 sanity_check(m_usage > 0);
833                 m_usage--;
834
835                 if (!((m_pending_deletion) && (m_usage == 0)))
836                         return;
837         }
838         delete this;
839 }
840
841 void Peer::RTTStatistics(float rtt, const std::string &profiler_id,
842                 unsigned int num_samples) {
843
844         if (m_last_rtt > 0) {
845                 /* set min max values */
846                 if (rtt < m_rtt.min_rtt)
847                         m_rtt.min_rtt = rtt;
848                 if (rtt >= m_rtt.max_rtt)
849                         m_rtt.max_rtt = rtt;
850
851                 /* do average calculation */
852                 if (m_rtt.avg_rtt < 0.0)
853                         m_rtt.avg_rtt  = rtt;
854                 else
855                         m_rtt.avg_rtt  = m_rtt.avg_rtt * (num_samples/(num_samples-1)) +
856                                                                 rtt * (1/num_samples);
857
858                 /* do jitter calculation */
859
860                 //just use some neutral value at beginning
861                 float jitter = m_rtt.jitter_min;
862
863                 if (rtt > m_last_rtt)
864                         jitter = rtt-m_last_rtt;
865
866                 if (rtt <= m_last_rtt)
867                         jitter = m_last_rtt - rtt;
868
869                 if (jitter < m_rtt.jitter_min)
870                         m_rtt.jitter_min = jitter;
871                 if (jitter >= m_rtt.jitter_max)
872                         m_rtt.jitter_max = jitter;
873
874                 if (m_rtt.jitter_avg < 0.0)
875                         m_rtt.jitter_avg  = jitter;
876                 else
877                         m_rtt.jitter_avg  = m_rtt.jitter_avg * (num_samples/(num_samples-1)) +
878                                                                 jitter * (1/num_samples);
879
880                 if (!profiler_id.empty()) {
881                         g_profiler->graphAdd(profiler_id + " RTT [ms]", rtt * 1000.f);
882                         g_profiler->graphAdd(profiler_id + " jitter [ms]", jitter * 1000.f);
883                 }
884         }
885         /* save values required for next loop */
886         m_last_rtt = rtt;
887 }
888
889 bool Peer::isTimedOut(float timeout)
890 {
891         MutexAutoLock lock(m_exclusive_access_mutex);
892         u64 current_time = porting::getTimeMs();
893
894         float dtime = CALC_DTIME(m_last_timeout_check,current_time);
895         m_last_timeout_check = current_time;
896
897         m_timeout_counter += dtime;
898
899         return m_timeout_counter > timeout;
900 }
901
902 void Peer::Drop()
903 {
904         {
905                 MutexAutoLock usage_lock(m_exclusive_access_mutex);
906                 m_pending_deletion = true;
907                 if (m_usage != 0)
908                         return;
909         }
910
911         PROFILE(std::stringstream peerIdentifier1);
912         PROFILE(peerIdentifier1 << "runTimeouts[" << m_connection->getDesc()
913                         << ";" << id << ";RELIABLE]");
914         PROFILE(g_profiler->remove(peerIdentifier1.str()));
915         PROFILE(std::stringstream peerIdentifier2);
916         PROFILE(peerIdentifier2 << "sendPackets[" << m_connection->getDesc()
917                         << ";" << id << ";RELIABLE]");
918         PROFILE(ScopeProfiler peerprofiler(g_profiler, peerIdentifier2.str(), SPT_AVG));
919
920         delete this;
921 }
922
923 UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection* connection) :
924         Peer(a_address,a_id,connection)
925 {
926         for (Channel &channel : channels)
927                 channel.setWindowSize(START_RELIABLE_WINDOW_SIZE);
928 }
929
930 bool UDPPeer::getAddress(MTProtocols type,Address& toset)
931 {
932         if ((type == MTP_UDP) || (type == MTP_MINETEST_RELIABLE_UDP) || (type == MTP_PRIMARY))
933         {
934                 toset = address;
935                 return true;
936         }
937
938         return false;
939 }
940
941 void UDPPeer::reportRTT(float rtt)
942 {
943         if (rtt < 0.0) {
944                 return;
945         }
946         RTTStatistics(rtt,"rudp",MAX_RELIABLE_WINDOW_SIZE*10);
947
948         float timeout = getStat(AVG_RTT) * RESEND_TIMEOUT_FACTOR;
949         if (timeout < RESEND_TIMEOUT_MIN)
950                 timeout = RESEND_TIMEOUT_MIN;
951         if (timeout > RESEND_TIMEOUT_MAX)
952                 timeout = RESEND_TIMEOUT_MAX;
953
954         MutexAutoLock usage_lock(m_exclusive_access_mutex);
955         resend_timeout = timeout;
956 }
957
958 bool UDPPeer::Ping(float dtime,SharedBuffer<u8>& data)
959 {
960         m_ping_timer += dtime;
961         if (m_ping_timer >= PING_TIMEOUT)
962         {
963                 // Create and send PING packet
964                 writeU8(&data[0], PACKET_TYPE_CONTROL);
965                 writeU8(&data[1], CONTROLTYPE_PING);
966                 m_ping_timer = 0.0;
967                 return true;
968         }
969         return false;
970 }
971
972 void UDPPeer::PutReliableSendCommand(ConnectionCommand &c,
973                 unsigned int max_packet_size)
974 {
975         if (m_pending_disconnect)
976                 return;
977
978         Channel &chan = channels[c.channelnum];
979
980         if (chan.queued_commands.empty() &&
981                         /* don't queue more packets then window size */
982                         (chan.queued_reliables.size() < chan.getWindowSize() / 2)) {
983                 LOG(dout_con<<m_connection->getDesc()
984                                 <<" processing reliable command for peer id: " << c.peer_id
985                                 <<" data size: " << c.data.getSize() << std::endl);
986                 if (!processReliableSendCommand(c,max_packet_size)) {
987                         chan.queued_commands.push_back(c);
988                 }
989         }
990         else {
991                 LOG(dout_con<<m_connection->getDesc()
992                                 <<" Queueing reliable command for peer id: " << c.peer_id
993                                 <<" data size: " << c.data.getSize() <<std::endl);
994                 chan.queued_commands.push_back(c);
995                 if (chan.queued_commands.size() >= chan.getWindowSize() / 2) {
996                         LOG(derr_con << m_connection->getDesc()
997                                         << "Possible packet stall to peer id: " << c.peer_id
998                                         << " queued_commands=" << chan.queued_commands.size()
999                                         << std::endl);
1000                 }
1001         }
1002 }
1003
1004 bool UDPPeer::processReliableSendCommand(
1005                                 ConnectionCommand &c,
1006                                 unsigned int max_packet_size)
1007 {
1008         if (m_pending_disconnect)
1009                 return true;
1010
1011         Channel &chan = channels[c.channelnum];
1012
1013         u32 chunksize_max = max_packet_size
1014                                                         - BASE_HEADER_SIZE
1015                                                         - RELIABLE_HEADER_SIZE;
1016
1017         sanity_check(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE*512);
1018
1019         std::list<SharedBuffer<u8>> originals;
1020         u16 split_sequence_number = chan.readNextSplitSeqNum();
1021
1022         if (c.raw) {
1023                 originals.emplace_back(c.data);
1024         } else {
1025                 makeAutoSplitPacket(c.data, chunksize_max,split_sequence_number, &originals);
1026                 chan.setNextSplitSeqNum(split_sequence_number);
1027         }
1028
1029         bool have_sequence_number = true;
1030         bool have_initial_sequence_number = false;
1031         std::queue<BufferedPacket> toadd;
1032         volatile u16 initial_sequence_number = 0;
1033
1034         for (SharedBuffer<u8> &original : originals) {
1035                 u16 seqnum = chan.getOutgoingSequenceNumber(have_sequence_number);
1036
1037                 /* oops, we don't have enough sequence numbers to send this packet */
1038                 if (!have_sequence_number)
1039                         break;
1040
1041                 if (!have_initial_sequence_number)
1042                 {
1043                         initial_sequence_number = seqnum;
1044                         have_initial_sequence_number = true;
1045                 }
1046
1047                 SharedBuffer<u8> reliable = makeReliablePacket(original, seqnum);
1048
1049                 // Add base headers and make a packet
1050                 BufferedPacket p = con::makePacket(address, reliable,
1051                                 m_connection->GetProtocolID(), m_connection->GetPeerID(),
1052                                 c.channelnum);
1053
1054                 toadd.push(p);
1055         }
1056
1057         if (have_sequence_number) {
1058                 volatile u16 pcount = 0;
1059                 while (!toadd.empty()) {
1060                         BufferedPacket p = toadd.front();
1061                         toadd.pop();
1062 //                      LOG(dout_con<<connection->getDesc()
1063 //                                      << " queuing reliable packet for peer_id: " << c.peer_id
1064 //                                      << " channel: " << (c.channelnum&0xFF)
1065 //                                      << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
1066 //                                      << std::endl)
1067                         chan.queued_reliables.push(p);
1068                         pcount++;
1069                 }
1070                 sanity_check(chan.queued_reliables.size() < 0xFFFF);
1071                 return true;
1072         }
1073
1074         volatile u16 packets_available = toadd.size();
1075         /* we didn't get a single sequence number no need to fill queue */
1076         if (!have_initial_sequence_number) {
1077                 return false;
1078         }
1079
1080         while (!toadd.empty()) {
1081                 /* remove packet */
1082                 toadd.pop();
1083
1084                 bool successfully_put_back_sequence_number
1085                         = chan.putBackSequenceNumber(
1086                                 (initial_sequence_number+toadd.size() % (SEQNUM_MAX+1)));
1087
1088                 FATAL_ERROR_IF(!successfully_put_back_sequence_number, "error");
1089         }
1090
1091         // DO NOT REMOVE n_queued! It avoids a deadlock of async locked
1092         // 'log_message_mutex' and 'm_list_mutex'.
1093         u32 n_queued = chan.outgoing_reliables_sent.size();
1094
1095         LOG(dout_con<<m_connection->getDesc()
1096                         << " Windowsize exceeded on reliable sending "
1097                         << c.data.getSize() << " bytes"
1098                         << std::endl << "\t\tinitial_sequence_number: "
1099                         << initial_sequence_number
1100                         << std::endl << "\t\tgot at most            : "
1101                         << packets_available << " packets"
1102                         << std::endl << "\t\tpackets queued         : "
1103                         << n_queued
1104                         << std::endl);
1105
1106         return false;
1107 }
1108
1109 void UDPPeer::RunCommandQueues(
1110                                                         unsigned int max_packet_size,
1111                                                         unsigned int maxcommands,
1112                                                         unsigned int maxtransfer)
1113 {
1114
1115         for (Channel &channel : channels) {
1116                 unsigned int commands_processed = 0;
1117
1118                 if ((!channel.queued_commands.empty()) &&
1119                                 (channel.queued_reliables.size() < maxtransfer) &&
1120                                 (commands_processed < maxcommands)) {
1121                         try {
1122                                 ConnectionCommand c = channel.queued_commands.front();
1123
1124                                 LOG(dout_con << m_connection->getDesc()
1125                                                 << " processing queued reliable command " << std::endl);
1126
1127                                 // Packet is processed, remove it from queue
1128                                 if (processReliableSendCommand(c,max_packet_size)) {
1129                                         channel.queued_commands.pop_front();
1130                                 } else {
1131                                         LOG(dout_con << m_connection->getDesc()
1132                                                         << " Failed to queue packets for peer_id: " << c.peer_id
1133                                                         << ", delaying sending of " << c.data.getSize()
1134                                                         << " bytes" << std::endl);
1135                                 }
1136                         }
1137                         catch (ItemNotFoundException &e) {
1138                                 // intentionally empty
1139                         }
1140                 }
1141         }
1142 }
1143
1144 u16 UDPPeer::getNextSplitSequenceNumber(u8 channel)
1145 {
1146         assert(channel < CHANNEL_COUNT); // Pre-condition
1147         return channels[channel].readNextSplitSeqNum();
1148 }
1149
1150 void UDPPeer::setNextSplitSequenceNumber(u8 channel, u16 seqnum)
1151 {
1152         assert(channel < CHANNEL_COUNT); // Pre-condition
1153         channels[channel].setNextSplitSeqNum(seqnum);
1154 }
1155
1156 SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, const BufferedPacket &toadd,
1157         bool reliable)
1158 {
1159         assert(channel < CHANNEL_COUNT); // Pre-condition
1160         return channels[channel].incoming_splits.insert(toadd, reliable);
1161 }
1162
1163 /*
1164         Connection
1165 */
1166
1167 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
1168                 bool ipv6, PeerHandler *peerhandler) :
1169         m_udpSocket(ipv6),
1170         m_protocol_id(protocol_id),
1171         m_sendThread(new ConnectionSendThread(max_packet_size, timeout)),
1172         m_receiveThread(new ConnectionReceiveThread(max_packet_size)),
1173         m_bc_peerhandler(peerhandler)
1174
1175 {
1176         m_udpSocket.setTimeoutMs(5);
1177
1178         m_sendThread->setParent(this);
1179         m_receiveThread->setParent(this);
1180
1181         m_sendThread->start();
1182         m_receiveThread->start();
1183 }
1184
1185
1186 Connection::~Connection()
1187 {
1188         m_shutting_down = true;
1189         // request threads to stop
1190         m_sendThread->stop();
1191         m_receiveThread->stop();
1192
1193         //TODO for some unkonwn reason send/receive threads do not exit as they're
1194         // supposed to be but wait on peer timeout. To speed up shutdown we reduce
1195         // timeout to half a second.
1196         m_sendThread->setPeerTimeout(0.5);
1197
1198         // wait for threads to finish
1199         m_sendThread->wait();
1200         m_receiveThread->wait();
1201
1202         // Delete peers
1203         for (auto &peer : m_peers) {
1204                 delete peer.second;
1205         }
1206 }
1207
1208 /* Internal stuff */
1209 void Connection::putEvent(ConnectionEvent &e)
1210 {
1211         assert(e.type != CONNEVENT_NONE); // Pre-condition
1212         m_event_queue.push_back(e);
1213 }
1214
1215 void Connection::TriggerSend()
1216 {
1217         m_sendThread->Trigger();
1218 }
1219
1220 PeerHelper Connection::getPeerNoEx(session_t peer_id)
1221 {
1222         MutexAutoLock peerlock(m_peers_mutex);
1223         std::map<session_t, Peer *>::iterator node = m_peers.find(peer_id);
1224
1225         if (node == m_peers.end()) {
1226                 return PeerHelper(NULL);
1227         }
1228
1229         // Error checking
1230         FATAL_ERROR_IF(node->second->id != peer_id, "Invalid peer id");
1231
1232         return PeerHelper(node->second);
1233 }
1234
1235 /* find peer_id for address */
1236 u16 Connection::lookupPeer(Address& sender)
1237 {
1238         MutexAutoLock peerlock(m_peers_mutex);
1239         std::map<u16, Peer*>::iterator j;
1240         j = m_peers.begin();
1241         for(; j != m_peers.end(); ++j)
1242         {
1243                 Peer *peer = j->second;
1244                 if (peer->isPendingDeletion())
1245                         continue;
1246
1247                 Address tocheck;
1248
1249                 if ((peer->getAddress(MTP_MINETEST_RELIABLE_UDP, tocheck)) && (tocheck == sender))
1250                         return peer->id;
1251
1252                 if ((peer->getAddress(MTP_UDP, tocheck)) && (tocheck == sender))
1253                         return peer->id;
1254         }
1255
1256         return PEER_ID_INEXISTENT;
1257 }
1258
1259 bool Connection::deletePeer(session_t peer_id, bool timeout)
1260 {
1261         Peer *peer = 0;
1262
1263         /* lock list as short as possible */
1264         {
1265                 MutexAutoLock peerlock(m_peers_mutex);
1266                 if (m_peers.find(peer_id) == m_peers.end())
1267                         return false;
1268                 peer = m_peers[peer_id];
1269                 m_peers.erase(peer_id);
1270                 m_peer_ids.remove(peer_id);
1271         }
1272
1273         Address peer_address;
1274         //any peer has a primary address this never fails!
1275         peer->getAddress(MTP_PRIMARY, peer_address);
1276         // Create event
1277         ConnectionEvent e;
1278         e.peerRemoved(peer_id, timeout, peer_address);
1279         putEvent(e);
1280
1281
1282         peer->Drop();
1283         return true;
1284 }
1285
1286 /* Interface */
1287
1288 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1289 {
1290         try {
1291                 return m_event_queue.pop_front(timeout_ms);
1292         } catch(ItemNotFoundException &ex) {
1293                 ConnectionEvent e;
1294                 e.type = CONNEVENT_NONE;
1295                 return e;
1296         }
1297 }
1298
1299 void Connection::putCommand(ConnectionCommand &c)
1300 {
1301         if (!m_shutting_down) {
1302                 m_command_queue.push_back(c);
1303                 m_sendThread->Trigger();
1304         }
1305 }
1306
1307 void Connection::Serve(Address bind_addr)
1308 {
1309         ConnectionCommand c;
1310         c.serve(bind_addr);
1311         putCommand(c);
1312 }
1313
1314 void Connection::Connect(Address address)
1315 {
1316         ConnectionCommand c;
1317         c.connect(address);
1318         putCommand(c);
1319 }
1320
1321 bool Connection::Connected()
1322 {
1323         MutexAutoLock peerlock(m_peers_mutex);
1324
1325         if (m_peers.size() != 1)
1326                 return false;
1327
1328         std::map<session_t, Peer *>::iterator node = m_peers.find(PEER_ID_SERVER);
1329         if (node == m_peers.end())
1330                 return false;
1331
1332         if (m_peer_id == PEER_ID_INEXISTENT)
1333                 return false;
1334
1335         return true;
1336 }
1337
1338 void Connection::Disconnect()
1339 {
1340         ConnectionCommand c;
1341         c.disconnect();
1342         putCommand(c);
1343 }
1344
1345 bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
1346 {
1347         /*
1348                 Note that this function can potentially wait infinitely if non-data
1349                 events keep happening before the timeout expires.
1350                 This is not considered to be a problem (is it?)
1351         */
1352         for(;;) {
1353                 ConnectionEvent e = waitEvent(timeout);
1354                 if (e.type != CONNEVENT_NONE)
1355                         LOG(dout_con << getDesc() << ": Receive: got event: "
1356                                         << e.describe() << std::endl);
1357                 switch(e.type) {
1358                 case CONNEVENT_NONE:
1359                         return false;
1360                 case CONNEVENT_DATA_RECEIVED:
1361                         // Data size is lesser than command size, ignoring packet
1362                         if (e.data.getSize() < 2) {
1363                                 continue;
1364                         }
1365
1366                         pkt->putRawPacket(*e.data, e.data.getSize(), e.peer_id);
1367                         return true;
1368                 case CONNEVENT_PEER_ADDED: {
1369                         UDPPeer tmp(e.peer_id, e.address, this);
1370                         if (m_bc_peerhandler)
1371                                 m_bc_peerhandler->peerAdded(&tmp);
1372                         continue;
1373                 }
1374                 case CONNEVENT_PEER_REMOVED: {
1375                         UDPPeer tmp(e.peer_id, e.address, this);
1376                         if (m_bc_peerhandler)
1377                                 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1378                         continue;
1379                 }
1380                 case CONNEVENT_BIND_FAILED:
1381                         throw ConnectionBindFailed("Failed to bind socket "
1382                                         "(port already in use?)");
1383                 }
1384         }
1385         return false;
1386 }
1387
1388 void Connection::Receive(NetworkPacket *pkt)
1389 {
1390         bool any = Receive(pkt, m_bc_receive_timeout);
1391         if (!any)
1392                 throw NoIncomingDataException("No incoming data");
1393 }
1394
1395 bool Connection::TryReceive(NetworkPacket *pkt)
1396 {
1397         return Receive(pkt, 0);
1398 }
1399
1400 void Connection::Send(session_t peer_id, u8 channelnum,
1401                 NetworkPacket *pkt, bool reliable)
1402 {
1403         assert(channelnum < CHANNEL_COUNT); // Pre-condition
1404
1405         ConnectionCommand c;
1406
1407         c.send(peer_id, channelnum, pkt, reliable);
1408         putCommand(c);
1409 }
1410
1411 Address Connection::GetPeerAddress(session_t peer_id)
1412 {
1413         PeerHelper peer = getPeerNoEx(peer_id);
1414
1415         if (!peer)
1416                 throw PeerNotFoundException("No address for peer found!");
1417         Address peer_address;
1418         peer->getAddress(MTP_PRIMARY, peer_address);
1419         return peer_address;
1420 }
1421
1422 float Connection::getPeerStat(session_t peer_id, rtt_stat_type type)
1423 {
1424         PeerHelper peer = getPeerNoEx(peer_id);
1425         if (!peer) return -1;
1426         return peer->getStat(type);
1427 }
1428
1429 float Connection::getLocalStat(rate_stat_type type)
1430 {
1431         PeerHelper peer = getPeerNoEx(PEER_ID_SERVER);
1432
1433         FATAL_ERROR_IF(!peer, "Connection::getLocalStat we couldn't get our own peer? are you serious???");
1434
1435         float retval = 0.0;
1436
1437         for (Channel &channel : dynamic_cast<UDPPeer *>(&peer)->channels) {
1438                 switch(type) {
1439                         case CUR_DL_RATE:
1440                                 retval += channel.getCurrentDownloadRateKB();
1441                                 break;
1442                         case AVG_DL_RATE:
1443                                 retval += channel.getAvgDownloadRateKB();
1444                                 break;
1445                         case CUR_INC_RATE:
1446                                 retval += channel.getCurrentIncomingRateKB();
1447                                 break;
1448                         case AVG_INC_RATE:
1449                                 retval += channel.getAvgIncomingRateKB();
1450                                 break;
1451                         case AVG_LOSS_RATE:
1452                                 retval += channel.getAvgLossRateKB();
1453                                 break;
1454                         case CUR_LOSS_RATE:
1455                                 retval += channel.getCurrentLossRateKB();
1456                                 break;
1457                 default:
1458                         FATAL_ERROR("Connection::getLocalStat Invalid stat type");
1459                 }
1460         }
1461         return retval;
1462 }
1463
1464 u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
1465 {
1466         // Somebody wants to make a new connection
1467
1468         // Get a unique peer id (2 or higher)
1469         session_t peer_id_new = m_next_remote_peer_id;
1470         u16 overflow =  MAX_UDP_PEERS;
1471
1472         /*
1473                 Find an unused peer id
1474         */
1475         MutexAutoLock lock(m_peers_mutex);
1476         bool out_of_ids = false;
1477         for(;;) {
1478                 // Check if exists
1479                 if (m_peers.find(peer_id_new) == m_peers.end())
1480
1481                         break;
1482                 // Check for overflow
1483                 if (peer_id_new == overflow) {
1484                         out_of_ids = true;
1485                         break;
1486                 }
1487                 peer_id_new++;
1488         }
1489
1490         if (out_of_ids) {
1491                 errorstream << getDesc() << " ran out of peer ids" << std::endl;
1492                 return PEER_ID_INEXISTENT;
1493         }
1494
1495         // Create a peer
1496         Peer *peer = 0;
1497         peer = new UDPPeer(peer_id_new, sender, this);
1498
1499         m_peers[peer->id] = peer;
1500         m_peer_ids.push_back(peer->id);
1501
1502         m_next_remote_peer_id = (peer_id_new +1 ) % MAX_UDP_PEERS;
1503
1504         LOG(dout_con << getDesc()
1505                         << "createPeer(): giving peer_id=" << peer_id_new << std::endl);
1506
1507         ConnectionCommand cmd;
1508         SharedBuffer<u8> reply(4);
1509         writeU8(&reply[0], PACKET_TYPE_CONTROL);
1510         writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
1511         writeU16(&reply[2], peer_id_new);
1512         cmd.createPeer(peer_id_new,reply);
1513         putCommand(cmd);
1514
1515         // Create peer addition event
1516         ConnectionEvent e;
1517         e.peerAdded(peer_id_new, sender);
1518         putEvent(e);
1519
1520         // We're now talking to a valid peer_id
1521         return peer_id_new;
1522 }
1523
1524 void Connection::PrintInfo(std::ostream &out)
1525 {
1526         m_info_mutex.lock();
1527         out<<getDesc()<<": ";
1528         m_info_mutex.unlock();
1529 }
1530
1531 const std::string Connection::getDesc()
1532 {
1533         return std::string("con(")+
1534                         itos(m_udpSocket.GetHandle())+"/"+itos(m_peer_id)+")";
1535 }
1536
1537 void Connection::DisconnectPeer(session_t peer_id)
1538 {
1539         ConnectionCommand discon;
1540         discon.disconnect_peer(peer_id);
1541         putCommand(discon);
1542 }
1543
1544 void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum)
1545 {
1546         assert(channelnum < CHANNEL_COUNT); // Pre-condition
1547
1548         LOG(dout_con<<getDesc()
1549                         <<" Queuing ACK command to peer_id: " << peer_id <<
1550                         " channel: " << (channelnum & 0xFF) <<
1551                         " seqnum: " << seqnum << std::endl);
1552
1553         ConnectionCommand c;
1554         SharedBuffer<u8> ack(4);
1555         writeU8(&ack[0], PACKET_TYPE_CONTROL);
1556         writeU8(&ack[1], CONTROLTYPE_ACK);
1557         writeU16(&ack[2], seqnum);
1558
1559         c.ack(peer_id, channelnum, ack);
1560         putCommand(c);
1561         m_sendThread->Trigger();
1562 }
1563
1564 UDPPeer* Connection::createServerPeer(Address& address)
1565 {
1566         if (getPeerNoEx(PEER_ID_SERVER) != 0)
1567         {
1568                 throw ConnectionException("Already connected to a server");
1569         }
1570
1571         UDPPeer *peer = new UDPPeer(PEER_ID_SERVER, address, this);
1572
1573         {
1574                 MutexAutoLock lock(m_peers_mutex);
1575                 m_peers[peer->id] = peer;
1576                 m_peer_ids.push_back(peer->id);
1577         }
1578
1579         return peer;
1580 }
1581
1582 } // namespace