00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041 #include <jsoc.h>
00042 #include <cmdparams.h>
00043 #include <drms.h>
00044 #include <stdio.h>
00045 #include <stdlib.h>
00046 #include <strings.h>
00047 #include <signal.h>
00048 #include <sys/types.h>
00049 #include <sys/time.h>
00050 #include <sys/stat.h>
00051 #include <unistd.h>
00052 #include <printk.h>
00053 #include <errno.h>
00054 #include <sys/wait.h>
00055 #include "lev0lev1.h"
00056
00057
00058 #define LEV0SERIESNAMEHMI "hmi.lev0e"
00059 #define LEV0SERIESNAMEAIA "aia.lev0e"
00060 #define LEV1SERIESNAMEHMI "su_production.hmi_lev1e" //temp test case
00061 #define LEV1SERIESNAMEAIA "su_production.aia_lev1e" //temp test case
00062
00063 #define LEV1LOG_BASEDIR "/usr/local/logs/lev1"
00064 #define H1LOGFILE "/usr/local/logs/lev1/build_lev1_mgr_fsn.%s.log"
00065 #define QSUBDIR "/scr21/production/qsub"
00066 #define NUMTIMERS 8 //number of seperate timers avail
00067 #define MAXCPULEV1 32 //max# of forks can do at a time for stream mode
00068 #define DEFAULTCPULEV1 "8" //default# of forks can do at a time
00069 #define MAXQSUBLEV1 64 //max# of qsub can do at a time for reprocessing mode
00070 #define DEFAULTQSUBLEV1 "16"
00071 #define MAXJIDSTR MAXQSUBLEV1*16
00072 #define NOTSPECIFIED "***NOTSPECIFIED***"
00073 #define LOGTEST 0
00074 char args7sv[128];
00075
00076
00077 int qsubjob(unsigned int rec1, unsigned int rec2);
00078
00079
00080 ModuleArgs_t module_args[] = {
00081 {ARG_STRING, "instru", NOTSPECIFIED, "instrument. either hmi or aia"},
00082 {ARG_STRING, "dsin", NOTSPECIFIED, "dataset of lev0 filtergrams"},
00083 {ARG_STRING, "dsout", NOTSPECIFIED, "dataset of lev1 output"},
00084 {ARG_STRING, "logfile", NOTSPECIFIED, "optional log file name. Will create one if not given"},
00085 {ARG_INTS, "bfsn", "0", "first lev0 fsn to process. 0=error, must specify"},
00086 {ARG_INTS, "efsn", "0", "last lev0 fsn to process. 0=error, must specify"},
00087 {ARG_INTS, "numrec", NUMRECLEV1S, "number of lev0 to lev1 fsn at a time"},
00088 {ARG_INTS, "numcpu", DEFAULTCPULEV1, "max# of forks to do at a time for stream mode"},
00089 {ARG_INTS, "numqsub", DEFAULTQSUBLEV1, "max# of qsub to do at a time for reprocessing mode"},
00090 {ARG_FLAG, "v", "0", "verbose flag"},
00091 {ARG_FLAG, "h", "0", "help flag"},
00092 {ARG_END}
00093 };
00094
00095 ModuleArgs_t *gModArgs = module_args;
00096 CmdParams_t cmdparams;
00097
00098 char *module_name = "build_lev1_mgr_fsn";
00099
00100 FILE *h1logfp;
00101 FILE *qsubfp;
00102 static char datestr[32];
00103 static char open_dsname[256];
00104 static struct timeval first[NUMTIMERS], second[NUMTIMERS];
00105
00106 pid_t mypid;
00107 uint64_t jid;
00108 int verbose;
00109 unsigned int bfsn, efsn;
00110 long long lastrecnum0_now = 0;
00111 long long lastrecnum0_prev = 0;
00112 int numrec, numcpu, numqsub;
00113 int qcnt = 0;
00114
00115 int stream_mode = 0;
00116 int hmiaiaflg = 0;
00117 int loopcnt = 0;
00118 char logname[128];
00119 char argdsin[128], argdsout[128], arglogfile[128], arginstru[80];
00120 char argbfsn[80], argefsn[80], argnumrec[80], argnumcpu[80], argnumqsub[80];
00121 char timetag[32];
00122 char *username;
00123 char *logfile;
00124 char *instru;
00125 char *dsin;
00126 char *dsout;
00127
00128
00129 int nice_intro ()
00130 {
00131 int usage = cmdparams_get_int (&cmdparams, "h", NULL);
00132 if (usage)
00133 {
00134 printf ("Runs build_lev1_fsn processes to create lev1 datasets.\n\n");
00135 printf ("Usage: build_lev1_mgr_fsn [-vh]\n"
00136 "instru=<hmi|aia> dsin=<lev0> dsout=<lev1> bfsn=<fsn#> efsn=<fsn#>"
00137 "\nnumcpu=<#> numqsub=<#> logfile=<file>\n"
00138 " -h: help - show this message then exit\n"
00139 " -v: verbose\n"
00140 "instru= instrument. must be 'hmi' or 'aia'\n"
00141 "dsin= data set name of lev0 input\n"
00142 " default hmi=hmi.lev0e aia=aia.lev0e\n"
00143 "dsout= data set name of lev1 output\n"
00144 " default hmi=su_production.hmi_lev1e aia=su_production.aia_lev1e\n"
00145 "bfsn= first lev0 fsn to process. 0=error\n"
00146 "efsn= last lev0 fsn to process. 0=error\n"
00147 "numcpu= max# of forks to do at a time for stream mode. Default %s\n"
00148 "numqsub= max# of qsub to do at a time for reprocessing mode. Default %s\n"
00149 "logfile= optional log file name. If not given uses:\n"
00150 " /usr/local/logs/lev1/build_lev1_mgr_fsn.<time_stamp>.log\n",
00151 DEFAULTCPULEV1, DEFAULTQSUBLEV1);
00152 printf ("\n * Has two modes:\n"
00153 "!!NOTE: Stream Mode has been depricated!!\n"
00154 " * Stream Mode (one instance):\n"
00155 " * This is the normal quick look mode that runs continuously and\n"
00156 " * keeps up with the lev0 records.\n"
00157 " * brec=0, erec=0\n"
00158 " * - start at the previous highest lev0 record processed\n"
00159 " * This is keep in the DB table lev1_highest_lev0_recnum\n"
00160 " * - fork up to 8 (MAXCPULEV1) build_lev1_fsn for every \n"
00161 " * 17 (NUMRECLEV1) lev0 records. \n"
00162 " * - when an build_lev1_fsn completes, fork another for next 17 rec\n"
00163 " * - if no more lev0 records available, sleep and try again\n"
00164 " * - if 8 CPU not enough to keep up with lev0, go to 16, etc.\n"
00165 " *\n"
00166 " * Reprocessing Mode (any number of instances):\n"
00167 " * This is the definitive mode lev1 processing.\n"
00168 " * bfsn=1000, efsn=2000 \n"
00169 " * - qsub up to 16 (MAXQSUBLEV1) build_lev1_fsn for 17 records ea\n"
00170 " * - when a job completes qsub next 17 records until efsn is reached\n"
00171 " * - when all jobs are done, build_lev1_mgr_fsn will exit\n\n");
00172 return(1);
00173 }
00174 verbose = cmdparams_get_int (&cmdparams, "v", NULL);
00175 return (0);
00176 }
00177
00178
00179 char *get_datetime()
00180 {
00181 struct timeval tvalr;
00182 struct tm *t_ptr;
00183 static char datestr[32];
00184
00185 gettimeofday(&tvalr, NULL);
00186 t_ptr = localtime((const time_t *)&tvalr.tv_sec);
00187 sprintf(datestr, "%s", asctime(t_ptr));
00188 return(datestr);
00189 }
00190
00191
00192 char *do_datestr() {
00193 time_t tval;
00194 struct tm *t_ptr;
00195
00196 tval = time(NULL);
00197 t_ptr = localtime(&tval);
00198 sprintf(datestr, "%d.%02d.%02d_%02d:%02d:%02d",
00199 (t_ptr->tm_year+1900), (t_ptr->tm_mon+1),
00200 t_ptr->tm_mday, t_ptr->tm_hour, t_ptr->tm_min, t_ptr->tm_sec);
00201 return(datestr);
00202 }
00203
00204
00205 char *gettimetag()
00206 {
00207 struct timeval tvalr;
00208 struct tm *t_ptr;
00209
00210 gettimeofday(&tvalr, NULL);
00211 t_ptr = localtime((const time_t *)&tvalr);
00212 sprintf(timetag, "%04d.%02d.%02d.%02d%02d%02d",
00213 (t_ptr->tm_year+1900), (t_ptr->tm_mon+1), t_ptr->tm_mday, t_ptr->tm_hour, t_ptr->tm_min, t_ptr->tm_sec);
00214 return(timetag);
00215 }
00216
00217
00218 void BeginTimer(int n)
00219 {
00220 gettimeofday (&first[n], NULL);
00221 }
00222
00223 float EndTimer(int n)
00224 {
00225 gettimeofday (&second[n], NULL);
00226 if (first[n].tv_usec > second[n].tv_usec) {
00227 second[n].tv_usec += 1000000;
00228 second[n].tv_sec--;
00229 }
00230 return (float) (second[n].tv_sec-first[n].tv_sec) +
00231 (float) (second[n].tv_usec-first[n].tv_usec)/1000000.0;
00232 }
00233
00234
00235 int h1log(const char *fmt, ...)
00236 {
00237 va_list args;
00238 char string[32768];
00239
00240 va_start(args, fmt);
00241 vsprintf(string, fmt, args);
00242 if(h1logfp) {
00243 fprintf(h1logfp, string);
00244 fflush(h1logfp);
00245 }
00246 else {
00247 printf(string);
00248 fflush(stdout);
00249 }
00250 va_end(args);
00251 return(0);
00252 }
00253
00254 int send_mail(char *fmt, ...)
00255 {
00256 va_list args;
00257 char string[1024], cmd[1024];
00258
00259 va_start(args, fmt);
00260 vsprintf(string, fmt, args);
00261 sprintf(cmd, "echo \"%s\" | Mail -s \"build_lev1_fsn mail\" lev0_user", string);
00262 system(cmd);
00263 va_end(args);
00264 return(0);
00265 }
00266
00267
00268 void abortit(int stat)
00269 {
00270 char pcmd[128];
00271
00272 printk("***Abort in progress ...\n");
00273 printk("**Exit build_lev1_mgr_fsn w/ status = %d\n", stat);
00274 sprintf(pcmd, "/bin/rm %s/build_lev1_mgr_fsn.stream.touch 2>/dev/null",
00275 LEV1LOG_BASEDIR);
00276 system(pcmd);
00277 if (h1logfp) fclose(h1logfp);
00278 exit(stat);
00279 }
00280
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290
00291
00292
00293 int forkstream(long long recn0, long long maxrecn0, int force)
00294 {
00295 pid_t pid, wpid, fpid[MAXCPULEV1];
00296 unsigned int numofrecs, frec, lrec;
00297 int stat_loc, i, j, k, l, numtofork;
00298 char *args[9], pcmd[128];
00299 char args1[128], args2[128], args3[128], args4[128], args5[128], args6[128];
00300 char args7[128];
00301
00302 numofrecs = (maxrecn0 - recn0) + 1;
00303 numtofork = numofrecs/numrec;
00304 j = numtofork;
00305 if(j == 0) {
00306 if(force) { j=numtofork=1; }
00307 else return(0);
00308 }
00309 lrec = recn0-1;
00310 if(j < numcpu) l = j;
00311 else l = numcpu;
00312 if(LOGTEST) {
00313 sprintf(args7sv, "logfile=%s/build_lev1_fsn.%s.log",
00314 LEV1LOG_BASEDIR, gettimetag());
00315 }
00316 for(k=0; k < l; k++) {
00317 if(force) { frec = lrec+1; lrec = (frec + numofrecs)-1; }
00318 else { frec = lrec+1; lrec = (frec + numrec)-1; }
00319 if((pid = fork()) < 0) {
00320 printk("***Can't fork(). errno=%d\n", errno);
00321 return(1);
00322 }
00323 else if(pid == 0) {
00324 args[0] = "build_lev1_fsn";
00325 sprintf(args1, "dsin=%s", dsin);
00326 args[1] = args1;
00327 sprintf(args2, "dsout=%s", dsout);
00328 args[2] = args2;
00329 sprintf(args3, "bfsn=%u", frec);
00330 args[3] = args3;
00331 sprintf(args4, "efsn=%u", lrec);
00332 args[4] = args4;
00333 sprintf(args5, "instru=%s", instru);
00334 args[5] = args5;
00335 sprintf(args6, "quicklook=%d", stream_mode);
00336 args[6] = args6;
00337
00338 if(LOGTEST) {
00339 sprintf(args7, "%s", args7sv);
00340 }
00341 else {
00342 sprintf(args7, "logfile=%s/l1s_%u_%u.log",
00343 LEV1LOG_BASEDIR, frec, lrec);
00344 }
00345 args[7] = args7;
00346 args[8] = NULL;
00347 printk("execvp: %s %s %s %s %s %s %s %s\n",
00348 args[0],args[1],args[2],args[3],args[4],args[5],args[6],args[7]);
00349 if(execvp(args[0], args) < 0) {
00350 printk("***Can't execvp() build_lev1_fsn. errno=%d\n", errno);
00351 exit(1);
00352 }
00353 }
00354 --numtofork;
00355 printf("forked pid = %d\n", pid);
00356 fpid[k] = pid;
00357 }
00358 wpid = -1;
00359 while(1) {
00360
00361 pid = waitpid(wpid, &stat_loc, WNOHANG+WUNTRACED);
00362
00363
00364 if(pid == 0) { sleep(5); continue; }
00365 if(pid == -1) {
00366 if(errno == ECHILD) printf("!!No More Children\n");errno=0;
00368
00369
00370 sprintf(pcmd, "echo \"update lev1_highest_lev0_recnum set lev0recnum=%lld, date='%s' where lev0series='%s'\" | psql -h hmidb jsoc", lrec, get_datetime(), dsin);
00371 system(pcmd);
00372
00373 if(numtofork <= 0) break;
00374 }
00375 else {
00376 for(k=0; k < numcpu; k++) {
00377 if(fpid[k] == pid) { break; }
00378 }
00379 if(k == numcpu) continue;
00380 printf("returned pid = %d stat_loc = %d\n", pid, stat_loc);
00381 if(numtofork == 0) continue;
00382 }
00383
00384 frec = lrec+1; lrec = (frec + numrec)-1;
00385 if((pid = fork()) < 0) {
00386 printk("***Can't fork(). errno=%d\n", errno);
00387 return(1);
00388 }
00389 else if(pid == 0) {
00390 args[0] = "build_lev1_fsn";
00391 sprintf(args1, "dsin=%s", dsin);
00392 args[1] = args1;
00393 sprintf(args2, "dsout=%s", dsout);
00394 args[2] = args2;
00395 sprintf(args3, "bfsn=%u", frec);
00396 args[3] = args3;
00397 sprintf(args4, "efsn=%u", lrec);
00398 args[4] = args4;
00399 sprintf(args5, "instru=%s", instru);
00400 args[5] = args5;
00401 sprintf(args6, "quicklook=%d", stream_mode);
00402 args[6] = args6;
00403
00404 if(LOGTEST) {
00405 sprintf(args7, "%s", args7sv);
00406 }
00407 else {
00408 sprintf(args7, "logfile=%s/l1s_%u_%u.log", LEV1LOG_BASEDIR, frec, lrec);
00409 }
00410 args[7] = args7;
00411 args[8] = NULL;
00412 printk("execvp: %s %s %s %s %s %s %s %s\n",
00413 args[0],args[1],args[2],args[3],args[4],args[5],args[6],args[7]);
00414 if(execvp(args[0], args) < 0) {
00415 printk("***Can't execvp() build_lev1_fsn. errno=%d\n", errno);
00416 exit(1);
00417 }
00418 }
00419 --numtofork;
00420 printf("forked pid = %d\n", pid);
00421 fpid[k] = pid;
00422 }
00423 return(0);
00424 }
00425
00426
00427 int qsubjob(unsigned int rec1, unsigned int rec2)
00428 {
00429 FILE *fin;
00430 char astr[32], bstr[32], string[128], qlogname[128], qsubcmd[512];;
00431 char recrange[128];
00432
00433 sprintf(recrange, "%u-%u", rec1, rec2);
00434 sprintf(open_dsname, "%s[%s]", dsin, recrange);
00435 printk("open_dsname = %s\n", open_dsname);
00436 sprintf(qlogname, "%s/qsub_%s_%d.csh", QSUBDIR, username, qcnt++);
00437 if((qsubfp=fopen(qlogname, "w")) == NULL) {
00438 fprintf(stderr, "**Can't open the qsub log file %s\n", qlogname);
00439 return(1);
00440 }
00441 fprintf(qsubfp, "#!/bin/csh\n");
00442 fprintf(qsubfp, "echo \"TMPDIR = $TMPDIR\"\n");
00443 fprintf(qsubfp, "build_lev1_fsn dsin=%s dsout=%s bfsn=%u efsn=%u instru=%s quicklook=%d logfile=%s/l1q_b%u_e%u_$JOB_ID.log\n",
00444 dsin, dsout, rec1, rec2, instru, stream_mode, QSUBDIR, rec1, rec2);
00445 fclose(qsubfp);
00446 sprintf(qsubcmd, "qsub -o %s -e %s -q j.q %s",
00447 QSUBDIR, QSUBDIR, qlogname);
00448
00449 printf("%s\n", qsubcmd);
00450 printk("%s\n", qsubcmd);
00451 sprintf(qsubcmd, "%s | grep \"Your job\"", qsubcmd);
00452 fin = popen(qsubcmd, "r");
00453 while(fgets(string, sizeof string, fin)) {
00454 sscanf(string, "%s %s %d", astr, bstr, &jid);
00455 }
00456 pclose(fin);
00457 printf("\$JOB_ID = %u\n", jid);
00458 return(0);
00459 }
00460
00461 int qsubmode(unsigned int frec, unsigned int lrec)
00462 {
00463 FILE *fin;
00464 char qsubcmd[512], string[128];
00465 char astr[32], bstr[32], jidstr[MAXJIDSTR];
00466 uint64_t qjid[MAXQSUBLEV1], qstatjid[MAXQSUBLEV1];
00467 unsigned int numofrecs, rfirst, rlast;
00468 int numtoqsub, i, j, l, k, found, status;
00469 int jobdone=0;
00470
00471 numofrecs = (lrec - frec) + 1;
00472 numtoqsub = numofrecs/numrec;
00473 if(numofrecs % numrec) numtoqsub++;
00474 j = numtoqsub;
00475 rlast = frec-1;
00476 if(j < numqsub) l = j;
00477 else l = numqsub;
00478 for(k=0; k < l; k++) {
00479 rfirst = rlast+1; rlast = (rfirst + numrec)-1;
00480 if(rlast > lrec) rlast = lrec;
00481 status = qsubjob(rfirst, rlast);
00482 --numtoqsub;
00483 qjid[k] = jid;
00484 if(k == 0) sprintf(jidstr, "%u", jid);
00485 else sprintf(jidstr, "%s,%u", jidstr, jid);
00486 }
00487
00488 printf("numtoqsub left = %d\n", numtoqsub);
00489
00490 sprintf(qsubcmd, "qstat -u production | grep \"qsub_prod\"");
00491 while(1) {
00492
00493 if(!(fin = popen(qsubcmd, "r"))) {
00494 printf("!!!Fatal Error: can't do %s\n", qsubcmd);
00495 return(1);
00496 }
00497
00498 found = 0; k = 0;
00499 while(fgets(string, sizeof string, fin)) {
00500
00501 sscanf(string, "%u", &jid);
00502 printf("job id from qstat = %u\n", jid);
00503 qstatjid[k++] = jid;
00504 found = 1;
00505 }
00506 pclose(fin);
00507
00508
00509 for(i=0; i < l; i++) {
00510 for(j=0; j < k; j++) {
00511 if(qjid[i] == qstatjid[j]) {
00512 break;
00513 }
00514 }
00515 if(j == k) {
00516 if(qjid[i] != 0) {
00517 printf("Job done jid=%u\n", qjid[i]);
00518 jobdone++;
00519 qjid[i] = 0;
00520 }
00521 if(numtoqsub) {
00522
00523 rfirst = rlast+1; rlast = (rfirst + numrec)-1;
00524 if(rlast > lrec) rlast = lrec;
00525 status = qsubjob(rfirst, rlast);
00526 --numtoqsub;
00527 found = 1;
00528 qjid[i] = jid;
00529
00530
00531 }
00532
00533 }
00534 }
00535 for(i=0; i < l; i++) {
00536 if(i == 0) sprintf(jidstr, "%u", qjid[i]);
00537 else sprintf(jidstr, "%s,%u", jidstr, qjid[i]);
00538 }
00539 printf("\n");
00540 if(!found) break;
00541 sleep(3);
00542 }
00543 printf("All jobs done = %d. numtoqsub = %d\n", jobdone, numtoqsub);
00544 return(0);
00545 }
00546
00547
00548
00549
00550
00551
00552 int do_ingest(int force)
00553 {
00554 FILE *fin;
00555 int rstatus;
00556 long long recnum0, maxrecnum0;
00557 char string[128], pcmd[128];
00558
00560 if(stream_mode) {
00561 sprintf(pcmd, "echo \"select lev0recnum from lev1_highest_lev0_recnum where lev0series='%s'\" | psql -h hmidb jsoc", dsin);
00562 fin = popen(pcmd, "r");
00563 while(fgets(string, sizeof string, fin)) {
00564 if(strstr(string, "lev0recnum")) continue;
00565 if(strstr(string, "-----")) continue;
00566
00567 if((rstatus = sscanf(string, "%lld", &recnum0)) == 0) {
00568 printf("Abort no lev0 entry in lev1_highest_lev0_recnum\n");
00569 printk("Abort no lev0 entry in lev1_highest_lev0_recnum\n");
00570 abortit(1);
00571 }
00572 recnum0++;
00573 break;
00574 }
00575 pclose(fin);
00576 sprintf(pcmd, "echo \"select max(recnum) from %s\" | psql -h hmidb jsoc", dsin);
00577 fin = popen(pcmd, "r");
00578 while(fgets(string, sizeof string, fin)) {
00579 if(strstr(string, "max")) continue;
00580 if(strstr(string, "-----")) continue;
00581 if(!strcmp(string, " \n")) {
00582 printf("Abort no max lev0 recnum (new series?)\n");
00583 printk("Abort no max lev0 recnum (new series?)\n");
00584 abortit(1);
00585 }
00586 sscanf(string, "%lld", &maxrecnum0);
00587
00588 maxrecnum0 = maxrecnum0 - 40;
00589 lastrecnum0_prev = lastrecnum0_now;
00590 lastrecnum0_now = maxrecnum0;
00591 break;
00592 }
00593 pclose(fin);
00594 printf("Stream Mode starting at lev0 recnum = %lld maxrecnum = %lld\n",
00595 recnum0, maxrecnum0);
00596 if(recnum0 > maxrecnum0) return(0);
00597 rstatus = forkstream(recnum0, maxrecnum0, force);
00598 }
00599 else {
00600
00601 rstatus = qsubmode(bfsn, efsn);
00602 }
00603 return(rstatus);
00604 }
00605
00606 void sighandler(sig)
00607 int sig;
00608 {
00609 char pcmd[128];
00610 if(sig == SIGTERM) {
00611 printf("*** %s build_lev1_mgr_fsn got SIGTERM. Exiting.\n", datestr);
00612 printk("*** %s build_lev1_mgr_fsn got SIGTERM. Exiting.\n", datestr);
00613 sprintf(pcmd, "/bin/rm %s/build_lev1_mgr_fsn.stream.touch 2>/dev/null",
00614 LEV1LOG_BASEDIR);
00615 system(pcmd);
00616 exit(1);
00617 }
00618 if(sig == SIGINT) {
00619 printf("*** %s build_lev1_mgr_fsn got SIGINT. Exiting.\n", datestr);
00620 printk("*** %s build_lev1_mgr_fsn got SIGINT. Exiting.\n", datestr);
00621 sprintf(pcmd, "/bin/rm %s/build_lev1_mgr_fsn.stream.touch 2>/dev/null",
00622 LEV1LOG_BASEDIR);
00623 system(pcmd);
00624 exit(1);
00625 }
00626 printk("*** %s build_lev1_mgr_fsn got an illegal signal %d, ignoring...\n",
00627 datestr, sig);
00628 if (signal(SIGINT, SIG_IGN) != SIG_IGN)
00629 signal(SIGINT, sighandler);
00630 if (signal(SIGALRM, SIG_IGN) != SIG_IGN)
00631 signal(SIGALRM, sighandler);
00632 }
00633
00634
00635 void setup()
00636 {
00637 FILE *fin;
00638 char string[128], cwdbuf[128], idstr[256], lfile[128];
00639 int tpid;
00640
00641 do_datestr();
00642 printk_set(h1log, h1log);
00643 printk("%s\n", datestr);
00644 getcwd(cwdbuf, 126);
00645 mypid = getpid();
00646 sprintf(idstr, "Cwd: %s\nCall: ", cwdbuf);
00647 sprintf(string, "build_lev1_mgr_fsn started as pid=%d user=%s\n", mypid, username);
00648 strcat(idstr, string);
00649 printk("%s", idstr);
00650 printf("%s", idstr);
00651 sprintf(arginstru, "instru=%s", instru);
00652 sprintf(argdsin, "dsin=%s", dsin);
00653 sprintf(argdsout, "dsout=%s", dsout);
00654 sprintf(argbfsn, "bfsn=%u", bfsn);
00655 sprintf(argefsn, "efsn=%u", efsn);
00656 sprintf(argnumrec, "numrec=%d", numrec);
00657 sprintf(argnumcpu, "numcpu=%d", numcpu);
00658 sprintf(argnumqsub, "numqsub=%d", numqsub);
00659 sprintf(arglogfile, "logfile=%s", logname);
00660 printk("%s %s %s %s %s %s %s %s\n", arginstru, argdsin, argdsout, argbfsn, argefsn, argnumrec, argnumcpu, argnumqsub);
00661 printf("%s %s %s %s %s %s %s %s\n", arginstru, argdsin, argdsout, argbfsn, argefsn, argnumrec, argnumcpu, argnumqsub);
00662 if (signal(SIGINT, SIG_IGN) != SIG_IGN)
00663 signal(SIGINT, sighandler);
00664 if (signal(SIGTERM, SIG_IGN) != SIG_IGN)
00665 signal(SIGTERM, sighandler);
00666 if (signal(SIGALRM, SIG_IGN) != SIG_IGN)
00667 signal(SIGALRM, sighandler);
00668 sprintf(idstr, "ps -ef | grep %s", LEV1VIEWERNAME);
00669 fin = popen(idstr, "r");
00670 while(fgets(string, sizeof string, fin)) {
00671 if(!(strstr(string, "perl"))) continue;
00672 sscanf(string, "%s %d", idstr, &tpid);
00673 sprintf(lfile, "%s/build_lev1_mgr_restart_fsn_%d.touch", LEV1LOG_BASEDIR, tpid);
00674 sprintf(idstr, "/bin/touch %s", lfile);
00675 printk("%s\n", idstr);
00676 system(idstr);
00677 }
00678 umask(002);
00679 }
00680
00681
00682 int main(int argc, char **argv)
00683 {
00684 FILE *fin;
00685 int wflg = 1;
00686 char line[128], pcmd[128];
00687
00688 if (cmdparams_parse(&cmdparams, argc, argv) == -1)
00689 {
00690 fprintf(stderr,"Error: Command line parsing failed. Aborting.\n");
00691 return 1;
00692 }
00693 if (nice_intro())
00694 return (0);
00695 if(!(username = (char *)getenv("USER"))) username = "nouser";
00696 instru = cmdparams_get_str(&cmdparams, "instru", NULL);
00697 if(strcmp(instru, "hmi") && strcmp(instru, "aia")) {
00698 printf("Error: instru= must be given as 'hmi' or 'aia'\n");
00699 return(0);
00700 }
00701 if(!strcmp(instru, "aia")) hmiaiaflg = 1;
00702 dsin = cmdparams_get_str(&cmdparams, "dsin", NULL);
00703 dsout = cmdparams_get_str(&cmdparams, "dsout", NULL);
00704 bfsn = cmdparams_get_int(&cmdparams, "bfsn", NULL);
00705 efsn = cmdparams_get_int(&cmdparams, "efsn", NULL);
00706 numrec = cmdparams_get_int(&cmdparams, "numrec", NULL);
00707 numcpu = cmdparams_get_int(&cmdparams, "numcpu", NULL);
00708 numqsub = cmdparams_get_int(&cmdparams, "numqsub", NULL);
00709 if(numcpu > MAXCPULEV1) {
00710 printf("numcpu exceeds max of %d\n", MAXCPULEV1);
00711 return(0);
00712 }
00713 if(numqsub > MAXQSUBLEV1) {
00714 printf("numqsub exceeds max of %d\n", MAXQSUBLEV1);
00715 return(0);
00716 }
00717 if(bfsn == 0 && efsn == 0) {
00718
00719 sprintf(pcmd, "ls %s/build_lev1_mgr_fsn.stream.touch 2>/dev/null",
00720 LEV1LOG_BASEDIR);
00721 fin = popen(pcmd, "r");
00722 while(fgets(line, sizeof line, fin)) {
00723 printf("Error: build_lev1_mgr_fsn already running.");
00724 printf(" If not so, do:\n");
00725 printf("/bin/rm %s/build_lev1_mgr_fsn.stream.touch\n", LEV1LOG_BASEDIR);
00726 pclose(fin);
00727 return(0);
00728 }
00729 pclose(fin);
00730 sprintf(pcmd, "touch %s/build_lev1_mgr_fsn.stream.touch 2>/dev/null",
00731 LEV1LOG_BASEDIR);
00732 system(pcmd);
00733 stream_mode = 1;
00734 }
00735 logfile = cmdparams_get_str(&cmdparams, "logfile", NULL);
00736 if (strcmp(dsin, NOTSPECIFIED) == 0) {
00737 if(hmiaiaflg == 0) dsin = LEV0SERIESNAMEHMI;
00738 else dsin = LEV0SERIESNAMEAIA;
00739 }
00740 if (strcmp(dsout, NOTSPECIFIED) == 0) {
00741 if(hmiaiaflg == 0) dsout = LEV1SERIESNAMEHMI;
00742 else dsout = LEV1SERIESNAMEAIA;
00743 }
00744 if(hmiaiaflg) {
00745 if(strstr(dsin, "hmi") || strstr(dsout, "hmi")) {
00746 printf("Warning: You said instru=aia but have 'hmi' in ds name?\n");
00747 printf("Do you want to abort this [y/n]? ");
00748 if(gets(line) == NULL) { return(0); }
00749 if(strcmp(line, "n")) { return(0); }
00750 }
00751 }
00752 else {
00753 if(strstr(dsin, "aia") || strstr(dsout, "aia")) {
00754 printf("Warning: You said instru=hmi but have 'aia' in ds name?\n");
00755 printf("Do you want to abort this [y/n]? ");
00756 if(gets(line) == NULL) { return(0); }
00757 if(strcmp(line, "n")) { return(0); }
00758 }
00759 }
00760 if (strcmp(logfile, NOTSPECIFIED) == 0) {
00761 sprintf(logname, H1LOGFILE, gettimetag());
00762 }
00763 else {
00764 sprintf(logname, "%s", logfile);
00765 }
00766 if((h1logfp=fopen(logname, "w")) == NULL)
00767 fprintf(stderr, "**Can't open the log file %s\n", logname);
00768 setup();
00769 while(wflg) {
00770 if(do_ingest(0)) {
00771 printk("**ERROR: in do_ingest() for %s\n", open_dsname);
00772 }
00773 if(!stream_mode) return(0);
00774 sleep(30);
00775 if(lastrecnum0_now == lastrecnum0_prev) {
00776 if(loopcnt++ == 15) {
00777 printk("Timeout: force any left over lev0 to lev1\n");
00778 if(do_ingest(1)) {
00779 printk("**ERROR: in do_ingest() for %s\n", open_dsname);
00780 }
00781 loopcnt = 0;
00782 }
00783 }
00784 else loopcnt = 0;
00785
00786 }
00787 sprintf(pcmd, "/bin/rm %s/build_lev1_mgr_fsn.stream.touch 2>/dev/null",
00788 LEV1LOG_BASEDIR);
00789 system(pcmd);
00790 return(0);
00791 }
00792