00001 #include "drms.h"
00002 #include "jsoc_main.h"
00003
00004
00005
00006
00007
00008
00009
00010
00011
/* Name of the command-line argument that selects the target series. */
#define kArgSeries "series"
/* Name of the command-line argument that names the input data file. */
#define kArgDatafile "dataf"
/* Default value of the data-file argument; OpenDataStream() reads from stdin when it sees this value. */
#define kUndefined "undefined"

/* Number of records requested from drms_create_records() at a time. */
#define kChunkSize 128

/* Keywords that DoIt() sets to the current local time when the record has them. */
#define kDateKeyName "date"
#define kDateStrKeyName "datestr"
00020
/* Status codes produced by this module; DoIt() returns one of them. */
typedef enum
{
    kIDErr_Success         = 0, /* No error. */
    kIDErr_InvalidArgs     = 1, /* Header/data column mismatch. */
    kIDErr_DRMS            = 2, /* A DRMS call failed or a keyword/segment was not found. */
    kIDErr_NoSeries        = 3, /* The requested series does not exist. */
    kIDErr_FileIO          = 4, /* The data file could not be opened. */
    kIDErr_OutOfMemory     = 5, /* An allocation failed. */
    kIDErr_SegNotSupported = 6  /* A column names a segment; segments cannot be ingested. */
} IDError_t;
00031
/* Name under which this module identifies itself. */
char *module_name = "ingestdata";


/*
 * Command-line arguments accepted by the module:
 *   series - series that receives the ingested records.
 *   dataf  - tabular input file; defaults to kUndefined, in which case
 *            OpenDataStream() reads the table from stdin.
 */
ModuleArgs_t module_args[] =
{
    {ARG_STRING, kArgSeries, "", "The series into which data are to be ingested."},
    {ARG_STRING, kArgDatafile, kUndefined, "Optional - data can be specified in tabular format in a file. The first row contains the keyword names."},
    {ARG_END}
};
00041
/* Placeholder for file ingestion: does no work and always returns 1. */
static int IngestFile()
{
    return 1;
}
00051
00052 static FILE *OpenDataStream(const char *dfile, IDError_t *status)
00053 {
00054 FILE *dfptr = NULL;
00055
00056 *status = kIDErr_Success;
00057
00058 if (strcmp(dfile, kUndefined) == 0)
00059 {
00060
00061 dfptr = stdin;
00062 }
00063 else
00064 {
00065 dfptr = fopen(dfile, "r");
00066 if (!dfptr)
00067 {
00068 fprintf(stderr, "Unable to open ");
00069 *status = kIDErr_FileIO;
00070 }
00071 }
00072
00073 return dfptr;
00074 }
00075
00076
00077
00078
/*
 * Classifies the character that ch points at.
 *
 * Returns 1 for a space or a tab, 0 for any other character, and -1 when
 * ch is NULL or points at the terminating NUL.
 */
static int IsWS(const char *ch)
{
    if (ch == NULL || *ch == '\0')
    {
        return -1;
    }

    switch (*ch)
    {
        case ' ':
        case '\t':
            return 1;
        default:
            return 0;
    }
}
00090
00091 static IDError_t SetValue(const char *val, LinkedList_t *objlist, DRMS_Record_t *rec, const char *series, int lineno)
00092 {
00093 ListNode_t *nextobj = NULL;
00094 const char *objname = NULL;
00095 DRMS_Keyword_t *drmskey = NULL;
00096 DRMS_Segment_t *drmsseg = NULL;
00097 DRMS_Type_Value_t dval;
00098 int dstat = DRMS_SUCCESS;
00099 IDError_t rv;
00100
00101 rv = kIDErr_Success;
00102
00103
00104 nextobj = list_llnext(objlist);
00105 objname = (const char *)(nextobj->data);
00106
00107 if (objname)
00108 {
00109 drmskey = drms_keyword_lookup(rec, objname, 0);
00110 if (!drmskey)
00111 {
00112 drmsseg = drms_segment_lookup(rec, objname);
00113 if (!drmsseg)
00114 {
00115 fprintf(stderr, "Unable to locate keyword/segment %s in series %s.\n", objname, series);
00116 rv = kIDErr_DRMS;
00117 }
00118 }
00119 }
00120 else
00121 {
00122 fprintf(stderr, "The number of headers and the number of data columns do not match.\n");
00123 rv = kIDErr_InvalidArgs;
00124 }
00125
00126 if (rv == kIDErr_Success)
00127 {
00128 if (drmskey)
00129 {
00130 dval.string_val = strdup(val);
00131
00132
00133 dstat = drms_setkey(rec, objname, DRMS_TYPE_STRING, &dval);
00134
00135 if (dstat != DRMS_SUCCESS)
00136 {
00137 fprintf(stderr, "Unable to set key %s with value %s, source line number %d, DRMS returned %d.\n", objname, val, lineno, dstat);
00138 rv = kIDErr_DRMS;
00139 }
00140 else
00141 {
00142 if (dval.string_val)
00143 {
00144 free(dval.string_val);
00145 dval.string_val = NULL;
00146 }
00147 }
00148 }
00149 else if (drmsseg)
00150 {
00151 fprintf(stderr, "Ingestion of DRMS segments is not currently supported.\n");
00152 rv = kIDErr_SegNotSupported;
00153 }
00154 }
00155
00156 return rv;
00157 }
00158
00159 int DoIt(void)
00160 {
00161 IDError_t rv = kIDErr_Success;
00162 int dstat = DRMS_SUCCESS;
00163 FILE *dfptr = NULL;
00164 const char *series = NULL;
00165 const char *dfile = NULL;
00166 DRMS_RecordSet_t *recset = NULL;
00167 int nrecs;
00168 int irec;
00169
00170 series = cmdparams_get_str(&cmdparams, kArgSeries, &dstat);
00171 dfile = cmdparams_get_str(&cmdparams, kArgDatafile, &dstat);
00172
00173 if (!drms_series_exists(drms_env, series, &dstat) || dstat != DRMS_SUCCESS)
00174 {
00175 fprintf(stderr, "Series %s does not exist.\n", series);
00176 rv = kIDErr_NoSeries;
00177 }
00178
00179 if (rv == kIDErr_Success)
00180 {
00181 dfptr = OpenDataStream(dfile, &rv);
00182 }
00183
00184 if (rv == kIDErr_Success)
00185 {
00186 char lineBuf[LINE_MAX];
00187 size_t len;
00188 char *fullline = NULL;
00189 size_t szFL;
00190 char *pline = NULL;
00191 DRMS_Record_t *rec = NULL;
00192 int lineno;
00193 char header[DRMS_MAXKEYNAMELEN];
00194 char *pheader = NULL;
00195 char val[128];
00196 char *pval = NULL;
00197 LinkedList_t *objlist = NULL;
00198
00199 nrecs = 0;
00200 lineno = 1;
00201
00202
00203 while (!(fgets(lineBuf, LINE_MAX, dfptr) == NULL))
00204 {
00205
00206 len = strlen(lineBuf);
00207 fullline = strdup(lineBuf);
00208 szFL = len + 1;
00209
00210 if (len == LINE_MAX - 1)
00211 {
00212
00213 while (!(fgets(lineBuf, LINE_MAX, dfptr) == NULL))
00214 {
00215 fullline = base_strcatalloc(fullline, lineBuf, &szFL);
00216
00217 if (strlen(lineBuf) > 1 && lineBuf[strlen(lineBuf) - 1] == '\n')
00218 {
00219 break;
00220 }
00221 }
00222 }
00223
00224 len = strlen(fullline);
00225
00226 if (fullline[len - 1] == '\n')
00227 {
00228 fullline[len - 1] = '\0';
00229 }
00230
00231
00232 if (lineno == 1)
00233 {
00234 if (!objlist)
00235 {
00236 objlist = list_llcreate(DRMS_MAXNAMELEN, (ListFreeFn_t)NULL);
00237 }
00238
00239 if (!objlist)
00240 {
00241 rv = kIDErr_OutOfMemory;
00242 break;
00243 }
00244
00245
00246 for (pheader = header, pline = fullline; *pline != '\0';)
00247 {
00248 if (IsWS(pline))
00249 {
00250 if (*header != '\0')
00251 {
00252
00253 *pheader = '\0';
00254 list_llinserttail(objlist, header);
00255 *header = '\0';
00256 pheader = header;
00257 }
00258
00259
00260 pline++;
00261 }
00262 else
00263 {
00264 *pheader++ = *pline++;
00265 }
00266 }
00267
00268
00269 if (*header != '\0')
00270 {
00271
00272 *pheader = '\0';
00273 list_llinserttail(objlist, header);
00274 *header = '\0';
00275 }
00276
00277 lineno++;
00278 continue;
00279 }
00280
00281
00282 irec = nrecs % kChunkSize;
00283 if (irec == 0)
00284 {
00285
00286 if (recset != NULL)
00287 {
00288 if (drms_close_records(recset, DRMS_INSERT_RECORD) != DRMS_SUCCESS)
00289 {
00290 fprintf(stderr, "Failure inserting records into the database.\n");
00291 rv = kIDErr_DRMS;
00292 break;
00293 }
00294
00295 recset = NULL;
00296 }
00297
00298
00299 recset = drms_create_records(drms_env, kChunkSize, series, DRMS_PERMANENT, &dstat);
00300 if (!recset || dstat != DRMS_SUCCESS)
00301 {
00302 rv = kIDErr_DRMS;
00303 break;
00304 }
00305 }
00306
00307 rec = recset->records[irec];
00308
00309
00310
00311 for (pval = val, pline = fullline, list_llreset(objlist); *pline != '\0';)
00312 {
00313 if (IsWS(pline))
00314 {
00315 if (*val != '\0')
00316 {
00317
00318 *pval = '\0';
00319 rv = SetValue(val, objlist, rec, series, lineno);
00320 if (rv != kIDErr_Success)
00321 {
00322 break;
00323 }
00324
00325
00326 *val = '\0';
00327 pval = val;
00328 }
00329
00330
00331 pline++;
00332 }
00333 else
00334 {
00335 *pval++ = *pline++;
00336 }
00337 }
00338
00339 if (rv != kIDErr_Success)
00340 {
00341 break;
00342 }
00343
00344 if (*val != '\0')
00345 {
00346 *pval = '\0';
00347 rv = SetValue(val, objlist, rec, series, lineno);
00348 *val = '\0';
00349 if (rv != kIDErr_Success)
00350 {
00351 break;
00352 }
00353 }
00354
00355
00356
00357 {
00358 time_t timenow;
00359 struct tm *ltime;
00360 char tbuf[128];
00361
00362 time(&timenow);
00363 ltime = localtime (&timenow);
00364 strftime(tbuf, sizeof(tbuf) - 1, "%Y.%m.%d_%H:%M:%S_%Z", ltime);
00365
00366 if (drms_keyword_lookup(rec, kDateKeyName, 0))
00367 {
00368
00369 if (drms_setkey_string(rec, kDateKeyName, tbuf))
00370 {
00371 fprintf(stderr, "Couldn't set date keyword.\n");
00372 rv = kIDErr_DRMS;
00373 break;
00374 }
00375 }
00376
00377 if (drms_keyword_lookup(rec, kDateStrKeyName, 0))
00378 {
00379 if (drms_setkey_string(rec, kDateStrKeyName, tbuf))
00380 {
00381 fprintf(stderr, "Couldn't set datestr keyword.\n");
00382 rv = kIDErr_DRMS;
00383 break;
00384 }
00385 }
00386 }
00387
00388 lineno++;
00389 nrecs++;
00390 }
00391 }
00392
00393 if (rv == kIDErr_Success)
00394 {
00395
00396
00397
00398 if (recset != NULL && recset->n > 0)
00399 {
00400 DRMS_RecordSet_t *final = NULL;
00401
00402 final = malloc(sizeof(DRMS_RecordSet_t));
00403 if (final)
00404 {
00405 memset(final, 0, sizeof(DRMS_RecordSet_t));
00406
00407
00408 for (irec = 0; irec < (nrecs % kChunkSize); irec++)
00409 {
00410 drms_merge_record(final, recset->records[irec]);
00411 recset->records[irec] = NULL;
00412 }
00413
00414
00415
00416
00417 final->env = recset->env;
00418
00419 if (drms_close_records(recset, DRMS_FREE_RECORD))
00420 {
00421 fprintf(stderr, "Failure inserting records into the database.\n");
00422 rv = kIDErr_DRMS;
00423 }
00424 else
00425 {
00426 if (drms_close_records(final, DRMS_INSERT_RECORD) != DRMS_SUCCESS)
00427 {
00428 fprintf(stderr, "Failure inserting records into the database.\n");
00429 rv = kIDErr_DRMS;
00430 }
00431 }
00432
00433 final = NULL;
00434 recset = NULL;
00435 }
00436 else
00437 {
00438 rv = kIDErr_OutOfMemory;
00439 }
00440 }
00441 }
00442
00443 if (dfptr != stdin)
00444 {
00445 fclose(dfptr);
00446 }
00447
00448 return rv;
00449 }