]> git.lizzy.rs Git - dragonfireclient.git/blob - src/network/connection.cpp
Connection: Fix deadlock in debug mode (#9550)
[dragonfireclient.git] / src / network / connection.cpp
1 /*
2 Minetest
3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include <iomanip>
21 #include <cerrno>
22 #include <algorithm>
23 #include <cmath>
24 #include "connection.h"
25 #include "serialization.h"
26 #include "log.h"
27 #include "porting.h"
28 #include "network/connectionthreads.h"
29 #include "network/networkpacket.h"
30 #include "network/peerhandler.h"
31 #include "util/serialize.h"
32 #include "util/numeric.h"
33 #include "util/string.h"
34 #include "settings.h"
35 #include "profiler.h"
36
37 namespace con
38 {
39
40 /******************************************************************************/
41 /* defines used for debugging and profiling                                   */
42 /******************************************************************************/
43 #ifdef NDEBUG
44         #define LOG(a) a
45         #define PROFILE(a)
46 #else
47         #if 0
48         /* this mutex is used to achieve log message consistency */
49         std::mutex log_message_mutex;
50         #define LOG(a)                                                                 \
51                 {                                                                          \
52                 MutexAutoLock loglock(log_message_mutex);                                 \
53                 a;                                                                         \
54                 }
55         #else
56         // Prevent deadlocks until a solution is found after 5.2.0 (TODO)
57         #define LOG(a) a
58         #endif
59
60         #define PROFILE(a) a
61 #endif
62
63 #define PING_TIMEOUT 5.0
64
65 BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
66                 u32 protocol_id, session_t sender_peer_id, u8 channel)
67 {
68         u32 packet_size = data.getSize() + BASE_HEADER_SIZE;
69         BufferedPacket p(packet_size);
70         p.address = address;
71
72         writeU32(&p.data[0], protocol_id);
73         writeU16(&p.data[4], sender_peer_id);
74         writeU8(&p.data[6], channel);
75
76         memcpy(&p.data[BASE_HEADER_SIZE], *data, data.getSize());
77
78         return p;
79 }
80
81 SharedBuffer<u8> makeOriginalPacket(const SharedBuffer<u8> &data)
82 {
83         u32 header_size = 1;
84         u32 packet_size = data.getSize() + header_size;
85         SharedBuffer<u8> b(packet_size);
86
87         writeU8(&(b[0]), PACKET_TYPE_ORIGINAL);
88         if (data.getSize() > 0) {
89                 memcpy(&(b[header_size]), *data, data.getSize());
90         }
91         return b;
92 }
93
94 // Split data in chunks and add TYPE_SPLIT headers to them
95 void makeSplitPacket(const SharedBuffer<u8> &data, u32 chunksize_max, u16 seqnum,
96                 std::list<SharedBuffer<u8>> *chunks)
97 {
98         // Chunk packets, containing the TYPE_SPLIT header
99         u32 chunk_header_size = 7;
100         u32 maximum_data_size = chunksize_max - chunk_header_size;
101         u32 start = 0;
102         u32 end = 0;
103         u32 chunk_num = 0;
104         u16 chunk_count = 0;
105         do {
106                 end = start + maximum_data_size - 1;
107                 if (end > data.getSize() - 1)
108                         end = data.getSize() - 1;
109
110                 u32 payload_size = end - start + 1;
111                 u32 packet_size = chunk_header_size + payload_size;
112
113                 SharedBuffer<u8> chunk(packet_size);
114
115                 writeU8(&chunk[0], PACKET_TYPE_SPLIT);
116                 writeU16(&chunk[1], seqnum);
117                 // [3] u16 chunk_count is written at next stage
118                 writeU16(&chunk[5], chunk_num);
119                 memcpy(&chunk[chunk_header_size], &data[start], payload_size);
120
121                 chunks->push_back(chunk);
122                 chunk_count++;
123
124                 start = end + 1;
125                 chunk_num++;
126         }
127         while (end != data.getSize() - 1);
128
129         for (SharedBuffer<u8> &chunk : *chunks) {
130                 // Write chunk_count
131                 writeU16(&(chunk[3]), chunk_count);
132         }
133 }
134
135 void makeAutoSplitPacket(const SharedBuffer<u8> &data, u32 chunksize_max,
136                 u16 &split_seqnum, std::list<SharedBuffer<u8>> *list)
137 {
138         u32 original_header_size = 1;
139
140         if (data.getSize() + original_header_size > chunksize_max) {
141                 makeSplitPacket(data, chunksize_max, split_seqnum, list);
142                 split_seqnum++;
143                 return;
144         }
145
146         list->push_back(makeOriginalPacket(data));
147 }
148
149 SharedBuffer<u8> makeReliablePacket(const SharedBuffer<u8> &data, u16 seqnum)
150 {
151         u32 header_size = 3;
152         u32 packet_size = data.getSize() + header_size;
153         SharedBuffer<u8> b(packet_size);
154
155         writeU8(&b[0], PACKET_TYPE_RELIABLE);
156         writeU16(&b[1], seqnum);
157
158         memcpy(&b[header_size], *data, data.getSize());
159
160         return b;
161 }
162
163 /*
164         ReliablePacketBuffer
165 */
166
167 void ReliablePacketBuffer::print()
168 {
169         MutexAutoLock listlock(m_list_mutex);
170         LOG(dout_con<<"Dump of ReliablePacketBuffer:" << std::endl);
171         unsigned int index = 0;
172         for (BufferedPacket &bufferedPacket : m_list) {
173                 u16 s = readU16(&(bufferedPacket.data[BASE_HEADER_SIZE+1]));
174                 LOG(dout_con<<index<< ":" << s << std::endl);
175                 index++;
176         }
177 }
178
179 bool ReliablePacketBuffer::empty()
180 {
181         MutexAutoLock listlock(m_list_mutex);
182         return m_list.empty();
183 }
184
185 u32 ReliablePacketBuffer::size()
186 {
187         MutexAutoLock listlock(m_list_mutex);
188         return m_list.size();
189 }
190
191 RPBSearchResult ReliablePacketBuffer::findPacket(u16 seqnum)
192 {
193         std::list<BufferedPacket>::iterator i = m_list.begin();
194         for(; i != m_list.end(); ++i)
195         {
196                 u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
197                 if (s == seqnum)
198                         break;
199         }
200         return i;
201 }
202
203 RPBSearchResult ReliablePacketBuffer::notFound()
204 {
205         return m_list.end();
206 }
207
208 bool ReliablePacketBuffer::getFirstSeqnum(u16& result)
209 {
210         MutexAutoLock listlock(m_list_mutex);
211         if (m_list.empty())
212                 return false;
213         const BufferedPacket &p = *m_list.begin();
214         result = readU16(&p.data[BASE_HEADER_SIZE + 1]);
215         return true;
216 }
217
218 BufferedPacket ReliablePacketBuffer::popFirst()
219 {
220         MutexAutoLock listlock(m_list_mutex);
221         if (m_list.empty())
222                 throw NotFoundException("Buffer is empty");
223         BufferedPacket p = *m_list.begin();
224         m_list.erase(m_list.begin());
225
226         if (m_list.empty()) {
227                 m_oldest_non_answered_ack = 0;
228         } else {
229                 m_oldest_non_answered_ack =
230                                 readU16(&m_list.begin()->data[BASE_HEADER_SIZE + 1]);
231         }
232         return p;
233 }
234
235 BufferedPacket ReliablePacketBuffer::popSeqnum(u16 seqnum)
236 {
237         MutexAutoLock listlock(m_list_mutex);
238         RPBSearchResult r = findPacket(seqnum);
239         if (r == notFound()) {
240                 LOG(dout_con<<"Sequence number: " << seqnum
241                                 << " not found in reliable buffer"<<std::endl);
242                 throw NotFoundException("seqnum not found in buffer");
243         }
244         BufferedPacket p = *r;
245
246
247         RPBSearchResult next = r;
248         ++next;
249         if (next != notFound()) {
250                 u16 s = readU16(&(next->data[BASE_HEADER_SIZE+1]));
251                 m_oldest_non_answered_ack = s;
252         }
253
254         m_list.erase(r);
255
256         if (m_list.empty()) {
257                 m_oldest_non_answered_ack = 0;
258         } else {
259                 m_oldest_non_answered_ack =
260                                 readU16(&m_list.begin()->data[BASE_HEADER_SIZE + 1]);
261         }
262         return p;
263 }
264
265 void ReliablePacketBuffer::insert(BufferedPacket &p, u16 next_expected)
266 {
267         MutexAutoLock listlock(m_list_mutex);
268         if (p.data.getSize() < BASE_HEADER_SIZE + 3) {
269                 errorstream << "ReliablePacketBuffer::insert(): Invalid data size for "
270                         "reliable packet" << std::endl;
271                 return;
272         }
273         u8 type = readU8(&p.data[BASE_HEADER_SIZE + 0]);
274         if (type != PACKET_TYPE_RELIABLE) {
275                 errorstream << "ReliablePacketBuffer::insert(): type is not reliable"
276                         << std::endl;
277                 return;
278         }
279         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE + 1]);
280
281         if (!seqnum_in_window(seqnum, next_expected, MAX_RELIABLE_WINDOW_SIZE)) {
282                 errorstream << "ReliablePacketBuffer::insert(): seqnum is outside of "
283                         "expected window " << std::endl;
284                 return;
285         }
286         if (seqnum == next_expected) {
287                 errorstream << "ReliablePacketBuffer::insert(): seqnum is next expected"
288                         << std::endl;
289                 return;
290         }
291
292         sanity_check(m_list.size() <= SEQNUM_MAX); // FIXME: Handle the error?
293
294         // Find the right place for the packet and insert it there
295         // If list is empty, just add it
296         if (m_list.empty())
297         {
298                 m_list.push_back(p);
299                 m_oldest_non_answered_ack = seqnum;
300                 // Done.
301                 return;
302         }
303
304         // Otherwise find the right place
305         std::list<BufferedPacket>::iterator i = m_list.begin();
306         // Find the first packet in the list which has a higher seqnum
307         u16 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
308
309         /* case seqnum is smaller then next_expected seqnum */
310         /* this is true e.g. on wrap around */
311         if (seqnum < next_expected) {
312                 while(((s < seqnum) || (s >= next_expected)) && (i != m_list.end())) {
313                         ++i;
314                         if (i != m_list.end())
315                                 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
316                 }
317         }
318         /* non wrap around case (at least for incoming and next_expected */
319         else
320         {
321                 while(((s < seqnum) && (s >= next_expected)) && (i != m_list.end())) {
322                         ++i;
323                         if (i != m_list.end())
324                                 s = readU16(&(i->data[BASE_HEADER_SIZE+1]));
325                 }
326         }
327
328         if (s == seqnum) {
329                 /* nothing to do this seems to be a resent packet */
330                 /* for paranoia reason data should be compared */
331                 if (
332                         (readU16(&(i->data[BASE_HEADER_SIZE+1])) != seqnum) ||
333                         (i->data.getSize() != p.data.getSize()) ||
334                         (i->address != p.address)
335                         )
336                 {
337                         /* if this happens your maximum transfer window may be to big */
338                         fprintf(stderr,
339                                         "Duplicated seqnum %d non matching packet detected:\n",
340                                         seqnum);
341                         fprintf(stderr, "Old: seqnum: %05d size: %04d, address: %s\n",
342                                         readU16(&(i->data[BASE_HEADER_SIZE+1])),i->data.getSize(),
343                                         i->address.serializeString().c_str());
344                         fprintf(stderr, "New: seqnum: %05d size: %04u, address: %s\n",
345                                         readU16(&(p.data[BASE_HEADER_SIZE+1])),p.data.getSize(),
346                                         p.address.serializeString().c_str());
347                         throw IncomingDataCorruption("duplicated packet isn't same as original one");
348                 }
349         }
350         /* insert or push back */
351         else if (i != m_list.end()) {
352                 m_list.insert(i, p);
353         } else {
354                 m_list.push_back(p);
355         }
356
357         /* update last packet number */
358         m_oldest_non_answered_ack = readU16(&(*m_list.begin()).data[BASE_HEADER_SIZE+1]);
359 }
360
361 void ReliablePacketBuffer::incrementTimeouts(float dtime)
362 {
363         MutexAutoLock listlock(m_list_mutex);
364         for (BufferedPacket &bufferedPacket : m_list) {
365                 bufferedPacket.time += dtime;
366                 bufferedPacket.totaltime += dtime;
367         }
368 }
369
370 std::list<BufferedPacket> ReliablePacketBuffer::getTimedOuts(float timeout,
371                                                                                                         unsigned int max_packets)
372 {
373         MutexAutoLock listlock(m_list_mutex);
374         std::list<BufferedPacket> timed_outs;
375         for (BufferedPacket &bufferedPacket : m_list) {
376                 if (bufferedPacket.time >= timeout) {
377                         timed_outs.push_back(bufferedPacket);
378
379                         //this packet will be sent right afterwards reset timeout here
380                         bufferedPacket.time = 0.0f;
381                         if (timed_outs.size() >= max_packets)
382                                 break;
383                 }
384         }
385         return timed_outs;
386 }
387
388 /*
389         IncomingSplitPacket
390 */
391
392 bool IncomingSplitPacket::insert(u32 chunk_num, SharedBuffer<u8> &chunkdata)
393 {
394         sanity_check(chunk_num < chunk_count);
395
396         // If chunk already exists, ignore it.
397         // Sometimes two identical packets may arrive when there is network
398         // lag and the server re-sends stuff.
399         if (chunks.find(chunk_num) != chunks.end())
400                 return false;
401
402         // Set chunk data in buffer
403         chunks[chunk_num] = chunkdata;
404
405         return true;
406 }
407
408 SharedBuffer<u8> IncomingSplitPacket::reassemble()
409 {
410         sanity_check(allReceived());
411
412         // Calculate total size
413         u32 totalsize = 0;
414         for (const auto &chunk : chunks)
415                 totalsize += chunk.second.getSize();
416
417         SharedBuffer<u8> fulldata(totalsize);
418
419         // Copy chunks to data buffer
420         u32 start = 0;
421         for (u32 chunk_i = 0; chunk_i < chunk_count; chunk_i++) {
422                 const SharedBuffer<u8> &buf = chunks[chunk_i];
423                 memcpy(&fulldata[start], *buf, buf.getSize());
424                 start += buf.getSize();
425         }
426
427         return fulldata;
428 }
429
430 /*
431         IncomingSplitBuffer
432 */
433
434 IncomingSplitBuffer::~IncomingSplitBuffer()
435 {
436         MutexAutoLock listlock(m_map_mutex);
437         for (auto &i : m_buf) {
438                 delete i.second;
439         }
440 }
441
442 SharedBuffer<u8> IncomingSplitBuffer::insert(const BufferedPacket &p, bool reliable)
443 {
444         MutexAutoLock listlock(m_map_mutex);
445         u32 headersize = BASE_HEADER_SIZE + 7;
446         if (p.data.getSize() < headersize) {
447                 errorstream << "Invalid data size for split packet" << std::endl;
448                 return SharedBuffer<u8>();
449         }
450         u8 type = readU8(&p.data[BASE_HEADER_SIZE+0]);
451         u16 seqnum = readU16(&p.data[BASE_HEADER_SIZE+1]);
452         u16 chunk_count = readU16(&p.data[BASE_HEADER_SIZE+3]);
453         u16 chunk_num = readU16(&p.data[BASE_HEADER_SIZE+5]);
454
455         if (type != PACKET_TYPE_SPLIT) {
456                 errorstream << "IncomingSplitBuffer::insert(): type is not split"
457                         << std::endl;
458                 return SharedBuffer<u8>();
459         }
460         if (chunk_num >= chunk_count) {
461                 errorstream << "IncomingSplitBuffer::insert(): chunk_num=" << chunk_num
462                                 << " >= chunk_count=" << chunk_count << std::endl;
463                 return SharedBuffer<u8>();
464         }
465
466         // Add if doesn't exist
467         IncomingSplitPacket *sp;
468         if (m_buf.find(seqnum) == m_buf.end()) {
469                 sp = new IncomingSplitPacket(chunk_count, reliable);
470                 m_buf[seqnum] = sp;
471         } else {
472                 sp = m_buf[seqnum];
473         }
474
475         if (chunk_count != sp->chunk_count) {
476                 errorstream << "IncomingSplitBuffer::insert(): chunk_count="
477                                 << chunk_count << " != sp->chunk_count=" << sp->chunk_count
478                                 << std::endl;
479                 return SharedBuffer<u8>();
480         }
481         if (reliable != sp->reliable)
482                 LOG(derr_con<<"Connection: WARNING: reliable="<<reliable
483                                 <<" != sp->reliable="<<sp->reliable
484                                 <<std::endl);
485
486         // Cut chunk data out of packet
487         u32 chunkdatasize = p.data.getSize() - headersize;
488         SharedBuffer<u8> chunkdata(chunkdatasize);
489         memcpy(*chunkdata, &(p.data[headersize]), chunkdatasize);
490
491         if (!sp->insert(chunk_num, chunkdata))
492                 return SharedBuffer<u8>();
493
494         // If not all chunks are received, return empty buffer
495         if (!sp->allReceived())
496                 return SharedBuffer<u8>();
497
498         SharedBuffer<u8> fulldata = sp->reassemble();
499
500         // Remove sp from buffer
501         m_buf.erase(seqnum);
502         delete sp;
503
504         return fulldata;
505 }
506
507 void IncomingSplitBuffer::removeUnreliableTimedOuts(float dtime, float timeout)
508 {
509         std::deque<u16> remove_queue;
510         {
511                 MutexAutoLock listlock(m_map_mutex);
512                 for (auto &i : m_buf) {
513                         IncomingSplitPacket *p = i.second;
514                         // Reliable ones are not removed by timeout
515                         if (p->reliable)
516                                 continue;
517                         p->time += dtime;
518                         if (p->time >= timeout)
519                                 remove_queue.push_back(i.first);
520                 }
521         }
522         for (u16 j : remove_queue) {
523                 MutexAutoLock listlock(m_map_mutex);
524                 LOG(dout_con<<"NOTE: Removing timed out unreliable split packet"<<std::endl);
525                 delete m_buf[j];
526                 m_buf.erase(j);
527         }
528 }
529
530 /*
531         ConnectionCommand
532  */
533
534 void ConnectionCommand::send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt,
535         bool reliable_)
536 {
537         type = CONNCMD_SEND;
538         peer_id = peer_id_;
539         channelnum = channelnum_;
540         data = pkt->oldForgePacket();
541         reliable = reliable_;
542 }
543
544 /*
545         Channel
546 */
547
548 u16 Channel::readNextIncomingSeqNum()
549 {
550         MutexAutoLock internal(m_internal_mutex);
551         return next_incoming_seqnum;
552 }
553
554 u16 Channel::incNextIncomingSeqNum()
555 {
556         MutexAutoLock internal(m_internal_mutex);
557         u16 retval = next_incoming_seqnum;
558         next_incoming_seqnum++;
559         return retval;
560 }
561
562 u16 Channel::readNextSplitSeqNum()
563 {
564         MutexAutoLock internal(m_internal_mutex);
565         return next_outgoing_split_seqnum;
566 }
567 void Channel::setNextSplitSeqNum(u16 seqnum)
568 {
569         MutexAutoLock internal(m_internal_mutex);
570         next_outgoing_split_seqnum = seqnum;
571 }
572
573 u16 Channel::getOutgoingSequenceNumber(bool& successful)
574 {
575         MutexAutoLock internal(m_internal_mutex);
576         u16 retval = next_outgoing_seqnum;
577         u16 lowest_unacked_seqnumber;
578
579         /* shortcut if there ain't any packet in outgoing list */
580         if (outgoing_reliables_sent.empty())
581         {
582                 next_outgoing_seqnum++;
583                 return retval;
584         }
585
586         if (outgoing_reliables_sent.getFirstSeqnum(lowest_unacked_seqnumber))
587         {
588                 if (lowest_unacked_seqnumber < next_outgoing_seqnum) {
589                         // ugly cast but this one is required in order to tell compiler we
590                         // know about difference of two unsigned may be negative in general
591                         // but we already made sure it won't happen in this case
592                         if (((u16)(next_outgoing_seqnum - lowest_unacked_seqnumber)) > window_size) {
593                                 successful = false;
594                                 return 0;
595                         }
596                 }
597                 else {
598                         // ugly cast but this one is required in order to tell compiler we
599                         // know about difference of two unsigned may be negative in general
600                         // but we already made sure it won't happen in this case
601                         if ((next_outgoing_seqnum + (u16)(SEQNUM_MAX - lowest_unacked_seqnumber)) >
602                                 window_size) {
603                                 successful = false;
604                                 return 0;
605                         }
606                 }
607         }
608
609         next_outgoing_seqnum++;
610         return retval;
611 }
612
613 u16 Channel::readOutgoingSequenceNumber()
614 {
615         MutexAutoLock internal(m_internal_mutex);
616         return next_outgoing_seqnum;
617 }
618
619 bool Channel::putBackSequenceNumber(u16 seqnum)
620 {
621         if (((seqnum + 1) % (SEQNUM_MAX+1)) == next_outgoing_seqnum) {
622
623                 next_outgoing_seqnum = seqnum;
624                 return true;
625         }
626         return false;
627 }
628
629 void Channel::UpdateBytesSent(unsigned int bytes, unsigned int packets)
630 {
631         MutexAutoLock internal(m_internal_mutex);
632         current_bytes_transfered += bytes;
633         current_packet_successful += packets;
634 }
635
636 void Channel::UpdateBytesReceived(unsigned int bytes) {
637         MutexAutoLock internal(m_internal_mutex);
638         current_bytes_received += bytes;
639 }
640
641 void Channel::UpdateBytesLost(unsigned int bytes)
642 {
643         MutexAutoLock internal(m_internal_mutex);
644         current_bytes_lost += bytes;
645 }
646
647
648 void Channel::UpdatePacketLossCounter(unsigned int count)
649 {
650         MutexAutoLock internal(m_internal_mutex);
651         current_packet_loss += count;
652 }
653
654 void Channel::UpdatePacketTooLateCounter()
655 {
656         MutexAutoLock internal(m_internal_mutex);
657         current_packet_too_late++;
658 }
659
660 void Channel::UpdateTimers(float dtime)
661 {
662         bpm_counter += dtime;
663         packet_loss_counter += dtime;
664
665         if (packet_loss_counter > 1.0f) {
666                 packet_loss_counter -= 1.0f;
667
668                 unsigned int packet_loss = 11; /* use a neutral value for initialization */
669                 unsigned int packets_successful = 0;
670                 //unsigned int packet_too_late = 0;
671
672                 bool reasonable_amount_of_data_transmitted = false;
673
674                 {
675                         MutexAutoLock internal(m_internal_mutex);
676                         packet_loss = current_packet_loss;
677                         //packet_too_late = current_packet_too_late;
678                         packets_successful = current_packet_successful;
679
680                         if (current_bytes_transfered > (unsigned int) (window_size*512/2)) {
681                                 reasonable_amount_of_data_transmitted = true;
682                         }
683                         current_packet_loss = 0;
684                         current_packet_too_late = 0;
685                         current_packet_successful = 0;
686                 }
687
688                 /* dynamic window size */
689                 float successful_to_lost_ratio = 0.0f;
690                 bool done = false;
691
692                 if (packets_successful > 0) {
693                         successful_to_lost_ratio = packet_loss/packets_successful;
694                 } else if (packet_loss > 0) {
695                         window_size = std::max(
696                                         (window_size - 10),
697                                         MIN_RELIABLE_WINDOW_SIZE);
698                         done = true;
699                 }
700
701                 if (!done) {
702                         if ((successful_to_lost_ratio < 0.01f) &&
703                                 (window_size < MAX_RELIABLE_WINDOW_SIZE)) {
704                                 /* don't even think about increasing if we didn't even
705                                  * use major parts of our window */
706                                 if (reasonable_amount_of_data_transmitted)
707                                         window_size = std::min(
708                                                         (window_size + 100),
709                                                         MAX_RELIABLE_WINDOW_SIZE);
710                         } else if ((successful_to_lost_ratio < 0.05f) &&
711                                         (window_size < MAX_RELIABLE_WINDOW_SIZE)) {
712                                 /* don't even think about increasing if we didn't even
713                                  * use major parts of our window */
714                                 if (reasonable_amount_of_data_transmitted)
715                                         window_size = std::min(
716                                                         (window_size + 50),
717                                                         MAX_RELIABLE_WINDOW_SIZE);
718                         } else if (successful_to_lost_ratio > 0.15f) {
719                                 window_size = std::max(
720                                                 (window_size - 100),
721                                                 MIN_RELIABLE_WINDOW_SIZE);
722                         } else if (successful_to_lost_ratio > 0.1f) {
723                                 window_size = std::max(
724                                                 (window_size - 50),
725                                                 MIN_RELIABLE_WINDOW_SIZE);
726                         }
727                 }
728         }
729
730         if (bpm_counter > 10.0f) {
731                 {
732                         MutexAutoLock internal(m_internal_mutex);
733                         cur_kbps                 =
734                                         (((float) current_bytes_transfered)/bpm_counter)/1024.0f;
735                         current_bytes_transfered = 0;
736                         cur_kbps_lost            =
737                                         (((float) current_bytes_lost)/bpm_counter)/1024.0f;
738                         current_bytes_lost       = 0;
739                         cur_incoming_kbps        =
740                                         (((float) current_bytes_received)/bpm_counter)/1024.0f;
741                         current_bytes_received   = 0;
742                         bpm_counter              = 0.0f;
743                 }
744
745                 if (cur_kbps > max_kbps) {
746                         max_kbps = cur_kbps;
747                 }
748
749                 if (cur_kbps_lost > max_kbps_lost) {
750                         max_kbps_lost = cur_kbps_lost;
751                 }
752
753                 if (cur_incoming_kbps > max_incoming_kbps) {
754                         max_incoming_kbps = cur_incoming_kbps;
755                 }
756
757                 rate_samples       = MYMIN(rate_samples+1,10);
758                 float old_fraction = ((float) (rate_samples-1) )/( (float) rate_samples);
759                 avg_kbps           = avg_kbps * old_fraction +
760                                 cur_kbps * (1.0 - old_fraction);
761                 avg_kbps_lost      = avg_kbps_lost * old_fraction +
762                                 cur_kbps_lost * (1.0 - old_fraction);
763                 avg_incoming_kbps  = avg_incoming_kbps * old_fraction +
764                                 cur_incoming_kbps * (1.0 - old_fraction);
765         }
766 }
767
768
769 /*
770         Peer
771 */
772
773 PeerHelper::PeerHelper(Peer* peer) :
774         m_peer(peer)
775 {
776         if (peer && !peer->IncUseCount())
777                 m_peer = nullptr;
778 }
779
780 PeerHelper::~PeerHelper()
781 {
782         if (m_peer)
783                 m_peer->DecUseCount();
784
785         m_peer = nullptr;
786 }
787
788 PeerHelper& PeerHelper::operator=(Peer* peer)
789 {
790         m_peer = peer;
791         if (peer && !peer->IncUseCount())
792                 m_peer = nullptr;
793         return *this;
794 }
795
796 Peer* PeerHelper::operator->() const
797 {
798         return m_peer;
799 }
800
801 Peer* PeerHelper::operator&() const
802 {
803         return m_peer;
804 }
805
806 bool PeerHelper::operator!()
807 {
808         return ! m_peer;
809 }
810
811 bool PeerHelper::operator!=(void* ptr)
812 {
813         return ((void*) m_peer != ptr);
814 }
815
816 bool Peer::IncUseCount()
817 {
818         MutexAutoLock lock(m_exclusive_access_mutex);
819
820         if (!m_pending_deletion) {
821                 this->m_usage++;
822                 return true;
823         }
824
825         return false;
826 }
827
828 void Peer::DecUseCount()
829 {
830         {
831                 MutexAutoLock lock(m_exclusive_access_mutex);
832                 sanity_check(m_usage > 0);
833                 m_usage--;
834
835                 if (!((m_pending_deletion) && (m_usage == 0)))
836                         return;
837         }
838         delete this;
839 }
840
841 void Peer::RTTStatistics(float rtt, const std::string &profiler_id,
842                 unsigned int num_samples) {
843
844         if (m_last_rtt > 0) {
845                 /* set min max values */
846                 if (rtt < m_rtt.min_rtt)
847                         m_rtt.min_rtt = rtt;
848                 if (rtt >= m_rtt.max_rtt)
849                         m_rtt.max_rtt = rtt;
850
851                 /* do average calculation */
852                 if (m_rtt.avg_rtt < 0.0)
853                         m_rtt.avg_rtt  = rtt;
854                 else
855                         m_rtt.avg_rtt  = m_rtt.avg_rtt * (num_samples/(num_samples-1)) +
856                                                                 rtt * (1/num_samples);
857
858                 /* do jitter calculation */
859
860                 //just use some neutral value at beginning
861                 float jitter = m_rtt.jitter_min;
862
863                 if (rtt > m_last_rtt)
864                         jitter = rtt-m_last_rtt;
865
866                 if (rtt <= m_last_rtt)
867                         jitter = m_last_rtt - rtt;
868
869                 if (jitter < m_rtt.jitter_min)
870                         m_rtt.jitter_min = jitter;
871                 if (jitter >= m_rtt.jitter_max)
872                         m_rtt.jitter_max = jitter;
873
874                 if (m_rtt.jitter_avg < 0.0)
875                         m_rtt.jitter_avg  = jitter;
876                 else
877                         m_rtt.jitter_avg  = m_rtt.jitter_avg * (num_samples/(num_samples-1)) +
878                                                                 jitter * (1/num_samples);
879
880                 if (!profiler_id.empty()) {
881                         g_profiler->graphAdd(profiler_id + " RTT [ms]", rtt * 1000.f);
882                         g_profiler->graphAdd(profiler_id + " jitter [ms]", jitter * 1000.f);
883                 }
884         }
885         /* save values required for next loop */
886         m_last_rtt = rtt;
887 }
888
889 bool Peer::isTimedOut(float timeout)
890 {
891         MutexAutoLock lock(m_exclusive_access_mutex);
892         u64 current_time = porting::getTimeMs();
893
894         float dtime = CALC_DTIME(m_last_timeout_check,current_time);
895         m_last_timeout_check = current_time;
896
897         m_timeout_counter += dtime;
898
899         return m_timeout_counter > timeout;
900 }
901
902 void Peer::Drop()
903 {
904         {
905                 MutexAutoLock usage_lock(m_exclusive_access_mutex);
906                 m_pending_deletion = true;
907                 if (m_usage != 0)
908                         return;
909         }
910
911         PROFILE(std::stringstream peerIdentifier1);
912         PROFILE(peerIdentifier1 << "runTimeouts[" << m_connection->getDesc()
913                         << ";" << id << ";RELIABLE]");
914         PROFILE(g_profiler->remove(peerIdentifier1.str()));
915         PROFILE(std::stringstream peerIdentifier2);
916         PROFILE(peerIdentifier2 << "sendPackets[" << m_connection->getDesc()
917                         << ";" << id << ";RELIABLE]");
918         PROFILE(ScopeProfiler peerprofiler(g_profiler, peerIdentifier2.str(), SPT_AVG));
919
920         delete this;
921 }
922
923 UDPPeer::UDPPeer(u16 a_id, Address a_address, Connection* connection) :
924         Peer(a_address,a_id,connection)
925 {
926         for (Channel &channel : channels)
927                 channel.setWindowSize(g_settings->getU16("max_packets_per_iteration"));
928 }
929
930 bool UDPPeer::getAddress(MTProtocols type,Address& toset)
931 {
932         if ((type == MTP_UDP) || (type == MTP_MINETEST_RELIABLE_UDP) || (type == MTP_PRIMARY))
933         {
934                 toset = address;
935                 return true;
936         }
937
938         return false;
939 }
940
941 void UDPPeer::reportRTT(float rtt)
942 {
943         if (rtt < 0.0) {
944                 return;
945         }
946         RTTStatistics(rtt,"rudp",MAX_RELIABLE_WINDOW_SIZE*10);
947
948         float timeout = getStat(AVG_RTT) * RESEND_TIMEOUT_FACTOR;
949         if (timeout < RESEND_TIMEOUT_MIN)
950                 timeout = RESEND_TIMEOUT_MIN;
951         if (timeout > RESEND_TIMEOUT_MAX)
952                 timeout = RESEND_TIMEOUT_MAX;
953
954         MutexAutoLock usage_lock(m_exclusive_access_mutex);
955         resend_timeout = timeout;
956 }
957
958 bool UDPPeer::Ping(float dtime,SharedBuffer<u8>& data)
959 {
960         m_ping_timer += dtime;
961         if (m_ping_timer >= PING_TIMEOUT)
962         {
963                 // Create and send PING packet
964                 writeU8(&data[0], PACKET_TYPE_CONTROL);
965                 writeU8(&data[1], CONTROLTYPE_PING);
966                 m_ping_timer = 0.0;
967                 return true;
968         }
969         return false;
970 }
971
972 void UDPPeer::PutReliableSendCommand(ConnectionCommand &c,
973                 unsigned int max_packet_size)
974 {
975         if (m_pending_disconnect)
976                 return;
977
978         if ( channels[c.channelnum].queued_commands.empty() &&
979                         /* don't queue more packets then window size */
980                         (channels[c.channelnum].queued_reliables.size()
981                         < (channels[c.channelnum].getWindowSize()/2))) {
982                 LOG(dout_con<<m_connection->getDesc()
983                                 <<" processing reliable command for peer id: " << c.peer_id
984                                 <<" data size: " << c.data.getSize() << std::endl);
985                 if (!processReliableSendCommand(c,max_packet_size)) {
986                         channels[c.channelnum].queued_commands.push_back(c);
987                 }
988         }
989         else {
990                 LOG(dout_con<<m_connection->getDesc()
991                                 <<" Queueing reliable command for peer id: " << c.peer_id
992                                 <<" data size: " << c.data.getSize() <<std::endl);
993                 channels[c.channelnum].queued_commands.push_back(c);
994         }
995 }
996
997 bool UDPPeer::processReliableSendCommand(
998                                 ConnectionCommand &c,
999                                 unsigned int max_packet_size)
1000 {
1001         if (m_pending_disconnect)
1002                 return true;
1003
1004         u32 chunksize_max = max_packet_size
1005                                                         - BASE_HEADER_SIZE
1006                                                         - RELIABLE_HEADER_SIZE;
1007
1008         sanity_check(c.data.getSize() < MAX_RELIABLE_WINDOW_SIZE*512);
1009
1010         std::list<SharedBuffer<u8>> originals;
1011         u16 split_sequence_number = channels[c.channelnum].readNextSplitSeqNum();
1012
1013         if (c.raw) {
1014                 originals.emplace_back(c.data);
1015         } else {
1016                 makeAutoSplitPacket(c.data, chunksize_max,split_sequence_number, &originals);
1017                 channels[c.channelnum].setNextSplitSeqNum(split_sequence_number);
1018         }
1019
1020         bool have_sequence_number = true;
1021         bool have_initial_sequence_number = false;
1022         std::queue<BufferedPacket> toadd;
1023         volatile u16 initial_sequence_number = 0;
1024
1025         for (SharedBuffer<u8> &original : originals) {
1026                 u16 seqnum = channels[c.channelnum].getOutgoingSequenceNumber(have_sequence_number);
1027
1028                 /* oops, we don't have enough sequence numbers to send this packet */
1029                 if (!have_sequence_number)
1030                         break;
1031
1032                 if (!have_initial_sequence_number)
1033                 {
1034                         initial_sequence_number = seqnum;
1035                         have_initial_sequence_number = true;
1036                 }
1037
1038                 SharedBuffer<u8> reliable = makeReliablePacket(original, seqnum);
1039
1040                 // Add base headers and make a packet
1041                 BufferedPacket p = con::makePacket(address, reliable,
1042                                 m_connection->GetProtocolID(), m_connection->GetPeerID(),
1043                                 c.channelnum);
1044
1045                 toadd.push(p);
1046         }
1047
1048         if (have_sequence_number) {
1049                 volatile u16 pcount = 0;
1050                 while (!toadd.empty()) {
1051                         BufferedPacket p = toadd.front();
1052                         toadd.pop();
1053 //                      LOG(dout_con<<connection->getDesc()
1054 //                                      << " queuing reliable packet for peer_id: " << c.peer_id
1055 //                                      << " channel: " << (c.channelnum&0xFF)
1056 //                                      << " seqnum: " << readU16(&p.data[BASE_HEADER_SIZE+1])
1057 //                                      << std::endl)
1058                         channels[c.channelnum].queued_reliables.push(p);
1059                         pcount++;
1060                 }
1061                 sanity_check(channels[c.channelnum].queued_reliables.size() < 0xFFFF);
1062                 return true;
1063         }
1064
1065         volatile u16 packets_available = toadd.size();
1066         /* we didn't get a single sequence number no need to fill queue */
1067         if (!have_initial_sequence_number) {
1068                 return false;
1069         }
1070
1071         while (!toadd.empty()) {
1072                 /* remove packet */
1073                 toadd.pop();
1074
1075                 bool successfully_put_back_sequence_number
1076                         = channels[c.channelnum].putBackSequenceNumber(
1077                                 (initial_sequence_number+toadd.size() % (SEQNUM_MAX+1)));
1078
1079                 FATAL_ERROR_IF(!successfully_put_back_sequence_number, "error");
1080         }
1081
1082         // DO NOT REMOVE n_queued! It avoids a deadlock of async locked
1083         // 'log_message_mutex' and 'm_list_mutex'.
1084         u32 n_queued = channels[c.channelnum].outgoing_reliables_sent.size();
1085
1086         LOG(dout_con<<m_connection->getDesc()
1087                         << " Windowsize exceeded on reliable sending "
1088                         << c.data.getSize() << " bytes"
1089                         << std::endl << "\t\tinitial_sequence_number: "
1090                         << initial_sequence_number
1091                         << std::endl << "\t\tgot at most            : "
1092                         << packets_available << " packets"
1093                         << std::endl << "\t\tpackets queued         : "
1094                         << n_queued
1095                         << std::endl);
1096
1097         return false;
1098 }
1099
1100 void UDPPeer::RunCommandQueues(
1101                                                         unsigned int max_packet_size,
1102                                                         unsigned int maxcommands,
1103                                                         unsigned int maxtransfer)
1104 {
1105
1106         for (Channel &channel : channels) {
1107                 unsigned int commands_processed = 0;
1108
1109                 if ((!channel.queued_commands.empty()) &&
1110                                 (channel.queued_reliables.size() < maxtransfer) &&
1111                                 (commands_processed < maxcommands)) {
1112                         try {
1113                                 ConnectionCommand c = channel.queued_commands.front();
1114
1115                                 LOG(dout_con << m_connection->getDesc()
1116                                                 << " processing queued reliable command " << std::endl);
1117
1118                                 // Packet is processed, remove it from queue
1119                                 if (processReliableSendCommand(c,max_packet_size)) {
1120                                         channel.queued_commands.pop_front();
1121                                 } else {
1122                                         LOG(dout_con << m_connection->getDesc()
1123                                                         << " Failed to queue packets for peer_id: " << c.peer_id
1124                                                         << ", delaying sending of " << c.data.getSize()
1125                                                         << " bytes" << std::endl);
1126                                 }
1127                         }
1128                         catch (ItemNotFoundException &e) {
1129                                 // intentionally empty
1130                         }
1131                 }
1132         }
1133 }
1134
1135 u16 UDPPeer::getNextSplitSequenceNumber(u8 channel)
1136 {
1137         assert(channel < CHANNEL_COUNT); // Pre-condition
1138         return channels[channel].readNextSplitSeqNum();
1139 }
1140
1141 void UDPPeer::setNextSplitSequenceNumber(u8 channel, u16 seqnum)
1142 {
1143         assert(channel < CHANNEL_COUNT); // Pre-condition
1144         channels[channel].setNextSplitSeqNum(seqnum);
1145 }
1146
1147 SharedBuffer<u8> UDPPeer::addSplitPacket(u8 channel, const BufferedPacket &toadd,
1148         bool reliable)
1149 {
1150         assert(channel < CHANNEL_COUNT); // Pre-condition
1151         return channels[channel].incoming_splits.insert(toadd, reliable);
1152 }
1153
1154 /*
1155         Connection
1156 */
1157
1158 Connection::Connection(u32 protocol_id, u32 max_packet_size, float timeout,
1159                 bool ipv6, PeerHandler *peerhandler) :
1160         m_udpSocket(ipv6),
1161         m_protocol_id(protocol_id),
1162         m_sendThread(new ConnectionSendThread(max_packet_size, timeout)),
1163         m_receiveThread(new ConnectionReceiveThread(max_packet_size)),
1164         m_bc_peerhandler(peerhandler)
1165
1166 {
1167         m_udpSocket.setTimeoutMs(5);
1168
1169         m_sendThread->setParent(this);
1170         m_receiveThread->setParent(this);
1171
1172         m_sendThread->start();
1173         m_receiveThread->start();
1174 }
1175
1176
1177 Connection::~Connection()
1178 {
1179         m_shutting_down = true;
1180         // request threads to stop
1181         m_sendThread->stop();
1182         m_receiveThread->stop();
1183
1184         //TODO for some unkonwn reason send/receive threads do not exit as they're
1185         // supposed to be but wait on peer timeout. To speed up shutdown we reduce
1186         // timeout to half a second.
1187         m_sendThread->setPeerTimeout(0.5);
1188
1189         // wait for threads to finish
1190         m_sendThread->wait();
1191         m_receiveThread->wait();
1192
1193         // Delete peers
1194         for (auto &peer : m_peers) {
1195                 delete peer.second;
1196         }
1197 }
1198
1199 /* Internal stuff */
1200 void Connection::putEvent(ConnectionEvent &e)
1201 {
1202         assert(e.type != CONNEVENT_NONE); // Pre-condition
1203         m_event_queue.push_back(e);
1204 }
1205
1206 void Connection::TriggerSend()
1207 {
1208         m_sendThread->Trigger();
1209 }
1210
1211 PeerHelper Connection::getPeerNoEx(session_t peer_id)
1212 {
1213         MutexAutoLock peerlock(m_peers_mutex);
1214         std::map<session_t, Peer *>::iterator node = m_peers.find(peer_id);
1215
1216         if (node == m_peers.end()) {
1217                 return PeerHelper(NULL);
1218         }
1219
1220         // Error checking
1221         FATAL_ERROR_IF(node->second->id != peer_id, "Invalid peer id");
1222
1223         return PeerHelper(node->second);
1224 }
1225
1226 /* find peer_id for address */
1227 u16 Connection::lookupPeer(Address& sender)
1228 {
1229         MutexAutoLock peerlock(m_peers_mutex);
1230         std::map<u16, Peer*>::iterator j;
1231         j = m_peers.begin();
1232         for(; j != m_peers.end(); ++j)
1233         {
1234                 Peer *peer = j->second;
1235                 if (peer->isPendingDeletion())
1236                         continue;
1237
1238                 Address tocheck;
1239
1240                 if ((peer->getAddress(MTP_MINETEST_RELIABLE_UDP, tocheck)) && (tocheck == sender))
1241                         return peer->id;
1242
1243                 if ((peer->getAddress(MTP_UDP, tocheck)) && (tocheck == sender))
1244                         return peer->id;
1245         }
1246
1247         return PEER_ID_INEXISTENT;
1248 }
1249
1250 bool Connection::deletePeer(session_t peer_id, bool timeout)
1251 {
1252         Peer *peer = 0;
1253
1254         /* lock list as short as possible */
1255         {
1256                 MutexAutoLock peerlock(m_peers_mutex);
1257                 if (m_peers.find(peer_id) == m_peers.end())
1258                         return false;
1259                 peer = m_peers[peer_id];
1260                 m_peers.erase(peer_id);
1261                 m_peer_ids.remove(peer_id);
1262         }
1263
1264         Address peer_address;
1265         //any peer has a primary address this never fails!
1266         peer->getAddress(MTP_PRIMARY, peer_address);
1267         // Create event
1268         ConnectionEvent e;
1269         e.peerRemoved(peer_id, timeout, peer_address);
1270         putEvent(e);
1271
1272
1273         peer->Drop();
1274         return true;
1275 }
1276
1277 /* Interface */
1278
1279 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
1280 {
1281         try {
1282                 return m_event_queue.pop_front(timeout_ms);
1283         } catch(ItemNotFoundException &ex) {
1284                 ConnectionEvent e;
1285                 e.type = CONNEVENT_NONE;
1286                 return e;
1287         }
1288 }
1289
1290 void Connection::putCommand(ConnectionCommand &c)
1291 {
1292         if (!m_shutting_down) {
1293                 m_command_queue.push_back(c);
1294                 m_sendThread->Trigger();
1295         }
1296 }
1297
1298 void Connection::Serve(Address bind_addr)
1299 {
1300         ConnectionCommand c;
1301         c.serve(bind_addr);
1302         putCommand(c);
1303 }
1304
1305 void Connection::Connect(Address address)
1306 {
1307         ConnectionCommand c;
1308         c.connect(address);
1309         putCommand(c);
1310 }
1311
1312 bool Connection::Connected()
1313 {
1314         MutexAutoLock peerlock(m_peers_mutex);
1315
1316         if (m_peers.size() != 1)
1317                 return false;
1318
1319         std::map<session_t, Peer *>::iterator node = m_peers.find(PEER_ID_SERVER);
1320         if (node == m_peers.end())
1321                 return false;
1322
1323         if (m_peer_id == PEER_ID_INEXISTENT)
1324                 return false;
1325
1326         return true;
1327 }
1328
1329 void Connection::Disconnect()
1330 {
1331         ConnectionCommand c;
1332         c.disconnect();
1333         putCommand(c);
1334 }
1335
1336 bool Connection::Receive(NetworkPacket *pkt, u32 timeout)
1337 {
1338         /*
1339                 Note that this function can potentially wait infinitely if non-data
1340                 events keep happening before the timeout expires.
1341                 This is not considered to be a problem (is it?)
1342         */
1343         for(;;) {
1344                 ConnectionEvent e = waitEvent(timeout);
1345                 if (e.type != CONNEVENT_NONE)
1346                         LOG(dout_con << getDesc() << ": Receive: got event: "
1347                                         << e.describe() << std::endl);
1348                 switch(e.type) {
1349                 case CONNEVENT_NONE:
1350                         return false;
1351                 case CONNEVENT_DATA_RECEIVED:
1352                         // Data size is lesser than command size, ignoring packet
1353                         if (e.data.getSize() < 2) {
1354                                 continue;
1355                         }
1356
1357                         pkt->putRawPacket(*e.data, e.data.getSize(), e.peer_id);
1358                         return true;
1359                 case CONNEVENT_PEER_ADDED: {
1360                         UDPPeer tmp(e.peer_id, e.address, this);
1361                         if (m_bc_peerhandler)
1362                                 m_bc_peerhandler->peerAdded(&tmp);
1363                         continue;
1364                 }
1365                 case CONNEVENT_PEER_REMOVED: {
1366                         UDPPeer tmp(e.peer_id, e.address, this);
1367                         if (m_bc_peerhandler)
1368                                 m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
1369                         continue;
1370                 }
1371                 case CONNEVENT_BIND_FAILED:
1372                         throw ConnectionBindFailed("Failed to bind socket "
1373                                         "(port already in use?)");
1374                 }
1375         }
1376         return false;
1377 }
1378
1379 void Connection::Receive(NetworkPacket *pkt)
1380 {
1381         bool any = Receive(pkt, m_bc_receive_timeout);
1382         if (!any)
1383                 throw NoIncomingDataException("No incoming data");
1384 }
1385
1386 bool Connection::TryReceive(NetworkPacket *pkt)
1387 {
1388         return Receive(pkt, 0);
1389 }
1390
1391 void Connection::Send(session_t peer_id, u8 channelnum,
1392                 NetworkPacket *pkt, bool reliable)
1393 {
1394         assert(channelnum < CHANNEL_COUNT); // Pre-condition
1395
1396         ConnectionCommand c;
1397
1398         c.send(peer_id, channelnum, pkt, reliable);
1399         putCommand(c);
1400 }
1401
1402 Address Connection::GetPeerAddress(session_t peer_id)
1403 {
1404         PeerHelper peer = getPeerNoEx(peer_id);
1405
1406         if (!peer)
1407                 throw PeerNotFoundException("No address for peer found!");
1408         Address peer_address;
1409         peer->getAddress(MTP_PRIMARY, peer_address);
1410         return peer_address;
1411 }
1412
1413 float Connection::getPeerStat(session_t peer_id, rtt_stat_type type)
1414 {
1415         PeerHelper peer = getPeerNoEx(peer_id);
1416         if (!peer) return -1;
1417         return peer->getStat(type);
1418 }
1419
1420 float Connection::getLocalStat(rate_stat_type type)
1421 {
1422         PeerHelper peer = getPeerNoEx(PEER_ID_SERVER);
1423
1424         FATAL_ERROR_IF(!peer, "Connection::getLocalStat we couldn't get our own peer? are you serious???");
1425
1426         float retval = 0.0;
1427
1428         for (Channel &channel : dynamic_cast<UDPPeer *>(&peer)->channels) {
1429                 switch(type) {
1430                         case CUR_DL_RATE:
1431                                 retval += channel.getCurrentDownloadRateKB();
1432                                 break;
1433                         case AVG_DL_RATE:
1434                                 retval += channel.getAvgDownloadRateKB();
1435                                 break;
1436                         case CUR_INC_RATE:
1437                                 retval += channel.getCurrentIncomingRateKB();
1438                                 break;
1439                         case AVG_INC_RATE:
1440                                 retval += channel.getAvgIncomingRateKB();
1441                                 break;
1442                         case AVG_LOSS_RATE:
1443                                 retval += channel.getAvgLossRateKB();
1444                                 break;
1445                         case CUR_LOSS_RATE:
1446                                 retval += channel.getCurrentLossRateKB();
1447                                 break;
1448                 default:
1449                         FATAL_ERROR("Connection::getLocalStat Invalid stat type");
1450                 }
1451         }
1452         return retval;
1453 }
1454
1455 u16 Connection::createPeer(Address& sender, MTProtocols protocol, int fd)
1456 {
1457         // Somebody wants to make a new connection
1458
1459         // Get a unique peer id (2 or higher)
1460         session_t peer_id_new = m_next_remote_peer_id;
1461         u16 overflow =  MAX_UDP_PEERS;
1462
1463         /*
1464                 Find an unused peer id
1465         */
1466         MutexAutoLock lock(m_peers_mutex);
1467         bool out_of_ids = false;
1468         for(;;) {
1469                 // Check if exists
1470                 if (m_peers.find(peer_id_new) == m_peers.end())
1471
1472                         break;
1473                 // Check for overflow
1474                 if (peer_id_new == overflow) {
1475                         out_of_ids = true;
1476                         break;
1477                 }
1478                 peer_id_new++;
1479         }
1480
1481         if (out_of_ids) {
1482                 errorstream << getDesc() << " ran out of peer ids" << std::endl;
1483                 return PEER_ID_INEXISTENT;
1484         }
1485
1486         // Create a peer
1487         Peer *peer = 0;
1488         peer = new UDPPeer(peer_id_new, sender, this);
1489
1490         m_peers[peer->id] = peer;
1491         m_peer_ids.push_back(peer->id);
1492
1493         m_next_remote_peer_id = (peer_id_new +1 ) % MAX_UDP_PEERS;
1494
1495         LOG(dout_con << getDesc()
1496                         << "createPeer(): giving peer_id=" << peer_id_new << std::endl);
1497
1498         ConnectionCommand cmd;
1499         SharedBuffer<u8> reply(4);
1500         writeU8(&reply[0], PACKET_TYPE_CONTROL);
1501         writeU8(&reply[1], CONTROLTYPE_SET_PEER_ID);
1502         writeU16(&reply[2], peer_id_new);
1503         cmd.createPeer(peer_id_new,reply);
1504         putCommand(cmd);
1505
1506         // Create peer addition event
1507         ConnectionEvent e;
1508         e.peerAdded(peer_id_new, sender);
1509         putEvent(e);
1510
1511         // We're now talking to a valid peer_id
1512         return peer_id_new;
1513 }
1514
1515 void Connection::PrintInfo(std::ostream &out)
1516 {
1517         m_info_mutex.lock();
1518         out<<getDesc()<<": ";
1519         m_info_mutex.unlock();
1520 }
1521
1522 const std::string Connection::getDesc()
1523 {
1524         return std::string("con(")+
1525                         itos(m_udpSocket.GetHandle())+"/"+itos(m_peer_id)+")";
1526 }
1527
1528 void Connection::DisconnectPeer(session_t peer_id)
1529 {
1530         ConnectionCommand discon;
1531         discon.disconnect_peer(peer_id);
1532         putCommand(discon);
1533 }
1534
1535 void Connection::sendAck(session_t peer_id, u8 channelnum, u16 seqnum)
1536 {
1537         assert(channelnum < CHANNEL_COUNT); // Pre-condition
1538
1539         LOG(dout_con<<getDesc()
1540                         <<" Queuing ACK command to peer_id: " << peer_id <<
1541                         " channel: " << (channelnum & 0xFF) <<
1542                         " seqnum: " << seqnum << std::endl);
1543
1544         ConnectionCommand c;
1545         SharedBuffer<u8> ack(4);
1546         writeU8(&ack[0], PACKET_TYPE_CONTROL);
1547         writeU8(&ack[1], CONTROLTYPE_ACK);
1548         writeU16(&ack[2], seqnum);
1549
1550         c.ack(peer_id, channelnum, ack);
1551         putCommand(c);
1552         m_sendThread->Trigger();
1553 }
1554
1555 UDPPeer* Connection::createServerPeer(Address& address)
1556 {
1557         if (getPeerNoEx(PEER_ID_SERVER) != 0)
1558         {
1559                 throw ConnectionException("Already connected to a server");
1560         }
1561
1562         UDPPeer *peer = new UDPPeer(PEER_ID_SERVER, address, this);
1563
1564         {
1565                 MutexAutoLock lock(m_peers_mutex);
1566                 m_peers[peer->id] = peer;
1567                 m_peer_ids.push_back(peer->id);
1568         }
1569
1570         return peer;
1571 }
1572
1573 } // namespace