00001
00355
00356
00357
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370
00371
00372
00373
00374
00375
00376
00377
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402
00403
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419 #include <jsoc_main.h>
00420 #include <sum_rpc.h>
00421 #include <cmdparams.h>
00422 #include <drms.h>
00423 #include <drms_names.h>
00424 #include <pvm3.h>
00425 #include <soi_key.h>
00426 #include <stdio.h>
00427 #include <stdlib.h>
00428 #include <ctype.h>
00429 #include <strings.h>
00430 #include <sys/types.h>
00431 #include <sys/time.h>
00432 #include <sys/stat.h>
00433 #include <dirent.h>
00434 #include <unistd.h>
00435 #include <printk.h>
00436 #include <setjmp.h>
00437
00438 #include "pe.h"
00439
00440
/*
 * Compile-time constants and the mail address tables.
 *
 * MONE is the value this file casts to (HDATA *) and hands to gethnext()
 * when continuing an iteration.  It is fully parenthesised so it expands
 * safely inside any expression.
 */
#define MONE ((long)-1)
#define PEMAILFILE "/tmp/pe.%s.%d.mail"
#define PKTSZ 1788              //size of VCDU pkt
#define MAXFILES 8192           //max # of files that can be handled in tlmdir
#define NUMTIMERS 8             //number of separate timers available

#define IMAGE_NUM_COMMIT 2
#define TESTAPPID 0x199         //appid of test pattern packet
#define TESTVALUE 0xc0b         //first value in test pattern packet
#define MAXERRMSGCNT 10         //max # of err msgs before skipping the tlm file
#define NOTSPECIFIED "***NOTSPECIFIED***"

#define out_namespace "dsds"

/* Users for whom open_pemail() enables the mail file. */
static char *mailable_users[] = {"jprod","daemon","tprod","production",
                                 "jeneen", "thailand"};
/* Entry counts are derived from the tables so they cannot get out of step. */
#define mailable_users_num ((int)(sizeof(mailable_users) / sizeof(mailable_users[0])))

/* Recipients mailit() uses when running as "daemon" or "production". */
static char *mailees[] = {"prod3@sun"};
#define mailees_num ((int)(sizeof(mailees) / sizeof(mailees[0])))
00461
00462
00463
00464
00465 ModuleArgs_t module_args[] = {
00466 {ARG_STRING, "map", NOTSPECIFIED, "map file with the pe directives"},
00467 {ARG_INT, "touch","-1","#of days to retain input datasets before deletable"},
00468 {ARG_FLAG, "A", "0", "retrieve from tape flag"},
00469 {ARG_FLAG, "v", "0", "verbose flag"},
00470 {ARG_FLAG, "h", "0", "help flag"},
00471 {ARG_END}
00472 };
00473
00474 CmdParams_t cmdparams;
00475 char **argv = NULL;
00476 int argc = 0;
00477 char *module_name = "jpe";
00478 jmp_buf env;
00479
00480 int resp_dsds(int dsdstid);
00481 int pemail(char *fmt, ...);
00482 double du_dir();
00483
00484 FILE *pemailfp;
00485 FILE *pelogfp;
00486 PSERVER stab[MAX_SERV+1];
00487 KEY *wd_dup_list;
00488 KEY *wd_mk_list;
00489 KEY *JPElist;
00490
00491 static char datestr[32];
00492 static struct timeval first[NUMTIMERS], second[NUMTIMERS];
00493 static DRMS_Record_t *rs;
00494 extern DRMS_Env_t *drms_env;
00495 extern KEY *call_drms_in(KEY *list, int dbflg);
00496
00497 unsigned int uid;
00498 unsigned int vcdu_24_cnt, vcdu_24_cnt_next;
00499 int wdkey_int;
00500 int verbose;
00501 int whk_status;
00502 int ampexflg;
00503 int nocontrolc;
00504 int archactive;
00505 int ccactive;
00506 int mailable;
00507 int mailtrue;
00508 int touchflg;
00509 double reqbytes;
00510 int rcpflg;
00511 int dbxflg = 0;
00512 int debugflg;
00513 int reqmegs;
00514 int nowarn;
00515 int dsdsin;
00516 int msg_id;
00517 int num_hosts;
00518 int effective_hosts;
00519 int total_tlm_vcdu;
00520 int total_missing_vcdu;
00521 int dsds_tid;
00522
00523 int pe_tid;
00524 int ampex_tid;
00525 int lago_tid;
00526 int tae_tid;
00527 int errmsgcnt, fileimgcnt;
00528 int imagecnt = 0;
00529 int restartflg = 0;
00530 int sigalrmflg = 0;
00531 int ignoresigalrmflg = 0;
00532 int firstfound = 0;
00533 int firstalloc = 1;
00534 int JPE_out_nsets;
00535 int ALRMSEC = 60;
00536 char argvc[32], argindir[96], arglogfile[96], argoutdir[96];
00537 char timetag[32];
00538 char pchan[8];
00539 char rchan[8];
00540 char stopfile[80];
00541 char tlmseriesname[128];
00542 char lev0seriesname[128];
00543 char tlmnamekey[128];
00544 char tlmnamekeyfirst[128];
00545 char oldtlmdsnam[128];
00546 char mailname[256];
00547 char pdshost[MAX_STR];
00548 char pe_map[256];
00549 char dsdswd[MAX_STR];
00550 char *username;
00551 char *rcpdir;
00552 char *t_first_arg;
00553 char *t_last_arg;
00554
00555 void BeginTimer(int n)
00556 {
00557 gettimeofday (&first[n], NULL);
00558 }
00559
00560 float EndTimer(int n)
00561 {
00562 gettimeofday (&second[n], NULL);
00563 if (first[n].tv_usec > second[n].tv_usec) {
00564 second[n].tv_usec += 1000000;
00565 second[n].tv_sec--;
00566 }
00567 return (float) (second[n].tv_sec-first[n].tv_sec) +
00568 (float) (second[n].tv_usec-first[n].tv_usec)/1000000.0;
00569 }
00570
00571
00572
00573 int pelog(char *fmt, ...)
00574 {
00575 va_list args;
00576 char string[32768];
00577
00578 va_start(args, fmt);
00579 vsprintf(string, fmt, args);
00580 if(pelogfp) {
00581 fprintf(pelogfp, string);
00582 fflush(pelogfp);
00583 }
00584 va_end(args);
00585 return(0);
00586 }
00587
00588
00589
00590 void open_pelog(char *filename)
00591 {
00592 if((pelogfp=fopen(filename, "w")) == NULL)
00593 fprintf(stderr, "Can't open the log file %s\n", filename);
00594 }
00595
00596 void kill_pvm()
00597 {
00598 int i;
00599 HDATA *hnext;
00600
00601 for(i=0; i<MAX_SERV; i++) {
00602 if(stab[i].name==NULL)
00603 break;
00604 for(hnext=(HDATA *)gethnext(stab[i].hosts); hnext != NULL;
00605 hnext=(HDATA *)gethnext((HDATA *)MONE)) {
00606 if(hnext->tid > 0) {
00607 pvm_kill(hnext->tid);
00608
00609 pelog("%x\tn/a\tkilled\t0\t%s\t<NONE>\t<NONE>\n", hnext->tid,stab[i].name);
00610
00611
00612
00613
00614
00615
00616
00617 }
00618 }
00619 }
00620 }
00621
00622
00623 void abortit(int stat)
00624 {
00625 printk("***Abort in progress ...\n");
00626 printk("**Exit jpe w/ status = %d\n", stat);
00627
00628
00629 longjmp(env, 2);
00630 }
00631
00632 void sighandler(sig)
00633 int sig;
00634 {
00635 KEY *list = newkeylist();
00636 int respcnt;
00637
00638 if (signal(SIGINT, SIG_IGN) != SIG_IGN)
00639 signal(SIGINT, sighandler);
00640 if (signal(SIGTERM, SIG_IGN) != SIG_IGN)
00641 signal(SIGTERM, sighandler);
00642 if(archactive) {
00643 printf("Ctrl-C during an archive update. Will abort when done...\n");
00644 archactive = -1;
00645 return;
00646 }
00647 if(nocontrolc) {
00648
00649 if(ccactive) {
00650 printf("A ^C is already active. I'm awaiting a dsds_svc reply...\n");
00651 }
00652 else {
00653 ccactive = 1;
00654 setkey_uint(&list, "dsds_uid", uid);
00655 setkey_int(&list, "ampex_tid", ampex_tid);
00656 setkey_str(&list, "USER", username);
00657 printf("^C signal received by pe\n");
00658 printf("Sending TAPECANCEL request to dsds_svc...\n");
00659 if((list = call_dsds(&list,REQTAPECANCEL,dsds_tid,pe_tid,printf,debugflg)) == NULL) {
00660 printf("Can't REQTAPECANCEL with dsds_svc\n");
00661 }
00662 else {
00663 if(getkeytype(list,"MSG_PEND")==KEYTYP_INT) {
00664 respcnt = 0;
00665 while(1) {
00666 if(resp_dsds(dsds_tid)) {
00667 printf("..."); respcnt++;
00668 if(respcnt == 20) {
00669 respcnt = 0;
00670 printf("\n");
00671 }
00672 continue;
00673 }
00674 if(respcnt != 0) printf("\n");
00675 break;
00676 }
00677 printf("Ampex tape read request has been canceled.\n");
00678 abortit(1);
00679 }
00680 else {
00681 printf("Failed to get TAPECANCEL msg to ampex_svc\n");
00682
00683 }
00684 }
00685 }
00686 }
00687 else {
00688 printf("\n***PE received a termination signal\n");
00689 abortit(1);
00690 }
00691 }
00692
00693
00694
00695
00696
00697 void who_died()
00698 {
00699 PSERVER *sptr;
00700 HDATA *hnext;
00701 struct taskinfo *taskp;
00702 int i, j, ntasks;
00703
00704 for(i=0; i<MAX_SERV; i++) {
00705 sptr = &stab[i];
00706 if(sptr->name == NULL)
00707 break;
00708 if(sptr->busyall) {
00709 for(hnext=(HDATA *)gethnext(sptr->hosts); hnext != NULL;
00710 hnext=(HDATA *)gethnext((HDATA *)MONE))
00711 {
00712 if(hnext->busy) {
00713 if(pvm_tasks(pvm_tidtohost(hnext->tid), &ntasks, &taskp)) {
00714 pemail("***Error on pvm_tasks() call\n");
00715 abortit(1);
00716 }
00717 for(j=0; j<ntasks; j++) {
00718 if(taskp[j].ti_tid == hnext->tid) break;
00719 }
00720 if(j == ntasks) {
00721 pelog("%x\tn/a\tcrash\t0\t%s\t<NONE>\t<NONE>\n", hnext->tid,sptr->name);
00722 pemail("***Unexpected exit of %s tid=%x on %s **\n",
00723 sptr->name, hnext->tid, hnext->host_name);
00724
00725
00726
00727 if (sptr->noabort) {
00728
00729
00730
00731 hnext->tid = 0;
00732 hnext->busy = 0;
00733 sptr->busyall--;
00734 return;
00735 }
00736 hnext->tid = 0;
00737 abortit(1);
00738 }
00739 }
00740 }
00741 }
00742 }
00743 }
00744
00745
00746
00747
00748
00749
00750
00751
00752
00753
00754
00755
00756
00757
00758
00759
00760
00761
00762
00763
00764
00765
00766
00767
00768
00769
00770
00771
00772
00773
00774
00775
00776
00777
00778
00779
00780
00781 int arg_available (KEY *param, char *name, int type)
00782 {
00783 int paramtype = getkeytype (param, name);
00784
00785 if (paramtype == KEYTYP_BYTE)
00786 {
00787 if (type == ARG_FLAG)
00788 return (1);
00789 else
00790 return (0);
00791 }
00792 else if (paramtype >= KEYTYP_STRING)
00793 return (1);
00794 else
00795 return (0);
00796 }
00797
00798
00799
/* Print the command-line synopsis through printk() and abort. */
void usage()
{
  static char *help_lines[] = {
    "Usage:\njpe [-v] [-A] [touch=#] map=map_file_name\n",
    "where: -v = verbose\n",
    " -A = access the tape_svc if need to retrieve data\n",
    " touch = #of days to retain input datasets before deletable\n",
  };
  unsigned int k;

  /* one printk() per line, exactly as the text is listed above */
  for(k = 0; k < sizeof(help_lines)/sizeof(help_lines[0]); k++)
    printk(help_lines[k]);
  abortit(1);
}
00808
00809
00810
00811 void getpasswd()
00812 {
00813 int getpasswd;
00814 char *envpasswd;
00815 char pline[MAX_STR];
00816
00817 getpasswd = 1;
00818 if(envpasswd = (char *)getenv("LMODEPASSWD")) {
00819 if(!strcmp(envpasswd, "mdi4soi")) getpasswd = 0;
00820 }
00821 if(getpasswd) {
00822 printf("Need password to run pe in local mode: passwd = ");
00823 if(system("stty -echo")) {
00824 printf("\n You're not the owner of this tty (you did an su?)\n");
00825 printf(" WARNING: Your passwd will be echoed\n");
00826 }
00827 if(gets(pline) == NULL) {
00828 system("stty echo");
00829 printf("\n***Invalid passwd\n");
00830 abortit(1);
00831 }
00832 system("stty echo");
00833 if(strcmp(pline, "mdi4soi")) {
00834 printf("\n***Invalid passwd\n");
00835 abortit(1);
00836 }
00837 printf("\n");
00838 }
00839 }
00840
00841
00842
00843
00844 void open_pemail(char *filename, char *idstr)
00845 {
00846 int i;
00847
00848 mailtrue = 0;
00849 mailable = 0;
00850 for(i = 0; i < mailable_users_num; i++) {
00851 if(!strcmp(username, mailable_users[i])) mailable = 1;
00852 }
00853 if(mailable) {
00854 if((pemailfp=fopen(filename, "w")) == NULL) {
00855 fprintf(stderr, "Can't open the mail file %s\n", filename);
00856 mailable = 0;
00857 return;
00858 }
00859 fprintf(pemailfp, idstr);
00860 }
00861 }
00862
00863
00864
00865
00866
00867 int pemail(char *fmt, ...)
00868 {
00869 va_list args;
00870 char string[32768];
00871
00872 va_start(args, fmt);
00873 vsprintf(string, fmt, args);
00874 printk(string);
00875 if(mailable) {
00876 fprintf(pemailfp, string);
00877 fflush(pemailfp);
00878 }
00879 va_end(args);
00880 mailtrue = 1;
00881 return(0);
00882 }
00883
00884
00885
00886 void mailit()
00887 {
00888 int i;
00889 char cmd[128];
00890
00891 if(mailable && mailtrue) {
00892
00893 if(!(strcmp("daemon", username)) || !(strcmp("production", username))) {
00894 for(i = 0; i < mailees_num; i++) {
00895 sprintf(cmd, "Mail -s \"pe 3* mail\" %s < %s", mailees[i], mailname);
00896 system(cmd);
00897 }
00898 }
00899 else {
00900 sprintf(cmd, "Mail -s \"pe 3* mail\" %s < %s", username, mailname);
00901 system(cmd);
00902 }
00903 }
00904 }
00905
00906
00907
00908
00909
/*
 * Execute one "!" command line from the map file.
 *
 *   setenv NAME VALUE   -> putenv("NAME=VALUE") in this process
 *   anything else       -> the whole line is handed to system()
 *
 * `line` is tokenised in place (strtok), so an untouched copy is kept for
 * system().  Both the copy and the NAME=VALUE string are sized from their
 * contents instead of using fixed-size buffers.
 */
void do_cmd(char *line)
{
  char *cmd;
  char *token, *name, *value;
  char *putenvcmd;
  size_t len;

  if(line == NULL)
    return;
  cmd = (char *)malloc(strlen(line) + 1);
  if(cmd == NULL) {
    fprintf(stderr, "Can't malloc to run map file command: %s\n", line);
    return;
  }
  strcpy(cmd, line);

  if((token = strtok(line, " \t\n")) != NULL) {
    if(!strcmp(token, "setenv")) {
      if((name = strtok(NULL, " \t\n")) != NULL &&
         (value = strtok(NULL, " \t\n")) != NULL) {
        len = strlen(name) + strlen(value) + 2;   /* '=' and '\0' */
        /* not freed: putenv() keeps using this storage */
        putenvcmd = (char *)malloc(len);
        if(putenvcmd != NULL) {
          snprintf(putenvcmd, len, "%s=%s", name, value);
          putenv(putenvcmd);
        }
        else {
          fprintf(stderr, "Can't malloc to setenv %s\n", name);
        }
      }
    }
    else {
      system(cmd);
    }
  }
  free(cmd);
}
00933
00934
00935
00936
00937
00938
00939 void setstab(char *sname)
00940 {
00941 int i;
00942 char *s;
00943
00944 for(i=0; i<MAX_SERV; i++) {
00945 if(stab[i].name == NULL) {
00946 s = (char *)malloc((strlen(sname))+5);
00947 strcpy(s, sname);
00948 strcat(s, "_svc");
00949 stab[i].name=s;
00950 msg_id++;
00951 stab[i].msgid=msg_id;
00952
00953 strcpy(stab[i].version, soi_version);
00954 break;
00955 }
00956 }
00957 if(i == MAX_SERV) {
00958 pemail("***Map file exceeds pe limit of %d servers\n",MAX_SERV);
00959 abortit(1);
00960 }
00961 }
00962
00963
00964
00965
00966 int addpvmd(char *host)
00967 {
00968 struct hostinfo *hostp;
00969 char *hostptr[2];
00970 int nhost, narch, i, hinfos;
00971
00972
00973 if(pvm_config(&nhost, &narch, &hostp)) {
00974 pemail("***Can't get pvm configuration. Pvm daemon(s) running?\n");
00975 return(1);
00976 }
00977 for(i=0; i<nhost; i++) {
00978 if(!strcmp(hostp[i].hi_name, host)) {
00979 break;
00980 }
00981 }
00982 if(i == nhost) {
00983 printk("A pvm daemon is been added for host %s...\n", host);
00984
00985
00986 sleep(2);
00987 hostptr[0] = host;
00988 if((pvm_addhosts(hostptr, 1, &hinfos)) < 0) {
00989 pemail("***Can't add a pvm daemon for host %s\n", host);
00990 return(1);
00991 }
00992 if(hinfos < 0) {
00993 pemail("***Can't add %s pvm daemon. Error %d.\n", host, hinfos);
00994 pemail("An independently started one may be running? Halt it.\n");
00995 return(1);
00996 }
00997 sleep(2);
00998 }
00999 return(0);
01000 }
01001
01002
01003
01004
01005
01006
01007
01008
01009
01010
01011 int sethosts(char *hname)
01012 {
01013 HDATA *hnext;
01014 char *host;
01015 int j;
01016
01017 num_hosts=0;
01018 host=(char *)strtok(hname,",");
01019 while(host) {
01020 for(j=0; j<MAX_SERV; j++) {
01021 if(stab[j].name == NULL)
01022 break;
01023 sethname(&stab[j].hosts, host);
01024 }
01025 if(addpvmd(host)) return(1);
01026 num_hosts++;
01027 host=(char *)strtok(NULL,",");
01028 }
01029 effective_hosts = num_hosts;
01030 if(effective_hosts != 1) {
01031 pemail("**Multiple hosts no longer allowed. Will use first host only\n");
01032 effective_hosts = 1;
01033 }
01034
01035 for(j=0; j<MAX_SERV; j++) {
01036 if(stab[j].name==NULL)
01037 break;
01038 for(hnext=(HDATA *)gethnext(stab[j].hosts); hnext != NULL;
01039 hnext=(HDATA *)gethnext((HDATA *)MONE)) {
01040 hnext->param_list = newkeylist();
01041 add_keys(stab[j].map_list, &hnext->param_list);
01042 }
01043 }
01044 return(0);
01045 }
01046
01047
01048
01049
01050
01051
01052
01053
01054
01055
01056
01057
01058
01059
01060
01061
01062
01063 int CollectParams (KEY **newlist, int argc, char **argv)
01064 {
01065 int pct, i;
01066 char *arg, *valu, *keyname;
01067 char isval, flagval;
01068 static char flagname[] = "x";
01069
01070 pct = argc;
01071 *newlist = newkeylist ();
01072
01073 isval = 0;
01074 for (i=1; i<argc; i++)
01075 {
01076 arg = argv[i];
01077 if (isval)
01078
01079 {
01080 setkey_str (newlist, keyname, arg);
01081 free (keyname);
01082 isval = 0;
01083 pct--;
01084 continue;
01085 }
01086 if (arg[0] == '-' || arg[0] == '+')
01087
01088 {
01089 valu = arg;
01090 valu++;
01091 isval = 0;
01092 if (arg[0] == '-' && arg[1] == 'P')
01093 {
01094
01095 pct--;
01096 continue;
01097 }
01098 flagval = (arg[0] == '-') ? -1 : 1;
01099 while (*valu)
01100 {
01101 flagname[0] = valu[0];
01102 setkey_byte (newlist, flagname, flagval);
01103 valu++;
01104 }
01105 continue;
01106 }
01107 if (valu = index (arg, '='))
01108 {
01109 *valu = 0;
01110 keyname = key_strdup (arg);
01111 *valu = '=';
01112 valu++;
01113 if (strlen (valu))
01114 {
01115 setkey_str (newlist, keyname, valu);
01116 free (keyname);
01117 isval = 0;
01118 }
01119 else
01120 {
01121 isval = 1;
01122 }
01123 continue;
01124 }
01125
01126 pct--;
01127 isval = 0;
01128 }
01129 return (pct - 1);
01130 }
01131
01132 void get_cmd(int argc, char *argv[])
01133 {
01134 int xargc, first, pfound, groupflg, groupid, gid, archgrp;
01135 int qon, i;
01136 int nullarchflg = 0;
01137 char *toks[128];
01138 char *tokens1 = "*\t\n";
01139 char *tokens2 = " \t\n";
01140 char *tokens3 = "*\n";
01141 char *tokens4 = " \n";
01142 char hostname[MAX_STR], subname[MAX_STR];
01143 char *inname, *sp, *hname, *cptr;
01144 char *line, *line2, *token, *tokenchars, *tokencharsend;
01145 char carch;
01146 FILE *mapfp;
01147 PSERVER *sptr;
01148 KEY *work_list;
01149
01150 CollectParams(&work_list, argc, argv);
01151 if(!(inname=getkey_str(work_list, "map"))) {
01152 usage();
01153 }
01154 gethostname(hostname, MAX_STR);
01155 if(!findkey(work_list, "touch"))
01156 touchflg = -1;
01157 else {
01158 cptr = getkey_str(work_list, "touch");
01159 if(!strcmp(cptr, "keep")) {
01160 touchflg = 9999999;
01161 }
01162 else {
01163 if(!isdigit((int)*cptr)) {
01164 pemail("***touch= must say \"keep\" or give integer #of days\n");
01165 abortit(1);
01166 }
01167 touchflg = atoi(cptr);
01168 }
01169 }
01170 if(findkey(work_list, "pds")) {
01171 strcpy(pdshost, getkey_str(work_list, "pds"));
01172 } else if((char *)getenv("PDS_SET_HOST")) {
01173 strcpy(pdshost, (char *)getenv("PDS_SET_HOST"));
01174 } else {
01175 strcpy(pdshost, hostname);
01176 }
01177 if(cptr = index(pdshost, '.')) {
01178 *cptr = NULL;
01179 }
01180 if(findkey(work_list, "rcp")) {
01181 rcpdir=getkey_str(work_list, "rcp");
01182 if(strncmp(rcpdir, "/", 1)) {
01183 pemail("***The rcp= dir must start with \"/\"\n");
01184 abortit(1);
01185 }
01186 rcpflg = 1;
01187 }
01188
01189 verbose=(getkeytype(work_list, "v") == KEYTYP_BYTE);
01190
01191 ampexflg=(getkeytype(work_list, "A") == KEYTYP_BYTE);
01192 if(getkeytype(work_list, "O") == KEYTYP_BYTE) {
01193 printf("\npe does not have a \"-O\" switch.\n");
01194 abortit(1);
01195 }
01196 if(!(mapfp=fopen(inname, "r"))) {
01197 pemail("***Can't open the map file %s\n", inname);
01198 abortit(1);
01199 }
01200 strcpy(pe_map, inname);
01201 hname=hostname;
01202 sptr = &stab[0];
01203 first=1; pfound=0; groupflg=0; gid=0; groupid=0, archgrp=0;
01204 line = (char *)malloc(16384);
01205 line2 = (char *)malloc(16384);
01206 while(fgets(line, 16384, mapfp)) {
01207 if(line[0] == '#' || line[0] == '\n') continue;
01208 if(line[0] == '!') {
01209 do_cmd(line+1);
01210 continue;
01211 }
01212 if(strchr(line, '"')) {
01213 tokenchars = tokens1;
01214 tokencharsend = tokens3;
01215
01216
01217 qon = 0; line2[0] = NULL;
01218 for(i=0; ; i++) {
01219 if(line[i] == '"') {
01220 if(qon) qon = 0;
01221 else qon = 1;
01222 i++;
01223 }
01224 if((line[i] == '*') && qon) {
01225 pemail("Illegal \"*\" char in quoted string\n");
01226 abortit(1);
01227 }
01228 if((line[i] == ' ') && !qon) {
01229 line[i] = '*';
01230 }
01231 strncat(line2, &line[i], 1);
01232 if(line[i] == '\n') break;
01233 }
01234 if(qon) {
01235 pemail("Mismatched quotes in: %s\n", line);
01236 abortit(1);
01237 }
01238 } else {
01239 tokenchars = tokens2;
01240 tokencharsend = tokens4;
01241 strcpy(line2, line);
01242 }
01243 toks[0]=argv[0];
01244 xargc=1;
01245 token=(char *)strtok(line2, tokenchars);
01246 while(token) {
01247 if(token[0] == '\\') {
01248 line = (char *)malloc(16384);
01249 line2 = (char *)malloc(16384);
01250 while(fgets(line, 16384, mapfp)) {
01251 if(line[0] == '#' || line[0] == '\n') continue;
01252 break;
01253 }
01254 if(strchr(line, '"')) {
01255 tokenchars = tokens1;
01256 tokencharsend = tokens3;
01257
01258
01259 qon = 0; line2[0] = NULL;
01260 for(i=0; ; i++) {
01261 if(line[i] == '"') {
01262 if(qon) qon = 0;
01263 else qon = 1;
01264 i++;
01265 }
01266 if((line[i] == '*') && qon) {
01267 pemail("Illegal \"*\" char in quoted string\n");
01268 abortit(1);
01269 }
01270 if((line[i] == ' ') && !qon) {
01271 line[i] = '*';
01272 }
01273 strncat(line2, &line[i], 1);
01274 if(line[i] == '\n') break;
01275 }
01276 if(qon) {
01277 pemail("Mismatched quotes in: %s\n", line);
01278 abortit(1);
01279 }
01280 } else {
01281 tokenchars = tokens2;
01282 tokencharsend = tokens4;
01283 strcpy(line2, line);
01284 }
01285 if(!(token=(char *)strtok(line2, tokenchars)))
01286 break;
01287 }
01288 toks[xargc]=token;
01289 xargc++;
01290 token=(char *)strtok(NULL, tokencharsend);
01291 }
01292 CollectParams(&work_list, xargc, toks);
01293 if(sp=getkey_str(work_list, "DSDSOUT"))
01294 reqmegs=atoi(sp);
01295 else if(sp=getkey_str(work_list, "NOWARN"))
01296 nowarn=atoi(sp);
01297 else if(sp=getkey_str(work_list, "HOST"))
01298 hname=sp;
01299 else if(sp=getkey_str(work_list, "START_ARCHIVE")) {
01300 if(atoi(sp)) {
01301 if(archgrp) {
01302 pemail("***Non-matching START/END_ARCHIVE pair\n");
01303 abortit(1);
01304 }
01305 archgrp = 1;
01306 }
01307 }
01308 else if(sp=getkey_str(work_list, "END_ARCHIVE")) {
01309 if(atoi(sp)) {
01310 if(!archgrp) {
01311 pemail("***Non-matching START/END_ARCHIVE pair\n");
01312 abortit(1);
01313 }
01314 archgrp = 0;
01315
01316 if(pfound) {
01317 --sptr;
01318 sptr->archive_group = -1;
01319 ++sptr;
01320 }
01321 }
01322 }
01323 else if(sp=getkey_str(work_list, "START_GROUP")) {
01324 if(atoi(sp)) {
01325 if(groupflg) {
01326 pemail("***Non-matching START/END_GROUP pair\n");
01327 abortit(1);
01328 }
01329 groupflg = 1;
01330 groupid = ++gid;
01331 }
01332 }
01333 else if(sp=getkey_str(work_list, "END_GROUP")) {
01334 if(atoi(sp)) {
01335 if(!groupflg) {
01336 pemail("***Non-matching START/END_GROUP pair\n");
01337 abortit(1);
01338 }
01339 groupflg = 0;
01340 groupid = 0;
01341 }
01342 }
01343 else if(sp=getkey_str(work_list, "p")) {
01344 pfound=1;
01345 deletekey(&work_list, "p");
01346
01347
01348 if(!strcmp(sp, "ingest_sci160k_log") || !strcmp(sp, "ingest_sci5k_log") || !strcmp(sp, "merge_sci160k_log")) {
01349 sprintf(subname, "%s_jpe", sp);
01350 sp = subname;
01351 nullarchflg = 1;
01352 }
01353 setstab(sp);
01354 sptr->cphist = 1;
01355 sptr->groupid = groupid;
01356 sptr->archive_group = archgrp;
01357 if(first) {
01358 first=0;
01359 sptr->firstserver=1;
01360 }
01361 if(sp=getkey_str(work_list, "d")) {
01362 if(sptr->dsin=atoi(sp))
01363 dsdsin=1;
01364 deletekey(&work_list, "d");
01365 }
01366
01367 sptr->dsin = 1;
01368 dsdsin=1;
01369
01370
01371
01372
01373
01374
01375
01376
01377
01378 if (sp = getkey_str (work_list, "COPY_HISTORY")) {
01379 sptr->cphist = strcasecmp (sp, "no");
01380 deletekey (&work_list, "COPY_HISTORY");
01381 }
01382
01383
01384
01385
01386
01387
01388
01389
01390
01391 if (sp = getkey_str (work_list, "ABORT_ACTION")) {
01392 sptr->noabort = strcasecmp (sp, "continue") ? 0 : 1;
01393 deletekey (&work_list, "ABORT_ACTION");
01394 }
01395 if(sp=getkey_str(work_list, "a")) {
01396 carch = *sp++;
01397 if(carch != 'a' && carch != 't' && carch != 'p' && carch != 'n' && carch != '0') {
01398 pemail("***Illegal a= spec for p=%s. Must be a, t, p or n\n",sptr->name);
01399 abortit(1);
01400 }
01401 if(carch == '0') {
01402
01403 sptr->archive = 't';
01404 }
01405 else {
01406 sptr->archive = carch;
01407 sptr->archive_day=atoi(sp);
01408 }
01409 deletekey(&work_list, "a");
01410 }
01411 else {
01412
01413 if(nullarchflg) sptr->archive = NULL;
01414 else sptr->archive = 't';
01415 }
01416 if(sp=getkey_str(work_list, "s")) {
01417 sptr->split=atoi(sp);
01418 sptr->split=0;
01419 deletekey(&work_list, "s");
01420 }
01421 add_keys(work_list, &sptr->map_list);
01422 sptr++;
01423 }
01424 else {
01425 pemail("***A map file line must start w/a control stmt, or p=\n");
01426 pemail(" %s\n", line);
01427 abortit(1);
01428 }
01429 }
01430 fclose(mapfp);
01431 if(!hname) {
01432 pemail("***The map file has no HOST specification\n");
01433 abortit(1);
01434 }
01435 if(!pfound) {
01436 pemail("***The map file has no p= line to define the server\n");
01437 abortit(1);
01438 }
01439 if(groupflg) {
01440 pemail("***Non-matching START/END_GROUP pair\n");
01441 abortit(1);
01442 }
01443 if(archgrp) {
01444 pemail("***Non-matching START/END_ARCHIVE pair\n");
01445 abortit(1);
01446 }
01447 if(sethosts(hname)) abortit(1);
01448
01449
01450
01451
01452 }
01453
01454
01455
01456
01457
01458 void spawn_pvm()
01459 {
01460 struct hostinfo *hostp;
01461
01462 HDATA *hnext;
01463 int i, nhost, narch;
01464
01465
01466 if(pvm_config(&nhost, &narch, &hostp)) {
01467 pemail("***Can't get pvm configuration. Pvm daemon(s) running?\n");
01468 abortit(1);
01469 }
01470
01471 for(hnext=(HDATA *)gethnext(stab[0].hosts); hnext != NULL;
01472 hnext=(HDATA *)gethnext((HDATA *)MONE))
01473 {
01474 for(i=0; i<nhost; i++) {
01475 if(!strcmp(hostp[i].hi_name, hnext->host_name))
01476
01477 break;
01478 }
01479 if(i == nhost) {
01480 pemail("***No pvm daemon running on %s\n", hnext->host_name);
01481 abortit(1);
01482 }
01483 }
01484
01485
01486
01487
01488
01489
01490
01491
01492
01493
01494
01495
01496
01497
01498
01499
01500
01501
01502
01503
01504
01505
01506
01507
01508
01509 }
01510
01511
01512
01513
01514
01515
01516 void msgid_send(PSERVER *sptr, HDATA *hnext)
01517 {
01518 KEY *alist, *keybad;
01519 char *log;
01520
01521 pvm_initsend(PvmDataDefault);
01522 pvm_pkint(&sptr->msgid, 1, 1);
01523
01524
01525 alist=newkeylist();
01526 setkey_int(&alist, "verbose", verbose);
01527 if(log=getkey_str(sptr->map_list, "history"))
01528 setkey_str(&alist, "history", log);
01529 if(log=getkey_str(sptr->map_list, "errlog"))
01530 setkey_str(&alist, "errlog", log);
01531 if(keybad=(KEY *)pack_keylist(alist)) {
01532 pemail("***Err packing pvm msg, type=%d name=%s\n",keybad->type,keybad->name
01533 );
01534 abortit(1);
01535 }
01536 if(pvm_send(hnext->tid, MSGMSG)) {
01537 pemail("***Error sending %s on %s its message id\n",
01538 sptr->name,hnext->host_name);
01539 abortit(1);
01540 }
01541 }
01542
01543
01544
01545 void arg_recv(PSERVER *sptr, HDATA *hnext)
01546 {
01547 argument *args;
01548 struct timeval tvalr;
01549 uint64_t tsr;
01550 int bufid, j;
01551
01552 pvm_initsend(PvmDataDefault);
01553 if(pvm_send(hnext->tid, MSGARGS)) {
01554 pemail("***Error calling %s on %s for arg list\n",sptr->name,hnext->host_name);
01555 abortit(1);
01556 }
01557 gettimeofday(&tvalr, NULL); tsr = tvalr.tv_sec;
01558 while(1) {
01559 if(!(bufid=pvm_nrecv(hnext->tid, MSGARGS))) {
01560 gettimeofday(&tvalr, NULL);
01561 if(tvalr.tv_sec - tsr > RESPWAIT) {
01562 pemail("***Timeout awaiting an argument list from %s\n",sptr->name);
01563 abortit(1);
01564 }
01565 }
01566 else
01567 break;
01568 }
01569 if(bufid < 0) {
01570 pemail("***Error receiving arg list from %s\n",sptr->name);
01571 abortit(1);
01572 }
01573 for(j=0; j<MAX_ARGS; j++) {
01574 args = &sptr->arguments[j];
01575 pvm_upkint(&args->kind,1,1);
01576 args->key=(char *)malloc(MAX_ARG_STR);
01577 pvm_upkstr(args->key);
01578
01579 if (!strcmp (args->key, "p") || !strcmp (args->key, "a")) {
01580 printf("**WARNING: Module has arg %s which is reserved by pe\n",
01581 args->key);
01582 }
01583 args->default_value=(char *)malloc(MAX_ARG_STR);
01584 pvm_upkstr(args->default_value);
01585 args->range=(char *)malloc(MAX_ARG_STR);
01586 pvm_upkstr(args->range);
01587 args->description=(char *)malloc(MAX_ARG_STR*4);
01588 pvm_upkstr(args->description);
01589 if(args->kind == ARG_END)
01590 break;
01591 }
01592 if(j == MAX_ARGS) {
01593 pemail("***Never got an ARG_END from %s after %d arguments\n",
01594 sptr->name, MAX_ARGS);
01595 abortit(1);
01596 }
01597 }
01598
01599
01600
01601
01602
01603
01604 int wd_dup_ck(KEY *list, char *wd)
01605 {
01606 KEY *walker = list;
01607
01608 while(walker) {
01609 if(!strcmp(wd, (char *)walker->val)) {
01610
01611
01612
01613
01614
01615
01616 return(1);
01617 }
01618 walker = walker->next;
01619 }
01620 return(0);
01621 }
01622
01623
01624
01625
01626
01627
01628
01629 void ck_arglist(PSERVER *sptr, HDATA *hx)
01630 {
01631 DIR *dfd;
01632 double dblval;
01633 int found, outnsets, i, intval, parse_status;
01634 char dirstr[MAX_STR], inname[MAX_STR], ext[MAX_STR], wdkey[MAX_STR];
01635 char *rootdbase, *wd, *valstr, *failstr, *strck, *cptr;
01636 argument *arg;
01637
01638 arg=sptr->arguments;
01639 while(arg->kind != ARG_END) {
01640 found=arg_available(hx->param_list, arg->key, arg->kind);
01641 if (!found) {
01642 if (found = strlen (arg->default_value));
01643 setkey_str (&hx->param_list, arg->key, arg->default_value);
01644 }
01645 if(!found) {
01646 pemail("***Can't satisfy arg \"%s\" for server %s\n", arg->key,sptr->name);
01647 abortit(1);
01648 }
01649 if(arg->kind == 1) {
01650 pemail("***An obsolete ARG_DATASET found in %s\n", sptr->name);
01651 abortit(1);
01652 }
01653 if (arg->kind == ARG_DATA_OUT) {
01655 if(sptr->archive == 'a') {
01656 strcpy(inname, arg->key); strcat(inname, "_nsets");
01657 outnsets = getkey_int(hx->param_list, inname);
01658 for(i=0; i < outnsets; i++) {
01659 sprintf(ext, "%s_%d_wd", arg->key, i);
01660 wd = getkey_str(hx->param_list, ext);
01661 if((dfd=opendir(wd)) == NULL) {
01662 strcpy(dirstr, "mkdir -p "); strcat(dirstr, wd);
01663 if(system(dirstr)) {
01664 pemail("***Cannot %s for %s\n",dirstr,sptr->name);
01665 abortit(1);
01666 }
01667 }
01668 }
01669 }
01670 else {
01671 if(reqmegs) {
01672 strcpy(inname, arg->key); strcat(inname, "_nsets");
01673 outnsets = getkey_int(hx->param_list, inname);
01674
01675 sprintf(ext, "%s_0_dbase", arg->key);
01676 if(findkey(hx->param_list, ext)) {
01677 rootdbase = getkey_str(hx->param_list, ext);
01678 sprintf(wdkey, "wd_%d", wdkey_int++);
01679 setkey_str(&wd_mk_list, wdkey, rootdbase);
01680 }
01681 for(i=0; i < outnsets; i++) {
01682 sprintf(ext, "%s_%d_wd", arg->key, i);
01683 cptr = (char *)sptr->archive;
01684 if(cptr == (char *)'t' || cptr == (char *)'p' || cptr == (char *)'n') {
01685 wd = getkey_str(hx->param_list, ext);
01686 cptr = strstr(wd, "/SUM");
01687 if(cptr != wd) {
01688 pemail("Cannot archive non-/SUM storage: %s = %s\n", ext,wd);
01689 abortit(1);
01690 }
01691 }
01692
01693 if(!wd_dup_ck(wd_mk_list, getkey_str(hx->param_list,ext))) {
01695 strcpy(dirstr, "mkdir -p ");
01696 strcat(dirstr, getkey_str(hx->param_list, ext));
01697 if(system(dirstr)) {
01698 pemail("***Cannot %s for %s\n",dirstr,sptr->name);
01699 abortit(1);
01700 }
01701 sprintf(wdkey, "wd_%d", wdkey_int++);
01702 setkey_str(&wd_mk_list, wdkey, getkey_str(hx->param_list, ext));
01703 }
01704 }
01705 }
01706 }
01707 }
01708 else if (arg->kind == ARG_FILEPTR) {
01709 pemail("***ARG_FILEPTR used by %s, is not supported by pe\n",sptr->name);
01710 abortit(1);
01711 }
01712 else if (arg->kind == ARG_FLOAT) {
01713 valstr = getkey_str (hx->param_list, arg->key);
01714 dblval = strtod(valstr, &failstr);
01715 if (valstr == failstr) dblval = D_MISSING;
01716 setkey_double (&hx->param_list, arg->key, dblval);
01717 }
01718 else if (arg->kind == ARG_INT) {
01719 valstr = getkey_str (hx->param_list, arg->key);
01720 intval = (int)strtol(valstr, &failstr,0);
01721 if (valstr == failstr) intval = I_MISSING;
01722 setkey_int (&hx->param_list, arg->key, intval);
01723 }
01724 else if (arg->kind == ARG_TIME)
01725 setkey_time (&hx->param_list, arg->key,
01726 sscan_time (getkey_str (hx->param_list, arg->key)));
01727 else if (arg->kind == ARG_FLAG) {
01728 if(!(strck = getkey_str (hx->param_list, arg->key))) {
01729
01730
01731 setkey_byte(&hx->param_list, arg->key, 1);
01732 }
01733 else {
01734 setkey_byte(&hx->param_list, arg->key, atoi(strck));
01735 }
01736 }
01737 else if (arg->kind == ARG_FLOATS) {
01738
01739 if(parse_status=parse_array (&hx->param_list, arg->key, KEYTYP_DOUBLE)) {
01740 pemail("***ARG_FLOATS error %d for arg %s\n", parse_status, arg->key);
01741 abortit(1);
01742 }
01743 }
01744 else if (arg->kind == ARG_INTS) {
01745 if(parse_status=parse_array (&hx->param_list, arg->key, KEYTYP_INT)) {
01746 pemail("***ARG_INTS error %d for arg %s\n", parse_status, arg->key);
01747 abortit(1);
01748 }
01749 } else if (arg->kind == ARG_NUME) {
01750 char **names;
01751 int nval, nvals = parse_numerated (arg->range, &names);
01752 valstr = getkey_str (hx->param_list, arg->key);
01753 for (nval = 0; nval < nvals; nval++)
01754 if (!strcmp (names[nval], valstr)) break;
01755 if (nval >= nvals) nval = 0;
01756 setkey_int (&hx->param_list, arg->key, nval);
01757 }
01758 arg++;
01759 }
01760 }
01761
01762
01763
01764
01765
01766
01767
01768
01769
01770
01771
01772
01773
01774 void form_split(PSERVER *sptr, KEY *xlist, char *key)
01775 {
01776 HDATA *hnext;
01777 char keystr[MAX_STR];
01778 int i, fsn, lsn, delta, drem, xfsn, xlsn, nosplit;
01779
01780
01781
01782
01783
01784
01785
01786 nosplit = 0;
01787 if(!sptr->split) {
01788 nosplit = 1;
01789 }
01790 else {
01791 sprintf(keystr, "%s_nsets", key);
01792 if(findkey(xlist, keystr)) {
01793 if((getkey_int(xlist, keystr)) != 1) {
01794 printf("**Ignoring the s=1 split flag given for %s:\n", sptr->name);
01795 printf(" The %s= specification is not a single dataset\n", key);
01796 nosplit = 1;
01797 }
01798 else {
01799 sprintf(keystr, "%s_0_fsn", key);
01800 if(!findkey(xlist, keystr)) {
01801 printf("**Ignoring the s=1 split flag given for %s:\n", sptr->name);
01802 printf(" No sel:[fsn-lsn] for first input ds given in map file or found from dsds\n");
01803 nosplit = 1;
01804 }
01805 }
01806 }
01807 else
01808 nosplit = 1;
01809 }
01810 if(nosplit) {
01811 effective_hosts = 1;
01812 hnext = (HDATA *)gethnext(sptr->hosts);
01813 add_keys(xlist, &hnext->param_list);
01814 return;
01815 }
01816
01817 sprintf(keystr, "%s_0_fsn", key);
01818 fsn=getkey_int(xlist, keystr);
01819 sprintf(keystr, "%s_0_lsn", key);
01820 lsn=getkey_int(xlist, keystr);
01821
01822 effective_hosts = num_hosts;
01823 delta=((lsn-fsn)+1)/effective_hosts;
01824 if(delta == 0)
01825 delta = 1;
01826 drem=((lsn-fsn)+1) % effective_hosts;
01827 hnext = (HDATA *)gethnext(sptr->hosts);
01828
01829 for(i=0; i<effective_hosts; i++) {
01830 if(((lsn-fsn)+1) == i) {
01831 effective_hosts = i;
01832 break;
01833 }
01834 add_keys(xlist, &hnext->param_list);
01835 xfsn=fsn+(delta*i);
01836 xlsn=xfsn+delta-1;
01837 if(i+1 == effective_hosts)
01838 xlsn=xlsn+drem;
01839 sprintf(keystr, "%s_0_fsn", key);
01840 setkey_int(&hnext->param_list, keystr, xfsn);
01841 sprintf(keystr, "%s_0_lsn", key);
01842 setkey_int(&hnext->param_list, keystr, xlsn);
01843
01844
01845
01846
01847
01848 hnext = (HDATA *)gethnext((HDATA *)MONE);
01849 }
01850 }
01851
01852
01853
01854
01855
01856
01857
01858
01859
01860 KEY *form_arg_data_in(KEY *xlist, PSERVER *sptr, argument *arg, int seq)
01861 {
01862
01863 char dbasekey[MAX_STR], inname[MAX_STR], ext[MAX_STR];
01864 char *wd;
01865 int innsets, parse, i;
01866
01867 if(!seq) add_keys(sptr->map_list, &xlist);
01868 if(!arg_available(xlist, arg->key, arg->kind)) {
01869 if(!(strlen(arg->default_value))) {
01870 pemail("***Can't satisfy arg \"%s\" for server %s\n",arg->key,sptr->name);
01871 abortit(1);
01872 }
01873 else
01874 setkey_str(&xlist, arg->key, arg->default_value);
01875 }
01876 if(touchflg != -1)
01877 setkey_int(&xlist, "touch", touchflg);
01878 sprintf(ext, "arg_data_in_%d", seq);
01879 setkey_str(&xlist, ext, arg->key);
01880
01881 strcpy(dbasekey, arg->key); strcat(dbasekey, "_dbase");
01882
01883 if(reqmegs && !sptr->firstserver && !sptr->dsin) {
01884 setkey_str(&xlist, dbasekey, dsdswd);
01885 sprintf(ext, "%s_rule", arg->key);
01886 setkey_str(&xlist, ext, "wd:{dbase}");
01887 sprintf(ext, "%s_%d_rule", arg->key, seq);
01888 setkey_str(&xlist, ext, "wd:{dbase}");
01889 }
01890 else if(getenv("dbase"))
01891 setkey_str(&xlist, dbasekey, getenv("dbase"));
01892 else
01893 setkey_str(&xlist, dbasekey, "/tmp");
01894
01895 if(parse=parse_list(&xlist, arg->key)) {
01896 if(parse != CANNOT_FILL_TEMPLATE) {
01897 pemail("***Error %d in parse_list for %s\n", parse, sptr->name);
01898 if(dbxflg) {
01899 printf("\n***** The xlist after the parse is:\n");
01900 keyiterate(printkey, xlist);
01901 }
01902 abortit(1);
01903 }
01904 }
01905 if(dbxflg) {
01906 printf("\n***** The xlist after the parse for %s is:\n", sptr->name);
01907 keyiterate(printkey, xlist);
01908 }
01909
01910
01911
01912 if(!sptr->dsin) {
01913
01914 if(strcmp(getkey_str(xlist, arg->key), "NOT SPECIFIED")) {
01915 strcpy(inname, arg->key); strcat(inname, "_nsets");
01916 innsets = getkey_int(xlist, inname);
01917 for(i=0; i < innsets; i++) {
01918 sprintf(ext, "%s_%d_wd", arg->key, i);
01919 if(!(wd=getkey_str(xlist, ext)) || !strcmp(wd, "")) {
01920 pemail("***No %s for %s. Missing rule or template in map file.\n",
01921 ext, sptr->name);
01922 abortit(1);
01923 }
01924 if(*wd == '.') {
01925 pemail("***%s of \".\" not valid for %s.\n", ext, sptr->name);
01926 abortit(1);
01927
01928 }
01929 }
01930 }
01931 }
01932 return(xlist);
01933 }
01934
01935
01936
01937
01938
01939
01940
01941
01942 KEY *dsds_arg_data_in(KEY *xlist)
01943 {
01944 static KEY *blist;
01945 char inname[MAX_STR];
01946
01947 char *svcname, *svcversion;
01948 int respcnt, status, i;
01949
01950 blist = newkeylist();
01951
01952
01953 setkey_uint(&xlist, "dsds_uid", uid);
01954 setkey_str(&xlist, "pds_host", pdshost);
01955
01956 if(ampexflg)
01957 setkey_int(&xlist, "ampex_tid", ampex_tid);
01958 else
01959 setkey_int(&xlist, "lago_tid", lago_tid);
01960
01961 if(svcversion=getkey_str(xlist,"svc_version")) {
01962
01963 printf("**Obsolete use of svc_version in map file. Ignoring...\n");
01964 }
01965 if(svcname=getkey_str(xlist,"svc_name")) {
01966
01967 printf("**Obsolete use of svc_name in map file. Ignoring...\n");
01968 }
01969 printf("Querying for the input datasets...\n");
01970 nocontrolc = 1;
01971
01972 blist = (KEY *)call_drms_in(xlist, dbxflg);
01973 if(blist == NULL) {
01974 pemail("Can't resolve/retrieve the input datasets with drms.\n");
01975 pemail("(Usually no -A switch specified.)\n");
01976 abortit(1);
01977 }
01978
01979
01980
01981
01982
01983
01984
01985
01986
01987
01988
01989
01990
01991
01992
01993
01994
01995
01996
01997
01998
01999
02000
02001
02002
02003
02004 nocontrolc = 0;
02005
02006
02007
02008
02009
02010
02011
02012
02013
02014
02015
02016
02017
02018
02019
02020
02021
02022
02023
02024 return((KEY *)blist);
02025 }
02026
02027
02028
02029
02030
02031 KEY *form_arg_data_out(PSERVER *sptr, argument *arg)
02032 {
02033 KEY *xlist;
02034 FILE *fin;
02035 char path[DRMS_MAXPATHLEN] = {0};
02036 char *wd, *stmp, *cptr, *cptr2;
02037 char dbasekey[MAX_STR], ext[MAX_STR], wdkey[MAX_STR], inname[MAX_STR];
02038 char drmsname[MAX_STR], cmd[MAX_STR], newmdirec[MAX_STR], buf[128];
02039 int innsets, parse, levnum, i, dstatus, ccnt;
02040
02041 char *prog, *level, *series, *jdata;
02042 char jpedata[192];
02043 int seriesnum, jcnt, k;
02044 int jix = 0;
02045
02046
02047
02048
02049 xlist=newkeylist();
02050 add_keys(sptr->map_list, &xlist);
02051 if(!arg_available(xlist, arg->key, arg->kind)) {
02052 pemail("***The ARG_DATA_OUT \"%s=\" for %s ", arg->key, sptr->name);
02053 pemail("is missing or\n a non-allowed default value given\n");
02054 abortit(1);
02055 }
02056 strcpy(dbasekey, arg->key); strcat(dbasekey, "_dbase");
02057 if(strcmp(dsdswd, ""))
02058 setkey_str(&xlist, dbasekey, dsdswd);
02059 else if(getenv("dbase"))
02060 setkey_str(&xlist, dbasekey, getenv("dbase"));
02061 else
02062 setkey_str(&xlist, dbasekey, "/tmp");
02063
02064 if(parse=parse_list(&xlist, arg->key)) {
02065 pemail("***Error %d in parse_list for %s\n", parse, sptr->name);
02066 if(dbxflg) {
02067 printf("\n***** The xlist after the parse is:\n");
02068 keyiterate(printkey, xlist);
02069 }
02070 abortit(1);
02071 }
02072 if(dbxflg) {
02073 printf("\n***** The xlist after the parse is:\n");
02074 keyiterate(printkey, xlist);
02075 }
02076 strcpy(inname, arg->key); strcat(inname, "_nsets");
02077 innsets = getkey_int(xlist, inname);
02078
02079
02080 if(sptr->archive == 'a') {
02081
02082
02083
02084
02085
02086
02087 for(i=0; i < innsets; i++) {
02088
02089
02090
02091
02092
02093 if(cptr = getenv("mdi_rec")) {
02094 if(cptr2 = strstr(cptr, "/soidata/info")) {
02095 ccnt = (cptr2-cptr)+1;
02096 snprintf(newmdirec, ccnt, "%s", cptr);
02097 strcat(newmdirec, "{dbase}/info");
02098 strcat(newmdirec, cptr2+13);
02099 sprintf(ext, "%s_%d_rule", arg->key, i);
02100 setkey_str(&xlist, ext, newmdirec);
02101 }
02102 }
02103 setkey_str(&xlist, dbasekey, "/SUM0/PAS");
02105
02106
02107 sprintf(ext, "%s_%d_wd", arg->key, i);
02108 deletekey(&xlist, ext);
02109 if(parse=parse_list(&xlist, arg->key)) {
02110 pemail("***Error %d in parse_list for %s\n", parse, sptr->name);
02111 if(debugflg) {
02112 printf("\n***** The xlist after the parse is:\n");
02113 keyiterate(printkey, xlist);
02114 }
02115 abortit(1);
02116 }
02117 }
02119
02120
02121 return(xlist);
02122 }
02123
02124
02125
02126 for(i=0; i < innsets; i++) {
02127 sprintf(ext, "%s_%d_level_sn", arg->key, i);
02128 if(findkey(xlist, ext)) {
02129 levnum = getkey_int(xlist, ext);
02130 if(levnum != 0) {
02131 pemail("***Map Error: Explicit output level # other than 0 is not allowed\n");
02132 abortit(1);
02133 }
02134 else {
02135 if(sptr->archive) {
02136 pemail("***Map Error: Archive of level # 0 output is not allowed\n");
02137 abortit(1);
02138 }
02139 }
02140 }
02141 sprintf(ext, "%s_%d_prog", arg->key, i);
02142 stmp = GETKEY_str(xlist, ext);
02143 if(!strcmp(stmp, "gong+")) stmp = "gongplus";
02144 sprintf(drmsname, "%s__", stmp);
02145 sprintf(ext, "%s_%d_level", arg->key, i);
02146 stmp = GETKEY_str(xlist, ext);
02147 sprintf(drmsname, "%s%s__", drmsname, stmp);
02148 sprintf(ext, "%s_%d_series", arg->key, i);
02149 stmp = GETKEY_str(xlist, ext);
02150 sprintf(drmsname, "%s%s", drmsname, stmp);
02151 sprintf(cmd, "echo \"%s\" | sed 's/\\\./_/g' | sed 's/-/__/g'", drmsname);
02152
02153 fin = popen(cmd, "r");
02154 fgets(buf, sizeof buf, fin);
02155 cptr = rindex(buf, '\n');
02156 if(cptr) *cptr = NULL;
02157
02158
02159
02160 sprintf(drmsname, "%s.%s", out_namespace, buf);
02161 if(findkey(JPElist, "JPE_out_nsets")) {
02162 jix = getkey_int(JPElist, "JPE_out_nsets");
02163 }
02164
02165
02166
02167 if(reqmegs) {
02168 if(firstalloc) {
02169
02170 rs = drms_create_record(drms_env, drmsname, DRMS_PERMANENT, &dstatus);
02171 if(dstatus) {
02172 printk("**ERROR %d: Can't create record for %s\n", dstatus, drmsname);
02173 abortit(1);
02174 }
02175
02176 drms_record_directory(rs, path, 0);
02177
02178 strcpy(dsdswd, path);
02179 sprintf(ext, "%s_%d_rs", arg->key, i);
02180 setkey_fileptr(&sptr->map_list, ext, (FILE *)rs);
02181
02182 sprintf(ext, "%s_%d_wd", arg->key, i);
02183 setkey_str(&xlist, ext, path);
02184 wd = path;
02185 sprintf(ext, "%s_%d_prog", arg->key, i);
02186 prog = getkey_str(xlist, ext);
02187 sprintf(ext, "%s_%d_level", arg->key, i);
02188 level = getkey_str(xlist, ext);
02189 sprintf(ext, "%s_%d_series", arg->key, i);
02190 series = getkey_str(xlist, ext);
02191 sprintf(ext, "%s_%d_series_sn", arg->key, i);
02192 seriesnum = getkey_int(xlist, ext);
02193 sprintf(jpedata, "prog:%s,level:%s,series:%s[%d]",
02194 prog, level, series, seriesnum);
02195
02196 sprintf(ext, "JPE_%d_data", jix);
02197 setkey_str(&JPElist, ext, jpedata);
02198 sprintf(ext, "JPE_%d_wd", jix);
02199 setkey_str(&JPElist, ext, path);
02200 sprintf(ext, "JPE_%d_rs", jix);
02201 setkey_fileptr(&JPElist, ext, (FILE *)rs);
02202 jix++;
02203 }
02204 else {
02205 sprintf(ext, "%s_%d_prog", arg->key, i);
02206 prog = getkey_str(xlist, ext);
02207 sprintf(ext, "%s_%d_level", arg->key, i);
02208 level = getkey_str(xlist, ext);
02209 sprintf(ext, "%s_%d_series", arg->key, i);
02210 series = getkey_str(xlist, ext);
02211 sprintf(ext, "%s_%d_series_sn", arg->key, i);
02212 seriesnum = getkey_int(xlist, ext);
02213 sprintf(jpedata, "prog:%s,level:%s,series:%s[%d]",
02214 prog, level, series, seriesnum);
02215 jcnt = getkey_int(JPElist, "JPE_out_nsets");
02216 for(k=0; k < jcnt; k++) {
02217 sprintf(ext, "JPE_%d_data", k);
02218 jdata = getkey_str(JPElist, ext);
02219 if(!strcmp(jpedata, jdata)) {
02220 sprintf(ext, "JPE_%d_wd", k);
02221 wd = getkey_str(JPElist, ext);
02222 break;
02223 }
02224 }
02225 if(k == jcnt) {
02226
02227 rs = drms_create_record(drms_env, drmsname, DRMS_PERMANENT, &dstatus);
02228 if(dstatus) {
02229 printk("**ERROR %d: Can't create record for %s\n", dstatus, drmsname);
02230 abortit(1);
02231 }
02232 drms_record_directory(rs, path, 0);
02233 sprintf(ext, "%s_%d_rs", arg->key, i);
02234 setkey_fileptr(&sptr->map_list, ext, (FILE *)rs);
02235 sprintf(ext, "%s_%d_wd", arg->key, i);
02236 setkey_str(&xlist, ext, path);
02237
02238 wd = path;
02239 sprintf(ext, "%s_%d_prog", arg->key, i);
02240 prog = getkey_str(xlist, ext);
02241 sprintf(ext, "%s_%d_level", arg->key, i);
02242 level = getkey_str(xlist, ext);
02243 sprintf(ext, "%s_%d_series", arg->key, i);
02244 series = getkey_str(xlist, ext);
02245 sprintf(ext, "%s_%d_series_sn", arg->key, i);
02246 seriesnum = getkey_int(xlist, ext);
02247 sprintf(jpedata, "prog:%s,level:%s,series:%s[%d]",
02248 prog, level, series, seriesnum);
02249
02250 sprintf(ext, "JPE_%d_data", jix);
02251 setkey_str(&JPElist, ext, jpedata);
02252 sprintf(ext, "JPE_%d_wd", jix);
02253 setkey_str(&JPElist, ext, path);
02254 jix++;
02255 }
02256 else {
02257 sprintf(ext, "JPE_%d_wd", k);
02258 wd = getkey_str(JPElist, ext);
02259 sprintf(ext, "%s_%d_wd", arg->key, i);
02260 setkey_str(&xlist, ext, wd);
02261 sprintf(ext, "JPE_%d_rs", k);
02262 rs = getkey_fileptr(JPElist, ext);
02263 sprintf(ext, "%s_%d_rs", arg->key, i);
02264 setkey_fileptr(&sptr->map_list, ext, (FILE *)rs);
02265 }
02266 }
02267 }
02268
02269 if(!wd || !strcmp(wd, "")) {
02270 pemail("***No %s for %s. Missing rule or template in map file.\n",
02271 ext, sptr->name);
02272 abortit(1);
02273 }
02274 if(*wd == '.') {
02275 pemail("***%s of \".\" not valid for %s.\n", ext, sptr->name);
02276 abortit(1);
02277 }
02278
02279 if(!wd_dup_ck(wd_dup_list, wd)) {
02280 sprintf(wdkey, "wd_%d", wdkey_int++);
02281 setkey_str(&wd_dup_list, wdkey, wd);
02282 }
02283 else {
02284 if(!nowarn) {
02285 printf("**WARNING: pe detects two datasets that have the same output wd:\n");
02286 printf(" Make sure that you have run with START_ARCHIVE/END_ARCHIVE\n");
02287 printf(" wd = %s\n", wd);
02288 }
02289 }
02290 }
02291 setkey_int(&JPElist, "JPE_out_nsets", jix);
02292 firstalloc = 0;
02293 if(dbxflg) {
02294 printf("\n***** The xlist at the end of form_arg_data_out is:\n");
02295 keyiterate(printkey, xlist);
02296 printf("\n##### The JPElist at the end of form_arg_data_out is:\n");
02297 keyiterate(printkey, JPElist);
02298 }
02299 return(xlist);
02300 }
02301
02302
02303
02304
02305
02306
02307
02308
02309
02310
02311
02312
02313 void in_ds_rcp(KEY *list)
02314 {
02315 int i, loop, innsets;
02316 char *argname, *wd, *cptr;
02317 char ext[MAX_STR], inname[MAX_STR], cmd[MAX_STR];
02318 char targetdir[MAX_STR];
02319 struct stat stbuf;
02320
02321 for(loop = 0; ; loop++) {
02322 sprintf(ext, "arg_data_in_%d", loop);
02323 if(!findkey(list, ext)) break;
02324 argname = getkey_str(list, ext);
02325 strcpy(inname, argname); strcat(inname, "_nsets");
02326 innsets = getkey_int(list, inname);
02327 for(i=0; i < innsets; i++) {
02328 sprintf(ext, "_%d_wd", i);
02329 strcpy(inname, argname); strcat(inname, ext);
02330 wd = getkey_str(list, inname);
02331 if(strcmp(wd, "")) {
02332 cptr = index(wd+1, '/');
02333 sprintf(targetdir, "%s%s", rcpdir, cptr);
02334 if(stat(targetdir, &stbuf)) {
02335 sprintf(cmd, "mkdir -p %s", targetdir);
02336 if(system(cmd)) {
02337 pemail("***Cannot %s\n", cmd);
02338 abortit(1);
02339 }
02340 }
02341 sprintf(cmd, "rcp -p %s:%s/* %s", prod_host_prime(), wd, targetdir);
02342 if(system(cmd)) {
02343 pemail("***Cannot %s\n", cmd);
02344 abortit(1);
02345 }
02346 printf("%s\n", cmd);
02347 setkey_str(&list, inname, targetdir);
02348 }
02349 }
02350 }
02351 }
02352
02353
02354
02355
02356
02357
02358
02359
02360
02361
02362
02363
02364
02365
02366
02367
02368
02369
02370
02371
02372
02373
02374
02375
02376
02377
02378
02379
02380
02381
02382
02383 void form_keylist(PSERVER *sptr, HDATA *hnext)
02384 {
02385 KEY *xlist, *inlist, *dslist;
02386 argument *arg, *argsort[MAX_ARGS];
02387 char ext[MAX_STR];
02388 char *pname;
02389 int inseq, i, ax;
02390 int inapp = 0;
02391
02392
02393 arg=sptr->arguments;
02394 i = 0;
02395 while(arg->kind != ARG_END) {
02396 if(arg->kind == ARG_DATA_IN) argsort[i++] = arg;
02397 arg++;
02398 }
02399 arg=sptr->arguments;
02400 while(arg->kind != ARG_END) {
02401 if(arg->kind != ARG_DATA_IN) argsort[i++] = arg;
02402 arg++;
02403 }
02404 argsort[i] = arg;
02405
02406
02407 inseq = 0; inlist = newkeylist(); dslist = newkeylist();
02408 ax = 0; arg = argsort[ax];
02409 while(arg->kind != ARG_END) {
02410 switch(arg->kind) {
02411 case ARG_DATA_IN:
02412 inlist = (KEY *)form_arg_data_in(inlist, sptr, arg, inseq);
02413 sprintf(ext, "%s_0_prog", arg->key);
02414
02415
02416 if(findkey(inlist, ext)) {
02417 pname = getkey_str(inlist, ext);
02418
02419 if(!strcmp(pname, "mdi_eof_log") || !strcmp(pname, "mdi_eof_rec")
02420 || !strcmp(pname, "mdi_log") || !strcmp(pname, "mdi_rec") ||
02421 !strcmp(pname, "mdi_sim_rec") || !strcmp(pname, "soi_eof_rec")) {
02422 inapp = 1;
02423 }
02424 if(!inseq++) {
02425
02426
02427
02428 }
02429 }
02430 add_keys(inlist, &hnext->param_list);
02431 break;
02432 case ARG_DATA_OUT:
02433 xlist = (KEY *)form_arg_data_out(sptr, arg);
02434 add_keys(xlist, &hnext->param_list);
02435 freekeylist(&xlist);
02436 break;
02437 default:
02438 break;
02439 }
02440 arg = argsort[++ax];
02441 }
02442 if((inseq) && (sptr->dsin) && (!inapp)) {
02443 if(dbxflg) {
02444 printf("\nThe list before the dsds_arg_data_in(inlist) call:\n");
02445 keyiterate(printkey, inlist);
02446 }
02447 dslist = (KEY *)dsds_arg_data_in(inlist);
02448 if(dbxflg) {
02449 printf("\nThe list after the dsds_arg_data_in(inlist) call:\n");
02450 keyiterate(printkey, dslist);
02451 }
02452 if(rcpflg) {
02453
02454
02455 in_ds_rcp(dslist);
02456 }
02457 add_keys(dslist, &hnext->param_list);
02458
02459
02460 freekeylist(&inlist); freekeylist(&dslist);
02461 }
02462 else if(inseq) {
02463 add_keys(inlist, &hnext->param_list);
02464 freekeylist(&inlist);
02465 }
02466
02467 ck_arglist(sptr, hnext);
02468 if(findkey(hnext->param_list, "T_FIRST")) {
02469 t_first_arg = getkey_str(hnext->param_list, "T_FIRST");
02470 }
02471 else {
02472 t_first_arg = NULL;
02473 }
02474 if(findkey(hnext->param_list, "T_LAST")) {
02475 t_last_arg = getkey_str(hnext->param_list, "T_LAST");
02476 }
02477 else {
02478 t_last_arg = NULL;
02479 }
02480 }
02481
02482
02483
02484
02485
02486
02487
02488 void send_to_serv(PSERVER *sptr, HDATA *hx, int doflg)
02489 {
02490 KEY *keybad;
02491 argument *arg;
02492 char cfsn[MAX_STR], clsn[MAX_STR];
02493
02494 hx->busy=1;
02495 sptr->busyall++;
02496 arg=sptr->arguments;
02497 while(arg->kind != ARG_END) {
02498 if(arg->kind == ARG_DATA_IN) {
02499 sprintf(cfsn, "%s_0_fsn", arg->key);
02500 if(!sptr->split || !findkey(hx->param_list, cfsn)) {
02501 printf("%s %s sent:\n %s=%s\n",hx->host_name,
02502 sptr->name,arg->key,getkey_str(hx->param_list,arg->key));
02503 break;
02504 }
02505 else {
02506 sprintf(clsn, "%s_0_lsn", arg->key);
02507 printf("%s %s sent:\n %s=%s split[%d-%d]\n",hx->host_name,
02508 sptr->name,arg->key,getkey_str(hx->param_list,arg->key),
02509 getkey_int(hx->param_list, cfsn),
02510 getkey_int(hx->param_list, clsn));
02511 break;
02512 }
02513 }
02514 arg++;
02515 }
02516 if(!sptr->cphist)
02517 setkey_int(&hx->param_list, "nocphist", 1);
02518 setkey_int(&hx->param_list, "dsds_tid", dsds_tid);
02519 setkey_uint(&hx->param_list, "dsds_uid", uid);
02520 setkey_str(&hx->param_list, "pe_mapfile", pe_map);
02521 pvm_initsend(PvmDataDefault);
02522 pvm_pkint(&doflg, 1, 1);
02523 if(keybad=(KEY *)pack_keylist(hx->param_list)) {
02524 pemail("***Err packing a pvm msg, type=%d name=%s\n",keybad->type,keybad->name);
02525 abortit(1);
02526 }
02527
02528
02529 if(pvm_send(hx->tid, sptr->msgid)) {
02530 pemail("***Error calling %s on %s\n",sptr->name,hx->host_name);
02531 abortit(1);
02532 }
02533 }
02534
02535
02536
02537
02538
02539
02540
02541
02542
02543
02544
02545 int ds_names_file(KEY *list, argument *argu)
02546 {
02547 AT *atp;
02548 argument *arg;
02549 char ext[MAX_STR], rdbline[1024], sysstr[1024];
02550 char *prog, *level, *series, *wd;
02551 double bytes;
02552 unsigned long dsindex;
02553 int nsets, levnum, prognum, seriesnum;
02554 int i = 0;
02555 char DSINFO[] ="arg\tprog\tprog#\tlevel\tlev#\tseries\tser#\tdsindex\tbytes\n----\t-----\t------\t------\t-----\t-------\t-----\t-------\t------\n";
02556 if(!(atp = at_create(DSINFO))) {
02557 printf("**Can't at_create() the table for ds.rdb file\n");
02558 return(1);
02559 }
02560 arg = argu;
02561 while(arg->kind != ARG_END) {
02562 if(arg->kind == ARG_DATA_IN || arg->kind == ARG_DATA_OUT) {
02563 sprintf(ext, "%s_nsets", arg->key);
02564 nsets = getkey_int(list, ext);
02565 for(i=0; i < nsets; i++) {
02566 sprintf(ext, "%s_%d_prog", arg->key, i);
02567 if(findkey(list, ext)) prog = getkey_str(list, ext);
02568 else prog = "(null)";
02569 sprintf(ext, "%s_%d_prog_sn", arg->key, i);
02570 if(findkey(list, ext))
02571 prognum = getkey_int(list, ext);
02572 else
02573 prognum = -1;
02574 sprintf(ext, "%s_%d_level", arg->key, i);
02575 if(findkey(list, ext)) level = getkey_str(list, ext);
02576 else level = "(null)";
02577 sprintf(ext, "%s_%d_level_sn", arg->key, i);
02578 if(findkey(list, ext))
02579 levnum = getkey_int(list, ext);
02580 else
02581 levnum = -1;
02582 sprintf(ext, "%s_%d_series", arg->key, i);
02583 if(findkey(list, ext)) series = getkey_str(list, ext);
02584 else series = "(null)";
02585 sprintf(ext, "%s_%d_series_sn", arg->key, i);
02586 if(findkey(list, ext))
02587 seriesnum = getkey_int(list, ext);
02588 else
02589 seriesnum = -1;
02590 sprintf(ext, "%s_%d_ds_index", arg->key, i);
02591 if(findkey(list, ext))
02592 dsindex = getkey_ulong(list, ext);
02593 else
02594 dsindex = (unsigned long)-1;
02595 sprintf(ext, "%s_%d_bytes", arg->key, i);
02596 if(findkey(list, ext))
02597 bytes = getkey_double(list, ext);
02598 else
02599 bytes = 0;
02600 sprintf(ext, "%s_%d", arg->key, i);
02601 sprintf(rdbline, "%s\t%s\t%d\t%s\t%d\t%s\t%d\t%d\t%g",
02602 ext,prog,prognum,level,levnum,series,seriesnum,dsindex,bytes);
02603 if(at_put_info(atp, rdbline) != AT_OK) {
02604 printf("**Error on at_put_info() to ds.rdb file\n");
02605 return(1);
02606 }
02607 }
02608 }
02609 arg++;
02610 }
02611
02612 arg = argu;
02613 while(arg->kind != ARG_END) {
02614 if(arg->kind == ARG_DATA_OUT) {
02615 sprintf(ext, "%s_nsets", arg->key);
02616 nsets = getkey_int(list, ext);
02617 for(i=0; i < nsets; i++) {
02618 sprintf(ext, "%s_%d_wd", arg->key, i);
02619 wd = getkey_str(list, ext);
02620 if(strcmp(wd, "")) {
02621 sprintf(ext, "%s/ds.rdb", wd);
02622 if(at_write(atp, ext) != AT_OK) {
02623 printf("**Error on at_write() to %s\n", ext);
02624 return(1);
02625 }
02626
02627
02628
02629 sprintf(sysstr, "chmod -R go-ws %s; chown -Rhf production %s",
02630 wd, wd);
02631 if(system(sysstr)) {
02632 printf("**Warning: Error on: %s\n", sysstr);
02633
02634 }
02635 }
02636 }
02637 }
02638 arg++;
02639 }
02640 at_free(atp);
02641 return(0);
02642 }
02643
02644
02645
02646
02647
02648
02649
02650
02651 int get_overview(char *wd, KEY **list)
02652 {
02653 SDS *sds;
02654 DIR *dfd;
02655 struct dirent *dp;
02656 char *value;
02657 char name[512];
02658 int found = 0;
02659
02660 if((dfd=opendir(wd)) == NULL) {
02661 printf("**Can't opendir(%s) to find overview.fits\n", wd);
02662 return(0);
02663 }
02664 while((dp=readdir(dfd)) != NULL) {
02665 if(!strstr(dp->d_name, "overview.fits"))
02666 continue;
02667 found = 1;
02668 sprintf(name, "%s/%s", wd, dp->d_name);
02669 closedir(dfd);
02670
02671 if ((sds = sds_get_fits_head (name)) == NULL) {
02672 printf("**Error on get_fits_head(%s)\n", name);
02673 return(0);
02674 }
02675 if((value=sds_search_attrvalue_str(sds, "T_FIRST")) == NULL) {
02676
02677 return(0);
02678 }
02679 if(!strcmp(value, ""))
02680 setkey_str(list, "t_first", "-1");
02681 else
02682 setkey_str(list, "t_first", value);
02683 if((value=sds_search_attrvalue_str(sds, "T_LAST")) == NULL) {
02684 printf("**No t_last in %s\n", name);
02685 return(0);
02686 }
02687 if(!strcmp(value, ""))
02688 setkey_str(list, "t_last", "-1");
02689 else
02690 setkey_str(list, "t_last", value);
02691 break;
02692 }
02693 if(found)
02694 return(1);
02695 else {
02696 closedir(dfd);
02697 if(t_first_arg) {
02698 setkey_str(list, "t_first", t_first_arg);
02699 if(t_last_arg) {
02700 setkey_str(list, "t_last", t_last_arg);
02701 }
02702 else {
02703 setkey_str(list, "t_last", "-1");
02704 }
02705 return(1);
02706 }
02707 if(t_last_arg) {
02708 setkey_str(list, "t_first", "-1");
02709 setkey_str(list, "t_last", t_last_arg);
02710 return(1);
02711 }
02712 return(0);
02713 }
02714 }
02715
02716
02717
02718
02719
02720
02721
02722
02723
02724
02725
02726
02727 KEY *set_key_archive(char *basename, PSERVER *stab, HDATA *hdata, KEY *rlist,
02728 int status)
02729 {
02730 KEY *alist;
02731 char *wd, *warnmsg;
02732 char ext[MAX_STR];
02733 double dsize;
02734
02735
02736 alist=newkeylist();
02737 sprintf(ext, "%s_wd", basename);
02738 wd = getkey_str(hdata->param_list, ext);
02739
02740 get_overview(wd, &alist);
02741 setkey_int(&alist, "status", status);
02742 if((warnmsg=getkey_str(rlist, "WARNING")))
02743 setkey_str(&alist, "warning", warnmsg);
02744
02745
02746
02747
02748
02749
02750
02751
02752
02753
02754
02755
02756
02757
02758
02759
02760
02761
02762 setkey_str(&alist, "wd", wd);
02763 sprintf(ext, "%s_prog", basename);
02764 setkey_str(&alist, "prog", getkey_str(hdata->param_list, ext));
02765 sprintf(ext, "%s_series", basename);
02766 setkey_str(&alist,"series", getkey_str(hdata->param_list, ext));
02767 sprintf(ext, "%s_prog_sn", basename);
02768 if(!findkey(hdata->param_list, ext))
02769 setkey_int(&alist, "prog_sn", -1);
02770 else
02771 setkey_int(&alist,"prog_sn", getkey_int(hdata->param_list, ext));
02772 sprintf(ext, "%s_series_sn", basename);
02773 if(!findkey(hdata->param_list, ext))
02774 setkey_int(&alist, "series_sn", -1);
02775 else
02776 setkey_int(&alist,"series_sn", getkey_int(hdata->param_list, ext));
02777 sprintf(ext, "%s_level", basename);
02778 setkey_str(&alist,"level", getkey_str(hdata->param_list, ext));
02779 sprintf(ext, "%s_level_sn", basename);
02780 if(findkey(hdata->param_list, ext))
02781 setkey_int(&alist,"level_sn", getkey_int(hdata->param_list, ext));
02782 dsize = du_dir(wd);
02783
02784
02785 setkey_int(&alist, "lago_tid", lago_tid);
02786 setkey_double(&alist, "dsds_bytes", dsize);
02787 setkey_uint(&alist, "dsds_uid", uid);
02788 setkey_str(&alist, "svc_name", stab->name);
02789 setkey_str(&alist, "svc_version", stab->version);
02790 setkey_str(&alist, "username", username);
02791 return(alist);
02792 }
02793
02794
02795
02796
02797
02798
02799
02800
02801
02802
02803
02804 void call_archive(PSERVER *stab, HDATA *hdata, KEY *rlist, int status)
02805 {
02806 KEY *alist;
02807 DRMS_Record_t *rsx;
02808 argument *arg;
02809 char *wd;
02810 char oname[MAX_STR];
02811 int outnsets, i, snum;
02812 unsigned long sunum;
02813 double dsize;
02814
02815 archactive = 1;
02816 arg = stab->arguments;
02817 while(arg->kind != ARG_END) {
02818 if(arg->kind == ARG_DATA_OUT) {
02819 sprintf(oname, "%s_nsets", arg->key);
02820 outnsets = getkey_int(hdata->param_list, oname);
02821 for(i=0; i < outnsets; i++) {
02822 sprintf(oname, "%s_%d", arg->key, i);
02823
02824 alist = set_key_archive(oname, stab, hdata, rlist, status);
02825 wd = getkey_str(alist, "wd");
02826 dsize = getkey_double(alist, "dsds_bytes");
02827
02828
02829
02830 sprintf(oname, "%s_%d_rs", arg->key, i);
02831 rsx = (DRMS_Record_t *)getkey_fileptr(stab->map_list, oname);
02832
02833 snum = getkey_int(alist, "series_sn");
02834
02835 status = drms_setkey_int(rsx, "snum", snum);
02836 if(status) {
02837 printk("ERROR: can't drms_setkey_int() for snum=%u\n", snum);
02838 abortit(1);
02839 }
02840
02841
02842
02843 sunum = rsx->sunum;
02844 rsx->seriesinfo->retention = stab->archive_day;
02845 if(!stab->archive || stab->archive == 't') {
02846
02847 rsx->seriesinfo->archive = -1;
02848 }
02849 else {
02850 rsx->seriesinfo->archive = 1;
02851 stab->archive_complete = 1;
02852
02853 if(stab->archive == 'p') rsx->seriesinfo->retention = 18250;
02854 }
02855
02856
02857 if((status = drms_close_record(rsx, DRMS_INSERT_RECORD))) {
02858 printk("**ERROR: drms_close_record failed status=%d\n", status);
02859 abortit(1);
02860 }
02861 sprintf(oname, "%s_%d_bytes", arg->key, i);
02862 setkey_double(&hdata->param_list, oname, dsize);
02863 sprintf(oname, "%s_%d_ds_index", arg->key, i);
02864 setkey_ulong(&hdata->param_list, oname, sunum);
02865
02866
02867
02868
02869
02870
02871 freekeylist(&alist);
02872 printf("Archive pending: wd=%s bytes=%g\n", wd, dsize);
02873 }
02874 }
02875 arg++;
02876 }
02877 stab->archive_complete = 1;
02878 ds_names_file(hdata->param_list, stab->arguments);
02879 if(archactive == -1) {
02880 abortit(1);
02881 }
02882 archactive = 0;
02883 }
02884
02885
02886
02887
02888
02889
02890
02891
02892
02893
02894
02895
02896
02897
02898
02899
02900 void call_archive_0(PSERVER *stab, HDATA *hdata, KEY *rlist, int status)
02901 {
02902 KEY *alist, *blist;
02903 argument *arg;
02904 char *wd;
02905 char oname[MAX_STR], ext[MAX_STR];
02906 int outnsets, found, i;
02907 double dsize;
02908
02909 arg = stab->arguments; found = 0;
02910 while(arg->kind != ARG_END) {
02911 if(arg->kind == ARG_DATA_OUT) {
02912 sprintf(oname, "%s_nsets", arg->key);
02913 outnsets = getkey_int(hdata->param_list, oname);
02914 for(i=0; i < outnsets; i++) {
02915 sprintf(oname, "%s_%d", arg->key, i);
02916 sprintf(ext, "%s_level_sn", oname);
02917 if(findkey(hdata->param_list, ext)) {
02918 if(getkey_int(hdata->param_list,ext) != 0)
02919 continue;
02920 }
02921 else continue;
02922 found = 1;
02923 stab->archive_day = 2;
02924
02925 alist = set_key_archive(oname, stab, hdata, rlist, status);
02926 wd = getkey_str(alist, "wd");
02927 dsize = getkey_double(alist, "dsds_bytes");
02928 if((blist = (KEY *)call_dsds(&alist, REQUPDATE0, dsds_tid, pe_tid, printf, debugflg)) == NULL) {
02929 pemail("***Error making a dsds_main entry for wd=%s\n", wd);
02930 abortit(1);
02931 }
02932 printf("level#0 entry: wd=%s bytes=%g\n", wd, dsize);
02933
02934
02935 if(findkey(blist, "ds_index")) {
02936 sprintf(ext, "%s_ds_index", oname);
02937 setkey_ulong(&hdata->param_list,ext,getkey_ulong(blist, "ds_index"));
02938 }
02939 sprintf(ext, "%s_bytes", oname);
02940 setkey_double(&hdata->param_list, ext, dsize);
02941
02942
02943
02944
02945
02946
02947 freekeylist(&alist); freekeylist(&blist);
02948 }
02949 }
02950 arg++;
02951 }
02952 if(found)
02953 ds_names_file(hdata->param_list, stab->arguments);
02954 }
02955
02956
02957
02958
02959
02960
02961
02962 void kick_server(PSERVER *sptr)
02963 {
02964 HDATA *hnext;
02965 int i;
02966
02967 if(!sptr->archive_group) {
02968 firstalloc = 1;
02969 JPE_out_nsets = 0;
02970 if(JPElist) freekeylist(&JPElist);
02971 JPElist = newkeylist();
02972 }
02973 hnext = (HDATA *)gethnext(sptr->hosts);
02974 for(i=0; i<effective_hosts; i++) {
02975
02976
02977
02978
02979 pvm_spawn(sptr->name, (char **)0, PvmTaskHost, hnext->host_name, 1,
02980 &hnext->tid);
02981 if(hnext->tid < 0) {
02982 pemail("***Can't spawn %s on %s\n",sptr->name,hnext->host_name);
02983 abortit(1);
02984 }
02985 printk("%s tid=%x spawned on %s\n",
02986 sptr->name, hnext->tid, hnext->host_name);
02987 msgid_send(sptr, hnext);
02988 arg_recv(sptr, hnext);
02989
02990
02991 form_keylist(sptr, hnext);
02992 send_to_serv(sptr,hnext,0);
02993 hnext = (HDATA *)gethnext((HDATA *)MONE);
02994
02995 }
02996 }
02997
02998
02999
03000
03001 int resp_any()
03002 {
03003 struct timeval tvalr;
03004 uint64_t tsr;
03005 int retcode, bufid, bytes, msgtag, tid, i;
03006
03007
03008 gettimeofday(&tvalr, NULL);
03009 tsr = tvalr.tv_sec;
03010 while(1) {
03011 gettimeofday(&tvalr, NULL);
03012 if(tvalr.tv_sec - tsr > RESPWAIT) {
03013 retcode = -1;
03014 break;;
03015 }
03016 if(bufid=pvm_nrecv(-1, -1)) {
03017 if(pvm_bufinfo(bufid, &bytes, &msgtag, &tid)) {
03018 pemail("***Can't get bufinfo on bufid=%d\n", bufid);
03019 abortit(1);
03020 }
03021 for(i=0; i<MAX_SERV; i++) {
03022 if(stab[i].name == NULL) {
03023 pemail("***Got a bad msg type response %d from tid=%x\n", msgtag, tid)
03024 ;
03025 abortit(1);
03026 }
03027 if(stab[i].msgid == msgtag)
03028 break;
03029 }
03030 retcode=i;
03031 break;
03032 }
03033 #ifdef __sgi
03034 sginap(1);
03035 #else
03036 sleep(1);
03037 #endif
03038 }
03039 return(retcode);
03040 }
03041
03042
03043
03044
03045
03046
03047
03048 void process_resp(int ix)
03049 {
03050 PSERVER *stabp, *stabn;
03051 HDATA *hnext;
03052 KEY *list;
03053 argument *arg;
03054 char remhost[MAX_STR], sysstr[MAX_STR], ext[MAX_STR];
03055 char *errtxt, *remoutfile, *outwd, *cptr, *warnmsg, *auxinfo;
03056 int j, remerrno, remtid, wait, groupid, sindex, outs, abortnum;
03057
03058 stabp = &stab[ix];
03059 pvm_upkint(&remtid, 1,1 );
03060 pvm_upkint(&remerrno, 1,1 );
03061 pvm_upkstr(remhost);
03062 list=newkeylist();
03063 if(!(list=(KEY *)unpack_keylist(list))) {
03064 printk("**Warning: no keylist returned from %s tid=%x\n",stabp->name,remtid);
03065 }
03066
03067 if(!(remoutfile=getkey_str(list, "out")))
03068 remoutfile = "<NONE>";
03069 if(remerrno) {
03070 switch(remerrno) {
03071 case MISSING_FILES:
03072 errtxt = "MISSING_FILES";
03073 break;
03074 default:
03075 errtxt = " ";
03076 break;
03077 }
03078 printk(" %s %x status %d on %s:%s\n", stabp->name, remtid, remerrno, remhost, errtxt);
03079 }
03080
03081
03082
03083
03084
03085 hnext = (HDATA *)gethnext(stabp->hosts);
03086 while(hnext) {
03087 if(hnext->tid == remtid) {
03088 hnext->busy = 0;
03089 stabp->busyall--;
03090
03091 arg = stabp->arguments;
03092 if(stabp->cphist) {
03093 while(arg->kind != ARG_END) {
03094 if(arg->kind == ARG_DATA_OUT) {
03095 sprintf(ext, "%s_nsets", arg->key);
03096 if(findkey(hnext->param_list, ext)) {
03097 outs = getkey_int(hnext->param_list, ext);
03098 for(j=0; j < outs; j++) {
03099 sprintf(sysstr, "%s_%d_wd", arg->key, j);
03100 if(outwd = getkey_str(hnext->param_list, sysstr)) {
03101 if(cptr = rindex(pe_map, '/'))
03102 cptr=cptr+1;
03103 else
03104 cptr=pe_map;
03105 sprintf(sysstr, "cp %s %s;chmod u+w,g+w %s/%s",
03106 pe_map, outwd, outwd, cptr);
03107 if(system(sysstr)) {
03108 pemail("***Error on: %s\n", sysstr);
03109 abortit(1);
03110 }
03111 }
03112 }
03113 }
03114 }
03115 arg++;
03116 }
03117 }
03118 if(!(auxinfo=getkey_str(list, "AUXINFO")))
03119 auxinfo = "<NONE>";
03120 if(findkey(list, "abortflg")) {
03121 if((abortnum=getkey_int(list, "abortflg"))) {
03122 if(groupid = stabp->groupid) {
03123 pemail(" ***Module %s abort. See %s\n",
03124 stabp->name, getkey_str(list, "log_file"));
03125
03126
03127
03128 pemail(" Not aborting as module is part of a group\n");
03129 }
03130 else {
03131 pemail(" ***Module %s abort. See %s\n",
03132 stabp->name, getkey_str(list, "log_file"));
03133 if (stabp->noabort) {
03134
03135
03136 if(abortnum == 1) {
03137 if(stabp->archive != 'a') {
03138 stabp->archive = 0;
03139 }
03140 }
03141 }
03142 else {
03143 if((warnmsg=getkey_str(list, "WARNING"))) {
03144 pemail(" with warning msg:\n %s\n", warnmsg);
03145 }
03146 pelog("%x\t%d\tabort\t0\t%s\t%s\t%s\n",
03147 remtid,remerrno,stabp->name,remoutfile,auxinfo);
03148
03149 if(!stabp->archive && reqmegs)
03150 call_archive_0(stabp, hnext, list, remerrno);
03151
03152 if(abortnum == 2) {
03153 if(stabp->archive && (stabp->archive_group != 1)) {
03154 if(stabp->archive != 'a') {
03155 if(reqmegs) {
03156 call_archive(stabp, hnext, list, remerrno);
03157 }
03158 }
03159 else {
03160 stabp->archive_complete = 1;
03161 }
03162 }
03163 }
03164
03165
03166
03167 if(abortnum == 1) {
03168 stabp->archive_group = 0;
03169 }
03170 abortit(-1);
03171 }
03172 }
03173 }
03174 }
03175 printk(" %s %s %x complete:\n %s\n",
03176 remhost,stabp->name,remtid,remoutfile);
03177 if((warnmsg=getkey_str(list, "WARNING"))) {
03178 printk(" **with warning msg:\n %s\n", warnmsg);
03179 }
03180
03181 pvm_kill(hnext->tid);
03182
03183 hnext->tid = 0;
03184
03185 if(!stabp->busyall) {
03186
03187
03188
03189
03190 if(stabp->archive && (stabp->archive_group != 1)) {
03191 if(stabp->archive != 'a') {
03192 if(reqmegs) {
03193 call_archive(stabp, hnext, list, remerrno);
03194 }
03195 else {
03196 printk("**Ignoring archive of data for %s:\n",stabp->name);
03197 printk(" No DSDS storage alloc request in map file.\n");
03198 }
03199 }
03200 else {
03201 stabp->archive_complete = 1;
03202 }
03203 }
03204
03205
03206 if(!stabp->archive && reqmegs)
03207 call_archive_0(stabp, hnext, list, remerrno);
03208 }
03209 if(abortnum) {
03210 pelog("%x\t%d\tabort\t%d\t%s\t%s\t%s\n",
03211 remtid,remerrno,stabp->archive_complete,stabp->name,remoutfile,auxinfo);
03212 }
03213 else {
03214 pelog("%x\t%d\tnormal\t%d\t%s\t%s\t%s\n",
03215 remtid,remerrno,stabp->archive_complete,stabp->name,remoutfile,auxinfo);
03216 }
03217 break;
03218 }
03219 hnext = (HDATA *)gethnext((HDATA *)MONE);
03220 }
03221 if(!hnext) {
03222 pemail("***No such host %s for server %s\n",remhost,stabp->name);
03223 abortit(1);
03224 }
03225
03226 wait = 0;
03227 if(groupid = stabp->groupid) {
03228 for(j=0; j<MAX_SERV; j++) {
03229 if(stab[j].groupid == groupid) {
03230 if(stab[j].busyall) {
03231 wait = 1;
03232 break;
03233 }
03234 }
03235 }
03236 }
03237 else {
03238 if(stabp->busyall) wait = 1;
03239 }
03240
03241 while(wait) {
03242 if((sindex=resp_any())== -1)
03243 who_died();
03244 else {
03245 process_resp(sindex);
03246 freekeylist(&list);
03247 return;
03248 }
03249 }
03250 if(groupid)
03251 printk("End Server Group #%d\n", groupid);
03252
03253
03254 stabn = &stab[ix+1];
03255 while(1) {
03256 if(stabn->name != NULL && groupid != 0 && stabn->groupid == groupid) stabn++;
03257 else break;
03258 }
03259 if(stabn->name != NULL) {
03260 if(groupid = stabn->groupid) {
03261 printk("Start Server Group #%d\n", groupid);
03262 while(stabn->groupid == groupid) {
03263 kick_server(stabn);
03264 stabn++;
03265 }
03266 }
03267 else
03268 kick_server(stabn);
03269 }
03270 freekeylist(&list);
03271 }
03272
03274
03275
03276
03277
03278
03279
03280
03281 void dereg()
03282 {
03283 PSERVER *sptr;
03284 HDATA *hdata;
03285 DRMS_Record_t *rsx;
03286 argument *arg;
03287 char *wd, *cptr;
03288 char oname[MAX_STR];
03289 double dsize;
03290 double mapstore = 0.0;
03291 int i, j, outnsets;
03292
03293 if(reqmegs) {
03294
03295 for(i=0; i < MAX_SERV; i++) {
03296 sptr = &stab[i];
03297 if(sptr->name == NULL)
03298 break;
03299
03300 arg = sptr->arguments;
03301 hdata = (HDATA *)gethnext(sptr->hosts);
03302 while(arg->kind != ARG_END) {
03303 if(arg->kind == ARG_DATA_OUT) {
03304 sprintf(oname, "%s_nsets", arg->key);
03305 outnsets = getkey_int(hdata->param_list, oname);
03306 for(j=0; j < outnsets; j++) {
03307 sprintf(oname, "%s_%d_wd", arg->key, j);
03308 if(wd=getkey_str(hdata->param_list, oname)) {
03309
03310 cptr = strstr(wd, "/SUM");
03311 if(cptr == wd) {
03312
03313
03314 if((sptr->archive_group != 1) && (sptr->archive != 'a')) {
03315 dsize = du_dir(wd);
03316 mapstore += dsize;
03317 }
03318 }
03319 }
03320
03321 sprintf(oname, "%s_%d_level_sn", arg->key, j);
03322 if(findkey(hdata->param_list, oname)) {
03323 if(getkey_int(hdata->param_list, oname) == 0)
03324 continue;
03325 }
03326 if(sptr->archive != 'a') {
03327 if(!sptr->archive_complete && (sptr->archive_group != 1)) {
03328
03329 sprintf(oname, "%s_%d_rs", arg->key, j);
03330 rsx = (DRMS_Record_t *)getkey_fileptr(sptr->map_list, oname);
03331 rsx->seriesinfo->archive = 0;
03332 if(wd) {
03333 if(strstr(wd, "/SUM")) {
03334 printk("Removing non-archived wd=%s bytes=%g\n", wd, dsize);
03335 if(rmdirs(wd, dsdswd)) {
03336 printk("**Can't rm wd=%s\n", wd);
03337 printk("(NOTE: May be due to an NFS delay. Usually ignore.)\n");
03338 }
03339 }
03340 }
03341 }
03342 }
03343 }
03344 }
03345 arg++;
03346 }
03347 }
03348 printk("High water storage bytes=%g\n", mapstore);
03349
03350
03351
03352
03353
03354
03356
03357
03358
03359
03360
03361
03362
03363
03364
03365
03366
03367
03368 }
03369
03370
03371
03372
03373
03374
03375
03376
03377
03378
03379
03380 }
03381
03382
03383
03384
03385
03386
03387
03388 void do_pipe()
03389 {
03390 PSERVER *sptr;
03391 int i, groupid;
03392 int sindex;
03393
03394
03395
03396 sptr = &stab[0];
03397 if(groupid = sptr->groupid) {
03398 printk("Start Server Group #%d\n", groupid);
03399 while(sptr->groupid == groupid) {
03400 kick_server(sptr);
03401 sptr++;
03402 }
03403 }
03404 else
03405 kick_server(sptr);
03406
03407 for(i=0; i<MAX_SERV; i++) {
03408 if(stab[i].name == NULL)
03409 break;
03410 while(stab[i].busyall) {
03411 if((sindex=resp_any())== -1)
03412 who_died();
03413 else
03414 process_resp(sindex);
03415 }
03416 }
03417 }
03418
03419
03420
03421 void setup(int argc, char *argv[])
03422 {
03423 struct timeval tvalr;
03424 struct tm *t_ptr;
03425 char logname[128], string[128], cwdbuf[128], idstr[256];
03426 int tvalr_int, i;
03427
03428 if((pe_tid=start_pvm(printf)) == 0) {
03429 fprintf(stderr, "Can't start a pvm daemon!!\n");
03430 exit(1);
03431 }
03432 if(!prod_host_set()) {
03433 fprintf(stderr, "Error accessing file %s\n", PHNAME);
03434 exit (1);
03435 }
03436 if (signal(SIGINT, SIG_IGN) != SIG_IGN)
03437 signal(SIGINT, sighandler);
03438 if (signal(SIGTERM, SIG_IGN) != SIG_IGN)
03439 signal(SIGTERM, sighandler);
03440
03441 gettimeofday(&tvalr, NULL);
03442 #ifndef __sparc
03443 tvalr_int = (int)tvalr.tv_sec;
03444 t_ptr = localtime(&tvalr_int);
03445 #else
03446 t_ptr = localtime(&tvalr.tv_sec);
03447 #endif
03448 sprintf(datestr, "%s", asctime(t_ptr));
03449 datestr[24] = NULL;
03450 wd_dup_list = newkeylist();
03451 wd_mk_list = newkeylist();
03452 JPElist = newkeylist();
03453 tae_tid=pvm_parent();
03454 uid = (uint)getpid();
03455 pvm_serror(1);
03456 if(!(username = (char *)getenv("USER"))) username = "nouser";
03457 sprintf(logname, PELOGFILE, username, getpid());
03458 open_pelog(logname);
03459 printk_set(printf, printf);
03460 getcwd(cwdbuf, 126);
03461 sprintf(idstr, "Cwd: %s\nCall: ", cwdbuf);
03462 for(i=0; i < argc; i++) {
03463 sprintf(string, "%s%s", argv[i], (i < argc-1) ? " " : "");
03464 strcat(idstr, string);
03465 }
03466 strcat(idstr, "\n");
03467 sprintf(string, "jpe started as tid=%x pid=%d user=%s\n",
03468 pe_tid, getpid(), username);
03469 strcat(idstr, string);
03470 sprintf(mailname, PEMAILFILE, username, getpid());
03471 open_pemail(mailname, idstr);
03472 printk("%s", idstr);
03473 pelog("#%s for %s on %s\n", logname, username, datestr);
03474 pelog("#listed in order of completion\n");
03475 pelog("#tid\tstatus\tcomplet\tarchive\tmodule\t\t\"out\"returned\tauxinfo\n");
03476 pelog("TID\tSTATUS\tCOMPLET\tARCHIVE\tMODULE\tOUT\tAUXINFO\n");
03477 pelog("---\t------\t-------\t-------\t------\t---\t-------\n");
03478
03479 umask(002);
03480 }
03481
03482
03483 int DoIt()
03484 {
03485 if(setjmp(env) != 0) {
03486 dereg();
03487 kill_pvm();
03488 pvm_exit();
03489 if (pemailfp) fclose(pemailfp);
03490 mailit();
03491 if (pelogfp) fclose(pelogfp);
03492 printk("PE Abnormal Completion\n");
03493 return(1);
03494 }
03495 cmdparams_get_argv(&cmdparams, &argv, &argc);
03496 setup(argc, argv);
03497 get_cmd(argc, argv);
03498 spawn_pvm();
03499 do_pipe();
03500 dereg();
03501 kill_pvm();
03502 pvm_exit();
03503 if (pemailfp) fclose(pemailfp);
03504 mailit();
03505 if (pelogfp) fclose(pelogfp);
03506 printk("PE Normal Completion\n");
03507 return(0);
03508 }