Copyright (C) 2010 celeron55, Perttu Ahola <celeron55@gmail.com>
This program is free software; you can redistribute it and/or modify
-it under the terms of the GNU General Public License as published by
-the Free Software Foundation; either version 2 of the License, or
+it under the terms of the GNU Lesser General Public License as published by
+the Free Software Foundation; either version 2.1 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
+GNU Lesser General Public License for more details.
-You should have received a copy of the GNU General Public License along
+You should have received a copy of the GNU Lesser General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "serialization.h"
#include "log.h"
#include "porting.h"
+#include "util/serialize.h"
+#include "util/numeric.h"
+#include "util/string.h"
+#include "settings.h"
namespace con
{
+static u16 readPeerId(u8 *packetdata)
+{
+ return readU16(&packetdata[4]);
+}
+static u8 readChannel(u8 *packetdata)
+{
+ return readU8(&packetdata[6]);
+}
+
BufferedPacket makePacket(Address &address, u8 *data, u32 datasize,
u32 protocol_id, u16 sender_peer_id, u8 channel)
{
<<" != sp->reliable="<<sp->reliable
<<std::endl;
- // If chunk already exists, cancel
+ // If chunk already exists, ignore it.
+ // Sometimes two identical packets may arrive when there is network
+ // lag and the server re-sends stuff.
if(sp->chunks.find(chunk_num) != NULL)
- throw AlreadyExistsException("Chunk already in buffer");
+ return SharedBuffer<u8>();
// Cut chunk data out of packet
u32 chunkdatasize = p.data.getSize() - headersize;
m_sendtime_accu(0),
m_max_packets_per_second(10),
m_num_sent(0),
- m_max_num_sent(0)
+ m_max_num_sent(0),
+ congestion_control_aim_rtt(0.2),
+ congestion_control_max_rate(400),
+ congestion_control_min_rate(10)
{
}
Peer::~Peer()
{
if(rtt >= 0.0){
if(rtt < 0.01){
- if(m_max_packets_per_second < 100)
+ if(m_max_packets_per_second < congestion_control_max_rate)
m_max_packets_per_second += 10;
- } else if(rtt < 0.2){
- if(m_max_packets_per_second < 100)
+ } else if(rtt < congestion_control_aim_rtt){
+ if(m_max_packets_per_second < congestion_control_max_rate)
m_max_packets_per_second += 2;
} else {
- if(m_max_packets_per_second > 5)
- m_max_packets_per_second *= 0.5;
+ m_max_packets_per_second *= 0.8;
+ if(m_max_packets_per_second < congestion_control_min_rate)
+ m_max_packets_per_second = congestion_control_min_rate;
}
}
Connection::~Connection()
{
stop();
+ // Delete peers
+ for(core::map<u16, Peer*>::Iterator
+ j = m_peers.getIterator();
+ j.atEnd() == false; j++)
+ {
+ Peer *peer = j.getNode()->getValue();
+ delete peer;
+ }
}
/* Internal stuff */
// Receive packets from the network and buffers and create ConnectionEvents
void Connection::receive()
{
- u32 datasize = 100000;
+ u32 datasize = m_max_packet_size * 2; // Double it just to be safe
// TODO: We can not know how many layers of header there are.
// For now, just assume there are no other than the base headers.
u32 packet_maxsize = datasize + BASE_HEADER_SIZE;
- Buffer<u8> packetdata(packet_maxsize);
+ SharedBuffer<u8> packetdata(packet_maxsize);
bool single_wait_done = false;
dout_con<<"ProcessPacket returned data of size "
<<resultdata.getSize()<<std::endl;
- if(datasize < resultdata.getSize())
- throw InvalidIncomingDataException
- ("Buffer too small for received data");
-
ConnectionEvent e;
e.dataReceived(peer_id, resultdata);
putEvent(e);
void Connection::runTimeouts(float dtime)
{
+ float congestion_control_aim_rtt
+ = g_settings->getFloat("congestion_control_aim_rtt");
+ float congestion_control_max_rate
+ = g_settings->getFloat("congestion_control_max_rate");
+ float congestion_control_min_rate
+ = g_settings->getFloat("congestion_control_min_rate");
+
core::list<u16> timeouted_peers;
core::map<u16, Peer*>::Iterator j;
j = m_peers.getIterator();
for(; j.atEnd() == false; j++)
{
Peer *peer = j.getNode()->getValue();
+
+ // Update congestion control values
+ peer->congestion_control_aim_rtt = congestion_control_aim_rtt;
+ peer->congestion_control_max_rate = congestion_control_max_rate;
+ peer->congestion_control_min_rate = congestion_control_min_rate;
/*
Check peer timeout
void Connection::serve(u16 port)
{
dout_con<<getDesc()<<" serving at port "<<port<<std::endl;
- m_socket.Bind(port);
- m_peer_id = PEER_ID_SERVER;
+ try{
+ m_socket.Bind(port);
+ m_peer_id = PEER_ID_SERVER;
+ }
+ catch(SocketException &e){
+ // Create event
+ ConnectionEvent ce;
+ ce.bindFailed();
+ putEvent(ce);
+ }
}
void Connection::connect(Address address)
{
try{
return m_event_queue.pop_front(timeout_ms);
- } catch(ItemNotFoundException &e){
+ } catch(ItemNotFoundException &ex){
ConnectionEvent e;
e.type = CONNEVENT_NONE;
return e;
putCommand(c);
}
-u32 Connection::Receive(u16 &peer_id, u8 *data, u32 datasize)
+u32 Connection::Receive(u16 &peer_id, SharedBuffer<u8> &data)
{
for(;;){
ConnectionEvent e = waitEvent(m_bc_receive_timeout);
throw NoIncomingDataException("No incoming data");
case CONNEVENT_DATA_RECEIVED:
peer_id = e.peer_id;
- memcpy(data, *e.data, e.data.getSize());
+ data = SharedBuffer<u8>(e.data);
return e.data.getSize();
case CONNEVENT_PEER_ADDED: {
Peer tmp(e.peer_id, e.address);
if(m_bc_peerhandler)
m_bc_peerhandler->deletingPeer(&tmp, e.timeout);
continue; }
+ case CONNEVENT_BIND_FAILED:
+ throw ConnectionBindFailed("Failed to bind socket "
+ "(port already in use?)");
}
}
throw NoIncomingDataException("No incoming data");