00001
00355
00356
00357
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370
00371
00372
00373
00374
00375
00376
00377
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402
00403
00404
00405
00406
00407
00408
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419 #include <jsoc_main.h>
00420 #include <sum_rpc.h>
00421 #include <cmdparams.h>
00422 #include <drms.h>
00423 #include <drms_names.h>
00424 #include <pvm3.h>
00425 #include <soi_key.h>
00426 #include <stdio.h>
00427 #include <stdlib.h>
00428 #include <ctype.h>
00429 #include <strings.h>
00430 #include <sys/types.h>
00431 #include <sys/time.h>
00432 #include <sys/stat.h>
00433 #include <dirent.h>
00434 #include <unistd.h>
00435 #include <printk.h>
00436 #include <setjmp.h>
00437
00438 #include "pe.h"
00439
00440 #define MONE (long)-1
00441 #define PEMAILFILE "/tmp/pe.%s.%d.mail"
00442 #define PKTSZ 1788 //size of VCDU pkt
00443 #define MAXFILES 8192 //max # of file can handle in tlmdir
00444 #define NUMTIMERS 8 //number of seperate timers avail
00445
00446 #define IMAGE_NUM_COMMIT 2
00447 #define TESTAPPID 0x199 //appid of test pattern packet
00448 #define TESTVALUE 0xc0b //first value in test pattern packet
00449 #define MAXERRMSGCNT 10 //max # of err msg before skip the tlm file
00450 #define NOTSPECIFIED "***NOTSPECIFIED***"
00451 #define mailable_users_num 6
00452
00453 #define out_namespace "sha.dsds"
00454
00455 extern int start_pvm(int (*msg)(const char *, ...));
00456 extern int prod_host_set(void);
00457
00458 static char *mailable_users[] = {"jprod","daemon","tprod","production",
00459 "jeneen", "thailand"};
00460 #define mailees_num 1
00461 static char *mailees[] = {"prod3@sun"};
00462
00463
00464
00465 /* Command-line argument table: each row is {kind, name, default value, description}; the ARG_END row terminates the list. */
00466 ModuleArgs_t module_args[] = {
00467 {ARG_STRING, "map", NOTSPECIFIED, "map file with the pe directives"},
00468 {ARG_INT, "touch","-1","#of days to retain input datasets before deletable"}, /* NOTE(review): get_cmd() also accepts the word "keep" for touch= */
00469 {ARG_FLAG, "A", "0", "retrieve from tape flag"},
00470 {ARG_FLAG, "v", "0", "verbose flag"},
00471 {ARG_FLAG, "h", "0", "help flag"},
00472 {ARG_END}
00473 };
00474
00475 CmdParams_t cmdparams;
00476 char **argv = NULL;
00477 int argc = 0;
00478 char *module_name = "jpe";
00479 jmp_buf env;
00480
00481 int resp_dsds(int dsdstid);
00482 int pemail(char *fmt, ...);
00483 double du_dir();
00484
00485 FILE *pemailfp;
00486 FILE *pelogfp;
00487 PSERVER stab[MAX_SERV+1];
00488 KEY *wd_dup_list;
00489 KEY *wd_mk_list;
00490 KEY *JPElist;
00491
00492 static char datestr[32];
00493 static struct timeval first[NUMTIMERS], second[NUMTIMERS];
00494 static DRMS_Record_t *rs;
00495 extern DRMS_Env_t *drms_env;
00496 extern KEY *call_drms_in(KEY *list, int dbflg);
00497
00498 unsigned int uid;
00499 unsigned int vcdu_24_cnt, vcdu_24_cnt_next;
00500 int wdkey_int;
00501 int verbose;
00502 int whk_status;
00503 int ampexflg;
00504 int nocontrolc;
00505 int archactive;
00506 int ccactive;
00507 int mailable;
00508 int mailtrue;
00509 int touchflg;
00510 double reqbytes;
00511 int rcpflg;
00512 int dbxflg = 0;
00513 int debugflg;
00514 int reqmegs;
00515 int nowarn;
00516 int dsdsin;
00517 int msg_id;
00518 int num_hosts;
00519 int effective_hosts;
00520 int total_tlm_vcdu;
00521 int total_missing_vcdu;
00522 int dsds_tid;
00523
00524 int pe_tid;
00525 int ampex_tid;
00526 int lago_tid;
00527 int tae_tid;
00528 int errmsgcnt, fileimgcnt;
00529 int imagecnt = 0;
00530 int restartflg = 0;
00531 int sigalrmflg = 0;
00532 int ignoresigalrmflg = 0;
00533 int firstfound = 0;
00534 int firstalloc = 1;
00535 int JPE_out_nsets;
00536 int ALRMSEC = 60;
00537 char argvc[32], argindir[96], arglogfile[96], argoutdir[96];
00538 char timetag[32];
00539 char pchan[8];
00540 char rchan[8];
00541 char stopfile[80];
00542 char tlmseriesname[128];
00543 char lev0seriesname[128];
00544 char tlmnamekey[128];
00545 char tlmnamekeyfirst[128];
00546 char oldtlmdsnam[128];
00547 char mailname[256];
00548 char pdshost[MAX_STR];
00549 char pe_map[256];
00550 char dsdswd[MAX_STR];
00551 char *username;
00552 char *rcpdir;
00553 char *t_first_arg;
00554 char *t_last_arg;
00555
00556 void BeginTimer(int n)
00557 {
00558 gettimeofday (&first[n], NULL);
00559 }
00560
00561 float EndTimer(int n)
00562 {
00563 gettimeofday (&second[n], NULL);
00564 if (first[n].tv_usec > second[n].tv_usec) {
00565 second[n].tv_usec += 1000000;
00566 second[n].tv_sec--;
00567 }
00568 return (float) (second[n].tv_sec-first[n].tv_sec) +
00569 (float) (second[n].tv_usec-first[n].tv_usec)/1000000.0;
00570 }
00571
00572
00573
00574 int pelog(char *fmt, ...)
00575 {
00576 va_list args;
00577 char string[32768];
00578
00579 va_start(args, fmt);
00580 vsprintf(string, fmt, args);
00581 if(pelogfp) {
00582 fprintf(pelogfp, string);
00583 fflush(pelogfp);
00584 }
00585 va_end(args);
00586 return(0);
00587 }
00588
00589
00590
00591 void open_pelog(char *filename)
00592 {
00593 if((pelogfp=fopen(filename, "w")) == NULL)
00594 fprintf(stderr, "Can't open the log file %s\n", filename);
00595 }
00596
00597 void kill_pvm()
00598 {
00599 int i;
00600 HDATA *hnext;
00601
00602 for(i=0; i<MAX_SERV; i++) {
00603 if(stab[i].name==NULL)
00604 break;
00605 for(hnext=(HDATA *)gethnext(stab[i].hosts); hnext != NULL;
00606 hnext=(HDATA *)gethnext((HDATA *)MONE)) {
00607 if(hnext->tid > 0) {
00608 pvm_kill(hnext->tid);
00609
00610 pelog("%x\tn/a\tkilled\t0\t%s\t<NONE>\t<NONE>\n", hnext->tid,stab[i].name);
00611
00612
00613
00614
00615
00616
00617
00618 }
00619 }
00620 }
00621 }
00622
00623
00624 void abortit(int stat)
00625 {
00626 printk("***Abort in progress ...\n");
00627 printk("**Exit jpe w/ status = %d\n", stat);
00628
00629
00630 longjmp(env, 2);
00631 }
00632
00633 void sighandler(sig)
00634 int sig;
00635 {
00636 KEY *list = newkeylist();
00637 int respcnt;
00638
00639 if (signal(SIGINT, SIG_IGN) != SIG_IGN)
00640 signal(SIGINT, sighandler);
00641 if (signal(SIGTERM, SIG_IGN) != SIG_IGN)
00642 signal(SIGTERM, sighandler);
00643 if(archactive) {
00644 printf("Ctrl-C during an archive update. Will abort when done...\n");
00645 archactive = -1;
00646 return;
00647 }
00648 if(nocontrolc) {
00649
00650 if(ccactive) {
00651 printf("A ^C is already active. I'm awaiting a dsds_svc reply...\n");
00652 }
00653 else {
00654 ccactive = 1;
00655 setkey_uint(&list, "dsds_uid", uid);
00656 setkey_int(&list, "ampex_tid", ampex_tid);
00657 setkey_str(&list, "USER", username);
00658 printf("^C signal received by pe\n");
00659 printf("Sending TAPECANCEL request to dsds_svc...\n");
00660 if((list = call_dsds(&list,REQTAPECANCEL,dsds_tid,pe_tid,(int(*)(char *, ...))printf,debugflg)) == NULL) {
00661 printf("Can't REQTAPECANCEL with dsds_svc\n");
00662 }
00663 else {
00664 if(getkeytype(list,"MSG_PEND")==KEYTYP_INT) {
00665 respcnt = 0;
00666 while(1) {
00667 if(resp_dsds(dsds_tid)) {
00668 printf("..."); respcnt++;
00669 if(respcnt == 20) {
00670 respcnt = 0;
00671 printf("\n");
00672 }
00673 continue;
00674 }
00675 if(respcnt != 0) printf("\n");
00676 break;
00677 }
00678 printf("Ampex tape read request has been canceled.\n");
00679 abortit(1);
00680 }
00681 else {
00682 printf("Failed to get TAPECANCEL msg to ampex_svc\n");
00683
00684 }
00685 }
00686 }
00687 }
00688 else {
00689 printf("\n***PE received a termination signal\n");
00690 abortit(1);
00691 }
00692 }
00693
00694
00695
00696
00697
00698 void who_died()
00699 {
00700 PSERVER *sptr;
00701 HDATA *hnext;
00702 struct taskinfo *taskp;
00703 int i, j, ntasks;
00704
00705 for(i=0; i<MAX_SERV; i++) {
00706 sptr = &stab[i];
00707 if(sptr->name == NULL)
00708 break;
00709 if(sptr->busyall) {
00710 for(hnext=(HDATA *)gethnext(sptr->hosts); hnext != NULL;
00711 hnext=(HDATA *)gethnext((HDATA *)MONE))
00712 {
00713 if(hnext->busy) {
00714 if(pvm_tasks(pvm_tidtohost(hnext->tid), &ntasks, &taskp)) {
00715 pemail("***Error on pvm_tasks() call\n");
00716 abortit(1);
00717 }
00718 for(j=0; j<ntasks; j++) {
00719 if(taskp[j].ti_tid == hnext->tid) break;
00720 }
00721 if(j == ntasks) {
00722 pelog("%x\tn/a\tcrash\t0\t%s\t<NONE>\t<NONE>\n", hnext->tid,sptr->name);
00723 pemail("***Unexpected exit of %s tid=%x on %s **\n",
00724 sptr->name, hnext->tid, hnext->host_name);
00725
00726
00727
00728 if (sptr->noabort) {
00729
00730
00731
00732 hnext->tid = 0;
00733 hnext->busy = 0;
00734 sptr->busyall--;
00735 return;
00736 }
00737 hnext->tid = 0;
00738 abortit(1);
00739 }
00740 }
00741 }
00742 }
00743 }
00744 }
00745
00746
00747
00748
00749
00750
00751
00752
00753
00754
00755
00756
00757
00758
00759
00760
00761
00762
00763
00764
00765
00766
00767
00768
00769
00770
00771
00772
00773
00774
00775
00776
00777
00778
00779
00780
00781
00782 int arg_available (KEY *param, char *name, int type)
00783 {
00784 int paramtype = getkeytype (param, name);
00785
00786 if (paramtype == KEYTYP_BYTE)
00787 {
00788 if (type == ARG_FLAG)
00789 return (1);
00790 else
00791 return (0);
00792 }
00793 else if (paramtype >= KEYTYP_STRING)
00794 return (1);
00795 else
00796 return (0);
00797 }
00798
00799
00800
/*
 * Print the command-line synopsis through printk() and then abort the run
 * via abortit(1).
 */
void usage()
{
    printk("Usage:\njpe [-v] [-A] [touch=#] map=map_file_name\n");
    printk("where: -v = verbose\n");
    printk(" -A = access the tape_svc if need to retrieve data\n");
    printk(" touch = #of days to retain input datasets before deletable\n");

    abortit(1);
}
00809
00810
00811
00812 void getpasswd()
00813 {
00814 int getpasswd;
00815 char *envpasswd;
00816 char pline[MAX_STR];
00817
00818 getpasswd = 1;
00819 if(envpasswd = (char *)getenv("LMODEPASSWD")) {
00820 if(!strcmp(envpasswd, "mdi4soi")) getpasswd = 0;
00821 }
00822 if(getpasswd) {
00823 printf("Need password to run pe in local mode: passwd = ");
00824 if(system("stty -echo")) {
00825 printf("\n You're not the owner of this tty (you did an su?)\n");
00826 printf(" WARNING: Your passwd will be echoed\n");
00827 }
00828 if(gets(pline) == NULL) {
00829 system("stty echo");
00830 printf("\n***Invalid passwd\n");
00831 abortit(1);
00832 }
00833 system("stty echo");
00834 if(strcmp(pline, "mdi4soi")) {
00835 printf("\n***Invalid passwd\n");
00836 abortit(1);
00837 }
00838 printf("\n");
00839 }
00840 }
00841
00842
00843
00844
00845 void open_pemail(char *filename, char *idstr)
00846 {
00847 int i;
00848
00849 mailtrue = 0;
00850 mailable = 0;
00851 for(i = 0; i < mailable_users_num; i++) {
00852 if(!strcmp(username, mailable_users[i])) mailable = 1;
00853 }
00854 if(mailable) {
00855 if((pemailfp=fopen(filename, "w")) == NULL) {
00856 fprintf(stderr, "Can't open the mail file %s\n", filename);
00857 mailable = 0;
00858 return;
00859 }
00860 fprintf(pemailfp, idstr);
00861 }
00862 }
00863
00864
00865
00866
00867
00868 int pemail(char *fmt, ...)
00869 {
00870 va_list args;
00871 char string[32768];
00872
00873 va_start(args, fmt);
00874 vsprintf(string, fmt, args);
00875 printk(string);
00876 if(mailable) {
00877 fprintf(pemailfp, string);
00878 fflush(pemailfp);
00879 }
00880 va_end(args);
00881 mailtrue = 1;
00882 return(0);
00883 }
00884
00885
00886
00887 void mailit()
00888 {
00889 int i;
00890 char cmd[128];
00891
00892 if(mailable && mailtrue) {
00893
00894 if(!(strcmp("daemon", username)) || !(strcmp("production", username))) {
00895 for(i = 0; i < mailees_num; i++) {
00896 sprintf(cmd, "Mail -s \"pe 3* mail\" %s < %s", mailees[i], mailname);
00897 system(cmd);
00898 }
00899 }
00900 else {
00901 sprintf(cmd, "Mail -s \"pe 3* mail\" %s < %s", username, mailname);
00902 system(cmd);
00903 }
00904 }
00905 }
00906
00907
00908
00909
00910
/*
 * Execute one "!" command line from the map file.
 *
 *   setenv NAME VALUE   -> NAME=VALUE is placed in this process's
 *                          environment (a setenv without both NAME and
 *                          VALUE is silently ignored)
 *   anything else       -> the whole line is handed to system()
 *
 * line is modified in place (it is tokenized with strtok).  A line that is
 * empty or only white space does nothing.
 *
 * Both working buffers are sized from the actual input: the previous fixed
 * 256-byte command copy and 128-byte "NAME=VALUE" buffer could be overrun
 * by long map-file lines.
 */
void do_cmd(char *line)
{
    char *shellcmd;
    char *token, *name, *value;
    char *envstr;

    /* untouched copy for system(); strtok() below chops up line itself */
    shellcmd = malloc(strlen(line) + 1);
    if (shellcmd == NULL)
        return;
    strcpy(shellcmd, line);

    token = strtok(line, " \t\n");
    if (token != NULL) {
        if (!strcmp(token, "setenv")) {
            name = strtok(NULL, " \t\n");
            value = (name != NULL) ? strtok(NULL, " \t\n") : NULL;
            if (value != NULL) {
                envstr = malloc(strlen(name) + strlen(value) + 2);
                if (envstr != NULL) {
                    sprintf(envstr, "%s=%s", name, value);
                    /* putenv keeps the pointer: envstr must not be freed */
                    putenv(envstr);
                }
            }
        }
        else {
            system(shellcmd);
        }
    }
    free(shellcmd);
}
00934
00935
00936
00937
00938
00939
00940 void setstab(char *sname)
00941 {
00942 int i;
00943 char *s;
00944
00945 for(i=0; i<MAX_SERV; i++) {
00946 if(stab[i].name == NULL) {
00947 s = (char *)malloc((strlen(sname))+5);
00948 strcpy(s, sname);
00949 strcat(s, "_svc");
00950 stab[i].name=s;
00951 msg_id++;
00952 stab[i].msgid=msg_id;
00953
00954 strcpy(stab[i].version, soi_version);
00955 break;
00956 }
00957 }
00958 if(i == MAX_SERV) {
00959 pemail("***Map file exceeds pe limit of %d servers\n",MAX_SERV);
00960 abortit(1);
00961 }
00962 }
00963
00964
00965
00966
00967 int addpvmd(char *host)
00968 {
00969 struct hostinfo *hostp;
00970 char *hostptr[2];
00971 int nhost, narch, i, hinfos;
00972
00973
00974 if(pvm_config(&nhost, &narch, &hostp)) {
00975 pemail("***Can't get pvm configuration. Pvm daemon(s) running?\n");
00976 return(1);
00977 }
00978 for(i=0; i<nhost; i++) {
00979 if(!strcmp(hostp[i].hi_name, host)) {
00980 break;
00981 }
00982 }
00983 if(i == nhost) {
00984 printk("A pvm daemon is been added for host %s...\n", host);
00985
00986
00987 sleep(2);
00988 hostptr[0] = host;
00989 if((pvm_addhosts(hostptr, 1, &hinfos)) < 0) {
00990 pemail("***Can't add a pvm daemon for host %s\n", host);
00991 return(1);
00992 }
00993 if(hinfos < 0) {
00994 pemail("***Can't add %s pvm daemon. Error %d.\n", host, hinfos);
00995 pemail("An independently started one may be running? Halt it.\n");
00996 return(1);
00997 }
00998 sleep(2);
00999 }
01000 return(0);
01001 }
01002
01003
01004
01005
01006
01007
01008
01009
01010
01011
01012 int sethosts(char *hname)
01013 {
01014 HDATA *hnext;
01015 char *host;
01016 int j;
01017
01018 num_hosts=0;
01019 host=(char *)strtok(hname,",");
01020 while(host) {
01021 for(j=0; j<MAX_SERV; j++) {
01022 if(stab[j].name == NULL)
01023 break;
01024 sethname(&stab[j].hosts, host);
01025 }
01026 if(addpvmd(host)) return(1);
01027 num_hosts++;
01028 host=(char *)strtok(NULL,",");
01029 }
01030 effective_hosts = num_hosts;
01031 if(effective_hosts != 1) {
01032 pemail("**Multiple hosts no longer allowed. Will use first host only\n");
01033 effective_hosts = 1;
01034 }
01035
01036 for(j=0; j<MAX_SERV; j++) {
01037 if(stab[j].name==NULL)
01038 break;
01039 for(hnext=(HDATA *)gethnext(stab[j].hosts); hnext != NULL;
01040 hnext=(HDATA *)gethnext((HDATA *)MONE)) {
01041 hnext->param_list = newkeylist();
01042 add_keys(stab[j].map_list, &hnext->param_list);
01043 }
01044 }
01045 return(0);
01046 }
01047
01048
01049
01050
01051
01052
01053
01054
01055
01056
01057
01058
01059
01060
01061
01062
01063
01064 int CollectParams (KEY **newlist, int argc, char **argv)
01065 {
01066 int pct, i;
01067 char *arg, *valu, *keyname;
01068 char isval, flagval;
01069 static char flagname[] = "x";
01070
01071 pct = argc;
01072 *newlist = newkeylist ();
01073
01074 isval = 0;
01075 for (i=1; i<argc; i++)
01076 {
01077 arg = argv[i];
01078 if (isval)
01079
01080 {
01081 setkey_str (newlist, keyname, arg);
01082 free (keyname);
01083 isval = 0;
01084 pct--;
01085 continue;
01086 }
01087 if (arg[0] == '-' || arg[0] == '+')
01088
01089 {
01090 valu = arg;
01091 valu++;
01092 isval = 0;
01093 if (arg[0] == '-' && arg[1] == 'P')
01094 {
01095
01096 pct--;
01097 continue;
01098 }
01099 flagval = (arg[0] == '-') ? -1 : 1;
01100 while (*valu)
01101 {
01102 flagname[0] = valu[0];
01103 setkey_byte (newlist, flagname, flagval);
01104 valu++;
01105 }
01106 continue;
01107 }
01108 if (valu = index (arg, '='))
01109 {
01110 *valu = 0;
01111 keyname = key_strdup (arg);
01112 *valu = '=';
01113 valu++;
01114 if (strlen (valu))
01115 {
01116 setkey_str (newlist, keyname, valu);
01117 free (keyname);
01118 isval = 0;
01119 }
01120 else
01121 {
01122 isval = 1;
01123 }
01124 continue;
01125 }
01126
01127 pct--;
01128 isval = 0;
01129 }
01130 return (pct - 1);
01131 }
01132 /*
 * get_cmd: interpret the jpe command line and the map file it names, and
 * fill in the global option variables and the server table stab[].
 *
 * Command line (collected with CollectParams):
 *   map=   required; usage() is called when it is missing
 *   touch= "keep" or a number of days; touchflg is -1 when not given
 *   pds=   host name for pdshost (else $PDS_SET_HOST, else this host);
 *          anything from the first '.' on is cut off
 *   rcp=   directory, must start with '/'
 *   -v -A  set verbose / ampexflg;  -O is rejected
 *
 * Map file, one directive per logical line:
 *   lines starting with '#' or an empty line are skipped
 *   lines starting with '!' are handed to do_cmd()
 *   a token starting with '\' continues the directive on the next line
 *   recognized keys: DSDSOUT, NOWARN, HOST, START_ARCHIVE, END_ARCHIVE,
 *   START_GROUP, END_GROUP and p= (defines the next server entry)
 *
 * Every error is reported and ends in abortit(1).  On success the function
 * finishes by calling sethosts() with the selected host list.
 */
01133 void get_cmd(int argc, char *argv[])
01134 {
01135 int xargc, first, pfound, groupflg, groupid, gid, archgrp;
01136 int qon, i;
01137 int nullarchflg = 0;
01138 char *toks[128]; /* argv-style token vector built for each map-file directive */
01139 char *tokens1 = "*\t\n"; /* first-token separators for lines containing quotes */
01140 char *tokens2 = " \t\n"; /* first-token separators for ordinary lines */
01141 char *tokens3 = "*\n"; /* later-token separators for lines containing quotes */
01142 char *tokens4 = " \n"; /* later-token separators for ordinary lines */
01143 char hostname[MAX_STR], subname[MAX_STR];
01144 char *inname, *sp, *hname, *cptr;
01145 char *line, *line2, *token, *tokenchars, *tokencharsend;
01146 char carch;
01147 FILE *mapfp;
01148 PSERVER *sptr;
01149 KEY *work_list;
01150 /* ---- command-line arguments ---- */
01151 CollectParams(&work_list, argc, argv);
01152 if(!(inname=getkey_str(work_list, "map"))) {
01153 usage();
01154 }
01155 gethostname(hostname, MAX_STR);
01156 if(!findkey(work_list, "touch"))
01157 touchflg = -1;
01158 else {
01159 cptr = getkey_str(work_list, "touch");
01160 if(!strcmp(cptr, "keep")) {
01161 touchflg = 9999999;
01162 }
01163 else {
01164 if(!isdigit((int)*cptr)) {
01165 pemail("***touch= must say \"keep\" or give integer #of days\n");
01166 abortit(1);
01167 }
01168 touchflg = atoi(cptr);
01169 }
01170 }
01171 if(findkey(work_list, "pds")) {
01172 strcpy(pdshost, getkey_str(work_list, "pds"));
01173 } else if((char *)getenv("PDS_SET_HOST")) {
01174 strcpy(pdshost, (char *)getenv("PDS_SET_HOST"));
01175 } else {
01176 strcpy(pdshost, hostname);
01177 }
01178 if(cptr = index(pdshost, '.')) { /* keep only the part before the first '.' */
01179 *cptr = '\0';
01180 }
01181 if(findkey(work_list, "rcp")) {
01182 rcpdir=getkey_str(work_list, "rcp");
01183 if(strncmp(rcpdir, "/", 1)) {
01184 pemail("***The rcp= dir must start with \"/\"\n");
01185 abortit(1);
01186 }
01187 rcpflg = 1;
01188 }
01189 /* a flag given on the command line is stored by CollectParams as a byte key */
01190 verbose=(getkeytype(work_list, "v") == KEYTYP_BYTE);
01191
01192 ampexflg=(getkeytype(work_list, "A") == KEYTYP_BYTE);
01193 if(getkeytype(work_list, "O") == KEYTYP_BYTE) {
01194 printf("\npe does not have a \"-O\" switch.\n");
01195 abortit(1);
01196 }
01197 if(!(mapfp=fopen(inname, "r"))) {
01198 pemail("***Can't open the map file %s\n", inname);
01199 abortit(1);
01200 }
01201 strcpy(pe_map, inname);
01202 hname=hostname; /* default host list is this machine unless a HOST directive replaces it */
01203 sptr = &stab[0];
01204 first=1; pfound=0; groupflg=0; gid=0; groupid=0, archgrp=0;
01205 line = (char *)malloc(16384);
01206 line2 = (char *)malloc(16384);
01207 while(fgets(line, 16384, mapfp)) { /* ---- one map-file directive per iteration ---- */
01208 if(line[0] == '#' || line[0] == '\n') continue;
01209 if(line[0] == '!') {
01210 do_cmd(line+1);
01211 continue;
01212 }
01213 if(strchr(line, '"')) {
01214 tokenchars = tokens1;
01215 tokencharsend = tokens3;
01216 /* Quoted line: copy it to line2 without the quote characters, turning every blank outside quotes into '*' so that '*' separates tokens and quoted values may contain blanks. */
01217 /* NOTE(review): the copy loop stops only at '\n'; a line with no newline (last line of file, or longer than the buffer) looks like it would run past the terminator - confirm. */
01218 qon = 0; line2[0] = '\0';
01219 for(i=0; ; i++) {
01220 if(line[i] == '"') {
01221 if(qon) qon = 0;
01222 else qon = 1;
01223 i++;
01224 }
01225 if((line[i] == '*') && qon) {
01226 pemail("Illegal \"*\" char in quoted string\n");
01227 abortit(1);
01228 }
01229 if((line[i] == ' ') && !qon) {
01230 line[i] = '*';
01231 }
01232 strncat(line2, &line[i], 1);
01233 if(line[i] == '\n') break;
01234 }
01235 if(qon) {
01236 pemail("Mismatched quotes in: %s\n", line);
01237 abortit(1);
01238 }
01239 } else {
01240 tokenchars = tokens2;
01241 tokencharsend = tokens4;
01242 strcpy(line2, line);
01243 }
01244 toks[0]=argv[0];
01245 xargc=1;
01246 token=(char *)strtok(line2, tokenchars);
01247 while(token) {
01248 if(token[0] == '\\') { /* continuation: read the next non-comment line into fresh buffers */
01249 line = (char *)malloc(16384); /* NOTE(review): the previous line buffer is not freed here */
01250 line2 = (char *)malloc(16384); /* earlier toks[] entries still point into the old line2, so it must stay allocated until CollectParams has run */
01251 while(fgets(line, 16384, mapfp)) {
01252 if(line[0] == '#' || line[0] == '\n') continue;
01253 break;
01254 }
01255 if(strchr(line, '"')) {
01256 tokenchars = tokens1;
01257 tokencharsend = tokens3;
01258 /* same quote handling as for the first line of the directive */
01259
01260 qon = 0; line2[0] = '\0';
01261 for(i=0; ; i++) {
01262 if(line[i] == '"') {
01263 if(qon) qon = 0;
01264 else qon = 1;
01265 i++;
01266 }
01267 if((line[i] == '*') && qon) {
01268 pemail("Illegal \"*\" char in quoted string\n");
01269 abortit(1);
01270 }
01271 if((line[i] == ' ') && !qon) {
01272 line[i] = '*';
01273 }
01274 strncat(line2, &line[i], 1);
01275 if(line[i] == '\n') break;
01276 }
01277 if(qon) {
01278 pemail("Mismatched quotes in: %s\n", line);
01279 abortit(1);
01280 }
01281 } else {
01282 tokenchars = tokens2;
01283 tokencharsend = tokens4;
01284 strcpy(line2, line);
01285 }
01286 if(!(token=(char *)strtok(line2, tokenchars)))
01287 break;
01288 }
01289 toks[xargc]=token; /* NOTE(review): xargc is not checked against the 128 entries of toks[] */
01290 xargc++;
01291 token=(char *)strtok(NULL, tokencharsend);
01292 }
01293 CollectParams(&work_list, xargc, toks); /* work_list now holds only this directive's keys */
01294 if(sp=getkey_str(work_list, "DSDSOUT")) /* the first key found in this else-if chain decides how the line is handled */
01295 reqmegs=atoi(sp);
01296 else if(sp=getkey_str(work_list, "NOWARN"))
01297 nowarn=atoi(sp);
01298 else if(sp=getkey_str(work_list, "HOST"))
01299 hname=sp;
01300 else if(sp=getkey_str(work_list, "START_ARCHIVE")) {
01301 if(atoi(sp)) {
01302 if(archgrp) {
01303 pemail("***Non-matching START/END_ARCHIVE pair\n");
01304 abortit(1);
01305 }
01306 archgrp = 1;
01307 }
01308 }
01309 else if(sp=getkey_str(work_list, "END_ARCHIVE")) {
01310 if(atoi(sp)) {
01311 if(!archgrp) {
01312 pemail("***Non-matching START/END_ARCHIVE pair\n");
01313 abortit(1);
01314 }
01315 archgrp = 0;
01316 /* flag the most recently defined server (the entry before sptr) with archive_group = -1 */
01317 if(pfound) {
01318 --sptr;
01319 sptr->archive_group = -1;
01320 ++sptr;
01321 }
01322 }
01323 }
01324 else if(sp=getkey_str(work_list, "START_GROUP")) {
01325 if(atoi(sp)) {
01326 if(groupflg) {
01327 pemail("***Non-matching START/END_GROUP pair\n");
01328 abortit(1);
01329 }
01330 groupflg = 1;
01331 groupid = ++gid;
01332 }
01333 }
01334 else if(sp=getkey_str(work_list, "END_GROUP")) {
01335 if(atoi(sp)) {
01336 if(!groupflg) {
01337 pemail("***Non-matching START/END_GROUP pair\n");
01338 abortit(1);
01339 }
01340 groupflg = 0;
01341 groupid = 0;
01342 }
01343 }
01344 else if(sp=getkey_str(work_list, "p")) { /* p=<module>: define the next server entry */
01345 pfound=1;
01346 deletekey(&work_list, "p");
01347 /* These three module names are run under the name <module>_jpe and, when no a= is given, get a null archive code (nullarchflg). */
01348 /* NOTE(review): nullarchflg is never cleared, so it also applies to every later p= line - confirm this is intended. */
01349 if(!strcmp(sp, "ingest_sci160k_log") || !strcmp(sp, "ingest_sci5k_log") || !strcmp(sp, "merge_sci160k_log")) {
01350 sprintf(subname, "%s_jpe", sp);
01351 sp = subname;
01352 nullarchflg = 1;
01353 }
01354 setstab(sp);
01355 sptr->cphist = 1;
01356 sptr->groupid = groupid;
01357 sptr->archive_group = archgrp;
01358 if(first) {
01359 first=0;
01360 sptr->firstserver=1;
01361 }
01362 if(sp=getkey_str(work_list, "d")) {
01363 if(sptr->dsin=atoi(sp))
01364 dsdsin=1;
01365 deletekey(&work_list, "d");
01366 }
01367 /* NOTE(review): whatever d= said above, dsin and dsdsin are forced to 1 here */
01368 sptr->dsin = 1;
01369 dsdsin=1;
01370
01371
01372
01373
01374
01375
01376
01377
01378 /* COPY_HISTORY=no (any letter case) clears cphist; any other value leaves it non-zero */
01379 if (sp = getkey_str (work_list, "COPY_HISTORY")) {
01380 sptr->cphist = strcasecmp (sp, "no");
01381 deletekey (&work_list, "COPY_HISTORY");
01382 }
01383
01384
01385
01386
01387
01388
01389
01390
01391 /* ABORT_ACTION=continue (any letter case) sets noabort; any other value clears it */
01392 if (sp = getkey_str (work_list, "ABORT_ACTION")) {
01393 sptr->noabort = strcasecmp (sp, "continue") ? 0 : 1;
01394 deletekey (&work_list, "ABORT_ACTION");
01395 }
01396 if(sp=getkey_str(work_list, "a")) { /* a=<code><days>: first character is the archive code, the rest a day count */
01397 carch = *sp++;
01398 if(carch != 'a' && carch != 't' && carch != 'p' && carch != 'n' && carch != '0') {
01399 pemail("***Illegal a= spec for p=%s. Must be a, t, p or n\n",sptr->name);
01400 abortit(1);
01401 }
01402 if(carch == '0') {
01403 /* a=0 is stored as archive code 't'; archive_day is not set in this case */
01404 sptr->archive = 't';
01405 }
01406 else {
01407 sptr->archive = carch;
01408 sptr->archive_day=atoi(sp);
01409 }
01410 deletekey(&work_list, "a");
01411 }
01412 else {
01413 /* no a= given: archive code defaults to 't', or to '\0' when nullarchflg is set */
01414 if(nullarchflg) sptr->archive = '\0';
01415 else sptr->archive = 't';
01416 }
01417 if(sp=getkey_str(work_list, "s")) {
01418 sptr->split=atoi(sp);
01419 sptr->split=0; /* NOTE(review): the s= value parsed on the previous line is immediately overridden with 0 */
01420 deletekey(&work_list, "s");
01421 }
01422 add_keys(work_list, &sptr->map_list); /* remaining keys become the server's map_list */
01423 sptr++;
01424 }
01425 else {
01426 pemail("***A map file line must start w/a control stmt, or p=\n");
01427 pemail(" %s\n", line);
01428 abortit(1);
01429 }
01430 }
01431 fclose(mapfp);
01432 if(!hname) {
01433 pemail("***The map file has no HOST specification\n");
01434 abortit(1);
01435 }
01436 if(!pfound) {
01437 pemail("***The map file has no p= line to define the server\n");
01438 abortit(1);
01439 }
01440 if(groupflg) {
01441 pemail("***Non-matching START/END_GROUP pair\n");
01442 abortit(1);
01443 }
01444 if(archgrp) {
01445 pemail("***Non-matching START/END_ARCHIVE pair\n");
01446 abortit(1);
01447 }
01448 if(sethosts(hname)) abortit(1);
01449
01450
01451
01452
01453 }
01454
01455
01456
01457
01458
01459 void spawn_pvm()
01460 {
01461 struct hostinfo *hostp;
01462
01463 HDATA *hnext;
01464 int i, nhost, narch;
01465
01466
01467 if(pvm_config(&nhost, &narch, &hostp)) {
01468 pemail("***Can't get pvm configuration. Pvm daemon(s) running?\n");
01469 abortit(1);
01470 }
01471
01472 for(hnext=(HDATA *)gethnext(stab[0].hosts); hnext != NULL;
01473 hnext=(HDATA *)gethnext((HDATA *)MONE))
01474 {
01475 for(i=0; i<nhost; i++) {
01476 if(!strcmp(hostp[i].hi_name, hnext->host_name))
01477
01478 break;
01479 }
01480 if(i == nhost) {
01481 pemail("***No pvm daemon running on %s\n", hnext->host_name);
01482 abortit(1);
01483 }
01484 }
01485
01486
01487
01488
01489
01490
01491
01492
01493
01494
01495
01496
01497
01498
01499
01500
01501
01502
01503
01504
01505
01506
01507
01508
01509
01510 }
01511
01512
01513
01514
01515
01516
01517 void msgid_send(PSERVER *sptr, HDATA *hnext)
01518 {
01519 KEY *alist, *keybad;
01520 char *log;
01521
01522 pvm_initsend(PvmDataDefault);
01523 pvm_pkint(&sptr->msgid, 1, 1);
01524
01525
01526 alist=newkeylist();
01527 setkey_int(&alist, "verbose", verbose);
01528 if(log=getkey_str(sptr->map_list, "history"))
01529 setkey_str(&alist, "history", log);
01530 if(log=getkey_str(sptr->map_list, "errlog"))
01531 setkey_str(&alist, "errlog", log);
01532 if(keybad=(KEY *)pack_keylist(alist)) {
01533 pemail("***Err packing pvm msg, type=%d name=%s\n",keybad->type,keybad->name
01534 );
01535 abortit(1);
01536 }
01537 if(pvm_send(hnext->tid, MSGMSG)) {
01538 pemail("***Error sending %s on %s its message id\n",
01539 sptr->name,hnext->host_name);
01540 abortit(1);
01541 }
01542 }
01543
01544
01545
01546 void arg_recv(PSERVER *sptr, HDATA *hnext)
01547 {
01548 argument *args;
01549 struct timeval tvalr;
01550 uint64_t tsr;
01551 int bufid, j;
01552
01553 pvm_initsend(PvmDataDefault);
01554 if(pvm_send(hnext->tid, MSGARGS)) {
01555 pemail("***Error calling %s on %s for arg list\n",sptr->name,hnext->host_name);
01556 abortit(1);
01557 }
01558 gettimeofday(&tvalr, NULL); tsr = tvalr.tv_sec;
01559 while(1) {
01560 if(!(bufid=pvm_nrecv(hnext->tid, MSGARGS))) {
01561 gettimeofday(&tvalr, NULL);
01562 if(tvalr.tv_sec - tsr > RESPWAIT) {
01563 pemail("***Timeout awaiting an argument list from %s\n",sptr->name);
01564 abortit(1);
01565 }
01566 }
01567 else
01568 break;
01569 }
01570 if(bufid < 0) {
01571 pemail("***Error receiving arg list from %s\n",sptr->name);
01572 abortit(1);
01573 }
01574 for(j=0; j<MAX_ARGS; j++) {
01575 args = &sptr->arguments[j];
01576 pvm_upkint(&args->kind,1,1);
01577 args->key=(char *)malloc(MAX_ARG_STR);
01578 pvm_upkstr(args->key);
01579
01580 if (!strcmp (args->key, "p") || !strcmp (args->key, "a")) {
01581 printf("**WARNING: Module has arg %s which is reserved by pe\n",
01582 args->key);
01583 }
01584 args->default_value=(char *)malloc(MAX_ARG_STR);
01585 pvm_upkstr(args->default_value);
01586 args->range=(char *)malloc(MAX_ARG_STR);
01587 pvm_upkstr(args->range);
01588 args->description=(char *)malloc(MAX_ARG_STR*4);
01589 pvm_upkstr(args->description);
01590 if(args->kind == ARG_END)
01591 break;
01592 }
01593 if(j == MAX_ARGS) {
01594 pemail("***Never got an ARG_END from %s after %d arguments\n",
01595 sptr->name, MAX_ARGS);
01596 abortit(1);
01597 }
01598 }
01599
01600
01601
01602
01603
01604
01605 int wd_dup_ck(KEY *list, char *wd)
01606 {
01607 KEY *walker = list;
01608
01609 while(walker) {
01610 if(!strcmp(wd, (char *)walker->val)) {
01611
01612
01613
01614
01615
01616
01617 return(1);
01618 }
01619 walker = walker->next;
01620 }
01621 return(0);
01622 }
01623
01624
01625
01626
01627
01628
01629
01630 void ck_arglist(PSERVER *sptr, HDATA *hx)
01631 {
01632 DIR *dfd;
01633 double dblval;
01634 int found, outnsets, i, intval, parse_status;
01635 char dirstr[MAX_STR], inname[MAX_STR], ext[MAX_STR], wdkey[MAX_STR];
01636 char *rootdbase, *wd, *valstr, *failstr, *strck, *cptr;
01637 argument *arg;
01638
01639 arg=sptr->arguments;
01640 while(arg->kind != ARG_END) {
01641 found=arg_available(hx->param_list, arg->key, arg->kind);
01642 if (!found) {
01643 if (found = strlen (arg->default_value));
01644 setkey_str (&hx->param_list, arg->key, arg->default_value);
01645 }
01646 if(!found) {
01647 pemail("***Can't satisfy arg \"%s\" for server %s\n", arg->key,sptr->name);
01648 abortit(1);
01649 }
01650 if(arg->kind == 1) {
01651 pemail("***An obsolete ARG_DATASET found in %s\n", sptr->name);
01652 abortit(1);
01653 }
01654 if (arg->kind == ARG_DATA_OUT) {
01656 if(sptr->archive == 'a') {
01657 strcpy(inname, arg->key); strcat(inname, "_nsets");
01658 outnsets = getkey_int(hx->param_list, inname);
01659 for(i=0; i < outnsets; i++) {
01660 sprintf(ext, "%s_%d_wd", arg->key, i);
01661 wd = getkey_str(hx->param_list, ext);
01662 if((dfd=opendir(wd)) == NULL) {
01663 strcpy(dirstr, "mkdir -p "); strcat(dirstr, wd);
01664 if(system(dirstr)) {
01665 pemail("***Cannot %s for %s\n",dirstr,sptr->name);
01666 abortit(1);
01667 }
01668 }
01669 }
01670 }
01671 else {
01672 if(reqmegs) {
01673 strcpy(inname, arg->key); strcat(inname, "_nsets");
01674 outnsets = getkey_int(hx->param_list, inname);
01675
01676 sprintf(ext, "%s_0_dbase", arg->key);
01677 if(findkey(hx->param_list, ext)) {
01678 rootdbase = getkey_str(hx->param_list, ext);
01679 sprintf(wdkey, "wd_%d", wdkey_int++);
01680 setkey_str(&wd_mk_list, wdkey, rootdbase);
01681 }
01682 for(i=0; i < outnsets; i++) {
01683 sprintf(ext, "%s_%d_wd", arg->key, i);
01684 cptr = (char *)sptr->archive;
01685 if(cptr == (char *)'t' || cptr == (char *)'p' || cptr == (char *)'n') {
01686 wd = getkey_str(hx->param_list, ext);
01687 cptr = strstr(wd, "/SUM");
01688 if(cptr != wd) {
01689 pemail("Cannot archive non-/SUM storage: %s = %s\n", ext,wd);
01690 abortit(1);
01691 }
01692 }
01693
01694 if(!wd_dup_ck(wd_mk_list, getkey_str(hx->param_list,ext))) {
01696 strcpy(dirstr, "mkdir -p ");
01697 strcat(dirstr, getkey_str(hx->param_list, ext));
01698 if(system(dirstr)) {
01699 pemail("***Cannot %s for %s\n",dirstr,sptr->name);
01700 abortit(1);
01701 }
01702 sprintf(wdkey, "wd_%d", wdkey_int++);
01703 setkey_str(&wd_mk_list, wdkey, getkey_str(hx->param_list, ext));
01704 }
01705 }
01706 }
01707 }
01708 }
01709 else if (arg->kind == ARG_FILEPTR) {
01710 pemail("***ARG_FILEPTR used by %s, is not supported by pe\n",sptr->name);
01711 abortit(1);
01712 }
01713 else if (arg->kind == ARG_FLOAT) {
01714 valstr = getkey_str (hx->param_list, arg->key);
01715 dblval = strtod(valstr, &failstr);
01716 if (valstr == failstr) dblval = D_MISSING;
01717 setkey_double (&hx->param_list, arg->key, dblval);
01718 }
01719 else if (arg->kind == ARG_INT) {
01720 valstr = getkey_str (hx->param_list, arg->key);
01721 intval = (int)strtol(valstr, &failstr,0);
01722 if (valstr == failstr) intval = I_MISSING;
01723 setkey_int (&hx->param_list, arg->key, intval);
01724 }
01725 else if (arg->kind == ARG_TIME)
01726 setkey_time (&hx->param_list, arg->key,
01727 sscan_time (getkey_str (hx->param_list, arg->key)));
01728 else if (arg->kind == ARG_FLAG) {
01729 if(!(strck = getkey_str (hx->param_list, arg->key))) {
01730
01731
01732 setkey_byte(&hx->param_list, arg->key, 1);
01733 }
01734 else {
01735 setkey_byte(&hx->param_list, arg->key, atoi(strck));
01736 }
01737 }
01738 else if (arg->kind == ARG_FLOATS) {
01739
01740 if(parse_status=parse_array (&hx->param_list, arg->key, KEYTYP_DOUBLE)) {
01741 pemail("***ARG_FLOATS error %d for arg %s\n", parse_status, arg->key);
01742 abortit(1);
01743 }
01744 }
01745 else if (arg->kind == ARG_INTS) {
01746 if(parse_status=parse_array (&hx->param_list, arg->key, KEYTYP_INT)) {
01747 pemail("***ARG_INTS error %d for arg %s\n", parse_status, arg->key);
01748 abortit(1);
01749 }
01750 } else if (arg->kind == ARG_NUME) {
01751 char **names;
01752 int nval, nvals = parse_numerated (arg->range, &names);
01753 valstr = getkey_str (hx->param_list, arg->key);
01754 for (nval = 0; nval < nvals; nval++)
01755 if (!strcmp (names[nval], valstr)) break;
01756 if (nval >= nvals) nval = 0;
01757 setkey_int (&hx->param_list, arg->key, nval);
01758 }
01759 arg++;
01760 }
01761 }
01762
01763
01764
01765
01766
01767
01768
01769
01770
01771
01772
01773
01774
01775 void form_split(PSERVER *sptr, KEY *xlist, char *key)
01776 {
01777 HDATA *hnext;
01778 char keystr[MAX_STR];
01779 int i, fsn, lsn, delta, drem, xfsn, xlsn, nosplit;
01780
01781
01782
01783
01784
01785
01786
01787 nosplit = 0;
01788 if(!sptr->split) {
01789 nosplit = 1;
01790 }
01791 else {
01792 sprintf(keystr, "%s_nsets", key);
01793 if(findkey(xlist, keystr)) {
01794 if((getkey_int(xlist, keystr)) != 1) {
01795 printf("**Ignoring the s=1 split flag given for %s:\n", sptr->name);
01796 printf(" The %s= specification is not a single dataset\n", key);
01797 nosplit = 1;
01798 }
01799 else {
01800 sprintf(keystr, "%s_0_fsn", key);
01801 if(!findkey(xlist, keystr)) {
01802 printf("**Ignoring the s=1 split flag given for %s:\n", sptr->name);
01803 printf(" No sel:[fsn-lsn] for first input ds given in map file or found from dsds\n");
01804 nosplit = 1;
01805 }
01806 }
01807 }
01808 else
01809 nosplit = 1;
01810 }
01811 if(nosplit) {
01812 effective_hosts = 1;
01813 hnext = (HDATA *)gethnext(sptr->hosts);
01814 add_keys(xlist, &hnext->param_list);
01815 return;
01816 }
01817
01818 sprintf(keystr, "%s_0_fsn", key);
01819 fsn=getkey_int(xlist, keystr);
01820 sprintf(keystr, "%s_0_lsn", key);
01821 lsn=getkey_int(xlist, keystr);
01822
01823 effective_hosts = num_hosts;
01824 delta=((lsn-fsn)+1)/effective_hosts;
01825 if(delta == 0)
01826 delta = 1;
01827 drem=((lsn-fsn)+1) % effective_hosts;
01828 hnext = (HDATA *)gethnext(sptr->hosts);
01829
01830 for(i=0; i<effective_hosts; i++) {
01831 if(((lsn-fsn)+1) == i) {
01832 effective_hosts = i;
01833 break;
01834 }
01835 add_keys(xlist, &hnext->param_list);
01836 xfsn=fsn+(delta*i);
01837 xlsn=xfsn+delta-1;
01838 if(i+1 == effective_hosts)
01839 xlsn=xlsn+drem;
01840 sprintf(keystr, "%s_0_fsn", key);
01841 setkey_int(&hnext->param_list, keystr, xfsn);
01842 sprintf(keystr, "%s_0_lsn", key);
01843 setkey_int(&hnext->param_list, keystr, xlsn);
01844
01845
01846
01847
01848
01849 hnext = (HDATA *)gethnext((HDATA *)MONE);
01850 }
01851 }
01852
01853
01854
01855
01856
01857
01858
01859
01860
01861 KEY *form_arg_data_in(KEY *xlist, PSERVER *sptr, argument *arg, int seq)
01862 {
01863
01864 char dbasekey[MAX_STR], inname[MAX_STR], ext[MAX_STR];
01865 char *wd;
01866 int innsets, parse, i;
01867
01868 if(!seq) add_keys(sptr->map_list, &xlist);
01869 if(!arg_available(xlist, arg->key, arg->kind)) {
01870 if(!(strlen(arg->default_value))) {
01871 pemail("***Can't satisfy arg \"%s\" for server %s\n",arg->key,sptr->name);
01872 abortit(1);
01873 }
01874 else
01875 setkey_str(&xlist, arg->key, arg->default_value);
01876 }
01877 if(touchflg != -1)
01878 setkey_int(&xlist, "touch", touchflg);
01879 sprintf(ext, "arg_data_in_%d", seq);
01880 setkey_str(&xlist, ext, arg->key);
01881
01882 strcpy(dbasekey, arg->key); strcat(dbasekey, "_dbase");
01883
01884 if(reqmegs && !sptr->firstserver && !sptr->dsin) {
01885 setkey_str(&xlist, dbasekey, dsdswd);
01886 sprintf(ext, "%s_rule", arg->key);
01887 setkey_str(&xlist, ext, "wd:{dbase}");
01888 sprintf(ext, "%s_%d_rule", arg->key, seq);
01889 setkey_str(&xlist, ext, "wd:{dbase}");
01890 }
01891 else if(getenv("dbase"))
01892 setkey_str(&xlist, dbasekey, getenv("dbase"));
01893 else
01894 setkey_str(&xlist, dbasekey, "/tmp");
01895
01896 if(parse=parse_list(&xlist, arg->key)) {
01897 if(parse != CANNOT_FILL_TEMPLATE) {
01898 pemail("***Error %d in parse_list for %s\n", parse, sptr->name);
01899 if(dbxflg) {
01900 printf("\n***** The xlist after the parse is:\n");
01901 keyiterate(printkey, xlist);
01902 }
01903 abortit(1);
01904 }
01905 }
01906 if(dbxflg) {
01907 printf("\n***** The xlist after the parse for %s is:\n", sptr->name);
01908 keyiterate(printkey, xlist);
01909 }
01910
01911
01912
01913 if(!sptr->dsin) {
01914
01915 if(strcmp(getkey_str(xlist, arg->key), "NOT SPECIFIED")) {
01916 strcpy(inname, arg->key); strcat(inname, "_nsets");
01917 innsets = getkey_int(xlist, inname);
01918 for(i=0; i < innsets; i++) {
01919 sprintf(ext, "%s_%d_wd", arg->key, i);
01920 if(!(wd=getkey_str(xlist, ext)) || !strcmp(wd, "")) {
01921 pemail("***No %s for %s. Missing rule or template in map file.\n",
01922 ext, sptr->name);
01923 abortit(1);
01924 }
01925 if(*wd == '.') {
01926 pemail("***%s of \".\" not valid for %s.\n", ext, sptr->name);
01927 abortit(1);
01928
01929 }
01930 }
01931 }
01932 }
01933 return(xlist);
01934 }
01935
01936
01937
01938
01939
01940
01941
01942
01943 KEY *dsds_arg_data_in(KEY *xlist)
01944 {
01945 static KEY *blist;
01946 char inname[MAX_STR];
01947
01948 char *svcname, *svcversion;
01949 int respcnt, status, i;
01950
01951 blist = newkeylist();
01952
01953
01954 setkey_uint(&xlist, "dsds_uid", uid);
01955 setkey_str(&xlist, "pds_host", pdshost);
01956
01957 if(ampexflg)
01958 setkey_int(&xlist, "ampex_tid", ampex_tid);
01959 else
01960 setkey_int(&xlist, "lago_tid", lago_tid);
01961
01962 if(svcversion=getkey_str(xlist,"svc_version")) {
01963
01964 printf("**Obsolete use of svc_version in map file. Ignoring...\n");
01965 }
01966 if(svcname=getkey_str(xlist,"svc_name")) {
01967
01968 printf("**Obsolete use of svc_name in map file. Ignoring...\n");
01969 }
01970 printf("Querying for the input datasets...\n");
01971 nocontrolc = 1;
01972
01973 blist = (KEY *)call_drms_in(xlist, dbxflg);
01974 if(blist == NULL) {
01975 pemail("Can't resolve/retrieve the input datasets with drms.\n");
01976 pemail("(Usually no -A switch specified.)\n");
01977 abortit(1);
01978 }
01979
01980
01981
01982
01983
01984
01985
01986
01987
01988
01989
01990
01991
01992
01993
01994
01995
01996
01997
01998
01999
02000
02001
02002
02003
02004
02005 nocontrolc = 0;
02006
02007
02008
02009
02010
02011
02012
02013
02014
02015
02016
02017
02018
02019
02020
02021
02022
02023
02024
02025 return((KEY *)blist);
02026 }
02027
02028
02029
02030
02031
02032 KEY *form_arg_data_out(PSERVER *sptr, argument *arg)
02033 {
02034 KEY *xlist;
02035 FILE *fin;
02036 char path[DRMS_MAXPATHLEN] = {0};
02037 char *wd, *stmp, *cptr, *cptr2;
02038 char dbasekey[MAX_STR], ext[MAX_STR], wdkey[MAX_STR], inname[MAX_STR];
02039 char cmd[MAX_STR], newmdirec[MAX_STR], buf[128];
02040 int innsets, parse, levnum, i, dstatus, ccnt;
02041
02042 char *prog, *level, *series, *jdata;
02043 char jpedata[192];
02044 int seriesnum, jcnt, k;
02045 int jix = 0;
02046
02047
02048
02049
02050 xlist=newkeylist();
02051 add_keys(sptr->map_list, &xlist);
02052 if(!arg_available(xlist, arg->key, arg->kind)) {
02053 pemail("***The ARG_DATA_OUT \"%s=\" for %s ", arg->key, sptr->name);
02054 pemail("is missing or\n a non-allowed default value given\n");
02055 abortit(1);
02056 }
02057 strcpy(dbasekey, arg->key); strcat(dbasekey, "_dbase");
02058 if(strcmp(dsdswd, ""))
02059 setkey_str(&xlist, dbasekey, dsdswd);
02060 else if(getenv("dbase"))
02061 setkey_str(&xlist, dbasekey, getenv("dbase"));
02062 else
02063 setkey_str(&xlist, dbasekey, "/tmp");
02064
02065 if(parse=parse_list(&xlist, arg->key)) {
02066 pemail("***Error %d in parse_list for %s\n", parse, sptr->name);
02067 if(dbxflg) {
02068 printf("\n***** The xlist after the parse is:\n");
02069 keyiterate(printkey, xlist);
02070 }
02071 abortit(1);
02072 }
02073 if(dbxflg) {
02074 printf("\n***** The xlist after the parse is:\n");
02075 keyiterate(printkey, xlist);
02076 }
02077 strcpy(inname, arg->key); strcat(inname, "_nsets");
02078 innsets = getkey_int(xlist, inname);
02079
02080
02081 if(sptr->archive == 'a') {
02082
02083
02084
02085
02086
02087
02088 for(i=0; i < innsets; i++) {
02089
02090
02091
02092
02093
02094 if(cptr = getenv("mdi_rec")) {
02095 if(cptr2 = strstr(cptr, "/soidata/info")) {
02096 ccnt = (cptr2-cptr)+1;
02097 snprintf(newmdirec, ccnt, "%s", cptr);
02098 strcat(newmdirec, "{dbase}/info");
02099 strcat(newmdirec, cptr2+13);
02100 sprintf(ext, "%s_%d_rule", arg->key, i);
02101 setkey_str(&xlist, ext, newmdirec);
02102 }
02103 }
02104 setkey_str(&xlist, dbasekey, "/SUM0/PAS");
02106
02107
02108 sprintf(ext, "%s_%d_wd", arg->key, i);
02109 deletekey(&xlist, ext);
02110 if(parse=parse_list(&xlist, arg->key)) {
02111 pemail("***Error %d in parse_list for %s\n", parse, sptr->name);
02112 if(debugflg) {
02113 printf("\n***** The xlist after the parse is:\n");
02114 keyiterate(printkey, xlist);
02115 }
02116 abortit(1);
02117 }
02118 }
02120
02121
02122 return(xlist);
02123 }
02124
02125
02126
02127 for(i=0; i < innsets; i++) {
02128 sprintf(ext, "%s_%d_level_sn", arg->key, i);
02129 if(findkey(xlist, ext)) {
02130 levnum = getkey_int(xlist, ext);
02131 if(levnum != 0) {
02132 pemail("***Map Error: Explicit output level # other than 0 is not allowed\n");
02133 abortit(1);
02134 }
02135 else {
02136 if(sptr->archive) {
02137 pemail("***Map Error: Archive of level # 0 output is not allowed\n");
02138 abortit(1);
02139 }
02140 }
02141 }
02142
02143
02144
02145
02146
02147
02148
02149
02150
02151
02152
02153
02154
02155
02156
02157
02158
02159
02160 if (findkey(JPElist, "JPE_out_nsets"))
02161 jix = getkey_int(JPElist, "JPE_out_nsets");
02162
02163 if(reqmegs) {
02164 if(firstalloc) {
02165
02166 rs = drms_create_record (drms_env, out_namespace, DRMS_PERMANENT, &dstatus);
02167
02168 if(dstatus) {
02169 printk("**ERROR %d: Can't create record in %s\n", dstatus, out_namespace);
02170 abortit(1);
02171 }
02172
02173 drms_record_directory(rs, path, 0);
02174 strcpy(dsdswd, path);
02175 sprintf(ext, "%s_%d_rs", arg->key, i);
02176 setkey_fileptr(&sptr->map_list, ext, (FILE *)rs);
02177 sprintf(ext, "%s_%d_wd", arg->key, i);
02178 setkey_str(&xlist, ext, path);
02179 wd = path;
02180 sprintf(ext, "%s_%d_prog", arg->key, i);
02181 prog = getkey_str(xlist, ext);
02182 sprintf(ext, "%s_%d_level", arg->key, i);
02183 level = getkey_str(xlist, ext);
02184 sprintf(ext, "%s_%d_series", arg->key, i);
02185 series = getkey_str(xlist, ext);
02186 sprintf(ext, "%s_%d_series_sn", arg->key, i);
02187 seriesnum = getkey_int(xlist, ext);
02188 sprintf(jpedata, "prog:%s,level:%s,series:%s[%d]",
02189 prog, level, series, seriesnum);
02190
02191 sprintf(ext, "JPE_%d_data", jix);
02192 setkey_str(&JPElist, ext, jpedata);
02193 sprintf(ext, "JPE_%d_wd", jix);
02194 setkey_str(&JPElist, ext, path);
02195 sprintf(ext, "JPE_%d_rs", jix);
02196 setkey_fileptr(&JPElist, ext, (FILE *)rs);
02197 jix++;
02198 }
02199 else {
02200 sprintf(ext, "%s_%d_prog", arg->key, i);
02201 prog = getkey_str(xlist, ext);
02202 sprintf(ext, "%s_%d_level", arg->key, i);
02203 level = getkey_str(xlist, ext);
02204 sprintf(ext, "%s_%d_series", arg->key, i);
02205 series = getkey_str(xlist, ext);
02206 sprintf(ext, "%s_%d_series_sn", arg->key, i);
02207 seriesnum = getkey_int(xlist, ext);
02208 sprintf(jpedata, "prog:%s,level:%s,series:%s[%d]",
02209 prog, level, series, seriesnum);
02210 jcnt = getkey_int(JPElist, "JPE_out_nsets");
02211 for(k=0; k < jcnt; k++) {
02212 sprintf(ext, "JPE_%d_data", k);
02213 jdata = getkey_str(JPElist, ext);
02214 if(!strcmp(jpedata, jdata)) {
02215 sprintf(ext, "JPE_%d_wd", k);
02216 wd = getkey_str(JPElist, ext);
02217 break;
02218 }
02219 }
02220 if (k == jcnt) {
02221
02222 rs = drms_create_record(drms_env, out_namespace, DRMS_PERMANENT, &dstatus);
02223 if(dstatus) {
02224 printk("**ERROR %d: Can't create record in %s\n", dstatus, out_namespace);
02225 abortit(1);
02226 }
02227 drms_record_directory(rs, path, 0);
02228 sprintf(ext, "%s_%d_rs", arg->key, i);
02229 setkey_fileptr(&sptr->map_list, ext, (FILE *)rs);
02230 sprintf(ext, "%s_%d_wd", arg->key, i);
02231 setkey_str(&xlist, ext, path);
02232 wd = path;
02233 sprintf(ext, "%s_%d_prog", arg->key, i);
02234 prog = getkey_str(xlist, ext);
02235 sprintf(ext, "%s_%d_level", arg->key, i);
02236 level = getkey_str(xlist, ext);
02237 sprintf(ext, "%s_%d_series", arg->key, i);
02238 series = getkey_str(xlist, ext);
02239 sprintf(ext, "%s_%d_series_sn", arg->key, i);
02240 seriesnum = getkey_int(xlist, ext);
02241 sprintf(jpedata, "prog:%s,level:%s,series:%s[%d]",
02242 prog, level, series, seriesnum);
02243
02244 sprintf(ext, "JPE_%d_data", jix);
02245 setkey_str(&JPElist, ext, jpedata);
02246 sprintf(ext, "JPE_%d_wd", jix);
02247 setkey_str(&JPElist, ext, path);
02248 jix++;
02249 } else {
02250 sprintf(ext, "JPE_%d_wd", k);
02251 wd = getkey_str(JPElist, ext);
02252 sprintf(ext, "%s_%d_wd", arg->key, i);
02253 setkey_str(&xlist, ext, wd);
02254 sprintf(ext, "JPE_%d_rs", k);
02255 rs = (DRMS_Record_t *)getkey_fileptr(JPElist, ext);
02256 sprintf(ext, "%s_%d_rs", arg->key, i);
02257 setkey_fileptr(&sptr->map_list, ext, (FILE *)rs);
02258 }
02259 }
02260 }
02261
02262 if(!wd || !strcmp(wd, "")) {
02263 pemail("***No %s for %s. Missing rule or template in map file.\n",
02264 ext, sptr->name);
02265 abortit(1);
02266 }
02267 if(*wd == '.') {
02268 pemail("***%s of \".\" not valid for %s.\n", ext, sptr->name);
02269 abortit(1);
02270 }
02271
02272 if(!wd_dup_ck(wd_dup_list, wd)) {
02273 sprintf(wdkey, "wd_%d", wdkey_int++);
02274 setkey_str(&wd_dup_list, wdkey, wd);
02275 }
02276 else {
02277 if(!nowarn) {
02278 printf("**WARNING: pe detects two datasets that have the same output wd:\n");
02279 printf(" Make sure that you have run with START_ARCHIVE/END_ARCHIVE\n");
02280 printf(" wd = %s\n", wd);
02281 }
02282 }
02283 }
02284 setkey_int(&JPElist, "JPE_out_nsets", jix);
02285 firstalloc = 0;
02286 if(dbxflg) {
02287 printf("\n***** The xlist at the end of form_arg_data_out is:\n");
02288 keyiterate(printkey, xlist);
02289 printf("\n##### The JPElist at the end of form_arg_data_out is:\n");
02290 keyiterate(printkey, JPElist);
02291 }
02292 return(xlist);
02293 }
02294
02295
02296
02297
02298
02299
02300
02301
02302
02303
02304
02305
02306 void in_ds_rcp(KEY *list)
02307 {
02308 int i, loop, innsets;
02309 char *argname, *wd, *cptr;
02310 char ext[MAX_STR], inname[MAX_STR], cmd[MAX_STR];
02311 char targetdir[MAX_STR];
02312 struct stat stbuf;
02313
02314 for(loop = 0; ; loop++) {
02315 sprintf(ext, "arg_data_in_%d", loop);
02316 if(!findkey(list, ext)) break;
02317 argname = getkey_str(list, ext);
02318 strcpy(inname, argname); strcat(inname, "_nsets");
02319 innsets = getkey_int(list, inname);
02320 for(i=0; i < innsets; i++) {
02321 sprintf(ext, "_%d_wd", i);
02322 strcpy(inname, argname); strcat(inname, ext);
02323 wd = getkey_str(list, inname);
02324 if(strcmp(wd, "")) {
02325 cptr = index(wd+1, '/');
02326 sprintf(targetdir, "%s%s", rcpdir, cptr);
02327 if(stat(targetdir, &stbuf)) {
02328 sprintf(cmd, "mkdir -p %s", targetdir);
02329 if(system(cmd)) {
02330 pemail("***Cannot %s\n", cmd);
02331 abortit(1);
02332 }
02333 }
02334 sprintf(cmd, "rcp -p %s:%s/* %s", prod_host_prime(), wd, targetdir);
02335 if(system(cmd)) {
02336 pemail("***Cannot %s\n", cmd);
02337 abortit(1);
02338 }
02339 printf("%s\n", cmd);
02340 setkey_str(&list, inname, targetdir);
02341 }
02342 }
02343 }
02344 }
02345
02346
02347
02348
02349
02350
02351
02352
02353
02354
02355
02356
02357
02358
02359
02360
02361
02362
02363
02364
02365
02366
02367
02368
02369
02370
02371
02372
02373
02374
02375
02376 void form_keylist(PSERVER *sptr, HDATA *hnext)
02377 {
02378 KEY *xlist, *inlist, *dslist;
02379 argument *arg, *argsort[MAX_ARGS];
02380 char ext[MAX_STR];
02381 char *pname;
02382 int inseq, i, ax;
02383 int inapp = 0;
02384
02385
02386 arg=sptr->arguments;
02387 i = 0;
02388 while(arg->kind != ARG_END) {
02389 if(arg->kind == ARG_DATA_IN) argsort[i++] = arg;
02390 arg++;
02391 }
02392 arg=sptr->arguments;
02393 while(arg->kind != ARG_END) {
02394 if(arg->kind != ARG_DATA_IN) argsort[i++] = arg;
02395 arg++;
02396 }
02397 argsort[i] = arg;
02398
02399
02400 inseq = 0; inlist = newkeylist(); dslist = newkeylist();
02401 ax = 0; arg = argsort[ax];
02402 while(arg->kind != ARG_END) {
02403 switch(arg->kind) {
02404 case ARG_DATA_IN:
02405 inlist = (KEY *)form_arg_data_in(inlist, sptr, arg, inseq);
02406 sprintf(ext, "%s_0_prog", arg->key);
02407
02408
02409 if(findkey(inlist, ext)) {
02410 pname = getkey_str(inlist, ext);
02411
02412 if(!strcmp(pname, "mdi_eof_log") || !strcmp(pname, "mdi_eof_rec")
02413 || !strcmp(pname, "mdi_log") || !strcmp(pname, "mdi_rec") ||
02414 !strcmp(pname, "mdi_sim_rec") || !strcmp(pname, "soi_eof_rec")) {
02415 inapp = 1;
02416 }
02417 if(!inseq++) {
02418
02419
02420
02421 }
02422 }
02423 add_keys(inlist, &hnext->param_list);
02424 break;
02425 case ARG_DATA_OUT:
02426 xlist = (KEY *)form_arg_data_out(sptr, arg);
02427 add_keys(xlist, &hnext->param_list);
02428 freekeylist(&xlist);
02429 break;
02430 default:
02431 break;
02432 }
02433 arg = argsort[++ax];
02434 }
02435 if((inseq) && (sptr->dsin) && (!inapp)) {
02436 if(dbxflg) {
02437 printf("\nThe list before the dsds_arg_data_in(inlist) call:\n");
02438 keyiterate(printkey, inlist);
02439 }
02440 dslist = (KEY *)dsds_arg_data_in(inlist);
02441 if(dbxflg) {
02442 printf("\nThe list after the dsds_arg_data_in(inlist) call:\n");
02443 keyiterate(printkey, dslist);
02444 }
02445 if(rcpflg) {
02446
02447
02448 in_ds_rcp(dslist);
02449 }
02450 add_keys(dslist, &hnext->param_list);
02451
02452
02453 freekeylist(&inlist); freekeylist(&dslist);
02454 }
02455 else if(inseq) {
02456 add_keys(inlist, &hnext->param_list);
02457 freekeylist(&inlist);
02458 }
02459
02460 ck_arglist(sptr, hnext);
02461 if(findkey(hnext->param_list, "T_FIRST")) {
02462 t_first_arg = getkey_str(hnext->param_list, "T_FIRST");
02463 }
02464 else {
02465 t_first_arg = NULL;
02466 }
02467 if(findkey(hnext->param_list, "T_LAST")) {
02468 t_last_arg = getkey_str(hnext->param_list, "T_LAST");
02469 }
02470 else {
02471 t_last_arg = NULL;
02472 }
02473 }
02474
02475
02476
02477
02478
02479
02480
02481 void send_to_serv(PSERVER *sptr, HDATA *hx, int doflg)
02482 {
02483 KEY *keybad;
02484 argument *arg;
02485 char cfsn[MAX_STR], clsn[MAX_STR];
02486
02487 hx->busy=1;
02488 sptr->busyall++;
02489 arg=sptr->arguments;
02490 while(arg->kind != ARG_END) {
02491 if(arg->kind == ARG_DATA_IN) {
02492 sprintf(cfsn, "%s_0_fsn", arg->key);
02493 if(!sptr->split || !findkey(hx->param_list, cfsn)) {
02494 printf("%s %s sent:\n %s=%s\n",hx->host_name,
02495 sptr->name,arg->key,getkey_str(hx->param_list,arg->key));
02496 break;
02497 }
02498 else {
02499 sprintf(clsn, "%s_0_lsn", arg->key);
02500 printf("%s %s sent:\n %s=%s split[%d-%d]\n",hx->host_name,
02501 sptr->name,arg->key,getkey_str(hx->param_list,arg->key),
02502 getkey_int(hx->param_list, cfsn),
02503 getkey_int(hx->param_list, clsn));
02504 break;
02505 }
02506 }
02507 arg++;
02508 }
02509 if(!sptr->cphist)
02510 setkey_int(&hx->param_list, "nocphist", 1);
02511 setkey_int(&hx->param_list, "dsds_tid", dsds_tid);
02512 setkey_uint(&hx->param_list, "dsds_uid", uid);
02513 setkey_str(&hx->param_list, "pe_mapfile", pe_map);
02514 pvm_initsend(PvmDataDefault);
02515 pvm_pkint(&doflg, 1, 1);
02516 if(keybad=(KEY *)pack_keylist(hx->param_list)) {
02517 pemail("***Err packing a pvm msg, type=%d name=%s\n",keybad->type,keybad->name);
02518 abortit(1);
02519 }
02520
02521
02522 if(pvm_send(hx->tid, sptr->msgid)) {
02523 pemail("***Error calling %s on %s\n",sptr->name,hx->host_name);
02524 abortit(1);
02525 }
02526 }
02527
02528
02529
02530
02531
02532
02533
02534
02535
02536
02537
02538 int ds_names_file(KEY *list, argument *argu)
02539 {
02540 AT *atp;
02541 argument *arg;
02542 char ext[MAX_STR], rdbline[1024], sysstr[1024];
02543 char *prog, *level, *series, *wd;
02544 double bytes;
02545 unsigned long dsindex;
02546 int nsets, levnum, prognum, seriesnum;
02547 int i = 0;
02548 char DSINFO[] ="arg\tprog\tprog#\tlevel\tlev#\tseries\tser#\tdsindex\tbytes\n----\t-----\t------\t------\t-----\t-------\t-----\t-------\t------\n";
02549 if(!(atp = at_create(DSINFO))) {
02550 printf("**Can't at_create() the table for ds.rdb file\n");
02551 return(1);
02552 }
02553 arg = argu;
02554 while(arg->kind != ARG_END) {
02555 if(arg->kind == ARG_DATA_IN || arg->kind == ARG_DATA_OUT) {
02556 sprintf(ext, "%s_nsets", arg->key);
02557 nsets = getkey_int(list, ext);
02558 for(i=0; i < nsets; i++) {
02559 sprintf(ext, "%s_%d_prog", arg->key, i);
02560 if(findkey(list, ext)) prog = getkey_str(list, ext);
02561 else prog = "(null)";
02562 sprintf(ext, "%s_%d_prog_sn", arg->key, i);
02563 if(findkey(list, ext))
02564 prognum = getkey_int(list, ext);
02565 else
02566 prognum = -1;
02567 sprintf(ext, "%s_%d_level", arg->key, i);
02568 if(findkey(list, ext)) level = getkey_str(list, ext);
02569 else level = "(null)";
02570 sprintf(ext, "%s_%d_level_sn", arg->key, i);
02571 if(findkey(list, ext))
02572 levnum = getkey_int(list, ext);
02573 else
02574 levnum = -1;
02575 sprintf(ext, "%s_%d_series", arg->key, i);
02576 if(findkey(list, ext)) series = getkey_str(list, ext);
02577 else series = "(null)";
02578 sprintf(ext, "%s_%d_series_sn", arg->key, i);
02579 if(findkey(list, ext))
02580 seriesnum = getkey_int(list, ext);
02581 else
02582 seriesnum = -1;
02583 sprintf(ext, "%s_%d_ds_index", arg->key, i);
02584 if(findkey(list, ext))
02585 dsindex = getkey_ulong(list, ext);
02586 else
02587 dsindex = (unsigned long)-1;
02588 sprintf(ext, "%s_%d_bytes", arg->key, i);
02589 if(findkey(list, ext))
02590 bytes = getkey_double(list, ext);
02591 else
02592 bytes = 0;
02593 sprintf(ext, "%s_%d", arg->key, i);
02594 sprintf(rdbline, "%s\t%s\t%d\t%s\t%d\t%s\t%d\t%lu\t%g",
02595 ext,prog,prognum,level,levnum,series,seriesnum,dsindex,bytes);
02596 if(at_put_info(atp, rdbline) != AT_OK) {
02597 printf("**Error on at_put_info() to ds.rdb file\n");
02598 return(1);
02599 }
02600 }
02601 }
02602 arg++;
02603 }
02604
02605 arg = argu;
02606 while(arg->kind != ARG_END) {
02607 if(arg->kind == ARG_DATA_OUT) {
02608 sprintf(ext, "%s_nsets", arg->key);
02609 nsets = getkey_int(list, ext);
02610 for(i=0; i < nsets; i++) {
02611 sprintf(ext, "%s_%d_wd", arg->key, i);
02612 wd = getkey_str(list, ext);
02613 if(strcmp(wd, "")) {
02614 sprintf(ext, "%s/ds.rdb", wd);
02615 if(at_write(atp, ext) != AT_OK) {
02616 printf("**Error on at_write() to %s\n", ext);
02617 return(1);
02618 }
02619
02620
02621
02622 sprintf(sysstr, "chmod -R go-ws %s; chown -Rhf production %s",
02623 wd, wd);
02624 if(system(sysstr)) {
02625 printf("**Warning: Error on: %s\n", sysstr);
02626
02627 }
02628 }
02629 }
02630 }
02631 arg++;
02632 }
02633 at_free(atp);
02634 return(0);
02635 }
02636
02637
02638
02639
02640
02641
02642
02643
02644 int get_overview(char *wd, KEY **list)
02645 {
02646 SDS *sds;
02647 DIR *dfd;
02648 struct dirent *dp;
02649 char *value;
02650 char name[512];
02651 int found = 0;
02652
02653 if((dfd=opendir(wd)) == NULL) {
02654 printf("**Can't opendir(%s) to find overview.fits\n", wd);
02655 return(0);
02656 }
02657 while((dp=readdir(dfd)) != NULL) {
02658 if(!strstr(dp->d_name, "overview.fits"))
02659 continue;
02660 found = 1;
02661 sprintf(name, "%s/%s", wd, dp->d_name);
02662 closedir(dfd);
02663
02664 if ((sds = sds_get_fits_head (name)) == NULL) {
02665 printf("**Error on get_fits_head(%s)\n", name);
02666 return(0);
02667 }
02668 if((value=sds_search_attrvalue_str(sds, "T_FIRST")) == NULL) {
02669
02670 return(0);
02671 }
02672 if(!strcmp(value, ""))
02673 setkey_str(list, "t_first", "-1");
02674 else
02675 setkey_str(list, "t_first", value);
02676 if((value=sds_search_attrvalue_str(sds, "T_LAST")) == NULL) {
02677 printf("**No t_last in %s\n", name);
02678 return(0);
02679 }
02680 if(!strcmp(value, ""))
02681 setkey_str(list, "t_last", "-1");
02682 else
02683 setkey_str(list, "t_last", value);
02684 break;
02685 }
02686 if(found)
02687 return(1);
02688 else {
02689 closedir(dfd);
02690 if(t_first_arg) {
02691 setkey_str(list, "t_first", t_first_arg);
02692 if(t_last_arg) {
02693 setkey_str(list, "t_last", t_last_arg);
02694 }
02695 else {
02696 setkey_str(list, "t_last", "-1");
02697 }
02698 return(1);
02699 }
02700 if(t_last_arg) {
02701 setkey_str(list, "t_first", "-1");
02702 setkey_str(list, "t_last", t_last_arg);
02703 return(1);
02704 }
02705 return(0);
02706 }
02707 }
02708
02709
02710
02711
02712
02713
02714
02715
02716
02717
02718
02719
02720 KEY *set_key_archive(char *basename, PSERVER *stab, HDATA *hdata, KEY *rlist,
02721 int status)
02722 {
02723 KEY *alist;
02724 char *wd, *warnmsg;
02725 char ext[MAX_STR];
02726 double dsize;
02727
02728
02729 alist=newkeylist();
02730 sprintf(ext, "%s_wd", basename);
02731 wd = getkey_str(hdata->param_list, ext);
02732
02733 get_overview(wd, &alist);
02734 setkey_int(&alist, "status", status);
02735 if((warnmsg=getkey_str(rlist, "WARNING")))
02736 setkey_str(&alist, "warning", warnmsg);
02737
02738
02739
02740
02741
02742
02743
02744
02745
02746
02747
02748
02749
02750
02751
02752
02753
02754
02755 setkey_str(&alist, "wd", wd);
02756 sprintf(ext, "%s_prog", basename);
02757 setkey_str(&alist, "prog", getkey_str(hdata->param_list, ext));
02758 sprintf(ext, "%s_series", basename);
02759 setkey_str(&alist,"series", getkey_str(hdata->param_list, ext));
02760 sprintf(ext, "%s_prog_sn", basename);
02761 if(!findkey(hdata->param_list, ext))
02762 setkey_int(&alist, "prog_sn", -1);
02763 else
02764 setkey_int(&alist,"prog_sn", getkey_int(hdata->param_list, ext));
02765 sprintf(ext, "%s_series_sn", basename);
02766 if(!findkey(hdata->param_list, ext))
02767 setkey_int(&alist, "series_sn", -1);
02768 else
02769 setkey_int(&alist,"series_sn", getkey_int(hdata->param_list, ext));
02770 sprintf(ext, "%s_level", basename);
02771 setkey_str(&alist,"level", getkey_str(hdata->param_list, ext));
02772 sprintf(ext, "%s_level_sn", basename);
02773 if(findkey(hdata->param_list, ext))
02774 setkey_int(&alist,"level_sn", getkey_int(hdata->param_list, ext));
02775 dsize = du_dir(wd);
02776
02777
02778 setkey_int(&alist, "lago_tid", lago_tid);
02779 setkey_double(&alist, "dsds_bytes", dsize);
02780 setkey_uint(&alist, "dsds_uid", uid);
02781 setkey_str(&alist, "svc_name", stab->name);
02782 setkey_str(&alist, "svc_version", stab->version);
02783 setkey_str(&alist, "username", username);
02784 return(alist);
02785 }
02786
02787
02788
02789
02790
02791
02792
02793
02794
02795
02796
02797 void call_archive(PSERVER *stab, HDATA *hdata, KEY *rlist, int status)
02798 {
02799 KEY *alist;
02800 DRMS_Record_t *rsx;
02801 argument *arg;
02802 char *wd;
02803 char oname[MAX_STR];
02804 int outnsets, i, snum;
02805 unsigned long sunum;
02806 double dsize;
02807
02808 archactive = 1;
02809 arg = stab->arguments;
02810 while(arg->kind != ARG_END) {
02811 if(arg->kind == ARG_DATA_OUT) {
02812 sprintf(oname, "%s_nsets", arg->key);
02813 outnsets = getkey_int(hdata->param_list, oname);
02814 for(i=0; i < outnsets; i++) {
02815 sprintf(oname, "%s_%d", arg->key, i);
02816
02817 alist = set_key_archive(oname, stab, hdata, rlist, status);
02818 wd = getkey_str(alist, "wd");
02819 dsize = getkey_double(alist, "dsds_bytes");
02820
02821 sprintf(oname, "%s_%d_rs", arg->key, i);
02822 rsx = (DRMS_Record_t *)getkey_fileptr(stab->map_list, oname);
02823 snum = getkey_int(alist, "series_sn");
02824 status = drms_setkey_int(rsx, "snum", snum);
02825 if(status) {
02826 printk("ERROR: can't drms_setkey_int() for snum=%u\n", snum);
02827 abortit(1);
02828 }
02829 drms_setkey_string (rsx, "prog", getkey_str (alist, "prog"));
02830 drms_setkey_string (rsx, "level", getkey_str (alist, "level"));
02831 drms_setkey_string (rsx, "series", getkey_str (alist, "series"));
02832
02833 sunum = rsx->sunum;
02834 rsx->seriesinfo->retention = stab->archive_day;
02835 if (!stab->archive || stab->archive == 't') {
02836
02837 rsx->seriesinfo->archive = -1;
02838 }
02839 else {
02840 rsx->seriesinfo->archive = 1;
02841 stab->archive_complete = 1;
02842
02843 if (stab->archive == 'p') rsx->seriesinfo->retention = 10000;
02844 }
02845
02846 if((status = drms_close_record(rsx, DRMS_INSERT_RECORD))) {
02847 printk("**ERROR: drms_close_record failed status=%d\n", status);
02848 abortit(1);
02849 }
02850 sprintf(oname, "%s_%d_bytes", arg->key, i);
02851 setkey_double(&hdata->param_list, oname, dsize);
02852 sprintf(oname, "%s_%d_ds_index", arg->key, i);
02853 setkey_ulong(&hdata->param_list, oname, sunum);
02854
02855 freekeylist(&alist);
02856 printf("Archive pending: wd=%s bytes=%g\n", wd, dsize);
02857 }
02858 }
02859 arg++;
02860 }
02861 stab->archive_complete = 1;
02862 ds_names_file(hdata->param_list, stab->arguments);
02863 if(archactive == -1) {
02864 abortit(1);
02865 }
02866 archactive = 0;
02867 }
02868
02869
02870
02871
02872
02873
02874
02875
02876
02877
02878
02879
02880
02881
02882
02883
02884 void call_archive_0(PSERVER *stab, HDATA *hdata, KEY *rlist, int status)
02885 {
02886 KEY *alist, *blist;
02887 argument *arg;
02888 char *wd;
02889 char oname[MAX_STR], ext[MAX_STR];
02890 int outnsets, found, i;
02891 double dsize;
02892
02893 arg = stab->arguments; found = 0;
02894 while(arg->kind != ARG_END) {
02895 if(arg->kind == ARG_DATA_OUT) {
02896 sprintf(oname, "%s_nsets", arg->key);
02897 outnsets = getkey_int(hdata->param_list, oname);
02898 for(i=0; i < outnsets; i++) {
02899 sprintf(oname, "%s_%d", arg->key, i);
02900 sprintf(ext, "%s_level_sn", oname);
02901 if(findkey(hdata->param_list, ext)) {
02902 if(getkey_int(hdata->param_list,ext) != 0)
02903 continue;
02904 }
02905 else continue;
02906 found = 1;
02907 stab->archive_day = 2;
02908
02909 alist = set_key_archive(oname, stab, hdata, rlist, status);
02910 wd = getkey_str(alist, "wd");
02911 dsize = getkey_double(alist, "dsds_bytes");
02912 if((blist = (KEY *)call_dsds(&alist, REQUPDATE0, dsds_tid, pe_tid, (int(*)(char *, ...))printf, debugflg)) == NULL) {
02913 pemail("***Error making a dsds_main entry for wd=%s\n", wd);
02914 abortit(1);
02915 }
02916 printf("level#0 entry: wd=%s bytes=%g\n", wd, dsize);
02917
02918
02919 if(findkey(blist, "ds_index")) {
02920 sprintf(ext, "%s_ds_index", oname);
02921 setkey_ulong(&hdata->param_list,ext,getkey_ulong(blist, "ds_index"));
02922 }
02923 sprintf(ext, "%s_bytes", oname);
02924 setkey_double(&hdata->param_list, ext, dsize);
02925
02926
02927
02928
02929
02930
02931 freekeylist(&alist); freekeylist(&blist);
02932 }
02933 }
02934 arg++;
02935 }
02936 if(found)
02937 ds_names_file(hdata->param_list, stab->arguments);
02938 }
02939
02940
02941
02942
02943
02944
02945
02946 void kick_server(PSERVER *sptr)
02947 {
02948 HDATA *hnext;
02949 int i;
02950
02951 if(!sptr->archive_group) {
02952 firstalloc = 1;
02953 JPE_out_nsets = 0;
02954 if(JPElist) freekeylist(&JPElist);
02955 JPElist = newkeylist();
02956 }
02957 hnext = (HDATA *)gethnext(sptr->hosts);
02958 for(i=0; i<effective_hosts; i++) {
02959
02960
02961
02962
02963 pvm_spawn(sptr->name, (char **)0, PvmTaskHost, hnext->host_name, 1,
02964 &hnext->tid);
02965 if(hnext->tid < 0) {
02966 pemail("***Can't spawn %s on %s\n",sptr->name,hnext->host_name);
02967 abortit(1);
02968 }
02969 printk("%s tid=%x spawned on %s\n",
02970 sptr->name, hnext->tid, hnext->host_name);
02971 msgid_send(sptr, hnext);
02972 arg_recv(sptr, hnext);
02973
02974
02975 form_keylist(sptr, hnext);
02976 send_to_serv(sptr,hnext,0);
02977 hnext = (HDATA *)gethnext((HDATA *)MONE);
02978
02979 }
02980 }
02981
02982
02983
02984
02985 int resp_any()
02986 {
02987 struct timeval tvalr;
02988 uint64_t tsr;
02989 int retcode, bufid, bytes, msgtag, tid, i;
02990
02991
02992 gettimeofday(&tvalr, NULL);
02993 tsr = tvalr.tv_sec;
02994 while(1) {
02995 gettimeofday(&tvalr, NULL);
02996 if(tvalr.tv_sec - tsr > RESPWAIT) {
02997 retcode = -1;
02998 break;;
02999 }
03000 if(bufid=pvm_nrecv(-1, -1)) {
03001 if(pvm_bufinfo(bufid, &bytes, &msgtag, &tid)) {
03002 pemail("***Can't get bufinfo on bufid=%d\n", bufid);
03003 abortit(1);
03004 }
03005 for(i=0; i<MAX_SERV; i++) {
03006 if(stab[i].name == NULL) {
03007 pemail("***Got a bad msg type response %d from tid=%x\n", msgtag, tid)
03008 ;
03009 abortit(1);
03010 }
03011 if(stab[i].msgid == msgtag)
03012 break;
03013 }
03014 retcode=i;
03015 break;
03016 }
03017 #ifdef __sgi
03018 sginap(1);
03019 #else
03020 sleep(1);
03021 #endif
03022 }
03023 return(retcode);
03024 }
03025
03026
03027
03028
03029
03030
03031
03032 void process_resp(int ix)
03033 {
03034 PSERVER *stabp, *stabn;
03035 HDATA *hnext;
03036 KEY *list;
03037 argument *arg;
03038 char remhost[MAX_STR], sysstr[MAX_STR], ext[MAX_STR];
03039 char *errtxt, *remoutfile, *outwd, *cptr, *warnmsg, *auxinfo;
03040 int j, remerrno, remtid, wait, groupid, sindex, outs, abortnum;
03041
03042 stabp = &stab[ix];
03043 pvm_upkint(&remtid, 1,1 );
03044 pvm_upkint(&remerrno, 1,1 );
03045 pvm_upkstr(remhost);
03046 list=newkeylist();
03047 if(!(list=(KEY *)unpack_keylist(list))) {
03048 printk("**Warning: no keylist returned from %s tid=%x\n",stabp->name,remtid);
03049 }
03050
03051 if(!(remoutfile=getkey_str(list, "out")))
03052 remoutfile = "<NONE>";
03053 if(remerrno) {
03054 switch(remerrno) {
03055 case MISSING_FILES:
03056 errtxt = "MISSING_FILES";
03057 break;
03058 default:
03059 errtxt = " ";
03060 break;
03061 }
03062 printk(" %s %x status %d on %s:%s\n", stabp->name, remtid, remerrno, remhost, errtxt);
03063 }
03064
03065
03066
03067
03068
03069 hnext = (HDATA *)gethnext(stabp->hosts);
03070 while(hnext) {
03071 if(hnext->tid == remtid) {
03072 hnext->busy = 0;
03073 stabp->busyall--;
03074
03075 arg = stabp->arguments;
03076 if(stabp->cphist) {
03077 while(arg->kind != ARG_END) {
03078 if(arg->kind == ARG_DATA_OUT) {
03079 sprintf(ext, "%s_nsets", arg->key);
03080 if(findkey(hnext->param_list, ext)) {
03081 outs = getkey_int(hnext->param_list, ext);
03082 for(j=0; j < outs; j++) {
03083 sprintf(sysstr, "%s_%d_wd", arg->key, j);
03084 if(outwd = getkey_str(hnext->param_list, sysstr)) {
03085 if(cptr = rindex(pe_map, '/'))
03086 cptr=cptr+1;
03087 else
03088 cptr=pe_map;
03089 sprintf(sysstr, "cp %s %s;chmod u+w,g+w %s/%s",
03090 pe_map, outwd, outwd, cptr);
03091 if(system(sysstr)) {
03092 pemail("***Error on: %s\n", sysstr);
03093 abortit(1);
03094 }
03095 }
03096 }
03097 }
03098 }
03099 arg++;
03100 }
03101 }
03102 if(!(auxinfo=getkey_str(list, "AUXINFO")))
03103 auxinfo = "<NONE>";
03104 if(findkey(list, "abortflg")) {
03105 if((abortnum=getkey_int(list, "abortflg"))) {
03106 if(groupid = stabp->groupid) {
03107 pemail(" ***Module %s abort. See %s\n",
03108 stabp->name, getkey_str(list, "log_file"));
03109
03110
03111
03112 pemail(" Not aborting as module is part of a group\n");
03113 }
03114 else {
03115 pemail(" ***Module %s abort. See %s\n",
03116 stabp->name, getkey_str(list, "log_file"));
03117 if (stabp->noabort) {
03118
03119
03120 if(abortnum == 1) {
03121 if(stabp->archive != 'a') {
03122 stabp->archive = 0;
03123 }
03124 }
03125 }
03126 else {
03127 if((warnmsg=getkey_str(list, "WARNING"))) {
03128 pemail(" with warning msg:\n %s\n", warnmsg);
03129 }
03130 pelog("%x\t%d\tabort\t0\t%s\t%s\t%s\n",
03131 remtid,remerrno,stabp->name,remoutfile,auxinfo);
03132
03133 if(!stabp->archive && reqmegs)
03134 call_archive_0(stabp, hnext, list, remerrno);
03135
03136 if(abortnum == 2) {
03137 if(stabp->archive && (stabp->archive_group != 1)) {
03138 if(stabp->archive != 'a') {
03139 if(reqmegs) {
03140 call_archive(stabp, hnext, list, remerrno);
03141 }
03142 }
03143 else {
03144 stabp->archive_complete = 1;
03145 }
03146 }
03147 }
03148
03149
03150
03151 if(abortnum == 1) {
03152 stabp->archive_group = 0;
03153 }
03154 abortit(-1);
03155 }
03156 }
03157 }
03158 }
03159 printk(" %s %s %x complete:\n %s\n",
03160 remhost,stabp->name,remtid,remoutfile);
03161 if((warnmsg=getkey_str(list, "WARNING"))) {
03162 printk(" **with warning msg:\n %s\n", warnmsg);
03163 }
03164
03165 pvm_kill(hnext->tid);
03166
03167 hnext->tid = 0;
03168
03169 if(!stabp->busyall) {
03170
03171
03172
03173
03174 if(stabp->archive && (stabp->archive_group != 1)) {
03175 if(stabp->archive != 'a') {
03176 if(reqmegs) {
03177 call_archive(stabp, hnext, list, remerrno);
03178 }
03179 else {
03180 printk("**Ignoring archive of data for %s:\n",stabp->name);
03181 printk(" No DSDS storage alloc request in map file.\n");
03182 }
03183 }
03184 else {
03185 stabp->archive_complete = 1;
03186 }
03187 }
03188
03189
03190 if(!stabp->archive && reqmegs)
03191 call_archive_0(stabp, hnext, list, remerrno);
03192 }
03193 if(abortnum) {
03194 pelog("%x\t%d\tabort\t%d\t%s\t%s\t%s\n",
03195 remtid,remerrno,stabp->archive_complete,stabp->name,remoutfile,auxinfo);
03196 }
03197 else {
03198 pelog("%x\t%d\tnormal\t%d\t%s\t%s\t%s\n",
03199 remtid,remerrno,stabp->archive_complete,stabp->name,remoutfile,auxinfo);
03200 }
03201 break;
03202 }
03203 hnext = (HDATA *)gethnext((HDATA *)MONE);
03204 }
03205 if(!hnext) {
03206 pemail("***No such host %s for server %s\n",remhost,stabp->name);
03207 abortit(1);
03208 }
03209
03210 wait = 0;
03211 if(groupid = stabp->groupid) {
03212 for(j=0; j<MAX_SERV; j++) {
03213 if(stab[j].groupid == groupid) {
03214 if(stab[j].busyall) {
03215 wait = 1;
03216 break;
03217 }
03218 }
03219 }
03220 }
03221 else {
03222 if(stabp->busyall) wait = 1;
03223 }
03224
03225 while(wait) {
03226 if((sindex=resp_any())== -1)
03227 who_died();
03228 else {
03229 process_resp(sindex);
03230 freekeylist(&list);
03231 return;
03232 }
03233 }
03234 if(groupid)
03235 printk("End Server Group #%d\n", groupid);
03236
03237
03238 stabn = &stab[ix+1];
03239 while(1) {
03240 if(stabn->name != NULL && groupid != 0 && stabn->groupid == groupid) stabn++;
03241 else break;
03242 }
03243 if(stabn->name != NULL) {
03244 if(groupid = stabn->groupid) {
03245 printk("Start Server Group #%d\n", groupid);
03246 while(stabn->groupid == groupid) {
03247 kick_server(stabn);
03248 stabn++;
03249 }
03250 }
03251 else
03252 kick_server(stabn);
03253 }
03254 freekeylist(&list);
03255 }
03256
03258
03259
03260
03261
03262
03263
03264
03265 void dereg()
03266 {
03267 PSERVER *sptr;
03268 HDATA *hdata;
03269 DRMS_Record_t *rsx;
03270 argument *arg;
03271 char *wd, *cptr;
03272 char oname[MAX_STR];
03273 double dsize;
03274 double mapstore = 0.0;
03275 int i, j, outnsets;
03276
03277 if(reqmegs) {
03278
03279 for(i=0; i < MAX_SERV; i++) {
03280 sptr = &stab[i];
03281 if(sptr->name == NULL)
03282 break;
03283
03284 arg = sptr->arguments;
03285 hdata = (HDATA *)gethnext(sptr->hosts);
03286 while(arg->kind != ARG_END) {
03287 if(arg->kind == ARG_DATA_OUT) {
03288 sprintf(oname, "%s_nsets", arg->key);
03289 outnsets = getkey_int(hdata->param_list, oname);
03290 for(j=0; j < outnsets; j++) {
03291 sprintf(oname, "%s_%d_wd", arg->key, j);
03292 if(wd=getkey_str(hdata->param_list, oname)) {
03293
03294 cptr = strstr(wd, "/SUM");
03295 if(cptr == wd) {
03296
03297
03298 if((sptr->archive_group != 1) && (sptr->archive != 'a')) {
03299 dsize = du_dir(wd);
03300 mapstore += dsize;
03301 }
03302 }
03303 }
03304
03305 sprintf(oname, "%s_%d_level_sn", arg->key, j);
03306 if(findkey(hdata->param_list, oname)) {
03307 if(getkey_int(hdata->param_list, oname) == 0)
03308 continue;
03309 }
03310 if(sptr->archive != 'a') {
03311 if(!sptr->archive_complete && (sptr->archive_group != 1)) {
03312
03313 sprintf(oname, "%s_%d_rs", arg->key, j);
03314 rsx = (DRMS_Record_t *)getkey_fileptr(sptr->map_list, oname);
03315 rsx->seriesinfo->archive = 0;
03316 if(wd) {
03317 if(strstr(wd, "/SUM")) {
03318 printk("Removing non-archived wd=%s bytes=%g\n", wd, dsize);
03319 if(rmdirs(wd, dsdswd)) {
03320 printk("**Can't rm wd=%s\n", wd);
03321 printk("(NOTE: May be due to an NFS delay. Usually ignore.)\n");
03322 }
03323 }
03324 }
03325 }
03326 }
03327 }
03328 }
03329 arg++;
03330 }
03331 }
03332 printk("High water storage bytes=%g\n", mapstore);
03333
03334
03335
03336
03337
03338
03340
03341
03342
03343
03344
03345
03346
03347
03348
03349
03350
03351
03352 }
03353
03354
03355
03356
03357
03358
03359
03360
03361
03362
03363
03364 }
03365
03366
03367
03368
03369
03370
03371
03372 void do_pipe()
03373 {
03374 PSERVER *sptr;
03375 int i, groupid;
03376 int sindex;
03377
03378
03379
03380 sptr = &stab[0];
03381 if(groupid = sptr->groupid) {
03382 printk("Start Server Group #%d\n", groupid);
03383 while(sptr->groupid == groupid) {
03384 kick_server(sptr);
03385 sptr++;
03386 }
03387 }
03388 else
03389 kick_server(sptr);
03390
03391 for(i=0; i<MAX_SERV; i++) {
03392 if(stab[i].name == NULL)
03393 break;
03394 while(stab[i].busyall) {
03395 if((sindex=resp_any())== -1)
03396 who_died();
03397 else
03398 process_resp(sindex);
03399 }
03400 }
03401 }
03402
03403
03404
03405 void setup(int argc, char *argv[])
03406 {
03407 struct timeval tvalr;
03408 struct tm *t_ptr;
03409 char logname[128], string[128], cwdbuf[128], idstr[256];
03410 int i;
03411
03412 if((pe_tid=start_pvm(printf)) == 0) {
03413 fprintf(stderr, "Can't start a pvm daemon!!\n");
03414 exit(1);
03415 }
03416 if(!prod_host_set()) {
03417 fprintf(stderr, "Error accessing file %s\n", PHNAME);
03418 exit (1);
03419 }
03420 if (signal(SIGINT, SIG_IGN) != SIG_IGN)
03421 signal(SIGINT, sighandler);
03422 if (signal(SIGTERM, SIG_IGN) != SIG_IGN)
03423 signal(SIGTERM, sighandler);
03424
03425 gettimeofday(&tvalr, NULL);
03426
03427 #ifdef __sparc
03428 #error
03429 #endif
03430 t_ptr = localtime(&tvalr.tv_sec);
03431
03432 sprintf(datestr, "%s", asctime(t_ptr));
03433 datestr[24] = '\0';
03434 wd_dup_list = newkeylist();
03435 wd_mk_list = newkeylist();
03436 JPElist = newkeylist();
03437 tae_tid=pvm_parent();
03438 uid = (uint)getpid();
03439 pvm_serror(1);
03440 if(!(username = (char *)getenv("USER"))) username = "nouser";
03441 sprintf(logname, PELOGFILE, username, getpid());
03442 open_pelog(logname);
03443 printk_set(printf, printf);
03444 getcwd(cwdbuf, 126);
03445 sprintf(idstr, "Cwd: %s\nCall: ", cwdbuf);
03446 for(i=0; i < argc; i++) {
03447 sprintf(string, "%s%s", argv[i], (i < argc-1) ? " " : "");
03448 strcat(idstr, string);
03449 }
03450 strcat(idstr, "\n");
03451 sprintf(string, "jpe started as tid=%x pid=%d user=%s\n",
03452 pe_tid, getpid(), username);
03453 strcat(idstr, string);
03454 sprintf(mailname, PEMAILFILE, username, getpid());
03455 open_pemail(mailname, idstr);
03456 printk("%s", idstr);
03457 pelog("#%s for %s on %s\n", logname, username, datestr);
03458 pelog("#listed in order of completion\n");
03459 pelog("#tid\tstatus\tcomplet\tarchive\tmodule\t\t\"out\"returned\tauxinfo\n");
03460 pelog("TID\tSTATUS\tCOMPLET\tARCHIVE\tMODULE\tOUT\tAUXINFO\n");
03461 pelog("---\t------\t-------\t-------\t------\t---\t-------\n");
03462
03463 umask(002);
03464 }
03465
03466
03467 int DoIt()
03468 {
03469 if(setjmp(env) != 0) {
03470 dereg();
03471 kill_pvm();
03472 pvm_exit();
03473 if (pemailfp) fclose(pemailfp);
03474 mailit();
03475 if (pelogfp) fclose(pelogfp);
03476 printk("PE Abnormal Completion\n");
03477 return(1);
03478 }
03479 cmdparams_get_argv(&cmdparams, &argv, &argc);
03480 setup(argc, argv);
03481 get_cmd(argc, argv);
03482 spawn_pvm();
03483 do_pipe();
03484 dereg();
03485 kill_pvm();
03486 pvm_exit();
03487 if (pemailfp) fclose(pemailfp);
03488 mailit();
03489 if (pelogfp) fclose(pelogfp);
03490 printk("PE Normal Completion\n");
03491 return(0);
03492 }