00001 #include "jsoc_main.h"
00002 #include "drms_types.h"
00003 #include <sys/file.h>
00004
00005 char *module_name = "dscp";
00006
00007 typedef enum
00008 {
00009 kDSCPErrSuccess = 0,
00010 kDSCPErrBadArgs = 1,
00011 kDSCPErrCantOpenRec = 2
00012 } DSCPError_t;
00013
00014 #define kRecSetIn "rsin"
00015 #define kDSOut "dsout"
00016 #define kUndef "undef"
00017 #define kMaxChunkSize 8192
00018 #define kGb 1073741824
00019
00020 ModuleArgs_t module_args[] =
00021 {
00022 {ARG_STRING, kRecSetIn, kUndef, "Input record-set specification."},
00023 {ARG_STRING, kDSOut, kUndef, "Output data series."},
00024 {ARG_END}
00025 };
00026
00027 static int SeriesExist(DRMS_Env_t *env, const char *rsspec, char ***names, int *nseries, int *ostat)
00028 {
00029 int rv = 0;
00030 char *allvers = NULL;
00031 char **sets = NULL;
00032 DRMS_RecordSetType_t *settypes = NULL;
00033 char **snames = NULL;
00034 char **filts = NULL;
00035 int nsets = 0;
00036 int istat;
00037
00038 if ((istat = drms_record_parserecsetspec(rsspec, &allvers, &sets, &settypes, &snames, &filts, &nsets, NULL)) == DRMS_SUCCESS)
00039 {
00040 int iseries;
00041
00042 for (iseries = 0; iseries < nsets; iseries++)
00043 {
00044 rv = drms_series_exists(env, snames[iseries], &istat);
00045 if (istat != DRMS_SUCCESS)
00046 {
00047 fprintf(stderr, "Problems checking for series '%s' existence.\n", snames[iseries]);
00048 rv = 0;
00049 break;
00050 }
00051 else if (rv == 0)
00052 {
00053 break;
00054 }
00055 }
00056 }
00057 else
00058 {
00059 fprintf(stderr, "dscp FAILURE: invalid record-set specification %s.\n", rsspec);
00060 rv = 0;
00061 }
00062
00063 if (istat == DRMS_SUCCESS)
00064 {
00065 int iseries;
00066
00067 if (nseries)
00068 {
00069 *nseries = nsets;
00070 }
00071
00072 if (names)
00073 {
00074 *names = (char **)malloc(nsets * sizeof(char *));
00075
00076 for (iseries = 0; iseries < nsets; iseries++)
00077 {
00078 (*names)[iseries] = strdup(snames[iseries]);
00079 }
00080 }
00081 }
00082
00083 drms_record_freerecsetspecarr(&allvers, &sets, &settypes, &snames, &filts, nsets);
00084
00085 if (ostat)
00086 {
00087 *ostat = istat;
00088 }
00089
00090 return rv;
00091 }
00092
00093
00094
00095 static int CalcChunkSize(DRMS_Env_t *env, const char *sname)
00096 {
00097 int rv = 0;
00098
00099
00100
00101
00102
00103 char query[2048];
00104 DB_Binary_Result_t *res = NULL;
00105
00106
00107 snprintf(query, sizeof(query),
00108 "SELECT sunum FROM %s WHERE recnum = (SELECT max(recnum) FROM %s)",
00109 sname,
00110 sname);
00111 res = drms_query_bin(env->session, query);
00112
00113 if (res)
00114 {
00115 if (res->num_rows == 1 && res->num_cols == 1)
00116 {
00117 long long sunum = db_binary_field_getint(res, 0, 0);
00118 SUM_info_t **info = (SUM_info_t **)malloc(sizeof(SUM_info_t *) * 1);
00119
00120
00121 if (drms_getsuinfo(env, &sunum, 1, info) == DRMS_SUCCESS)
00122 {
00123 if (info[0]->online_loc == '\0')
00124 {
00125
00126 rv = kMaxChunkSize;
00127 }
00128 else
00129 {
00130 rv = (int)((double)kGb / info[0]->bytes);
00131 if (rv < 1)
00132 {
00133 rv = 1;
00134 }
00135 }
00136 }
00137 else
00138 {
00139 rv = kMaxChunkSize;
00140 }
00141
00142 if (info)
00143 {
00144 int iinfo;
00145
00146 for (iinfo = 0; iinfo < 1; iinfo++)
00147 {
00148 if (info[iinfo])
00149 {
00150 free(info[iinfo]);
00151 info[iinfo] = NULL;
00152 }
00153 }
00154
00155 free(info);
00156 }
00157 }
00158 else
00159 {
00160 rv = kMaxChunkSize;
00161 }
00162
00163 db_free_binary_result(res);
00164 res = NULL;
00165 }
00166 else
00167 {
00168 rv = kMaxChunkSize;
00169 }
00170
00171 return rv;
00172 }
00173
00174 static int ProcessRecord(DRMS_Record_t *recin, DRMS_Record_t *recout)
00175 {
00176 HIterator_t *iter = NULL;
00177 char infile[DRMS_MAXPATHLEN];
00178 char outfile[DRMS_MAXPATHLEN];
00179 DRMS_Segment_t *segin = NULL;
00180 DRMS_Segment_t *segout = NULL;
00181 DRMS_Link_t *linkin = NULL;
00182 DRMS_Record_t *lrec = NULL;
00183 int istat = DRMS_SUCCESS;
00184 int rv = 0;
00185
00186
00187 if (!drms_copykeys(recout, recin, 1, kDRMS_KeyClass_Explicit))
00188 {
00189
00190 while ((segin = drms_record_nextseg(recin, &iter, 0)) != NULL)
00191 {
00192
00193 segout = drms_segment_lookup(recout, segin->info->name);
00194
00195 if (segout)
00196 {
00197 if (recin->sunum != -1LL)
00198 {
00199 if (segin->info->type == segout->info->type)
00200 {
00201
00202
00203 *infile = '\0';
00204 *outfile = '\0';
00205
00206
00207 drms_segment_filename(segin, infile);
00208
00209 if (segout->info->protocol == DRMS_GENERIC)
00210 {
00211 char *filename = NULL;
00212 filename = rindex(infile, '/');
00213 if (filename)
00214 {
00215 filename++;
00216 }
00217 else
00218 {
00219 filename = infile;
00220 }
00221
00222 CHECKSNPRINTF(snprintf(segout->filename, DRMS_MAXSEGFILENAME, "%s", filename), DRMS_MAXSEGFILENAME);
00223 drms_segment_filename(segout, outfile);
00224 }
00225 else
00226 {
00227 drms_segment_filename(segout, outfile);
00228 }
00229
00230 if (*infile != '\0' && *outfile != '\0')
00231 {
00232 if (copyfile(infile, outfile) != 0)
00233 {
00234 fprintf(stderr, "failure copying file '%s' to '%s'.\n", infile, outfile);
00235 rv = 1;
00236 break;
00237 }
00238 }
00239 }
00240 else
00241 {
00242
00243
00244 if (segin->info->protocol == DRMS_GENERIC || segout->info->protocol == DRMS_GENERIC)
00245 {
00246
00247
00248
00249 fprintf(stderr, "Unable to convert to or from a generic segment.\n");
00250 rv = 1;
00251 break;
00252 }
00253 else
00254 {
00255 DRMS_Array_t *data = NULL;
00256
00257
00258
00259
00260
00261
00262
00263
00264
00265
00266 data = drms_segment_read(segin, segin->info->type, &istat);
00267 if (!data || istat != DRMS_SUCCESS)
00268 {
00269 fprintf(stderr, "Unable to read input segment file.\n");
00270 rv = 1;
00271 if (data)
00272 {
00273 drms_free_array(data);
00274 data = NULL;
00275 }
00276 break;
00277 }
00278
00279
00280
00281 data->israw = 0;
00282
00283
00284
00285 data->bzero = segout->bzero;
00286 data->bscale = segout->bscale;
00287
00288 istat = drms_segment_write(segout, data, 0);
00289
00290 drms_free_array(data);
00291 data = NULL;
00292
00293 if (istat != DRMS_SUCCESS)
00294 {
00295 fprintf(stderr, "Unable to write output segment file.\n");
00296 rv = 1;
00297 break;
00298 }
00299 }
00300 }
00301 }
00302 }
00303 }
00304
00305 if (iter)
00306 {
00307 hiter_destroy(&iter);
00308 }
00309
00310
00311
00312 while ((linkin = drms_record_nextlink(recin, &iter)) != NULL)
00313 {
00314
00315
00316 if (hcon_lookup_lower(&recout->links, linkin->info->name))
00317 {
00318
00319 lrec = drms_link_follow(recin, linkin->info->name, &istat);
00320
00321 if (istat == DRMS_SUCCESS && lrec)
00322 {
00323 if (drms_link_set(linkin->info->name, recout, lrec) != DRMS_SUCCESS)
00324 {
00325 fprintf(stderr, "Failure setting output record's link '%s'.\n", linkin->info->name);
00326 rv = 1;
00327 break;
00328 }
00329 }
00330 }
00331 }
00332
00333 if (iter)
00334 {
00335 hiter_destroy(&iter);
00336 }
00337 }
00338 else
00339 {
00340 fprintf(stderr, "copy keys failure.\n");
00341 rv = 1;
00342 }
00343
00344 return rv;
00345 }
00346
00347 int DoIt(void)
00348 {
00349 DSCPError_t rv = kDSCPErrSuccess;
00350 int status;
00351
00352 const char *rsspecin = NULL;
00353 const char *dsspecout = NULL;
00354 char **seriesin = NULL;
00355 int nseriesin = 0;
00356
00357 rsspecin = cmdparams_get_str(&cmdparams, kRecSetIn, NULL);
00358 dsspecout = cmdparams_get_str(&cmdparams, kDSOut, NULL);
00359
00360
00361 if (strcasecmp(rsspecin, kUndef) == 0 && strcasecmp(dsspecout, kUndef) == 0)
00362 {
00363
00364 if (cmdparams_numargs(&cmdparams) >= 3)
00365 {
00366 rsspecin = cmdparams_getarg(&cmdparams, 1);
00367 dsspecout = cmdparams_getarg(&cmdparams, 2);
00368 }
00369 else
00370 {
00371 rv = kDSCPErrBadArgs;
00372 }
00373 }
00374
00375 if (strcasecmp(rsspecin, kUndef) == 0 || strcasecmp(dsspecout, kUndef) == 0)
00376 {
00377 rv = kDSCPErrBadArgs;
00378 }
00379
00380 if (rv == kDSCPErrSuccess)
00381 {
00382
00383 int exists;
00384
00385 rv = kDSCPErrBadArgs;
00386 exists = SeriesExist(drms_env, rsspecin, &seriesin, &nseriesin, &status);
00387 if (status)
00388 {
00389 fprintf(stderr, "LibDRMS error %d.\n", status);
00390 }
00391 else if (exists)
00392 {
00393 exists = SeriesExist(drms_env, dsspecout, NULL, NULL, &status);
00394 if (status)
00395 {
00396 fprintf(stderr, "LibDRMS error %d.\n", status);
00397 }
00398 else
00399 {
00400 rv = kDSCPErrSuccess;
00401 }
00402 }
00403 }
00404
00405 if (rv == kDSCPErrSuccess)
00406 {
00407
00408 DRMS_RecordSet_t *rsin = NULL;
00409 DRMS_RecordSet_t *rsout = NULL;
00410 DRMS_RecordSet_t *rsfinal = NULL;
00411 int chunksize = 0;
00412 int newchunk = 0;
00413 DRMS_RecChunking_t cstat;
00414 DRMS_Record_t *recin = NULL;
00415 DRMS_Record_t *recout = NULL;
00416 int irec;
00417 int nrecs;
00418
00419
00420 chunksize = CalcChunkSize(drms_env, seriesin[0]);
00421 if (drms_recordset_setchunksize(chunksize))
00422 {
00423 fprintf(stderr, "Unable to set record-set chunk size of %d; using default.\n", chunksize);
00424 chunksize = drms_recordset_getchunksize();
00425 }
00426
00427
00428
00429 nrecs = drms_count_records(drms_env, rsspecin, &status);
00430
00431 if (status == DRMS_SUCCESS)
00432 {
00433 if (nrecs)
00434 {
00435 rsin = drms_open_recordset(drms_env, rsspecin, &status);
00436 if (status)
00437 {
00438 fprintf(stderr, "Invalid record-set specification %s.\n", rsspecin);
00439 rv = kDSCPErrBadArgs;
00440 }
00441 }
00442 }
00443
00444 if (status == DRMS_SUCCESS && rsin && nrecs > 0)
00445 {
00446
00447
00448 if (chunksize > nrecs)
00449 {
00450 chunksize = nrecs;
00451 if (drms_recordset_setchunksize(chunksize))
00452 {
00453 fprintf(stderr, "Unable to set record-set chunk size of %d; using default.\n", chunksize);
00454 chunksize = drms_recordset_getchunksize();
00455 }
00456 }
00457
00458
00459
00460 drms_sortandstage_records(rsin, 1, 0, NULL);
00461
00462
00463
00464 rsfinal = malloc(sizeof(DRMS_RecordSet_t));
00465 memset(rsfinal, 0, sizeof(DRMS_RecordSet_t));
00466 irec = 0;
00467
00468 while ((recin = drms_recordset_fetchnext(drms_env, rsin, &status, &cstat, &newchunk)) != NULL)
00469 {
00470 if (status != DRMS_SUCCESS)
00471 {
00472 fprintf(stderr, "Unable to fetch next input record.\n");
00473 rv = kDSCPErrCantOpenRec;
00474 break;
00475 }
00476
00477 if (newchunk)
00478 {
00479
00480 if (rsout)
00481 {
00482
00483
00484 drms_close_records(rsout, DRMS_FREE_RECORD);
00485 rsout = NULL;
00486 }
00487
00488
00489 rsout = drms_create_records(drms_env, chunksize, dsspecout, DRMS_PERMANENT, &status);
00490 if (status || !rsout || rsout->n != chunksize)
00491 {
00492 fprintf(stderr, "Failure creating output records.\n");
00493 break;
00494 }
00495
00496
00497
00498
00499 rsfinal->env = rsout->env;
00500
00501 irec = 0;
00502 }
00503
00504 recout = drms_recordset_fetchnext(drms_env, rsout, &status, NULL, NULL);
00505 if (status != DRMS_SUCCESS)
00506 {
00507 fprintf(stderr, "Unable to fetch next output record.\n");
00508 rv = kDSCPErrCantOpenRec;
00509 break;
00510 }
00511
00512 status = ProcessRecord(recin, recout);
00513
00514 if (status != 0)
00515 {
00516 fprintf(stderr, "Failure processing record.\n");
00517 break;
00518 }
00519 else
00520 {
00521
00522 drms_merge_record(rsfinal, recout);
00523 XASSERT(rsout->records[irec] == recout);
00524
00525
00526
00527 rsout->records[irec] = NULL;
00528 }
00529
00530 irec++;
00531 }
00532
00533
00534 if (rsout)
00535 {
00536 drms_close_records(rsout, DRMS_FREE_RECORD);
00537 rsout = NULL;
00538 }
00539
00540 if (rsfinal)
00541 {
00542 drms_close_records(rsfinal, DRMS_INSERT_RECORD);
00543 }
00544 }
00545 else
00546 {
00547 fprintf(stderr, "No records to process.\n");
00548 }
00549
00550
00551 drms_close_records(rsin, DRMS_FREE_RECORD);
00552 }
00553
00554 if (seriesin)
00555 {
00556 int iseries;
00557
00558 for (iseries = 0; iseries < nseriesin; iseries++)
00559 {
00560 if (seriesin[iseries])
00561 {
00562 free(seriesin[iseries]);
00563 }
00564 }
00565
00566 free(seriesin);
00567 }
00568
00569 return rv;
00570 }