Protection against multi-threaded usage.

- Added Mutex for open / close of sockets.
 - Removed cross socket ticking.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@7768 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2014-12-17 09:04:27 +00:00
parent 93dd5d8322
commit e2dbbb7740

View File

@ -55,6 +55,10 @@ struct TcpOnUdp_t
typedef struct TcpOnUdp_t TcpOnUdp; typedef struct TcpOnUdp_t TcpOnUdp;
static RsMutex touMutex("touMutex");
// Mutex is used to control addition / removals from tou_streams.
// Lookup should be okay - as long as you stick to your allocated ID!
static std::vector<TcpOnUdp *> tou_streams; static std::vector<TcpOnUdp *> tou_streams;
static int tou_inited = 0; static int tou_inited = 0;
@ -67,8 +71,6 @@ static UdpSubReceiver *udpSR[MAX_TOU_RECEIVERS] = {NULL};
static uint32_t udpType[MAX_TOU_RECEIVERS] = { 0 }; static uint32_t udpType[MAX_TOU_RECEIVERS] = { 0 };
static uint32_t noUdpSR = 0; static uint32_t noUdpSR = 0;
static int tou_tick_all();
/* tou_init /* tou_init
* *
* Modified to accept a number of UdpSubRecievers! * Modified to accept a number of UdpSubRecievers!
@ -82,6 +84,8 @@ static int tou_tick_all();
/* tou_init - opens the udp port (universal bind) */ /* tou_init - opens the udp port (universal bind) */
int tou_init(void **in_udpsubrecvs, int *type, int number) int tou_init(void **in_udpsubrecvs, int *type, int number)
{ {
RsStackMutex stack(touMutex); /***** LOCKED ******/
UdpSubReceiver **usrArray = (UdpSubReceiver **) in_udpsubrecvs; UdpSubReceiver **usrArray = (UdpSubReceiver **) in_udpsubrecvs;
if (number > MAX_TOU_RECEIVERS) if (number > MAX_TOU_RECEIVERS)
{ {
@ -113,6 +117,8 @@ int tou_init(void **in_udpsubrecvs, int *type, int number)
/* open - allocates a sockfd, and checks that the type is okay */ /* open - allocates a sockfd, and checks that the type is okay */
int tou_socket(uint32_t recvIdx, uint32_t type, int /*protocol*/) int tou_socket(uint32_t recvIdx, uint32_t type, int /*protocol*/)
{ {
RsStackMutex stack(touMutex); /***** LOCKED ******/
if (!tou_inited) if (!tou_inited)
{ {
return -1; return -1;
@ -251,7 +257,6 @@ int tou_connect(int sockfd, const struct sockaddr *serv_addr,
tous->tcp->connect(*(const struct sockaddr_in *) serv_addr, conn_period); tous->tcp->connect(*(const struct sockaddr_in *) serv_addr, conn_period);
tous->tcp->tick(); tous->tcp->tick();
tou_tick_all();
if (tous->tcp->isConnected()) if (tous->tcp->isConnected())
{ {
return 0; return 0;
@ -323,14 +328,12 @@ int tou_listenfor(int sockfd, const struct sockaddr *serv_addr,
tous->tcp->listenfor(*((struct sockaddr_in *) serv_addr)); tous->tcp->listenfor(*((struct sockaddr_in *) serv_addr));
tous->tcp->tick(); tous->tcp->tick();
tou_tick_all();
return 0; return 0;
} }
int tou_listen(int /* sockfd */ , int /* backlog */ ) int tou_listen(int /* sockfd */ , int /* backlog */ )
{ {
tou_tick_all();
return 1; return 1;
} }
@ -402,7 +405,6 @@ int tou_connect_via_relay(int sockfd,
*/ */
tous->tcp->connect(*dest_addr, DEFAULT_RELAY_CONN_PERIOD); tous->tcp->connect(*dest_addr, DEFAULT_RELAY_CONN_PERIOD);
tous->tcp->tick(); tous->tcp->tick();
tou_tick_all();
if (tous->tcp->isConnected()) if (tous->tcp->isConnected())
{ {
return 0; return 0;
@ -442,7 +444,6 @@ int tou_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
//tous->tcp->connect(); //tous->tcp->connect();
tous->tcp->tick(); tous->tcp->tick();
tou_tick_all();
if (tous->tcp->isConnected()) if (tous->tcp->isConnected())
{ {
// should get remote address // should get remote address
@ -464,7 +465,6 @@ int tou_connected(int sockfd)
TcpOnUdp *tous = tou_streams[sockfd]; TcpOnUdp *tous = tou_streams[sockfd];
tous->tcp->tick(); tous->tcp->tick();
tou_tick_all();
return (tous->tcp->TcpState() == 4); return (tous->tcp->TcpState() == 4);
} }
@ -482,7 +482,6 @@ ssize_t tou_read(int sockfd, void *buf, size_t count)
TcpOnUdp *tous = tou_streams[sockfd]; TcpOnUdp *tous = tou_streams[sockfd];
tous->tcp->tick(); tous->tcp->tick();
tou_tick_all();
int err = tous->tcp->read((char *) buf, count); int err = tous->tcp->read((char *) buf, count);
if (err < 0) if (err < 0)
@ -507,11 +506,9 @@ ssize_t tou_write(int sockfd, const void *buf, size_t count)
{ {
tous->lasterrno = tous->tcp->TcpErrorState(); tous->lasterrno = tous->tcp->TcpErrorState();
tous->tcp->tick(); tous->tcp->tick();
tou_tick_all();
return -1; return -1;
} }
tous->tcp->tick(); tous->tcp->tick();
tou_tick_all();
return err; return err;
} }
@ -524,7 +521,6 @@ int tou_maxread(int sockfd)
} }
TcpOnUdp *tous = tou_streams[sockfd]; TcpOnUdp *tous = tou_streams[sockfd];
tous->tcp->tick(); tous->tcp->tick();
tou_tick_all();
int ret = tous->tcp->read_pending(); int ret = tous->tcp->read_pending();
if (ret < 0) if (ret < 0)
@ -543,7 +539,6 @@ int tou_maxwrite(int sockfd)
} }
TcpOnUdp *tous = tou_streams[sockfd]; TcpOnUdp *tous = tou_streams[sockfd];
tous->tcp->tick(); tous->tcp->tick();
tou_tick_all();
int ret = tous->tcp->write_allowed(); int ret = tous->tcp->write_allowed();
if (ret < 0) if (ret < 0)
@ -558,13 +553,16 @@ int tou_maxwrite(int sockfd)
/* close down the tcp over udp connection */ /* close down the tcp over udp connection */
int tou_close(int sockfd) int tou_close(int sockfd)
{ {
TcpOnUdp *tous = NULL;
{
RsStackMutex stack(touMutex); /***** LOCKED ******/
if (tou_streams[sockfd] == NULL) if (tou_streams[sockfd] == NULL)
{ {
return -1; return -1;
} }
TcpOnUdp *tous = tou_streams[sockfd]; tous = tou_streams[sockfd];
tou_streams[sockfd] = NULL;
tou_tick_all(); }
if (tous->tcp) if (tous->tcp)
{ {
@ -620,7 +618,6 @@ int tou_close(int sockfd)
} }
delete tous; delete tous;
tou_streams[sockfd] = NULL;
return 1; return 1;
} }
@ -646,65 +643,3 @@ int tou_clear_error(int sockfd)
return 0; return 0;
} }
/* unfortuately the library needs to be ticked. (not running a thread)
* you can put it in a thread!
*/
/*
* Some helper functions for stuff.
*
*/
static int tou_passall();
static int tou_active_rw();
static int nextActiveCycle;
static int nextIdleCheck;
static const int kActiveCycleStep = 1;
static const int kIdleCheckStep = 5;
static int tou_tick_all()
{
tou_passall();
return 1;
/* check timer */
int ts = time(NULL);
if (ts > nextActiveCycle)
{
tou_active_rw();
nextActiveCycle += kActiveCycleStep;
}
if (ts > nextIdleCheck)
{
tou_passall();
nextIdleCheck += kIdleCheckStep;
}
return 0;
}
static int tou_passall()
{
/* iterate through all and clean up old sockets.
* check if idle are still idle.
*/
std::vector<TcpOnUdp *>::iterator it;
for(it = tou_streams.begin(); it != tou_streams.end(); ++it)
{
if ((*it) && ((*it)->tcp))
{
(*it)->tcp->tick();
}
}
return 1;
}
static int tou_active_rw()
{
/* iterate through actives and tick
*/
return 1;
}