]> git.lizzy.rs Git - minetest.git/blob - src/network/connection.h
Improve protocol-level receiving code (#9617)
[minetest.git] / src / network / connection.h
1 /*
2 Minetest
3 Copyright (C) 2013 celeron55, Perttu Ahola <celeron55@gmail.com>
4
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU Lesser General Public License as published by
7 the Free Software Foundation; either version 2.1 of the License, or
8 (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 GNU Lesser General Public License for more details.
14
15 You should have received a copy of the GNU Lesser General Public License along
16 with this program; if not, write to the Free Software Foundation, Inc.,
17 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #pragma once
21
22 #include "irrlichttypes_bloated.h"
23 #include "peerhandler.h"
24 #include "socket.h"
25 #include "constants.h"
26 #include "util/pointer.h"
27 #include "util/container.h"
28 #include "util/thread.h"
29 #include "util/numeric.h"
30 #include "networkprotocol.h"
31 #include <iostream>
32 #include <fstream>
33 #include <list>
34 #include <map>
35
36 class NetworkPacket;
37
38 namespace con
39 {
40
41 class ConnectionReceiveThread;
42 class ConnectionSendThread;
43
44 typedef enum MTProtocols {
45         MTP_PRIMARY,
46         MTP_UDP,
47         MTP_MINETEST_RELIABLE_UDP
48 } MTProtocols;
49
50 #define MAX_UDP_PEERS 65535
51
52 #define SEQNUM_MAX 65535
53
54 inline bool seqnum_higher(u16 totest, u16 base)
55 {
56         if (totest > base)
57         {
58                 if ((totest - base) > (SEQNUM_MAX/2))
59                         return false;
60
61                 return true;
62         }
63
64         if ((base - totest) > (SEQNUM_MAX/2))
65                 return true;
66
67         return false;
68 }
69
70 inline bool seqnum_in_window(u16 seqnum, u16 next,u16 window_size)
71 {
72         u16 window_start = next;
73         u16 window_end   = ( next + window_size ) % (SEQNUM_MAX+1);
74
75         if (window_start < window_end) {
76                 return ((seqnum >= window_start) && (seqnum < window_end));
77         }
78
79
80         return ((seqnum < window_end) || (seqnum >= window_start));
81 }
82
83 static inline float CALC_DTIME(u64 lasttime, u64 curtime)
84 {
85         float value = ( curtime - lasttime) / 1000.0;
86         return MYMAX(MYMIN(value,0.1),0.0);
87 }
88
89 struct BufferedPacket
90 {
91         BufferedPacket(u8 *a_data, u32 a_size):
92                 data(a_data, a_size)
93         {}
94         BufferedPacket(u32 a_size):
95                 data(a_size)
96         {}
97         Buffer<u8> data; // Data of the packet, including headers
98         float time = 0.0f; // Seconds from buffering the packet or re-sending
99         float totaltime = 0.0f; // Seconds from buffering the packet
100         u64 absolute_send_time = -1;
101         Address address; // Sender or destination
102         unsigned int resend_count = 0;
103 };
104
105 // This adds the base headers to the data and makes a packet out of it
106 BufferedPacket makePacket(Address &address, const SharedBuffer<u8> &data,
107                 u32 protocol_id, session_t sender_peer_id, u8 channel);
108
109 // Depending on size, make a TYPE_ORIGINAL or TYPE_SPLIT packet
110 // Increments split_seqnum if a split packet is made
111 void makeAutoSplitPacket(const SharedBuffer<u8> &data, u32 chunksize_max,
112                 u16 &split_seqnum, std::list<SharedBuffer<u8>> *list);
113
114 // Add the TYPE_RELIABLE header to the data
115 SharedBuffer<u8> makeReliablePacket(const SharedBuffer<u8> &data, u16 seqnum);
116
117 struct IncomingSplitPacket
118 {
119         IncomingSplitPacket(u32 cc, bool r):
120                 chunk_count(cc), reliable(r) {}
121
122         IncomingSplitPacket() = delete;
123
124         float time = 0.0f; // Seconds from adding
125         u32 chunk_count;
126         bool reliable; // If true, isn't deleted on timeout
127
128         bool allReceived() const
129         {
130                 return (chunks.size() == chunk_count);
131         }
132         bool insert(u32 chunk_num, SharedBuffer<u8> &chunkdata);
133         SharedBuffer<u8> reassemble();
134
135 private:
136         // Key is chunk number, value is data without headers
137         std::map<u16, SharedBuffer<u8>> chunks;
138 };
139
140 /*
141 === NOTES ===
142
143 A packet is sent through a channel to a peer with a basic header:
144         Header (7 bytes):
145         [0] u32 protocol_id
146         [4] session_t sender_peer_id
147         [6] u8 channel
148 sender_peer_id:
149         Unique to each peer.
150         value 0 (PEER_ID_INEXISTENT) is reserved for making new connections
151         value 1 (PEER_ID_SERVER) is reserved for server
152         these constants are defined in constants.h
153 channel:
154         Channel numbers have no intrinsic meaning. Currently only 0, 1, 2 exist.
155 */
156 #define BASE_HEADER_SIZE 7
157 #define CHANNEL_COUNT 3
158 /*
159 Packet types:
160
161 CONTROL: This is a packet used by the protocol.
162 - When this is processed, nothing is handed to the user.
163         Header (2 byte):
164         [0] u8 type
165         [1] u8 controltype
166 controltype and data description:
167         CONTROLTYPE_ACK
168                 [2] u16 seqnum
169         CONTROLTYPE_SET_PEER_ID
170                 [2] session_t peer_id_new
171         CONTROLTYPE_PING
172         - There is no actual reply, but this can be sent in a reliable
173           packet to get a reply
174         CONTROLTYPE_DISCO
175 */
176 //#define TYPE_CONTROL 0
177 #define CONTROLTYPE_ACK 0
178 #define CONTROLTYPE_SET_PEER_ID 1
179 #define CONTROLTYPE_PING 2
180 #define CONTROLTYPE_DISCO 3
181
182 /*
183 ORIGINAL: This is a plain packet with no control and no error
184 checking at all.
185 - When this is processed, it is directly handed to the user.
186         Header (1 byte):
187         [0] u8 type
188 */
189 //#define TYPE_ORIGINAL 1
190 #define ORIGINAL_HEADER_SIZE 1
191 /*
192 SPLIT: These are sequences of packets forming one bigger piece of
193 data.
194 - When processed and all the packet_nums 0...packet_count-1 are
195   present (this should be buffered), the resulting data shall be
196   directly handed to the user.
197 - If the data fails to come up in a reasonable time, the buffer shall
198   be silently discarded.
199 - These can be sent as-is or atop of a RELIABLE packet stream.
200         Header (7 bytes):
201         [0] u8 type
202         [1] u16 seqnum
203         [3] u16 chunk_count
204         [5] u16 chunk_num
205 */
206 //#define TYPE_SPLIT 2
207 /*
208 RELIABLE: Delivery of all RELIABLE packets shall be forced by ACKs,
209 and they shall be delivered in the same order as sent. This is done
210 with a buffer in the receiving and transmitting end.
211 - When this is processed, the contents of each packet is recursively
212   processed as packets.
213         Header (3 bytes):
214         [0] u8 type
215         [1] u16 seqnum
216
217 */
218 //#define TYPE_RELIABLE 3
219 #define RELIABLE_HEADER_SIZE 3
220 #define SEQNUM_INITIAL 65500
221
222 enum PacketType: u8 {
223         PACKET_TYPE_CONTROL = 0,
224         PACKET_TYPE_ORIGINAL = 1,
225         PACKET_TYPE_SPLIT = 2,
226         PACKET_TYPE_RELIABLE = 3,
227         PACKET_TYPE_MAX
228 };
229 /*
230         A buffer which stores reliable packets and sorts them internally
231         for fast access to the smallest one.
232 */
233
234 typedef std::list<BufferedPacket>::iterator RPBSearchResult;
235
236 class ReliablePacketBuffer
237 {
238 public:
239         ReliablePacketBuffer() = default;
240
241         bool getFirstSeqnum(u16& result);
242
243         BufferedPacket popFirst();
244         BufferedPacket popSeqnum(u16 seqnum);
245         void insert(BufferedPacket &p, u16 next_expected);
246
247         void incrementTimeouts(float dtime);
248         std::list<BufferedPacket> getTimedOuts(float timeout,
249                         unsigned int max_packets);
250
251         void print();
252         bool empty();
253         RPBSearchResult notFound();
254         u32 size();
255
256
257 private:
258         RPBSearchResult findPacket(u16 seqnum); // does not perform locking
259
260         std::list<BufferedPacket> m_list;
261
262         u16 m_oldest_non_answered_ack;
263
264         std::mutex m_list_mutex;
265 };
266
267 /*
268         A buffer for reconstructing split packets
269 */
270
271 class IncomingSplitBuffer
272 {
273 public:
274         ~IncomingSplitBuffer();
275         /*
276                 Returns a reference counted buffer of length != 0 when a full split
277                 packet is constructed. If not, returns one of length 0.
278         */
279         SharedBuffer<u8> insert(const BufferedPacket &p, bool reliable);
280
281         void removeUnreliableTimedOuts(float dtime, float timeout);
282
283 private:
284         // Key is seqnum
285         std::map<u16, IncomingSplitPacket*> m_buf;
286
287         std::mutex m_map_mutex;
288 };
289
290 struct OutgoingPacket
291 {
292         session_t peer_id;
293         u8 channelnum;
294         SharedBuffer<u8> data;
295         bool reliable;
296         bool ack;
297
298         OutgoingPacket(session_t peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_,
299                         bool reliable_,bool ack_=false):
300                 peer_id(peer_id_),
301                 channelnum(channelnum_),
302                 data(data_),
303                 reliable(reliable_),
304                 ack(ack_)
305         {
306         }
307 };
308
309 enum ConnectionCommandType{
310         CONNCMD_NONE,
311         CONNCMD_SERVE,
312         CONNCMD_CONNECT,
313         CONNCMD_DISCONNECT,
314         CONNCMD_DISCONNECT_PEER,
315         CONNCMD_SEND,
316         CONNCMD_SEND_TO_ALL,
317         CONCMD_ACK,
318         CONCMD_CREATE_PEER
319 };
320
321 struct ConnectionCommand
322 {
323         enum ConnectionCommandType type = CONNCMD_NONE;
324         Address address;
325         session_t peer_id = PEER_ID_INEXISTENT;
326         u8 channelnum = 0;
327         Buffer<u8> data;
328         bool reliable = false;
329         bool raw = false;
330
331         ConnectionCommand() = default;
332         ConnectionCommand &operator=(const ConnectionCommand &other)
333         {
334                 type = other.type;
335                 address = other.address;
336                 peer_id = other.peer_id;
337                 channelnum = other.channelnum;
338                 // We must copy the buffer here to prevent race condition
339                 data = SharedBuffer<u8>(*other.data, other.data.getSize());
340                 reliable = other.reliable;
341                 raw = other.raw;
342                 return *this;
343         }
344
345         void serve(Address address_)
346         {
347                 type = CONNCMD_SERVE;
348                 address = address_;
349         }
350         void connect(Address address_)
351         {
352                 type = CONNCMD_CONNECT;
353                 address = address_;
354         }
355         void disconnect()
356         {
357                 type = CONNCMD_DISCONNECT;
358         }
359         void disconnect_peer(session_t peer_id_)
360         {
361                 type = CONNCMD_DISCONNECT_PEER;
362                 peer_id = peer_id_;
363         }
364
365         void send(session_t peer_id_, u8 channelnum_, NetworkPacket *pkt, bool reliable_);
366
367         void ack(session_t peer_id_, u8 channelnum_, const SharedBuffer<u8> &data_)
368         {
369                 type = CONCMD_ACK;
370                 peer_id = peer_id_;
371                 channelnum = channelnum_;
372                 data = data_;
373                 reliable = false;
374         }
375
376         void createPeer(session_t peer_id_, const SharedBuffer<u8> &data_)
377         {
378                 type = CONCMD_CREATE_PEER;
379                 peer_id = peer_id_;
380                 data = data_;
381                 channelnum = 0;
382                 reliable = true;
383                 raw = true;
384         }
385 };
386
387 /* maximum window size to use, 0xFFFF is theoretical maximum. don't think about
388  * touching it, the less you're away from it the more likely data corruption
389  * will occur
390  */
391 #define MAX_RELIABLE_WINDOW_SIZE 0x8000
392 /* starting value for window size */
393 #define START_RELIABLE_WINDOW_SIZE 0x400
394 /* minimum value for window size */
395 #define MIN_RELIABLE_WINDOW_SIZE 0x40
396
397 class Channel
398 {
399
400 public:
401         u16 readNextIncomingSeqNum();
402         u16 incNextIncomingSeqNum();
403
404         u16 getOutgoingSequenceNumber(bool& successfull);
405         u16 readOutgoingSequenceNumber();
406         bool putBackSequenceNumber(u16);
407
408         u16 readNextSplitSeqNum();
409         void setNextSplitSeqNum(u16 seqnum);
410
411         // This is for buffering the incoming packets that are coming in
412         // the wrong order
413         ReliablePacketBuffer incoming_reliables;
414         // This is for buffering the sent packets so that the sender can
415         // re-send them if no ACK is received
416         ReliablePacketBuffer outgoing_reliables_sent;
417
418         //queued reliable packets
419         std::queue<BufferedPacket> queued_reliables;
420
421         //queue commands prior splitting to packets
422         std::deque<ConnectionCommand> queued_commands;
423
424         IncomingSplitBuffer incoming_splits;
425
426         Channel() = default;
427         ~Channel() = default;
428
429         void UpdatePacketLossCounter(unsigned int count);
430         void UpdatePacketTooLateCounter();
431         void UpdateBytesSent(unsigned int bytes,unsigned int packages=1);
432         void UpdateBytesLost(unsigned int bytes);
433         void UpdateBytesReceived(unsigned int bytes);
434
435         void UpdateTimers(float dtime);
436
437         const float getCurrentDownloadRateKB()
438                 { MutexAutoLock lock(m_internal_mutex); return cur_kbps; };
439         const float getMaxDownloadRateKB()
440                 { MutexAutoLock lock(m_internal_mutex); return max_kbps; };
441
442         const float getCurrentLossRateKB()
443                 { MutexAutoLock lock(m_internal_mutex); return cur_kbps_lost; };
444         const float getMaxLossRateKB()
445                 { MutexAutoLock lock(m_internal_mutex); return max_kbps_lost; };
446
447         const float getCurrentIncomingRateKB()
448                 { MutexAutoLock lock(m_internal_mutex); return cur_incoming_kbps; };
449         const float getMaxIncomingRateKB()
450                 { MutexAutoLock lock(m_internal_mutex); return max_incoming_kbps; };
451
452         const float getAvgDownloadRateKB()
453                 { MutexAutoLock lock(m_internal_mutex); return avg_kbps; };
454         const float getAvgLossRateKB()
455                 { MutexAutoLock lock(m_internal_mutex); return avg_kbps_lost; };
456         const float getAvgIncomingRateKB()
457                 { MutexAutoLock lock(m_internal_mutex); return avg_incoming_kbps; };
458
459         const unsigned int getWindowSize() const { return window_size; };
460
461         void setWindowSize(unsigned int size) { window_size = size; };
462 private:
463         std::mutex m_internal_mutex;
464         int window_size = MIN_RELIABLE_WINDOW_SIZE;
465
466         u16 next_incoming_seqnum = SEQNUM_INITIAL;
467
468         u16 next_outgoing_seqnum = SEQNUM_INITIAL;
469         u16 next_outgoing_split_seqnum = SEQNUM_INITIAL;
470
471         unsigned int current_packet_loss = 0;
472         unsigned int current_packet_too_late = 0;
473         unsigned int current_packet_successful = 0;
474         float packet_loss_counter = 0.0f;
475
476         unsigned int current_bytes_transfered = 0;
477         unsigned int current_bytes_received = 0;
478         unsigned int current_bytes_lost = 0;
479         float max_kbps = 0.0f;
480         float cur_kbps = 0.0f;
481         float avg_kbps = 0.0f;
482         float max_incoming_kbps = 0.0f;
483         float cur_incoming_kbps = 0.0f;
484         float avg_incoming_kbps = 0.0f;
485         float max_kbps_lost = 0.0f;
486         float cur_kbps_lost = 0.0f;
487         float avg_kbps_lost = 0.0f;
488         float bpm_counter = 0.0f;
489
490         unsigned int rate_samples = 0;
491 };
492
493 class Peer;
494
495 class PeerHelper
496 {
497 public:
498         PeerHelper() = default;
499         PeerHelper(Peer* peer);
500         ~PeerHelper();
501
502         PeerHelper&   operator=(Peer* peer);
503         Peer*         operator->() const;
504         bool          operator!();
505         Peer*         operator&() const;
506         bool          operator!=(void* ptr);
507
508 private:
509         Peer *m_peer = nullptr;
510 };
511
512 class Connection;
513
514 typedef enum {
515         CUR_DL_RATE,
516         AVG_DL_RATE,
517         CUR_INC_RATE,
518         AVG_INC_RATE,
519         CUR_LOSS_RATE,
520         AVG_LOSS_RATE,
521 } rate_stat_type;
522
523 class Peer {
524         public:
525                 friend class PeerHelper;
526
527                 Peer(Address address_,u16 id_,Connection* connection) :
528                         id(id_),
529                         m_connection(connection),
530                         address(address_),
531                         m_last_timeout_check(porting::getTimeMs())
532                 {
533                 };
534
535                 virtual ~Peer() {
536                         MutexAutoLock usage_lock(m_exclusive_access_mutex);
537                         FATAL_ERROR_IF(m_usage != 0, "Reference counting failure");
538                 };
539
540                 // Unique id of the peer
541                 u16 id;
542
543                 void Drop();
544
545                 virtual void PutReliableSendCommand(ConnectionCommand &c,
546                                                 unsigned int max_packet_size) {};
547
548                 virtual bool getAddress(MTProtocols type, Address& toset) = 0;
549
550                 bool isPendingDeletion()
551                 { MutexAutoLock lock(m_exclusive_access_mutex); return m_pending_deletion; };
552
553                 void ResetTimeout()
554                         {MutexAutoLock lock(m_exclusive_access_mutex); m_timeout_counter = 0.0; };
555
556                 bool isTimedOut(float timeout);
557
558                 unsigned int m_increment_packets_remaining = 0;
559
560                 virtual u16 getNextSplitSequenceNumber(u8 channel) { return 0; };
561                 virtual void setNextSplitSequenceNumber(u8 channel, u16 seqnum) {};
562                 virtual SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd,
563                                 bool reliable)
564                 {
565                         errorstream << "Peer::addSplitPacket called,"
566                                         << " this is supposed to be never called!" << std::endl;
567                         return SharedBuffer<u8>(0);
568                 };
569
570                 virtual bool Ping(float dtime, SharedBuffer<u8>& data) { return false; };
571
572                 virtual float getStat(rtt_stat_type type) const {
573                         switch (type) {
574                                 case MIN_RTT:
575                                         return m_rtt.min_rtt;
576                                 case MAX_RTT:
577                                         return m_rtt.max_rtt;
578                                 case AVG_RTT:
579                                         return m_rtt.avg_rtt;
580                                 case MIN_JITTER:
581                                         return m_rtt.jitter_min;
582                                 case MAX_JITTER:
583                                         return m_rtt.jitter_max;
584                                 case AVG_JITTER:
585                                         return m_rtt.jitter_avg;
586                         }
587                         return -1;
588                 }
589         protected:
590                 virtual void reportRTT(float rtt) {};
591
592                 void RTTStatistics(float rtt,
593                                                         const std::string &profiler_id = "",
594                                                         unsigned int num_samples = 1000);
595
596                 bool IncUseCount();
597                 void DecUseCount();
598
599                 std::mutex m_exclusive_access_mutex;
600
601                 bool m_pending_deletion = false;
602
603                 Connection* m_connection;
604
605                 // Address of the peer
606                 Address address;
607
608                 // Ping timer
609                 float m_ping_timer = 0.0f;
610         private:
611
612                 struct rttstats {
613                         float jitter_min = FLT_MAX;
614                         float jitter_max = 0.0f;
615                         float jitter_avg = -2.0f;
616                         float min_rtt = FLT_MAX;
617                         float max_rtt = 0.0f;
618                         float avg_rtt = -2.0f;
619
620                         rttstats() = default;
621                 };
622
623                 rttstats m_rtt;
624                 float m_last_rtt = -2.0f;
625
626                 // current usage count
627                 unsigned int m_usage = 0;
628
629                 // Seconds from last receive
630                 float m_timeout_counter = 0.0f;
631
632                 u64 m_last_timeout_check;
633 };
634
635 class UDPPeer : public Peer
636 {
637 public:
638
639         friend class PeerHelper;
640         friend class ConnectionReceiveThread;
641         friend class ConnectionSendThread;
642         friend class Connection;
643
644         UDPPeer(u16 a_id, Address a_address, Connection* connection);
645         virtual ~UDPPeer() = default;
646
647         void PutReliableSendCommand(ConnectionCommand &c,
648                                                         unsigned int max_packet_size);
649
650         bool getAddress(MTProtocols type, Address& toset);
651
652         u16 getNextSplitSequenceNumber(u8 channel);
653         void setNextSplitSequenceNumber(u8 channel, u16 seqnum);
654
655         SharedBuffer<u8> addSplitPacket(u8 channel, const BufferedPacket &toadd,
656                 bool reliable);
657
658 protected:
659         /*
660                 Calculates avg_rtt and resend_timeout.
661                 rtt=-1 only recalculates resend_timeout
662         */
663         void reportRTT(float rtt);
664
665         void RunCommandQueues(
666                                         unsigned int max_packet_size,
667                                         unsigned int maxcommands,
668                                         unsigned int maxtransfer);
669
670         float getResendTimeout()
671                 { MutexAutoLock lock(m_exclusive_access_mutex); return resend_timeout; }
672
673         void setResendTimeout(float timeout)
674                 { MutexAutoLock lock(m_exclusive_access_mutex); resend_timeout = timeout; }
675         bool Ping(float dtime,SharedBuffer<u8>& data);
676
677         Channel channels[CHANNEL_COUNT];
678         bool m_pending_disconnect = false;
679 private:
680         // This is changed dynamically
681         float resend_timeout = 0.5;
682
683         bool processReliableSendCommand(
684                                         ConnectionCommand &c,
685                                         unsigned int max_packet_size);
686 };
687
688 /*
689         Connection
690 */
691
692 enum ConnectionEventType{
693         CONNEVENT_NONE,
694         CONNEVENT_DATA_RECEIVED,
695         CONNEVENT_PEER_ADDED,
696         CONNEVENT_PEER_REMOVED,
697         CONNEVENT_BIND_FAILED,
698 };
699
700 struct ConnectionEvent
701 {
702         enum ConnectionEventType type = CONNEVENT_NONE;
703         session_t peer_id = 0;
704         Buffer<u8> data;
705         bool timeout = false;
706         Address address;
707
708         ConnectionEvent() = default;
709
710         std::string describe()
711         {
712                 switch(type) {
713                 case CONNEVENT_NONE:
714                         return "CONNEVENT_NONE";
715                 case CONNEVENT_DATA_RECEIVED:
716                         return "CONNEVENT_DATA_RECEIVED";
717                 case CONNEVENT_PEER_ADDED:
718                         return "CONNEVENT_PEER_ADDED";
719                 case CONNEVENT_PEER_REMOVED:
720                         return "CONNEVENT_PEER_REMOVED";
721                 case CONNEVENT_BIND_FAILED:
722                         return "CONNEVENT_BIND_FAILED";
723                 }
724                 return "Invalid ConnectionEvent";
725         }
726
727         void dataReceived(session_t peer_id_, const SharedBuffer<u8> &data_)
728         {
729                 type = CONNEVENT_DATA_RECEIVED;
730                 peer_id = peer_id_;
731                 data = data_;
732         }
733         void peerAdded(session_t peer_id_, Address address_)
734         {
735                 type = CONNEVENT_PEER_ADDED;
736                 peer_id = peer_id_;
737                 address = address_;
738         }
739         void peerRemoved(session_t peer_id_, bool timeout_, Address address_)
740         {
741                 type = CONNEVENT_PEER_REMOVED;
742                 peer_id = peer_id_;
743                 timeout = timeout_;
744                 address = address_;
745         }
746         void bindFailed()
747         {
748                 type = CONNEVENT_BIND_FAILED;
749         }
750 };
751
752 class PeerHandler;
753
754 class Connection
755 {
756 public:
757         friend class ConnectionSendThread;
758         friend class ConnectionReceiveThread;
759
760         Connection(u32 protocol_id, u32 max_packet_size, float timeout, bool ipv6,
761                         PeerHandler *peerhandler);
762         ~Connection();
763
764         /* Interface */
765         ConnectionEvent waitEvent(u32 timeout_ms);
766         void putCommand(ConnectionCommand &c);
767
768         void SetTimeoutMs(u32 timeout) { m_bc_receive_timeout = timeout; }
769         void Serve(Address bind_addr);
770         void Connect(Address address);
771         bool Connected();
772         void Disconnect();
773         void Receive(NetworkPacket* pkt);
774         bool TryReceive(NetworkPacket *pkt);
775         void Send(session_t peer_id, u8 channelnum, NetworkPacket *pkt, bool reliable);
776         session_t GetPeerID() const { return m_peer_id; }
777         Address GetPeerAddress(session_t peer_id);
778         float getPeerStat(session_t peer_id, rtt_stat_type type);
779         float getLocalStat(rate_stat_type type);
780         const u32 GetProtocolID() const { return m_protocol_id; };
781         const std::string getDesc();
782         void DisconnectPeer(session_t peer_id);
783
784 protected:
785         PeerHelper getPeerNoEx(session_t peer_id);
786         u16   lookupPeer(Address& sender);
787
788         u16 createPeer(Address& sender, MTProtocols protocol, int fd);
789         UDPPeer*  createServerPeer(Address& sender);
790         bool deletePeer(session_t peer_id, bool timeout);
791
792         void SetPeerID(session_t id) { m_peer_id = id; }
793
794         void sendAck(session_t peer_id, u8 channelnum, u16 seqnum);
795
796         void PrintInfo(std::ostream &out);
797
798         std::list<session_t> getPeerIDs()
799         {
800                 MutexAutoLock peerlock(m_peers_mutex);
801                 return m_peer_ids;
802         }
803
804         UDPSocket m_udpSocket;
805         MutexedQueue<ConnectionCommand> m_command_queue;
806
807         bool Receive(NetworkPacket *pkt, u32 timeout);
808
809         void putEvent(ConnectionEvent &e);
810
811         void TriggerSend();
812 private:
813         MutexedQueue<ConnectionEvent> m_event_queue;
814
815         session_t m_peer_id = 0;
816         u32 m_protocol_id;
817
818         std::map<session_t, Peer *> m_peers;
819         std::list<session_t> m_peer_ids;
820         std::mutex m_peers_mutex;
821
822         std::unique_ptr<ConnectionSendThread> m_sendThread;
823         std::unique_ptr<ConnectionReceiveThread> m_receiveThread;
824
825         std::mutex m_info_mutex;
826
827         // Backwards compatibility
828         PeerHandler *m_bc_peerhandler;
829         u32 m_bc_receive_timeout = 0;
830
831         bool m_shutting_down = false;
832
833         session_t m_next_remote_peer_id = 2;
834 };
835
836 } // namespace