00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011 #include "jsoc_main.h"
00012 #include "drms.h"
00013 #include "drms_names.h"
00014 #include <SUM.h>
00015 #include <soi_error.h>
00016 #include <sys/errno.h>
00017 #include <rpc/rpc.h>
00018 #include <rpc/pmap_clnt.h>
00019 #include <signal.h>
00020 #include <sum_rpc.h>
00021 #include <printk.h>
00022
00023 ModuleArgs_t module_args[] =
00024 {
00025 {ARG_STRING, "server", "", "host with sum_pe_svc"},
00026 {ARG_STRING, "keyfile", "", "file with peq keylist"},
00027 {ARG_STRING, "logdir", "/usr/local/logs/SUM", "dir for log file"},
00028 {ARG_FLAG, "v", "0", "verbose flag"},
00029 {ARG_FLAG, "h", "0", "help - print usage info"},
00030 {ARG_END}
00031 };
00032
00033 int file2keylist(char *filename, KEY **list);
00034 void logkey();
00035 KEY *getsumpe(KEY *params);
00036 struct timeval TIMEOUT = { 20, 0 };
00037
00038 static KEY *retlist;
00039 char *module_name = "sum_pe";
00040 char *logdir, *db, *keyfile, *server;
00041 char datestr[32];
00042 char logname[MAX_STR];
00043 int pid;
00044 int dellog = 1;
00045 uint32_t rinfo;
00046 FILE *logfp;
00047 SVCXPRT *glb_transp;
00048 CLIENT *current_client;
00049
00050 int open_log(char *filename)
00051 {
00052 if((logfp=fopen(filename, "w")) == NULL) {
00053 fprintf(stderr, "Can't open the log file %s\n", filename);
00054 return(1);
00055 }
00056 return(0);
00057 }
00058
00059
00060
00061 int write_log(const char *fmt, ...)
00062 {
00063 va_list args;
00064 char string[4096];
00065
00066 va_start(args, fmt);
00067 vsprintf(string, fmt, args);
00068 if(logfp) {
00069 fprintf(logfp, string);
00070 fflush(logfp);
00071 }
00072 else
00073 fprintf(stderr, string);
00074 va_end(args);
00075 return(0);
00076 }
00077
00078
00079 static char *datestring()
00080 {
00081 struct timeval tvalr;
00082 struct tm *t_ptr;
00083
00084 gettimeofday(&tvalr, NULL);
00085 t_ptr = localtime((const time_t *)&tvalr);
00086 sprintf(datestr, "%s", asctime(t_ptr));
00087 datestr[19] = (char)NULL;
00088 return(&datestr[4]);
00089 }
00090
00091 void sighandler(int sig)
00092 {
00093 if(sig == SIGTERM) {
00094 printk("*** %s sum_pe_svc got SIGTERM. Exiting.\n", datestring());
00095 exit(1);
00096 }
00097 if(sig == SIGINT) {
00098 printk("*** %s sum_pe_svc got SIGINT. Exiting.\n", datestring());
00099 exit(1);
00100 }
00101 printk("*** %s sum_pe_svc got an illegal signal %d, ignoring...\n",
00102 datestring(), sig);
00103 if (signal(SIGINT, SIG_IGN) != SIG_IGN)
00104 signal(SIGINT, sighandler);
00105 if (signal(SIGALRM, SIG_IGN) != SIG_IGN)
00106 signal(SIGALRM, sighandler);
00107 }
00108
00109
00110 int setup () {
00111 pid = getpid();
00112 sprintf(logname, "%s/sum_pe_%d.log", logdir, pid);
00113 if(open_log(logname)) return(1);
00114 printk_set(write_log, write_log);
00115 printk("%s\nStarting sum_pe for database = %s\nserver=%s\nkeyfile = %s\nlogfile = %s\n\n", datestring(), db, server, keyfile, logname);
00116 if (signal(SIGINT, SIG_IGN) != SIG_IGN)
00117 signal(SIGINT, sighandler);
00118 if (signal(SIGTERM, SIG_IGN) != SIG_IGN)
00119 signal(SIGTERM, sighandler);
00120 if (signal(SIGALRM, SIG_IGN) != SIG_IGN)
00121 signal(SIGALRM, sighandler);
00122 return(0);
00123 }
00124
00125 int nice_intro(int usage) {
00126 if (usage)
00127 {
00128 printf ("Usage:\nsum_pe [-v] server=hostname keyfile=file "
00129 "[logdir=<dir for log file>] dbname\n"
00130 " details are:\n"
00131 " -v: verbose mode\n"
00132 "server=host where sum_pe_svc runs\n"
00133 "keyfile=filename containing peq generated keylist\n"
00134 "logdir=dir to put the log file in\n"
00135 "dbname is the DRMS data base to connect to, e.g. jsoc\n");
00136 return(1);
00137 }
00138 return (0);
00139 }
00140
00141 void drms_print_query_rec(DRMS_Record_t *rec)
00142 {
00143 int iprime, nprime;
00144 DRMS_Keyword_t *rec_key, *key, **prime_keys;
00145 printk("%s\n",rec->seriesinfo->seriesname);
00146 nprime = rec->seriesinfo->pidx_num;
00147 prime_keys = rec->seriesinfo->pidx_keywords;
00148 printk("!!!!TEMP in drms_print_query_rec()\n");
00149 if (nprime > 0)
00150 {
00151 for (iprime = 0; iprime < nprime; iprime++)
00152 {
00153 key = prime_keys[iprime];
00154 rec_key = drms_keyword_lookup (rec, key->info->name, 1);
00155 printk("[");
00156 if (key->info->type != DRMS_TYPE_STRING)
00157 drms_keyword_printval (rec_key);
00158 else
00159 {
00160 printk("\"");
00161 drms_keyword_printval (rec_key);
00162 printk("\"");
00163 }
00164 printk("]\n");
00165 }
00166 }
00167 else
00168 printk("[:#%lld]",rec->recnum);
00169 }
00170
00171
00172
00173
00174 int DoIt(void) {
00175 int status = 0;
00176 uint32_t sumpeback;
00177 char cmd[128];
00178 char *call_err;
00179 KEY *list=newkeylist();
00180 CLIENT *clntsumpesvc;
00181
00182
00183
00184 int usage = cmdparams_get_int (&cmdparams, "h", NULL);
00185 logdir = cmdparams_get_str (&cmdparams, "logdir", NULL);
00186 server = cmdparams_get_str (&cmdparams, "server", NULL);
00187 keyfile = cmdparams_get_str (&cmdparams, "keyfile", NULL);
00188 if(nice_intro(usage)) return (0);
00189 if(cmdparams_numargs(&cmdparams) >= 1 && (db = cmdparams_getarg (&cmdparams, 1))) {
00190
00191 }
00192 else {
00193 nice_intro(1);
00194 return(0);
00195 }
00196 if(setup()) return(1);
00197 if(file2keylist(keyfile, &list)) {
00198 printk("Error in file2keylist() for %s\n", keyfile);
00199 return(1);
00200 }
00201 sprintf(cmd, "/bin/rm -f %s", keyfile);
00202 system(cmd);
00203 keyiterate(logkey, list);
00204
00205 clntsumpesvc = clnt_create(server, SUMPEPROG, SUMPEVERS, "tcp");
00206 if(!clntsumpesvc) {
00207 clnt_pcreateerror("Can't get client handle to sum_pe_svc");
00208 printk("sum_pe_svc not there on %s\n", server);
00209 return(1);
00210 }
00211
00212 retlist = getsumpe(list);
00213
00214 status = clnt_call(clntsumpesvc,SUMPEACK, (xdrproc_t)xdr_Rkey,
00215 (char *)retlist, (xdrproc_t)xdr_uint32_t, (char *)&sumpeback, TIMEOUT);
00216 if(status != RPC_SUCCESS) {
00217 call_err = clnt_sperror(clntsumpesvc, "Err clnt_call for SUMPEACK");
00218 printk("%s %s\n", datestring(), call_err);
00219 if(status != RPC_TIMEDOUT) {
00220 return(status);
00221 }
00222 }
00223 if(sumpeback != 0) {
00224 printk("Error status= %d from clnt_call to SUMPEACK = %d\n", sumpeback);
00225 status = 1;
00226 }
00227 if(dellog) {
00228 sprintf(cmd, "/bin/rm -f %s", logname);
00229 system(cmd);
00230 }
00231 return(0);
00232 }
00233
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270 KEY *getsumpe(KEY *params)
00271 {
00272 int reqcnt;
00273 int status;
00274 DRMS_RecordSet_t *recordset;
00275 DRMS_Record_t *rec;
00276 int xdirflg = 0;
00277 int first_rec, last_rec, nrecs, irec, retrieve_flg, i;
00278 char name[80], value[80], cmd[80], xdir[128], errmsg[128];
00279 char path[DRMS_MAXPATHLEN];
00280
00281 char *in, *cptr;
00282 FILE *infile;
00283
00284
00285
00286 retlist = newkeylist();
00287 add_keys(params, &retlist);
00288 setkey_fileptr(&retlist, "current_client", getkey_fileptr(params, "current_client"));
00289 setkey_int(&retlist, "REQCODE", PEPEQRESPDO);
00290 reqcnt = getkey_int(params, "in_nsets");
00291 for(i=0; i < reqcnt; i++) {
00292 in = getkey_str(params, "in");
00293
00294 if(strstr(in, "ds_mdi.fd_V_01h_lev1_8") ||
00295 strstr(in, "ds_mdi.fd_V_30s_01h_lev1_8") ||
00296 strstr(in, "ds_mdi.fd_M_96m_01d_lev1_8") ||
00297 strstr(in, "ds_mdi.vw_V_06h_lev1_8") ||
00298 strstr(in, "ds_mdi.fd_M_01h_lev1_8")) {
00299 xdirflg = 1;
00300 }
00301 retrieve_flg = getkey_int(params, "retrieve_flg");
00302 printk("%s\nretrieve_flg in sumpedo_1 = %d\n", datestring(), retrieve_flg);
00303
00304 recordset = drms_open_records (drms_env, in, &status);
00305 if (status) {
00306 printk("drms_open_records failed, in=%s, status=%d.\n", in, status);
00307 setkey_int(&retlist, "STATUS", 1);
00308 sprintf(errmsg, "Err: see on sum host: %s\n", logname);
00309 dellog = 0;
00310 setkey_str(&retlist, "ERRMSG", errmsg);
00311 return(retlist);
00312 }
00313
00314
00315
00316 nrecs = recordset->n;
00317 printk("#of records in %s = %d\n", in, nrecs);
00318 if (nrecs == 0) {
00319 printk ("** No records in selected data set **\n");
00320 setkey_int(&retlist, "STATUS", 1);
00321 sprintf(errmsg, "** No records in selected data set **\n");
00322 setkey_str(&retlist, "ERRMSG", errmsg);
00323 return(retlist);
00324 }
00325 last_rec = nrecs - 1;
00326 first_rec = 0;
00327 for (irec = first_rec; irec <= last_rec; irec++) {
00328 rec = recordset->records[irec];
00329
00330 drms_record_directory (rec, path, retrieve_flg);
00331 if(xdirflg && strcmp(path, "")) {
00332 sprintf(cmd, "/bin/ls %s", path);
00333 if(!(infile = popen(cmd, "r"))) {
00334 printk("Can't do popen() for %s\n", cmd);
00335 setkey_int(&retlist, "STATUS", 1);
00336 sprintf(errmsg, "Err: Can't do popen() for %s\n", cmd);
00337 setkey_str(&retlist, "ERRMSG", errmsg);
00338 return(retlist);
00339 }
00340 if(!fgets(xdir, 128, infile)) {
00341 printk("Can't get the extra dir for special ds\n");
00342 setkey_int(&retlist, "STATUS", 1);
00343 sprintf(errmsg, "Err: Can't get the extra dir for special ds\n");
00344 setkey_str(&retlist, "ERRMSG", errmsg);
00345 return(retlist);
00346 }
00347 if(cptr = rindex(xdir, '\n')) *cptr = NULL;
00348 sprintf(path, "%s/%s", path, xdir);
00349 pclose(infile);
00350 }
00351 sprintf(name, "in_%d_wd", irec);
00352 setkey_str(&retlist, name, path);
00353 printk("%s %s\n", name, path);
00354 sprintf(name, "inname_%d", irec);
00355 sprintf(value, "in_%d", irec);
00356 setkey_str(&retlist, name, value);
00357 if(!strcmp(path, "")) {
00358 sprintf(name, "status_%d", irec);
00359 setkey_int(&retlist, name, DS_ARCHIVE);
00360 }
00361 else {
00362 sprintf(name, "status_%d", irec);
00363 setkey_int(&retlist, name, 0);
00364 }
00365 }
00366 drms_close_records(recordset, DRMS_FREE_RECORD);
00367 }
00368 setkey_int(&retlist, "in_nsets", nrecs);
00369 setkey_int(&retlist, "STATUS", 0);
00370 return(retlist);
00371 }