]> git.lizzy.rs Git - minetest.git/blob - src/network/connection.h
Shave off buffer copies in networking code (#11607)
[minetest.git] / src / network / connection.h
1 /*
2 Minetest
3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #pragma once
21
22 #include "irrlichttypes.h"
23 #include "peerhandler.h"
24 #include "socket.h"
25 #include "constants.h"
26 #include "util/pointer.h"
27 #include "util/container.h"
28 #include "util/thread.h"
29 #include "util/numeric.h"
30 #include "networkprotocol.h"
31 #include <iostream>
32 #include <vector>
33 #include <map>
34
35 class NetworkPacket;
36
37 namespace con
38 {
39
40 class ConnectionReceiveThread;
41 class ConnectionSendThread;
42
43 typedef enum MTProtocols {
44         MTP_PRIMARY,
45         MTP_UDP,
46         MTP_MINETEST_RELIABLE_UDP
47 } MTProtocols;
48
49 #define MAX_UDP_PEERS 65535
50
51 #define SEQNUM_MAX 65535
52
53 inline bool seqnum_higher(u16 totest, u16 base)
54 {
55         if (totest > base)
56         {
57                 if ((totest - base) > (SEQNUM_MAX/2))
58                         return false;
59
60                 return true;
61         }
62
63         if ((base - totest) > (SEQNUM_MAX/2))
64                 return true;
65
66         return false;
67 }
68
69 inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size)
70 {
71         u16 window_start = next;
72         u16 window_end   = ( next + window_size ) % (SEQNUM_MAX+1);
73
74         if (window_start < window_end) {
75                 return ((seqnum >= window_start) && (seqnum < window_end));
76         }
77
78
79         return ((seqnum < window_end) || (seqnum >= window_start));
80 }
81
82 static inline float CALC_DTIME(u64 lasttime, u64 curtime)
83 {
84         float value = ( curtime - lasttime) / 1000.0;
85         return MYMAX(MYMIN(value,0.1),0.0);
86 }
87
88 struct BufferedPacket
89 {
90         BufferedPacket(u8 *a_data, u32 a_size):
91                 data(a_data, a_size)
92         {}
93         BufferedPacket(u32 a_size):
94                 data(a_size)
95         {}
96         Buffer<u8> data; // Data of the packet, including headers
97         float time = 0.0f; // Seconds from buffering the packet or re-sending
98         float totaltime = 0.0f; // Seconds from buffering the packet
99         u64 absolute_send_time = -1;
100         Address address; // Sender or destination
101         unsigned int resend_count = 0;
102 };
103
104 // This adds the base headers to the data and makes a packet out of it
105 BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
106                 u32 protocol_id, session_t sender_peer_id, u8 channel);
107
108 // Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
109 // Increments split_seqnum if a split packet is made
110 void makeAutoSplitPacket(const SharedBuffer<u8> &data, u32 chunksize_max,
111                 u16 &split_seqnum, std::list<SharedBuffer<u8>> *list);
112
113 // Add the TYPE_RELIABLE header to the data
114 SharedBuffer<u8> makeReliablePacket(const SharedBuffer<u8> &data, u16 seqnum);
115
116 struct IncomingSplitPacket
117 {
118         IncomingSplitPacket(u32 cc, bool r):
119                 chunk_count(cc), reliable(r) {}
120
121         IncomingSplitPacket() = delete;
122
123         float time = 0.0f; // Seconds from adding
124         u32 chunk_count;
125         bool reliable; // If true, isn't deleted on timeout
126
127         bool allReceived() const
128         {
129                 return (chunks.size() == chunk_count);
130         }
131         bool insert(u32 chunk_num, SharedBuffer<u8> &chunkdata);
132         SharedBuffer<u8> reassemble();
133
134 private:
135         // Key is chunk number, value is data without headers
136         std::map<u16, SharedBuffer<u8>> chunks;
137 };
138
139 /*
140 === NOTES ===
141
142 A packet is sent through a channel to a peer with a basic header:
143         Header (7 bytes):
144         [0] u32 protocol_id
145         [4] session_t sender_peer_id
146         [6] u8 channel
147 sender_peer_id:
148         Unique to each peer.
149         value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
150         value 1 (PEER_ID_SERVER) is reserved for server
151         these constants are defined in constants.h
152 channel:
153         Channel numbers have no intrinsic meaning. Currently only 0, 1, 2 exist.
154 */
155 #define BASE_HEADER_SIZE 7
156 #define CHANNEL_COUNT 3
157 /*
158 Packet types:
159
160 CONTROL: This is a packet used by the protocol.
161 - When this is processed, nothing is handed to the user.
162         Header (2 byte):
163         [0] u8 type
164         [1] u8 controltype
165 controltype and data description:
166         CONTROLTYPE_ACK
167                 [2] u16 seqnum
168         CONTROLTYPE_SET_PEER_ID
169                 [2] session_t peer_id_new
170         CONTROLTYPE_PING
171         - There is no actual reply, but this can be sent in a reliable
172           packet to get a reply
173         CONTROLTYPE_DISCO
174 */
175 //#define TYPE_CONTROL 0
176 #define CONTROLTYPE_ACK 0
177 #define CONTROLTYPE_SET_PEER_ID 1
178 #define CONTROLTYPE_PING 2
179 #define CONTROLTYPE_DISCO 3
180
181 /*
182 ORIGINAL: This is a plain packet with no control and no error
183 checking at all.
184 - When this is processed, it is directly handed to the user.
185         Header (1 byte):
186         [0] u8 type
187 */
188 //#define TYPE_ORIGINAL 1
189 #define ORIGINAL_HEADER_SIZE 1
190 /*
191 SPLIT: These are sequences of packets forming one bigger piece of
192 data.
193 - When processed and all the packet_nums 0...packet_count-1 are
194   present (this should be buffered), the resulting data shall be
195   directly handed to the user.
196 - If the data fails to come up in a reasonable time, the buffer shall
197   be silently discarded.
198 - These can be sent as-is or atop of a RELIABLE packet stream.
199         Header (7 bytes):
200         [0] u8 type
201         [1] u16 seqnum
202         [3] u16 chunk_count
203         [5] u16 chunk_num
204 */
205 //#define TYPE_SPLIT 2
206 /*
207 RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
208 and they shall be delivered in the same order as sent. This is done
209 with a buffer in the receiving and transmitting end.
210 - When this is processed, the contents of each packet is recursively
211   processed as packets.
212         Header (3 bytes):
213         [0] u8 type
214         [1] u16 seqnum
215
216 */
217 //#define TYPE_RELIABLE 3
218 #define RELIABLE_HEADER_SIZE 3
219 #define SEQNUM_INITIAL 65500
220
221 enum PacketType: u8 {
222         PACKET_TYPE_CONTROL = 0,
223         PACKET_TYPE_ORIGINAL = 1,
224         PACKET_TYPE_SPLIT = 2,
225         PACKET_TYPE_RELIABLE = 3,
226         PACKET_TYPE_MAX
227 };
228 /*
229         A buffer which stores reliable packets and sorts them internally
230         for fast access to the smallest one.
231 */
232
233 typedef std::list<BufferedPacket>::iterator RPBSearchResult;
234
235 class ReliablePacketBuffer
236 {
237 public:
238         ReliablePacketBuffer() = default;
239
240         bool getFirstSeqnum(u16& result);
241
242         BufferedPacket popFirst();
243         BufferedPacket popSeqnum(u16 seqnum);
244         void insert(const BufferedPacket &p, u16 next_expected);
245
246         void incrementTimeouts(float dtime);
247         std::list<BufferedPacket> getTimedOuts(float timeout, u32 max_packets);
248
249         void print();
250         bool empty();
251         u32 size();
252
253
254 private:
255         RPBSearchResult findPacket(u16 seqnum); // does not perform locking
256         inline RPBSearchResult notFound() { return m_list.end(); }
257
258         std::list<BufferedPacket> m_list;
259
260         u16 m_oldest_non_answered_ack;
261
262         std::mutex m_list_mutex;
263 };
264
265 /*
266         A buffer for reconstructing split packets
267 */
268
269 class IncomingSplitBuffer
270 {
271 public:
272         ~IncomingSplitBuffer();
273         /*
274                 Returns a reference counted buffer of length != 0 when a full split
275                 packet is constructed. If not, returns one of length 0.
276         */
277         SharedBuffer<u8> insert(const BufferedPacket &p, bool reliable);
278
279         void removeUnreliableTimedOuts(float dtime, float timeout);
280
281 private:
282         // Key is seqnum
283         std::map<u16, IncomingSplitPacket*> m_buf;
284
285         std::mutex m_map_mutex;
286 };
287
288 struct OutgoingPacket
289 {
290         session_t peer_id;
291         u8 channelnum;
292         SharedBuffer<u8> data;
293         bool reliable;
294         bool ack;
295
296         OutgoingPacket(session_t peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_,
297                         bool reliable_,bool ack_=false):
298                 peer_id(peer_id_),
299                 channelnum(channelnum_),
300                 data(data_),
301                 reliable(reliable_),
302                 ack(ack_)
303         {
304         }
305 };
306
307 enum ConnectionCommandType{
308         CONNCMD_NONE,
309         CONNCMD_SERVE,
310         CONNCMD_CONNECT,
311         CONNCMD_DISCONNECT,
312         CONNCMD_DISCONNECT_PEER,
313         CONNCMD_SEND,
314         CONNCMD_SEND_TO_ALL,
315         CONCMD_ACK,
316         CONCMD_CREATE_PEER
317 };
318
319 struct ConnectionCommand
320 {
321         enum ConnectionCommandType type = CONNCMD_NONE;
322         Address address;
323         session_t peer_id = PEER_ID_INEXISTENT;
324         u8 channelnum = 0;
325         Buffer<u8> data;
326         bool reliable = false;
327         bool raw = false;
328
329         ConnectionCommand() = default;
330
331         void serve(Address address_)
332         {
333                 type = CONNCMD_SERVE;
334                 address = address_;
335         }
336         void connect(Address address_)
337         {
338                 type = CONNCMD_CONNECT;
339                 address = address_;
340         }
341         void disconnect()
342         {
343                 type = CONNCMD_DISCONNECT;
344         }
345         void disconnect_peer(session_t peer_id_)
346         {
347                 type = CONNCMD_DISCONNECT_PEER;
348                 peer_id = peer_id_;
349         }
350
351         void send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt, bool reliable_);
352
353         void ack(session_t peer_id_, u8 channelnum_, const Buffer<u8> &data_)
354         {
355                 type = CONCMD_ACK;
356                 peer_id = peer_id_;
357                 channelnum = channelnum_;
358                 data = data_;
359                 reliable = false;
360         }
361
362         void createPeer(session_t peer_id_, const Buffer<u8> &data_)
363         {
364                 type = CONCMD_CREATE_PEER;
365                 peer_id = peer_id_;
366                 data = data_;
367                 channelnum = 0;
368                 reliable = true;
369                 raw = true;
370         }
371 };
372
373 /* maximum window size to use, 0xFFFF is theoretical maximum. don't think about
374  * touching it, the less you're away from it the more likely data corruption
375  * will occur
376  */
377 #define MAX_RELIABLE_WINDOW_SIZE 0x8000
378 /* starting value for window size */
379 #define START_RELIABLE_WINDOW_SIZE 0x400
380 /* minimum value for window size */
381 #define MIN_RELIABLE_WINDOW_SIZE 0x40
382
383 class Channel
384 {
385
386 public:
387         u16 readNextIncomingSeqNum();
388         u16 incNextIncomingSeqNum();
389
390         u16 getOutgoingSequenceNumber(bool& successfull);
391         u16 readOutgoingSequenceNumber();
392         bool putBackSequenceNumber(u16);
393
394         u16 readNextSplitSeqNum();
395         void setNextSplitSeqNum(u16 seqnum);
396
397         // This is for buffering the incoming packets that are coming in
398         // the wrong order
399         ReliablePacketBuffer incoming_reliables;
400         // This is for buffering the sent packets so that the sender can
401         // re-send them if no ACK is received
402         ReliablePacketBuffer outgoing_reliables_sent;
403
404         //queued reliable packets
405         std::queue<BufferedPacket> queued_reliables;
406
407         //queue commands prior splitting to packets
408         std::deque<ConnectionCommand> queued_commands;
409
410         IncomingSplitBuffer incoming_splits;
411
412         Channel() = default;
413         ~Channel() = default;
414
415         void UpdatePacketLossCounter(unsigned int count);
416         void UpdatePacketTooLateCounter();
417         void UpdateBytesSent(unsigned int bytes,unsigned int packages=1);
418         void UpdateBytesLost(unsigned int bytes);
419         void UpdateBytesReceived(unsigned int bytes);
420
421         void UpdateTimers(float dtime);
422
423         const float getCurrentDownloadRateKB()
424                 { MutexAutoLock lock(m_internal_mutex); return cur_kbps; };
425         const float getMaxDownloadRateKB()
426                 { MutexAutoLock lock(m_internal_mutex); return max_kbps; };
427
428         const float getCurrentLossRateKB()
429                 { MutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
430         const float getMaxLossRateKB()
431                 { MutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
432
433         const float getCurrentIncomingRateKB()
434                 { MutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; };
435         const float getMaxIncomingRateKB()
436                 { MutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; };
437
438         const float getAvgDownloadRateKB()
439                 { MutexAutoLock lock(m_internal_mutex); return avg_kbps; };
440         const float getAvgLossRateKB()
441                 { MutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
442         const float getAvgIncomingRateKB()
443                 { MutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; };
444
445         const unsigned int getWindowSize() const { return window_size; };
446
447         void setWindowSize(unsigned int size) { window_size = size; };
448 private:
449         std::mutex m_internal_mutex;
450         int window_size = MIN_RELIABLE_WINDOW_SIZE;
451
452         u16 next_incoming_seqnum = SEQNUM_INITIAL;
453
454         u16 next_outgoing_seqnum = SEQNUM_INITIAL;
455         u16 next_outgoing_split_seqnum = SEQNUM_INITIAL;
456
457         unsigned int current_packet_loss = 0;
458         unsigned int current_packet_too_late = 0;
459         unsigned int current_packet_successful = 0;
460         float packet_loss_counter = 0.0f;
461
462         unsigned int current_bytes_transfered = 0;
463         unsigned int current_bytes_received = 0;
464         unsigned int current_bytes_lost = 0;
465         float max_kbps = 0.0f;
466         float cur_kbps = 0.0f;
467         float avg_kbps = 0.0f;
468         float max_incoming_kbps = 0.0f;
469         float cur_incoming_kbps = 0.0f;
470         float avg_incoming_kbps = 0.0f;
471         float max_kbps_lost = 0.0f;
472         float cur_kbps_lost = 0.0f;
473         float avg_kbps_lost = 0.0f;
474         float bpm_counter = 0.0f;
475
476         unsigned int rate_samples = 0;
477 };
478
479 class Peer;
480
481 class PeerHelper
482 {
483 public:
484         PeerHelper() = default;
485         PeerHelper(Peer* peer);
486         ~PeerHelper();
487
488         PeerHelper&   operator=(Peer* peer);
489         Peer*         operator->() const;
490         bool          operator!();
491         Peer*         operator&() const;
492         bool          operator!=(void* ptr);
493
494 private:
495         Peer *m_peer = nullptr;
496 };
497
498 class Connection;
499
500 typedef enum {
501         CUR_DL_RATE,
502         AVG_DL_RATE,
503         CUR_INC_RATE,
504         AVG_INC_RATE,
505         CUR_LOSS_RATE,
506         AVG_LOSS_RATE,
507 } rate_stat_type;
508
509 class Peer {
510         public:
511                 friend class PeerHelper;
512
513                 Peer(Address address_,u16 id_,Connection* connection) :
514                         id(id_),
515                         m_connection(connection),
516                         address(address_),
517                         m_last_timeout_check(porting::getTimeMs())
518                 {
519                 };
520
521                 virtual ~Peer() {
522                         MutexAutoLock usage_lock(m_exclusive_access_mutex);
523                         FATAL_ERROR_IF(m_usage != 0, "Reference counting failure");
524                 };
525
526                 // Unique id of the peer
527                 u16 id;
528
529                 void Drop();
530
531                 virtual void PutReliableSendCommand(ConnectionCommand &c,
532                                                 unsigned int max_packet_size) {};
533
534                 virtual bool getAddress(MTProtocols type, Address& toset) = 0;
535
536                 bool isPendingDeletion()
537                 { MutexAutoLock lock(m_exclusive_access_mutex); return m_pending_deletion; };
538
539                 void ResetTimeout()
540                         {MutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter = 0.0; };
541
542                 bool isTimedOut(float timeout);
543
544                 unsigned int m_increment_packets_remaining = 0;
545
546                 virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
547                 virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
548                 virtual SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd,
549                                 bool reliable)
550                 {
551                         errorstream << "Peer::addSplitPacket called,"
552                                         << " this is supposed to be never called!" << std::endl;
553                         return SharedBuffer<u8>(0);
554                 };
555
556                 virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
557
558                 virtual float getStat(rtt_stat_type type) const {
559                         switch (type) {
560                                 case MIN_RTT:
561                                         return m_rtt.min_rtt;
562                                 case MAX_RTT:
563                                         return m_rtt.max_rtt;
564                                 case AVG_RTT:
565                                         return m_rtt.avg_rtt;
566                                 case MIN_JITTER:
567                                         return m_rtt.jitter_min;
568                                 case MAX_JITTER:
569                                         return m_rtt.jitter_max;
570                                 case AVG_JITTER:
571                                         return m_rtt.jitter_avg;
572                         }
573                         return -1;
574                 }
575         protected:
576                 virtual void reportRTT(float rtt) {};
577
578                 void RTTStatistics(float rtt,
579                                                         const std::string &profiler_id = "",
580                                                         unsigned int num_samples = 1000);
581
582                 bool IncUseCount();
583                 void DecUseCount();
584
585                 std::mutex m_exclusive_access_mutex;
586
587                 bool m_pending_deletion = false;
588
589                 Connection* m_connection;
590
591                 // Address of the peer
592                 Address address;
593
594                 // Ping timer
595                 float m_ping_timer = 0.0f;
596         private:
597
598                 struct rttstats {
599                         float jitter_min = FLT_MAX;
600                         float jitter_max = 0.0f;
601                         float jitter_avg = -1.0f;
602                         float min_rtt = FLT_MAX;
603                         float max_rtt = 0.0f;
604                         float avg_rtt = -1.0f;
605
606                         rttstats() = default;
607                 };
608
609                 rttstats m_rtt;
610                 float m_last_rtt = -1.0f;
611
612                 // current usage count
613                 unsigned int m_usage = 0;
614
615                 // Seconds from last receive
616                 float m_timeout_counter = 0.0f;
617
618                 u64 m_last_timeout_check;
619 };
620
621 class UDPPeer : public Peer
622 {
623 public:
624
625         friend class PeerHelper;
626         friend class ConnectionReceiveThread;
627         friend class ConnectionSendThread;
628         friend class Connection;
629
630         UDPPeer(u16 a_id, Address a_address, Connection* connection);
631         virtual ~UDPPeer() = default;
632
633         void PutReliableSendCommand(ConnectionCommand &c,
634                                                         unsigned int max_packet_size);
635
636         bool getAddress(MTProtocols type, Address& toset);
637
638         u16 getNextSplitSequenceNumber(u8 channel);
639         void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
640
641         SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd,
642                 bool reliable);
643
644 protected:
645         /*
646                 Calculates avg_rtt and resend_timeout.
647                 rtt=-1 only recalculates resend_timeout
648         */
649         void reportRTT(float rtt);
650
651         void RunCommandQueues(
652                                         unsigned int max_packet_size,
653                                         unsigned int maxcommands,
654                                         unsigned int maxtransfer);
655
656         float getResendTimeout()
657                 { MutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
658
659         void setResendTimeout(float timeout)
660                 { MutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
661         bool Ping(float dtime,SharedBuffer<u8>& data);
662
663         Channel channels[CHANNEL_COUNT];
664         bool m_pending_disconnect = false;
665 private:
666         // This is changed dynamically
667         float resend_timeout = 0.5;
668
669         bool processReliableSendCommand(
670                                         ConnectionCommand &c,
671                                         unsigned int max_packet_size);
672 };
673
674 /*
675         Connection
676 */
677
678 enum ConnectionEventType{
679         CONNEVENT_NONE,
680         CONNEVENT_DATA_RECEIVED,
681         CONNEVENT_PEER_ADDED,
682         CONNEVENT_PEER_REMOVED,
683         CONNEVENT_BIND_FAILED,
684 };
685
686 struct ConnectionEvent
687 {
688         enum ConnectionEventType type = CONNEVENT_NONE;
689         session_t peer_id = 0;
690         Buffer<u8> data;
691         bool timeout = false;
692         Address address;
693
694         ConnectionEvent() = default;
695
696         const char *describe() const
697         {
698                 switch(type) {
699                 case CONNEVENT_NONE:
700                         return "CONNEVENT_NONE";
701                 case CONNEVENT_DATA_RECEIVED:
702                         return "CONNEVENT_DATA_RECEIVED";
703                 case CONNEVENT_PEER_ADDED:
704                         return "CONNEVENT_PEER_ADDED";
705                 case CONNEVENT_PEER_REMOVED:
706                         return "CONNEVENT_PEER_REMOVED";
707                 case CONNEVENT_BIND_FAILED:
708                         return "CONNEVENT_BIND_FAILED";
709                 }
710                 return "Invalid ConnectionEvent";
711         }
712
713         void dataReceived(session_t peer_id_, const Buffer<u8> &data_)
714         {
715                 type = CONNEVENT_DATA_RECEIVED;
716                 peer_id = peer_id_;
717                 data = data_;
718         }
719         void peerAdded(session_t peer_id_, Address address_)
720         {
721                 type = CONNEVENT_PEER_ADDED;
722                 peer_id = peer_id_;
723                 address = address_;
724         }
725         void peerRemoved(session_t peer_id_, bool timeout_, Address address_)
726         {
727                 type = CONNEVENT_PEER_REMOVED;
728                 peer_id = peer_id_;
729                 timeout = timeout_;
730                 address = address_;
731         }
732         void bindFailed()
733         {
734                 type = CONNEVENT_BIND_FAILED;
735         }
736 };
737
738 class PeerHandler;
739
740 class Connection
741 {
742 public:
743         friend class ConnectionSendThread;
744         friend class ConnectionReceiveThread;
745
746         Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
747                         PeerHandler *peerhandler);
748         ~Connection();
749
750         /* Interface */
751         ConnectionEvent waitEvent(u32 timeout_ms);
752         // Warning: creates an unnecessary copy, prefer putCommand(T&&) if possible
753         void putCommand(const ConnectionCommand &c);
754         void putCommand(ConnectionCommand &&c);
755
756         void SetTimeoutMs(u32 timeout) { m_bc_receive_timeout = timeout; }
757         void Serve(Address bind_addr);
758         void Connect(Address address);
759         bool Connected();
760         void Disconnect();
761         void Receive(NetworkPacket* pkt);
762         bool TryReceive(NetworkPacket *pkt);
763         void Send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable);
764         session_t GetPeerID() const { return m_peer_id; }
765         Address GetPeerAddress(session_t peer_id);
766         float getPeerStat(session_t peer_id, rtt_stat_type type);
767         float getLocalStat(rate_stat_type type);
768         const u32 GetProtocolID() const { return m_protocol_id; };
769         const std::string getDesc();
770         void DisconnectPeer(session_t peer_id);
771
772 protected:
773         PeerHelper getPeerNoEx(session_t peer_id);
774         u16   lookupPeer(Address& sender);
775
776         u16 createPeer(Address& sender, MTProtocols protocol, int fd);
777         UDPPeer*  createServerPeer(Address& sender);
778         bool deletePeer(session_t peer_id, bool timeout);
779
780         void SetPeerID(session_t id) { m_peer_id = id; }
781
782         void sendAck(session_t peer_id, u8 channelnum, u16 seqnum);
783
784         void PrintInfo(std::ostream &out);
785
786         std::vector<session_t> getPeerIDs()
787         {
788                 MutexAutoLock peerlock(m_peers_mutex);
789                 return m_peer_ids;
790         }
791
792         UDPSocket m_udpSocket;
793         // Command queue: user -> SendThread
794         MutexedQueue<ConnectionCommand> m_command_queue;
795
796         bool Receive(NetworkPacket *pkt, u32 timeout);
797
798         // Warning: creates an unnecessary copy, prefer putEvent(T&&) if possible
799         void putEvent(const ConnectionEvent &e);
800         void putEvent(ConnectionEvent &&e);
801
802         void TriggerSend();
803         
804         bool ConnectedToServer() 
805         {
806                 return getPeerNoEx(PEER_ID_SERVER) != nullptr;
807         }
808 private:
809         // Event queue: ReceiveThread -> user
810         MutexedQueue<ConnectionEvent> m_event_queue;
811
812         session_t m_peer_id = 0;
813         u32 m_protocol_id;
814
815         std::map<session_t, Peer *> m_peers;
816         std::vector<session_t> m_peer_ids;
817         std::mutex m_peers_mutex;
818
819         std::unique_ptr<ConnectionSendThread> m_sendThread;
820         std::unique_ptr<ConnectionReceiveThread> m_receiveThread;
821
822         std::mutex m_info_mutex;
823
824         // Backwards compatibility
825         PeerHandler *m_bc_peerhandler;
826         u32 m_bc_receive_timeout = 0;
827
828         bool m_shutting_down = false;
829
830         session_t m_next_remote_peer_id = 2;
831 };
832
833 } // namespace