00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074 #include "jsoc_main.h"
00075 #include "drms.h"
00076 #include "drms_names.h"
00077 #include "json.h"
00078 #include <time.h>
00079 #include <unistd.h>
00080 #include <sys/types.h>
00081 #include <sys/socket.h>
00082 #include <netinet/in.h>
00083
00084
00085
00086 #define QUERY_MAX 1024
00087
00088 #define TIMESTR_MAX 32
00089
00090 #define JSON_MIME "Content-type: application/json\n\n"
00091
00092 #define ARG_UNDEF "@"
/*
* singletons to encapsulate the overall server and query state
*/
// query
// State of the request currently being served: when it arrived, what was
// asked for, and how it turned out.
typedef struct {
// times
double t0; // request start time, in seconds (filled in by current_time)
double t1; // response time, in seconds (filled in by current_time)
char t0_str[TIMESTR_MAX]; // printable form of t0
char t1_str[TIMESTR_MAX]; // printable form of t1
// everything else
char line[QUERY_MAX]; // request line as read from the connection
char op [QUERY_MAX]; // value of the op= argument
char ds [QUERY_MAX]; // value of the ds= argument
char emsg[QUERY_MAX]; // error message, if nonzero status
char remote_host[MAXHOSTNAMELEN]; // name of the peer, or "unresolved_host"
int max_recs; // value of the n= argument, or the server default
int status; // nonzero for error
} query_bag_t;
// server
// State of the server as a whole: listening socket, log and port files,
// running totals, and the connection currently being answered.
typedef struct {
char hostname[MAXHOSTNAMELEN]; // local host name, from gethostname()
int port; // port (0 if not open)
char *port_fn; // filename to put port number in (or NULL)
FILE *log; // server log file
char *log_fn; // log filename
int preserve_log; // preserve log file after exit?
unsigned int
export_num; // number of export requests
unsigned long
export_len; // total bytes of all responses so far (running sum)
double
timeout_time; // seconds until server times out
int timeout; // has the server timed out?
int running; // 0 if we are shutting it down
int sock; // socket
int connfd; // connection fd
int head; // attach mime head?
} server_bag_t;
// set according to verbosity input parameter; passed to V_printf as
// "verbflag > 1" to gate debug traces
static int verbflag;
// log file pointers (initialized to NULL)
// V_printf copies flag > 0 output to LOGout and flag < 0 output to LOGerr;
// a NULL pointer means "no copy"
static FILE *LOGout = NULL;
static FILE *LOGerr = NULL;
/*************************************************************
*
* ERROR HANDLING/LOGGING MACROS
*
*************************************************************
*/
// V_printf: print a diagnostic message, optionally duplicated to a log.
//   flag > 0 : output goes to stdout, and to LOGout if it is non-NULL
//   flag < 0 : output goes to stderr, and to LOGerr if it is non-NULL
//   flag == 0: no output is made at all
// It is legal to have LOGerr = stdout, so that error messages are
// duplicated to stdout.
// The message is printed in the form:
//   <message>                               (if preamble is NULL)
//   <module_name>: <preamble><message>      (if preamble is non-NULL)
//   <module_name>: <preamble+1><message>\n  (if non-NULL && *preamble=='\n')
// The same text, including the optional trailing newline, is written to
// both destinations.
// Usage:
//   V_printf(verbflag > 0, "\t", "Mask(%d) = %d\n", 2048, mask[2048]);
void
V_printf(int flag, char *preamble, char *format, ...) {
    va_list args;
    extern char *module_name;
    extern FILE *LOGout;
    extern FILE *LOGerr;
    FILE *sinks[2];
    int append_newline = 0;
    int i;

    if (flag == 0)
        return;

    sinks[0] = (flag > 0) ? stdout : stderr;
    sinks[1] = (flag > 0) ? LOGout : LOGerr;

    // a leading '\n' in the preamble is a request for a trailing newline
    if (preamble && preamble[0] == '\n') {
        append_newline = 1;
        preamble++;
    }

    for (i = 0; i < 2; i++) {
        FILE *out = sinks[i];

        if (!out)
            continue;
        if (preamble)
            fprintf(out, "%s: %s", module_name, preamble);
        // the argument list must be restarted for every vfprintf pass
        va_start(args, format);
        vfprintf(out, format, args);
        va_end(args);
        if (append_newline)
            fprintf(out, "\n");
        fflush(out);
    }
}
00136
00137
00138
00139
00140
00141
00142
// DIE: report a fatal error through V_printf (stderr + LOGerr), close the
// log streams, and make the enclosing function return 1.
// The arguments are a printf-style format followed by its values.
// A log pointer that aliases stdout/stderr, or LOGerr aliasing LOGout, is
// not closed (again): the caller keeps running after the return, and
// closing one stream twice is undefined.
#define DIE(...) do { \
    fflush(stdout); \
    V_printf(-1, "\nFATAL. ", __VA_ARGS__); \
    if (LOGout && LOGout != stdout && LOGout != stderr) \
        fclose(LOGout); \
    if (LOGerr && LOGerr != LOGout && LOGerr != stdout && LOGerr != stderr) \
        fclose(LOGerr); \
    return 1; \
} while (0)
00150
00151
// WARN: report a non-fatal problem through V_printf (stderr + LOGerr) and
// carry on. The arguments are a printf-style format followed by its values.
// stdout is flushed first so the warning appears after pending output.
#define WARN(...) \
    do { \
        fflush(stdout); \
        V_printf(-1, "\nWARNING (continuing). ", __VA_ARGS__); \
    } while (0)
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169 char *drms_record_getlogdir(DRMS_Record_t *rec)
00170 {
00171 char *logpath;
00172 char query[DRMS_MAXQUERYLEN];
00173 DB_Text_Result_t *qres;
00174
00175 sprintf(query, "select sunum from %s.drms_session where sessionid=%lld", rec->sessionns, rec->sessionid);
00176 if ((qres = drms_query_txt(drms_env->session, query)) && qres->num_rows>0)
00177 {
00178 if (qres->field[0][0][0] == '\0')
00179 logpath = strdup("No log avaliable");
00180 else
00181 {
00182 DRMS_StorageUnit_t *su;
00183 int status, save_retention = drms_env->retention;
00184 int retrieve = 0;
00185 su = malloc(sizeof(DRMS_StorageUnit_t));
00186 su->sunum = atoll(qres->field[0][0]);
00187 drms_env->retention = DRMS_LOG_RETENTION;
00188
00189
00190
00191
00192
00193
00194
00195 #ifdef DRMS_CLIENT
00196 status = 1;
00197 char *or_else = "Log offline";
00198 #else
00199 status = 1;
00200 char *or_else = "Logdir unimplemented";
00201 #endif
00202 if (!status)
00203 logpath = strdup(su->sudir);
00204 else
00205 logpath = strdup(or_else);
00206 free(su);
00207 drms_env->retention = save_retention;
00208 }
00209 }
00210 else
00211 logpath = strdup("Log query failed");
00212 db_free_text_result(qres);
00213 return logpath;
00214 }
00215
00216 int drms_ismissing_keyval(DRMS_Keyword_t *key)
00217 {
00218 XASSERT(key);
00219 switch(key->info->type)
00220 {
00221 case DRMS_TYPE_CHAR:
00222 return(drms_ismissing_char(key->value.char_val));
00223 case DRMS_TYPE_SHORT:
00224 return(drms_ismissing_short(key->value.short_val));
00225 case DRMS_TYPE_INT:
00226 return(drms_ismissing_int(key->value.int_val));
00227 case DRMS_TYPE_LONGLONG:
00228 return(drms_ismissing_longlong(key->value.longlong_val));
00229 case DRMS_TYPE_FLOAT:
00230 return(drms_ismissing_float(key->value.float_val));
00231 case DRMS_TYPE_DOUBLE:
00232 return(drms_ismissing_double(key->value.double_val));
00233 case DRMS_TYPE_TIME:
00234 return(drms_ismissing_time(key->value.time_val));
00235 case DRMS_TYPE_STRING:
00236 return(drms_ismissing_string(key->value.string_val));
00237 default:
00238 V_printf(-1, "", "ERROR: Unhandled DRMS type %d\n",(int)key->info->type);
00239 XASSERT(0);
00240 }
00241 return 0;
00242 }
00243
00244
00245
00246 DRMS_RecordSet_t *drms_find_rec_first(DRMS_Record_t *rec, int wantprime)
00247 {
00248 int nprime;
00249 int status;
00250 DRMS_RecordSet_t *rs;
00251 char query[DRMS_MAXQUERYLEN];
00252 strcpy(query, rec->seriesinfo->seriesname);
00253 nprime = rec->seriesinfo->pidx_num;
00254 if (wantprime && nprime > 0)
00255
00256
00257 strcat(query, "[^]");
00258 else
00259 strcat(query, "[:#^]");
00260 V_printf(verbflag > 1, "", "test 1 query is %s\n",query);
00261 rs = drms_open_nrecords(rec->env, query, 1, &status);
00262 V_printf(verbflag > 1, "", "test 1 status is %d\n",status);
00263 return(rs);
00264 }
00265
00266
00267 DRMS_RecordSet_t *drms_find_rec_last(DRMS_Record_t *rec, int wantprime)
00268 {
00269 int nprime;
00270 int status;
00271 DRMS_RecordSet_t *rs;
00272 char query[DRMS_MAXQUERYLEN];
00273 strcpy(query, rec->seriesinfo->seriesname);
00274 nprime = rec->seriesinfo->pidx_num;
00275 if (wantprime && nprime > 0)
00276
00277
00278 strcat(query, "[$]");
00279 else
00280 strcat(query, "[:#$]");
00281 rs = drms_open_nrecords(rec->env, query, -1, &status);
00282 return(rs);
00283 }
00284
00285
00286 char *drms_getseriesowner(DRMS_Env_t *drms_env, char *series, int *status)
00287 {
00288 char *nspace = NULL;
00289 char *relname = NULL;
00290 int istat = DRMS_SUCCESS;
00291 DB_Text_Result_t *qres = NULL;
00292 static char owner[256];
00293 owner[0] = '\0';
00294
00295 if (!get_namespace(series, &nspace, &relname))
00296 {
00297 char query[1024];
00298 strtolower(nspace);
00299 strtolower(relname);
00300
00301 snprintf(query, sizeof(query), "SELECT pg_catalog.pg_get_userbyid(T1.relowner) AS owner FROM pg_catalog.pg_class AS T1, (SELECT oid FROM pg_catalog.pg_namespace WHERE nspname = '%s') AS T2 WHERE T1.relnamespace = T2.oid AND T1.relname = '%s'", nspace, relname);
00302
00303 if ((qres = drms_query_txt(drms_env->session, query)) != NULL)
00304 {
00305 if (qres->num_cols == 1 && qres->num_rows == 1)
00306 strcpy(owner, qres->field[0][0]);
00307 db_free_text_result(qres);
00308 }
00309 }
00310 *status = owner[0] != 0;
00311 return(owner);
00312 }
00313
/*
 * Return the result of json_escape() for in. Callers in this file free()
 * the returned string once they have handed it to json_new_string().
 */
static char *
string_to_json(char *in)
{
    return json_escape(in);
}
00322
00323
00324 static void list_series_info(DRMS_Env_t *drms_env, DRMS_Record_t *rec, json_t *jroot, int wantOwner)
00325 {
00326 DRMS_Keyword_t *key;
00327 DRMS_Segment_t *seg;
00328 DRMS_Link_t *link;
00329 HIterator_t *last = NULL;
00330 char intstring[100];
00331 char *notework;
00332 char *owner;
00333 json_t *indexarray, *primearray, *keyarray, *segarray, *linkarray;
00334 json_t *primeinfoarray;
00335 int npkeys;
00336 int status;
00337 char prevKeyName[DRMS_MAXNAMELEN] = "";
00338 char baseKeyName[DRMS_MAXNAMELEN];
00339
00340
00341 notework = string_to_json(rec->seriesinfo->description);
00342 json_insert_pair_into_object(jroot, "note", json_new_string(notework));
00343 free(notework);
00344
00345 sprintf(intstring, "%d", rec->seriesinfo->retention);
00346 json_insert_pair_into_object(jroot, "retention", json_new_number(intstring));
00347 sprintf(intstring, "%d", rec->seriesinfo->unitsize);
00348 json_insert_pair_into_object(jroot, "unitsize", json_new_number(intstring));
00349 sprintf(intstring, "%d", rec->seriesinfo->archive);
00350 json_insert_pair_into_object(jroot, "archive", json_new_number(intstring));
00351 sprintf(intstring, "%d", rec->seriesinfo->tapegroup);
00352 json_insert_pair_into_object(jroot, "tapegroup", json_new_number(intstring));
00353
00354 if (wantOwner) {
00355 owner = string_to_json(drms_getseriesowner(drms_env, rec->seriesinfo->seriesname, &status));
00356 json_insert_pair_into_object(jroot, "owner", json_new_string(owner));
00357 free(owner);
00358 }
00359
00360
00361
00362
00363
00364 primearray = json_new_array();
00365 primeinfoarray = json_new_array();
00366 npkeys = rec->seriesinfo->pidx_num;
00367 if (npkeys > 0)
00368 {
00369 int i;
00370 json_t *primeinfo, *val;
00371 char *jsonstr;
00372 for (i=0; i<npkeys; i++)
00373 {
00374 json_t *primeinfo = json_new_object();
00375 DRMS_Keyword_t *pkey, *skey;
00376 pkey = rec->seriesinfo->pidx_keywords[i];
00377 if (pkey->info->recscope > 1)
00378 {
00379 char rawval[100];
00380 skey = drms_keyword_slotfromindex(pkey);
00381 json_insert_pair_into_object(primeinfo, "name", json_new_string(skey->info->name));
00382 json_insert_pair_into_object(primeinfo, "slotted", json_new_number("1"));
00383 drms_keyword_snprintfval(drms_keyword_stepfromvalkey(skey), rawval, sizeof(rawval));
00384 jsonstr = string_to_json(rawval);
00385 val = json_new_string(jsonstr);
00386 free(jsonstr);
00387 json_insert_pair_into_object(primeinfo, "step", val);
00388 json_insert_child(primearray, json_new_string(skey->info->name));
00389
00390 }
00391 else
00392 {
00393 json_insert_pair_into_object(primeinfo, "name", json_new_string(pkey->info->name));
00394 json_insert_pair_into_object(primeinfo, "slotted", json_new_number("0"));
00395 json_insert_child(primearray, json_new_string(pkey->info->name));
00396 }
00397 json_insert_child(primeinfoarray, primeinfo);
00398 }
00399 }
00400 else
00401 {
00402 json_insert_child(primearray, json_new_null());
00403 json_insert_child(primeinfoarray, json_new_null());
00404 }
00405 json_insert_pair_into_object(jroot, "primekeys", primearray);
00406 json_insert_pair_into_object(jroot, "primekeysinfo", primeinfoarray);
00407
00408
00409 indexarray = json_new_array();
00410 if (rec->seriesinfo->dbidx_num > 0)
00411 {
00412 int i;
00413 for (i=0; i<rec->seriesinfo->dbidx_num; i++)
00414 json_insert_child(indexarray, json_new_string((rec->seriesinfo->dbidx_keywords[i])->info->name));
00415 }
00416 else
00417 json_insert_child(indexarray, json_new_null());
00418 json_insert_pair_into_object(jroot, "dbindex", indexarray);
00419
00420 V_printf(verbflag > 1, "", " starting all keywords\n");
00421
00422 keyarray = json_new_array();
00423
00424
00425
00426 while ((key = drms_record_nextkey(rec, &last, 0)))
00427 {
00428 json_t *keyinfo;
00429 json_t *keytype;
00430 json_t *defval, *recscope;
00431 char rawval[100], *jsonstr;
00432 int persegment = key->info->kwflags & kKeywordFlag_PerSegment;
00433 V_printf(verbflag > 1, "", " starting keyword %s\n",key->info->name);
00434
00435 if (persegment)
00436 {
00437 char *underscore;
00438 strcpy(baseKeyName, key->info->name);
00439 underscore = rindex(baseKeyName, '_');
00440 if (underscore) *underscore = '\0';
00441 if (strcmp(prevKeyName, baseKeyName) == 0)
00442 continue;
00443 strcpy(prevKeyName, baseKeyName);
00444 }
00445 keyinfo = json_new_object();
00446 json_insert_pair_into_object(keyinfo, "name", json_new_string(persegment ? baseKeyName : key->info->name));
00447 if (key->info->islink)
00448 {
00449
00450 char linknames[100], *tmpstr;
00451 json_t *linkinfo;
00452 sprintf(linknames,"%s->%s", key->info->linkname, key->info->target_key);
00453 tmpstr = string_to_json(linknames);
00454 linkinfo = json_new_string(tmpstr);
00455 json_insert_pair_into_object(keyinfo, "linkinfo", linkinfo);
00456 free(tmpstr);
00457
00458
00459 int lnkstat = DRMS_SUCCESS;
00460 DRMS_Keyword_t *linkedkw = drms_template_keyword_followlink(key, &lnkstat);
00461
00462 if (lnkstat == DRMS_SUCCESS && linkedkw)
00463 {
00464 keytype = json_new_string(drms_type_names[linkedkw->info->type]);
00465 }
00466 else
00467 {
00468 keytype = json_new_string("link");
00469 }
00470 }
00471 else
00472 keytype = json_new_string(drms_type_names[key->info->type]);
00473 json_insert_pair_into_object(keyinfo, "type", keytype);
00474
00475
00476 if (persegment)
00477 {
00478 recscope = json_new_string("segment");
00479 }
00480 else
00481 {
00482 jsonstr = string_to_json((char *)drms_keyword_getrecscopestr(key, NULL));
00483 recscope = json_new_string(jsonstr);
00484 free(jsonstr);
00485 }
00486 json_insert_pair_into_object(keyinfo, "recscope", recscope);
00487
00488 drms_keyword_snprintfval(key, rawval, sizeof(rawval));
00489 jsonstr = string_to_json(rawval);
00490 defval = json_new_string(jsonstr);
00491 free(jsonstr);
00492 json_insert_pair_into_object(keyinfo, "defval", defval);
00493
00494 notework = string_to_json(key->info->unit);
00495 json_insert_pair_into_object(keyinfo, "units", json_new_string(notework));
00496 free(notework);
00497 notework = string_to_json(key->info->description);
00498 json_insert_pair_into_object(keyinfo, "note", json_new_string(notework));
00499 free(notework);
00500 json_insert_child(keyarray, keyinfo);
00501 }
00502 json_insert_pair_into_object(jroot, "keywords", keyarray);
00503
00504 V_printf(verbflag > 1, "", " done with keywords, start segments\n");
00505
00506
00507 segarray = json_new_array();
00508 if (rec->segments.num_total)
00509 {
00510 if (last)
00511 {
00512 hiter_destroy(&last);
00513 }
00514
00515 while ((seg = drms_record_nextseg(rec, &last, 0)))
00516 {
00517 json_t *seginfo = json_new_object();
00518 int naxis = seg->info->naxis;
00519 json_insert_pair_into_object(seginfo, "name", json_new_string(seg->info->name));
00520 if (seg->info->islink)
00521 {
00522 char linkinfo[DRMS_MAXNAMELEN+10];
00523 sprintf(linkinfo, "link via %s", seg->info->linkname);
00524 json_insert_pair_into_object(seginfo, "type", json_new_string(drms_type_names[seg->info->type]));
00525 json_insert_pair_into_object(seginfo, "units", json_new_null());
00526 json_insert_pair_into_object(seginfo, "protocol", json_new_string(linkinfo));
00527 json_insert_pair_into_object(seginfo, "dims", json_new_null());
00528 }
00529 else
00530 {
00531 char prot[DRMS_MAXNAMELEN];
00532 char diminfo[160];
00533 int iaxis;
00534 strcpy(prot, drms_prot2str(seg->info->protocol));
00535 json_insert_pair_into_object(seginfo, "type", json_new_string(drms_type_names[seg->info->type]));
00536 json_insert_pair_into_object(seginfo, "units", json_new_string(seg->info->unit));
00537 json_insert_pair_into_object(seginfo, "protocol", json_new_string(prot));
00538 diminfo[0] = '\0';
00539 for (iaxis=0; iaxis<naxis; iaxis++)
00540 {
00541 if (iaxis != 0)
00542 strcat(diminfo,"x");
00543 if (seg->info->scope == DRMS_VARDIM)
00544 strcat(diminfo,"VAR");
00545 else
00546 {
00547 char size[10];
00548 sprintf(size,"%d",seg->axis[iaxis]);
00549 strcat(diminfo,size);
00550 }
00551 }
00552 json_insert_pair_into_object(seginfo, "dims", json_new_string(diminfo));
00553 }
00554 notework = string_to_json(seg->info->description);
00555 json_insert_pair_into_object(seginfo, "note", json_new_string(notework));
00556 free(notework);
00557 json_insert_child(segarray, seginfo);
00558 }
00559 }
00560
00561
00562 json_insert_pair_into_object(jroot, "segments", segarray);
00563
00564 V_printf(verbflag > 1, "", " done with segments, start links\n");
00565
00566 linkarray = json_new_array();
00567 if (rec->links.num_total)
00568 {
00569 if (last)
00570 {
00571 hiter_destroy(&last);
00572 }
00573
00574 while ((link = drms_record_nextlink(rec, &last)))
00575 {
00576 V_printf(verbflag > 1, "", " link: %s\n",link->info->name);
00577 json_t *linkinfo = json_new_object();
00578 json_insert_pair_into_object(linkinfo, "name", json_new_string(link->info->name));
00579 json_insert_pair_into_object(linkinfo, "target", json_new_string(link->info->target_series));
00580 json_insert_pair_into_object(linkinfo, "kind", json_new_string(link->info->type == STATIC_LINK ? "STATIC" : "DYNAMIC"));
00581 notework = string_to_json(link->info->description);
00582 json_insert_pair_into_object(linkinfo, "note", json_new_string(notework));
00583 free(notework);
00584 json_insert_child(linkarray,linkinfo);
00585 }
00586 }
00587
00588
00589 json_insert_pair_into_object(jroot, "links", linkarray);
00590
00591 if (last)
00592 {
00593 hiter_destroy(&last);
00594 }
00595 return;
00596 }
00597
00598
00599 static int get_series_stats(DRMS_Record_t *rec, json_t *jroot)
00600 {
00601 DRMS_RecordSet_t *rs;
00602 int nprime;
00603 int status;
00604 char query[DRMS_MAXQUERYLEN];
00605 json_t *interval = json_new_object();
00606
00607 nprime = rec->seriesinfo->pidx_num;
00608 if (nprime > 0)
00609 sprintf(query,"%s[^]", rec->seriesinfo->seriesname);
00610 else
00611 sprintf(query,"%s[:#^]", rec->seriesinfo->seriesname);
00612 rs = drms_open_nrecords(rec->env, query, 1, &status);
00613
00614 if (status == DRMS_ERROR_QUERYFAILED)
00615 {
00616 if (rs)
00617 {
00618 drms_free_records(rs);
00619 }
00620
00621 return status;
00622 }
00623
00624 if (!rs || rs->n < 1)
00625 {
00626 json_insert_pair_into_object(interval, "FirstRecord", json_new_string("NA"));
00627 json_insert_pair_into_object(interval, "FirstRecnum", json_new_string("NA"));
00628 json_insert_pair_into_object(interval, "LastRecord", json_new_string("NA"));
00629 json_insert_pair_into_object(interval, "LastRecnum", json_new_string("NA"));
00630 json_insert_pair_into_object(interval, "MaxRecnum", json_new_number("0"));
00631 if (rs) drms_free_records(rs);
00632 json_insert_pair_into_object(jroot, "Interval", interval);
00633 return DRMS_SUCCESS;
00634 }
00635 else
00636 {
00637 char recquery[DRMS_MAXQUERYLEN];
00638 char *jsonquery;
00639 char val[100];
00640 int status;
00641 drms_sprint_rec_query(recquery,rs->records[0]);
00642 jsonquery = string_to_json(recquery);
00643 status = json_insert_pair_into_object(interval, "FirstRecord", json_new_string(jsonquery));
00644 if (status != JSON_OK)
00645 V_printf(-1, "", "json_insert_pair_into_object, status=%d, text=%s\n",status,jsonquery);
00646 free(jsonquery);
00647 sprintf(val,"%lld", rs->records[0]->recnum);
00648 json_insert_pair_into_object(interval, "FirstRecnum", json_new_number(val));
00649 drms_free_records(rs);
00650
00651 if (nprime > 0)
00652 sprintf(query,"%s[$]", rec->seriesinfo->seriesname);
00653 else
00654 sprintf(query,"%s[:#$]", rec->seriesinfo->seriesname);
00655 rs = drms_open_nrecords(rec->env, query, -1, &status);
00656
00657 if (status == DRMS_ERROR_QUERYFAILED)
00658 {
00659 if (rs)
00660 {
00661 drms_free_records(rs);
00662 }
00663
00664 return status;
00665 }
00666
00667 drms_sprint_rec_query(recquery,rs->records[0]);
00668 jsonquery = string_to_json(recquery);
00669 json_insert_pair_into_object(interval, "LastRecord", json_new_string(jsonquery));
00670 free(jsonquery);
00671 sprintf(val,"%lld", rs->records[0]->recnum);
00672 json_insert_pair_into_object(interval, "LastRecnum", json_new_number(val));
00673 drms_free_records(rs);
00674
00675 sprintf(query,"%s[:#$]", rec->seriesinfo->seriesname);
00676 rs = drms_open_records(rec->env, query, &status);
00677
00678 if (status == DRMS_ERROR_QUERYFAILED)
00679 {
00680 if (rs)
00681 {
00682 drms_free_records(rs);
00683 }
00684
00685 return status;
00686 }
00687
00688 sprintf(val,"%lld", rs->records[0]->recnum);
00689 json_insert_pair_into_object(interval, "MaxRecnum", json_new_number(val));
00690 drms_free_records(rs);
00691 }
00692 json_insert_pair_into_object(jroot, "Interval", interval);
00693 return 0;
00694 }
00695
00696
00697
00698
00699
00700
00701
00702
00703 static
00704 void
00705 current_time(double *t_num, char *t_str)
00706 {
00707 struct timeval tv;
00708 time_t nowtime;
00709 struct tm *nowtm;
00710 char tmbuf[TIMESTR_MAX];
00711
00712 gettimeofday(&tv, NULL);
00713 if (t_num)
00714 *t_num = tv.tv_sec + tv.tv_usec/1000000.0;
00715 if (t_str) {
00716 nowtime = tv.tv_sec;
00717 nowtm = localtime(&nowtime);
00718 strftime(tmbuf, TIMESTR_MAX, "%Y.%m.%d_%H:%M:%S", nowtm);
00719
00720 snprintf(t_str, TIMESTR_MAX, "%s.%03d", tmbuf, (int) (tv.tv_usec/1000));
00721 }
00722 }
00723
00724 static
00725 void
00726 insert_log_header(server_bag_t *server_bag)
00727 {
00728 extern char *module_name;
00729 FILE *fp = server_bag->log;
00730 char tnow_str[TIMESTR_MAX];
00731
00732 if (!fp) return;
00733 current_time(NULL, tnow_str);
00734 fprintf(fp, "# %s: transaction log\n", module_name);
00735 fprintf(fp, "# running as PID %d on %s:%d\n",
00736 getpid(), server_bag->hostname, server_bag->port);
00737 fprintf(fp, "# start time = %s\n", tnow_str);
00738 fflush(fp);
00739 }
00740
00741
00742
00743
00744 static
00745 void
00746 insert_log_footer(server_bag_t *server_bag)
00747 {
00748 extern char *module_name;
00749 char tnow_str[TIMESTR_MAX];
00750 FILE *fp = server_bag->log;
00751
00752 if (!fp) return;
00753 current_time(NULL, tnow_str);
00754 fprintf(fp, "# %s: closing transaction log\n", module_name);
00755 fprintf(fp, "# end time = %s\n", tnow_str);
00756 fprintf(fp, "# requests = %d\n", server_bag->export_num);
00757 fprintf(fp, "# bytes = %ld\n", server_bag->export_len);
00758 fflush(fp);
00759 fclose(fp);
00760 server_bag->log = NULL;
00761 }
00762
00763
00764 static
00765 void
00766 insert_log_summary(server_bag_t *Sbag,
00767 query_bag_t *Qbag)
00768 {
00769 FILE *log = Sbag->log;
00770
00771 if (!log) return;
00772 if (Qbag->status == 0) {
00773
00774 fprintf(log, "host='%s'\t", Sbag->hostname);
00775 fprintf(log, "lag=%0.3f\t", (Qbag->t1) - (Qbag->t0));
00776 fprintf(log, "IP='%s'\t", Qbag->remote_host);
00777 fprintf(log, "op='%s'\t", Qbag->op);
00778 fprintf(log, "ds='%s'\t", Qbag->ds);
00779 fprintf(log, "n=%d\t", Qbag->max_recs);
00780 fprintf(log, "status=%d\n", Qbag->status);
00781 } else {
00782
00783 fprintf(log, "# error on following line at %s:\n", Qbag->t1_str);
00784 fprintf(log, "# |%s|\n", Qbag->line);
00785 fprintf(log, "# Error was: %s\n", *(Qbag->emsg) ? Qbag->emsg : "Unknown");
00786 }
00787 fflush(log);
00788 }
00789
00790
00791 static
00792 void
00793 insert_log_message(server_bag_t *Sbag, char *msg)
00794 {
00795 FILE *log = Sbag->log;
00796 if (!log) return;
00797 fprintf(log, "# %s\n", msg);
00798 fflush(log);
00799 }
00800
00801
00802
00803
00804
00805 static
00806 char *
00807 server_setup(int port_given,
00808 const char *port_fn,
00809 const char *log_fn,
00810 server_bag_t *server_bag)
00811 {
00812 int sock = 0;
00813 struct sockaddr_in serv_addr;
00814 socklen_t serv_addr_len = sizeof(serv_addr);
00815 int rc;
00816 int port;
00817 FILE *fp;
00818
00819
00820 memset(server_bag->hostname, 0, sizeof(server_bag->hostname));
00821 gethostname(server_bag->hostname, sizeof(server_bag->hostname)-1);
00822
00823 sock = socket(AF_INET, SOCK_STREAM, 0);
00824 if (sock < 0)
00825 return "socket() failed";
00826
00827 memset(&serv_addr, 0, sizeof(serv_addr));
00828 serv_addr.sin_family = AF_INET;
00829 serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
00830 if (port_given > 0)
00831 serv_addr.sin_port = htons((short) port_given);
00832 else
00833 serv_addr.sin_port = htons(0);
00834 rc = bind(sock, (struct sockaddr*) &serv_addr, sizeof(serv_addr));
00835 if (rc < 0)
00836 return "bind() failed";
00837
00838 rc = getsockname(sock, (struct sockaddr*) &serv_addr, &serv_addr_len);
00839 if (rc < 0)
00840 return "getsockname() failed";
00841 port = ntohs(serv_addr.sin_port);
00842
00843 server_bag->port = port;
00844 server_bag->sock = sock;
00845
00846
00847 if (strcmp(log_fn, ARG_UNDEF) == 0) {
00848 server_bag->log = NULL;
00849 server_bag->log_fn = NULL;
00850 } else {
00851 if ((fp = fopen(log_fn, "w")) != NULL) {
00852 server_bag->log = fp;
00853 } else {
00854 server_bag->log = NULL;
00855 return "Could not create log file";
00856 }
00857 server_bag->log_fn = strdup(log_fn);
00858 }
00859 insert_log_header(server_bag);
00860
00861
00862 if (strcmp(port_fn, ARG_UNDEF) == 0) {
00863 server_bag->port_fn = NULL;
00864 } else {
00865 if ((fp = fopen(port_fn, "w")) == NULL) {
00866 V_printf(-1, "", "Could not create port file `%s'\n", port_fn);
00867 return "Could not create port file";
00868 }
00869 fprintf(fp, "%d\n", (int) port);
00870 fclose(fp);
00871 server_bag->port_fn = strdup(port_fn);
00872 }
00873 return NULL;
00874 }
00875
00876
00877
00878
00879
00880 static
00881 char *
00882 server_teardown(server_bag_t *server_bag)
00883 {
00884 int rc;
00885 char *msg = NULL;
00886
00887
00888 if (server_bag->port_fn != NULL)
00889 unlink(server_bag->port_fn);
00890
00891 insert_log_footer(server_bag);
00892
00893 if (server_bag->port > 0) {
00894 server_bag->port = 0;
00895 rc = close(server_bag->sock);
00896 if (rc < 0)
00897 msg = "Error on socket close";
00898 }
00899
00900 if (server_bag->log_fn) {
00901 if (!server_bag->preserve_log)
00902 unlink(server_bag->log_fn);
00903 free(server_bag->log_fn);
00904 }
00905 return msg;
00906 }
00907
00908
00909
00910
00911
00912
00913
00914
/*
 * Wait up to t0 seconds for fd to become readable.
 *
 * Returns 1 if fd is readable, 0 if the wait timed out, and -1 on error
 * (select() failure, or an fd that cannot be placed in an fd_set).
 */
static
int
isready(int fd, double t0)
{
    int rc;
    fd_set fds;
    struct timeval tv;

    /* FD_SET on an out-of-range descriptor is undefined */
    if (fd < 0 || fd >= FD_SETSIZE)
        return -1;

    /* split the timeout into whole seconds and microseconds */
    tv.tv_sec = (long) t0;
    tv.tv_usec = (long) ((t0 - (double) tv.tv_sec) * 1e6);

    FD_ZERO(&fds);
    FD_SET(fd, &fds);

    rc = select(fd+1, &fds, NULL, NULL, &tv);
    if (rc < 0)
        return -1;
    return FD_ISSET(fd,&fds) ? 1 : 0;
}
00936
00937 static
00938 char *
00939 input_query(server_bag_t *server_bag, query_bag_t *query_bag)
00940 {
00941 int connfd;
00942 int rc;
00943 int n;
00944 struct sockaddr remote_addr;
00945 socklen_t addr_size = sizeof remote_addr;
00946
00947
00948
00949 current_time(&(query_bag->t0), query_bag->t0_str);
00950
00951 server_bag->timeout = 0;
00952
00953 rc = isready(server_bag->sock, server_bag->timeout_time);
00954 if (rc == 0) {
00955 server_bag->timeout = 1;
00956 return "server timed out";
00957 } else if (rc < 0) {
00958 return "select() failed";
00959 }
00960
00961
00962
00963
00964 connfd = accept(server_bag->sock, &remote_addr, &addr_size);
00965 if (connfd < 0)
00966 return "accept() failed";
00967 else if (connfd == 0)
00968 V_printf(-1, "", "input_query: got connfd == 0 from sock = %d\n",
00969 server_bag->sock);
00970
00971
00972 n = sizeof(query_bag->line) - 1;
00973 rc = read(connfd, query_bag->line, n);
00974 if (rc < 0)
00975 return "read failed";
00976
00977 query_bag->line[rc] = '\0';
00978
00979 n = strcspn(query_bag->line, "\n\r");
00980 if ((query_bag->line)[n] == '\0')
00981 return "read failed: no newline seen (line too long?)";
00982 (query_bag->line)[n] = '\0';
00983
00984
00985 rc = getnameinfo(&remote_addr, addr_size,
00986 query_bag->remote_host,
00987 sizeof(query_bag->remote_host), NULL, 0, 0);
00988 if (rc != 0)
00989 strcpy(query_bag->remote_host, "unresolved_host");
00990
00991 V_printf(verbflag > 1, "", "remote host = %s\n", query_bag->remote_host);
00992 V_printf(verbflag > 1, "", "got <%s>\n", query_bag->line);
00993
00994 current_time(&(query_bag->t0), query_bag->t0_str);
00995
00996 server_bag->connfd = connfd;
00997
00998 return NULL;
00999 }
01000
01001
01002
01003
01004
01005
01006
01007
01008
01009 static
01010 char *
01011 parse_line(query_bag_t *qBag,
01012 int srv_max_recs,
01013 char *keylist,
01014 char *seglist,
01015 char *linklist)
01016 {
01017 const char *delims = " \t\n\r";
01018 char *token;
01019 char *argname, *argvalu;
01020 char *line;
01021
01022
01023 strcpy(qBag->op, "rs_list");
01024 strcpy(qBag->ds, "");
01025 strcpy(keylist, "");
01026 strcpy(seglist, "");
01027 strcpy(linklist, "");
01028 qBag->max_recs = srv_max_recs;
01029
01030 line = strdup(qBag->line);
01031
01032 while ((token = strsep(&line, delims)) != NULL) {
01033 if (*token == '\0') continue;
01034 argname = token;
01035 argvalu = strchr(token, (int) '=');
01036 if (!argvalu) {
01037 free(line);
01038 return "Missing = in arg=value pair in input line";
01039 }
01040 *argvalu++ = '\0';
01041
01042 argvalu += strspn(argvalu, delims);
01043
01044 *(argvalu + strcspn(argvalu, delims)) = '\0';
01045
01046 *(argname + strcspn(argname, delims)) = '\0';
01047
01048 if (strcmp(argname, "op") == 0) {
01049 strcpy(qBag->op, argvalu);
01050 } else if (strcmp(argname, "ds") == 0) {
01051 strcpy(qBag->ds, argvalu);
01052 } else if (strcmp(argname, "key") == 0) {
01053 strcpy(keylist, argvalu);
01054 } else if (strcmp(argname, "seg") == 0) {
01055 strcpy(seglist, argvalu);
01056 } else if (strcmp(argname, "link") == 0) {
01057 strcpy(linklist, argvalu);
01058 } else if (strcmp(argname, "n") == 0) {
01059 if (sscanf(argvalu, "%d", &(qBag->max_recs)) != 1) {
01060 qBag->max_recs = srv_max_recs;
01061 free(line);
01062 return "Bad number in n=max_recs parameter in input line";
01063 }
01064 } else {
01065 free(line);
01066 return "Unrecognized arg name in arg=value pair in input line";
01067 }
01068 }
01069 free(line);
01070
01071 if (strcmp(qBag->op, "exit") != 0 && strlen(qBag->ds) == 0)
01072 return "Missing ds=value pair in input line";
01073
01074 return NULL;
01075 }
01076
01077
01078
01079
01080
01081
01082
01083
01084
01085
01086
/*
 * Write all len bytes of buf to fd, retrying after short writes.
 *
 * Returns  0 if everything was written,
 *          1 if write() stopped accepting data before the end,
 *         -1 if write() reported an error (or len is negative).
 */
int
write_buf(int fd, char *buf, int len)
{
    /* loop on the remaining count; never issue a zero-length write */
    while (len > 0) {
        ssize_t rc = write(fd, buf, len);

        if (rc < 0)
            return -1;
        if (rc == 0)
            return 1;
        buf += rc;
        len -= rc;
    }
    return (len == 0) ? 0 : -1;
}
01103
01104
01105
01106
01107
01108
01109 static
01110 void
01111 export_json(server_bag_t *server_bag, query_bag_t *query_bag, json_t *jroot)
01112 {
01113 char *initial_json;
01114 char *final_json;
01115 char runtime[32];
01116 char status_string[12];
01117 int rc, ok1, ok2;
01118 int fd = server_bag->connfd;
01119 int head = server_bag->head;
01120 int len1, len2;
01121
01122
01123 current_time(&(query_bag->t1), query_bag->t1_str);
01124
01125 snprintf(runtime, sizeof(runtime), "%0.3f", query_bag->t1 - query_bag->t0);
01126 json_insert_pair_into_object(jroot, "runtime", json_new_number(runtime));
01127
01128 snprintf(status_string, sizeof(status_string), "%8d", query_bag->status);
01129 json_insert_pair_into_object(jroot, "status", json_new_number(status_string));
01130
01131 json_tree_to_string(jroot, &initial_json);
01132 final_json = json_format_string(initial_json);
01133 free(initial_json);
01134
01135 ok1 = ok2 = 0;
01136 if (head) {
01137 len1 = strlen(JSON_MIME);
01138 rc = write_buf(fd, JSON_MIME, strlen(JSON_MIME));
01139 if (rc < 0)
01140 V_printf(-1, "", "Error writing json mime\n");
01141 else if (rc > 0)
01142 V_printf(-1, "", "Incomplete write on json mime\n");
01143 else
01144 ok1 = 1;
01145 } else {
01146 ok1 = 1;
01147 len1 = 0;
01148 }
01149 len2 = strlen(final_json);
01150 rc = write_buf(fd, final_json, len2);
01151 if (rc < 0)
01152 V_printf(-1, "", "Error writing json buf\n");
01153 else if (rc > 0)
01154 V_printf(-1, "", "Incomplete write on json buf\n");
01155 else
01156 ok2 = 1;
01157
01158 rc = close(fd);
01159 if (rc < 0)
01160 V_printf(-1, "", "Error closing json buf\n");
01161
01162 server_bag->connfd = 0;
01163 free(final_json);
01164
01165 if (!ok1 || !ok2 || rc < 0)
01166 insert_log_message(server_bag,
01167 "communication error responding to request below.");
01168
01169 insert_log_summary(server_bag, query_bag);
01170 server_bag->export_num += 1;
01171 server_bag->export_len += len1 + len2;
01172 }
01173
01174
01175
01176
01177
01178
01179
01180
01181
01182 static
01183 void
01184 export_json_exception(server_bag_t *server_bag, query_bag_t *query_bag, char *msg)
01185 {
01186 char *emsg;
01187 char *emsg_json;
01188 json_t *jroot = json_new_object();
01189
01190 emsg = msg ? msg : "Unspecified error";
01191
01192 snprintf(query_bag->emsg, sizeof(query_bag->emsg), "%s", emsg);
01193 if (query_bag->status)
01194 V_printf(verbflag, "", "Failed request (%s):\n%s\n",
01195 emsg,
01196 query_bag->line);
01197 emsg_json = string_to_json(emsg);
01198 if (query_bag->status)
01199 json_insert_pair_into_object(jroot, "error", json_new_string(emsg_json));
01200 else
01201 json_insert_pair_into_object(jroot, "message", json_new_string(emsg_json));
01202 export_json(server_bag, query_bag, jroot);
01203 json_free_value(&jroot);
01204 }
01205
01206
01207
01208
01209
01210
01211
01212
01213 char *module_name = "query_engine";
01214
01215 ModuleArgs_t module_args[] = {
01216 {ARG_STRING, "logfile", ARG_UNDEF, "log file (default is no log)"},
01217 {ARG_STRING, "portfile",ARG_UNDEF, "port filename (default is no file)"},
01218 {ARG_INT, "port", "0", "server port (default is to auto-assign)"},
01219 {ARG_INT, "verb", "0", "verbosity (0, 1, 2)"},
01220 {ARG_INT, "n", "0", "recordSet limit"},
01221 {ARG_FLOAT, "timeout", "30.0", "server-exit timeout (minutes, >0)"},
01222 {ARG_FLAG, "m", "0", "include MIME header"},
01223 {ARG_FLAG, "p", "0", "preserve logfile after exit"},
01224 {ARG_FLAG, "R", "0", "show record query"},
01225 {ARG_FLAG, "o", "0", "add owner info to series_struct"},
01226 {ARG_END}
01227 };
01228
01229
01230
01231
/*
 * SKIP_REQUEST(msg): answer the current request with `msg` and abandon it.
 *
 * Sends msg to the client through export_json_exception() and jumps to the
 * `loop_end` label.  It can only be used where variables named `server_bag`
 * and `query_bag` and a label `loop_end` are in scope.
 */
#define SKIP_REQUEST(msg)                                    \
  do {                                                       \
    export_json_exception(&server_bag, &query_bag, (msg));   \
    goto loop_end;                                           \
  } while (0)
01236
01237
01238
01239
01240
01241 static char *sigint_message = NULL;
01242
01243
01244
01245
01246 static
01247 void
01248 OnSIGINT_init(int setup, int head)
01249 {
01250 if (setup > 0) {
01251 char *initial_json, *final_json;
01252 char *header;
01253 json_t *jroot = json_new_object();
01254
01255 header = head ? JSON_MIME : "";
01256
01257 json_insert_pair_into_object(jroot,
01258 "error",
01259 json_new_string("server killed during request"));
01260 json_insert_pair_into_object(jroot,
01261 "status",
01262 json_new_string("-1"));
01263 json_insert_pair_into_object(jroot,
01264 "runtime",
01265 json_new_number("0.0"));
01266
01267 json_tree_to_string(jroot, &initial_json);
01268 final_json = json_format_string(initial_json);
01269 free(initial_json);
01270
01271 sigint_message = malloc(strlen(final_json) + strlen(header) + 1);
01272 sprintf(sigint_message, "%s%s", header, final_json);
01273 } else {
01274
01275 free(sigint_message);
01276 }
01277
01278 }
01279
01280
01281
01282
01283 static
01284 int
01285 OnSIGINT(void *data)
01286 {
01287 int rc;
01288 server_bag_t *server_bag = (server_bag_t *) data;
01289
01290 V_printf(-1, "", "Recieved sigint, shutting down.\n");
01291 insert_log_message(server_bag, "Received sigint, shutting down.");
01292
01293
01294
01295 if (server_bag->connfd != 0) {
01296 rc = write(server_bag->connfd, sigint_message, strlen(sigint_message));
01297 if (rc < 0)
01298 V_printf(-1, "", "Error writing final json buf\n");
01299 rc = close(server_bag->connfd);
01300 if (rc < 0)
01301 V_printf(-1, "", "Error closing json output channel\n");
01302 }
01303 server_teardown(server_bag);
01304 return 0;
01305 }
01306
01307
01308
01309
01310
01311
01312
01313
01314
01315
01316
01317
01318
01319
01320
01321 int
01322 DoIt(void)
01323 {
01324
01325 const char *log_fn = cmdparams_get_str(&cmdparams, "logfile", NULL);
01326 const char *port_fn = cmdparams_get_str(&cmdparams, "portfile",NULL);
01327 int port_given = cmdparams_get_int(&cmdparams, "port", NULL);
01328 int verbflag = cmdparams_get_int(&cmdparams, "verb", NULL);
01329 float timeout_time = cmdparams_get_float(&cmdparams, "timeout", NULL);
01330 int max_recs_server = cmdparams_get_int(&cmdparams, "n", NULL);
01331 int wantMimeHead = cmdparams_get_int(&cmdparams, "m", NULL);
01332 int wantRecInfo = cmdparams_get_int(&cmdparams, "R", NULL);
01333 int wantOwner = cmdparams_get_int(&cmdparams, "o", NULL);
01334 int preserveLog = cmdparams_get_int(&cmdparams, "p", NULL);
01335
01336 char ds[QUERY_MAX];
01337 char keylist[QUERY_MAX];
01338 char seglist[QUERY_MAX];
01339 char linklist[QUERY_MAX];
01340
01341 server_bag_t server_bag;
01342 query_bag_t query_bag;
01343 int max_recs;
01344 json_t *jroot = NULL;
01345 char *msg;
01346 int rc;
01347
01348
01349 memset(&server_bag, 0, sizeof(server_bag));
01350 server_bag.connfd = 0;
01351 server_bag.preserve_log = preserveLog;
01352 server_bag.head = wantMimeHead;
01353 server_bag.timeout_time = timeout_time * 60.0;
01354 if (!(server_bag.timeout_time > 0 && server_bag.timeout < 1e8))
01355 DIE("Given timeout (%g) is out of range", timeout_time);
01356
01357
01358
01359 OnSIGINT_init(1, wantMimeHead);
01360 CleanerData_t cleaner;
01361 cleaner.cb = (pFn_Cleaner_t) &OnSIGINT;
01362 cleaner.data = (void *) &server_bag;
01363
01364
01365 #ifdef DRMS_CLIENT
01366 drms_client_registercleaner(drms_env, &cleaner);
01367 #else
01368 drms_server_registercleaner(drms_env, &cleaner);
01369 #endif
01370
01371
01372 msg = server_setup(port_given, port_fn, log_fn, &server_bag);
01373 if (msg)
01374 DIE("Could not start up server connections: %s", msg);
01375
01376
01377
01378
01379 #ifdef DAEMONIZE
01380
01381 switch (fork()) {
01382 case -1:
01383
01384 DIE("fork failed");
01385 break;
01386 default:
01387
01388
01389 close(server_bag.sock);
01390 _exit(1);
01391
01392 return 0;
01393 break;
01394 case 0:
01395
01396 break;
01397 }
01398 #endif
01399
01400
01401
01402
01403 V_printf(verbflag, "", "Serving on port %d\n", server_bag.port);
01404 rc = listen(server_bag.sock, 5);
01405 if (rc < 0)
01406 DIE("socket listen() failed");
01407 server_bag.running = 1;
01408 while (server_bag.running) {
01409
01410 memset(&query_bag, 0, sizeof(query_bag));
01411 query_bag.status = 1;
01412
01413
01414 jroot = json_new_object();
01415
01416
01417
01418
01419 if ((msg = malloc(4*1024*1024)) == NULL) {
01420 V_printf(-1, "", "Quitting: out of memory\n");
01421
01422 insert_log_message(&server_bag, "Quit: out of memory");
01423
01424 server_teardown(&server_bag);
01425 DIE("Out of memory");
01426 } else {
01427 free(msg);
01428 }
01429
01430
01431
01432 msg = input_query(&server_bag, &query_bag);
01433 if (server_bag.timeout) {
01434
01435 query_bag.status = 0;
01436 server_bag.running = 0;
01437 strcpy(query_bag.op, "timeout");
01438 SKIP_REQUEST("Server exiting due to inactivity");
01439 } else if (msg != NULL) {
01440 SKIP_REQUEST(msg);
01441 }
01442 V_printf(verbflag>1, "", "Input query will go to fd = %d\n", server_bag.connfd);
01443
01444
01445 msg = parse_line(&query_bag, max_recs_server, keylist, seglist, linklist);
01446 if (msg)
01447 SKIP_REQUEST(msg);
01448 strcpy(ds, query_bag.ds);
01449 max_recs = query_bag.max_recs;
01450
01451 V_printf(verbflag, "\t", "Running: %s ds=`%s' (K=`%s', S=`%s', L=`%s')\n",
01452 query_bag.op, query_bag.ds, keylist, seglist, linklist);
01453
01454
01455
01456
01457 if (strcmp(query_bag.op, "exit") == 0) {
01458 server_bag.running = 0;
01459 query_bag.status = 0;
01460 SKIP_REQUEST("Exiting server normally.");
01461 }
01462
01463
01464
01465 else if (strcmp(query_bag.op, "series_struct") == 0) {
01466 char *p, *emsg;
01467 DRMS_Record_t *rec;
01468 int status = 0;
01469
01470
01471
01472 if (p = index(ds, '[')) *p = '\0';
01473 if (p = index(ds, '{')) *p = '\0';
01474
01475 rec = drms_template_record(drms_env, ds, &status);
01476 if (status == DRMS_ERROR_QUERYFAILED) {
01477 emsg = (char *) DB_GetErrmsg(drms_env->session->db_handle);
01478 if (!emsg) emsg = "problem with database query";
01479 SKIP_REQUEST(emsg);
01480 } else if (status != 0) {
01481 SKIP_REQUEST("series not found");
01482 }
01483
01484 list_series_info(drms_env, rec, jroot, wantOwner);
01485 if (get_series_stats(rec, jroot) == DRMS_ERROR_QUERYFAILED) {
01486 emsg = (char *) DB_GetErrmsg(drms_env->session->db_handle);
01487 if (!emsg) emsg = "problem with database query";
01488 SKIP_REQUEST(emsg);
01489 }
01490
01491 query_bag.status = 0;
01492 export_json(&server_bag, &query_bag, jroot);
01493 }
01494
01495
01496
01497
01498 else if (strcmp(query_bag.op, "rs_summary") == 0) {
01499 char *emsg;
01500 int count=0, status=0;
01501 int countlimit = abs(max_recs);
01502 char val[32];
01503
01504 char *bracket = index(ds, '{');
01505 if (bracket)
01506 *bracket = '\0';
01507 if (countlimit) {
01508 DRMS_RecordSet_t *recordset = drms_open_nrecords(drms_env, ds, max_recs, &status);
01509 if (status == DRMS_ERROR_QUERYFAILED) {
01510 emsg = (char *) DB_GetErrmsg(drms_env->session->db_handle);
01511 if (!emsg) emsg = "problem with database query";
01512 SKIP_REQUEST(emsg);
01513 } else if (status != 0 || (!recordset)) {
01514 SKIP_REQUEST("unable to open records");
01515 }
01516 count = recordset->n;
01517 drms_close_records(recordset, DRMS_FREE_RECORD);
01518 } else {
01519 count = drms_count_records(drms_env, ds, &status);
01520 }
01521 if (bracket)
01522 *bracket = '{';
01523 if (status == DRMS_ERROR_QUERYFAILED) {
01524 emsg = (char *) DB_GetErrmsg(drms_env->session->db_handle);
01525 if (!emsg) emsg = "problem with database query";
01526 SKIP_REQUEST(emsg);
01527 } else if (status != 0) {
01528 SKIP_REQUEST("series not found");
01529 }
01530
01531
01532 snprintf(val, sizeof(val), "%d", count);
01533 json_insert_pair_into_object(jroot, "count", json_new_number(val));
01534
01535 query_bag.status = 0;
01536 export_json(&server_bag, &query_bag, jroot);
01537 }
01538
01539
01540
01541
01542 else if (strcmp(query_bag.op, "rs_list") == 0) {
01543 DRMS_RecordSet_t *recordset;
01544 DRMS_Record_t *rec, *template;
01545 DRMS_RecChunking_t cstat = kRecChunking_None;
01546 char seriesname[DRMS_MAXQUERYLEN];
01547 char *keys[1000];
01548 char *segs[1000];
01549 char *links[1000];
01550 int ikey, nkeys = 0;
01551 int iseg, nsegs = 0;
01552 int ilink, nlinks = 0;
01553 int irec, nrecs;
01554 char count[32];
01555 json_t **keyvals = NULL, **segvals = NULL, **segdims = NULL, **linkvals = NULL;
01556 json_t *recinfo = NULL;
01557 json_t **segcparms = NULL;
01558 json_t **segbzeros = NULL;
01559 json_t **segbscales = NULL;
01560 json_t *json_keywords = NULL;
01561 json_t *json_segments = NULL;
01562 json_t *json_links = NULL;
01563 char *emsg;
01564 int status = 0;
01565 int record_set_staged = 0;
01566 char *lbracket;
01567 int requireSUMinfo;
01568
01569
01570 strcpy(seriesname, ds);
01571 lbracket = index(seriesname, '[');
01572 if (lbracket) *lbracket = '\0';
01573 template = drms_template_record(drms_env, seriesname, &status);
01574 if (status == DRMS_ERROR_QUERYFAILED) {
01575 emsg = (char *) DB_GetErrmsg(drms_env->session->db_handle);
01576 if (!emsg) emsg = "problem with database query";
01577 SKIP_REQUEST(emsg);
01578 } else if (status != 0) {
01579 SKIP_REQUEST("series not found");
01580 }
01581
01582
01583 if (max_recs == 0) {
01584
01585
01586
01587 recordset = drms_open_records(drms_env, ds, &status);
01588 } else {
01589
01590 recordset = drms_open_nrecords(drms_env, ds, max_recs, &status);
01591 }
01592 if (status == DRMS_ERROR_QUERYFAILED) {
01593 emsg = (char *) DB_GetErrmsg(drms_env->session->db_handle);
01594 if (!emsg) emsg = "problem with database query";
01595 SKIP_REQUEST(emsg);
01596 } else if (status == DRMS_QUERY_TRUNCATED) {
01597 SKIP_REQUEST("Query truncated; too many records?");
01598 }
01599 if (status != 0 || (!recordset))
01600 SKIP_REQUEST("series not found.");
01601 nrecs = recordset->n;
01602 if (nrecs == 0) {
01603
01604 json_insert_pair_into_object(jroot, "count", json_new_number("0"));
01605 query_bag.status = 0;
01606 export_json(&server_bag, &query_bag, jroot);
01607
01608 goto loop_end;
01609 }
01610
01611
01612
01613
01614
01615
01616
01617
01618
01619
01620
01621 requireSUMinfo = 0;
01622 nkeys = 0;
01623
01624 if (strlen(keylist) > 0) {
01625
01626 char *thiskey;
01627 for (thiskey = strtok(keylist, ","); thiskey; thiskey = strtok(NULL, ",")) {
01628 if (strcmp(thiskey,"**NONE**") == 0) {
01629 nkeys = 0;
01630 break;
01631 } else if (strcmp(thiskey, "**ALL**") == 0) {
01632 DRMS_Keyword_t *key;
01633 HIterator_t *last = NULL;
01634 while ((key = drms_record_nextkey(template, &last, 0)))
01635 if (!drms_keyword_getimplicit(key))
01636 keys[nkeys++] = strdup(key->info->name);
01637 if (last) hiter_destroy(&last);
01638 } else {
01639 keys[nkeys++] = strdup(thiskey);
01640 }
01641 if (strcmp(thiskey, "*size*" ) == 0 ||
01642 strcmp(thiskey, "*online*" ) == 0 ||
01643 strcmp(thiskey, "*retain*" ) == 0 ||
01644 strcmp(thiskey, "*archive*") == 0)
01645 requireSUMinfo = 1;
01646 }
01647 }
01648
01649 if (nkeys) {
01650 keyvals = (json_t **) malloc(nkeys * sizeof(json_t *));
01651 for (ikey = 0; ikey < nkeys; ikey++)
01652 keyvals[ikey] = json_new_array();
01653 } else {
01654 keyvals = NULL;
01655 }
01656
01657
01658 nsegs = 0;
01659 if (strlen(seglist) > 0) {
01660 char *thisseg;
01661 for (thisseg = strtok(seglist, ","); thisseg; thisseg = strtok(NULL, ",")) {
01662 if (strcmp(thisseg, "**NONE**") == 0) {
01663 nsegs = 0;
01664 break;
01665 } else if (strcmp(thisseg, "**ALL**") == 0) {
01666 DRMS_Segment_t *seg;
01667 HIterator_t *last = NULL;
01668 while ((seg = drms_record_nextseg(template, &last, 0)))
01669 segs[nsegs++] = strdup(seg->info->name);
01670 if (last) hiter_destroy(&last);
01671 } else {
01672 segs[nsegs++] = strdup(thisseg);
01673 }
01674 }
01675 }
01676
01677 if (nsegs) {
01678 segvals = (json_t **) malloc(nsegs * sizeof(json_t *));
01679 segdims = (json_t **) malloc(nsegs * sizeof(json_t *));
01680 segcparms = (json_t **) malloc(nsegs * sizeof(json_t *));
01681 segbzeros = (json_t **) malloc(nsegs * sizeof(json_t *));
01682 segbscales = (json_t **) malloc(nsegs * sizeof(json_t *));
01683 for (iseg=0; iseg < nsegs; iseg++) {
01684 segvals[iseg] = json_new_array();
01685 segdims[iseg] = json_new_array();
01686 segcparms[iseg] = json_new_array();
01687 segbzeros[iseg] = json_new_array();
01688 segbscales[iseg] = json_new_array();
01689 }
01690 } else {
01691 segvals = segdims = segcparms = segbzeros = segbscales = NULL;
01692 }
01693
01694
01695 nlinks = 0;
01696 if (strlen(linklist) > 0) {
01697 char *thislink;
01698 for (thislink = strtok(linklist, ","); thislink; thislink = strtok(NULL,",")) {
01699 if (strcmp(thislink, "**NONE**") == 0) {
01700 nlinks = 0;
01701 break;
01702 } else if (strcmp(thislink, "**ALL**") == 0) {
01703 DRMS_Link_t *link;
01704 HIterator_t *last = NULL;
01705 while ((link = drms_record_nextlink(template, &last)))
01706 links[nlinks++] = strdup(link->info->name);
01707 if (last)
01708 hiter_destroy(&last);
01709 } else {
01710 links[nlinks++] = strdup(thislink);
01711 }
01712 }
01713 }
01714
01715 if (nlinks) {
01716 linkvals = (json_t **) malloc(nlinks * sizeof(json_t *));
01717 for (ilink=0; ilink<nlinks; ilink++)
01718 linkvals[ilink] = json_new_array();
01719 } else {
01720 linkvals = NULL;
01721 }
01722
01723
01724 if (requireSUMinfo) {
01725 drms_record_getinfo(recordset);
01726 }
01727
01728
01729 for (irec = 0; irec < nrecs; irec++) {
01730 char recquery[DRMS_MAXQUERYLEN];
01731 char *jsonquery;
01732 json_t *recobj;
01733
01734 if (max_recs == 0) {
01735 rec = drms_recordset_fetchnext(drms_env, recordset, &status, &cstat, NULL);
01736 } else {
01737 rec = recordset->records[irec];
01738 status = DRMS_SUCCESS;
01739 }
01740
01741 if (wantRecInfo) {
01742 drms_sprint_rec_query(recquery,rec);
01743 jsonquery = string_to_json(recquery);
01744 recobj = json_new_object();
01745 json_insert_pair_into_object(recobj, "name", json_new_string(jsonquery));
01746 free(jsonquery);
01747 }
01748
01749 for (ikey=0; ikey<nkeys; ikey++) {
01750 DRMS_Keyword_t *rec_key_ikey;
01751 json_t *thiskeyval = keyvals[ikey];
01752 json_t *val;
01753 char rawval[20000];
01754 char *jsonval;
01755
01756 if (strcmp(keys[ikey],"*recnum*") == 0)
01757 {
01758 sprintf(rawval,"%lld",rec->recnum);
01759 val = json_new_number(rawval);
01760 }
01761 else if (strcmp(keys[ikey],"*sunum*") == 0)
01762 {
01763 sprintf(rawval,"%lld",rec->sunum);
01764 val = json_new_number(rawval);
01765 }
01766 else if (strcmp(keys[ikey],"*size*") == 0)
01767 {
01768 char size[40];
01769 SUM_info_t *sinfo = rec->suinfo;
01770 if (!sinfo)
01771 val = json_new_string("NA");
01772 else
01773 {
01774 sprintf(size,"%.0f", sinfo->bytes);
01775 val = json_new_string(size);
01776 }
01777 }
01778 else if (strcmp(keys[ikey],"*online*") == 0)
01779 {
01780 SUM_info_t *sinfo = rec->suinfo;
01781 if (!sinfo)
01782 val = json_new_string("NA");
01783 else
01784 val = json_new_string(sinfo->online_status);
01785 }
01786 else if (strcmp(keys[ikey],"*retain*") == 0)
01787 {
01788 SUM_info_t *sinfo = rec->suinfo;
01789 if (!sinfo)
01790 val = json_new_string("NA");
01791 else
01792 {
01793 int y,m,d;
01794 char retain[20];
01795 if (strcmp("N", sinfo->online_status) == 0)
01796 val = json_new_string("N/A");
01797 else
01798 {
01799 sscanf(sinfo->effective_date, "%4d%2d%2d", &y,&m,&d);
01800 sprintf(retain, "%4d.%02d.%02d",y,m,d);
01801 val = json_new_string(retain);
01802 }
01803 }
01804 }
01805 else if (strcmp(keys[ikey],"*archive*") == 0)
01806 {
01807 SUM_info_t *sinfo = rec->suinfo;
01808 if (!sinfo)
01809 val = json_new_string("NA");
01810 else
01811 {
01812 if(sinfo->pa_status == DAAP && sinfo->pa_substatus == DAADP)
01813 val = json_new_string("Pending");
01814 else
01815 val = json_new_string(sinfo->archive_status);
01816 }
01817 }
01818 else if (strcmp(keys[ikey], "*recdir*") == 0)
01819 {
01820 char path[DRMS_MAXPATHLEN];
01821 if (!record_set_staged)
01822 {
01823 drms_stage_records(recordset, 0, 0);
01824 record_set_staged = 1;
01825 }
01826 drms_record_directory (rec, path, 0);
01827 jsonval = string_to_json(path);
01828 val = json_new_string(jsonval);
01829 free(jsonval);
01830 }
01831 else if (strcmp(keys[ikey], "*dirmtime*") == 0)
01832 {
01833 struct stat buf;
01834 char path[DRMS_MAXPATHLEN];
01835 char timebuf[100];
01836 if (!record_set_staged)
01837 {
01838 drms_stage_records(recordset, 0, 0);
01839 record_set_staged = 1;
01840 }
01841 drms_record_directory (rec, path, 0);
01842 stat(path, &buf);
01843 sprint_ut(timebuf, buf.st_mtime + UNIX_EPOCH);
01844 jsonval = string_to_json(timebuf);
01845 val = json_new_string(jsonval);
01846 free(jsonval);
01847 }
01848 else if (strcmp(keys[ikey], "*logdir*") == 0)
01849 {
01850 char *logdir = drms_record_getlogdir(rec);
01851 if (logdir)
01852 {
01853 jsonval = string_to_json(logdir);
01854 free(logdir);
01855 }
01856 else
01857 jsonval = string_to_json("NO LOG");
01858 val = json_new_string(jsonval);
01859 free(jsonval);
01860 }
01861 else {
01862 rec_key_ikey = drms_keyword_lookup (rec, keys[ikey], 1);
01863 if (!rec_key_ikey) {
01864 V_printf(-1, "", "error, keyword not in series: %s\n",keys[ikey]);
01865
01866 jsonval = string_to_json("Invalid KeyLink");
01867 } else if (drms_ismissing_keyval(rec_key_ikey) &&
01868 strcmp(keys[ikey],"QUALITY") != 0) {
01869 jsonval = string_to_json("MISSING");
01870 } else {
01871 drms_keyword_snprintfval(rec_key_ikey, rawval, sizeof(rawval));
01872
01873 jsonval = string_to_json(rawval);
01874 }
01875 val = json_new_string(jsonval);
01876 free(jsonval);
01877 }
01878 json_insert_child(thiskeyval, val);
01879 }
01880
01881
01882 int online = 0;
01883 for (iseg=0; iseg<nsegs; iseg++)
01884 {
01885 DRMS_Segment_t *rec_seg_iseg = drms_segment_lookup(rec, segs[iseg]);
01886 char *jsonpath;
01887 char *jsondims;
01888 char path[DRMS_MAXPATHLEN];
01889 json_t *thissegval = segvals[iseg];
01890 json_t *thissegdim = segdims[iseg];
01891 json_t *thissegcparms = segcparms[iseg];
01892 json_t *thissegbzero = segbzeros[iseg];
01893 json_t *thissegbscale = segbscales[iseg];
01894 if (rec_seg_iseg) {
01895 int iaxis;
01896 int naxis = rec_seg_iseg->info->naxis;
01897 char dims[100], dimval[20];
01898
01899
01900 if (!record_set_staged)
01901 {
01902 drms_stage_records(recordset, 0, 0);
01903 record_set_staged = 1;
01904 }
01905 drms_record_directory (rec_seg_iseg->record, path, 0);
01906
01907
01908 if (!*path) {
01909 strcpy(path, "NoDataDirectory");
01910 } else {
01911 strncat(path, "/", DRMS_MAXPATHLEN);
01912 strncat(path, rec_seg_iseg->filename, DRMS_MAXPATHLEN);
01913 }
01914 jsonpath = string_to_json(path);
01915 json_insert_child(thissegval, json_new_string(jsonpath));
01916 free(jsonpath);
01917 online = strncmp(path, "/SUM", 4) == 0;
01918
01919
01920 dims[0] = '\0';
01921 for (iaxis=0; iaxis<naxis; iaxis++) {
01922 if (iaxis)
01923 strcat(dims, "x");
01924 sprintf(dimval,"%d",rec_seg_iseg->axis[iaxis]);
01925 strcat(dims, dimval);
01926 }
01927 jsondims = string_to_json(dims);
01928 json_insert_child(thissegdim, json_new_string(jsondims));
01929 free(jsondims);
01930
01931
01932
01933 char keybuf[DRMS_MAXKEYNAMELEN];
01934 DRMS_Keyword_t *anckey = NULL;
01935 char *jsonkeyval = NULL;
01936
01937 if (strlen(rec_seg_iseg->cparms)) {
01938 jsonkeyval = string_to_json(rec_seg_iseg->cparms);
01939 json_insert_child(thissegcparms, json_new_string(jsonkeyval));
01940 free(jsonkeyval);
01941 }
01942
01943 snprintf(keybuf, sizeof(keybuf), "%s_bzero", segs[iseg]);
01944 anckey = drms_keyword_lookup(rec, keybuf, 1);
01945 if (anckey) {
01946 drms_keyword_snprintfval(anckey, keybuf, sizeof(keybuf));
01947
01948 jsonkeyval = string_to_json(keybuf);
01949 json_insert_child(thissegbzero, json_new_string(jsonkeyval));
01950 free(jsonkeyval);
01951 }
01952
01953 anckey = NULL;
01954 snprintf(keybuf, sizeof(keybuf), "%s_bscale", segs[iseg]);
01955 anckey = drms_keyword_lookup(rec, keybuf, 1);
01956 if (anckey) {
01957 drms_keyword_snprintfval(anckey, keybuf, sizeof(keybuf));
01958
01959 jsonkeyval = string_to_json(keybuf);
01960 json_insert_child(thissegbscale, json_new_string(jsonkeyval));
01961 free(jsonkeyval);
01962 }
01963 } else {
01964 char *nosegmsg = "InvalidSegName";
01965 DRMS_Segment_t *segment = hcon_lookup_lower(&rec->segments, segs[iseg]);
01966 if (segment && segment->info->islink)
01967 nosegmsg = "BadSegLink";
01968 jsonpath = string_to_json(nosegmsg);
01969 json_insert_child(thissegval, json_new_string(jsonpath));
01970 free(jsonpath);
01971 jsondims = string_to_json("NA");
01972 json_insert_child(thissegdim, json_new_string(jsondims));
01973 free(jsondims);
01974 }
01975 }
01976
01977
01978 for (ilink=0; ilink<nlinks; ilink++) {
01979 DRMS_Link_t *rec_link = hcon_lookup_lower (&rec->links, links[ilink]);
01980 DRMS_Record_t *linked_rec = drms_link_follow(rec, links[ilink], &status);
01981 char linkquery[DRMS_MAXQUERYLEN];
01982 if (linked_rec) {
01983 if (rec_link->info->type == DYNAMIC_LINK)
01984 drms_sprint_rec_query(linkquery, linked_rec);
01985 else
01986 sprintf(linkquery, "%s[:#%lld]",
01987 linked_rec->seriesinfo->seriesname,
01988 linked_rec->recnum);
01989 drms_close_record(linked_rec, DRMS_FREE_RECORD);
01990
01991 json_t *thislinkval = linkvals[ilink];
01992 json_insert_child(thislinkval, json_new_string(linkquery));
01993 } else {
01994 json_t *thislinkval = linkvals[ilink];
01995 json_insert_child(thislinkval, json_new_string("Invalid_Link"));
01996 }
01997 }
01998
01999
02000 if (wantRecInfo) {
02001 recinfo = json_new_array();
02002 json_insert_pair_into_object(recobj, "online", json_new_number(online ? "1" : "0"));
02003 json_insert_child(recinfo, recobj);
02004 }
02005 }
02006
02007
02008 if (wantRecInfo)
02009 json_insert_pair_into_object(jroot, "recinfo", recinfo);
02010
02011 json_keywords = json_new_array();
02012 for (ikey=0; ikey<nkeys; ikey++)
02013 {
02014 json_t *keyname = json_new_string(keys[ikey]);
02015 json_t *keyobj = json_new_object();
02016 json_insert_pair_into_object(keyobj, "name", keyname);
02017 json_insert_pair_into_object(keyobj, "values", keyvals[ikey]);
02018 json_insert_child(json_keywords, keyobj);
02019 }
02020 json_insert_pair_into_object(jroot, "keywords", json_keywords);
02021
02022 json_segments = json_new_array();
02023 for (iseg=0; iseg<nsegs; iseg++)
02024 {
02025 json_t *segname = json_new_string(segs[iseg]);
02026 json_t *segobj = json_new_object();
02027 json_insert_pair_into_object(segobj, "name", segname);
02028 json_insert_pair_into_object(segobj, "values", segvals[iseg]);
02029 json_insert_pair_into_object(segobj, "dims", segdims[iseg]);
02030 json_insert_pair_into_object(segobj, "cparms", segcparms[iseg]);
02031 json_insert_pair_into_object(segobj, "bzeros", segbzeros[iseg]);
02032 json_insert_pair_into_object(segobj, "bscales", segbscales[iseg]);
02033 json_insert_child(json_segments, segobj);
02034 }
02035 json_insert_pair_into_object(jroot, "segments", json_segments);
02036
02037
02038 json_links = json_new_array();
02039 for (ilink=0; ilink<nlinks; ilink++)
02040 {
02041 json_t *linkname = json_new_string(links[ilink]);
02042 json_t *linkobj = json_new_object();
02043 json_insert_pair_into_object(linkobj, "name", linkname);
02044 json_insert_pair_into_object(linkobj, "values", linkvals[ilink]);
02045 json_insert_child(json_links, linkobj);
02046 }
02047 json_insert_pair_into_object(jroot, "links", json_links);
02048
02049 drms_close_records(recordset, DRMS_FREE_RECORD);
02050
02051
02052 snprintf(count, sizeof(count), "%d", nrecs);
02053 json_insert_pair_into_object(jroot, "count", json_new_number(count));
02054
02055 query_bag.status = 0;
02056 export_json(&server_bag, &query_bag, jroot);
02057
02058
02059 if (nlinks > 0)
02060 free(linkvals);
02061 if (nkeys > 0)
02062 free(keyvals);
02063 if (nsegs > 0) {
02064 free(segvals);
02065 free(segdims);
02066 free(segcparms);
02067 free(segbzeros);
02068 free(segbscales);
02069 }
02070 for (iseg = 0; iseg < nsegs; iseg++)
02071 free(segs[iseg]);
02072 for (ikey = 0; ikey < nkeys; ikey++)
02073 free(keys[ikey]);
02074 for (ilink = 0; ilink < nlinks; ilink++)
02075 free(links[ilink]);
02076 }
02077
02078
02079
02080
02081 else {
02082
02083 SKIP_REQUEST("unknown operator, skipping.");
02084 }
02085
02086 query_bag.status = 0;
02087
02088 loop_end:
02089
02090 if (jroot)
02091 json_free_value(&jroot);
02092 }
02093
02094 msg = server_teardown(&server_bag);
02095 if (msg)
02096 V_printf(-1, "", "Error closing server: %s\n", msg);
02097
02098 OnSIGINT_init(0, wantMimeHead);
02099
02100 return 0;
02101 }
02102
02103
02104