Maxfdata= 8192,
Maxhost= 64, /* maximum host name size */
Maxservice= 64, /* maximum service name size */
+ Maxactive= 200, /* maximum number of active slave procs */
Qdir= 0,
Qcs= 1,
struct Mfile
{
- int busy;
+ int busy; /* fid in use */
+ int ref; /* cleanup when drops to zero */
char *user;
Qid qid;
Fcall request;
Fcall reply;
};
-Lock joblock;
+QLock joblock;
Job *joblist;
Mlist *mlist;
int ipv6lookups = 1;
jmp_buf masterjmp; /* return through here after a slave process has been created */
int *isslave; /* *isslave non-zero means this is a slave process */
+long active; /* number of active slaves */
char *dbfile;
Ndb *db, *netdb;
+char *csuser;
void rversion(Job*);
void rflush(Job*);
void setext(char*, int, char*);
void cleanmf(Mfile*);
-extern void paralloc(void);
-
-Lock dblock; /* mutex on database operations */
-Lock netlock; /* mutex for netinit() */
+QLock dblock; /* mutex on database operations */
+QLock netlock; /* mutex for netinit() */
char *logfile = "cs";
char *paranoiafile = "cs.paranoia";
Network *next;
};
-enum
-{
- Ntcp = 0,
+enum {
+ Ntcp = 1,
};
/*
* net doesn't apply to (r)udp, icmp(v6), or telco (for speed).
*/
Network network[] = {
-[Ntcp] { "tcp", iplookup, iptrans, 0 },
- { "udp", iplookup, iptrans, 1 },
- { "icmp", iplookup, iptrans, 1 },
- { "icmpv6", iplookup, iptrans, 1 },
- { "rudp", iplookup, iptrans, 1 },
- { "telco", telcolookup, telcotrans, 1 },
+ { "il", iplookup, iptrans, 0, 1, },
+ { "tcp", iplookup, iptrans, 0, 0, },
+ { "il", iplookup, iptrans, 0, 0, },
+ { "udp", iplookup, iptrans, 1, 0, },
+ { "icmp", iplookup, iptrans, 1, 0, },
+ { "icmpv6", iplookup, iptrans, 1, 0, },
+ { "rudp", iplookup, iptrans, 1, 0, },
+ { "ssh", iplookup, iptrans, 1, 0, },
+ { "telco", telcolookup, telcotrans, 1, 0, },
{ 0 },
};
-Lock ipifclock;
+QLock ipifclock;
Ipifc *ipifcs;
char eaddr[16]; /* ascii ethernet address */
netinit(0);
if(!justsetname){
+ csuser = estrdup(getuser());
mountinit(servefile, mntpt);
io();
}
for(f = mlist; f; f = f->next)
if(f->mf.busy && f->mf.fid == fid)
return &f->mf;
- else if(!ff && !f->mf.busy)
+ else if(!ff && !f->mf.busy && !f->mf.ref)
ff = f;
if(ff == 0){
ff = emalloc(sizeof *f);
{
Job *job;
- job = mallocz(sizeof(Job), 1);
- lock(&joblock);
+ job = emalloc(sizeof *job);
+ qlock(&joblock);
job->next = joblist;
joblist = job;
job->request.tag = -1;
- unlock(&joblock);
+ qunlock(&joblock);
return job;
}
{
Job **l;
- lock(&joblock);
+ qlock(&joblock);
for(l = &joblist; *l; l = &(*l)->next){
if((*l) == job){
*l = job->next;
- free(job);
break;
}
}
- unlock(&joblock);
+ qunlock(&joblock);
+ free(job);
}
void
{
Job *job;
- lock(&joblock);
+ qlock(&joblock);
for(job = joblist; job; job = job->next){
if(job->request.tag == tag && job->request.type != Tflush){
job->flushed = 1;
break;
}
}
- unlock(&joblock);
+ qunlock(&joblock);
}
void
for(;;){
n = read9pmsg(mfd[0], mdata, sizeof mdata);
- if(n<=0)
+ if(n < 0)
error("mount read");
+ if(n == 0)
+ continue;
job = newjob();
if(convM2S(mdata, n, &job->request) != n){
syslog(1, logfile, "format error %ux %ux %ux %ux %ux",
freejob(job);
continue;
}
- lock(&dblock);
+ qlock(&dblock);
mf = newfid(job->request.fid);
if(debug)
syslog(0, logfile, "%F", &job->request);
-
switch(job->request.type){
default:
syslog(1, logfile, "unknown request type %d", job->request.type);
rwstat(job, mf);
break;
}
- unlock(&dblock);
+ qunlock(&dblock);
freejob(job);
if(*isslave){
if(debug)
syslog(0, logfile, "slave death %d", getpid());
+ adec(&active);
_exits(0);
}
}
err = 0;
off = job->request.offset;
cnt = job->request.count;
+ mf->ref++;
+
if(mf->qid.type & QTDIR){
clock = time(0);
if(off == 0){
n = convD2M(&dir, buf, sizeof buf);
}
job->reply.data = (char*)buf;
- } else {
- for(;;){
- /* look for an answer at the right offset */
- toff = 0;
- for(i = 0; mf->reply[i] && i < mf->nreply; i++){
- n = mf->replylen[i];
- if(off < toff + n)
- break;
- toff += n;
- }
- if(i < mf->nreply)
- break; /* got something to return */
-
- /* try looking up more answers */
- if(lookup(mf) == 0){
- /* no more */
- n = 0;
- goto send;
- }
+ goto send;
+ }
+
+ for(;;){
+ /* look for an answer at the right offset */
+ toff = 0;
+ for(i = 0; mf->reply[i] && i < mf->nreply; i++){
+ n = mf->replylen[i];
+ if(off < toff + n)
+ break;
+ toff += n;
}
+ if(i < mf->nreply)
+ break; /* got something to return */
- /* give back a single reply (or part of one) */
- job->reply.data = mf->reply[i] + (off - toff);
- if(cnt > toff - off + n)
- n = toff - off + n;
- else
- n = cnt;
+ /* try looking up more answers */
+ if(lookup(mf) == 0 || job->flushed){
+ /* no more */
+ n = 0;
+ goto send;
+ }
}
+
+ /* give back a single reply (or part of one) */
+ job->reply.data = mf->reply[i] + (off - toff);
+ if(cnt > toff - off + n)
+ n = toff - off + n;
+ else
+ n = cnt;
+
send:
job->reply.count = n;
sendmsg(job, err);
+
+ if(--mf->ref == 0 && mf->busy == 0)
+ cleanmf(mf);
}
+
void
cleanmf(Mfile *mf)
{
}
job->request.data[cnt] = 0;
+ if(strcmp(mf->user, "none") == 0 || strcmp(mf->user, csuser) != 0)
+ goto query; /* skip special commands if not owner */
+
/*
* toggle debugging
*/
goto send;
}
+query:
+ if(mf->ref){
+ err = "query already in progress";
+ goto send;
+ }
+ mf->ref++;
+
/* start transaction with a clean slate */
cleanmf(mf);
*/
if(*job->request.data == '!'){
err = genquery(mf, job->request.data+1);
- goto send;
+ goto done;
}
if(debug)
syslog(0, logfile, "write %s", job->request.data);
if(paranoia)
syslog(0, paranoiafile, "write %s by %s", job->request.data, mf->user);
-
/*
* break up name
*/
n = getfields(job->request.data, field, 4, 1, "!");
switch(n){
case 1:
- mf->net = strdup("net");
- mf->host = strdup(field[0]);
+ mf->net = estrdup("net");
+ mf->host = estrdup(field[0]);
break;
case 4:
- mf->rem = strdup(field[3]);
+ mf->rem = estrdup(field[3]);
/* fall through */
case 3:
- mf->serv = strdup(field[2]);
+ mf->serv = estrdup(field[2]);
/* fall through */
case 2:
- mf->host = strdup(field[1]);
- mf->net = strdup(field[0]);
+ mf->host = estrdup(field[1]);
+ mf->net = estrdup(field[0]);
break;
}
rerrstr(curerr, sizeof curerr);
err = curerr;
}
+
+done:
+ if(--mf->ref == 0 && mf->busy == 0)
+ cleanmf(mf);
+
send:
job->reply.count = cnt;
sendmsg(job, err);
void
rclunk(Job *job, Mfile *mf)
{
- cleanmf(mf);
+ if(mf->ref == 0)
+ cleanmf(mf);
free(mf->user);
mf->user = 0;
- mf->busy = 0;
mf->fid = 0;
+ mf->busy = 0;
sendmsg(job, 0);
}
syslog(1, logfile, "sendmsg convS2M of %F returns 0", &job->reply);
abort();
}
- lock(&joblock);
+ qlock(&joblock);
if(job->flushed == 0)
if(write(mfd[1], mdata, n)!=n)
error("mount write");
- unlock(&joblock);
+ qunlock(&joblock);
if(debug)
syslog(0, logfile, "%F %d", &job->reply, n);
}
if(p && *p){
attr = ipattr(p);
if(strcmp(attr, "ip") != 0)
- mysysname = strdup(p);
+ mysysname = estrdup(p);
}
/*
ndbreopen(netdb);
for(tt = t = ndbparse(netdb); t != nil; t = t->entry){
if(strcmp(t->attr, "sys") == 0){
- mysysname = strdup(t->val);
+ mysysname = estrdup(t->val);
break;
}
}
}
for(tt = t; tt != nil; tt = tt->entry){
if(strcmp(tt->attr, "sys") == 0){
- mysysname = strdup(tt->val);
+ mysysname = estrdup(tt->val);
break;
}
}
/* nothing else worked, use the ip address */
if(mysysname == 0 && isvalidip(ipa))
- mysysname = strdup(ipaddr);
+ mysysname = estrdup(ipaddr);
/* set /dev/sysname if we now know it */
default:
return;
}
- lock(&netlock);
+ qlock(&netlock);
}
/* add the mounted networks to the default list */
mysysname?mysysname:"???", eaddr, ipaddr, ipa);
if(background){
- unlock(&netlock);
+ qunlock(&netlock);
_exits(0);
}
}
else
snprint(reply, sizeof(reply), "%s/%s/clone %s",
mntpt, mf->net, mf->host);
- mf->reply[0] = strdup(reply);
+ mf->reply[0] = estrdup(reply);
mf->replylen[0] = strlen(reply);
mf->nreply = 1;
return 1;
/* '*' means any service */
if(strcmp(name, "*")==0){
- strcpy(buf, name);
+ nstrcpy(buf, name, blen);
return buf;
}
if(atoi(name) < 1024 && strcmp(np->net, "tcp") == 0)
p = ndbgetvalue(db, &s, "port", name, "port", &t);
if(p == nil)
- p = strdup(name);
+ p = estrdup(name);
}
if(t){
if(strcmp("::", host) == 0)
return ndbnew("ip", "*");
+
/*
* '$' means the rest of the name is an attribute that we
* need to search for
/*
* turn '[ip address]' into just 'ip address'
*/
- if(*host == '[' && host[strlen(host)-1] == ']'){
- host++;
- host[strlen(host)-1] = 0;
+ if(*host == '['){
+ char *x;
+
+ if(host != dollar){
+ nstrcpy(dollar, host, sizeof dollar);
+ host = dollar;
+ }
+ if(x = strchr(++host, ']'))
+ *x = 0;
}
/*
/*
* reorder according to our interfaces
*/
- lock(&ipifclock);
+ qlock(&ipifclock);
for(ifc = ipifcs; ifc != nil; ifc = ifc->next){
for(lifc = ifc->lifc; lifc != nil; lifc = lifc->next){
maskip(lifc->ip, lifc->mask, net);
maskip(ip, lifc->mask, tnet);
if(memcmp(net, tnet, IPaddrlen) == 0){
t = reorder(t, nt);
- unlock(&ipifclock);
+ qunlock(&ipifclock);
return t;
}
}
}
}
- unlock(&ipifclock);
+ qunlock(&ipifclock);
return t;
}
snprint(reply, sizeof(reply), "%s/%s/clone %s!%s%s%s",
mntpt, np->net, t->val, ts, x, hack? "!fasttimeout": "");
- return strdup(reply);
+ return estrdup(reply);
}
/*
else
snprint(reply, sizeof(reply), "%s/%s/clone %s%s", mntpt, np->net,
t->val, x);
- return strdup(reply);
+ return estrdup(reply);
}
/*
{
if(*isslave)
return; /* we're already a slave process */
-
+ if(ainc(&active) >= Maxactive){
+ adec(&active);
+ return;
+ }
switch(rfork(RFPROC|RFNOTEG|RFMEM|RFNOWAIT)){
case -1:
+ adec(&active);
break;
case 0:
+ *isslave = 1;
if(debug)
syslog(0, logfile, "slave %d", getpid());
procsetname("%s", host);
- *isslave = 1;
break;
default:
longjmp(masterjmp, 1);
/* convert ipv6 attr to ip */
for (tt = t6; tt != nil; tt = tt->entry)
if (strcmp(tt->attr, "ipv6") == 0)
- strncpy(tt->attr, "ip", sizeof tt->attr - 1);
+ strcpy(tt->attr, "ip");
if (t == nil)
return t6;
char buf[Maxreply];
Ndbtuple *t;
- unlock(&dblock);
-
- /* save the name before starting a slave */
- snprint(buf, sizeof(buf), "%s", host);
-
+ qunlock(&dblock);
slave(host);
+ if(*isslave == 0){
+ qlock(&dblock);
+ werrstr("too mutch activity");
+ return nil;
+ }
- if(strcmp(ipattr(buf), "ip") == 0)
- t = dnsquery(mntpt, buf, "ptr");
+ if(strcmp(ipattr(host), "ip") == 0)
+ t = dnsquery(mntpt, host, "ptr");
else {
- t = dnsquery(mntpt, buf, "ip");
+ t = dnsquery(mntpt, host, "ip");
/* special case: query ipv6 (AAAA dns RR) too */
if (ipv6lookups)
- t = dnsip6lookup(mntpt, buf, t);
+ t = dnsip6lookup(mntpt, host, t);
}
s->t = t;
werrstr("temporary problem: %s", buf);
}
- lock(&dblock);
+ qlock(&dblock);
return t;
}
if(nt->line != nt->entry){
mf->replylen[mf->nreply] = s_len(s);
- mf->reply[mf->nreply++] = strdup(s_to_c(s));
+ mf->reply[mf->nreply++] = estrdup(s_to_c(s));
s_restart(s);
} else
s_append(s, " ");
ndbfree(nt);
continue;
}
- strcpy(nt->attr, attr);
+ nstrcpy(nt->attr, attr, sizeof(nt->attr));
l = &nt->entry;
}
return t;
x = malloc(size);
if(x == nil)
- abort();
+ error("out of memory");
memset(x, 0, size);
return x;
}
int size;
char *p;
- size = strlen(s)+1;
- p = malloc(size);
+ size = strlen(s);
+ p = malloc(size+1);
if(p == nil)
- abort();
+ error("out of memory");
memmove(p, s, size);
+ p[size] = 0;
return p;
}