00001
00002
00003 #include "jsmn.h"
00004 #include "jsoc_main.h"
00005 #include "tasrw.h"
00006
/* Name of this module. */
char *module_name = "rawingest";

/* Names of the command-line arguments accepted by the module. */
#define kArgSeries   "series"
#define kArgSegment  "segment"
#define kArgTest     "t"
#define kArgSetDate  "d"

/* Default value of the kArgTest argument; DoIt() reads stdin when it sees this value. */
#define kArgNotSpec  "not specified"

/* Number of records requested from drms_create_records() per chunk. */
#define kRecChunkSz     128

/* Once the token array has grown beyond this many entries, DoIt() refuses to grow it further. */
#define kMaxJSONTokens  65536
00017
/* Status codes returned by DoIt(). */
typedef enum RIStatus_enum
{
    kErrNone        = 0, /* success */
    kErrNoMemory    = 1, /* an allocation failed */
    kErrInvalidJSON = 2, /* the JSON text was rejected by the parser (invalid or incomplete) */
    kErrHugeJSON    = 3, /* the JSON text needs more tokens than the module is willing to allocate */
    kErrIngest      = 4  /* SetKeyValues() reported a failure */
} RIStatus_t;

/* Index, within the token array, of the next token to hand out (see Get/SetNextTokIndex()). */
static int gNextTokIndex = 0;
00030
00031 ModuleArgs_t module_args[] =
00032 {
00033 {ARG_STRING,
00034 kArgSeries,
00035 NULL,
00036 "The series to which records are being added.",
00037 NULL},
00038
00039 {ARG_STRING,
00040 kArgSegment,
00041 NULL,
00042 "The segment that will contain the data files being ingested.",
00043 NULL},
00044
00045 {ARG_STRING,
00046 kArgTest,
00047 kArgNotSpec,
00048 "Test argument - if set, then this will be the path to a file containing json for testing.",
00049 NULL},
00050
00051 {ARG_FLAG,
00052 kArgSetDate,
00053 NULL,
00054 "If set, then this flag will cause the DATE keyword, if present, to be updated in every record created.",
00055 NULL},
00056
00057 {ARG_END}
00058 };
00059
00060 static int GetNextTokIndex()
00061 {
00062 return gNextTokIndex;
00063 }
00064
00065 static void SetNextTokIndex(int index)
00066 {
00067 gNextTokIndex = index;
00068 }
00069
00070 static int GetKeyAndValue(DRMS_Record_t *orec, const char *json, jsmntok_t *tokens, int ntoks, int itok, char **key, DRMS_Value_t **val, int *nchild)
00071 {
00072 jsmntok_t *keytok = NULL;
00073 jsmntok_t *valtok = NULL;
00074 size_t len;
00075 char keyname[DRMS_MAXKEYNAMELEN];
00076 char *keyval = NULL;
00077 DRMS_Keyword_t *drmskey = NULL;
00078 DRMS_Type_t type;
00079 int consumed;
00080 int istat;
00081
00082 keytok = &(tokens[itok]);
00083 valtok = &(tokens[itok + 1]);
00084 if ((keytok->start == 0 && keytok->end == 0) || itok == ntoks - 1)
00085 {
00086
00087
00088
00089
00090 }
00091
00092 if (keytok->type != JSMN_STRING)
00093 {
00094
00095 return 1;
00096 }
00097
00098 if (valtok->type == JSMN_ARRAY)
00099 {
00100
00101 }
00102 else if (valtok->type == JSMN_OBJECT)
00103 {
00104
00105
00106
00107
00108
00109 len = keytok->end - keytok->start;
00110 keyval = calloc(1, len + 1);
00111
00112 if (*keyval)
00113 {
00114 return 1;
00115 }
00116
00117 memcpy(keyval, &json[keytok->start], len);
00118 keyval[len] = '\0';
00119 *key = keyval;
00120
00121 *val = NULL;
00122 *nchild = valtok->size;
00123 }
00124 else
00125 {
00126
00127 len = keytok->end - keytok->start;
00128 memcpy(keyname, &json[keytok->start], len);
00129 keyname[len] = '\0';
00130 *key = strdup(keyname);
00131
00132 if (!key)
00133 {
00134 return 1;
00135 }
00136
00137
00138
00139
00140 len = valtok->end - valtok->start;
00141 keyval = calloc(1, len + 1);
00142
00143 if (!keyval)
00144 {
00145 return 1;
00146 }
00147
00148 memcpy(keyval, &json[valtok->start], len);
00149 keyval[len] = '\0';
00150
00151
00152
00153
00154
00155 drmskey = drms_keyword_lookup(orec, keyname, 0);
00156
00157 if (drmskey)
00158 {
00159 type = drms_keyword_gettype(drmskey);
00160 }
00161 else
00162 {
00163 type = DRMS_TYPE_STRING;
00164 }
00165
00166 *val = calloc(1, sizeof(DRMS_Value_t));
00167
00168 if (!*val)
00169 {
00170 return 1;
00171 }
00172
00173 (*val)->type = type;
00174
00175 if (type == DRMS_TYPE_CHAR ||
00176 type == DRMS_TYPE_SHORT ||
00177 type == DRMS_TYPE_INT ||
00178 type == DRMS_TYPE_LONGLONG)
00179 {
00180 (*val)->value.longlong_val = drms_types_strtoll(keyval, type, &consumed, &istat);
00181 if (istat)
00182 {
00183 return 1;
00184 }
00185 }
00186 else
00187 {
00188 drms_strval(type, &((*val)->value), keyval);
00189 }
00190
00191 *nchild = 0;
00192 }
00193
00194 return 0;
00195 }
00196
00197 static int GetNextKeyAndValue(DRMS_Record_t *orec, const char *json, jsmntok_t *tokens, int ntoks, char **key, DRMS_Value_t **val, int *nchild)
00198 {
00199 int itok = GetNextTokIndex();
00200 int ret = 0;
00201
00202 ret = GetKeyAndValue(orec, json, tokens, ntoks, itok, key, val, nchild);
00203 if (ret == 0)
00204 {
00205 SetNextTokIndex(itok + 2);
00206 }
00207
00208 return ret;
00209 }
00210
00211 static int IngestRawFile(DRMS_Record_t *orec, const char *segname, const char *infile)
00212 {
00213 DRMS_Segment_t *seg = NULL;
00214 struct stat stBuf;
00215 char outfile[DRMS_MAXPATHLEN] = {0};
00216 char key[DRMS_MAXKEYNAMELEN];
00217 int istat;
00218 int rv;
00219
00220 rv = 0;
00221
00222
00223 if (stat(infile, &stBuf) == -1)
00224 {
00225 rv = 1;
00226 }
00227 else
00228 {
00229
00230 seg = drms_segment_lookup(orec, segname);
00231
00232 if (!seg)
00233 {
00234 rv = 1;
00235 }
00236 else
00237 {
00238 if (seg->info->protocol == DRMS_GENERIC)
00239 {
00240
00241
00242 const char *filename = NULL;
00243 filename = rindex(infile, '/');
00244
00245 if (filename)
00246 {
00247 filename++;
00248 }
00249 else
00250 {
00251 filename = infile;
00252 }
00253
00254 CHECKSNPRINTF(snprintf(seg->filename, DRMS_MAXSEGFILENAME, "%s", filename), DRMS_MAXSEGFILENAME);
00255 drms_segment_filename(seg, outfile);
00256 }
00257 else if (seg->info->protocol == DRMS_BINARY ||
00258 seg->info->protocol == DRMS_BINZIP ||
00259 seg->info->protocol == DRMS_FITZ ||
00260 seg->info->protocol == DRMS_FITS)
00261 {
00262 int naxis;
00263 int iaxis;
00264 fitsfile *fptr = NULL;
00265 CFITSIO_IMAGE_INFO imginfo;
00266 int fileCreated = 0;
00267
00268
00269 drms_segment_filename(seg, outfile);
00270
00271
00272 fptr = fitsrw_getfptr(0, infile, 0, &istat, &fileCreated);
00273
00274 XASSERT(!fileCreated);
00275
00276 if (fptr && !istat)
00277 {
00278 if (fitsrw_getfpinfo_ext(fptr, &imginfo))
00279 {
00280 fprintf(stderr, "Unable to retrieve file info.\n");
00281 rv = 1;
00282 }
00283
00284 }
00285 else
00286 {
00287 fprintf(stderr, "Unable to read file %s.\n", infile);
00288 rv = 1;
00289 }
00290
00291 if (fptr)
00292 {
00293 fitsrw_closefptr(0, fptr);
00294 }
00295
00296 if (rv == 0)
00297 {
00298
00299
00300 snprintf(key, sizeof(key), "%s_bzero", seg->info->name);
00301
00302 if (imginfo.bitpix == BYTE_IMG)
00303 {
00304
00305
00306
00307
00308
00309
00310 drms_setkey_double(seg->record, key, 128 * imginfo.bscale + imginfo.bzero);
00311 }
00312 else
00313 {
00314 drms_setkey_double(seg->record, key, imginfo.bzero);
00315 }
00316
00317 snprintf(key, sizeof(key), "%s_bscale", seg->info->name);
00318 drms_setkey_double(seg->record, key, imginfo.bscale);
00319
00320 if (seg->info->protocol == DRMS_FITS)
00321 {
00322 if (seg->info->scope == DRMS_VARDIM)
00323 {
00324
00325 naxis = imginfo.naxis;
00326
00327
00328 seg->info->naxis = naxis;
00329
00330 for (iaxis = 0; iaxis < naxis; iaxis++)
00331 {
00332 seg->axis[iaxis] = imginfo.naxes[iaxis];
00333 }
00334 }
00335 else
00336 {
00337
00338 }
00339 }
00340 else
00341 {
00342
00343 }
00344 }
00345 }
00346 else
00347 {
00348 fprintf(stderr, "Unsupported segment protocol %d.\n", seg->info->protocol);
00349 rv = 1;
00350 }
00351 }
00352 }
00353
00354 if (rv == 0)
00355 {
00356 if (*outfile)
00357 {
00358 if (copyfile(infile, outfile) != 0)
00359 {
00360 fprintf(stderr, "failure copying file '%s' to '%s'.\n", infile, outfile);
00361 rv = 1;
00362 }
00363 }
00364 }
00365
00366 return rv;
00367 }
00368
00369
00370 int SetKeyValues(DRMS_Env_t *env,
00371 const char *series,
00372 const char *segname,
00373 DRMS_RecordSet_t **chunk,
00374 DRMS_RecordSet_t *final,
00375 char **pkeys,
00376 int npkeys,
00377 HContainer_t *pkeyvals,
00378 int pklev,
00379 const char *json,
00380 jsmntok_t *tokens,
00381 int ntoks,
00382 int ntoproc,
00383 int *nprocessed,
00384 int * hasnpk,
00385 int *irec,
00386 int setdate)
00387 {
00388 char *key = NULL;
00389 DRMS_Value_t *val = NULL;
00390 DRMS_Record_t *orec = NULL;
00391 DRMS_Value_t pkeyval;
00392 DRMS_Value_t *ppkeyval = NULL;
00393 int nproc;
00394 int nchild;
00395 int istat;
00396 int nelem;
00397 int ielem;
00398 int subnchild;
00399 HIterator_t *hit = NULL;
00400 int isubirec;
00401 int rv;
00402
00403 rv = 0;
00404
00405 *nprocessed = 0;
00406 *hasnpk = 0;
00407 nproc = 0;
00408 orec = (*chunk)->records[*irec];
00409
00410 while (nproc != ntoproc)
00411 {
00412
00413
00414 if (GetNextKeyAndValue(orec, json, tokens, ntoks, &key, &val, &nchild))
00415 {
00416 rv = 1;
00417 break;
00418 }
00419
00420 if (!key)
00421 {
00422
00423 break;
00424 }
00425
00426 if (!val)
00427 {
00428
00429 DRMS_Keyword_t *drmskey = NULL;
00430 DRMS_Type_t keytype;
00431 int consumed;
00432 int subnproc;
00433 int subhasnpk;
00434 const char *pkeyname = NULL;
00435 char pkeynamelc[DRMS_MAXKEYNAMELEN];
00436 DRMS_RecordSet_t *inrs = NULL;
00437 DRMS_Record_t *inrec = NULL;
00438 char *spec = NULL;
00439 char *stmp = NULL;
00440 size_t szspec = 256;
00441 DRMS_Segment_t *segin = NULL;
00442 DRMS_Link_t *linkin = NULL;
00443 DRMS_Record_t *lrec = NULL;
00444 char infile[DRMS_MAXPATHLEN];
00445
00446
00447
00448 if (strcasecmp(key, "keys") == 0)
00449 {
00450 *hasnpk = 1;
00451
00452
00453
00454
00455
00456
00457
00458 hit = hiter_create(pkeyvals);
00459 if (!hit)
00460 {
00461 rv = 1;
00462 break;
00463 }
00464
00465 spec = calloc(1, szspec * sizeof(char));
00466
00467 if (!spec)
00468 {
00469 rv = 1;
00470 break;
00471 }
00472
00473 spec = base_strcatalloc(spec, series, &szspec);
00474
00475 while ((ppkeyval = hiter_extgetnext(hit, &pkeyname)) != NULL)
00476 {
00477 spec = base_strcatalloc(spec, "[", &szspec);
00478 stmp = drms2string(ppkeyval->type, &(ppkeyval->value), &istat);
00479 if (istat)
00480 {
00481 rv = 1;
00482 break;
00483 }
00484
00485 spec = base_strcatalloc(spec, stmp, &szspec);
00486 free(stmp);
00487 spec = base_strcatalloc(spec, "]", &szspec);
00488
00489
00490 drms_setkey_p(orec, pkeyname, ppkeyval);
00491 }
00492
00493 hiter_destroy(&hit);
00494
00495 if (istat)
00496 {
00497 rv = 1;
00498 break;
00499 }
00500
00501 inrs = drms_open_records(env, spec, &istat);
00502 free(spec);
00503
00504 inrec = NULL;
00505 if (!istat && inrs && inrs->n == 1)
00506 {
00507 inrec = inrs->records[0];
00508
00509 if (drms_copykeys(orec, inrec, 0, kDRMS_KeyClass_Explicit))
00510 {
00511 rv = 1;
00512 break;
00513 }
00514
00515
00516 if (setdate)
00517 {
00518 drms_keyword_setdate(orec);
00519 }
00520 }
00521
00522 if (inrec)
00523 {
00524
00525 while ((segin = drms_record_nextseg(inrec, &hit, 0)) != NULL)
00526 {
00527 if (segin->info->islink || strcasecmp(segin->info->name, segname) == 0)
00528 {
00529
00530
00531
00532 continue;
00533 }
00534
00535 if (segin->record->sunum != -1LL)
00536 {
00537
00538
00539 drms_segment_filename(segin, infile);
00540
00541 if (IngestRawFile(orec, segin->info->name, infile) != 0)
00542 {
00543 rv = 1;
00544 break;
00545 }
00546 }
00547 }
00548
00549 if (rv)
00550 {
00551 break;
00552 }
00553
00554 hiter_destroy(&hit);
00555
00556
00557 while ((linkin = drms_record_nextlink(inrec, &hit)) != NULL)
00558 {
00559
00560
00561 if (hcon_lookup_lower(&orec->links, linkin->info->name))
00562 {
00563
00564 lrec = drms_link_follow(inrec, linkin->info->name, &istat);
00565
00566 if (istat == DRMS_SUCCESS && lrec)
00567 {
00568 if (drms_link_set(linkin->info->name, orec, lrec) != DRMS_SUCCESS)
00569 {
00570 fprintf(stderr, "Failure setting output record's link '%s'.\n", linkin->info->name);
00571 rv = 1;
00572 break;
00573 }
00574 }
00575 }
00576 }
00577
00578 hiter_destroy(&hit);
00579 }
00580
00581 drms_close_records(inrs, DRMS_FREE_RECORD);
00582
00583 if (rv)
00584 {
00585 break;
00586 }
00587
00588
00589 nelem = nchild / 2;
00590 for (ielem = 0; ielem < nelem; ielem++)
00591 {
00592 GetNextKeyAndValue(orec, json, tokens, ntoks, &key, &val, &subnchild);
00593 drms_setkey_p(orec, key, val);
00594 }
00595
00596
00597
00598 nproc += 2;
00599 }
00600 else
00601 {
00602
00603 drmskey = drms_keyword_lookup(orec, pkeys[pklev], 0);
00604
00605 if (!drmskey)
00606 {
00607 rv = 1;
00608 break;
00609 }
00610
00611 keytype = drms_keyword_gettype(drmskey);
00612 pkeyval.type = keytype;
00613
00614 if (keytype == DRMS_TYPE_CHAR ||
00615 keytype == DRMS_TYPE_SHORT ||
00616 keytype == DRMS_TYPE_INT ||
00617 keytype == DRMS_TYPE_LONGLONG)
00618 {
00619 pkeyval.value.longlong_val = drms_types_strtoll(key, keytype, &consumed, &istat);
00620 if (istat)
00621 {
00622 rv = 1;
00623 break;
00624 }
00625 }
00626 else
00627 {
00628 drms_strval(keytype, &(pkeyval.value), key);
00629 }
00630
00631
00632
00633
00634 snprintf(pkeynamelc, sizeof(pkeynamelc), "%s", pkeys[pklev]);
00635 strtolower(pkeynamelc);
00636
00637
00638 hcon_remove(pkeyvals, pkeynamelc);
00639 hcon_insert_lower(pkeyvals, pkeys[pklev], &pkeyval);
00640
00641 if (SetKeyValues(env, series, segname, chunk, final, pkeys, npkeys, pkeyvals, pklev + 1, json, tokens, ntoks, nchild, &subnproc, &subhasnpk, irec, setdate))
00642 {
00643 rv = 1;
00644 break;
00645 }
00646
00647 nproc += 2;
00648
00649
00650
00651
00652 if (subhasnpk)
00653 {
00654 (*irec)++;
00655
00656 if (*irec > (*chunk)->n - 1)
00657 {
00658 int nrecs = (*chunk)->n;
00659
00660
00661 for (isubirec = 0; isubirec < nrecs; isubirec++)
00662 {
00663 drms_merge_record(final, (*chunk)->records[isubirec]);
00664 (*chunk)->records[isubirec] = NULL;
00665 }
00666
00667
00668 drms_close_records(*chunk, DRMS_FREE_RECORD);
00669 *chunk = NULL;
00670
00671
00672 *irec = 0;
00673
00674 if (nproc == ntoproc)
00675 {
00676
00677 break;
00678 }
00679
00680 *chunk = drms_create_records(env, kRecChunkSz, series, DRMS_PERMANENT, &istat);
00681 }
00682
00683 orec = (*chunk)->records[*irec];
00684 }
00685 }
00686 }
00687 else
00688 {
00689
00690 *hasnpk = 1;
00691
00692 if (strcasecmp(key, "file") == 0)
00693 {
00694
00695 if (IngestRawFile(orec, segname, val->value.string_val) != 0)
00696 {
00697 rv = 1;
00698 break;
00699 }
00700
00701 nproc += 2;
00702 }
00703 else
00704 {
00705
00706
00707
00708 }
00709 }
00710 }
00711
00712 if (nprocessed)
00713 {
00714 *nprocessed = nproc;
00715 }
00716
00717 if (key)
00718 {
00719 free(key);
00720 }
00721
00722 if (val)
00723 {
00724 if (val->type == DRMS_TYPE_STRING)
00725 {
00726 free(val->value.string_val);
00727 }
00728
00729 free(val);
00730 }
00731
00732 return rv;
00733 }
00734
00735 int DoIt(void)
00736 {
00737
00738 FILE *strm = NULL;
00739 char line[LINE_MAX];
00740 char *json = NULL;
00741 size_t szjson = 2048;
00742 jsmn_parser parser;
00743 jsmntok_t *tokens = NULL;
00744 size_t szjstokens = 512;
00745 int res;
00746 const char *series = NULL;
00747 const char *segname = NULL;
00748 const char *testfile = NULL;
00749 int setdate;
00750 int istat;
00751 RIStatus_t rv;
00752
00753 rv = kErrNone;
00754
00755 json = calloc(1, szjson);
00756
00757 if (json)
00758 {
00759 series = cmdparams_get_str(&cmdparams, kArgSeries, &istat);
00760 segname = cmdparams_get_str(&cmdparams, kArgSegment, &istat);
00761 testfile = cmdparams_get_str(&cmdparams, kArgTest, &istat);
00762 setdate = cmdparams_isflagset(&cmdparams, kArgSetDate);
00763
00764 if (strcmp(testfile, kArgNotSpec) != 0)
00765 {
00766 strm = fopen(testfile, "r");
00767 }
00768 else
00769 {
00770 strm = stdin;
00771 }
00772
00773 while (fgets(line, LINE_MAX, strm) != NULL)
00774 {
00775
00776
00777
00778 json = base_strcatalloc(json, line, &szjson);
00779 }
00780
00781 if (strlen(json) > 0)
00782 {
00783 tokens = calloc(1, sizeof(jsmntok_t) * szjstokens);
00784
00785 if (tokens)
00786 {
00787 while (1)
00788 {
00789 jsmn_init(&parser);
00790 res = jsmn_parse(&parser, json, tokens, szjstokens);
00791
00792 if (res == JSMN_ERROR_NOMEM)
00793 {
00794
00795
00796
00797 if (szjstokens > kMaxJSONTokens)
00798 {
00799 fprintf(stderr, "Whoa! Try providing a smaller JSON string.\n");
00800 rv = kErrHugeJSON;
00801 break;
00802 }
00803 else
00804 {
00805 szjstokens *= 2;
00806 tokens = realloc(tokens, sizeof(jsmntok_t) * szjstokens);
00807 }
00808 }
00809 else if (res == JSMN_ERROR_INVAL)
00810 {
00811
00812 fprintf(stderr, "Invalid json.\n");
00813 rv = kErrInvalidJSON;
00814 break;
00815 }
00816 else if (res == JSMN_ERROR_PART)
00817 {
00818 fprintf(stderr, "The json provided was incomplete.\n");
00819 rv = kErrInvalidJSON;
00820 break;
00821 }
00822 else
00823 {
00824 XASSERT(res == JSMN_SUCCESS);
00825 break;
00826 }
00827 }
00828
00829 if (rv == kErrNone)
00830 {
00831
00832
00833
00834
00835
00836
00837
00838
00839
00840
00841
00842 HContainer_t *pkeyvals = NULL;
00843 int nproc;
00844 int hasnpk;
00845 DRMS_RecordSet_t *chunk = NULL;
00846 DRMS_RecordSet_t *final = NULL;
00847 int irec;
00848 int isubirec;
00849 int npkeys;
00850 char **pkeys = NULL;
00851
00852 pkeys = drms_series_createpkeyarray(drms_env, series, &npkeys, &istat);
00853 pkeyvals = hcon_create(sizeof(DRMS_Value_t), DRMS_MAXKEYNAMELEN, NULL, NULL, NULL, NULL, 0);
00854
00855 if (pkeyvals)
00856 {
00857 irec = 0;
00858
00859
00860 final = malloc(sizeof(DRMS_RecordSet_t));
00861 memset(final, 0, sizeof(DRMS_RecordSet_t));
00862
00863 chunk = drms_create_records(drms_env, kRecChunkSz, series, DRMS_PERMANENT, &istat);
00864
00865
00866 final->env = chunk->env;
00867
00868 SetNextTokIndex(1);
00869 if (SetKeyValues(drms_env, series, segname, &chunk, final, pkeys, npkeys, pkeyvals, 0, json, tokens, tokens[0].size, tokens[0].size, &nproc, &hasnpk, &irec, setdate))
00870 {
00871
00872 rv = kErrIngest;
00873 }
00874 else
00875 {
00876
00877 if (chunk)
00878 {
00879
00880
00881 for (isubirec = 0; isubirec < irec; isubirec++)
00882 {
00883 drms_merge_record(final, chunk->records[isubirec]);
00884 chunk->records[isubirec] = NULL;
00885 }
00886
00887 irec = 0;
00888 drms_close_records(chunk, DRMS_FREE_RECORD);
00889 }
00890
00891
00892 drms_close_records(final, DRMS_INSERT_RECORD);
00893 }
00894 }
00895 else
00896 {
00897 rv = kErrNoMemory;
00898 }
00899
00900 drms_series_destroypkeyarray(&pkeys, npkeys);
00901 }
00902 }
00903 else
00904 {
00905 rv = kErrNoMemory;
00906 }
00907 }
00908 }
00909 else
00910 {
00911 rv = kErrNoMemory;
00912 }
00913
00914 if (tokens)
00915 {
00916 free(tokens);
00917 tokens = NULL;
00918 }
00919
00920 if (json)
00921 {
00922 free(json);
00923 json = NULL;
00924 }
00925
00926 return rv;
00927 }
00928
00929