00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #include <dirent.h>
00015 #include "jsoc_main.h"
00016 #include "drms.h"
00017
00018 #define kSUNUMS "sunums"
00019 #define kPATHS "paths"
00020 #define kSERIES "series"
00021 #define kSEP1 "://"
00022 #define kSEP2 "@"
#define kSEP3 ":"
00023 #define kHASHKEYLEN 64
00024 #define kMAXREQ 512
00025
00026 enum RSINGEST_stat_enum
00027 {
00028 kRSING_success = 0,
00029 kRSING_badparma,
00030 kRSING_tracker,
00031 kRSING_sumallocfailure,
00032 kRSING_sumcommitfailure,
00033 kRSING_sumexportfailure,
00034 kRSING_failure
00035 };
00036
00037 typedef enum RSINGEST_stat_enum RSINGEST_stat_t;
00038
00039 struct RSING_FCopyTracker_struct
00040 {
00041 char series[DRMS_MAXSERIESNAMELEN];
00042 char sudir[DRMS_MAXPATHLEN];
00043 };
00044
00045 typedef struct RSING_FCopyTracker_struct RSING_FCopyTracker_t;
00046
00047 struct RSING_ExpTracker_struct
00048 {
00049 SUMEXP_t sumexpt;
00050 int maxreq;
00051 };
00052
00053 typedef struct RSING_ExpTracker_struct RSING_ExpTracker_t;
00054
00055 char *module_name = "rs_ingest";
00056
00057 ModuleArgs_t module_args[] =
00058 {
00059 {ARG_STRING, kSUNUMS, NULL, "comma-separated list of SUNUMs to ingest"},
00060 {ARG_STRING, kPATHS, NULL, "comma-separated list of paths to remote SUs to ingest"},
00061 {ARG_STRING, kSERIES, NULL, "comma-separated list of series owning the SUs"},
00062 {ARG_END}
00063 };
00064
00065 static int rsing_createhashkey(long long sunum, char *buf, int size)
00066 {
00067 return (snprintf(buf, size, "%lld", sunum) > 0);
00068 }
00069
00070
00071
00072 static RSINGEST_stat_t SumsURLMap(const char *instr, char **cmdout, char **hostout, unsigned int *portout)
00073 {
00074 char *tmp = strdup(instr);
00075 char *user = NULL;
00076 char *meth = NULL;
00077 char *host = NULL;
00078 char *port = NULL;
00079 char *domain = NULL;
00080 char methmapped[1024];
00081 char usermapped[1024];
00082 char hostmapped[1024];
00083 char portmapped[1024];
00084 int size;
00085
00086 RSINGEST_stat_t status;
00087
00088 status = kRSING_success;
00089
00090 if (tmp)
00091 {
00092 meth = tmp;
00093 user = strstr(tmp, kSEP1);
00094
00095 if (user)
00096 {
00097 *user = '\0';
00098 user += 3;
00099 host = strstr(user, kSEP2);
00100
00101 if (host)
00102 {
00103 *host = '\0';
00104 host++;
00105 domain = strchr(host, '.');
00106
00107 if (domain)
00108 {
00109 *domain = '\0';
00110 domain++;
00111 port = strstr(domain, kSEP3);
00112
00113
00114 if (port)
00115 {
00116 *port = '\0';
00117 port++;
00118 }
00119 }
00120 else
00121 {
00122
00123 port = strstr(host, kSEP3);
00124
00125
00126 if (port)
00127 {
00128 *port = '\0';
00129 port++;
00130 }
00131 }
00132 }
00133 else
00134 {
00135 status = kRSING_badparma;
00136 }
00137 }
00138 else
00139 {
00140 status = kRSING_badparma;
00141 }
00142
00143
00144 #ifdef LOC_SUMEXP_METHFMT
00145 snprintf(methmapped, sizeof(methmapped), LOC_SUMEXP_METHFMT);
00146 #else
00147 snprintf(methmapped, sizeof(methmapped), "%s", meth);
00148 #endif
00149
00150 #ifdef LOC_SUMEXP_USERFMT
00151 snprintf(usermapped, sizeof(usermapped), LOC_SUMEXP_USERFMT);
00152 #else
00153 snprintf(usermapped, sizeof(usermapped), "%s", user);
00154 #endif
00155
00156 #ifdef LOC_SUMEXP_HOSTFMT
00157 snprintf(hostmapped, sizeof(hostmapped), LOC_SUMEXP_HOSTFMT);
00158 #else
00159 snprintf(hostmapped, sizeof(hostmapped), "%s.%s", host, domain);
00160 #endif
00161
00162 *portmapped = '\0';
00163 if (!port)
00164 {
00165 port = "0";
00166 }
00167 #ifdef LOC_SUMEXP_PORTFMT
00168 snprintf(portmapped, sizeof(portmapped), LOC_SUMEXP_PORTFMT);
00169 #else
00170 snprintf(portmapped, sizeof(portmapped), "%s", port);
00171 #endif
00172
00173 if (cmdout)
00174 {
00175 *cmdout = strdup(methmapped);
00176 }
00177
00178 if (hostout)
00179 {
00180 size = strlen(usermapped) + strlen(hostmapped) + 128;
00181 *hostout = malloc(size);
00182 snprintf(*hostout, size, "%s@%s", usermapped, hostmapped);
00183 }
00184
00185 if (portout)
00186 {
00187 if (sscanf(portmapped, "%u", portout) != 1)
00188 {
00189 status = kRSING_badparma;
00190 }
00191 }
00192
00193 free(tmp);
00194 }
00195 else
00196 {
00197 status = kRSING_badparma;
00198 }
00199
00200 return status;
00201 }
00202
00203 int DoIt(void)
00204 {
00205 RSINGEST_stat_t status = kRSING_success;
00206 char *listsunums = NULL;
00207 char *listpaths = NULL;
00208 char *listseries = NULL;
00209 char query[DRMS_MAXQUERYLEN];
00210 int drmsst;
00211 DRMS_RecordSet_t *rs = NULL;
00212 char serverstr[512];
00213 char *server = NULL;
00214 char *servermeth = NULL;
00215 unsigned int port;
00216 char *sudir = NULL;
00217 char *ansunum;
00218 char *apath;
00219 char *aseries;
00220 char *lsunum = NULL;
00221 char *lpath = NULL;
00222 char *lseries = NULL;
00223 long long sunum;
00224 HContainer_t *sutracker = NULL;
00225 HContainer_t *sumexptracker = NULL;
00226 char hashkey[kHASHKEYLEN];
00227 RSING_FCopyTracker_t fcopy;
00228 RSING_FCopyTracker_t *pfcopy = NULL;
00229 struct stat stBuf;
00230 RSING_ExpTracker_t *pexp = NULL;
00231 HIterator_t *hiter = NULL;
00232 RSING_ExpTracker_t *pexptracker = NULL;
00233 SUMEXP_t *psumexpt = NULL;
00234 const char *phashkey = NULL;
00235 struct dirent **fileList = NULL;
00236 int atleastonefilecopied = 0;
00237 DRMS_Record_t *template = NULL;
00238 int drmsstat = DRMS_SUCCESS;
00239
00240 listsunums = cmdparams_get_str(&cmdparams, kSUNUMS, NULL);
00241 listpaths = cmdparams_get_str(&cmdparams, kPATHS, NULL);
00242 listseries = cmdparams_get_str(&cmdparams, kSERIES, NULL);
00243
00244
00245 for (ansunum = strtok_r(listsunums, ",", &lsunum),
00246 apath = strtok_r(listpaths, ",", &lpath),
00247 aseries = strtok_r(listseries, ",", &lseries);
00248 ansunum && apath && aseries;
00249 ansunum = strtok_r(NULL, ",", &lsunum),
00250 apath = strtok_r(NULL, ",", &lpath),
00251 aseries = strtok_r(NULL, ",", &lseries))
00252 {
00253
00254
00255 snprintf(query, sizeof(query), "%s[?sunum=%s?]", aseries, ansunum);
00256 rs = drms_open_records(drms_env, query, &drmsst);
00257 if (!rs || rs->n == 0)
00258 {
00259 fprintf(stderr,
00260 "Uknown series '%s' and/or SUNUM '%s'; cannot ingest - skipping.\n",
00261 aseries,
00262 ansunum);
00263 continue;
00264 }
00265
00266 drms_close_records(rs, DRMS_FREE_RECORD);
00267
00268 if (sscanf(ansunum, "%lld", &sunum) != 1)
00269 {
00270 fprintf(stderr, "Invalid sunum '%s'; skipping\n", ansunum);
00271 continue;
00272 }
00273
00274 if (!sutracker)
00275 {
00276 sutracker = hcon_create(sizeof(RSING_FCopyTracker_t), kHASHKEYLEN, NULL, NULL, NULL, NULL, 0);
00277 }
00278
00279 if (!sutracker)
00280 {
00281 fprintf(stderr, "Failed to create file-copy tracker.\n");
00282 status = kRSING_tracker;
00283 break;
00284 }
00285
00286 if (!sumexptracker)
00287 {
00288 sumexptracker = hcon_create(sizeof(RSING_ExpTracker_t), kHASHKEYLEN, NULL, NULL, NULL, NULL, 0);
00289 }
00290
00291 if (!sumexptracker)
00292 {
00293 fprintf(stderr, "Failed to create export-request tracker.\n");
00294 status = kRSING_tracker;
00295 break;
00296 }
00297
00298
00299 template = drms_template_record(drms_env, aseries, &drmsstat);
00300 XASSERT(template);
00301 if (!drms_su_allocsu(drms_env,
00302 drms_su_size(drms_env, aseries) + 1000000,
00303 sunum,
00304 &sudir,
00305 &template->seriesinfo->tapegroup,
00306 NULL))
00307 {
00308 if (sudir && !drms_su_getexportserver(drms_env, sunum, serverstr, sizeof(serverstr)))
00309 {
00310
00311
00312
00313
00314
00315
00316
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340 status = SumsURLMap(serverstr, &servermeth, &server, &port);
00341
00342 if (status == kRSING_success)
00343 {
00344 if (rsing_createhashkey(sunum, hashkey, sizeof(hashkey)))
00345 {
00346 if (hcon_lookup(sutracker, hashkey))
00347 {
00348 fprintf(stderr, "Attempt to export a storage unit more than once; skipping\n");
00349
00350 if (servermeth)
00351 {
00352 free(servermeth);
00353 servermeth = NULL;
00354 }
00355
00356 if (server)
00357 {
00358 free(server);
00359 server = NULL;
00360 }
00361
00362
00363
00364 continue;
00365 }
00366
00367 snprintf(fcopy.series, DRMS_MAXSERIESNAMELEN, "%s", aseries);
00368 snprintf(fcopy.sudir, DRMS_MAXPATHLEN, "%s", sudir);
00369 hcon_insert(sutracker, hashkey, &fcopy);
00370 }
00371
00372
00373 if ((pexp = (RSING_ExpTracker_t *)hcon_lookup(sumexptracker, server)) != NULL)
00374 {
00375
00376 if (pexp->sumexpt.reqcnt == pexp->maxreq)
00377 {
00378
00379 pexp->sumexpt.src =
00380 (char **)realloc(pexp->sumexpt.src, pexp->maxreq * 2 * sizeof(char *));
00381 pexp->sumexpt.dest =
00382 (char **)realloc(pexp->sumexpt.dest, pexp->maxreq * 2 * sizeof(char *));
00383 }
00384 }
00385 else
00386 {
00387
00388 pexp = (RSING_ExpTracker_t *)hcon_allocslot(sumexptracker, server);
00389 pexp->sumexpt.cmd = servermeth;
00390 pexp->sumexpt.host = server;
00391 pexp->sumexpt.port = port;
00392 pexp->sumexpt.src = (char **)calloc(kMAXREQ, sizeof(char *));
00393 pexp->sumexpt.dest = (char **)calloc(kMAXREQ, sizeof(char *));
00394 pexp->sumexpt.reqcnt = 0;
00395 pexp->maxreq = kMAXREQ;
00396 }
00397
00398 (pexp->sumexpt.src)[pexp->sumexpt.reqcnt] = strdup(apath);
00399 (pexp->sumexpt.dest)[pexp->sumexpt.reqcnt] = strdup(sudir);
00400 pexp->sumexpt.reqcnt++;
00401 }
00402 else
00403 {
00404
00405 fprintf(stderr, "Invalid server string '%s'.\n", serverstr);
00406 }
00407
00408 if (servermeth)
00409 {
00410 servermeth = NULL;
00411 }
00412
00413 if (server)
00414 {
00415 server = NULL;
00416 }
00417 }
00418 }
00419 else
00420 {
00421 status = kRSING_sumallocfailure;
00422 }
00423
00424 if (sudir)
00425 {
00426 free(sudir);
00427 sudir = NULL;
00428 }
00429
00430 if (status != kRSING_success)
00431 {
00432
00433 fprintf(stderr, "Error '%d' transfering storage unit %lld; skipping.\n", (int)status, sunum);
00434 status = kRSING_success;
00435 }
00436 }
00437
00438
00439 hiter = hiter_create(sumexptracker);
00440 while ((pexptracker = hiter_getnext(hiter)) != NULL)
00441 {
00442 psumexpt = &(pexptracker->sumexpt);
00443
00444
00445 if (drms_su_sumexport(drms_env, psumexpt))
00446 {
00447 status = kRSING_sumexportfailure;
00448 fprintf(stderr, "Error '%d' SUM_export().\n", (int)status);
00449
00450 status = kRSING_success;
00451 }
00452
00453
00454 if (psumexpt->host)
00455 {
00456 free(psumexpt->host);
00457 }
00458
00459 while (psumexpt->reqcnt >= 0)
00460 {
00461 if (psumexpt->src && psumexpt->src[psumexpt->reqcnt])
00462 {
00463 free(psumexpt->src[psumexpt->reqcnt]);
00464 }
00465 if (psumexpt->dest && psumexpt->dest[psumexpt->reqcnt])
00466 {
00467 free(psumexpt->dest[psumexpt->reqcnt]);
00468 }
00469
00470 psumexpt->reqcnt--;
00471 }
00472
00473 if (psumexpt->src)
00474 {
00475 free(psumexpt->src);
00476 }
00477 if (psumexpt->dest)
00478 {
00479 free(psumexpt->dest);
00480 }
00481 }
00482
00483 hiter_destroy(&hiter);
00484 hcon_destroy(&sumexptracker);
00485
00486
00487 hiter = hiter_create(sutracker);
00488 while ((pfcopy = hiter_extgetnext(hiter, &phashkey)) != NULL)
00489 {
00490
00491 if (!stat(pfcopy->sudir, &stBuf) &&
00492 S_ISDIR(stBuf.st_mode) &&
00493 scandir(pfcopy->sudir, &fileList, NULL, NULL) > 2)
00494 {
00495
00496 sscanf(phashkey, "%lld", &sunum);
00497
00498 if (drms_su_commitsu(drms_env, pfcopy->series, sunum, pfcopy->sudir))
00499 {
00500
00501 fprintf(stderr, "Error committing SU '%s'.\n", phashkey);
00502 }
00503 else
00504 {
00505 atleastonefilecopied = 1;
00506 }
00507 }
00508 else
00509 {
00510 fprintf(stderr, "Error copying to storage unit '%s'.\n", pfcopy->sudir);
00511 }
00512
00513 }
00514
00515 hiter_destroy(&hiter);
00516 hcon_destroy(&sutracker);
00517
00518 if (status == kRSING_success && atleastonefilecopied)
00519 {
00520
00521 printf("1\n");
00522 }
00523 else
00524 {
00525
00526 printf("-1\n");
00527 }
00528
00529 return status;
00530 }
00531