]> git.lizzy.rs Git - plan9front.git/blob - sys/src/cmd/upas/q/runq.c
upas/fs: fix more locking bugs, remove debugging clutter, remove planb mbox code
[plan9front.git] / sys / src / cmd / upas / q / runq.c
1 #include "common.h"
2 #include <ctype.h>
3
4 void    doalldirs(void);
5 void    dodir(char*);
6 void    dofile(Dir*);
7 void    rundir(char*);
8 char*   file(char*, char);
9 void    warning(char*, void*);
10 void    error(char*, void*);
11 int     returnmail(char**, char*, char*);
12 void    logit(char*, char*, char**);
13 void    doload(int);
14
15 #define HUNK 32
16 char    *cmd;
17 char    *root;
18 int     debug;
19 int     giveup = 2*24*60*60;
20 int     load;
21 int     limit;
22
23 /* the current directory */
24 Dir     *dirbuf;
25 long    ndirbuf = 0;
26 int     nfiles;
27 char    *curdir;
28
29 char *runqlog = "runq";
30
31 int     *pidlist;
32 char    **badsys;               /* array of recalcitrant systems */
33 int     nbad;
34 int     npid = 50;
35 int     sflag;                  /* single thread per directory */
36 int     aflag;                  /* all directories */
37 int     Eflag;                  /* ignore E.xxxxxx dates */
38 int     Rflag;                  /* no giving up, ever */
39
/*
 *  print the usage message on standard error and exit with a
 *  non-empty (failure) status.  The flag list includes R, which
 *  main() accepts.
 */
void
usage(void)
{
	fprint(2, "usage: runq [-adsER] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
	exits("usage");
}
46
47 void
48 main(int argc, char **argv)
49 {
50         char *qdir, *x;
51
52         qdir = 0;
53
54         ARGBEGIN{
55         case 'l':
56                 x = ARGF();
57                 if(x == 0)
58                         usage();
59                 load = atoi(x);
60                 if(load < 0)
61                         load = 0;
62                 break;
63         case 'E':
64                 Eflag++;
65                 break;
66         case 'R':       /* no giving up -- just leave stuff in the queue */
67                 Rflag++;
68                 break;
69         case 'a':
70                 aflag++;
71                 break;
72         case 'd':
73                 debug++;
74                 break;
75         case 'r':
76                 limit = atoi(ARGF());
77                 break;
78         case 's':
79                 sflag++;
80                 break;
81         case 't':
82                 giveup = 60*60*atoi(ARGF());
83                 break;
84         case 'q':
85                 qdir = ARGF();
86                 if(qdir == 0)
87                         usage();
88                 break;
89         case 'n':
90                 npid = atoi(ARGF());
91                 if(npid == 0)
92                         usage();
93                 break;
94         }ARGEND;
95
96         if(argc != 2)
97                 usage();
98
99         pidlist = malloc(npid*sizeof(*pidlist));
100         if(pidlist == 0)
101                 error("can't malloc", 0);
102
103         if(aflag == 0 && qdir == 0) {
104                 qdir = getuser();
105                 if(qdir == 0)
106                         error("unknown user", 0);
107         }
108         root = argv[0];
109         cmd = argv[1];
110
111         if(chdir(root) < 0)
112                 error("can't cd to %s", root);
113
114         doload(1);
115         if(aflag)
116                 doalldirs();
117         else
118                 dodir(qdir);
119         doload(0);
120         exits(0);
121 }
122
123 int
124 emptydir(char *name)
125 {
126         int fd;
127         long n;
128         char buf[2048];
129
130         fd = open(name, OREAD);
131         if(fd < 0)
132                 return 1;
133         n = read(fd, buf, sizeof(buf));
134         close(fd);
135         if(n <= 0) {
136                 if(debug)
137                         fprint(2, "removing directory %s\n", name);
138                 syslog(0, runqlog, "rmdir %s", name);
139                 remove(name);
140                 return 1;
141         }
142         return 0;
143 }
144
145 int
146 forkltd(void)
147 {
148         int i;
149         int pid;
150
151         for(i = 0; i < npid; i++){
152                 if(pidlist[i] <= 0)
153                         break;
154         }
155
156         while(i >= npid){
157                 pid = waitpid();
158                 if(pid < 0){
159                         syslog(0, runqlog, "forkltd confused");
160                         exits(0);
161                 }
162
163                 for(i = 0; i < npid; i++)
164                         if(pidlist[i] == pid)
165                                 break;
166         }
167         pidlist[i] = fork();
168         return pidlist[i];
169 }
170
171 /*
172  *  run all user directories, must be bootes (or root on unix) to do this
173  */
174 void
175 doalldirs(void)
176 {
177         Dir *db;
178         int fd;
179         long i, n;
180
181
182         fd = open(".", OREAD);
183         if(fd == -1){
184                 warning("reading %s", root);
185                 return;
186         }
187         n = dirreadall(fd, &db);
188         if(n > 0){
189                 for(i=0; i<n; i++){
190                         if(db[i].qid.type & QTDIR){
191                                 if(emptydir(db[i].name))
192                                         continue;
193                                 switch(forkltd()){
194                                 case -1:
195                                         syslog(0, runqlog, "out of procs");
196                                         doload(0);
197                                         exits(0);
198                                 case 0:
199                                         if(sysdetach() < 0)
200                                                 error("%r", 0);
201                                         dodir(db[i].name);
202                                         exits(0);
203                                 default:
204                                         break;
205                                 }
206                         }
207                 }
208                 free(db);
209         }
210         close(fd);
211 }
212
213 /*
214  *  cd to a user directory and run it
215  */
216 void
217 dodir(char *name)
218 {
219         curdir = name;
220
221         if(chdir(name) < 0){
222                 warning("cd to %s", name);
223                 return;
224         }
225         if(debug)
226                 fprint(2, "running %s\n", name);
227         rundir(name);
228         chdir("..");
229 }
230
231 /*
232  *  run the current directory
233  */
234 void
235 rundir(char *name)
236 {
237         int fd;
238         long i;
239
240         if(aflag && sflag)
241                 fd = sysopenlocked(".", OREAD);
242         else
243                 fd = open(".", OREAD);
244         if(fd == -1){
245                 warning("reading %s", name);
246                 return;
247         }
248         nfiles = dirreadall(fd, &dirbuf);
249         if(nfiles > 0){
250                 for(i=0; i<nfiles; i++){
251                         if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
252                                 continue;
253                         dofile(&dirbuf[i]);
254                 }
255                 free(dirbuf);
256         }
257         if(aflag && sflag)
258                 sysunlockfile(fd);
259         else
260                 close(fd);
261 }
262
263 /*
264  *  free files matching name in the current directory
265  */
266 void
267 remmatch(char *name)
268 {
269         long i;
270
271         syslog(0, runqlog, "removing %s/%s", curdir, name);
272
273         for(i=0; i<nfiles; i++){
274                 if(strcmp(&dirbuf[i].name[1], &name[1]) == 0)
275                         remove(dirbuf[i].name);
276         }
277
278         /* error file (may have) appeared after we read the directory */
279         /* stomp on data file in case of phase error */
280         remove(file(name, 'D'));
281         remove(file(name, 'E'));
282 }
283
284 /*
285  *  like trylock, but we've already got the lock on fd,
286  *  and don't want an L. lock file.
287  */
288 static Mlock *
289 keeplockalive(char *path, int fd)
290 {
291         char buf[1];
292         Mlock *l;
293
294         l = malloc(sizeof(Mlock));
295         if(l == 0)
296                 return 0;
297         l->fd = fd;
298         snprint(l->name, sizeof l->name, "%s", path);
299
300         /* fork process to keep lock alive until sysunlock(l) */
301         switch(l->pid = rfork(RFPROC)){
302         default:
303                 break;
304         case 0:
305                 fd = l->fd;
306                 for(;;){
307                         sleep(1000*60);
308                         if(pread(fd, buf, 1, 0) < 0)
309                                 break;
310                 }
311                 _exits(0);
312         }
313         return l;
314 }
315
316 /*
317  *  try a message
318  */
319 void
320 dofile(Dir *dp)
321 {
322         Dir *d;
323         int dfd, ac, dtime, efd, pid, i, etime;
324         char *buf, *cp, **av;
325         Waitmsg *wm;
326         Biobuf *b;
327         Mlock *l = nil;
328
329         if(debug)
330                 fprint(2, "dofile %s\n", dp->name);
331         /*
332          *  if no data file or empty control or data file, just clean up
333          *  the empty control file must be 15 minutes old, to minimize the
334          *  chance of a race.
335          */
336         d = dirstat(file(dp->name, 'D'));
337         if(d == nil){
338                 syslog(0, runqlog, "no data file for %s", dp->name);
339                 remmatch(dp->name);
340                 return;
341         }
342         if(dp->length == 0){
343                 if(time(0)-dp->mtime > 15*60){
344                         syslog(0, runqlog, "empty ctl file for %s", dp->name);
345                         remmatch(dp->name);
346                 }
347                 return;
348         }
349         dtime = d->mtime;
350         free(d);
351
352         /*
353          *  retry times depend on the age of the errors file
354          */
355         if(!Eflag && (d = dirstat(file(dp->name, 'E'))) != nil){
356                 etime = d->mtime;
357                 free(d);
358                 if(etime - dtime < 60*60){
359                         /* up to the first hour, try every 15 minutes */
360                         if(time(0) - etime < 15*60)
361                                 return;
362                 } else {
363                         /* after the first hour, try once an hour */
364                         if(time(0) - etime < 60*60)
365                                 return;
366                 }
367
368         }
369
370         /*
371          *  open control and data
372          */
373         b = sysopen(file(dp->name, 'C'), "rl", 0660);
374         if(b == 0) {
375                 if(debug)
376                         fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
377                 return;
378         }
379         dfd = open(file(dp->name, 'D'), OREAD);
380         if(dfd < 0){
381                 if(debug)
382                         fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
383                 Bterm(b);
384                 sysunlockfile(Bfildes(b));
385                 return;
386         }
387
388         /*
389          *  make arg list
390          *      - read args into (malloc'd) buffer
391          *      - malloc a vector and copy pointers to args into it
392          */
393         buf = malloc(dp->length+1);
394         if(buf == 0){
395                 warning("buffer allocation", 0);
396                 Bterm(b);
397                 sysunlockfile(Bfildes(b));
398                 close(dfd);
399                 return;
400         }
401         if(Bread(b, buf, dp->length) != dp->length){
402                 warning("reading control file %s\n", dp->name);
403                 Bterm(b);
404                 sysunlockfile(Bfildes(b));
405                 close(dfd);
406                 free(buf);
407                 return;
408         }
409         buf[dp->length] = 0;
410         av = malloc(2*sizeof(char*));
411         if(av == 0){
412                 warning("argv allocation", 0);
413                 close(dfd);
414                 free(buf);
415                 Bterm(b);
416                 sysunlockfile(Bfildes(b));
417                 return;
418         }
419         for(ac = 1, cp = buf; *cp; ac++){
420                 while(isspace(*cp))
421                         *cp++ = 0;
422                 if(*cp == 0)
423                         break;
424
425                 av = realloc(av, (ac+2)*sizeof(char*));
426                 if(av == 0){
427                         warning("argv allocation", 0);
428                         close(dfd);
429                         free(buf);
430                         Bterm(b);
431                         sysunlockfile(Bfildes(b));
432                         return;
433                 }
434                 av[ac] = cp;
435                 while(*cp && !isspace(*cp)){
436                         if(*cp++ == '"'){
437                                 while(*cp && *cp != '"')
438                                         cp++;
439                                 if(*cp)
440                                         cp++;
441                         }
442                 }
443         }
444         av[0] = cmd;
445         av[ac] = 0;
446
447         if(!Eflag &&time(0) - dtime > giveup){
448                 if(returnmail(av, dp->name, "Giveup") != 0)
449                         logit("returnmail failed", dp->name, av);
450                 remmatch(dp->name);
451                 goto done;
452         }
453
454         for(i = 0; i < nbad; i++){
455                 if(strcmp(av[3], badsys[i]) == 0)
456                         goto done;
457         }
458
459         /*
460          * Ken's fs, for example, gives us 5 minutes of inactivity before
461          * the lock goes stale, so we have to keep reading it.
462          */
463         l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
464
465         /*
466          *  transfer
467          */
468         pid = fork();
469         switch(pid){
470         case -1:
471                 sysunlock(l);
472                 sysunlockfile(Bfildes(b));
473                 syslog(0, runqlog, "out of procs");
474                 exits(0);
475         case 0:
476                 if(debug) {
477                         fprint(2, "Starting %s", cmd);
478                         for(ac = 0; av[ac]; ac++)
479                                 fprint(2, " %s", av[ac]);
480                         fprint(2, "\n");
481                 }
482                 logit("execing", dp->name, av);
483                 close(0);
484                 dup(dfd, 0);
485                 close(dfd);
486                 close(2);
487                 efd = open(file(dp->name, 'E'), OWRITE);
488                 if(efd < 0){
489                         if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
490                         efd = create(file(dp->name, 'E'), OWRITE, 0666);
491                         if(efd < 0){
492                                 if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
493                                 exits("could not open error file - Retry");
494                         }
495                 }
496                 seek(efd, 0, 2);
497                 exec(cmd, av);
498                 error("can't exec %s", cmd);
499                 break;
500         default:
501                 for(;;){
502                         wm = wait();
503                         if(wm == nil)
504                                 error("wait failed: %r", "");
505                         if(wm->pid == pid)
506                                 break;
507                         free(wm);
508                 }
509                 if(debug)
510                         fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
511
512                 if(wm->msg[0]){
513                         if(debug)
514                                 fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
515                         if(!Rflag && strstr(wm->msg, "Retry")==0){
516                                 /* return the message and remove it */
517                                 if(returnmail(av, dp->name, wm->msg) != 0)
518                                         logit("returnmail failed", dp->name, av);
519                                 remmatch(dp->name);
520                         } else {
521                                 /* add sys to bad list and try again later */
522                                 nbad++;
523                                 badsys = realloc(badsys, nbad*sizeof(char*));
524                                 badsys[nbad-1] = strdup(av[3]);
525                         }
526                 } else {
527                         /* it worked remove the message */
528                         remmatch(dp->name);
529                 }
530                 free(wm);
531
532         }
533 done:
534         if (l)
535                 sysunlock(l);
536         Bterm(b);
537         sysunlockfile(Bfildes(b));
538         free(buf);
539         free(av);
540         close(dfd);
541 }
542
543
544 /*
545  *  return a name starting with the given character
546  */
547 char*
548 file(char *name, char type)
549 {
550         static char nname[Elemlen+1];
551
552         strncpy(nname, name, Elemlen);
553         nname[Elemlen] = 0;
554         nname[0] = type;
555         return nname;
556 }
557
558 /*
559  *  send back the mail with an error message
560  *
561  *  return 0 if successful
562  */
563 int
564 returnmail(char **av, char *name, char *msg)
565 {
566         char buf[256], attachment[Pathlen], *sender;
567         int i, fd, pfd[2];
568         long n;
569         Waitmsg *wm;
570         String *s;
571
572         if(av[1] == 0 || av[2] == 0){
573                 logit("runq - dumping bad file", name, av);
574                 return 0;
575         }
576
577         s = unescapespecial(s_copy(av[2]));
578         sender = s_to_c(s);
579
580         if(!returnable(sender) || strcmp(sender, "postmaster") == 0) {
581                 logit("runq - dumping p to p mail", name, av);
582                 return 0;
583         }
584
585         if(pipe(pfd) < 0){
586                 logit("runq - pipe failed", name, av);
587                 return -1;
588         }
589
590         switch(rfork(RFFDG|RFPROC|RFENVG)){
591         case -1:
592                 logit("runq - fork failed", name, av);
593                 return -1;
594         case 0:
595                 logit("returning", name, av);
596                 close(pfd[1]);
597                 close(0);
598                 dup(pfd[0], 0);
599                 close(pfd[0]);
600                 putenv("upasname", "/dev/null");
601                 snprint(buf, sizeof(buf), "%s/marshal", UPASBIN);
602                 snprint(attachment, sizeof(attachment), "%s", file(name, 'D'));
603                 execl(buf, "send", "-A", attachment, "-s", "permanent failure", sender, nil);
604                 error("can't exec", 0);
605                 break;
606         default:
607                 break;
608         }
609
610         close(pfd[0]);
611         fprint(pfd[1], "\n");   /* get out of headers */
612         if(av[1]){
613                 fprint(pfd[1], "Your request ``%.20s ", av[1]);
614                 for(n = 3; av[n]; n++)
615                         fprint(pfd[1], "%s ", av[n]);
616         }
617         fprint(pfd[1], "'' failed (code %s).\nThe symptom was:\n\n", msg);
618         fd = open(file(name, 'E'), OREAD);
619         if(fd >= 0){
620                 for(;;){
621                         n = read(fd, buf, sizeof(buf));
622                         if(n <= 0)
623                                 break;
624                         if(write(pfd[1], buf, n) != n){
625                                 close(fd);
626                                 goto out;
627                         }
628                 }
629                 close(fd);
630         }
631         close(pfd[1]);
632 out:
633         wm = wait();
634         if(wm == nil){
635                 syslog(0, "runq", "wait: %r");
636                 logit("wait failed", name, av);
637                 return -1;
638         }
639         i = 0;
640         if(wm->msg[0]){
641                 i = -1;
642                 syslog(0, "runq", "returnmail child: %s", wm->msg);
643                 logit("returnmail child failed", name, av);
644         }
645         free(wm);
646         return i;
647 }
648
649 /*
650  *  print a warning and continue
651  */
652 void
653 warning(char *f, void *a)
654 {
655         char err[ERRMAX];
656         char buf[256];
657
658         rerrstr(err, sizeof(err));
659         snprint(buf, sizeof(buf), f, a);
660         fprint(2, "runq: %s: %s\n", buf, err);
661 }
662
663 /*
664  *  print an error and die
665  */
666 void
667 error(char *f, void *a)
668 {
669         char err[ERRMAX];
670         char buf[256];
671
672         rerrstr(err, sizeof(err));
673         snprint(buf, sizeof(buf), f, a);
674         fprint(2, "runq: %s: %s\n", buf, err);
675         exits(buf);
676 }
677
678 void
679 logit(char *msg, char *file, char **av)
680 {
681         int n, m;
682         char buf[256];
683
684         n = snprint(buf, sizeof(buf), "%s/%s: %s", curdir, file, msg);
685         for(; *av; av++){
686                 m = strlen(*av);
687                 if(n + m + 4 > sizeof(buf))
688                         break;
689                 sprint(buf + n, " '%s'", *av);
690                 n += m + 3;
691         }
692         syslog(0, runqlog, "%s", buf);
693 }
694
695 char *loadfile = ".runqload";
696
697 /*
698  *  load balancing
699  */
700 void
701 doload(int start)
702 {
703         int fd;
704         char buf[32];
705         int i, n;
706         Mlock *l;
707         Dir *d;
708
709         if(load <= 0)
710                 return;
711
712         if(chdir(root) < 0){
713                 load = 0;
714                 return;
715         }
716
717         l = syslock(loadfile);
718         fd = open(loadfile, ORDWR);
719         if(fd < 0){
720                 fd = create(loadfile, 0666, ORDWR);
721                 if(fd < 0){
722                         load = 0;
723                         sysunlock(l);
724                         return;
725                 }
726         }
727
728         /* get current load */
729         i = 0;
730         n = read(fd, buf, sizeof(buf)-1);
731         if(n >= 0){
732                 buf[n] = 0;
733                 i = atoi(buf);
734         }
735         if(i < 0)
736                 i = 0;
737
738         /* ignore load if file hasn't been changed in 30 minutes */
739         d = dirfstat(fd);
740         if(d != nil){
741                 if(d->mtime + 30*60 < time(0))
742                         i = 0;
743                 free(d);
744         }
745
746         /* if load already too high, give up */
747         if(start && i >= load){
748                 sysunlock(l);
749                 exits(0);
750         }
751
752         /* increment/decrement load */
753         if(start)
754                 i++;
755         else
756                 i--;
757         seek(fd, 0, 0);
758         fprint(fd, "%d\n", i);
759         sysunlock(l);
760         close(fd);
761 }