00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
#include <jsoc.h>
#include <cmdparams.h>
#include <drms.h>
#include <errno.h>
#include <inttypes.h>
#include <signal.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/wait.h>
#include <printk.h>
#include "lev0lev1.h"
00071
00072
#define LEV0SERIESNAMEHMI "hmi.lev0e"
#define LEV0SERIESNAMEAIA "aia.lev0e"
#define LEV0SERIESNAMEIRIS "iris_ground.lev0_dc1"
#define LEV1SERIESNAMEHMI "su_production.hmi_lev1e"   //temp test case
#define LEV1SERIESNAMEAIA "su_production.aia_lev1e"   //temp test case
#define LEV1SERIESNAMEIRIS "iris_ground.lev1_dc1"
// dsaiabad= arg value formerly passed to build_lev1 for aia. Eliminated by jim on 03Mar2011
#define DSAIABAD "lm_jps.aia_bad_blobs"

#define LEV1LOG_BASEDIR "/usr/local/logs/lev1"
#define H1LOGFILE "/usr/local/logs/lev1/build_lev1_mgr_%s.%s.log"   // %s = instru, timetag
#define QSUBDIR "/surge40/jsocprod/qsub/tmp"
#define NUMTIMERS 8                        // number of separate timers available
#define MAXCPULEV1 32                      // max# of forks at a time for stream mode
#define DEFAULTCPULEV1 "8"                 // default# of forks at a time
#define MAXQSUBLEV1 64                     // max# of qsub at a time for reprocessing mode
#define DEFAULTQSUBLEV1 "16"
// Parenthesized so the macro expands safely inside larger expressions.
#define MAXJIDSTR (MAXQSUBLEV1 * 16)
#define NOTSPECIFIED "***NOTSPECIFIED***"  // sentinel default for string args
#define LOGTEST 0                          // 1 = all children share one logfile (args8sv)
char args8sv[128];

int qsubjob(long long rec1, long long rec2);
00096
00097
00098 ModuleArgs_t module_args[] = {
00099 {ARG_STRING, "instru", "hmi" , "instrument. either hmi,aia or iris"},
00100 {ARG_STRING, "dsin", NOTSPECIFIED, "dataset of lev0 filtergrams"},
00101 {ARG_STRING, "dsout", NOTSPECIFIED, "dataset of lev1 output"},
00102 {ARG_STRING, "logfile", NOTSPECIFIED, "optional log file name. Will create one if not given"},
00103 {ARG_INTS, "brec", "-1", "first lev0 rec# to process"},
00104 {ARG_INTS, "erec", "-1", "last lev0 rec# to process"},
00105 {ARG_INTS, "bfsn", "-1", "first lev0 fsn to process"},
00106 {ARG_INTS, "efsn", "-1", "last lev0 fsn to process"},
00107 {ARG_INTS, "quicklook", "0", "1=quick look, 0 = definitive mode"},
00108 {ARG_INTS, "numrec", NUMRECLEV1S, "number of lev0 to lev1 records at a time"},
00109 {ARG_INTS, "numcpu", DEFAULTCPULEV1, "max# of forks to do at a time for stream mode"},
00110 {ARG_INTS, "numqsub", DEFAULTQSUBLEV1, "max# of qsub to do at a time for reprocessing mode"},
00111 {ARG_FLAG, "v", "0", "verbose flag"},
00112 {ARG_FLAG, "h", "0", "help flag"},
00113 {ARG_END}
00114 };
00115
00116 ModuleArgs_t *gModArgs = module_args;
00117 CmdParams_t cmdparams;
00118
00119 char *module_name = "build_lev1_mgr";
00120
00121 FILE *h1logfp;
00122 FILE *qsubfp;
00123 static char datestr[32];
00124 static char open_dsname[256];
00125 static struct timeval first[NUMTIMERS], second[NUMTIMERS];
00126 static struct stat stbuf;
00127
00128 pid_t mypid;
00129 uint64_t jid;
00130 int verbose;
00131 long long brec, erec, bfsn, efsn;
00132 long long lastrecnum0_now = 0;
00133 long long lastrecnum0_prev = 0;
00134 int numrec, numcpu, numqsub;
00135 int qcnt = 0;
00136
00137 int stream_mode = 0;
00138 int quicklook = 0;
00139
00140 int instruflg = 0;
00141 int modeflg = 0;
00142 int loopcnt = 0;
00143 int abortnow = 0;
00144 char stopfile[80];
00145 char hmiaianame[16];
00146 char logname[128];
00147 char argdsin[128], argdsout[128], arglogfile[128], arginstru[80], argmode[80];
00148 char argbx[80], argex[80], argnumrec[80], argnumcpu[80], argnumqsub[80];
00149 char timetag[32];
00150 char *username;
00151 char *logfile;
00152 char *instru;
00153 char *mode;
00154 char *dsin;
00155 char *dsout;
00156
00157
00158 int nice_intro ()
00159 {
00160 int usage = cmdparams_get_int (&cmdparams, "h", NULL);
00161 if (usage)
00162 {
00163 printf ("Runs build_lev1 processes to create lev1 datasets.\n\n");
00164 printf ("Usage: build_lev1_mgr [-vh]\n"
00165 "mode=<recnum|fsn> instru=<hmi|aia|iris> dsin=<lev0> dsout=<lev1>\n"
00166 "brec=<rec#>|bfsn=<fsn#> erec=<rec#>|efsn=<fsn#>\n"
00167 "numcpu=<#> numqsub=<#> logfile=<file>\n"
00168 " -h: help - show this message then exit\n"
00169 " -v: verbose\n"
00170 "mode= recnum: brec and erec have the record # range to process \n"
00171 " fsn: bfsn and efsn have the fsn # range to process\n"
00172 " For safety, the mode and arg name used must be consistent\n"
00173 "instru= instrument. must be 'hmi' or 'aia' or 'iris'\n"
00174 "dsin= data set name of lev0 input\n"
00175 " default hmi=hmi.lev0e aia=aia.lev0e iris=iris_ground.lev0_dc1\n"
00176 "dsout= data set name of lev1 output\n"
00177 " default hmi=su_production.hmi_lev1e aia=su_production.aia_lev1e iris=iris_ground.lev1_dc1\n"
00178 "brec= first lev0 rec# to process. 0=Stream Mode if erec also 0\n"
00179 "erec= last lev0 rec# to process. 0=Stream Mode if brec also 0\n"
00180 "bfsn= first fsn# to process. -1=error must be given if fsn mode\n"
00181 "efsn= last fsn# to process. -1=error must be given if fsn mode\n"
00182 "numcpu= max# of forks to do at a time for stream mode. Default %s\n"
00183 "numqsub= max# of qsub to do at a time for reprocessing mode. Default %s\n"
00184 "logfile= optional log file name. If not given uses:\n"
00185 " /usr/local/logs/lev1/build_lev1_mgr.<time_stamp>.log\n",
00186 DEFAULTCPULEV1, DEFAULTQSUBLEV1);
00187 printf ("\n * Has two modes:\n"
00188 " * Stream Mode (one instance):\n"
00189 " * This is the normal quick look mode that runs continuously and\n"
00190 " * keeps up with the lev0 records.\n"
00191 " * mode=recnum, brec=0, erec=0\n"
00192 " * - start at the previous highest lev0 record processed\n"
00193 " * This is keep in the DB table lev1_highest_lev0_recnum\n"
00194 " * - fork up to 8 (MAXCPULEV1) build_lev1 for every \n"
00195 " * 12 (NUMRECLEV1) lev0 records. \n"
00196 " * - when an build_lev1 completes, fork another for next 12 rec\n"
00197 " * - if no more lev0 records available, sleep and try again\n"
00198 " * - if 8 CPU not enough to keep up with lev0, go to 16, etc.\n"
00199 " *\n"
00200 " * Range Mode (any number of instances):\n"
00201 " * brec=1000, erec=2000 or bfsn=44000, efsn=45000\n"
00202 " * - qsub up to 16 (MAXQSUBLEV1) build_lev1 for 12 records ea\n"
00203 " * - when a job completes qsub next 12 records until erec is reached\n"
00204 " * - when all jobs are done, build_lev1_mgr will exit\n\n");
00205 return(1);
00206 }
00207 verbose = cmdparams_get_int (&cmdparams, "v", NULL);
00208 return (0);
00209 }
00210
00211
/*
 * get_datetime - current local time in asctime() form, e.g.
 * "Thu Mar  3 12:34:56 2011\n" (asctime() includes the trailing newline).
 *
 * Returns a pointer to a static buffer that is overwritten by the next call.
 * If the local time cannot be computed, the buffer is an empty string.
 */
char *get_datetime()
{
  struct timeval tvalr;
  struct tm *t_ptr;
  time_t secs;
  /* Named apart from the file-scope datestr[] so the two are not confused. */
  static char dtbuf[32];

  gettimeofday(&tvalr, NULL);
  /* Copy into a real time_t rather than reinterpreting the address of tv_sec. */
  secs = (time_t)tvalr.tv_sec;
  t_ptr = localtime(&secs);
  if(t_ptr == NULL) {
    dtbuf[0] = '\0';
    return(dtbuf);
  }
  snprintf(dtbuf, sizeof dtbuf, "%s", asctime(t_ptr));
  return(dtbuf);
}
00223
00224
00225 char *do_datestr() {
00226 time_t tval;
00227 struct tm *t_ptr;
00228
00229 tval = time(NULL);
00230 t_ptr = localtime(&tval);
00231 sprintf(datestr, "%d.%02d.%02d_%02d:%02d:%02d",
00232 (t_ptr->tm_year+1900), (t_ptr->tm_mon+1),
00233 t_ptr->tm_mday, t_ptr->tm_hour, t_ptr->tm_min, t_ptr->tm_sec);
00234 return(datestr);
00235 }
00236
00237
00238 char *gettimetag()
00239 {
00240 struct timeval tvalr;
00241 struct tm *t_ptr;
00242
00243 gettimeofday(&tvalr, NULL);
00244 t_ptr = localtime((const time_t *)&tvalr);
00245 sprintf(timetag, "%04d.%02d.%02d.%02d%02d%02d",
00246 (t_ptr->tm_year+1900), (t_ptr->tm_mon+1), t_ptr->tm_mday, t_ptr->tm_hour, t_ptr->tm_min, t_ptr->tm_sec);
00247 return(timetag);
00248 }
00249
00250
00251 void BeginTimer(int n)
00252 {
00253 gettimeofday (&first[n], NULL);
00254 }
00255
00256 float EndTimer(int n)
00257 {
00258 gettimeofday (&second[n], NULL);
00259 if (first[n].tv_usec > second[n].tv_usec) {
00260 second[n].tv_usec += 1000000;
00261 second[n].tv_sec--;
00262 }
00263 return (float) (second[n].tv_sec-first[n].tv_sec) +
00264 (float) (second[n].tv_usec-first[n].tv_usec)/1000000.0;
00265 }
00266
00267
00268 int h1log(const char *fmt, ...)
00269 {
00270 va_list args;
00271 char string[32768];
00272
00273 va_start(args, fmt);
00274 vsprintf(string, fmt, args);
00275 if(h1logfp) {
00276 fprintf(h1logfp, string);
00277 fflush(h1logfp);
00278 }
00279 else {
00280 printf(string);
00281 fflush(stdout);
00282 }
00283 va_end(args);
00284 return(0);
00285 }
00286
00287 int send_mail(char *fmt, ...)
00288 {
00289 va_list args;
00290 char string[1024], cmd[1024];
00291
00292 va_start(args, fmt);
00293 vsprintf(string, fmt, args);
00294 sprintf(cmd, "echo \"%s\" | Mail -s \"%s mail\" lev0_user", string, module_name);
00295 system(cmd);
00296 va_end(args);
00297 return(0);
00298 }
00299
00300
00301 void abortit(int stat)
00302 {
00303 char pcmd[128];
00304
00305 printk("***Abort in progress ...\n");
00306 printk("**Exit build_lev1_mgr w/ status = %d\n", stat);
00307 sprintf(pcmd, "/bin/rm %s/build_lev1_mgr_%s.stream.touch 2>/dev/null",
00308 LEV1LOG_BASEDIR, hmiaianame);
00309 system(pcmd);
00310 if (h1logfp) fclose(h1logfp);
00311 exit(stat);
00312 }
00313
00314
00315
00316
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326 int forkstream(long long recn0, long long maxrecn0, int force)
00327 {
00328 pid_t pid, wpid, fpid[MAXCPULEV1];
00329 long long numofrecs, frec, lrec;
00330 int stat_loc, i, j, k, l, numtofork;
00331 char *args[11], pcmd[128];
00332 char args1[128], args2[128], args3[128], args4[128], args5[128], args6[128];
00333 char args7[128], args8[128], args9[128];
00334
00335 numofrecs = (maxrecn0 - recn0) + 1;
00336 numtofork = numofrecs/numrec;
00337 j = numtofork;
00338 if(j == 0) {
00339 if(force) { j=numtofork=1; }
00340 else return(0);
00341 }
00342 lrec = recn0-1;
00343 if(j < numcpu) l = j;
00344 else l = numcpu;
00345 if(LOGTEST) {
00346 sprintf(args8sv, "logfile=%s/build_lev1.%s.log",
00347 LEV1LOG_BASEDIR, gettimetag());
00348 }
00349 for(k=0; k < l; k++) {
00350 if(force) { frec = lrec+1; lrec = (frec + numofrecs)-1; }
00351 else { frec = lrec+1; lrec = (frec + numrec)-1; }
00352 if((pid = fork()) < 0) {
00353 printk("***Can't fork(). errno=%d\n", errno);
00354 return(1);
00355 }
00356 else if(pid == 0) {
00357 switch(instruflg) {
00358 case 0:
00359 args[0] = "build_lev1_hmi";
00360 break;
00361 case 1:
00362 args[0] = "build_lev1_aia";
00363 break;
00364 case 2:
00365 args[0] = "build_lev1_iris";
00366 break;
00367 }
00368 sprintf(args1, "mode=%s", mode);
00369 args[1] = args1;
00370 sprintf(args2, "dsin=%s", dsin);
00371 args[2] = args2;
00372 sprintf(args3, "dsout=%s", dsout);
00373 args[3] = args3;
00374 if(modeflg) {
00375 sprintf(args4, "brec=%lld", frec);
00376 args[4] = args4;
00377 sprintf(args5, "erec=%lld", lrec);
00378 args[5] = args5;
00379 }
00380 else {
00381 sprintf(args4, "bfsn=%lld", frec);
00382 args[4] = args4;
00383 sprintf(args5, "efsn=%lld", lrec);
00384 args[5] = args5;
00385 }
00386 sprintf(args6, "instru=%s", instru);
00387 args[6] = args6;
00388 sprintf(args7, "quicklook=%d", quicklook);
00389 args[7] = args7;
00390
00391 if(LOGTEST) {
00392 sprintf(args8, "%s", args8sv);
00393 }
00394 else {
00395 sprintf(args8, "logfile=%s/l1s_%lld_%lld_%s.log",
00396 LEV1LOG_BASEDIR, frec, lrec, hmiaianame);
00397 }
00398 args[8] = args8;
00399
00400
00401
00402
00403
00404
00405
00406
00407
00408
00409 args[9] = NULL;
00410 printk("execvp: %s %s %s %s %s %s %s %s %s\n",
00411 args[0],args[1],args[2],args[3],args[4],args[5],args[6],args[7],args[8]);
00412
00413 if(execvp(args[0], args) < 0) {
00414 switch(instruflg) {
00415 case 0:
00416 printk("***Can't execvp() build_lev1_hmi. errno=%d\n", errno);
00417 break;
00418 case 1:
00419 printk("***Can't execvp() build_lev1_aia. errno=%d\n", errno);
00420 break;
00421 case 2:
00422 printk("***Can't execvp() build_lev1_iris. errno=%d\n", errno);
00423 break;
00424 }
00425 exit(1);
00426 }
00427 }
00428 --numtofork;
00429 printf("forked pid = %d\n", pid);
00430 fpid[k] = pid;
00431 }
00432 wpid = -1;
00433 while(1) {
00434
00435 pid = waitpid(wpid, &stat_loc, WNOHANG+WUNTRACED);
00436
00437
00438 if(pid == 0) { sleep(5); continue; }
00439 if(pid == -1) {
00440 if(errno == ECHILD) printf("!!No More Children\n");errno=0;
00442
00443
00444 sprintf(pcmd, "echo \"update lev1_highest_lev0_recnum set lev0recnum=%lld, date='%s' where lev0series='%s'\" | psql -h hmidb -U production jsoc", lrec, get_datetime(), dsin);
00445 printf("%s\n", pcmd);
00446 system(pcmd);
00447 if(stat(stopfile, &stbuf) == 0) {
00448 printf("Stop file %s seen.\nWait until all children are done and exit...\n", stopfile);
00449 abortnow = 1;
00450 }
00451 if(numtofork <= 0) break;
00452 }
00453 else {
00454 for(k=0; k < numcpu; k++) {
00455 if(fpid[k] == pid) { break; }
00456 }
00457 if(k == numcpu) continue;
00458 printf("returned pid = %d stat_loc = %d\n", pid, stat_loc);
00459 if(numtofork == 0) continue;
00460 }
00461
00462 if(stat(stopfile, &stbuf) == 0) {
00463 printf("Stop file %s seen.\nWait until all children are done and exit...\n", stopfile);
00464 numtofork = 0;
00465 abortnow = 1;
00466 continue;
00467 }
00468 frec = lrec+1; lrec = (frec + numrec)-1;
00469 if((pid = fork()) < 0) {
00470 printk("***Can't fork(). errno=%d\n", errno);
00471 return(1);
00472 }
00473 else if(pid == 0) {
00474 switch(instruflg) {
00475 case 0:
00476 args[0] = "build_lev1_hmi";
00477 break;
00478 case 1:
00479 args[0] = "build_lev1_aia";
00480 break;
00481 case 2:
00482 args[0] = "build_lev1_iris";
00483 break;
00484 }
00485 sprintf(args1, "mode=%s", mode);
00486 args[1] = args1;
00487 sprintf(args2, "dsin=%s", dsin);
00488 args[2] = args2;
00489 sprintf(args3, "dsout=%s", dsout);
00490 args[3] = args3;
00491 if(modeflg) {
00492 sprintf(args4, "brec=%lld", frec);
00493 args[4] = args4;
00494 sprintf(args5, "erec=%lld", lrec);
00495 args[5] = args5;
00496 }
00497 else {
00498 sprintf(args4, "bfsn=%lld", frec);
00499 args[4] = args4;
00500 sprintf(args5, "efsn=%lld", lrec);
00501 args[5] = args5;
00502 }
00503 sprintf(args6, "instru=%s", instru);
00504 args[6] = args6;
00505 sprintf(args7, "quicklook=%d", quicklook);
00506 args[7] = args7;
00507 if(LOGTEST) {
00508 sprintf(args8, "%s", args8sv);
00509 }
00510 else {
00511 sprintf(args8, "logfile=%s/l1s_%lld_%lld_%s.log",
00512 LEV1LOG_BASEDIR, frec, lrec, hmiaianame);
00513 }
00514 args[8] = args8;
00515
00516
00517
00518
00519
00520
00521
00522
00523
00524
00525 args[9] = NULL;
00526 printk("execvp: %s %s %s %s %s %s %s %s %s\n",
00527 args[0],args[1],args[2],args[3],args[4],args[5],args[6],args[7],args[8]);
00528
00529 if(execvp(args[0], args) < 0) {
00530 switch(instruflg) {
00531 case 0:
00532 printk("***Can't execvp() build_lev1_hmi. errno=%d\n", errno);
00533 break;
00534 case 1:
00535 printk("***Can't execvp() build_lev1_aia. errno=%d\n", errno);
00536 break;
00537 case 2:
00538 printk("***Can't execvp() build_lev1_iris. errno=%d\n", errno);
00539 break;
00540 }
00541 exit(1);
00542 }
00543 }
00544 --numtofork;
00545 printf("forked pid = %d\n", pid);
00546 fpid[k] = pid;
00547 }
00548 return(0);
00549 }
00550
00551
00552
00553 int qsubjob(long long rec1, long long rec2)
00554 {
00555 FILE *fin;
00556 char astr[32], bstr[32], string[128], qlogname[128], qsubcmd[512];;
00557 char recrange[128];
00558
00559 if(modeflg) sprintf(recrange, ":#%lld-#%lld", rec1, rec2);
00560 else sprintf(recrange, "%lld-%lld", rec1, rec2);
00561 sprintf(open_dsname, "%s[%s]", dsin, recrange);
00562 printk("open_dsname = %s\n", open_dsname);
00563
00564 sprintf(qlogname, "%s/p_%d_%d.csh", QSUBDIR, mypid, qcnt++);
00565 if((qsubfp=fopen(qlogname, "w")) == NULL) {
00566 fprintf(stderr, "**Can't open the qsub log file %s\n", qlogname);
00567 return(1);
00568 }
00569 fprintf(qsubfp, "#!/bin/csh\n");
00570 fprintf(qsubfp, "limit vm 1000M\n");
00571 fprintf(qsubfp, "limit coredumpsize 0\n");
00572 fprintf(qsubfp, "echo \"TMPDIR = $TMPDIR\"\n");
00573
00574 if(modeflg) {
00575 switch(instruflg) {
00576 case 0:
00577 fprintf(qsubfp, "build_lev1_hmi mode=%s dsin=%s dsout=%s brec=%lld erec=%lld instru=%s quicklook=%d logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n",
00578 mode, dsin, dsout, rec1, rec2, instru, quicklook, QSUBDIR, rec1, rec2);
00579 break;
00580 case 1:
00581 fprintf(qsubfp,"build_lev1_aia mode=%s dsin=%s dsout=%s brec=%lld erec=%lld instru=%s quicklook=%d logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n",
00582 mode, dsin, dsout, rec1, rec2, instru, quicklook, QSUBDIR, rec1, rec2);
00583 break;
00584 case 2:
00585 fprintf(qsubfp,"build_lev1_iris mode=%s dsin=%s dsout=%s brec=%lld erec=%lld instru=%s quicklook=%d logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n",
00586 mode, dsin, dsout, rec1, rec2, instru, quicklook, QSUBDIR, rec1, rec2);
00587 break;
00588 }
00589 }
00590 else {
00591 switch(instruflg) {
00592 case 0:
00593 fprintf(qsubfp, "build_lev1_hmi mode=%s dsin=%s dsout=%s bfsn=%lld efsn=%lld instru=%s quicklook=%d logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n",
00594 mode, dsin, dsout, rec1, rec2, instru, quicklook, QSUBDIR, rec1, rec2);
00595 break;
00596 case 1:
00597 fprintf(qsubfp,"build_lev1_aia mode=%s dsin=%s dsout=%s bfsn=%lld efsn=%lld instru=%s quicklook=%d logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n",
00598 mode, dsin, dsout, rec1, rec2, instru, quicklook, QSUBDIR, rec1, rec2);
00599 break;
00600 case 2:
00601 fprintf(qsubfp,"build_lev1_iris mode=%s dsin=%s dsout=%s bfsn=%lld efsn=%lld instru=%s quicklook=%d logfile=%s/l1q_b%lld_e%lld_$JOB_ID.log\n",
00602 mode, dsin, dsout, rec1, rec2, instru, quicklook, QSUBDIR, rec1, rec2);
00603 break;
00604 }
00605 }
00606
00607
00608
00609
00610
00611
00612
00613
00614
00615
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628 fclose(qsubfp);
00629 sprintf(qsubcmd, "qsub2 -o %s -e %s -q p.q %s",
00630 QSUBDIR, QSUBDIR, qlogname);
00631
00632
00633
00634 printf("%s\n", qsubcmd);
00635 printk("%s\n", qsubcmd);
00636
00637 sprintf(qsubcmd, "%s | grep \"Your job\"", qsubcmd);
00638 fin = popen(qsubcmd, "r");
00639 while(fgets(string, sizeof string, fin)) {
00640 sscanf(string, "%s %s %d", astr, bstr, &jid);
00641 }
00642 pclose(fin);
00643 printf("\$JOB_ID = %u\n", jid);
00644
00645 return(0);
00646 }
00647
00648 int qsubmode(long long frec, long long lrec)
00649 {
00650 FILE *fin;
00651 char qsubcmd[512], string[128];
00652 char astr[32], bstr[32], jidstr[MAXJIDSTR];
00653 uint64_t qjid[MAXQSUBLEV1], qstatjid[MAXQSUBLEV1];
00654 long long numofrecs, rfirst, rlast;
00655 int numtoqsub, i, j, l, k, found, status;
00656 int jobdone=0;
00657
00658 numofrecs = (lrec - frec) + 1;
00659 numtoqsub = numofrecs/numrec;
00660 if(numofrecs % numrec) numtoqsub++;
00661 j = numtoqsub;
00662 rlast = frec-1;
00663 if(j < numqsub) l = j;
00664 else l = numqsub;
00665 for(k=0; k < l; k++) {
00666 rfirst = rlast+1; rlast = (rfirst + numrec)-1;
00667 if(rlast > lrec) rlast = lrec;
00668 status = qsubjob(rfirst, rlast);
00669 --numtoqsub;
00670 qjid[k] = jid;
00671 if(k == 0) sprintf(jidstr, "%u", jid);
00672 else sprintf(jidstr, "%s,%u", jidstr, jid);
00673 }
00674
00675 printf("numtoqsub left = %d\n", numtoqsub);
00676
00677
00678
00679
00680 sprintf(qsubcmd, "qstat2 -u %s | grep \"p_%d_\"", username, mypid);
00681 while(1) {
00682
00683 if(!(fin = popen(qsubcmd, "r"))) {
00684 printf("!!!Fatal Error: can't do %s\n", qsubcmd);
00685 return(1);
00686 }
00687
00688 found = 0; k = 0;
00689 while(fgets(string, sizeof string, fin)) {
00690
00691 sscanf(string, "%u", &jid);
00692 printf("job id from qstat2 = %u\n", jid);
00693 qstatjid[k++] = jid;
00694 found = 1;
00695 }
00696 pclose(fin);
00697
00698
00699 for(i=0; i < l; i++) {
00700 for(j=0; j < k; j++) {
00701 if(qjid[i] == qstatjid[j]) {
00702 break;
00703 }
00704 }
00705 if(j == k) {
00706 if(qjid[i] != 0) {
00707 printf("Job done jid=%u\n", qjid[i]);
00708 jobdone++;
00709 qjid[i] = 0;
00710 }
00711 if(numtoqsub) {
00712
00713 rfirst = rlast+1; rlast = (rfirst + numrec)-1;
00714 if(rlast > lrec) rlast = lrec;
00715 status = qsubjob(rfirst, rlast);
00716 --numtoqsub;
00717 found = 1;
00718 qjid[i] = jid;
00719
00720
00721 }
00722
00723 }
00724 }
00725 for(i=0; i < l; i++) {
00726 if(i == 0) sprintf(jidstr, "%u", qjid[i]);
00727 else sprintf(jidstr, "%s,%u", jidstr, qjid[i]);
00728 }
00729 printf("\n");
00730 if(!found) break;
00731 sleep(3);
00732 }
00733 printf("All jobs done = %d. numtoqsub = %d\n", jobdone, numtoqsub);
00734 return(0);
00735 }
00736
00737
00738
00739
00740
00741
00742 int do_ingest(int force)
00743 {
00744 FILE *fin;
00745 int rstatus;
00746 long long recnum0, maxrecnum0;
00747 char string[128], pcmd[128];
00748
00749 if(stream_mode) {
00750 sprintf(pcmd, "echo \"select lev0recnum from lev1_highest_lev0_recnum where lev0series='%s'\" | psql -h hmidb -U production jsoc", dsin);
00751 fin = popen(pcmd, "r");
00752 while(fgets(string, sizeof string, fin)) {
00753 if(strstr(string, "lev0recnum")) continue;
00754 if(strstr(string, "-----")) continue;
00755
00756 if((rstatus = sscanf(string, "%lld", &recnum0)) == 0) {
00757 printf("Abort no lev0 entry in lev1_highest_lev0_recnum\n");
00758 printk("Abort no lev0 entry in lev1_highest_lev0_recnum\n");
00759 abortit(1);
00760 }
00761 recnum0++;
00762 break;
00763 }
00764 pclose(fin);
00765 sprintf(pcmd, "echo \"select max(recnum) from %s\" | psql -h hmidb -U production jsoc", dsin);
00766 fin = popen(pcmd, "r");
00767 while(fgets(string, sizeof string, fin)) {
00768 if(strstr(string, "max")) continue;
00769 if(strstr(string, "-----")) continue;
00770 if(!strcmp(string, " \n")) {
00771 printf("Abort no max lev0 recnum (new series?)\n");
00772 printk("Abort no max lev0 recnum (new series?)\n");
00773 abortit(1);
00774 }
00775 sscanf(string, "%lld", &maxrecnum0);
00776
00777
00778 if(instruflg == 2) maxrecnum0 = maxrecnum0 - 200;
00779 else maxrecnum0 = maxrecnum0 - 80;
00780 lastrecnum0_prev = lastrecnum0_now;
00781 lastrecnum0_now = maxrecnum0;
00782 break;
00783 }
00784 pclose(fin);
00785 printf("Stream Mode starting at lev0 recnum = %lld maxrecnum = %lld\n",
00786 recnum0, maxrecnum0);
00787 if(recnum0 > maxrecnum0) return(0);
00788 rstatus = forkstream(recnum0, maxrecnum0, force);
00789 }
00790 else {
00791 if(modeflg) {
00792
00793 rstatus = qsubmode(brec, erec);
00794 }
00795 else {
00796 rstatus = qsubmode(bfsn, efsn);
00797 }
00798 }
00799 return(rstatus);
00800 }
00801
00802 void sighandler(sig)
00803 int sig;
00804 {
00805 char pcmd[128];
00806 if(sig == SIGTERM) {
00807 printf("*** %s build_lev1_mgr got SIGTERM. Exiting.\n", datestr);
00808 printk("*** %s build_lev1_mgr got SIGTERM. Exiting.\n", datestr);
00809 sprintf(pcmd, "/bin/rm %s/build_lev1_mgr_%s.stream.touch 2>/dev/null",
00810 LEV1LOG_BASEDIR, hmiaianame);
00811 system(pcmd);
00812 exit(1);
00813 }
00814 if(sig == SIGINT) {
00815 printf("*** %s build_lev1_mgr got SIGINT. Exiting.\n", datestr);
00816 printk("*** %s build_lev1_mgr got SIGINT. Exiting.\n", datestr);
00817 sprintf(pcmd, "/bin/rm %s/build_lev1_mgr_%s.stream.touch 2>/dev/null",
00818 LEV1LOG_BASEDIR, hmiaianame);
00819 system(pcmd);
00820 exit(1);
00821 }
00822 printk("*** %s build_lev1_mgr got an illegal signal %d, ignoring...\n",
00823 datestr, sig);
00824 if (signal(SIGINT, SIG_IGN) != SIG_IGN)
00825 signal(SIGINT, sighandler);
00826 if (signal(SIGALRM, SIG_IGN) != SIG_IGN)
00827 signal(SIGALRM, sighandler);
00828 }
00829
00830
00831 void setup()
00832 {
00833 FILE *fin;
00834 char string[128], cwdbuf[128], idstr[256], lfile[128];
00835 int tpid;
00836
00837 do_datestr();
00838 printk_set(h1log, h1log);
00839 printk("%s\n", datestr);
00840 getcwd(cwdbuf, 126);
00841 mypid = getpid();
00842 sprintf(idstr, "Cwd: %s\nCall: ", cwdbuf);
00843 sprintf(string, "%s started as pid=%d user=%s\n", module_name, mypid, username);
00844 strcat(idstr, string);
00845 printk("%s", idstr);
00846 printf("%s", idstr);
00847 switch(instruflg) {
00848 case 0:
00849 sprintf(stopfile, "/usr/local/logs/lev1/build_mgr_stop_hmi");
00850 break;
00851 case 1:
00852 sprintf(stopfile, "/usr/local/logs/lev1/build_mgr_stop_aia");
00853 break;
00854 case 2:
00855 sprintf(stopfile, "/usr/local/logs/lev1/build_mgr_stop_iris");
00856 break;
00857 }
00858 if(stream_mode) {
00859 sprintf(string, "/bin/rm -f %s", stopfile);
00860 system(string);
00861 }
00862 sprintf(argmode, "mode=%s", mode);
00863 sprintf(arginstru, "instru=%s", instru);
00864 sprintf(argdsin, "dsin=%s", dsin);
00865 sprintf(argdsout, "dsout=%s", dsout);
00866 if(modeflg) {
00867 sprintf(argbx, "brec=%lld", brec);
00868 sprintf(argex, "erec=%lld", erec);
00869 }
00870 else {
00871 sprintf(argbx, "bfsn=%lld", bfsn);
00872 sprintf(argex, "efsn=%lld", efsn);
00873 }
00874 sprintf(argnumrec, "numrec=%d", numrec);
00875 sprintf(argnumcpu, "numcpu=%d", numcpu);
00876 sprintf(argnumqsub, "numqsub=%d", numqsub);
00877 sprintf(arglogfile, "logfile=%s", logname);
00878 printk("%s %s %s %s %s %s %s %s %s\n", argmode, arginstru, argdsin, argdsout, argbx, argex, argnumrec, argnumcpu, argnumqsub);
00879 printf("%s %s %s %s %s %s %s %s %s\n", argmode, arginstru, argdsin, argdsout, argbx, argex, argnumrec, argnumcpu, argnumqsub);
00880 if (signal(SIGINT, SIG_IGN) != SIG_IGN)
00881 signal(SIGINT, sighandler);
00882 if (signal(SIGTERM, SIG_IGN) != SIG_IGN)
00883 signal(SIGTERM, sighandler);
00884 if (signal(SIGALRM, SIG_IGN) != SIG_IGN)
00885 signal(SIGALRM, sighandler);
00886 sprintf(idstr, "ps -ef | grep %s", LEV1VIEWERNAME);
00887 fin = popen(idstr, "r");
00888 while(fgets(string, sizeof string, fin)) {
00889 if(!(strstr(string, "perl"))) continue;
00890 sscanf(string, "%s %d", idstr, &tpid);
00891 sprintf(lfile, "%s/build_lev1_mgr_restart_%d.touch", LEV1LOG_BASEDIR, tpid);
00892 sprintf(idstr, "/bin/touch %s", lfile);
00893 printk("%s\n", idstr);
00894 system(idstr);
00895 }
00896 umask(002);
00897 }
00898
00899
00900 int main(int argc, char **argv)
00901 {
00902 FILE *fin;
00903 int wflg = 1;
00904 char line[128], pcmd[128];
00905
00906 if (cmdparams_parse(&cmdparams, argc, argv) == -1)
00907 {
00908 fprintf(stderr,"Error: Command line parsing failed. Aborting.\n");
00909 return 1;
00910 }
00911 if (nice_intro())
00912 return (0);
00913 if(!(username = (char *)getenv("USER"))) username = "nouser";
00914 instru = cmdparams_get_str(&cmdparams, "instru", NULL);
00915 if(strcmp(instru, "hmi") && strcmp(instru, "aia") && strcmp(instru, "iris")) {
00916 printf("instru= %s\n", instru);
00917 printf("Error: instru= must be given as 'hmi' or 'aia' or 'iris'\n");
00918 return(0);
00919 }
00920 if(!strcmp(instru, "aia")) {
00921 instruflg = 1;
00922 sprintf(hmiaianame, "aia");
00923 }
00924 else if(!strcmp(instru, "hmi")) {
00925 instruflg = 0;
00926 sprintf(hmiaianame, "hmi");
00927 }
00928 else {
00929 instruflg = 2;
00930 sprintf(hmiaianame, "iris");
00931 }
00932 mode = cmdparams_get_str(&cmdparams, "mode", NULL);
00933 if(strcmp(mode, "recnum") && strcmp(mode, "fsn")) {
00934 printf("Error: mode= must be given as 'recnum' or 'fsn'\n");
00935 return(0);
00936 }
00937 if(!strcmp(mode, "recnum")) modeflg = 1;
00938 dsin = cmdparams_get_str(&cmdparams, "dsin", NULL);
00939 dsout = cmdparams_get_str(&cmdparams, "dsout", NULL);
00940 brec = cmdparams_get_int(&cmdparams, "brec", NULL);
00941 erec = cmdparams_get_int(&cmdparams, "erec", NULL);
00942 bfsn = cmdparams_get_int(&cmdparams, "bfsn", NULL);
00943 efsn = cmdparams_get_int(&cmdparams, "efsn", NULL);
00944 quicklook = cmdparams_get_int(&cmdparams, "quicklook", NULL);
00945 numrec = cmdparams_get_int(&cmdparams, "numrec", NULL);
00946 numcpu = cmdparams_get_int(&cmdparams, "numcpu", NULL);
00947 numqsub = cmdparams_get_int(&cmdparams, "numqsub", NULL);
00948 if(numcpu > MAXCPULEV1) {
00949 printf("numcpu exceeds max of %d\n", MAXCPULEV1);
00950 return(0);
00951 }
00952 if(numqsub > MAXQSUBLEV1) {
00953 printf("numqsub exceeds max of %d\n", MAXQSUBLEV1);
00954 return(0);
00955 }
00956 if(modeflg) {
00957 if(brec == -1 || erec == -1) {
00958 fprintf(stderr, "brec and erec must be given. -1 not allowed\n");
00959 fprintf(stderr, "use brec=0 erec=0 to process in stream mode\n");
00960 return(0);
00961 }
00962 if(brec > erec) {
00963 fprintf(stderr, "brec must be <= erec\n");
00964 return(0);
00965 }
00966 if(brec == 0 && erec == 0) {
00967
00968 sprintf(pcmd, "ls %s/build_lev1_mgr_%s.stream.touch 2>/dev/null",
00969 LEV1LOG_BASEDIR, hmiaianame);
00970 fin = popen(pcmd, "r");
00971 while(fgets(line, sizeof line, fin)) {
00972 printf("Error: build_lev1_mgr already running.");
00973 printf(" If not so, do:\n");
00974 printf("/bin/rm %s/build_lev1_mgr_%s.stream.touch\n",
00975 LEV1LOG_BASEDIR, hmiaianame);
00976 pclose(fin);
00977 return(0);
00978 }
00979 pclose(fin);
00980 sprintf(pcmd, "touch %s/build_lev1_mgr_%s.stream.touch 2>/dev/null",
00981 LEV1LOG_BASEDIR, hmiaianame);
00982 system(pcmd);
00983 stream_mode = 1;
00984 quicklook = 1;
00985 }
00986 }
00987 else {
00988 if(bfsn == -1 || efsn == -1) {
00989 fprintf(stderr, "bfsn and efsn must be given. -1 not allowed\n");
00990 return(0);
00991 }
00992 if(bfsn == 0 || efsn == 0) {
00993 fprintf(stderr, "bfsn and efsn must be given. 0 not allowed\n");
00994 return(0);
00995 }
00996 if(bfsn > efsn) {
00997 fprintf(stderr, "bfsn must be <= efsn\n");
00998 return(0);
00999 }
01000 }
01001 logfile = cmdparams_get_str(&cmdparams, "logfile", NULL);
01002 if (strcmp(dsin, NOTSPECIFIED) == 0) {
01003 switch(instruflg) {
01004 case 0:
01005 dsin = LEV0SERIESNAMEHMI;
01006 break;
01007 case 1:
01008 dsin = LEV0SERIESNAMEAIA;
01009 break;
01010 case 2:
01011 dsin = LEV0SERIESNAMEIRIS;
01012 break;
01013 }
01014 }
01015 if (strcmp(dsout, NOTSPECIFIED) == 0) {
01016 switch(instruflg) {
01017 case 0:
01018 dsout = LEV1SERIESNAMEHMI;
01019 break;
01020 case 1:
01021 dsout = LEV1SERIESNAMEAIA;
01022 break;
01023 case 2:
01024 dsout = LEV1SERIESNAMEIRIS;
01025 break;
01026 }
01027 }
01028 if (strcmp(logfile, NOTSPECIFIED) == 0) {
01029 sprintf(logname, H1LOGFILE, hmiaianame, gettimetag());
01030 }
01031 else {
01032 sprintf(logname, "%s", logfile);
01033 }
01034 if((h1logfp=fopen(logname, "w")) == NULL)
01035 fprintf(stderr, "**Can't open the log file %s\n", logname);
01036 setup();
01037 while(wflg) {
01038 if(do_ingest(0)) {
01039 printk("**ERROR: in do_ingest() for %s\n", open_dsname);
01040 }
01041 if(!stream_mode) return(0);
01042 if(abortnow) break;
01043 sleep(30);
01044 if(lastrecnum0_now == lastrecnum0_prev) {
01045
01046
01047
01048
01049
01050
01051
01052
01053
01054
01055 sleep(30);
01056 }
01057
01058
01059 }
01060 sprintf(pcmd, "/bin/rm %s/build_lev1_mgr_%s.stream.touch 2>/dev/null",
01061 LEV1LOG_BASEDIR, hmiaianame);
01062 system(pcmd);
01063 return(0);
01064 }
01065