00001 #include "list.h"
00002
00003
00004 #define DEBUG 0
00005
00006
00007
00008
00009
00010 #define TESTMODE 0
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079 #include "jsoc_main.h"
00080 #include "drms.h"
00081 #include "drms_names.h"
00082 #include "json.h"
00083 #include "serverdefs.h"
00084
00085 #define EXPORT_SERIES "jsoc.export"
00086 #define EXPORT_SERIES_NEW "jsoc.export_new"
00087 #define EXPORT_USER "jsoc.export_user"
00088
00089 #define PACKLIST_VER "0.5"
00090
00091 #define kArgTestmode "t"
00092 #define kArgProcSeries "procser"
00093 #define kArgTestConvQuotes "tQuotes"
00094 #define kArgValNotUsed "NoT UsEd"
00095
00096 #define kMaxProcNameLen 128
00097 #define kMaxIntVar 64
00098 #define kMaxShVar 32
00099 #define kMaxArgVar 64
00100
00101 #define DIE(msg) { fprintf(stderr,"XXXX jsoc_exports_manager failure: %s\nstatus=%d",msg,status); exit(1); }
00102
00103 #define kHgPatchLog "hg_patch.log"
00104
/*
 * Export protocols understood by this module.  The numeric values follow the
 * order of the strings listed in protos[].
 */
typedef enum Protocol_enum
{
    kProto_AsIs   = 0,
    kProto_FITS   = 1,
    kProto_MPEG   = 2,
    kProto_JPEG   = 3,
    kProto_PNG    = 4,
    kProto_MP4    = 5,
    kProto_SuAsIs = 6
} Protocol_t;
00117
/* Protocol names as text; the slot numbers match the Protocol_t values. */
const char *protos[] =
{
    [0] = "as-is",
    [1] = "fits",
    [2] = "mpg",
    [3] = "jpg",
    [4] = "png",
    [5] = "mp4",
    [6] = "su-as-is"
};
00128
/* One argument of a processing program, as parsed by ParseArgs(). */
typedef struct ExpProcArg_struct
{
    char *name; /* argument name as it appears on the program's command line */
    char *def;  /* text following '=' in the argument list (default value), or NULL */
} ExpProcArg_t;
00136
/* One step of a processing pipeline requested for an export. */
typedef struct ProcStep_struct
{
    char *name;   /* name of the processing step */
    char *path;   /* path of the program implementing the step */
    char *args;   /* argument string for the program */
    char *input;  /* input record-set specification (used for the "in" argument) */
    char *output; /* output record-set specification (used for the "out" argument) */
    int crout;    /* NOTE(review): presumably "create output series" flag - confirm */
} ProcStep_t;
00148
00149
00150 #define kProcSerCOLproc "proc"
00151 #define kProcSerCOLpath "path"
00152 #define kProcSerCOLreq "required"
00153 #define kProcSerCOLopt "optional"
00154 #define kProcSerCOLmap "map"
00155 #define kProcSerCOLout "out"
00156
00157 struct ProcStepInfo_struct
00158 {
00159 char *name;
00160 char *path;
00161 LinkedList_t *req;
00162 LinkedList_t *opt;
00163 HContainer_t *namemap;
00164 char *suffix;
00165
00166 };
00167
00168 typedef struct ProcStepInfo_struct ProcStepInfo_t;
00169
00170 ModuleArgs_t module_args[] =
00171 {
00172 {ARG_STRING, "op", "process", "<Operation>"},
00173 {ARG_STRING, kArgProcSeries, "jsoc.export_procs", "The series containing the list of available processing steps. There is one such series on hmidb and one on hmidb2."},
00174 {ARG_STRING, kArgTestConvQuotes, kArgValNotUsed, "Put a record-set query in here to test the code that converts double-quoted strings to single-quoted strings."},
00175 {ARG_FLAG, kArgTestmode, NULL, "if set, then operates on new requests with status 12 (not 2)"},
00176 {ARG_FLAG, "h", "0", "help - show usage"},
00177 {ARG_END}
00178 };
00179
00180 char *module_name = "jsoc_export_manage";
00181
00182
00183
00184 HContainer_t *gIntVars = NULL;
00185 HContainer_t *gShVars = NULL;
00186
/*
 * Free callback for the gIntVars container.
 *
 * data points at a stored `char *`; the string it refers to is released and
 * the stored pointer is reset to NULL.  NULL arguments are ignored.
 */
static void FreeIntVars(void *data)
{
    char **slot = data;

    if (slot == NULL || *slot == NULL)
    {
        return;
    }

    free(*slot);
    *slot = NULL;
}
00197
/*
 * Free callback for the gShVars container.
 *
 * data points at a stored `char *`; the string is released and the stored
 * pointer reset to NULL.  NULL arguments are ignored.
 */
static void FreeShVars(void *data)
{
    char **slot = data;

    if (slot != NULL && *slot != NULL)
    {
        free(*slot);
        *slot = NULL;
    }
}
00202
00203 static int RegisterIntVar(const char *key, char type, void *val)
00204 {
00205 char numbuf[64];
00206 char *strval = NULL;
00207 int err;
00208
00209 err = 0;
00210
00211 if (key && val)
00212 {
00213 switch (type)
00214 {
00215 case 's':
00216 strval = strdup((char *)val);
00217 break;
00218 case 'i':
00219 snprintf(numbuf, sizeof(numbuf), "%d", *((int *)val));
00220 strval = strdup(numbuf);
00221 break;
00222 case 'f':
00223 snprintf(numbuf, sizeof(numbuf), "%f", *((double *)val));
00224 strval = strdup(numbuf);
00225 break;
00226 default:
00227 err = 1;
00228 }
00229
00230 if (!gIntVars)
00231 {
00232 gIntVars = hcon_create(sizeof(char *),
00233 kMaxIntVar,
00234 (void (*)(const void *))FreeIntVars,
00235 NULL,
00236 NULL,
00237 NULL,
00238 0);
00239 }
00240
00241 if (!gIntVars)
00242 {
00243 fprintf(stderr, "Out of memory.\n");
00244 err = 1;
00245 }
00246
00247
00248 hcon_remove(gIntVars, key);
00249
00250
00251 hcon_insert(gIntVars, key, &strval);
00252 }
00253 else
00254 {
00255 err = 1;
00256 }
00257
00258 return err;
00259 }
00260
00261 static int RegisterShVar(const char *key, const char *val)
00262 {
00263 char *strval = NULL;
00264 int err;
00265
00266 err = 0;
00267
00268 if (key && val)
00269 {
00270 if (!gShVars)
00271 {
00272 gShVars = hcon_create(sizeof(char *),
00273 kMaxShVar,
00274 (void (*)(const void *))FreeShVars,
00275 NULL,
00276 NULL,
00277 NULL,
00278 0);
00279 }
00280
00281 if (!gShVars)
00282 {
00283 fprintf(stderr, "Out of memory.\n");
00284 err = 1;
00285 }
00286
00287 strval = strdup(val);
00288
00289
00290 hcon_insert(gShVars, key, &strval);
00291
00292 }
00293 else
00294 {
00295 err = 1;
00296 }
00297
00298 return err;
00299 }
00300
00301
00302
00303
00304 static DB_Handle_t *GetDBHandle(DRMS_Env_t *env, const char *dbhost, int *contextOKout, int close)
00305 {
00306 static DB_Handle_t *dbh = NULL;
00307 static int contextOK = -1;
00308 int forceconn = 0;
00309 char contexthost[DRMS_MAXHOSTNAME];
00310
00311 if (dbh != NULL)
00312 {
00313 if (close && contextOK == 0)
00314 {
00315
00316 db_disconnect(&dbh);
00317 return NULL;
00318 }
00319
00320 if (contextOKout)
00321 {
00322 *contextOKout = contextOK;
00323 }
00324
00325 return dbh;
00326 }
00327 else if (close)
00328 {
00329
00330 return NULL;
00331 }
00332
00333 XASSERT(env && dbhost);
00334
00335 #ifdef DRMS_CLIENT
00336
00337
00338
00339
00340
00341 forceconn = 1;
00342 #else
00343
00344 snprintf(contexthost, sizeof(contexthost), "%s", env->session->db_handle->dbhost);
00345 forceconn = (strcasecmp(dbhost, contexthost) != 0);
00346 #endif
00347
00348 if (forceconn)
00349 {
00350
00351
00352
00353
00354
00355 if ((dbh = db_connect(dbhost, env->session->db_handle->dbuser, NULL, "jsoc", 1)) == NULL)
00356 {
00357 fprintf(stderr,"Couldn't connect to jsoc database on %s.\n", dbhost);
00358 }
00359 else
00360 {
00361
00362 if (contextOK == -1)
00363 {
00364 contextOK = 0;
00365 }
00366 }
00367 }
00368 else
00369 {
00370 dbh = env->session->db_handle;
00371 if (contextOK == -1)
00372 {
00373 contextOK = 1;
00374 }
00375 }
00376
00377 if (contextOKout)
00378 {
00379 *contextOKout = contextOK;
00380 }
00381
00382 return dbh;
00383 }
00384
/* Ask GetDBHandle() to close the cached connection, if it opened one. */
static void CloseDBHandle()
{
    (void)GetDBHandle(NULL, NULL, NULL, 1);
}
00389
/* Free an array of nitems heap strings and the array itself; *parr becomes NULL. */
static void FreeRecSpecStrArr(char ***parr, int nitems)
{
    char **arr;
    int idx;

    if (parr == NULL)
    {
        return;
    }

    arr = *parr;

    if (arr != NULL)
    {
        for (idx = 0; idx < nitems; idx++)
        {
            free(arr[idx]); /* free(NULL) is a no-op */
        }

        free(arr);
    }

    *parr = NULL;
}

/*
 * Release the series-name and filter arrays produced by ParseRecSetSpec().
 *
 *   snames, filts - addresses of the arrays (either may be NULL); each array
 *                   holds nitems strings and is set to NULL after freeing.
 *   nitems        - number of elements in each array.
 */
static void FreeRecSpecParts(char ***snames, char ***filts, int nitems)
{
    FreeRecSpecStrArr(snames, nitems);
    FreeRecSpecStrArr(filts, nitems);
}
00438
00439
00440
00441 static int ParseRecSetSpec(const char *rsquery,
00442 char ***snamesout,
00443 char ***filtsout,
00444 int *nsetsout,
00445 DRMS_RecQueryInfo_t *infoout)
00446 {
00447 int err = 0;
00448 char *allvers = NULL;
00449 char **sets = NULL;
00450 DRMS_RecordSetType_t *settypes = NULL;
00451 char **snames = NULL;
00452 char **filts = NULL;
00453 int nsets = 0;
00454 DRMS_RecQueryInfo_t rsinfo;
00455 int iset;
00456
00457
00458
00459 if (drms_record_parserecsetspec(rsquery, &allvers, &sets, &settypes, &snames, &filts, &nsets, &rsinfo) == DRMS_SUCCESS)
00460 {
00461 *infoout = rsinfo;
00462 *nsetsout = nsets;
00463
00464 if (nsets > 0)
00465 {
00466 *snamesout = (char **)calloc(nsets, sizeof(char *));
00467 *filtsout = (char **)calloc(nsets, sizeof(char *));
00468
00469 if (snamesout && filtsout)
00470 {
00471 for (iset = 0; iset < nsets; iset++)
00472 {
00473 if (snames[iset])
00474 {
00475 (*snamesout)[iset] = strdup(snames[iset]);
00476 }
00477 else
00478 {
00479 (*snamesout)[iset] = NULL;
00480 }
00481
00482 if (filts[iset])
00483 {
00484 (*filtsout)[iset] = strdup(filts[iset]);
00485 }
00486 else
00487 {
00488 (*filtsout)[iset] = NULL;
00489 }
00490 }
00491 }
00492 else
00493 {
00494 fprintf(stderr, "jsoc_export_manage FAILURE: out of memory.\n");
00495 err = 1;
00496 }
00497 }
00498 }
00499 else
00500 {
00501 fprintf(stderr, "jsoc_export_manage FAILURE: invalid record-set query %s.\n", rsquery);
00502 err = 1;
00503 }
00504
00505 drms_record_freerecsetspecarr(&allvers, &sets, &settypes, &snames, &filts, nsets);
00506
00507 if (err == 1)
00508 {
00509
00510 if (nsets > 0)
00511 {
00512 FreeRecSpecParts(snamesout, filtsout, nsets);
00513 }
00514 }
00515
00516 return err;
00517 }
00518
00519 int nice_intro ()
00520 {
00521 int usage = cmdparams_get_int (&cmdparams, "h", NULL);
00522 if (usage)
00523 {
00524 printf ("Usage:\njsoc_info {-h} "
00525 " details are:\n"
00526 "op=<command> tell which function to execute\n"
00527 " choices are:\n"
00528 " process - get new export requests and submit scripts.\n"
00529 " TBD\n"
00530 "h=help - show usage\n"
00531 );
00532 return(1);
00533 }
00534 return (0);
00535 }
00536
00537
00538 static void ErrorOutExpRec(DRMS_Record_t **exprec, int expstatus, const char *mbuf)
00539 {
00540 char cmd[DRMS_MAXQUERYLEN];
00541 DRMS_Keyword_t *key = NULL;
00542 char *requestid = NULL;
00543 int istat = DRMS_SUCCESS;
00544
00545
00546 if (mbuf)
00547 {
00548 fprintf(stderr, "%s\n", mbuf);
00549 }
00550
00551
00552 if (exprec && *exprec)
00553 {
00554
00555
00556
00557 requestid = drms_getkey_string(*exprec, "requestid", &istat);
00558
00559 if (!requestid)
00560 {
00561 fprintf(stderr, "Out of memory in ErrorOutExpRec().");
00562 }
00563 else
00564 {
00565 if (istat == DRMS_SUCCESS && strlen(requestid) > 0)
00566 {
00567 snprintf(cmd, sizeof(cmd), "DELETE FROM jsoc.export_md5 WHERE requestid='%s'", requestid);
00568 if (drms_dms((*exprec)->env->session, NULL, cmd))
00569 {
00570 fprintf(stderr, "Failure deleting expired recent export md5s: %s.\n", cmd);
00571 }
00572 }
00573
00574 free(requestid);
00575 requestid = NULL;
00576 }
00577
00578 drms_setkey_string(*exprec, "errmsg", mbuf);
00579 drms_setkey_int(*exprec, "Status", expstatus);
00580
00581
00582 drms_close_record(*exprec, DRMS_INSERT_RECORD);
00583 *exprec = NULL;
00584 }
00585 }
00586
00587
00588 static void ErrorOutExpNewRec(DRMS_RecordSet_t *exprecs, DRMS_Record_t **exprec, int irec, int expstatus, const char *mbuf)
00589 {
00590 int closedrec = 0;
00591
00592
00593 if (mbuf)
00594 {
00595 fprintf(stderr, "%s\n", mbuf);
00596 }
00597
00598
00599 if (exprec && *exprec)
00600 {
00601 drms_setkey_string(*exprec, "errmsg", mbuf);
00602 drms_setkey_int(*exprec, "Status", expstatus);
00603
00604
00605 drms_close_record(*exprec, DRMS_INSERT_RECORD);
00606 *exprec = NULL;
00607 closedrec = 1;
00608 }
00609
00610 if (exprecs)
00611 {
00612 if (closedrec)
00613 {
00614 exprecs->records[irec] = NULL;
00615 }
00616 }
00617 }
00618
00619
00620
00621
00622 static void make_qsub_call(char *requestid,
00623 char *reqdir,
00624 const char *notify,
00625 const char *dbname,
00626 const char *dbuser,
00627 const char *dbids,
00628 const char *dbexporthost,
00629 int submitcode)
00630 {
00631 FILE *fp;
00632 char qsubscript[DRMS_MAXPATHLEN];
00633
00634
00635 sprintf(qsubscript, "%s/%s.qsub", reqdir, requestid);
00636 fp = fopen(qsubscript, "w");
00637 fprintf(fp, "#! /bin/csh -f\n");
00638 fprintf(fp, "set echo\n");
00639
00640
00641
00642
00643
00644
00645
00646
00647
00648 fprintf(fp, "if (${?JSOCROOT_EXPORT}) then\n");
00649 fprintf(fp, " set path = ($JSOCROOT_EXPORT/bin/$JSOC_MACHINE $JSOCROOT_EXPORT/scripts $path)\n");
00650 fprintf(fp,"endif\n");
00651
00652 fprintf(fp, "while (`show_info JSOC_DBHOST=%s -q 'jsoc.export_new[%s]' key=Status %s` == %d)\n", dbexporthost, requestid, dbids, submitcode);
00653 fprintf(fp, " echo waiting for jsocdb commit >> /home/jsoc/exports/tmp/%s.runlog \n",requestid);
00654 fprintf(fp, " sleep 1\nend \n");
00655 if (dbname)
00656 {
00657 fprintf(fp, "setenv JSOC_DBNAME %s\n", dbname);
00658 }
00659 if (dbuser)
00660 {
00661 fprintf(fp, "setenv JSOC_DBUSER %s\n", dbuser);
00662 }
00663
00664 fprintf(fp, "setenv JSOC_DBHOST %s\n", dbexporthost);
00665 fprintf(fp, "setenv JSOC_DBEXPORTHOST %s\n", dbexporthost);
00666 fprintf(fp, "drms_run JSOC_DBHOST=%s %s/%s.drmsrun >>& /home/jsoc/exports/tmp/%s.runlog \n", dbexporthost, reqdir, requestid, requestid);
00667 fprintf(fp, "set DRMS_ERROR=$status\n");
00668 fprintf(fp, "set NewRecnum=`cat /home/jsoc/exports/tmp/%s.recnum` \n", requestid);
00669 fprintf(fp, "set WAITCOUNT = 20\n");
00670 fprintf(fp, "while (`show_info JSOC_DBHOST=%s -q -r 'jsoc.export[%s]' %s` < $NewRecnum)\n", dbexporthost, requestid, dbids);
00671 fprintf(fp, " echo waiting for jsocdb drms_run commit >> /home/jsoc/exports/tmp/%s.runlog \n",requestid);
00672 fprintf(fp, " @ WAITCOUNT = $WAITCOUNT - 1\n");
00673 fprintf(fp, " if ($WAITCOUNT <= 0) then\n set DRMS_ERROR = -1\n break\n endif\n");
00674 fprintf(fp, " sleep 1\nend \n");
00675
00676
00677 char notifyStr[512] = {0};
00678 int ich;
00679
00680 if (notify)
00681 {
00682 for (ich = 0; ich < strlen(notify); ich++)
00683 {
00684 if (notify[ich] == '\'' || notify[ich] == '"' || notify[ich] == ' ' || notify[ich] == '\t')
00685 {
00686 snprintf(notifyStr, sizeof(notifyStr), "0");
00687 break;
00688 }
00689 }
00690
00691 if (!*notifyStr)
00692 {
00693 snprintf(notifyStr, sizeof(notifyStr), notify);
00694 }
00695 }
00696 else
00697 {
00698 snprintf(notifyStr, sizeof(notifyStr), "0");
00699 }
00700
00701 fprintf(fp, "set Notify=%s\n", notifyStr);
00702 fprintf(fp, "set REQDIR = `show_info JSOC_DBHOST=%s -q -p 'jsoc.export[%s]' %s`\n", dbexporthost, requestid, dbids);
00703 fprintf(fp, "if ($DRMS_ERROR) then\n");
00704
00705
00706 fprintf(fp, " set_info -C JSOC_DBHOST=%s ds='jsoc.export[%s]' Status=4 %s\n", dbexporthost, requestid, dbids);
00707 fprintf(fp, "if (\"$Notify\" != 0) then\n");
00708 fprintf(fp, " mail -n -s 'JSOC export FAILED - %s' \"$Notify\" <<!\n", requestid);
00709 fprintf(fp, "Error status returned from DRMS session.\n");
00710 fprintf(fp, "See log files at http://jsoc.stanford.edu/$REQDIR\n");
00711 fprintf(fp, "Also complete log file at /home/jsoc/exports/tmp/%s.runlog\n", requestid);
00712 fprintf(fp, "!\n");
00713 fprintf(fp, "endif\n");
00714 fprintf(fp, "else\n");
00715 fprintf(fp, "if (\"$Notify\" != 0) then\n");
00716 fprintf(fp, " mail -n -s 'JSOC export complete - %s' \"$Notify\" <<!\n", requestid);
00717 fprintf(fp, "JSOC export request %s is complete.\n", requestid);
00718 fprintf(fp, "Results at http://jsoc.stanford.edu$REQDIR\n");
00719 fprintf(fp, "!\n");
00720 fprintf(fp, "endif\n");
00721
00722 fprintf(fp, " rm /home/jsoc/exports/tmp/%s.recnum\n", requestid);
00723 fprintf(fp, " mv /home/jsoc/exports/tmp/%s.reqlog /home/jsoc/exports/tmp/done \n", requestid);
00724
00725
00726 fprintf(fp, " mv /home/jsoc/exports/tmp/%s.runlog /home/jsoc/exports/tmp/done \n", requestid);
00727 fprintf(fp, "endif\n");
00728 fclose(fp);
00729 chmod(qsubscript, 0555);
00730 }
00731
00732
00733
/* Placeholder data-set check: currently always reports "not bad" (0). */
int isbadDataSet()
{
    const int bad = 0;

    return bad;
}
00738
00739
/* Placeholder processing check: currently always reports "not bad" (0). */
int isbadProcessing()
{
    const int bad = 0;

    return bad;
}
00744
00745 #include <time.h>
00746 TIME timenow()
00747 {
00748 TIME UNIX_epoch = -220924792.000;
00749 TIME now = (double)time(NULL) + UNIX_epoch;
00750 return(now);
00751 }
00752
/* States of the processing-step parser. */
typedef enum PParseState_enum
{
    kPPStError = 0,
    kPPStBeginProc,
    kPPStOneProc,
    kPPStEndProc,
    kPPStEnd
} PParseState_t;
00763
00764 static void FreeProcStep(void *ps)
00765 {
00766 ProcStep_t *data = (ProcStep_t *)ps;
00767
00768 if (data && data->name)
00769 {
00770 free(data->name);
00771 }
00772
00773 if (data && data->args)
00774 {
00775 free(data->args);
00776 }
00777
00778 if (data && data->input)
00779 {
00780 free(data->input);
00781 }
00782
00783 if (data && data->output)
00784 {
00785 free(data->output);
00786 }
00787 }
00788
00789 static void FreeProcStepInfo(void *val)
00790 {
00791 ProcStepInfo_t *info = (ProcStepInfo_t *)val;
00792
00793 if (info->name)
00794 {
00795 free(info->name);
00796 }
00797
00798 if (info->path)
00799 {
00800 free(info->path);
00801 }
00802
00803 if (info->req)
00804 {
00805 list_llfree(&info->req);
00806 }
00807
00808 if (info->opt)
00809 {
00810 list_llfree(&info->opt);
00811 }
00812
00813 if (info->namemap)
00814 {
00815 hcon_destroy(&info->namemap);
00816 }
00817
00818 if (info->suffix)
00819 {
00820 free(info->suffix);
00821 }
00822 }
00823
00824 static void FreeProcArg(void *data)
00825 {
00826 ExpProcArg_t *parg = (ExpProcArg_t *)data;
00827
00828 if (parg)
00829 {
00830 if (parg->name)
00831 {
00832 free(parg->name);
00833 }
00834
00835 if (parg->def)
00836 {
00837 free(parg->def);
00838 }
00839
00840 memset(parg, 0, sizeof(ExpProcArg_t));
00841 }
00842 }
00843
00844 static LinkedList_t *ParseArgs(const char *list, int dodef, int *status)
00845 {
00846 char *arg = NULL;
00847 char *name = NULL;
00848 char *def = NULL;
00849 char *cpy = NULL;
00850 char *pc = NULL;
00851 char *eq = NULL;
00852 char *co = NULL;
00853 int intstat;
00854 int done;
00855 LinkedList_t *lst = NULL;
00856 ExpProcArg_t argnode;
00857
00858 if (list)
00859 {
00860 cpy = strdup(list);
00861
00862 intstat = DRMS_SUCCESS;
00863 for (pc = cpy, done = 0; !done;)
00864 {
00865 arg = pc;
00866 co = strchr(pc, ',');
00867 if (co)
00868 {
00869 *co = '\0';
00870 pc = co + 1;
00871 }
00872 else
00873 {
00874
00875 done = 1;
00876 }
00877
00878 name = arg;
00879 argnode.def = NULL;
00880
00881 if (dodef)
00882 {
00883 eq = strchr(arg, '=');
00884 if (eq)
00885 {
00886 *eq = '\0';
00887 def = eq + 1;
00888 argnode.def = strdup(def);
00889 }
00890 }
00891
00892 argnode.name = strdup(name);
00893
00894 if (!lst)
00895 {
00896 lst = list_llcreate(sizeof(ExpProcArg_t), (ListFreeFn_t)FreeProcArg);
00897 }
00898
00899 if (lst)
00900 {
00901 list_llinserttail(lst, &argnode);
00902 }
00903 else
00904 {
00905 intstat = DRMS_ERROR_OUTOFMEMORY;
00906 }
00907 }
00908 }
00909 else
00910 {
00911 intstat = DRMS_ERROR_INVALIDDATA;
00912 }
00913
00914 if (status)
00915 {
00916 *status = intstat;
00917 }
00918
00919 return lst;
00920 }
00921
/*
 * Free callback for the name-map container built by ParseMap(): data points
 * at a stored `char *`; the string is released and the pointer reset to NULL.
 */
static void FreeProcMap(void *data)
{
    char **slot = data;

    if (slot == NULL || *slot == NULL)
    {
        return;
    }

    free(*slot);
    *slot = NULL;
}
00932
00933 static HContainer_t *ParseMap(const char *list, int *status)
00934 {
00935 HContainer_t *map = NULL;
00936 char *cpy = NULL;
00937 int intstat;
00938 int done;
00939 char *pc = NULL;
00940 char *co = NULL;
00941 char *eq = NULL;
00942 char *mapping = NULL;
00943 char *argname = NULL;
00944 char *intname = NULL;
00945
00946 intstat = 0;
00947 if (list && *list)
00948 {
00949 cpy = strdup(list);
00950 if (cpy)
00951 {
00952 for (pc = cpy, done = 0; !done; )
00953 {
00954 mapping = pc;
00955 co = strchr(pc, ',');
00956 if (co)
00957 {
00958 *co = '\0';
00959 pc = co + 1;
00960 }
00961 else
00962 {
00963
00964 done = 1;
00965 }
00966
00967 argname = mapping;
00968
00969 eq = strchr(mapping, ':');
00970 if (!eq)
00971 {
00972 fprintf(stderr, "Invalid argument - no internal name supplied.\n");
00973 intstat = DRMS_ERROR_INVALIDDATA;
00974 break;
00975 }
00976 else
00977 {
00978 *eq = '\0';
00979 intname = strdup(eq + 1);
00980
00981 if (!intname)
00982 {
00983 intstat = DRMS_ERROR_OUTOFMEMORY;
00984 break;
00985 }
00986 }
00987
00988 if (!map)
00989 {
00990 map = hcon_create(sizeof(char *),
00991 kMaxProcNameLen,
00992 (void (*)(const void *))FreeProcMap,
00993 NULL,
00994 NULL,
00995 NULL,
00996 0);
00997 }
00998
00999 hcon_insert(map, argname, &intname);
01000 }
01001
01002 free(cpy);
01003 }
01004 else
01005 {
01006 intstat = DRMS_ERROR_OUTOFMEMORY;
01007 }
01008 }
01009
01010 if (status)
01011 {
01012 *status = intstat;
01013 }
01014
01015 return map;
01016 }
01017
01018 static int SuckInProcInfo(DRMS_Env_t *env, const char *procser, HContainer_t *info)
01019 {
01020
01021
01022
01023
01024
01025
01026
01027
01028
01029
01030
01031
01032
01033
01034
01035
01036
01037
01038
01039
01040
01041
01042
01043
01044
01045
01046 int status;
01047 DRMS_RecordSet_t *prs = NULL;
01048 DRMS_Record_t *rec = NULL;
01049 DRMS_RecChunking_t chunkstat;
01050 int newchunk;
01051
01052 char *proc = NULL;
01053 char *path = NULL;
01054 char *req = NULL;
01055 char *opt = NULL;
01056 char *map = NULL;
01057 char *out = NULL;
01058
01059 ProcStepInfo_t pi;
01060
01061 status = DRMS_SUCCESS;
01062
01063 if (info)
01064 {
01065
01066
01067
01068
01069
01070
01071
01072 prs = drms_open_records(env, procser, &status);
01073 if (status == DRMS_SUCCESS && prs)
01074 {
01075 if (prs->n > 0)
01076 {
01077 while ((rec = drms_recordset_fetchnext(env, prs, &status, &chunkstat, &newchunk))
01078 != NULL)
01079 {
01080 proc = drms_getkey_string(rec, kProcSerCOLproc, &status);
01081 if (status) break;
01082 path = drms_getkey_string(rec, kProcSerCOLpath, &status);
01083 if (status) break;
01084 req = drms_getkey_string(rec, kProcSerCOLreq, &status);
01085 if (status) break;
01086 opt = drms_getkey_string(rec, kProcSerCOLopt, &status);
01087 if (status) break;
01088 map = drms_getkey_string(rec, kProcSerCOLmap, &status);
01089 if (status) break;
01090 out = drms_getkey_string(rec, kProcSerCOLout, &status);
01091 if (status) break;
01092
01093
01094
01095 pi.name = proc;
01096 pi.path = path;
01097 pi.req = ParseArgs(req, 0, &status);
01098 if (status) break;
01099 pi.opt = ParseArgs(opt, 1, &status);
01100 if (status) break;
01101 pi.namemap = ParseMap(map, &status);
01102 if (status) break;
01103 pi.suffix = out;
01104
01105
01106 hcon_insert(info, proc, &pi);
01107 }
01108 }
01109
01110 drms_close_records(prs, DRMS_FREE_RECORD);
01111 }
01112 }
01113
01114 return status;
01115 }
01116
01117 static int NumPKeyKeys(DRMS_Env_t *env, const char *dbhost, const char *series)
01118 {
01119 int rv = -1;
01120 DB_Handle_t *dbh = NULL;
01121 int contextOK = 0;
01122 int istat = 0;
01123
01124
01125 dbh = GetDBHandle(env, dbhost, &contextOK, 0);
01126
01127 if (!dbh)
01128 {
01129 fprintf(stderr, "jsoc_export_manage: Unable to connect to database.\n");
01130 }
01131 else
01132 {
01133 if (!contextOK)
01134 {
01135
01136
01137
01138
01139
01140 char query[512];
01141 char *schema = NULL;
01142 DB_Text_Result_t *res = NULL;
01143
01144 if (get_namespace(series, &schema, NULL))
01145 {
01146 fprintf(stderr, "Invalid series name %s.\n", series);
01147 }
01148 else
01149 {
01150 snprintf (query,
01151 sizeof(query),
01152 "SELECT primary_idx FROM %s.drms_series WHERE lower(seriesname) = lower(\'%s\')",
01153 schema,
01154 series);
01155
01156 res = db_query_txt(dbh, query);
01157
01158 if (res && res->num_rows == 1)
01159 {
01160 const char *pidx = res->field[0][0];
01161 int nkeys = 0;
01162 const char *pc = pidx;
01163
01164 if (*pc == '\0')
01165 {
01166
01167 rv = 0;
01168 }
01169 else
01170 {
01171 nkeys++;
01172
01173
01174 while (*pc)
01175 {
01176 if (*pc == ',')
01177 {
01178 nkeys++;
01179 }
01180
01181 pc++;
01182 }
01183
01184 rv = nkeys;
01185 }
01186
01187 db_free_text_result(res);
01188 res = NULL;
01189 }
01190 else
01191 {
01192 fprintf(stderr, "Invalid SQL query: %s.\n", query);
01193 }
01194
01195 free(schema);
01196 }
01197 }
01198 else
01199 {
01200
01201
01202 DRMS_Record_t *template = drms_template_record(env, series, &istat);
01203
01204 if (istat != DRMS_SUCCESS || !template)
01205 {
01206 fprintf(stderr, "Problems obtaining template record for %s on %s.\n", series, dbhost);
01207 }
01208 else
01209 {
01210 rv = template->seriesinfo->pidx_num;
01211 }
01212 }
01213 }
01214
01215 return rv;
01216 }
01217
/*
 * Free callback for the argument container built by InitVarConts(): data
 * points at a stored `char *`; the string is released and the pointer reset
 * to NULL.  NULL arguments are ignored.
 */
void FreeArgInfo(const void *data)
{
    char **slot = (char **)data;

    if (slot == NULL || *slot == NULL)
    {
        return;
    }

    free(*slot);
    *slot = NULL;
}
01228
01229
01230
01231
01232 static int InitVarConts(const char *args,
01233 HContainer_t **pvarsargs )
01234 {
01235 char *cpy = NULL;
01236 char *pc = NULL;
01237 char *onename = NULL;
01238 char *oneval = NULL;
01239 int err;
01240
01241 err = 0;
01242
01243
01244
01245 if (pvarsargs && !*pvarsargs && args)
01246 {
01247 const char *valcpy = NULL;
01248 *pvarsargs = hcon_create(sizeof(char *),
01249 kMaxArgVar,
01250 (void (*)(const void *))FreeArgInfo,
01251 NULL,
01252 NULL,
01253 NULL,
01254 0);
01255
01256 if (*args)
01257 {
01258 cpy = strdup(args);
01259 pc = cpy;
01260 onename = pc;
01261 while (*pc)
01262 {
01263 if (*pc == '=')
01264 {
01265 *pc = '\0';
01266 oneval = ++pc;
01267 }
01268 else if (*pc == ',')
01269 {
01270 *pc = '\0';
01271
01272 if (oneval && *oneval != '\0')
01273 {
01274 valcpy = strdup(oneval);
01275 hcon_insert(*pvarsargs, onename, &valcpy);
01276 }
01277
01278 pc++;
01279 onename = pc;
01280 oneval = NULL;
01281 }
01282 else
01283 {
01284 pc++;
01285 }
01286 }
01287
01288 if (!err)
01289 {
01290
01291 if (oneval)
01292 {
01293 valcpy = strdup(oneval);
01294 hcon_insert(*pvarsargs, onename, &valcpy);
01295 }
01296 else
01297 {
01298 valcpy = strdup("");
01299 hcon_insert(*pvarsargs, onename, &valcpy);
01300 }
01301 }
01302
01303 free(cpy);
01304 }
01305 }
01306
01307 return err;
01308 }
01309
01310 static void DestroyVarConts(HContainer_t **pvarsargs)
01311 {
01312 hcon_destroy(pvarsargs);
01313 }
01314
01315
01316
01317
01318
01319
01320
01321
01322 static int GenProgArgs(ProcStepInfo_t *pinfo,
01323 HContainer_t *args,
01324 ProcStep_t *stepdata,
01325 const char *reclim,
01326 char **argsout)
01327 {
01328
01329
01330
01331
01332
01333
01334
01335
01336
01337
01338
01339
01340
01341
01342
01343
01344
01345
01346
01347 int err;
01348 ListNode_t *node = NULL;
01349 ExpProcArg_t *data = NULL;
01350 const char *intname = NULL;
01351 const char **pintname = NULL;
01352 size_t sz = 128;
01353 const char **pval = NULL;
01354 const char *val = NULL;
01355 char *finalargs = NULL;
01356 int first;
01357
01358 err = 0;
01359 finalargs = malloc(sz);
01360 memset(finalargs, 0, sz);
01361
01362 if (pinfo)
01363 {
01364 if (pinfo->req)
01365 {
01366 char **snames = NULL;
01367 char **filts = NULL;
01368 int nsets;
01369 DRMS_RecQueryInfo_t info;
01370
01371 list_llreset(pinfo->req);
01372
01373
01374 first = 1;
01375 while ((node = list_llnext(pinfo->req)) != NULL)
01376 {
01377 data = (ExpProcArg_t *)node->data;
01378 if (pinfo->namemap)
01379 {
01380 pintname = (const char **)hcon_lookup(pinfo->namemap, data->name);
01381 }
01382 else
01383 {
01384 pintname = NULL;
01385 }
01386
01387 if (pintname == NULL)
01388 {
01389
01390
01391 intname = data->name;
01392 }
01393 else
01394 {
01395 intname = *pintname;
01396 }
01397
01398
01399 if ((args && (pval = (const char **)hcon_lookup(args, intname)) != NULL) ||
01400 (gIntVars && (pval = (const char **)hcon_lookup(gIntVars, intname)) != NULL) ||
01401 (gShVars && (pval = (const char **)hcon_lookup(gShVars, intname)) != NULL))
01402 {
01403 val = *pval;
01404 }
01405 else if (strcasecmp(intname, "in") == 0)
01406 {
01407
01408
01409
01410
01411 val = stepdata->input;
01412 }
01413 else if (strcasecmp(intname, "out") == 0)
01414 {
01415
01416
01417
01418
01419
01420
01421
01422
01423 if (stepdata->output)
01424 {
01425 if (ParseRecSetSpec(stepdata->output, &snames, &filts, &nsets, &info))
01426 {
01427 fprintf(stderr, "Invalid output series record specification %s.\n", stepdata->output);
01428 err = 1;
01429 break;
01430 }
01431
01432
01433 val = snames[0];
01434 }
01435 else
01436 {
01437 val = NULL;
01438 }
01439 }
01440 else
01441 {
01442
01443 val = NULL;
01444 fprintf(stderr, "Unable to locate value for program argument %s.\n", data->name);
01445 err = 1;
01446 break;
01447 }
01448
01449 if (val)
01450 {
01451 if (!first)
01452 {
01453 finalargs = base_strcatalloc(finalargs, " ", &sz);
01454 }
01455 else
01456 {
01457 first = 0;
01458 }
01459
01460 finalargs = base_strcatalloc(finalargs, data->name, &sz);
01461 finalargs = base_strcatalloc(finalargs, "=", &sz);
01462 finalargs = base_strcatalloc(finalargs, "'", &sz);
01463 finalargs = base_strcatalloc(finalargs, val, &sz);
01464 finalargs = base_strcatalloc(finalargs, "'", &sz);
01465 }
01466 }
01467
01468 FreeRecSpecParts(&snames, &filts, nsets);
01469 }
01470
01471 if (!err)
01472 {
01473
01474 if (pinfo->opt)
01475 {
01476 list_llreset(pinfo->opt);
01477
01478 first = 1;
01479 while ((node = list_llnext(pinfo->opt)) != NULL)
01480 {
01481 data = (ExpProcArg_t *)node->data;
01482 if (pinfo->namemap)
01483 {
01484 pintname = (const char **)hcon_lookup(pinfo->namemap, data->name);
01485 }
01486 else
01487 {
01488 pintname = NULL;
01489 }
01490
01491 if (pintname == NULL)
01492 {
01493
01494
01495 intname = data->name;
01496 }
01497 else
01498 {
01499 intname = *pintname;
01500 }
01501
01502
01503 if ((pval = (const char **)hcon_lookup(args, intname)) != NULL ||
01504 (pval = (const char **)hcon_lookup(gIntVars, intname)) != NULL ||
01505 (pval = (const char **)hcon_lookup(gShVars, intname)) != NULL
01506 )
01507 {
01508 val = *pval;
01509 }
01510 else if (strcasecmp(intname, "reclim") == 0)
01511 {
01512
01513
01514
01515
01516
01517 val = reclim;
01518 }
01519 else
01520 {
01521
01522
01523 val = NULL;
01524 if (data->def && *data->def)
01525 {
01526
01527 val = data->def;
01528 }
01529 else
01530 {
01531
01532
01533 val = NULL;
01534 }
01535 }
01536
01537 if (val)
01538 {
01539 if (!first)
01540 {
01541 finalargs = base_strcatalloc(finalargs, " ", &sz);
01542 }
01543 else
01544 {
01545 if (*finalargs)
01546 {
01547
01548 finalargs = base_strcatalloc(finalargs, " ", &sz);
01549 }
01550 first = 0;
01551 }
01552
01553 finalargs = base_strcatalloc(finalargs, data->name, &sz);
01554
01555
01556 if (!(*data->name == '-'))
01557 {
01558 finalargs = base_strcatalloc(finalargs, "=", &sz);
01559 finalargs = base_strcatalloc(finalargs, val, &sz);
01560 }
01561 }
01562 }
01563 }
01564 }
01565 }
01566
01567 if (!err && finalargs && *finalargs)
01568 {
01569 *argsout = finalargs;
01570 }
01571 else
01572 {
01573 *argsout = NULL;
01574 }
01575
01576 return err;
01577 }
01578
01579
01580
01581 static int SeriesExists(DRMS_Env_t *env, const char *series, const char *dbhost, int *status)
01582 {
01583 int rv = 0;
01584 int istat = 0;
01585 DB_Handle_t *dbh = NULL;
01586 int contextOK = 0;
01587
01588
01589 dbh = GetDBHandle(env, dbhost, &contextOK, 0);
01590
01591 if (!dbh)
01592 {
01593 fprintf(stderr, "jsoc_export_manage: Unable to connect to database.\n");
01594 istat = 1;
01595 }
01596 else
01597 {
01598 if (!contextOK)
01599 {
01600
01601
01602
01603
01604
01605 char query[512];
01606 char *schema = NULL;
01607 char *table = NULL;
01608 DB_Text_Result_t *res = NULL;
01609
01610 if (get_namespace(series, &schema, &table))
01611 {
01612 fprintf(stderr, "Invalid series name %s.\n", series);
01613 istat = DRMS_ERROR_UNKNOWNSERIES;
01614 }
01615 else
01616 {
01617 snprintf (query, sizeof(query),
01618 "SELECT * FROM pg_catalog.pg_tables WHERE schemaname = lower(\'%s\') AND tablename = lower(\'%s\')",
01619 schema,
01620 table);
01621
01622 res = db_query_txt(dbh, query);
01623
01624 if (res)
01625 {
01626 rv = (res->num_rows != 0);
01627 db_free_text_result(res);
01628 res = NULL;
01629 }
01630 else
01631 {
01632 fprintf(stderr, "Invalid SQL query: %s.\n", query);
01633 }
01634
01635 free(schema);
01636 free(table);
01637 }
01638 }
01639 else
01640 {
01641
01642
01643 rv = drms_series_exists(env, series, &istat);
01644 if (istat != DRMS_SUCCESS && istat != DRMS_ERROR_UNKNOWNSERIES)
01645 {
01646 fprintf(stderr, "Problems checking for series '%s' existence on %s.\n", series, dbhost);
01647 rv = 0;
01648 }
01649 }
01650 }
01651
01652 if (status)
01653 {
01654 *status = istat;
01655 }
01656
01657 return rv;
01658 }
01659
01660
01661
01662 static int KeyExists(DRMS_Env_t *env, const char *dbhost, const char *series, const char *keyname, int *status)
01663 {
01664 int rv = 0;
01665 int istat = 0;
01666 DB_Handle_t *dbh = NULL;
01667 int contextOK = 0;
01668
01669
01670 dbh = GetDBHandle(env, dbhost, &contextOK, 0);
01671
01672 if (!dbh)
01673 {
01674 fprintf(stderr, "jsoc_export_manage: Unable to connect to database.\n");
01675 istat = 1;
01676 }
01677 else
01678 {
01679 if (!contextOK)
01680 {
01681 char query[512];
01682 char *schema = NULL;
01683 char *table = NULL;
01684 DB_Text_Result_t *res = NULL;
01685
01686 if (get_namespace(series, &schema, &table))
01687 {
01688 fprintf(stderr, "Invalid series name %s.\n", series);
01689 istat = DRMS_ERROR_UNKNOWNSERIES;
01690 }
01691 else
01692 {
01693 snprintf (query,
01694 sizeof(query),
01695 "SELECT * FROM %s.drms_keyword WHERE lower(seriesname) = lower(\'%s\') AND lower(keywordname) = lower(\'%s\')",
01696 series,
01697 series,
01698 keyname);
01699
01700 res = db_query_txt(dbh, query);
01701
01702 if (res)
01703 {
01704 rv = (res->num_rows != 0);
01705 db_free_text_result(res);
01706 res = NULL;
01707 }
01708 else
01709 {
01710 fprintf(stderr, "Invalid SQL query: %s.\n", query);
01711 }
01712
01713 free(schema);
01714 free(table);
01715 }
01716 }
01717 else
01718 {
01719
01720 DRMS_Record_t *template = drms_template_record(env, series, &istat);
01721
01722 if (istat != DRMS_SUCCESS || !template)
01723 {
01724 fprintf(stderr, "Problems obtaining template record for %s on %s.\n", series, dbhost);
01725 }
01726 else
01727 {
01728 rv = (drms_keyword_lookup(template, keyname, 0) != NULL);
01729 }
01730 }
01731 }
01732
01733 if (status)
01734 {
01735 *status = istat;
01736 }
01737
01738 return rv;
01739 }
01740
01741
01742
01743 static int GenOutRSSpec(DRMS_Env_t *env,
01744 const char *dbhost,
01745 ProcStepInfo_t *cpinfo,
01746 ProcStepInfo_t *ppinfo,
01747 ProcStep_t *data,
01748 const char *reqid)
01749 {
01750 int err = 0;
01751
01752 if (cpinfo)
01753 {
01754 const char *suffix = cpinfo->suffix;
01755 const char *psuffix = NULL;
01756 const char *pdataset = NULL;
01757 char **snamesIn = NULL;
01758 char **snamesOut = NULL;
01759 char **filtsIn = NULL;
01760 char **filtsOut = NULL;
01761 int nsetsIn;
01762 int nsetsOut;
01763 int iset;
01764 DRMS_RecQueryInfo_t infoIn;
01765 DRMS_RecQueryInfo_t infoOut;
01766 char *outseries = NULL;
01767 char *newoutseries = NULL;
01768 char *newfilter = NULL;
01769 int npkeys;
01770 size_t sz;
01771 int len;
01772 char *repl = NULL;
01773 int ierr = 0;
01774 size_t szCanon = 128;
01775 char *canonicalIn = NULL;
01776
01777 while (1)
01778 {
01779 if (ppinfo)
01780 {
01781
01782
01783
01784 psuffix = ppinfo->suffix;
01785 }
01786 else
01787 {
01788
01789
01790 }
01791
01792
01793 if (ParseRecSetSpec(data->input, &snamesIn, &filtsIn, &nsetsIn, &infoIn))
01794 {
01795 err = 1;
01796 break;
01797 }
01798
01799 if (nsetsIn > 1)
01800 {
01801
01802
01803
01804 err = 0;
01805 break;
01806 }
01807
01808 canonicalIn = calloc(szCanon, sizeof(char));
01809 if (!canonicalIn)
01810 {
01811 err = 1;
01812 fprintf(stderr, "out of memory\n");
01813 break;
01814 }
01815
01816 for (iset = 0; iset < nsetsIn; iset++)
01817 {
01818 if (iset > 0)
01819 {
01820 canonicalIn = base_strcatalloc(canonicalIn, ", ", &szCanon);
01821 }
01822
01823 canonicalIn = base_strcatalloc(canonicalIn, snamesIn[iset], &szCanon);
01824 if (*(filtsIn[iset]) != '\0')
01825 {
01826 canonicalIn = base_strcatalloc(canonicalIn, filtsIn[iset], &szCanon);
01827 }
01828 }
01829
01830
01831
01832
01833
01834 if (psuffix && *psuffix == '_' && suffix && *suffix == '_')
01835 {
01836
01837
01838 for (iset = 0; iset < nsetsIn; iset++)
01839 {
01840 outseries = base_strreplace(canonicalIn, psuffix, suffix);
01841 }
01842
01843 if (strcmp(psuffix, suffix) != 0)
01844 {
01845
01846
01847
01848 data->crout = 1;
01849 }
01850 }
01851 else if (suffix && *suffix == '_')
01852 {
01853
01854
01855
01856
01857 char replname[DRMS_MAXSERIESNAMELEN];
01858 char *psuff = NULL;
01859
01860
01861
01862 outseries = strdup(canonicalIn);
01863 data->crout = 0;
01864 for (iset = 0; iset < nsetsIn; iset++)
01865 {
01866
01867
01868
01869
01870
01871
01872 if ((psuff = strcasestr(snamesIn[iset], suffix)) == NULL || *(psuff + strlen(suffix)) != '\0')
01873 {
01874
01875
01876
01877 snprintf(replname, sizeof(replname), "%s%s", snamesIn[iset], suffix);
01878 newoutseries = base_strreplace(outseries, snamesIn[iset], replname);
01879 free(outseries);
01880 outseries = newoutseries;
01881
01882
01883
01884
01885 data->crout = 1;
01886 }
01887 else
01888 {
01889
01890
01891 }
01892 }
01893 }
01894 else if (suffix && *suffix)
01895 {
01896
01897
01898
01899 for (iset = 0; iset < nsetsIn; iset++)
01900 {
01901 outseries = base_strreplace(canonicalIn, snamesIn[iset], suffix);
01902 }
01903
01904
01905
01906
01907
01908
01909 }
01910 else
01911 {
01912
01913 outseries = strdup(canonicalIn);
01914 }
01915
01916
01917 for (iset = 0; iset < nsetsIn; iset++)
01918 {
01919 if (filtsIn[iset])
01920 {
01921 newoutseries = base_strreplace(outseries, filtsIn[iset], "");
01922 free(outseries);
01923 outseries = newoutseries;
01924 }
01925 }
01926
01927
01928 if (ParseRecSetSpec(outseries, &snamesOut, &filtsOut, &nsetsOut, &infoOut))
01929 {
01930 err = 1;
01931 break;
01932 }
01933
01934 sz = 32;
01935 newfilter = malloc(sz);
01936 memset(newfilter, 0, sz);
01937 for (iset = 0; iset < nsetsOut; iset++)
01938 {
01939
01940
01941
01942
01943
01944
01945
01946
01947
01948
01949
01950 if (data->crout == 1 && (!SeriesExists(env, snamesOut[iset], dbhost, &ierr) || ierr))
01951 {
01952 npkeys = NumPKeyKeys(env, dbhost, snamesIn[iset]);
01953
01954
01955
01956
01957
01958
01959 if (!KeyExists(env, dbhost, snamesIn[iset], "RequestID", &ierr) || ierr)
01960 {
01961
01962
01963 npkeys++;
01964 }
01965 }
01966 else
01967 {
01968 npkeys = NumPKeyKeys(env, dbhost, snamesOut[iset]);
01969 }
01970
01971 if (npkeys < 1)
01972 {
01973
01974 err = 1;
01975 break;
01976 }
01977
01978
01979 while (--npkeys)
01980 {
01981 newfilter = base_strcatalloc(newfilter, "[]", &sz);
01982 }
01983
01984 newfilter = base_strcatalloc(newfilter, "[", &sz);
01985 newfilter = base_strcatalloc(newfilter, reqid, &sz);
01986 newfilter = base_strcatalloc(newfilter, "]", &sz);
01987
01988 len = strlen(snamesOut[iset]) + strlen(newfilter) + 16;
01989 repl = malloc(len);
01990
01991 if (repl)
01992 {
01993 snprintf(repl, len, "%s%s", snamesOut[iset], newfilter);
01994 newoutseries = base_strreplace(outseries, snamesOut[iset], repl);
01995 free(repl);
01996 repl = NULL;
01997 }
01998 else
01999 {
02000 fprintf(stderr, "Out of memory.\n");
02001 err = 1;
02002 break;
02003 }
02004
02005 free(outseries);
02006 outseries = newoutseries;
02007
02008 *newfilter = '\0';
02009 }
02010
02011 if (err == 1)
02012 {
02013 break;
02014 }
02015
02016 free(newfilter);
02017 newfilter = NULL;
02018
02019 FreeRecSpecParts(&snamesOut, &filtsOut, nsetsOut);
02020 FreeRecSpecParts(&snamesIn, &filtsIn, nsetsIn);
02021
02022
02023 data->output = outseries;
02024
02025 break;
02026 }
02027
02028
02029
02030
02031
02032 if (canonicalIn)
02033 {
02034 free(canonicalIn);
02035 canonicalIn = NULL;
02036 }
02037 }
02038
02039 return err;
02040 }
02041
02042
02043
02044
02045
02046
02047
02048
02049
02050
02051
02052
02053
02054 static LinkedList_t *ParseFields(DRMS_Env_t *env,
02055 const char *procser,
02056 const char *dbhost,
02057 const char *val,
02058 const char *dset,
02059 const char *reqid,
02060 char **reclim,
02061 int *status)
02062 {
02063 LinkedList_t *rv = NULL;
02064 char *activestr = NULL;
02065 char *pc = NULL;
02066 PParseState_t state = kPPStBeginProc;
02067 char *onecmd = NULL;
02068 char *args = NULL;
02069 ProcStep_t data;
02070 int procnum;
02071 char *adataset = NULL;
02072 char *end = NULL;
02073 HContainer_t *pinfo = NULL;
02074 ProcStepInfo_t *cpinfo = NULL;
02075 ProcStepInfo_t *ppinfo = NULL;
02076 int intstat;
02077 int gettingargs;
02078 char *reclimint = NULL;
02079 int bar;
02080
02081 HContainer_t *varsargs = NULL;
02082
02083 intstat = 0;
02084 procnum = 0;
02085 activestr = malloc(strlen(val) + 2);
02086 pc = activestr;
02087 snprintf(pc, strlen(val) + 2, "%s|", val);
02088 end = pc + strlen(pc);
02089
02090
02091 adataset = strdup(dset);
02092
02093
02094
02095
02096 pinfo = hcon_create(sizeof(ProcStepInfo_t), kMaxProcNameLen, (void (*)(const void *))FreeProcStepInfo, NULL, NULL, NULL, 0);
02097
02098
02099 SuckInProcInfo(env, procser, pinfo);
02100
02101 while (1)
02102 {
02103 if (state == kPPStError)
02104 {
02105
02106 break;
02107 }
02108
02109 if (state == kPPStBeginProc)
02110 {
02111 onecmd = pc;
02112 args = NULL;
02113 procnum++;
02114 state = kPPStOneProc;
02115 data.name = NULL;
02116 data.path = NULL;
02117 data.args = NULL;
02118 data.input = adataset;
02119 data.output = NULL;
02120 data.crout = 0;
02121 cpinfo = NULL;
02122 gettingargs = 0;
02123 bar = 0;
02124 }
02125 else if (state == kPPStOneProc)
02126 {
02127 if ((*pc == '|' || *pc == ',') && !gettingargs)
02128 {
02129
02130
02131
02132 if (*pc == '|')
02133 {
02134 bar = 1;
02135 }
02136
02137 *pc = '\0';
02138
02139
02140
02141 cpinfo = hcon_lookup(pinfo, onecmd);
02142
02143
02144
02145 if (!cpinfo && strncasecmp(onecmd, "n=", 2) == 0 && procnum == 1)
02146 {
02147 args = onecmd + 2;
02148
02149
02150
02151 data.name = strdup("n=xx");
02152 data.output = strdup(data.input);
02153
02154
02155 if (bar && *(pc + 1) == '|')
02156 {
02157 pc++;
02158 }
02159
02160 state = kPPStEndProc;
02161 }
02162 else if (cpinfo)
02163 {
02164
02165
02166
02167 data.name = strdup(onecmd);
02168 args = pc + 1;
02169 if (bar)
02170 {
02171
02172 state = kPPStEndProc;
02173 }
02174 else
02175 {
02176 gettingargs = 1;
02177 }
02178 }
02179 else
02180 {
02181
02182 fprintf(stderr, "Unknown processing step %s.\n", onecmd);
02183 state = kPPStError;
02184 }
02185 }
02186 else if (*pc == '|')
02187 {
02188
02189 *pc = '\0';
02190 state = kPPStEndProc;
02191 }
02192 else
02193 {
02194
02195 pc++;
02196 }
02197 }
02198 else if (state == kPPStEndProc)
02199 {
02200
02201
02202
02203
02204
02205
02206
02207
02208
02209 char *finalargs = NULL;
02210
02211
02212
02213 if (strcasecmp(data.name, "n=xx") == 0)
02214 {
02215 state = kPPStBeginProc;
02216 reclimint = strdup(args);
02217 pc++;
02218 }
02219 else if (cpinfo->path && *cpinfo->path)
02220 {
02221
02222
02223
02224 if (!varsargs)
02225 {
02226 if (InitVarConts(args, &varsargs))
02227 {
02228 state = kPPStError;
02229 continue;
02230 }
02231 }
02232
02233 if (!varsargs)
02234 {
02235 state = kPPStError;
02236 continue;
02237 }
02238
02239
02240
02241
02242 if (GenOutRSSpec(env, dbhost, cpinfo, ppinfo, &data, reqid))
02243 {
02244 state = kPPStError;
02245 continue;
02246 }
02247
02248 if (GenProgArgs(cpinfo, varsargs, &data, reclimint, &finalargs))
02249 {
02250 state = kPPStError;
02251 continue;
02252 }
02253
02254
02255 DestroyVarConts(&varsargs);
02256
02257 data.path = strdup(cpinfo->path);
02258
02259
02260 if (finalargs)
02261 {
02262 data.args = finalargs;
02263 }
02264
02265 if (!rv)
02266 {
02267 rv = list_llcreate(sizeof(ProcStep_t), (ListFreeFn_t)FreeProcStep);
02268 }
02269
02270 list_llinserttail(rv, &data);
02271
02272 ppinfo = cpinfo;
02273
02274
02275 if (data.output)
02276 {
02277 adataset = strdup(data.output);
02278 state = kPPStBeginProc;
02279 pc++;
02280 }
02281 else
02282 {
02283
02284
02285 adataset = strdup(data.input);
02286 state = kPPStBeginProc;
02287 pc++;
02288 }
02289 }
02290 else
02291 {
02292
02293 state = kPPStBeginProc;
02294 pc++;
02295 }
02296 }
02297
02298 if (pc == end)
02299 {
02300 if (state == kPPStBeginProc)
02301 {
02302 state = kPPStEnd;
02303 }
02304
02305 if (state != kPPStEnd)
02306 {
02307 state = kPPStError;
02308 }
02309
02310 break;
02311 }
02312
02313 }
02314
02315 hcon_destroy(&pinfo);
02316
02317 if (activestr)
02318 {
02319 free(activestr);
02320 }
02321
02322 if (adataset)
02323 {
02324
02325 free(adataset);
02326 }
02327
02328 if (state == kPPStEnd)
02329 {
02330 intstat = 1;
02331 }
02332
02333 if (reclim)
02334 {
02335 if (reclimint)
02336 {
02337 *reclim = reclimint;
02338 }
02339 else
02340 {
02341 *reclim = strdup("0");
02342 }
02343 }
02344
02345 if (status)
02346 {
02347 *status = intstat;
02348 }
02349
02350 return rv;
02351 }
02352
/*
 * States of the processing-sequence check. The descriptions follow the
 * (currently disabled) body of IsBadProcSequence().
 */
typedef enum PSeqState_enum
{
    kPSeqBegin = 0, /* no step examined yet */
    kPSeqReclim,    /* a record-limit step has been seen */
    kPSeqNoMore,    /* no additional processing is allowed after this step */
    kPSeqError,     /* the sequence is invalid */
    kPSeqMoreOK     /* further processing steps may follow */
} PSeqState_t;
02363
02364 static int IsBadProcSequence(LinkedList_t *procs)
02365 {
02366 #if 0
02367
02368 ListNode_t *node = NULL;
02369 Processing_t type = kProc_Unk;
02370 PSeqState_t state = kPSeqBegin;
02371 int quit;
02372 int rv;
02373
02374 quit = 0;
02375 list_llreset(procs);
02376 while (!quit && (node = list_llnext(procs)) != 0)
02377 {
02378 type = ((ProcStep_t *)node->data)->type;
02379
02380 switch (state)
02381 {
02382 case kPSeqError:
02383 {
02384 quit = 1;
02385 }
02386 break;
02387 case kPSeqBegin:
02388 {
02389 if (type == kProc_Noop || type == kProc_SuExport)
02390 {
02391 state = kPSeqNoMore;
02392 }
02393 else if (type == kProc_Reclimit)
02394 {
02395 state = kPSeqReclim;
02396 }
02397 else if (type == kProc_NotSpec || type == kProc_HgPatch || type == kProc_AiaScale)
02398 {
02399 state = kPSeqMoreOK;
02400 }
02401 else
02402 {
02403 fprintf(stderr, "Unknown processing type %d.\n", (int)type);
02404 state = kPSeqError;
02405 }
02406 }
02407 break;
02408 case kPSeqReclim:
02409 {
02410 if (type == kProc_Noop || type == kProc_SuExport)
02411 {
02412 state = kPSeqNoMore;
02413 }
02414 else if (type == kProc_Reclimit)
02415 {
02416 fprintf(stderr, "Multiple record-limit statements.\n");
02417 state = kPSeqError;
02418 }
02419 else if (type == kProc_NotSpec || type == kProc_HgPatch || type == kProc_AiaScale)
02420 {
02421 state = kPSeqMoreOK;
02422 }
02423 else
02424 {
02425 fprintf(stderr, "Unknown processing type %d.\n", (int)type);
02426 state = kPSeqError;
02427 }
02428 }
02429 break;
02430 case kPSeqNoMore:
02431 {
02432 fprintf(stderr, "No additional processing allowed.\n");
02433 state = kPSeqError;
02434 }
02435 break;
02436 case kPSeqMoreOK:
02437 {
02438 if (type == kProc_Reclimit || type == kProc_Noop)
02439 {
02440 fprintf(stderr, "Multiple record-limit statements, or a processing step combined with a noop processing step.\n");
02441 state = kPSeqError;
02442 }
02443 else if (type == kProc_NotSpec || type == kProc_HgPatch || type == kProc_AiaScale)
02444 {
02445 state = kPSeqMoreOK;
02446 }
02447 else if (kProc_SuExport)
02448 {
02449 state = kPSeqNoMore;
02450 }
02451 else
02452 {
02453 fprintf(stderr, "Unknown processing type %d.\n", (int)type);
02454 state = kPSeqError;
02455 }
02456 }
02457 break;
02458 }
02459 }
02460
02461 if (state == kPSeqError)
02462 {
02463 rv = 1;
02464 }
02465 else
02466 {
02467 rv = 0;
02468 }
02469
02470 return rv;
02471 #endif
02472
02473 return 0;
02474 }
02475
/*
 * GenErrChkCmd: append the error-check lines to the script being written to
 * 'fptr'. The emitted lines store $status in RUNSTAT and jump to the
 * EXITPLACE label when it is non-zero.
 */
static void GenErrChkCmd(FILE *fptr)
{
    static const char errchk[] =
        "set RUNSTAT = $status\n"
        "if ($RUNSTAT) goto EXITPLACE\n";

    fputs(errchk, fptr);
}
02480
02481 static char *convertQuotes(const char *arg, DRMS_Record_t **export_rec, DRMS_Record_t **export_log, DRMS_RecordSet_t *exports_new, int irec)
02482 {
02483 char *ret = NULL;
02484 size_t sz;
02485 size_t bufSz;
02486 char bufCh[2] = {0};
02487 int pos;
02488 int error;
02489
02490
02491 int inSQString;
02492 int inEString;
02493 int inDQString;
02494 int inString;
02495 int lastE;
02496
02497
02498 int lastSQ;
02499
02500
02501 int lastEsc;
02502
02503
02504 if (!arg || strlen(arg) <= 0)
02505 {
02506 ErrorOutExpRec(export_rec, 4, "Missing record-set specification.");
02507 ErrorOutExpNewRec(exports_new, export_log, irec, 4, "Missing record-set specification.");
02508 return NULL;
02509 }
02510
02511 if (strlen(arg) >= 4096)
02512 {
02513 ErrorOutExpRec(export_rec, 4, "Record-set specification is too long.");
02514 ErrorOutExpNewRec(exports_new, export_log, irec, 4, "Record-set specification is too long.");
02515 return NULL;
02516 }
02517
02518 sz = strlen(arg);
02519 bufSz = sz;
02520 ret = calloc(1, bufSz);
02521
02522 if (!ret)
02523 {
02524 ErrorOutExpRec(export_rec, 4, "Out of memory.");
02525 ErrorOutExpNewRec(exports_new, export_log, irec, 4, "Out of memory.");
02526 return NULL;
02527 }
02528
02529
02530
02531
02532
02533
02534
02535
02536
02537
02538
02539
02540
02541
02542
02543
02544
02545
02546
02547
02548
02549
02550
02551
02552
02553 error = 0;
02554 for (pos = 0, inSQString = 0, inEString = 0, inDQString = 0, lastSQ = 0, lastE = 0, lastEsc = 0; pos < sz; pos++)
02555 {
02556 inString = (inSQString || inEString || inDQString);
02557
02558
02559 if (!inString)
02560 {
02561 if (lastE != 0)
02562 {
02563
02564 if (arg[pos] == '\'')
02565 {
02566 ret = base_strcatalloc(ret, "E'", &bufSz);
02567 inEString = 1;
02568 }
02569 else
02570 {
02571
02572 ret = base_strcatalloc(ret, (lastE == 1) ? "E" : "e", &bufSz);
02573 bufCh[0] = arg[pos];
02574 ret = base_strcatalloc(ret, bufCh, &bufSz);
02575 }
02576
02577 lastE = 0;
02578 }
02579 else
02580 {
02581 if (arg[pos] == '\'')
02582 {
02583 ret = base_strcatalloc(ret, "'", &bufSz);
02584 inSQString = 1;
02585 }
02586 else if (arg[pos] == 'E')
02587 {
02588 lastE = 1;
02589 }
02590 else if (arg[pos] == 'e')
02591 {
02592 lastE = 2;
02593 }
02594 else if (arg[pos] == '\"')
02595 {
02596
02597 ret = base_strcatalloc(ret, "'", &bufSz);
02598 inDQString = 1;
02599 }
02600 else
02601 {
02602 bufCh[0] = arg[pos];
02603 ret = base_strcatalloc(ret, bufCh, &bufSz);
02604 }
02605 }
02606 }
02607 else
02608 {
02609 if (inDQString)
02610 {
02611 if (arg[pos] == '\"')
02612 {
02613
02614 ret = base_strcatalloc(ret, "'", &bufSz);
02615 inDQString = 0;
02616 }
02617 else
02618 {
02619 bufCh[0] = arg[pos];
02620 ret = base_strcatalloc(ret, bufCh, &bufSz);
02621 }
02622 }
02623 else if (inSQString || inEString)
02624 {
02625 if (lastEsc)
02626 {
02627
02628 bufCh[0] = arg[pos];
02629 ret = base_strcatalloc(ret, bufCh, &bufSz);
02630 lastEsc = 0;
02631 }
02632 else if (lastSQ)
02633 {
02634 if (arg[pos] == '\'')
02635 {
02636
02637 ret = base_strcatalloc(ret, "''", &bufSz);
02638 }
02639 else
02640 {
02641
02642 ret = base_strcatalloc(ret, "'", &bufSz);
02643
02644 inSQString = 0;
02645 inEString = 0;
02646 lastSQ = 0;
02647
02648 pos--;
02649 }
02650
02651 lastSQ = 0;
02652 }
02653 else
02654 {
02655 if (arg[pos] == '\'')
02656 {
02657
02658 lastSQ = 1;
02659 }
02660 else
02661 {
02662 if (inEString && arg[pos] == '\\')
02663 {
02664 lastEsc = 1;
02665 }
02666
02667 bufCh[0] = arg[pos];
02668 ret = base_strcatalloc(ret, bufCh, &bufSz);
02669 }
02670 }
02671 }
02672 else
02673 {
02674
02675 ErrorOutExpRec(export_rec, 4, "Invalid SQL string in record-set specification.");
02676 ErrorOutExpNewRec(exports_new, export_log, irec, 4, "Invalid SQL string in record-set specification.");
02677 error = 1;
02678 break;
02679 }
02680 }
02681 }
02682
02683 if (lastE != 0)
02684 {
02685
02686 ret = base_strcatalloc(ret, (lastE == 1) ? "E" : "e", &bufSz);
02687 }
02688
02689 if (lastSQ && (inSQString || inEString))
02690 {
02691
02692
02693 if (pos > 0 && arg[pos - 1] == '\'')
02694 {
02695 ret = base_strcatalloc(ret, "'", &bufSz);
02696 inSQString = 0;
02697 inEString = 0;
02698 lastSQ = 0;
02699 }
02700 }
02701
02702 if (inSQString || inEString || inDQString || error)
02703 {
02704
02705 ErrorOutExpRec(export_rec, 4, "Invalid record-set specification.");
02706 ErrorOutExpNewRec(exports_new, export_log, irec, 4, "Invalid record-set specification.");
02707 free(ret);
02708 ret = NULL;
02709 }
02710
02711 return ret;
02712 }
02713
/*
 * escapeArgument: return a copy of 'arg' in which every character that is
 * not alphanumeric is preceded by a backslash.
 *
 * A newline is written as two backslashes followed by 'n', and a tab as two
 * backslashes followed by 't'; all other non-alphanumeric characters get a
 * single backslash prefix.
 *
 *   arg - string to escape; must be shorter than 4096 characters.
 *
 * Returns a newly allocated string the caller must free, or NULL when 'arg'
 * is NULL or too long, or when memory is exhausted. An empty 'arg' yields an
 * empty string.
 */
static char *escapeArgument(const char *arg)
{
    char *ret = NULL;
    size_t sz;
    size_t bufSz;
    char bufCh[2] = {0};
    size_t pos;

    if (!arg)
    {
        fprintf(stderr, "Missing argument.\n");
        return NULL;
    }

    sz = strlen(arg);

    if (sz >= 4096)
    {
        fprintf(stderr, "Argument is too long.\n");
        return NULL;
    }

    /* Room for the terminator even when arg is empty; base_strcatalloc()
     * is handed the buffer size and takes care of growth. */
    bufSz = sz + 1;
    ret = calloc(1, bufSz);

    if (!ret)
    {
        fprintf(stderr, "Out of memory.\n");
        return NULL;
    }

    for (pos = 0; pos < sz; pos++)
    {
        if (arg[pos] == '\n')
        {
            ret = base_strcatalloc(ret, "\\\\n", &bufSz);
        }
        else if (arg[pos] == '\t')
        {
            ret = base_strcatalloc(ret, "\\\\t", &bufSz);
        }
        else
        {
            /* <ctype.h> functions require a value representable as
             * unsigned char; a plain (possibly negative) char is undefined
             * behavior for bytes above 0x7f. */
            if (!isalnum((unsigned char)arg[pos]))
            {
                ret = base_strcatalloc(ret, "\\", &bufSz);
            }

            bufCh[0] = arg[pos];
            ret = base_strcatalloc(ret, bufCh, &bufSz);
        }
    }

    return ret;
}
02766
02767
02768 static int GenExpFitsCmd(FILE *fptr,
02769 DRMS_Record_t **export_rec,
02770 DRMS_Record_t **export_log,
02771 DRMS_RecordSet_t *exports_new,
02772 int irec,
02773 const char *proto,
02774 const char *dbmainhost,
02775 const char *requestid,
02776 const char *dataset,
02777 const char *RecordLimit,
02778 const char *filenamefmt,
02779 const char *method,
02780 const char *dbids)
02781 {
02782 char *protocol = strdup(proto);
02783 int rv = 0;
02784
02785 if (protocol)
02786 {
02787 char *cparms, *p = index(protocol, ',');
02788
02789
02790
02791
02792
02793 if (p)
02794 {
02795 *p = '\0';
02796 cparms = p+1;
02797 }
02798 else
02799 {
02800 cparms = "**NONE**";
02801 }
02802
02803
02804
02805
02806
02807
02808
02809
02810
02811
02812
02813
02814
02815
02816
02817 char *rssArgConv = convertQuotes(dataset, export_rec, export_log, exports_new, irec);
02818 char *rssArgEsc = NULL;
02819
02820 if (rssArgConv)
02821 {
02822 rssArgEsc = escapeArgument(rssArgConv);
02823 if (rssArgEsc)
02824 {
02825 fprintf(fptr, "jsoc_export_as_fits JSOC_DBHOST=%s reqid='%s' expversion=%s rsquery=%s n=%s path=$REQDIR ffmt='%s' method='%s' protocol='%s' %s\n", dbmainhost, requestid, PACKLIST_VER, rssArgEsc, RecordLimit, filenamefmt, method, protos[kProto_FITS], dbids);
02826 free(rssArgEsc);
02827 }
02828 else
02829 {
02830 rv = 1;
02831 }
02832
02833 free(rssArgConv);
02834 }
02835 else
02836 {
02837 rv = 1;
02838 }
02839
02840 GenErrChkCmd(fptr);
02841 }
02842 else
02843 {
02844 fprintf(stderr, "XX jsoc_export_manage FAIL - out of memory.\n");
02845 rv = 1;
02846 }
02847
02848 return rv;
02849 }
02850
02851
/*
 * GenPreProcessCmd: write the command line of one processing step, followed
 * by the error-check lines, to the script 'fptr'.
 *
 *   fptr     - script being generated.
 *   progpath - program to run; when NULL only the error-check lines are
 *              written.
 *   args     - argument string for the program; NULL is written as empty.
 *   dbhost   - when non-NULL, written as JSOC_DBHOST=<dbhost>.
 *   dbids    - further database arguments appended to the command; NULL is
 *              written as empty.
 *
 * Always returns 0.
 */
static int GenPreProcessCmd(FILE *fptr,
                            const char *progpath,
                            const char *args,
                            const char *dbhost,
                            const char *dbids)
{
    int rv = 0;

    if (progpath)
    {
        /* Passing NULL for a %s conversion is undefined behavior (and puts
         * "(null)" into the script with glibc), so substitute "". */
        const char *argstr = args ? args : "";
        const char *idstr = dbids ? dbids : "";

        if (dbhost)
        {
            fprintf(fptr, "%s %s JSOC_DBHOST=%s %s\n", progpath, argstr, dbhost, idstr);
        }
        else
        {
            fprintf(fptr, "%s %s %s\n", progpath, argstr, idstr);
        }
    }

    GenErrChkCmd(fptr);

    return rv;
}
02879
02880 static int GenProtoExpCmd(FILE *fptr,
02881 DRMS_Record_t **export_rec,
02882 DRMS_Record_t **export_log,
02883 DRMS_RecordSet_t *exports_new,
02884 int irec,
02885 const char *protocol,
02886 const char *dbmainhost,
02887 const char *dataset,
02888 const char *RecordLimit,
02889 const char *requestid,
02890 const char *method,
02891 const char *filenamefmt,
02892 const char *dbids)
02893 {
02894 int rv = 0;
02895
02896 if (strncasecmp(protocol, protos[kProto_FITS], strlen(protos[kProto_FITS])) == 0)
02897 {
02898 rv = (GenExpFitsCmd(fptr, export_rec, export_log, exports_new, irec, protocol, dbmainhost, requestid, dataset, RecordLimit, filenamefmt, method, dbids) != 0);
02899 }
02900 else if (strncasecmp(protocol, protos[kProto_MPEG], strlen(protos[kProto_MPEG])) == 0 ||
02901 strncasecmp(protocol, protos[kProto_JPEG], strlen(protos[kProto_JPEG])) == 0 ||
02902 strncasecmp(protocol, protos[kProto_PNG], strlen(protos[kProto_PNG])) == 0 ||
02903 strncasecmp(protocol, protos[kProto_MP4], strlen(protos[kProto_MP4])) == 0)
02904 {
02905 char *dupe = strdup(protocol);
02906 char *newproto = dupe;
02907 char *pcomma=index(newproto,',');
02908
02909 if (pcomma)
02910 *pcomma = '\0';
02911
02912 fprintf(fptr, "jsoc_export_as_images in='%s' reqid='%s' expversion='%s' n='%s' method='%s' outpath=$REQDIR ffmt='%s' cparms='%s'",
02913 dataset, requestid, PACKLIST_VER, RecordLimit, method, filenamefmt, "cparms is not needed");
02914
02915 fprintf(fptr, " protocol='%s'", newproto);
02916 while(pcomma)
02917 {
02918 newproto = pcomma+1;
02919 pcomma=index(newproto,',');
02920 if (pcomma)
02921 *pcomma = '\0';
02922 fprintf(fptr, " '%s'", newproto);
02923 }
02924 fprintf(fptr, "\n");
02925
02926 if (dupe)
02927 {
02928 free(dupe);
02929 }
02930 }
02931 else if (strncasecmp(protocol, protos[kProto_AsIs], strlen(protos[kProto_AsIs])) == 0)
02932 {
02933
02934
02935 fprintf(fptr, "jsoc_export_as_is JSOC_DBHOST=%s ds='%s' n=%s requestid='%s' method='%s' protocol='%s' filenamefmt='%s'\n",
02936 dbmainhost, dataset, RecordLimit, requestid, method, protos[kProto_AsIs], filenamefmt);
02937
02938 GenErrChkCmd(fptr);
02939
02940
02941 fprintf(fptr, "show_info JSOC_DBHOST=%s -ait ds='%s' n=%s > %s.keywords.txt\n",
02942 dbmainhost, dataset, RecordLimit, requestid);
02943 GenErrChkCmd(fptr);
02944 }
02945 else if (strncasecmp(protocol, protos[kProto_SuAsIs], strlen(protos[kProto_SuAsIs])) == 0)
02946 {
02947
02948 char *dupe = NULL;
02949 char *sunumList = NULL;
02950
02951 dupe = strdup(dataset);
02952 if (!dupe)
02953 {
02954 fprintf(stderr, "Out of memory.\n");
02955 rv = 1;
02956 }
02957 else
02958 {
02959 sunumList = strchr(dupe, '=');
02960
02961 if (!sunumList)
02962 {
02963 fprintf(stderr, "Bad format for DataSet column; should be 'sunum=...'.\n");
02964 rv = 1;
02965 }
02966 else
02967 {
02968 sunumList++;
02969
02970 fprintf(fptr, "jsoc_export_SU_as_is JSOC_DBHOST=%s ds='%s' requestid='%s'\n",
02971 dbmainhost, sunumList, requestid);
02972
02973 GenErrChkCmd(fptr);
02974
02975
02976 fprintf(fptr, "show_info JSOC_DBHOST=%s -ait sunum='%s' n=%s > %s.keywords.txt\n",
02977 dbmainhost, sunumList, RecordLimit, requestid);
02978 GenErrChkCmd(fptr);
02979 }
02980
02981 free(dupe);
02982 dupe = NULL;
02983 sunumList = NULL;
02984 }
02985 }
02986 else
02987 {
02988
02989 fprintf(stderr,
02990 "XX jsoc_export_manage FAILURE; invalid protocol, requestid=%s, protocol=%s, method=%s\n",
02991 requestid, protocol, method);
02992 rv = 1;
02993 }
02994
02995 return rv;
02996 }
02997
/*
 * FreeDataSetKw: release the string referenced by 'val' and reset the
 * reference to NULL. 'val' is the address of a char * and may itself be
 * NULL, as may the string it refers to.
 */
static void FreeDataSetKw(void *val)
{
    char **slot = (char **)val;

    if (slot == NULL || *slot == NULL)
    {
        return;
    }

    free(*slot);
    *slot = NULL;
}
03008
03009
03010
03011
03012 static int DBCOMM(DRMS_Record_t **rec, const char *mbuf, int expstatus)
03013 {
03014 if (mbuf)
03015 {
03016 fprintf(stderr, "%s\n", mbuf);
03017 }
03018
03019 if (rec && *rec)
03020 {
03021 if (drms_setkey_int(*rec, "Status", expstatus))
03022 {
03023 return 1;
03024 }
03025
03026 drms_setkey_string(*rec, "errmsg", mbuf);
03027 drms_close_record(*rec, DRMS_INSERT_RECORD);
03028 *rec = NULL;
03029 }
03030
03031 return 0;
03032 }
03033
03034
03035 static int DBNEWCOMM(DRMS_RecordSet_t **exprecs, DRMS_Record_t **rec, int irec, const char *mbuf, int expstatus)
03036 {
03037 int closedrec = 0;
03038
03039 if (mbuf)
03040 {
03041 fprintf(stderr, "%s\n", mbuf);
03042 }
03043
03044 if (rec && *rec)
03045 {
03046 if (drms_setkey_int(*rec, "Status", expstatus))
03047 {
03048 return 1;
03049 }
03050
03051 drms_setkey_string(*rec, "errmsg", mbuf);
03052 drms_close_record(*rec, DRMS_INSERT_RECORD);
03053 *rec = NULL;
03054 closedrec = 1;
03055 }
03056
03057 if (exprecs && *exprecs)
03058 {
03059 if (closedrec)
03060 {
03061 (*exprecs)->records[irec] = NULL;
03062 }
03063 drms_close_records(*exprecs, DRMS_FREE_RECORD);
03064 *exprecs = NULL;
03065 }
03066
03067 return 0;
03068 }
03069
03070
03071 int DoIt(void)
03072 {
03073
03074 const char *op;
03075 const char *procser = NULL;
03076 char *dataset;
03077 char *requestid;
03078 char *process;
03079 char *requestor;
03080 long requestorid;
03081 char *notify;
03082 char *format;
03083 char *shipto;
03084 char *method;
03085 char *protocol;
03086 char *filenamefmt;
03087 char *errorreply;
03088 char reqdir[DRMS_MAXPATHLEN];
03089 char command[DRMS_MAXPATHLEN];
03090 char *RecordLimit = NULL;
03091 int doSuExport;
03092 long long size;
03093 TIME reqtime;
03094 TIME esttime;
03095 TIME exptime;
03096 TIME now;
03097 double waittime;
03098 DRMS_RecordSet_t *exports, *exports_new_orig, *exports_new;
03099 DRMS_RecordSet_t *requestor_rs;
03100 DRMS_Record_t *export_rec, *export_log;
03101 char requestorquery[DRMS_MAXQUERYLEN];
03102 int status = 0;
03103 FILE *fp;
03104 char runscript[DRMS_MAXPATHLEN];
03105 char *dashp;
03106 int testmode = 0;
03107 LinkedList_t *proccmds = NULL;
03108
03109 const char *dbname = NULL;
03110 const char *dbexporthost = NULL;
03111 const char *dbmainhost = NULL;
03112 const char *dbuser = NULL;
03113 const char *jsocroot = NULL;
03114 char dbids[128];
03115 char jsocrootstr[128] = {0};
03116 int pc = 0;
03117
03118 int procerr;
03119 ListNode_t *node = NULL;
03120 int quit = 0;
03121 char *now_at = NULL;
03122 char *progpath = NULL;
03123 char *args = NULL;
03124 const char *cdataset = NULL;
03125 const char *datasetout = NULL;
03126 char *exprecspec = NULL;
03127 char **snames = NULL;
03128 char **filts = NULL;
03129 char seriesin[DRMS_MAXSERIESNAMELEN];
03130 char seriesout[DRMS_MAXSERIESNAMELEN];
03131 int nsets;
03132 char *series = NULL;
03133 ProcStep_t *ndata = NULL;
03134 DRMS_RecQueryInfo_t info;
03135 char csname[DRMS_MAXSERIESNAMELEN];
03136 int iset;
03137 char msgbuf[1024];
03138 int submitcode = -1;
03139 DRMS_Env_t *seriesEnv = NULL;
03140
03141 if (nice_intro ()) return (0);
03142
03143 const char *testQuotes = cmdparams_get_str(&cmdparams, kArgTestConvQuotes, NULL);
03144 if (testQuotes)
03145 {
03146 if (strcmp(testQuotes, kArgValNotUsed) != 0)
03147 {
03148 char *quoted = convertQuotes(testQuotes, NULL, NULL, NULL, -1);
03149 if (quoted)
03150 {
03151 fprintf(stderr, "Quoted string:\n");
03152 fprintf(stderr, quoted);
03153 fprintf(stderr, "\n");
03154
03155 char *escaped = escapeArgument(quoted);
03156
03157 if (escaped)
03158 {
03159 fprintf(stderr, "Escaped string:\n");
03160 fprintf(stderr, escaped);
03161 fprintf(stderr, "\n");
03162
03163 free(escaped);
03164 }
03165
03166 free(quoted);
03167 }
03168 return 0;
03169 }
03170 }
03171
03172 testmode = (TESTMODE || cmdparams_isflagset(&cmdparams, kArgTestmode));
03173 submitcode = (testmode ? 12 : 2);
03174
03175 if ((dbmainhost = cmdparams_get_str(&cmdparams, "JSOC_DBMAINHOST", NULL)) == NULL)
03176 {
03177 dbmainhost = SERVER;
03178 }
03179
03180 if ((dbexporthost = cmdparams_get_str(&cmdparams, "JSOC_DBHOST", NULL)) == NULL)
03181 {
03182 dbexporthost = SERVER;
03183 }
03184
03185
03186
03187
03188 if ((dbname = cmdparams_get_str(&cmdparams, "JSOC_DBNAME", NULL)) == NULL)
03189 {
03190 dbname = DBNAME;
03191 }
03192
03193 if ((dbuser = cmdparams_get_str(&cmdparams, "JSOC_DBUSER", NULL)) == NULL)
03194 {
03195 dbuser = USER;
03196 }
03197
03198 if ((jsocroot = cmdparams_get_str(&cmdparams, "JSOCROOT", NULL)) != NULL)
03199 {
03200 snprintf(jsocrootstr, sizeof(jsocrootstr), "JSOCROOT_EXPORT=%s", jsocroot);
03201 }
03202
03203 if (dbname)
03204 {
03205 pc += snprintf(dbids + pc, sizeof(dbids) - pc, "JSOC_DBNAME=%s ", dbname);
03206 }
03207
03208 if (dbuser)
03209 {
03210 pc += snprintf(dbids + pc, sizeof(dbids) - pc, "JSOC_DBUSER=%s ", dbuser);
03211 }
03212
03213 op = cmdparams_get_str (&cmdparams, "op", NULL);
03214 procser = cmdparams_get_str(&cmdparams, kArgProcSeries, NULL);
03215
03216
03217
03218
03219
03220 if (strcmp(op,"process") == 0)
03221 {
03222 int irec;
03223 char ctlrecspec[1024];
03224
03225 snprintf(ctlrecspec, sizeof(ctlrecspec), "%s[][? Status=%d ?]", EXPORT_SERIES_NEW, submitcode);
03226 exports_new_orig = drms_open_records(drms_env, ctlrecspec, &status);
03227
03228 if (!exports_new_orig)
03229 DIE("Can not open RecordSet");
03230 if (exports_new_orig->n < 1)
03231 {
03232 drms_close_records(exports_new_orig, DRMS_FREE_RECORD);
03233 return(0);
03234 }
03235 exports_new = drms_clone_records(exports_new_orig, DRMS_PERMANENT, DRMS_SHARE_SEGMENTS, &status);
03236 if (!exports_new)
03237 DIE("Can not clone RecordSet");
03238 drms_close_records(exports_new_orig, DRMS_FREE_RECORD);
03239
03240 for (irec=0; irec < exports_new->n; irec++)
03241 {
03242 now = timenow();
03243 export_log = exports_new->records[irec];
03244
03245 status = drms_getkey_int(export_log, "Status", NULL);
03246 requestid = drms_getkey_string(export_log, "RequestID", NULL);
03247 dataset = drms_getkey_string(export_log, "DataSet", NULL);
03248 process = drms_getkey_string(export_log, "Processing", NULL);
03249 protocol = drms_getkey_string(export_log, "Protocol", NULL);
03250 filenamefmt= drms_getkey_string(export_log, "FilenameFmt", NULL);
03251 method = drms_getkey_string(export_log, "Method", NULL);
03252 format = drms_getkey_string(export_log, "Format", NULL);
03253 reqtime = drms_getkey_time(export_log, "ReqTime", NULL);
03254 esttime = drms_getkey_time(export_log, "EstTime", NULL);
03255 size = drms_getkey_longlong(export_log, "Size", NULL);
03256 requestorid = drms_getkey_int(export_log, "Requestor", NULL);
03257
03258
03259
03260
03261
03262
03263
03264
03265
03266
03267
03268
03269
03270
03271
03272
03273
03274
03275
03276 if (strncmp(dataset, "sunums=", 7) == 0)
03277 {
03278 doSuExport = 1;
03279 }
03280 else
03281 {
03282 doSuExport = 0;
03283 }
03284
03285 printf("New Request #%d/%d: %s, Status=%d, Processing=%s, DataSet=%s, Protocol=%s, Method=%s\n", irec, exports_new->n, requestid, status, process, dataset, protocol, method);
03286 fflush(stdout);
03287
03288 RegisterIntVar("requestid", 's', requestid);
03289
03290
03291 sprintf(requestorquery, "%s[:#%ld]", EXPORT_USER, requestorid);
03292 requestor_rs = drms_open_records(drms_env, requestorquery, &status);
03293 if (!requestor_rs)
03294 {
03295 return DBNEWCOMM(&exports_new, &export_log, irec, "JSOC error, Can't find requestor info series", 4);
03296
03297
03298 }
03299
03300 if (requestor_rs->n > 0)
03301 {
03302 DRMS_Record_t *rec = requestor_rs->records[0];
03303 notify = drms_getkey_string(rec, "Notify", NULL);
03304 if (*notify == '\0')
03305 notify = NULL;
03306 }
03307 else
03308 notify = NULL;
03309
03310 drms_close_records(requestor_rs, DRMS_FREE_RECORD);
03311
03312
03313
03314
03315 export_rec = drms_create_record(drms_env, EXPORT_SERIES, DRMS_PERMANENT, &status);
03316 if (!export_rec)
03317 {
03318 return DBNEWCOMM(&exports_new, &export_log, irec, "Cant create export control record", 4);
03319
03320
03321 }
03322
03323
03324
03325
03326 drms_setkey_string(export_rec, "RequestID", requestid);
03327 drms_setkey_string(export_rec, "DataSet", dataset);
03328 drms_setkey_string(export_rec, "Processing", process);
03329 drms_setkey_string(export_rec, "Protocol", protocol);
03330 drms_setkey_string(export_rec, "FilenameFmt", filenamefmt);
03331 drms_setkey_string(export_rec, "Method", method);
03332 drms_setkey_string(export_rec, "Format", format);
03333 drms_setkey_time(export_rec, "ReqTime", reqtime);
03334 drms_setkey_time(export_rec, "EstTime", esttime);
03335 drms_setkey_longlong(export_rec, "Size", size);
03336 drms_setkey_int(export_rec, "Requestor", requestorid);
03337
03338
03339 if (isbadDataSet() || isbadProcessing())
03340 {
03341 snprintf(msgbuf,
03342 sizeof(msgbuf),
03343 "Illegal format detected - security risk!\nRequestID= %s\n Processing = %s\n, DataSet=%s",
03344 requestid,
03345 process,
03346 dataset);
03347
03348 ErrorOutExpRec(&export_rec, 4, msgbuf);
03349 ErrorOutExpNewRec(exports_new, &export_log, irec, 4, msgbuf);
03350 continue;
03351 }
03352
03353
03354
03355
03356
03357
03358
03359
03360
03361
03362
03363
03364
03365
03366 RegisterShVar("$REQDIR", "$REQDIR");
03367 RegisterShVar("$HOSTNAME", "$HOSTNAME");
03368 RegisterShVar("$EXPSIZE", "$EXPSIZE");
03369
03370
03371
03372
03373
03374
03375
03376
03377
03378 int ppstat = 0;
03379
03380
03381
03382
03383
03384
03385
03386
03387 proccmds = ParseFields(drms_env, procser, dbmainhost, process, dataset, requestid, &RecordLimit, &ppstat);
03388 if (ppstat == 0)
03389 {
03390 snprintf(msgbuf, sizeof(msgbuf), "Invalid process field value: %s.", process);
03391 ErrorOutExpRec(&export_rec, 4, msgbuf);
03392 ErrorOutExpNewRec(exports_new, &export_log, irec, 4, msgbuf);
03393
03394
03395 continue;
03396 }
03397
03398
03399
03400
03401 if (!doSuExport)
03402 {
03403 if (ParseRecSetSpec(dataset, &snames, &filts, &nsets, &info))
03404 {
03405
03406
03407 snprintf(msgbuf, sizeof(msgbuf), "Unable to parse the export record specification '%s'.", dataset);
03408 ErrorOutExpRec(&export_rec, 4, msgbuf);
03409 ErrorOutExpNewRec(exports_new, &export_log, irec, 4, msgbuf);
03410 continue;
03411 }
03412 else if (nsets > 1)
03413 {
03414
03415
03416
03417
03418
03419
03420
03421
03422
03423
03424
03425 if (proccmds && list_llgetnitems(proccmds) != 0)
03426 {
03427 int iseries;
03428 const char *setname = NULL;
03429 const char *sname = NULL;
03430 int notSupported = 0;
03431 char dbHostAndPort[128];
03432 int makeNewEnv;
03433 const char *dbuser = NULL;
03434 const char *dbpasswd = NULL;
03435 DRMS_RecordSet_t *rs = NULL;
03436 char *converted = NULL;
03437 size_t szConverted;
03438 DRMS_Record_t *record = NULL;
03439 char recnumStr[64];
03440 int iRecConverted;
03441
03442 setname = snames[0];
03443
03444
03445
03446 for (iseries = 1; iseries < nsets; iseries++)
03447 {
03448 sname = snames[iseries];
03449
03450 if (strcasecmp(sname, setname) != 0)
03451 {
03452
03453 notSupported = 1;
03454 break;
03455 }
03456 }
03457
03458 if (notSupported)
03459 {
03460 snprintf(msgbuf, sizeof(msgbuf), "The export system does not currently support comma-separated lists of record-set specifications.");
03461 ErrorOutExpRec(&export_rec, 4, msgbuf);
03462 ErrorOutExpNewRec(exports_new, &export_log, irec, 4, msgbuf);
03463 FreeRecSpecParts(&snames, &filts, nsets);
03464 continue;
03465 }
03466
03467
03468
03469
03470
03471
03472 if (!seriesEnv)
03473 {
03474 #ifdef DRMS_CLIENT
03475
03476
03477
03478
03479
03480
03481 struct passwd *pwd = NULL;
03482
03483 makeNewEnv = 1;
03484
03485
03486
03487 pwd = getpwuid(geteuid());
03488 dbuser = pwd->pw_name;
03489
03490 #else
03491
03492 makeNewEnv = (strcasecmp(dbmainhost, drms_env->session->db_handle->dbhost) != 0);
03493 dbuser = drms_env->session->db_handle->dbuser;
03494 #endif
03495 if (makeNewEnv)
03496 {
03497 snprintf(dbHostAndPort, sizeof(dbHostAndPort), "%s:%s", dbmainhost, DRMSPGPORT);
03498 if ((seriesEnv = drms_open(dbHostAndPort, dbuser, NULL, DBNAME, NULL)) == NULL)
03499 {
03500 snprintf(msgbuf, sizeof(msgbuf), "Cannot access database containing series information.");
03501 ErrorOutExpRec(&export_rec, 4, msgbuf);
03502 ErrorOutExpNewRec(exports_new, &export_log, irec, 4, msgbuf);
03503 FreeRecSpecParts(&snames, &filts, nsets);
03504 continue;
03505 }
03506
03507 seriesEnv->logfile_prefix = module_name;
03508 drms_server_begin_transaction(seriesEnv);
03509 }
03510 else
03511 {
03512 seriesEnv = drms_env;
03513 }
03514 }
03515
03516 rs = drms_open_recordswithkeys(seriesEnv, dataset, "", &status);
03517 if (!rs || status != DRMS_SUCCESS)
03518 {
03519 snprintf(msgbuf, sizeof(msgbuf), "Cannot open record-set %s.\n", dataset);
03520 ErrorOutExpRec(&export_rec, 4, msgbuf);
03521 ErrorOutExpNewRec(exports_new, &export_log, irec, 4, msgbuf);
03522 FreeRecSpecParts(&snames, &filts, nsets);
03523 continue;
03524 }
03525
03526 if (rs->n == 0)
03527 {
03528
03529
03530
03531
03532 snprintf(msgbuf, sizeof(msgbuf), "Record-set %s contains no records.\n", dataset);
03533 ErrorOutExpRec(&export_rec, 4, msgbuf);
03534 ErrorOutExpNewRec(exports_new, &export_log, irec, 4, msgbuf);
03535 FreeRecSpecParts(&snames, &filts, nsets);
03536 continue;
03537 }
03538
03539
03540
03541
03542
03543
03544 szConverted = 256;
03545 converted = calloc(szConverted, sizeof(char));
03546
03547 if (!converted)
03548 {
03549 snprintf(msgbuf, sizeof(msgbuf), "Out of memory.\n");
03550 ErrorOutExpRec(&export_rec, 4, msgbuf);
03551 ErrorOutExpNewRec(exports_new, &export_log, irec, 4, msgbuf);
03552 FreeRecSpecParts(&snames, &filts, nsets);
03553 return 1;
03554 }
03555
03556 converted = base_strcatalloc(converted, setname, &szConverted);
03557 converted = base_strcatalloc(converted, "[", &szConverted);
03558
03559 for (iRecConverted = 0; iRecConverted < rs->n; iRecConverted++)
03560 {
03561 record = rs->records[iRecConverted];
03562 snprintf(recnumStr, sizeof(recnumStr), "%lld", record->recnum);
03563
03564 if (iRecConverted == 0)
03565 {
03566 converted = base_strcatalloc(converted, ":#" , &szConverted);
03567 }
03568 else
03569 {
03570 converted = base_strcatalloc(converted, ",#" , &szConverted);
03571
03572 }
03573
03574 converted = base_strcatalloc(converted, recnumStr, &szConverted);
03575 }
03576
03577 converted = base_strcatalloc(converted, "]", &szConverted);
03578
03579
03580
03581 if (dataset)
03582 {
03583
03584 free(dataset);
03585 }
03586
03587 dataset = converted;
03588
03589 ppstat = 0;
03590 list_llfree(&proccmds);
03591 proccmds = ParseFields(drms_env, procser, dbmainhost, process, dataset, requestid, &RecordLimit, &ppstat);
03592 }
03593 }
03594
03595 FreeRecSpecParts(&snames, &filts, nsets);
03596 }
03597
03598 drms_record_directory(export_rec, reqdir, 1);
03599
03600
03601 make_qsub_call(requestid, reqdir, notify, dbname, dbuser, dbids, dbexporthost, submitcode);
03602
03603
03604
03605
03606
03607
03608
03609
03610
03611
03612
03613 sprintf(runscript, "%s/%s.drmsrun", reqdir, requestid);
03614 fp = fopen(runscript, "w");
03615 fprintf(fp, "#! /bin/csh -f\n");
03616 fprintf(fp, "set echo\n");
03617 fprintf(fp, "set histchars\n");
03618
03619 fprintf(fp, "set_info_sock -C JSOC_DBHOST=%s ds='jsoc.export[%s]' Status=1\n", dbexporthost, requestid);
03620 fprintf(fp, "set RUNSTAT = $status\nif ($RUNSTAT) goto EXITPLACE\n");
03621
03622 fprintf(fp, "set REQDIR = `show_info_sock JSOC_DBHOST=%s -q -p 'jsoc.export[%s]'`\n", dbexporthost, requestid);
03623 fprintf(fp, "set RUNSTAT = $status\nif ($RUNSTAT) goto EXITPLACE\n");
03624
03625 fprintf(fp, "cd $REQDIR\n");
03626 fprintf(fp, "set RUNSTAT = $status\nif ($RUNSTAT) goto EXITPLACE\n");
03627
03628
03629 fprintf(fp, "echo Node = $HOSTNAME\n");
03630 fprintf(fp, "echo JSOC_DBHOST = %s, Processing DBHOST = %s\n", dbexporthost, dbmainhost);
03631 fprintf(fp, "echo SUdir = $REQDIR\n");
03632 fprintf(fp, "echo PATH = $PATH\n");
03633 fprintf(fp, "echo path = $path\n");
03634
03635
03636
03637
03638
03639
03640
03641
03642
03643
03644 if (IsBadProcSequence(proccmds))
03645 {
03646 list_llfree(&proccmds);
03647 snprintf(msgbuf, sizeof(msgbuf), "Bad sequence of processing steps, skipping recnum %lld.", export_rec->recnum);
03648 ErrorOutExpRec(&export_rec, 4, msgbuf);
03649 ErrorOutExpNewRec(exports_new, &export_log, irec, 4, msgbuf);
03650 fclose(fp);
03651 fp = NULL;
03652
03653
03654 continue;
03655 }
03656
03657
03658
03659
03660
03661 procerr = 0;
03662 quit = 0;
03663
03664 if (proccmds)
03665 {
03666 list_llreset(proccmds);
03667 }
03668
03669 datasetout = NULL;
03670
03671
03672 char fname[PATH_MAX];
03673 char *anArg = NULL;
03674 FILE *fpProc = NULL;
03675
03676 if (proccmds && list_llgetnitems(proccmds) > 0)
03677 {
03678 snprintf(fname, sizeof(fname), "%s/proc-steps.txt", reqdir);
03679 fpProc = fopen(fname, "w");
03680
03681 if (!fpProc)
03682 {
03683 list_llfree(&proccmds);
03684 snprintf(msgbuf, sizeof(msgbuf), "Unable to open file %s.", fname);
03685 ErrorOutExpRec(&export_rec, 4, msgbuf);
03686 ErrorOutExpNewRec(exports_new, &export_log, irec, 4, msgbuf);
03687 fclose(fp);
03688 fp = NULL;
03689
03690
03691 continue;
03692 }
03693 }
03694
03695 LinkedList_t *datasetkwlist = NULL;
03696 char *rsstr = NULL;
03697 int firstnode = 1;
03698 char *lhs = NULL;
03699 char *rhs = NULL;
03700 char *argsDup = NULL;
03701
03702 while (!quit && (node = list_llnext(proccmds)) != NULL)
03703 {
03704 ndata = (ProcStep_t *)node->data;
03705
03706
03707 fprintf(fpProc, "\nProcessing-step applied: %s\n", ndata->name);
03708 fprintf(fpProc, " argument\t\tvalue\n");
03709 fprintf(fpProc, " --------\t\t-----\n");
03710
03711
03712 argsDup = strdup(ndata->args);
03713 if (!argsDup)
03714 {
03715 snprintf(msgbuf, sizeof(msgbuf), "Out of memory .");
03716 quit = 1;
03717 break;
03718 }
03719
03720 for (anArg = strtok(argsDup, " ,"); !quit && anArg; anArg = strtok(NULL, " ,"))
03721 {
03722
03723
03724
03725 lhs = strdup(anArg);
03726 if (!lhs)
03727 {
03728 snprintf(msgbuf, sizeof(msgbuf), "Out of memory .");
03729 quit = 1;
03730 break;
03731 }
03732
03733 rhs = strchr(lhs, '=');
03734 if (rhs)
03735 {
03736 *rhs = '\0';
03737 rhs++;
03738 fprintf(fpProc, " %s\t\t%s\n", lhs, rhs);
03739 }
03740 else
03741 {
03742 fprintf(fpProc, " %s\n", lhs);
03743 }
03744
03745 free(lhs);
03746 lhs = NULL;
03747 }
03748
03749 if (quit)
03750 {
03751 break;
03752 }
03753
03754
03755
03756
03757 if (!datasetkwlist)
03758 {
03759 datasetkwlist = list_llcreate(sizeof(char *), (ListFreeFn_t)FreeDataSetKw);
03760 rsstr = strdup(ndata->input);
03761 list_llinserthead(datasetkwlist, &rsstr);
03762 }
03763
03764
03765
03766
03767 if (strcmp(ndata->input, ndata->output) != 0)
03768 {
03769 rsstr = strdup(ndata->output);
03770 list_llinserthead(datasetkwlist, &rsstr);
03771 }
03772
03773 cdataset = ndata->input;
03774 datasetout = ndata->output;
03775
03776
03777
03778
03779
03780
03781
03782
03783
03784 if (!doSuExport)
03785 {
03786 if (ParseRecSetSpec(cdataset, &snames, &filts, &nsets, &info))
03787 {
03788 snprintf(msgbuf, sizeof(msgbuf), "Invalid input series record-set query %s.", cdataset);
03789 quit = 1;
03790 break;
03791 }
03792 }
03793
03794
03795
03796 if (firstnode)
03797 {
03798 for (iset = 0, *csname = '\0'; iset < nsets; iset++)
03799 {
03800 series = snames[iset];
03801 if (*csname != '\0')
03802 {
03803 if (strcmp(series, csname) != 0)
03804 {
03805 snprintf(msgbuf, sizeof(msgbuf), "jsoc_export_manage FAILURE: attempt to export a recordset containing multiple input series.");
03806 quit = 1;
03807 break;
03808 }
03809 }
03810 else
03811 {
03812 snprintf(csname, sizeof(csname), "%s", series);
03813 }
03814 }
03815 }
03816 else if (nsets > 0)
03817 {
03818 snprintf(csname, sizeof(csname), "%s", snames[0]);
03819 }
03820 else
03821 {
03822 snprintf(msgbuf, sizeof(msgbuf), "No input series.\n");
03823 quit = 1;
03824 }
03825
03826 FreeRecSpecParts(&snames, &filts, nsets);
03827
03828 if (quit)
03829 {
03830 break;
03831 }
03832
03833 snprintf(seriesin, sizeof(seriesin), "%s", csname);
03834
03835 if (firstnode)
03836 {
03837 firstnode = 0;
03838 if (!SeriesExists(drms_env, seriesin, dbmainhost, &status) || status)
03839 {
03840 snprintf(msgbuf, sizeof(msgbuf), "Input series %s does not exist.", csname);
03841 quit = 1;
03842 break;
03843 }
03844 }
03845
03846
03847
03848
03849
03850
03851
03852
03853 if (!doSuExport)
03854 {
03855 if (ParseRecSetSpec(datasetout, &snames, &filts, &nsets, &info))
03856 {
03857 snprintf(msgbuf, sizeof(msgbuf), "Invalid output series record-set query %s.", datasetout);
03858 quit = 1;
03859 break;
03860 }
03861 }
03862
03863
03864
03865 for (iset = 0, *csname = '\0'; iset < nsets; iset++)
03866 {
03867 series = snames[iset];
03868 if (*csname != '\0')
03869 {
03870 if (strcmp(series, csname) != 0)
03871 {
03872 snprintf(msgbuf, sizeof(msgbuf), "jsoc_export_manage FAILURE: attempt to export a recordset to multiple output series.");
03873 quit = 1;
03874 break;
03875 }
03876 }
03877 else
03878 {
03879 snprintf(csname, sizeof(csname), "%s", series);
03880 }
03881 }
03882
03883 FreeRecSpecParts(&snames, &filts, nsets);
03884
03885 if (quit)
03886 {
03887 break;
03888 }
03889
03890 snprintf(seriesout, sizeof(seriesout), "%s", csname);
03891
03892
03893
03894
03895
03896
03897
03898 if (!ndata->crout)
03899 {
03900 if (!SeriesExists(drms_env, seriesout, dbmainhost, &status) || status)
03901 {
03902 snprintf(msgbuf, sizeof(msgbuf), "Output series %s does not exist.", csname);
03903 quit = 1;
03904 break;
03905 }
03906 }
03907 else
03908 {
03909
03910 char cloneargs[512];
03911
03912 snprintf(cloneargs, sizeof(cloneargs), "dsin=%s dsout=%s", seriesin, seriesout);
03913
03914 procerr = GenPreProcessCmd(fp,
03915 "jsoc_export_clone",
03916 cloneargs,
03917 dbmainhost,
03918 dbids);
03919 }
03920
03921 if (!procerr)
03922 {
03923 progpath = ((ProcStep_t *)node->data)->path;
03924 args = ((ProcStep_t *)node->data)->args;
03925 }
03926
03927 procerr = GenPreProcessCmd(fp,
03928 progpath,
03929 args,
03930 dbmainhost,
03931 dbids);
03932
03933 if (procerr)
03934 {
03935 snprintf(msgbuf, sizeof(msgbuf), "Problem running processing command '%s'.", progpath);
03936 quit = 1;
03937 break;
03938 }
03939 }
03940
03941
03942
03943
03944 if (!quit)
03945 {
03946 if (datasetout)
03947 {
03948 exprecspec = strdup(datasetout);
03949 }
03950 else
03951 {
03952 exprecspec = strdup(dataset);
03953 }
03954
03955 if (fpProc)
03956 {
03957 fclose(fpProc);
03958 fpProc = NULL;
03959 }
03960 }
03961
03962 if (proccmds)
03963 {
03964 list_llfree(&proccmds);
03965 proccmds = NULL;
03966 }
03967
03968 if (quit)
03969 {
03970 ErrorOutExpRec(&export_rec, 4, msgbuf);
03971 ErrorOutExpNewRec(exports_new, &export_log, irec, 4, msgbuf);
03972 fclose(fp);
03973 fp = NULL;
03974
03975
03976 continue;
03977 }
03978
03979
03980 if (datasetkwlist)
03981 {
03982 char **recsetstr = NULL;
03983 char *datasetkw = NULL;
03984 size_t datasetkwsz = 128;
03985 int ft = 1;
03986
03987 list_llreset(datasetkwlist);
03988 datasetkw = malloc(datasetkwsz);
03989 *datasetkw = '\0';
03990
03991
03992 while ((node = list_llnext(datasetkwlist)) != 0)
03993 {
03994 recsetstr = (char **)node->data;
03995
03996 if (recsetstr && *recsetstr)
03997 {
03998 if (ft)
03999 {
04000 datasetkw = base_strcatalloc(datasetkw, *recsetstr, &datasetkwsz);
04001 ft = 0;
04002 }
04003 else
04004 {
04005 datasetkw = base_strcatalloc(datasetkw, "|", &datasetkwsz);
04006 datasetkw = base_strcatalloc(datasetkw, *recsetstr, &datasetkwsz);
04007 }
04008 }
04009 else
04010 {
04011 snprintf(msgbuf, sizeof(msgbuf), "Problem obtaining a record-set specification string.");
04012 quit = 1;
04013 break;
04014 }
04015 }
04016
04017 if (quit)
04018 {
04019 ErrorOutExpRec(&export_rec, 4, msgbuf);
04020 ErrorOutExpNewRec(exports_new, &export_log, irec, 4, msgbuf);
04021 fclose(fp);
04022 fp = NULL;
04023
04024
04025 continue;
04026 }
04027
04028 drms_setkey_string(export_rec, "DataSet", datasetkw);
04029 free(datasetkw);
04030 datasetkw = NULL;
04031
04032 list_llfree(&datasetkwlist);
04033 }
04034
04035
04036
04037
04038
04039
04040 procerr = GenProtoExpCmd(fp,
04041 &export_rec,
04042 &export_log,
04043 exports_new,
04044 irec,
04045 protocol,
04046 dbmainhost,
04047 exprecspec,
04048 RecordLimit,
04049 requestid,
04050 method,
04051 filenamefmt,
04052 dbids);
04053
04054 if (procerr)
04055 {
04056 snprintf(msgbuf, sizeof(msgbuf), "Problem running protocol-export command.");
04057 ErrorOutExpRec(&export_rec, 4, msgbuf);
04058 ErrorOutExpNewRec(exports_new, &export_log, irec, 4, msgbuf);
04059 fclose(fp);
04060 fp = NULL;
04061
04062
04063 if (dataset)
04064 {
04065 free(dataset);
04066 }
04067
04068 if (exprecspec)
04069 {
04070 free(exprecspec);
04071 }
04072
04073 continue;
04074 }
04075
04076 if (exprecspec)
04077 {
04078 free(exprecspec);
04079 }
04080
04081 CloseDBHandle();
04082
04083
04084
04085 fprintf(fp, "jsoc_export_make_index\n");
04086 fprintf(fp, "set RUNSTAT = $status\nif ($RUNSTAT) goto EXITPLACE\n");
04087
04088
04089
04090 fprintf(fp, "set EXPSIZE = `extractexpsize.pl $REQDIR/index.json`\n");
04091 GenErrChkCmd(fp);
04092 fprintf(fp, "set_info_sock JSOC_DBHOST=%s ds='jsoc.export[%s]' Size=$EXPSIZE\n", dbexporthost, requestid);
04093
04094
04095 dashp = rindex(method, '-');
04096 if (dashp && strcmp(dashp, "-tar") == 0)
04097 {
04098
04099
04100
04101
04102
04103
04104 fprintf(fp, "tar chf ../%s.tar ./\n", requestid);
04105 fprintf(fp, "set RUNSTAT = $status\nif ($RUNSTAT) goto EXITPLACE\n");
04106
04107 fprintf(fp, "find . -not -path . -not -name '%s.*' -not -name 'index.*' -print0 | xargs -0 -L 32 rm -rf\n", requestid);
04108 fprintf(fp, "set RUNSTAT = $status\nif ($RUNSTAT) goto EXITPLACE\n");
04109
04110 fprintf(fp, "mv ../%s.tar .\n", requestid);
04111 fprintf(fp, "set RUNSTAT = $status\nif ($RUNSTAT) goto EXITPLACE\n");
04112
04113
04114
04115 }
04116
04117
04118
04119 fprintf(fp, "set DoneTime = `date -u '+%%Y.%%m.%%d_%%H:%%M:%%S_UT'`\n");
04120 fprintf(fp, "set_info_sock JSOC_DBHOST=%s ds='jsoc.export[%s]' Status=0 ExpTime=$DoneTime\n", dbexporthost, requestid);
04121
04122 fprintf(fp, "cp /home/jsoc/exports/tmp/%s.runlog ./%s.runlog \n", requestid, requestid);
04123
04124
04125 fprintf(fp, "EXITPLACE:\n");
04126 fprintf(fp, "if ($RUNSTAT) then\n");
04127 fprintf(fp, " echo XXXXXXXXXXXXXXXXXXXXXXX ERROR EXIT XXXXXXXXXXXXXXXXXXXXXXXXXXX\nprintenv\n");
04128 fprintf(fp, "endif\n");
04129
04130 fprintf(fp, "show_info_sock JSOC_DBHOST=%s -q -r 'jsoc.export[%s]' > /home/jsoc/exports/tmp/%s.recnum \n", dbexporthost, requestid, requestid);
04131 fprintf(fp, "set LOCKSTAT = $status\n");
04132 fprintf(fp, "if ($LOCKSTAT) then\n");
04133
04134 fprintf(fp, " show_info JSOC_DBHOST=%s -q -r 'jsoc.export[%s]' > /home/jsoc/exports/tmp/%s.recnum \n", dbexporthost, requestid, requestid);
04135 fprintf(fp, "endif\n");
04136 fprintf(fp, "exit $RUNSTAT\n");
04137 fclose(fp);
04138 chmod(runscript, 0555);
04139
04140
04141
04142 sprintf(command,"qsub -q exp.q -v %s "
04143 " -o /home/jsoc/exports/tmp/%s.runlog "
04144 " -e /home/jsoc/exports/tmp/%s.runlog "
04145 " %s/%s.qsub ",
04146 jsocrootstr, requestid, requestid, reqdir, requestid);
04147
04148
04149
04150
04151 if (system(command))
04152 {
04153 return DBCOMM(&export_rec, "Submission of qsub command failed", 4);
04154
04155
04156
04157 }
04158
04159
04160
04161 drms_setkey_int(export_rec, "Status", 1);
04162 drms_close_record(export_rec, DRMS_INSERT_RECORD);
04163
04164
04165
04166
04167
04168
04169
04170 drms_setkey_int(export_log, "Status", 1);
04171 drms_close_record(export_log, DRMS_INSERT_RECORD);
04172 exports_new->records[irec] = NULL;
04173
04174 printf("Request %s submitted\n",requestid);
04175
04176
04177 }
04178
04179
04180 if (seriesEnv != NULL && seriesEnv != drms_env)
04181 {
04182
04183
04184
04185 drms_server_end_transaction(seriesEnv, 1, 1);
04186 }
04187
04188
04189
04190 drms_close_records(exports_new, DRMS_FREE_RECORD);
04191
04192 return(0);
04193 }
04194 else if (strcmp(op, "SOMETHINGELSE") == 0)
04195 {
04196 }
04197 else
04198 DIE("Operation not allowed");
04199 return(1);
04200 }
04201