00001
00002
00003 #include <dirent.h>
00004 #include "drms.h"
00005 #include "drms_priv.h"
00006 #include "db.h"
00007 #include "xmem.h"
00008 #include "util.h"
00009 #include "list.h"
00010 #include "printk.h"
00011
00012 #ifdef DRMS_CLIENT
00013 #define DEFS_CLIENT
00014 #endif
00015 #include "drmssite_info.h"
00016
00017 #ifdef DEFS_CLIENT
00018 #undef DEFS_CLIENT
00019 #endif
00020
/* Evaluates to the smaller of a and b.  This is a function-like macro, so an
 * argument may be evaluated more than once: do not pass expressions with side
 * effects. */
#define SUMIN(a,b) ((a) > (b) ? (b) : (a))
00022
00023
/* Declarations that are only compiled when JMD support is enabled
 * (JMD_IS_INSTALLED defined and non-zero).  They depend on libcurl. */
#if defined(JMD_IS_INSTALLED) && JMD_IS_INSTALLED
#include <curl/curl.h>

/* State filled in by send_POST_request(); drms_su_getsudir() later passes
 * session_id to session_status() to poll the request. */
struct POSTState
{
char session_id[256]; /* session identifier used for status polling */
int no_submitted; /* presumably the number of SUNUMs submitted - TODO confirm */
};

int session_status (char *session);
size_t parse_session_state(void *buffer, size_t size, size_t nmemb, void *userp);

void populate_with_sunums(DRMS_Env_t *env, HContainer_t *postmap, char * seriesname, int n, long long sulist[]);
/* Per-series list of SUNUMs for a POST request.  The "postmap" container is
 * created with element size sizeof(struct PassSunumList *), i.e. it stores
 * pointers to these.  NOTE(review): the field meanings below are inferred
 * from their names only; the code that fills them is not visible here. */
struct PassSunumList
{
char series[500]; /* presumably the series name */
char **sunumlist; /* presumably SUNUMs in string form */
long long *sunumarr; /* presumably SUNUMs in numeric form */
int n;
int ncount;
char id[256];
int submitted;
int sizeadded;
};

void add_sunum_to_POST(DRMS_Env_t *env, HContainer_t *postmap, char *seriesname, long long sunum);
size_t create_post_msg(HContainer_t *postmap, char **ptr);
size_t write_data(void *buffer, size_t size, size_t nmemb, void *userp);
int send_POST_request(char * postrequeststr, curl_off_t postsize, struct POSTState *ps);
void free_post_request(HContainer_t *postmap);

size_t read_callback(void *ptr, size_t size, size_t nmemb, void *userp);
#endif // JMD Support
00057
00058
/* NOTE(review): presumably a buffer size (in bytes) for SUNUM list strings;
 * its use is not visible in this part of the file - confirm. */
#define kSUNUMLISTSIZE 248576

/* A string buffer paired with a size. */
struct SUList_struct
{
char *str; /* the string buffer */
size_t size; /* presumably the allocated size of str - TODO confirm */
};

typedef struct SUList_struct SUList_t;
00068
/*
 * Test whether `str` ends with `suffix`.
 *
 *   str             - string to examine (may be NULL)
 *   suffix          - suffix to look for (may be NULL)
 *   caseInsensitive - non-zero to ignore case when comparing
 *
 * Returns a pointer to the first character of the suffix inside `str` when
 * `str` ends with `suffix`, otherwise NULL.  NULL is also returned when
 * either argument is NULL or when `suffix` is longer than `str`.  An empty
 * suffix matches at the terminating NUL of `str`.
 */
static char *EndsWith(const char *str, const char *suffix, int caseInsensitive)
{
    size_t strLength;
    size_t suffixLength;
    const char *tail;
    int differs;

    if (str == NULL || suffix == NULL)
    {
        return NULL;
    }

    strLength = strlen(str);
    suffixLength = strlen(suffix);

    if (strLength < suffixLength)
    {
        return NULL;
    }

    /* Only the last suffixLength characters of str can match. */
    tail = str + (strLength - suffixLength);

    differs = caseInsensitive ? strncasecmp(tail, suffix, suffixLength)
                              : strncmp(tail, suffix, suffixLength);

    return (differs == 0) ? (char *)tail : NULL;
}
00103
00104 static int EmptyDir(const char *dir, int depth)
00105 {
00106 struct stat stBuf;
00107 struct dirent **fileList = NULL;
00108 int nfiles = 0;
00109 int notempty = 0;
00110
00111 notempty = (!stat(dir, &stBuf) && S_ISDIR(stBuf.st_mode) && (nfiles = scandir(dir, &fileList, NULL, NULL)) > 0 && fileList);
00112
00113 if (notempty)
00114 {
00115
00116 int ifile;
00117 struct dirent *entry = NULL;
00118 char fbuf[PATH_MAX];
00119 char subdir[PATH_MAX];
00120
00121 notempty = 0;
00122 ifile = 0;
00123
00124 while (ifile < nfiles)
00125 {
00126 entry = fileList[ifile];
00127
00128 if (entry != NULL)
00129 {
00130 if (!notempty)
00131 {
00132 char *oneFile = entry->d_name;
00133
00134 if (strcmp(oneFile, ".") != 0 && strcmp(oneFile, "..") != 0)
00135 {
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146 snprintf(fbuf, sizeof(fbuf), "%s/%s", dir, oneFile);
00147 if (stat(fbuf, &stBuf))
00148 {
00149
00150 free(entry);
00151 ifile++;
00152 continue;
00153 }
00154
00155 if (depth == 0 && EndsWith(oneFile, ".tas.virgin", 1) || EndsWith(oneFile, ".tas", 1))
00156 {
00157 char *dup = strdup(oneFile);
00158 char *loc = NULL;
00159
00160 if (dup)
00161 {
00162 if (loc = EndsWith(dup, ".virgin", 1))
00163 {
00164
00165 *loc = '\0';
00166 snprintf(fbuf, sizeof(fbuf), "%s/%s", dir, dup);
00167 if (!stat(fbuf, &stBuf))
00168 {
00169
00170 unlink(fbuf);
00171
00172
00173 snprintf(fbuf, sizeof(fbuf), "%s/%s", dir, oneFile);
00174 unlink(fbuf);
00175
00176 free(entry);
00177 ifile++;
00178 continue;
00179 }
00180 }
00181 else if (loc = EndsWith(dup, ".tas", 1))
00182 {
00183
00184 snprintf(fbuf, sizeof(fbuf), "%s/%s.virgin", dir, dup);
00185 if (!stat(fbuf, &stBuf))
00186 {
00187
00188 unlink(fbuf);
00189
00190
00191 snprintf(fbuf, sizeof(fbuf), "%s/%s", dir, oneFile);
00192 unlink(fbuf);
00193
00194 free(entry);
00195 ifile++;
00196 continue;
00197 }
00198 }
00199
00200 free(dup);
00201 }
00202 else
00203 {
00204
00205
00206
00207 }
00208 }
00209
00210 snprintf(subdir, sizeof(subdir), "%s/%s", dir, oneFile);
00211
00212 if (!stat(subdir, &stBuf))
00213 {
00214 if (S_ISDIR(stBuf.st_mode))
00215 {
00216 notempty = !EmptyDir(subdir, depth + 1);
00217 }
00218 else
00219 {
00220 notempty = 1;
00221 }
00222 }
00223 }
00224 }
00225
00226 free(entry);
00227 }
00228
00229 ifile++;
00230 }
00231
00232 free(fileList);
00233 }
00234
00235 return !notempty;
00236 }
00237
00238
00239
00240 #ifndef DRMS_CLIENT
00241 long long drms_su_alloc(DRMS_Env_t *env, uint64_t size, char **sudir, int *tapegroup, int *status)
00242 {
00243 int stat;
00244 long long sunum;
00245
00246 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_ALLOC) && SUMS_USEMTSUMS_ALLOC
00247 DRMS_MtSumsRequest_t *request = NULL;
00248 DRMS_MtSumsRequest_t *reply = NULL;
00249
00250 request = calloc(1, sizeof(DRMS_MtSumsRequest_t));
00251 #else
00252 DRMS_SumRequest_t *request = NULL;
00253 DRMS_SumRequest_t *reply = NULL;
00254
00255 request = malloc(sizeof(DRMS_SumRequest_t));
00256 #endif
00257
00258 XASSERT(request);
00259
00260
00261 request->opcode = DRMS_SUMALLOC;
00262 request->dontwait = 0;
00263 request->reqcnt = 1;
00264 request->bytes = (double)size;
00265
00266 if (tapegroup)
00267 {
00268 request->group = *tapegroup;
00269 }
00270 else
00271 {
00272 request->group = -1;
00273 }
00274
00275 if (request->bytes <=0 )
00276 {
00277 fprintf(stderr,"Invalid storage unit size %lf\n",request->bytes);
00278 return 0;
00279 }
00280
00281 drms_lock_server(env);
00282
00283 if (!env->sum_thread) {
00284 if((stat = pthread_create(&env->sum_thread, NULL, &drms_sums_thread,
00285 (void *) env))) {
00286 fprintf(stderr,"Thread creation failed: %d\n", stat);
00287 drms_unlock_server(env);
00288 return 1;
00289 }
00290 }
00291
00292
00293 tqueueAdd(env->sum_inbox, (long) pthread_self(), (char *)request);
00294
00295
00296
00297
00298
00299
00300
00301
00302
00303 drms_unlock_server(env);
00304 tqueueDel(env->sum_outbox, (long) pthread_self(), (char **)&reply);
00305
00306 if (reply->opcode)
00307 {
00308 if (reply->opcode == -2)
00309 {
00310 fprintf(stderr, "Cannot access SUMS in this DRMS session - a tape read is pending.\n");
00311 stat = DRMS_ERROR_PENDINGTAPEREAD;
00312 }
00313 else
00314 {
00315 fprintf(stderr,"SUM ALLOC failed with error code %d.\n",reply->opcode);
00316 stat = reply->opcode;
00317 }
00318
00319 sunum = 0;
00320
00321 if (sudir)
00322 {
00323 *sudir = NULL;
00324 }
00325 }
00326 else
00327 {
00328 stat = DRMS_SUCCESS;
00329 sunum = reply->sunum[0];
00330
00331 if (sudir)
00332 {
00333
00334 *sudir = reply->sudir[0];
00335 }
00336
00337 #ifdef DEBUG
00338 printf("Allocated Storage unit #%lld, dir='%s'\n",sunum, reply->sudir[0]);
00339 #endif
00340 }
00341
00342 if (status)
00343 *status = stat;
00344
00345 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_ALLOC) && SUMS_USEMTSUMS_ALLOC
00346
00347 if (reply->sunum)
00348 {
00349 free(reply->sunum);
00350 reply->sunum = NULL;
00351 }
00352
00353 if (reply->sudir)
00354 {
00355 free(reply->sudir);
00356 reply->sudir = NULL;
00357 }
00358 #endif
00359
00360 free(reply);
00361 return sunum;
00362 }
00363 #endif
00364
00365 #ifndef DRMS_CLIENT
00366 int drms_su_alloc2(DRMS_Env_t *env,
00367 uint64_t size,
00368 long long sunum,
00369 char **sudir,
00370 int *tapegroup,
00371 int *status)
00372 {
00373 int stat;
00374
00375 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_ALLOC) && SUMS_USEMTSUMS_ALLOC
00376 DRMS_MtSumsRequest_t *request = NULL;
00377 DRMS_MtSumsRequest_t *reply = NULL;
00378
00379 request = calloc(1, sizeof(DRMS_MtSumsRequest_t));
00380 XASSERT(request);
00381 request->sunum = calloc(1, sizeof(uint64_t));
00382 XASSERT(request->sunum);
00383 #else
00384 DRMS_SumRequest_t *request = NULL;
00385 DRMS_SumRequest_t *reply = NULL;
00386
00387 request = malloc(sizeof(DRMS_SumRequest_t));
00388 XASSERT(request);
00389 #endif
00390 request->opcode = DRMS_SUMALLOC2;
00391 request->dontwait = 0;
00392 request->reqcnt = 1;
00393 request->bytes = (double)size;
00394 request->sunum[0] = sunum;
00395
00396 if (tapegroup)
00397 {
00398 request->group = *tapegroup;
00399 }
00400 else
00401 {
00402 request->group = -1;
00403 }
00404
00405 if (request->bytes <=0 )
00406 {
00407 fprintf(stderr,"Invalid storage unit size %lf\n",request->bytes);
00408 return 0;
00409 }
00410
00411 drms_lock_server(env);
00412 if (!env->sum_thread)
00413 {
00414 if((stat = pthread_create(&env->sum_thread, NULL, &drms_sums_thread,
00415 (void *) env)))
00416 {
00417 fprintf(stderr,"Thread creation failed: %d\n", stat);
00418 drms_unlock_server(env);
00419 return 1;
00420 }
00421 }
00422
00423
00424 tqueueAdd(env->sum_inbox, (long) pthread_self(), (char *)request);
00425
00426 drms_unlock_server(env);
00427
00428 tqueueDel(env->sum_outbox, (long) pthread_self(), (char **)&reply);
00429
00430 if (reply->opcode)
00431 {
00432 if (reply->opcode == -2)
00433 {
00434 fprintf(stderr, "Cannot access SUMS in this DRMS session - a tape read is pending.\n");
00435 stat = DRMS_ERROR_PENDINGTAPEREAD;
00436 }
00437 else
00438 {
00439 fprintf(stderr,"SUM ALLOC2 failed with error code %d.\n",reply->opcode);
00440 stat = reply->opcode;
00441 }
00442
00443 if (sudir)
00444 {
00445 *sudir = NULL;
00446 }
00447 }
00448 else
00449 {
00450 stat = DRMS_SUCCESS;
00451 if (sudir)
00452 {
00453
00454 *sudir = reply->sudir[0];
00455 }
00456 }
00457
00458 if (status)
00459 {
00460 *status = stat;
00461 }
00462
00463 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_ALLOC) && SUMS_USEMTSUMS_ALLOC
00464
00465 if (reply->sudir)
00466 {
00467 free(reply->sudir);
00468 reply->sudir = NULL;
00469 }
00470 #endif
00471
00472 if (reply)
00473 {
00474 free(reply);
00475 }
00476
00477 return stat;
00478 }
00479 #endif
00480
00481
00482
00483
00484
00485
00486
00487
00488
00489
00490 #ifndef DRMS_CLIENT
00491 static int drms_su_newslots_internal(DRMS_Env_t *env, int n, char *series,
00492 long long *recnum, DRMS_RecLifetime_t lifetime,
00493 int *slotnum, DRMS_StorageUnit_t **su,
00494 int createslotdirs,
00495 int gotosums)
00496 {
00497
00498
00499
00500
00501 int i, status, slot;
00502 HContainer_t *scon;
00503 HIterator_t hit;
00504 long long sunum;
00505 char slotdir[DRMS_MAXPATHLEN+40], hashkey[DRMS_MAXHASHKEYLEN], *sudir = NULL;
00506 DRMS_Record_t *template=NULL;
00507
00508
00509
00510 XASSERT(env->session->db_direct==1);
00511
00512
00513 scon = hcon_lookup(&env->storageunit_cache, series);
00514 if (scon == NULL)
00515 {
00516 scon = hcon_allocslot(&env->storageunit_cache, series);
00517 hcon_init(scon, sizeof(DRMS_StorageUnit_t), DRMS_MAXHASHKEYLEN,
00518 (void (*)(const void *)) drms_su_freeunit, NULL);
00519 }
00520
00521
00522
00523 XASSERT(scon != NULL);
00524 slot = -1;
00525 status = -1;
00526 hiter_new(&hit, scon);
00527 while( (su[0] = (DRMS_StorageUnit_t *)hiter_getnext(&hit)) )
00528 {
00529
00530 if ( su[0]->mode == DRMS_READWRITE && su[0]->nfree > 0 )
00531 {
00532
00533
00534
00535
00536
00537
00538
00539
00540
00541 slot = 0;
00542 break;
00543 }
00544 }
00545 hiter_free(&hit);
00546
00547
00548
00549 #ifdef DEBUG
00550 printf("n = %d\n",n);
00551 #endif
00552 for (i=0; i<n; i++)
00553 {
00554 if (slot >= 0)
00555 {
00556
00557 if (i > 0)
00558 {
00559
00560
00561
00562
00563
00564
00565 su[i] = su[i-1];
00566 }
00567 else
00568 {
00569
00570
00571
00572
00573 }
00574
00575 XASSERT(su[i]->nfree>0);
00576
00577
00578 while (su[i]->state[slot] != DRMS_SLOT_FREE)
00579 {
00580
00581
00582
00583
00584 ++slot;
00585 }
00586 }
00587 if (slot == -1)
00588 {
00589
00590
00591
00592
00593
00594
00595 if (gotosums == 0)
00596 {
00597 fprintf(stderr, "drms_su_newslots_internal() failure - no existing slots available, need to fetch a new one from SUMS, but SUMS access is disabled (gotosums == 0).\n");
00598 status = DRMS_ERROR_NEEDSUMS;
00599 goto bail;
00600 }
00601
00602 if (template==NULL)
00603 {
00604 if ((template = drms_template_record(env, series, &status)) == NULL)
00605 {
00606 fprintf(stderr,"ERROR: failed to look up series %s template in "
00607 "drms_su_newslots()\n", series);
00608 fprintf(stderr, "num items in series_cache %d\n", hcon_size(&(env->series_cache)));
00609 hcon_printf(stderr, &(env->series_cache));
00610 goto bail;
00611 }
00612 }
00613
00614
00615
00616
00617
00618
00619
00620 sunum = drms_su_alloc(env,
00621 104857600,
00622 &sudir,
00623 &(template->seriesinfo->tapegroup),
00624 &status);
00625 if (status)
00626 {
00627 if (sudir)
00628 free(sudir);
00629 goto bail;
00630 }
00631 sprintf(hashkey,DRMS_SUNUM_FORMAT, sunum);
00632
00633
00634 su[i] = hcon_allocslot(scon, hashkey);
00635 #ifdef DEBUG
00636 printf("Got su[i] = %p. Now has %d slots from '%s'\n",su[i],
00637 hcon_size(scon), series);
00638 #endif
00639
00640
00641 su[i]->sunum = sunum;
00642 strncpy(su[i]->sudir, sudir, sizeof(su[i]->sudir));
00643 free(sudir);
00644 su[i]->mode = DRMS_READWRITE;
00645 su[i]->seriesinfo = template->seriesinfo;
00646 su[i]->nfree = su[i]->seriesinfo->unitsize;
00647 su[i]->state = malloc(su[i]->nfree);
00648 XASSERT(su[i]->state);
00649 memset(su[i]->state, DRMS_SLOT_FREE, su[i]->nfree);
00650 su[i]->recnum = malloc(su[i]->nfree*sizeof(long long));
00651 XASSERT(su[i]->recnum);
00652 memset(su[i]->recnum, 0, su[i]->nfree*sizeof(long long));
00653 su[i]->refcount = 0;
00654
00655 slot = 0;
00656 }
00657 slotnum[i] = slot;
00658 if (lifetime == DRMS_TRANSIENT)
00659 su[i]->state[slot] = DRMS_SLOT_TEMP;
00660 else
00661 su[i]->state[slot] = DRMS_SLOT_FULL;
00662 su[i]->recnum[slot] = recnum[i];
00663 ++slot;
00664 su[i]->nfree--;
00665 if (su[i]->nfree == 0)
00666 slot = -1;
00667
00668
00669
00670
00671
00672
00673
00674
00675
00676
00677 }
00678
00679 if (createslotdirs)
00680 {
00681 for (i=0; i<n; i++)
00682 {
00683 if (su[i])
00684 {
00685 CHECKSNPRINTF(snprintf(slotdir, DRMS_MAXPATHLEN, "%s/" DRMS_SLOTDIR_FORMAT,
00686 su[i]->sudir,slotnum[i]), DRMS_MAXPATHLEN);
00687 #ifdef DEBUG
00688 printf("su->sudir = '%s', slotdir = '%s'\n",su[i]->sudir, slotdir);
00689 #endif
00690
00691 if (mkdir(slotdir,0777))
00692 {
00693 fprintf(stderr,"ERROR: drms_newslot could not create record "
00694 "directory '%s'.\n",slotdir);
00695 perror("mkdir call failed with error");
00696 status = DRMS_ERROR_MKDIRFAILED;
00697 }
00698 }
00699 }
00700 }
00701
00702 status = DRMS_SUCCESS;
00703 bail:
00704 return status;
00705 }
00706
00707 int drms_su_newslots(DRMS_Env_t *env, int n, char *series,
00708 long long *recnum, DRMS_RecLifetime_t lifetime,
00709 int *slotnum, DRMS_StorageUnit_t **su,
00710 int createslotdirs)
00711 {
00712
00713 return drms_su_newslots_internal(env, n, series, recnum, lifetime, slotnum, su, createslotdirs, 1);
00714 }
00715
00716 int drms_su_newslots_nosums(DRMS_Env_t *env, int n, char *series,
00717 long long *recnum, DRMS_RecLifetime_t lifetime,
00718 int *slotnum, DRMS_StorageUnit_t **su,
00719 int createslotdirs)
00720 {
00721
00722
00723 return drms_su_newslots_internal(env, n, series, recnum, lifetime, slotnum, su, createslotdirs, 0);
00724 }
00725 #endif
00726
00727
00728
00729 #ifndef DRMS_CLIENT
/* Handle for a remote-sums session.  Created by crankUpRemoteSums() and
 * released by shutDownRemoteSums(), which frees both strings and disconnects
 * dbh. */
struct DRMS_RsHandle_struct
{
DB_Handle_t *dbh; /* connection returned by db_connect() for the remote-sums database */
char *requestsTable; /* heap copy of RS_REQUEST_TABLE */
char *seqTable; /* heap string "<namespace>.<table>_seq" naming the request-ID sequence */
};

typedef struct DRMS_RsHandle_struct DRMS_RsHandle_t;
00738
00739 static DRMS_RsHandle_t *crankUpRemoteSums(DRMS_Env_t *env, int *status)
00740 {
00741 int rsStatus = DRMS_SUCCESS;
00742 char *nspace = NULL;
00743 char *table = NULL;
00744 char hostPort[128] = {0};
00745 char seqTable[128] = {0};
00746 DRMS_RsHandle_t *handle = NULL;
00747 DRMS_RsHandle_t *rv = NULL;
00748
00749
00750
00751
00752
00753 handle = calloc(1, sizeof(DRMS_RsHandle_t));
00754 if (!handle)
00755 {
00756 rsStatus = DRMS_ERROR_OUTOFMEMORY;
00757 }
00758
00759
00760 if (!rsStatus)
00761 {
00762 if (get_namespace(RS_REQUEST_TABLE, &nspace, &table))
00763 {
00764 rsStatus = DRMS_ERROR_OUTOFMEMORY;
00765 }
00766 else
00767 {
00768 if (!drms_query_tabexists(env->session, nspace, table, &rsStatus))
00769 {
00770 printkerr("Cannot locate remote-sums requests table %s.\n", RS_REQUEST_TABLE);
00771 rsStatus = DRMS_ERROR_REMOTESUMS_MISSING;
00772 }
00773 else
00774 {
00775 handle->requestsTable = strdup(RS_REQUEST_TABLE);
00776 if (!handle->requestsTable)
00777 {
00778 rsStatus = DRMS_ERROR_OUTOFMEMORY;
00779 }
00780 }
00781 }
00782 }
00783
00784
00785 if (!rsStatus)
00786 {
00787 snprintf(seqTable, sizeof(seqTable), "%s_seq", table);
00788 if (!drms_query_tabexists(env->session, nspace, seqTable, &rsStatus))
00789 {
00790 printkerr("Cannot locate remote-sums requests table sequence table %s_seq.\n", RS_REQUEST_TABLE);
00791 rsStatus = DRMS_ERROR_REMOTESUMS_MISSING;
00792 }
00793 else
00794 {
00795 char tBuf[128];
00796
00797 snprintf(tBuf, sizeof(tBuf), "%s.%s", nspace, seqTable);
00798 handle->seqTable = strdup(tBuf);
00799 if (!handle->seqTable)
00800 {
00801 rsStatus = DRMS_ERROR_OUTOFMEMORY;
00802 }
00803 }
00804 }
00805
00806
00807 if (!rsStatus)
00808 {
00809
00810
00811 struct stat stBuf;
00812 FILE *fptr = NULL;
00813
00814 if (stat(RS_LOCKFILE, &stBuf) || !S_ISREG(stBuf.st_mode))
00815 {
00816 printkerr("rsumsd.py is not running - cannot find PID file %s.\n", RS_LOCKFILE);
00817 rsStatus = DRMS_ERROR_REMOTESUMS_MISSING;
00818 }
00819 else
00820 {
00821
00822 fptr = fopen(RS_LOCKFILE, "r");
00823 if (!fptr)
00824 {
00825 printkerr("Cannot open remote-sums PID file %s.\n", RS_LOCKFILE);
00826 rsStatus = DRMS_ERROR_REMOTESUMS_MISSING;
00827 }
00828 else
00829 {
00830 char lineBuf[LINE_MAX];
00831
00832
00833 if (!fgets(lineBuf, sizeof(lineBuf), fptr))
00834 {
00835 printkerr("remote-sums PID file %s is missing PID.\n", RS_LOCKFILE);
00836 rsStatus = DRMS_ERROR_REMOTESUMS_MISSING;
00837 }
00838 else
00839 {
00840 char procBuf[PATH_MAX];
00841
00842
00843 if (lineBuf[strlen(lineBuf) - 1] == '\n')
00844 {
00845 lineBuf[strlen(lineBuf) - 1] = '\0';
00846 }
00847
00848 snprintf(procBuf, sizeof(procBuf), "/proc/%s", lineBuf);
00849
00850 if (stat(procBuf, &stBuf) || !S_ISDIR(stBuf.st_mode))
00851 {
00852 printkerr("rsumsd.py is not running - cannot find running process %s.\n", procBuf);
00853 rsStatus = DRMS_ERROR_REMOTESUMS_MISSING;
00854 }
00855 }
00856
00857
00858 fclose(fptr);
00859 }
00860 }
00861 }
00862
00863 if (!rsStatus)
00864 {
00865 snprintf(hostPort, sizeof(hostPort), "%s:%d", RS_DBHOST, RS_DBPORT);
00866
00867 if ((handle->dbh = db_connect(hostPort, env->session->db_handle->dbuser, NULL, RS_DBNAME, 1)) == NULL)
00868 {
00869 printkerr("Couldn't connect to remote-sums database (host=%s, user=%s, db=%s).\n", hostPort, env->session->db_handle->dbuser, RS_DBNAME);
00870 rsStatus = DRMS_ERROR_REMOTESUMS_MISSING;
00871 }
00872 }
00873
00874 if (!rsStatus)
00875 {
00876 rv = handle;
00877 }
00878
00879 if (nspace)
00880 {
00881 free(nspace);
00882 }
00883
00884 if (table)
00885 {
00886 free(table);
00887 }
00888
00889 if (status)
00890 {
00891 *status = rsStatus;
00892 }
00893
00894 return rv;
00895 }
00896
00897 static void shutDownRemoteSums(DRMS_RsHandle_t **pHandle)
00898 {
00899 if (pHandle)
00900 {
00901 DRMS_RsHandle_t *handle = *pHandle;
00902
00903 if (handle)
00904 {
00905 if (handle->seqTable)
00906 {
00907 free(handle->seqTable);
00908 handle->seqTable = NULL;
00909 }
00910
00911 if (handle->requestsTable)
00912 {
00913 free(handle->requestsTable);
00914 handle->requestsTable = NULL;
00915 }
00916
00917 if (handle->dbh)
00918 {
00919 db_disconnect(&handle->dbh);
00920 }
00921
00922 free(handle);
00923 }
00924
00925 *pHandle = NULL;
00926 }
00927 }
00928
00929 static int processRemoteSums(DRMS_RsHandle_t *handle, int64_t *sunums, unsigned int nsunums)
00930 {
00931 int rsStatus = DRMS_SUCCESS;
00932 char idBuf[128];
00933
00934 if (!rsStatus)
00935 {
00936
00937
00938
00939
00940
00941
00942 char *sunumList = NULL;
00943 size_t szList = 128;
00944
00945 sunumList = calloc(szList, sizeof(char));
00946
00947 if (!sunumList)
00948 {
00949 rsStatus = DRMS_ERROR_OUTOFMEMORY;
00950 }
00951 else
00952 {
00953 int iSunum;
00954 char numBuf[64];
00955
00956 for (iSunum = 0; iSunum < nsunums; iSunum++)
00957 {
00958 if (sunums[iSunum] >= 0)
00959 {
00960 if (iSunum > 0)
00961 {
00962 sunumList = base_strcatalloc(sunumList, ",", &szList);
00963 }
00964
00965 snprintf(numBuf, sizeof(numBuf), "%lld", (long long)(sunums[iSunum]));
00966 sunumList = base_strcatalloc(sunumList, numBuf, &szList);
00967 }
00968 }
00969
00970 if (strlen(sunumList) > 0)
00971 {
00972
00973 char *cmd = NULL;
00974 size_t szCmd;
00975
00976 szCmd = strlen(sunumList) + 256;
00977 cmd = calloc(szCmd, sizeof(char));
00978
00979 if (!cmd)
00980 {
00981 rsStatus = DRMS_ERROR_OUTOFMEMORY;
00982 }
00983 else
00984 {
00985
00986 long long nextID;
00987
00988
00989
00990
00991 if (!handle || !handle->seqTable || !handle->requestsTable || !handle->dbh)
00992 {
00993 printkerr("Invalid remote-sums handle.\n");
00994 rsStatus = DRMS_ERROR_REMOTESUMS_MISSING;
00995 }
00996 else
00997 {
00998
00999 nextID = db_sequence_getnext(handle->dbh, handle->requestsTable);
01000 snprintf(idBuf, sizeof(idBuf), "%lld", nextID);
01001
01002 cmd = base_strcatalloc(cmd, "INSERT INTO ", &szCmd);
01003 cmd = base_strcatalloc(cmd, handle->requestsTable, &szCmd);
01004 cmd = base_strcatalloc(cmd, "(requestid, dbhost, dbport, dbname, starttime, sunums, status, errmsg) VALUES (", &szCmd);
01005 cmd = base_strcatalloc(cmd, idBuf, &szCmd);
01006 cmd = base_strcatalloc(cmd, ", '", &szCmd);
01007 cmd = base_strcatalloc(cmd, handle->dbh->dbhost, &szCmd);
01008 cmd = base_strcatalloc(cmd, "', ", &szCmd);
01009
01010 cmd = base_strcatalloc(cmd, handle->dbh->dbport, &szCmd);
01011 cmd = base_strcatalloc(cmd, ", '", &szCmd);
01012 cmd = base_strcatalloc(cmd, handle->dbh->dbname, &szCmd);
01013
01014 cmd = base_strcatalloc(cmd, "', localtimestamp(0), '", &szCmd);
01015 cmd = base_strcatalloc(cmd, sunumList, &szCmd);
01016 cmd = base_strcatalloc(cmd, "', 'N', '')", &szCmd);
01017
01018
01019 if (db_dms(handle->dbh, NULL, cmd))
01020 {
01021 printkerr("Failure inserting record for new remote-sums request: %s.\n", cmd);
01022 rsStatus = DRMS_ERROR_REMOTESUMS_REQUEST;
01023 }
01024 }
01025
01026 free(cmd);
01027 }
01028 }
01029
01030 free(sunumList);
01031 }
01032 }
01033
01034 if (!rsStatus)
01035 {
01036 char dbCmd[256];
01037 DB_Text_Result_t *dbRes = NULL;
01038 const char *reqStatus = NULL;
01039 const char *reqErrmsg = NULL;
01040 time_t timeStart;
01041
01042
01043
01044 timeStart = time(NULL);
01045 while (1)
01046 {
01047
01048 if (time(NULL) > timeStart + 12 * 60 * 60)
01049 {
01050 rsStatus = DRMS_REMOTESUMS_TRYLATER;
01051 break;
01052 }
01053
01054
01055 snprintf(dbCmd, sizeof(dbCmd), "SELECT status,errmsg FROM %s WHERE requestid = %s", handle->requestsTable, idBuf);
01056 dbRes = db_query_txt(handle->dbh, dbCmd);
01057
01058 if (dbRes && dbRes->num_rows == 1 && dbRes->num_cols == 2)
01059 {
01060
01061 reqStatus = dbRes->field[0][0];
01062 reqErrmsg = dbRes->field[0][1];
01063
01064 if (*reqStatus == 'E')
01065 {
01066 printkerr(reqErrmsg);
01067 rsStatus = DRMS_ERROR_REMOTESUMS_REQUEST;
01068
01069 break;
01070 }
01071 else if (*reqStatus == 'C')
01072 {
01073 break;
01074 }
01075 }
01076 else
01077 {
01078 printkerr("Error checking on remote-sums request %s.\n", idBuf);
01079 rsStatus = DRMS_ERROR_REMOTESUMS_REQUEST;
01080 break;
01081 }
01082
01083 if (dbRes)
01084 {
01085 db_free_text_result(dbRes);
01086 dbRes = NULL;
01087 }
01088
01089 sleep(1);
01090 }
01091
01092 if (dbRes)
01093 {
01094 db_free_text_result(dbRes);
01095 dbRes = NULL;
01096 }
01097
01098
01099 snprintf(dbCmd, sizeof(dbCmd), "DELETE FROM %s WHERE requestid = %s", handle->requestsTable, idBuf);
01100
01101
01102 if (db_dms(handle->dbh, NULL, dbCmd))
01103 {
01104 printkerr("Failure deleting record for completed remote-sums request: %s.\n", dbCmd);
01105 rsStatus = DRMS_ERROR_REMOTESUMS_REQUEST;
01106 }
01107 }
01108
01109 return rsStatus;
01110 }
01111
01112 int drms_su_getsudir(DRMS_Env_t *env, DRMS_StorageUnit_t *su, int retrieve)
01113 {
01114 int sustatus = DRMS_SUCCESS;
01115 int tryagain;
01116 int natts;
01117 int16_t stagingRet = INT16_MIN;
01118 int status = 0;
01119
01120 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_GET) && SUMS_USEMTSUMS_GET
01121 DRMS_MtSumsRequest_t *request = NULL;
01122 DRMS_MtSumsRequest_t *reply = NULL;
01123 #else
01124 DRMS_SumRequest_t *request = NULL;
01125 DRMS_SumRequest_t *reply = NULL;
01126 #endif
01127
01128
01129
01130
01131
01132
01133
01134
01135
01136 #if defined(JMD_IS_INSTALLED) && JMD_IS_INSTALLED
01137 struct POSTState ps;
01138 int postsize;
01139 char *postrequeststr = NULL;
01140 HContainer_t *postmap = NULL;
01141 char *sname = NULL;
01142 long long sunum;
01143 int inprogress;
01144 int ntries;
01145 #endif
01146
01147 int64_t *sunums = NULL;
01148 DRMS_RsHandle_t *rsHandle = NULL;
01149
01150
01151
01152
01153 drms_lock_server(env);
01154
01155 if (!env->sum_thread) {
01156 if((status = pthread_create(&env->sum_thread, NULL, &drms_sums_thread,
01157 (void *) env))) {
01158 fprintf(stderr,"Thread creation failed: %d\n", status);
01159 drms_unlock_server(env);
01160 return 1;
01161 }
01162 }
01163
01164 tryagain = 1;
01165 natts = 1;
01166 while (tryagain && natts < 3)
01167 {
01168 tryagain = 0;
01169
01170 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_GET) && SUMS_USEMTSUMS_GET
01171 request = calloc(1, sizeof(DRMS_MtSumsRequest_t));
01172 XASSERT(request);
01173 request->sunum = calloc(1, sizeof(uint64_t));
01174 XASSERT(request->sunum);
01175 #else
01176 request = malloc(sizeof(DRMS_SumRequest_t));
01177 XASSERT(request);
01178 #endif
01179
01180 request->opcode = DRMS_SUMGET;
01181 request->reqcnt = 1;
01182 request->sunum[0] = su->sunum;
01183 request->mode = NORETRIEVE + TOUCH;
01184 if (retrieve)
01185 request->mode = RETRIEVE + TOUCH;
01186
01187 request->dontwait = 0;
01188
01189
01190
01191
01192
01193
01194
01195 if (env->retention != INT16_MIN)
01196 {
01197
01198 request->tdays = -1 * abs(env->retention);
01199 }
01200 else if ((stagingRet = drms_series_getstagingretention(su->seriesinfo)) != INT16_MIN)
01201 {
01202
01203
01204 if (stagingRet == 0)
01205 {
01206
01207 request->tdays = -1 * abs(STDRETENTION);
01208 }
01209 else
01210 {
01211 request->tdays = -1 * stagingRet;
01212 }
01213 }
01214 else
01215 {
01216
01217 request->tdays = -1 * abs(STDRETENTION);
01218 }
01219
01220
01221 tqueueAdd(env->sum_inbox, (long) pthread_self(), (char *)request);
01222
01223
01224
01225
01226
01227 drms_unlock_server(env);
01228 tqueueDel(env->sum_outbox, (long) pthread_self(), (char **)&reply);
01229 drms_lock_server(env);
01230
01231 if (reply->opcode != 0)
01232 {
01233 if (reply->opcode == 3)
01234 {
01235
01236 if (reply->sudir)
01237 {
01238
01239 for (int i = 0; i < reply->reqcnt; i++)
01240 {
01241 if ((reply->sudir)[i])
01242 {
01243 free((reply->sudir)[i]);
01244 (reply->sudir)[i] = NULL;
01245 }
01246 }
01247
01248 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_GET) && SUMS_USEMTSUMS_GET
01249 free(reply->sudir);
01250 reply->sudir = NULL;
01251 #endif
01252 }
01253
01254 free(reply);
01255 drms_unlock_server(env);
01256 return DRMS_ERROR_SUMSTRYLATER;
01257 }
01258 else if (reply->opcode == -2)
01259 {
01260 fprintf(stderr, "Cannot access SUMS in this DRMS session - a tape read is pending.\n");
01261
01262 if (reply->sudir)
01263 {
01264 for (int i = 0; i < reply->reqcnt; i++)
01265 {
01266 if ((reply->sudir)[i])
01267 {
01268 free((reply->sudir)[i]);
01269 (reply->sudir)[i] = NULL;
01270 }
01271 }
01272
01273 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_GET) && SUMS_USEMTSUMS_GET
01274 free(reply->sudir);
01275 reply->sudir = NULL;
01276 #endif
01277 }
01278 free(reply);
01279 drms_unlock_server(env);
01280 return DRMS_ERROR_PENDINGTAPEREAD;
01281 }
01282 else
01283 {
01284 fprintf(stderr, "SUM GET failed with error code %d.\n", reply->opcode);
01285
01286 if (reply->sudir)
01287 {
01288 for (int i = 0; i < reply->reqcnt; i++)
01289 {
01290 if ((reply->sudir)[i])
01291 {
01292 free((reply->sudir)[i]);
01293 (reply->sudir)[i] = NULL;
01294 }
01295 }
01296
01297 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_GET) && SUMS_USEMTSUMS_GET
01298 free(reply->sudir);
01299 reply->sudir = NULL;
01300 #endif
01301 }
01302
01303 free(reply);
01304 drms_unlock_server(env);
01305 return DRMS_ERROR_SUMGET;
01306 }
01307 }
01308 else
01309 {
01310 su->sudir[0] = '\0';
01311
01312 if (reply->sudir && strlen(reply->sudir[0]) > 0)
01313 {
01314 snprintf(su->sudir, sizeof(su->sudir), "%s", reply->sudir[0]);
01315 }
01316 else if (retrieve && natts < 2 && su->sunum >= 0 && drms_su_isremotesu(su->sunum))
01317 {
01318
01319
01320
01321
01322
01323
01324
01325
01326
01327
01328
01329 #if defined(JMD_IS_INSTALLED) && JMD_IS_INSTALLED
01330
01331 postrequeststr=NULL;
01332
01333 postmap = hcon_create(sizeof(struct PassSunumList *), 128, NULL, NULL, NULL, NULL, 0);
01334 sname = su->seriesinfo ? su->seriesinfo->seriesname : "unknown";
01335 sunum = su->sunum;
01336 add_sunum_to_POST(env, postmap,sname,sunum);
01337
01338
01339
01340
01341 postsize = create_post_msg (postmap, &postrequeststr);
01342 send_POST_request (postrequeststr, postsize, &ps);
01343
01344 inprogress=-1;
01345 ntries = 0;
01346
01347 int timeOutSecs = 1 * 60 * 60;
01348 int sleepSecs = 10;
01349 int maxTries = timeOutSecs / sleepSecs;
01350
01351 while ((inprogress = session_status(ps.session_id)) > 0 && ntries < maxTries)
01352 {
01353 sleep(sleepSecs);
01354 ntries++;
01355 }
01356
01357
01358 free_post_request(postmap);
01359 free(postrequeststr);
01360
01361 if (0 != inprogress)
01362 {
01363
01364 sustatus = DRMS_REMOTESUMS_TRYLATER;
01365 tryagain = 0;
01366 }
01367 else
01368 {
01369 tryagain = 1;
01370 }
01371
01372 #else
01373
01374 {
01375 sunums = calloc(1, sizeof(int64_t));
01376 rsHandle = NULL;
01377
01378 if (!sunums)
01379 {
01380 sustatus = DRMS_ERROR_OUTOFMEMORY;
01381 tryagain = 0;
01382 }
01383 else
01384 {
01385 sunums[0] = su->sunum;
01386 rsHandle = crankUpRemoteSums(env, &sustatus);
01387
01388 if (!sustatus && rsHandle)
01389 {
01390 sustatus = processRemoteSums(rsHandle, sunums, 1);
01391
01392 if (!sustatus)
01393 {
01394 tryagain = 1;
01395 }
01396 else
01397 {
01398 tryagain = 0;
01399 }
01400 }
01401 else
01402 {
01403 if (!sustatus)
01404 {
01405 sustatus = DRMS_ERROR_REMOTESUMS_INITIALIZATION;
01406 }
01407
01408 tryagain = 0;
01409 }
01410
01411 if (rsHandle)
01412 {
01413 shutDownRemoteSums(&rsHandle);
01414 }
01415
01416 free(sunums);
01417 sunums = NULL;
01418 }
01419 }
01420 #endif
01421 }
01422 else
01423 {
01424
01425
01426 (su->sudir)[0] = '\0';
01427 }
01428 }
01429
01430
01431 for (int i = 0; i < reply->reqcnt; i++)
01432 {
01433 if ((reply->sudir)[i])
01434 {
01435 free((reply->sudir)[i]);
01436 (reply->sudir)[i] = NULL;
01437 }
01438 }
01439
01440 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_GET) && SUMS_USEMTSUMS_GET
01441 free(reply->sudir);
01442 reply->sudir = NULL;
01443 #endif
01444
01445
01446 free(reply);
01447 natts++;
01448 }
01449
01450 drms_unlock_server(env);
01451
01452 return sustatus;
01453 }
01454 #endif
01455
01456
01457
01458 #ifndef DRMS_CLIENT
01459 int drms_su_getsudirs(DRMS_Env_t *env, int n, DRMS_StorageUnit_t **su, int retrieve, int dontwait)
01460 {
01461 int sustatus = DRMS_SUCCESS;
01462 DRMS_StorageUnit_t **workingsus = NULL;
01463 DRMS_StorageUnit_t **rsumssus = NULL;
01464
01465 LinkedList_t *retrysus = NULL;
01466 int nretrySUNUMS = 0;
01467 int workingn;
01468 int tryagain;
01469 int natts;
01470 int16_t maxRet;
01471 int16_t stagingRet = INT16_MIN;
01472 int maxNoSus = 0;
01473
01474 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_GET) && SUMS_USEMTSUMS_GET
01475 DRMS_MtSumsRequest_t *request = NULL;
01476 DRMS_MtSumsRequest_t *reply = NULL;
01477 maxNoSus = MAX_MTSUMS_NSUS;
01478 #else
01479 DRMS_SumRequest_t *request = NULL;
01480 DRMS_SumRequest_t *reply = NULL;
01481 maxNoSus = MAXSUMREQCNT;
01482 #endif
01483
01484
01485
01486
01487
01488
01489
01490
01491 int status = 0;
01492 int isu;
01493 int iSUMSsunum;
01494 DRMS_StorageUnit_t *onesu = NULL;
01495 int start = 0;
01496 int end = 0;
01497 ListNode_t *node = NULL;
01498 int64_t *sunums = NULL;
01499 DRMS_RsHandle_t *rsHandle = NULL;
01500 DRMS_StorageUnit_t *rsu = NULL;
01501
01502 #if defined(JMD_IS_INSTALLED) && JMD_IS_INSTALLED
01503 DRMS_StorageUnit_t *jmd_rsu;
01504 struct POSTState ps;
01505 char *postrequeststr;
01506 HContainer_t *postmap;
01507 char *sname;
01508 long long sunum;
01509 int postsize;
01510 int inprogress;
01511 int ntries;
01512 #endif
01513
01514
01515
01516 drms_lock_server(env);
01517
01518 if (!env->sum_thread) {
01519 if((status = pthread_create(&env->sum_thread, NULL, &drms_sums_thread,
01520 (void *) env))) {
01521 fprintf(stderr,"Thread creation failed: %d\n", status);
01522 drms_unlock_server(env);
01523 return 1;
01524 }
01525 }
01526
01527 onesu = NULL;
01528
01529
01530 dontwait = 0;
01531
01532 for (isu = 0; isu < n; isu++)
01533 {
01534 onesu = su[isu];
01535
01536
01537 *(onesu->sudir) = '\0';
01538 }
01539
01540
01541
01542 start = 0;
01543 end = SUMIN(maxNoSus, n);
01544
01545 workingsus = su;
01546 workingn = n;
01547
01548 tryagain = 1;
01549 natts = 1;
01550 maxRet = -1;
01551
01552 if (env->retention != INT16_MIN)
01553 {
01554
01555 maxRet = (int16_t)abs(env->retention);
01556 }
01557
01558 while (tryagain && natts < 3)
01559 {
01560 tryagain = 0;
01561
01562
01563 while (start < workingn)
01564 {
01565
01566 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_GET) && SUMS_USEMTSUMS_GET
01567 request = calloc(1, sizeof(DRMS_MtSumsRequest_t));
01568 XASSERT(request);
01569 request->sunum = calloc(maxNoSus, sizeof(uint64_t));
01570 XASSERT(request->sunum);
01571 #else
01572 request = malloc(sizeof(DRMS_SumRequest_t));
01573 XASSERT(request);
01574 #endif
01575
01576 request->opcode = DRMS_SUMGET;
01577 request->reqcnt = end - start;
01578
01579 for (isu = start, iSUMSsunum = 0; isu < end; isu++, iSUMSsunum++)
01580 {
01581 request->sunum[iSUMSsunum] = workingsus[isu]->sunum;
01582 if (maxRet == -1)
01583 {
01584
01585
01586 stagingRet = drms_series_getstagingretention(workingsus[isu]->seriesinfo);
01587 if (stagingRet != INT16_MIN)
01588 {
01589 if (stagingRet > maxRet)
01590 {
01591 maxRet = stagingRet;
01592 }
01593 }
01594 }
01595 }
01596
01597 request->mode = NORETRIEVE + TOUCH;
01598 if (retrieve)
01599 request->mode = RETRIEVE + TOUCH;
01600
01601 request->dontwait = dontwait;
01602
01603
01604
01605
01606
01607
01608
01609 if (maxRet != -1 && maxRet != 0)
01610 {
01611
01612
01613 request->tdays = -1 * maxRet;
01614 }
01615 else
01616 {
01617
01618 request->tdays = -1 * abs(STDRETENTION);
01619 }
01620
01621
01622 tqueueAdd(env->sum_inbox, (long) pthread_self(), (char *) request);
01623
01624
01625 if (!dontwait)
01626 {
01627
01628
01629
01630
01631 drms_unlock_server(env);
01632 tqueueDel(env->sum_outbox, (long) pthread_self(), (char **)&reply);
01633 drms_lock_server(env);
01634
01635 if (reply->opcode != 0)
01636 {
01637 if (reply->opcode == 3)
01638 {
01639 if (reply->sudir)
01640 {
01641 for (int i = 0; i < reply->reqcnt; i++)
01642 {
01643 if ((reply->sudir)[i])
01644 {
01645 free((reply->sudir)[i]);
01646 (reply->sudir)[i] = NULL;
01647 }
01648 }
01649
01650 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_GET) && SUMS_USEMTSUMS_GET
01651 free(reply->sudir);
01652 reply->sudir = NULL;
01653 #endif
01654 }
01655
01656 free(reply);
01657 drms_unlock_server(env);
01658
01659
01660 return DRMS_ERROR_SUMSTRYLATER;
01661 }
01662 else if (reply->opcode == -2)
01663 {
01664 fprintf(stderr, "Cannot access SUMS in this DRMS session - a tape read is pending.\n");
01665
01666 if (reply->sudir)
01667 {
01668 for (int i = 0; i < reply->reqcnt; i++)
01669 {
01670 if ((reply->sudir)[i])
01671 {
01672 free((reply->sudir)[i]);
01673 (reply->sudir)[i] = NULL;
01674 }
01675 }
01676
01677 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_GET) && SUMS_USEMTSUMS_GET
01678 free(reply->sudir);
01679 reply->sudir = NULL;
01680 #endif
01681 }
01682
01683 free(reply);
01684 drms_unlock_server(env);
01685 return DRMS_ERROR_PENDINGTAPEREAD;
01686 }
01687 else if (reply->opcode == -3)
01688 {
01689 fprintf(stderr, "Failure setting sum-get-pending flag.\n");
01690
01691 if (reply->sudir)
01692 {
01693 for (int i = 0; i < reply->reqcnt; i++)
01694 {
01695 if ((reply->sudir)[i])
01696 {
01697 free((reply->sudir)[i]);
01698 (reply->sudir)[i] = NULL;
01699 }
01700 }
01701
01702 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_GET) && SUMS_USEMTSUMS_GET
01703 free(reply->sudir);
01704 reply->sudir = NULL;
01705 #endif
01706 }
01707
01708 free(reply);
01709 drms_unlock_server(env);
01710 return DRMS_ERROR_SUMGET;
01711 }
01712 else if (reply->opcode == -4)
01713 {
01714 fprintf(stderr, "Failure UNsetting sum-get-pending flag.\n");
01715
01716 if (reply->sudir)
01717 {
01718 for (int i = 0; i < reply->reqcnt; i++)
01719 {
01720 if ((reply->sudir)[i])
01721 {
01722 free((reply->sudir)[i]);
01723 (reply->sudir)[i] = NULL;
01724 }
01725 }
01726
01727 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_GET) && SUMS_USEMTSUMS_GET
01728 free(reply->sudir);
01729 reply->sudir = NULL;
01730 #endif
01731 }
01732 free(reply);
01733 drms_unlock_server(env);
01734 return DRMS_ERROR_SUMGET;
01735 }
01736 else
01737 {
01738 fprintf(stderr, "SUM GET failed with error code %d.\n", reply->opcode);
01739
01740 if (reply->sudir)
01741 {
01742 for (int i = 0; i < reply->reqcnt; i++)
01743 {
01744 if ((reply->sudir)[i])
01745 {
01746 free((reply->sudir)[i]);
01747 (reply->sudir)[i] = NULL;
01748 }
01749 }
01750
01751 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_GET) && SUMS_USEMTSUMS_GET
01752 free(reply->sudir);
01753 reply->sudir = NULL;
01754 #endif
01755 }
01756
01757 free(reply);
01758 drms_unlock_server(env);
01759 return DRMS_ERROR_SUMGET;
01760 }
01761 }
01762 else
01763 {
01764 if (!retrysus)
01765 {
01766 retrysus = list_llcreate(sizeof(DRMS_StorageUnit_t *), NULL);
01767 }
01768
01769 for (isu = start, iSUMSsunum = 0; isu < end; isu++, iSUMSsunum++)
01770 {
01771 if (strlen(reply->sudir[iSUMSsunum]) > 0)
01772 {
01773
01774
01775 strncpy(workingsus[isu]->sudir,
01776 reply->sudir[iSUMSsunum],
01777 DRMS_MAXPATHLEN);
01778 }
01779 else if (retrieve && natts < 2 && workingsus[isu]->sunum >= 0 && drms_su_isremotesu(workingsus[isu]->sunum))
01780 {
01781
01782
01783
01784
01785
01786
01787
01788
01789
01790
01791 snprintf(workingsus[isu]->sudir, DRMS_MAXPATHLEN, "%s", "rs");
01792
01793
01794
01795 list_llinserttail(retrysus, &(workingsus[isu]));
01796 }
01797 else
01798 {
01799
01800
01801 (workingsus[isu]->sudir)[0] = '\0';
01802 }
01803
01804 if (reply->sudir[iSUMSsunum])
01805 {
01806 free(reply->sudir[iSUMSsunum]);
01807 reply->sudir[iSUMSsunum] = NULL;
01808 }
01809 }
01810 }
01811
01812 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_GET) && SUMS_USEMTSUMS_GET
01813 free(reply->sudir);
01814 reply->sudir = NULL;
01815 #endif
01816
01817 free(reply);
01818 }
01819
01820 start = end;
01821 end = SUMIN(maxNoSus + start, workingn);
01822
01823
01824 }
01825
01826
01827
01828 if (natts < 2 && retrysus && list_llgetnitems(retrysus) > 0)
01829 {
01830 node = NULL;
01831 #if defined(JMD_IS_INSTALLED) && JMD_IS_INSTALLED
01832
01833
01834 jmd_rsu = NULL;
01835
01836 list_llreset(retrysus);
01837
01838 postrequeststr=NULL;
01839
01840 postmap = hcon_create(sizeof(struct PassSunumList *), 128, NULL, NULL, NULL, NULL, 0);
01841 while ((node = list_llnext(retrysus)) != NULL)
01842 {
01843 jmd_rsu = *((DRMS_StorageUnit_t **)(node->data));
01844 sname = jmd_rsu->seriesinfo ? jmd_rsu->seriesinfo->seriesname : "unknown";
01845 sunum = jmd_rsu->sunum;
01846 add_sunum_to_POST(env, postmap,sname,sunum);
01847 }
01848
01849 postsize = create_post_msg(postmap,&postrequeststr);
01850 send_POST_request(postrequeststr,postsize,&ps);
01851
01852
01853
01854
01855
01856
01857 ntries = 0;
01858 int timeOutSecs = 12 * 60 * 60;
01859 int sleepSecs = 10;
01860 int maxTries = timeOutSecs / sleepSecs;
01861
01862 while ((inprogress = session_status(ps.session_id)) > 0 && ntries < maxTries)
01863 {
01864
01865
01866 sleep(sleepSecs);
01867 ntries++;
01868 }
01869
01870
01871 free_post_request(postmap);
01872 free(postrequeststr);
01873
01874 if (ntries >= maxTries)
01875 {
01876
01877 sustatus = DRMS_REMOTESUMS_TRYLATER;
01878 tryagain = 0;
01879 break;
01880 }
01881
01882 if (0 == inprogress)
01883 {
01884
01885
01886 tryagain = 1;
01887 nretrySUNUMS = list_llgetnitems(retrysus);
01888
01889 start = 0;
01890
01891 end = SUMIN(maxNoSus, nretrySUNUMS);
01892 rsumssus = (DRMS_StorageUnit_t **)malloc(sizeof(DRMS_StorageUnit_t *) * nretrySUNUMS);
01893
01894 isu = 0;
01895
01896 while (node = list_llgethead(retrysus))
01897 {
01898 onesu = (DRMS_StorageUnit_t *)malloc(sizeof(DRMS_StorageUnit_t));
01899 onesu->sunum = (*(DRMS_StorageUnit_t **)(node->data))->sunum;
01900 *(onesu->sudir) = '\0';
01901 rsumssus[isu] = onesu;
01902 isu++;
01903 list_llremove(retrysus, node);
01904 list_llfreenode(&node);
01905 }
01906
01907 if (retrysus)
01908 {
01909 list_llfree(&retrysus);
01910 }
01911
01912 workingsus = rsumssus;
01913 workingn = nretrySUNUMS;
01914 natts++;
01915 }
01916
01917 #else
01918
01919 {
01920 sunums = NULL;
01921 rsHandle = NULL;
01922 rsu = NULL;
01923
01924 nretrySUNUMS = list_llgetnitems(retrysus);
01925 sunums = calloc(SUMIN(maxNoSus, nretrySUNUMS), sizeof(int64_t));
01926
01927 if (!sunums)
01928 {
01929 sustatus = DRMS_ERROR_OUTOFMEMORY;
01930 tryagain = 0;
01931 }
01932 else
01933 {
01934 rsHandle = crankUpRemoteSums(env, &sustatus);
01935
01936 if (!sustatus && rsHandle)
01937 {
01938
01939 for (list_llreset(retrysus), isu = 0, iSUMSsunum = 0; ((node = list_llnext(retrysus)) != NULL); isu++)
01940 {
01941 rsu = *((DRMS_StorageUnit_t **)(node->data));
01942 sunums[iSUMSsunum++] = rsu->sunum;
01943
01944 if (iSUMSsunum > 0 && (iSUMSsunum % maxNoSus == 0 || isu == nretrySUNUMS - 1))
01945 {
01946
01947 sustatus = processRemoteSums(rsHandle, sunums, iSUMSsunum);
01948 free(sunums);
01949 sunums = NULL;
01950
01951 iSUMSsunum = 0;
01952
01953 if (sustatus)
01954 {
01955 break;
01956 }
01957
01958 if (isu < nretrySUNUMS - 1)
01959 {
01960 sunums = calloc(SUMIN(maxNoSus, nretrySUNUMS - isu - 1), sizeof(int64_t));
01961
01962 if (!sunums)
01963 {
01964 sustatus = DRMS_ERROR_OUTOFMEMORY;
01965 break;
01966 }
01967 }
01968 }
01969 }
01970 }
01971 else
01972 {
01973 if (!sustatus)
01974 {
01975 sustatus = DRMS_ERROR_REMOTESUMS_INITIALIZATION;
01976 }
01977 }
01978
01979 if (rsHandle)
01980 {
01981 shutDownRemoteSums(&rsHandle);
01982 }
01983
01984 if (!sustatus)
01985 {
01986
01987 tryagain = 1;
01988
01989 start = 0;
01990
01991 end = SUMIN(maxNoSus, nretrySUNUMS);
01992 rsumssus = (DRMS_StorageUnit_t **)malloc(sizeof(DRMS_StorageUnit_t *) * nretrySUNUMS);
01993
01994 isu = 0;
01995 while (node = list_llgethead(retrysus))
01996 {
01997 onesu = (DRMS_StorageUnit_t *)malloc(sizeof(DRMS_StorageUnit_t));
01998 onesu->sunum = (*(DRMS_StorageUnit_t **)(node->data))->sunum;
01999 *(onesu->sudir) = '\0';
02000 rsumssus[isu] = onesu;
02001 isu++;
02002 list_llremove(retrysus, node);
02003 list_llfreenode(&node);
02004 }
02005
02006 if (retrysus)
02007 {
02008 list_llfree(&retrysus);
02009 }
02010
02011 workingsus = rsumssus;
02012 workingn = nretrySUNUMS;
02013 natts++;
02014 }
02015 else
02016 {
02017 tryagain = 0;
02018 }
02019 }
02020 }
02021 #endif
02022 }
02023 }
02024
02025 if (retrieve && rsumssus && nretrySUNUMS > 0)
02026 {
02027
02028 for (isu = 0, iSUMSsunum = 0; isu < n; isu++)
02029 {
02030 if (strcmp(su[isu]->sudir, "rs") == 0)
02031 {
02032
02033 snprintf(su[isu]->sudir, DRMS_MAXPATHLEN, "%s", rsumssus[iSUMSsunum]->sudir);
02034 iSUMSsunum++;
02035 }
02036 }
02037 }
02038
02039 if (rsumssus)
02040 {
02041 for (isu = 0; isu < nretrySUNUMS; isu++)
02042 {
02043 if (rsumssus[isu])
02044 {
02045 free(rsumssus[isu]);
02046 }
02047 }
02048
02049 free(rsumssus);
02050 }
02051
02052 if (retrysus)
02053 {
02054 list_llfree(&retrysus);
02055 }
02056
02057 drms_unlock_server(env);
02058
02059 return sustatus;
02060 }
02061 #endif
02062
02063 #ifndef DRMS_CLIENT
02064 int drms_su_setretention(DRMS_Env_t *env, int16_t newRetention, int nsus, long long *sunums)
02065 {
02066 int drmsStatus;
02067 int isu;
02068 int start;
02069 int szChunk;
02070 int maxNoSus = 0;
02071
02072 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_GET) && SUMS_USEMTSUMS_GET
02073 maxNoSus = MAX_MTSUMS_NSUS;
02074 DRMS_MtSumsRequest_t *request = NULL;
02075 DRMS_MtSumsRequest_t *reply = NULL;
02076 #else
02077 maxNoSus = MAXSUMREQCNT;
02078 DRMS_SumRequest_t *request = NULL;
02079 DRMS_SumRequest_t *reply = NULL;
02080 #endif
02081
02082 drmsStatus = DRMS_SUCCESS;
02083
02084 drms_lock_server(env);
02085
02086 if (!env->sum_thread)
02087 {
02088 int libStat;
02089 if (libStat = pthread_create(&env->sum_thread, NULL, &drms_sums_thread, (void *)env))
02090 {
02091 fprintf(stderr,"Thread creation failed: %d\n", libStat);
02092 drmsStatus = DRMS_ERROR_CANTCREATETHREAD;
02093 }
02094 }
02095
02096 if (drmsStatus == DRMS_SUCCESS)
02097 {
02098 HContainer_t *map = NULL;
02099 int yep;
02100 char key[128];
02101
02102 map = hcon_create(sizeof(SUM_info_t *), 128, NULL, NULL, NULL, NULL, 0);
02103
02104 if (!map)
02105 {
02106 drmsStatus = DRMS_ERROR_OUTOFMEMORY;
02107 }
02108 else
02109 {
02110 yep = 1;
02111 start = 0;
02112
02113 while (start < nsus)
02114 {
02115
02116 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_GET) && SUMS_USEMTSUMS_GET
02117 request = calloc(1, sizeof(DRMS_MtSumsRequest_t));
02118 XASSERT(request);
02119 request->sunum = calloc(maxNoSus, sizeof(uint64_t));
02120 XASSERT(request->sunum);
02121 #else
02122 request = malloc(sizeof(DRMS_SumRequest_t));
02123 XASSERT(request);
02124 #endif
02125 request->opcode = DRMS_SUMGET;
02126
02127 for (isu = start, szChunk = 0; szChunk < maxNoSus && isu < nsus; isu++)
02128 {
02129
02130
02131
02132
02133 snprintf(key, sizeof(key), "%llu", (unsigned long long)sunums[isu]);
02134 if (sunums[isu] >= 0 && !hcon_member(map, key))
02135 {
02136 hcon_insert(map, key, &yep);
02137 request->sunum[szChunk] = sunums[isu];
02138 szChunk++;
02139 }
02140 }
02141
02142 start += szChunk;
02143
02144 request->reqcnt = szChunk;
02145
02146
02147 if (newRetention != 0)
02148 {
02149 request->mode = RETRIEVE + TOUCH;
02150 }
02151 else
02152 {
02153 request->mode = NORETRIEVE + TOUCH;
02154 }
02155
02156 request->dontwait = 0;
02157
02158
02159 request->tdays = newRetention;
02160
02161
02162 tqueueAdd(env->sum_inbox, (long) pthread_self(), (char *) request);
02163
02164
02165
02166
02167
02168
02169
02170 drms_unlock_server(env);
02171 tqueueDel(env->sum_outbox, (long) pthread_self(), (char **)&reply);
02172 drms_lock_server(env);
02173
02174 if (reply->opcode != 0)
02175 {
02176 if (reply->opcode == 3)
02177 {
02178
02179 drmsStatus = DRMS_ERROR_SUMSTRYLATER;
02180 }
02181 else if (reply->opcode == -2)
02182 {
02183 fprintf(stderr, "Cannot access SUMS in this DRMS session - a tape read is pending.\n");
02184 drmsStatus = DRMS_ERROR_PENDINGTAPEREAD;
02185 }
02186 else if (reply->opcode == -3)
02187 {
02188 fprintf(stderr, "Failure setting sum-get-pending flag.\n");
02189 drmsStatus = DRMS_ERROR_SUMGET;
02190 }
02191 else if (reply->opcode == -4)
02192 {
02193 fprintf(stderr, "Failure UNsetting sum-get-pending flag.\n");
02194 drmsStatus = DRMS_ERROR_SUMGET;
02195 }
02196 else
02197 {
02198 fprintf(stderr, "SUM GET failed with error code %d.\n", reply->opcode);
02199 drmsStatus = DRMS_ERROR_SUMGET;
02200 }
02201 }
02202
02203
02204
02205
02206 if (reply->sudir)
02207 {
02208 for (isu = 0; isu < szChunk; isu++)
02209 {
02210 if (reply->sudir[isu])
02211 {
02212 free(reply->sudir[isu]);
02213 }
02214 }
02215 }
02216
02217 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_GET) && SUMS_USEMTSUMS_GET
02218 if (reply->sudir)
02219 {
02220 free(reply->sudir);
02221 reply->sudir = NULL;
02222 }
02223 #endif
02224
02225 free(reply);
02226 }
02227
02228 hcon_destroy(&map);
02229 }
02230 }
02231
02232 drms_unlock_server(env);
02233
02234 return drmsStatus;
02235 }
02236 #endif
02237
02238 #ifndef DRMS_CLIENT
02239 static void SUFreeInfo(const void *value)
02240 {
02241 SUM_info_t *tofree = *((SUM_info_t **)value);
02242 if (tofree)
02243 {
02244 free(tofree);
02245 }
02246 }
02247
02248
02249 int drms_su_getinfo(DRMS_Env_t *env, long long *sunums, int nsunums, SUM_info_t **info)
02250 {
02251 int status = DRMS_SUCCESS;
02252 HContainer_t *map = NULL;
02253 int isunum;
02254 int iinfo;
02255 int nReqs;
02256 char key[128];
02257 SUM_info_t *nulladdr = NULL;
02258 SUM_info_t **pinfo = NULL;
02259 int maxNoSus = 0;
02260
02261 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_INFO) && SUMS_USEMTSUMS_INFO
02262 maxNoSus = MAX_MTSUMS_NSUS;
02263 DRMS_MtSumsRequest_t *request = NULL;
02264 DRMS_MtSumsRequest_t *reply = NULL;
02265 #else
02266 maxNoSus = MAXSUMREQCNT;
02267 DRMS_SumRequest_t *request = NULL;
02268 DRMS_SumRequest_t *reply = NULL;
02269 #endif
02270
02271 drms_lock_server(env);
02272
02273 if (!env->sum_thread)
02274 {
02275
02276 if((status = pthread_create(&env->sum_thread, NULL, &drms_sums_thread,
02277 (void *) env)) != DRMS_SUCCESS)
02278 {
02279 fprintf(stderr,"Thread creation failed: %d\n", status);
02280 drms_unlock_server(env);
02281 return 1;
02282 }
02283 }
02284
02285
02286
02287 map = hcon_create(sizeof(SUM_info_t *), 128, SUFreeInfo, NULL, NULL, NULL, 0);
02288
02289 for (nReqs = 0, isunum = 0; isunum < nsunums; isunum++)
02290 {
02291 if (nReqs == 0)
02292 {
02293 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_INFO) && SUMS_USEMTSUMS_INFO
02294 request = (DRMS_MtSumsRequest_t *)calloc(1, sizeof(DRMS_MtSumsRequest_t));
02295 XASSERT(request);
02296
02297 request->sunum = calloc(maxNoSus, sizeof(uint64_t));
02298 XASSERT(request->sunum);
02299 #else
02300 request = (DRMS_SumRequest_t *)malloc(sizeof(DRMS_SumRequest_t));
02301 XASSERT(request);
02302 #endif
02303
02304 request->opcode = DRMS_SUMINFO;
02305 request->dontwait = 0;
02306 }
02307
02308 snprintf(key, sizeof(key), "%llu", (unsigned long long)sunums[isunum]);
02309 if (!hcon_member(map, key))
02310 {
02311
02312
02313 request->sunum[nReqs] = (uint64_t)sunums[isunum];
02314 hcon_insert(map, key, &nulladdr);
02315 nReqs++;
02316 }
02317
02318
02319
02320 if (nReqs == maxNoSus || (isunum + 1 == nsunums && nReqs > 0))
02321 {
02322 request->reqcnt = nReqs;
02323
02324
02325 tqueueAdd(env->sum_inbox, (long)pthread_self(), (char *)request);
02326 drms_unlock_server(env);
02327 tqueueDel(env->sum_outbox, (long)pthread_self(), (char **)&reply);
02328 drms_lock_server(env);
02329
02330 if (reply->opcode != 0)
02331 {
02332 hcon_destroy(&map);
02333
02334 if (reply->opcode == -2)
02335 {
02336 fprintf(stderr, "Cannot access SUMS in this DRMS session - a tape read is pending.\n");
02337 if (reply)
02338 {
02339 if (reply->sudir)
02340 {
02341
02342 for (int i = 0; i < reply->reqcnt; i++)
02343 {
02344 if ((reply->sudir)[i])
02345 {
02346 free((reply->sudir)[i]);
02347 (reply->sudir)[i] = NULL;
02348 }
02349 }
02350 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_INFO) && SUMS_USEMTSUMS_INFO
02351 free(reply->sudir);
02352 reply->sudir = NULL;
02353 #endif
02354 }
02355
02356 free(reply);
02357 }
02358 drms_unlock_server(env);
02359 return DRMS_ERROR_PENDINGTAPEREAD;
02360 }
02361
02362 fprintf(stderr, "SUMINFO failed with error code %d.\n", reply->opcode);
02363
02364 if (reply)
02365 {
02366 if (reply->sudir)
02367 {
02368 for (int i = 0; i < reply->reqcnt; i++)
02369 {
02370 if ((reply->sudir)[i])
02371 {
02372 free((reply->sudir)[i]);
02373 (reply->sudir)[i] = NULL;
02374 }
02375 }
02376 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_INFO) && SUMS_USEMTSUMS_INFO
02377 free(reply->sudir);
02378 reply->sudir = NULL;
02379 #endif
02380 }
02381
02382 free(reply);
02383 }
02384
02385 drms_unlock_server(env);
02386 return 1;
02387 }
02388 else
02389 {
02390 SUM_info_t *retinfo = NULL;
02391
02392
02393 for (iinfo = 0; iinfo < nReqs; iinfo++)
02394 {
02395
02396
02397
02398
02399
02400 retinfo = (SUM_info_t *)reply->sudir[iinfo];
02401 snprintf(key, sizeof(key), "%llu", (unsigned long long)(retinfo->sunum));
02402 if ((pinfo = hcon_lookup(map, key)) != NULL)
02403 {
02404
02405 *pinfo = retinfo;
02406 }
02407 else
02408 {
02409 fprintf(stderr, "Information returned for an sunum ('%s') that is unknown to DRMS.\n", key);
02410 status = 99;
02411 break;
02412 }
02413 }
02414 }
02415
02416
02417 if (reply)
02418 {
02419 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_INFO) && SUMS_USEMTSUMS_INFO
02420 if (reply->sudir)
02421 {
02422 free(reply->sudir);
02423 reply->sudir = NULL;
02424 }
02425 #endif
02426
02427 free(reply);
02428 reply = NULL;
02429 }
02430
02431 nReqs = 0;
02432 }
02433 }
02434
02435 if (status == DRMS_SUCCESS)
02436 {
02437
02438 for (isunum = 0; isunum < nsunums; isunum++)
02439 {
02440 snprintf(key, sizeof(key), "%llu", (unsigned long long)(sunums[isunum]));
02441 if ((pinfo = hcon_lookup(map, key)) != NULL)
02442 {
02443 info[isunum] = (SUM_info_t *)malloc(sizeof(SUM_info_t));
02444 *(info[isunum]) = **pinfo;
02445 (info[isunum])->next = NULL;
02446 }
02447 else
02448 {
02449 fprintf(stderr, "sunum '%s' unknown to SUMS.\n", key);
02450 status = 99;
02451 break;
02452 }
02453 }
02454 }
02455
02456
02457 hcon_destroy(&map);
02458
02459
02460
02461 drms_unlock_server(env);
02462
02463 return status;
02464 }
02465 #endif
02466
02467
02468
02469
02470
02471 #ifndef DRMS_CLIENT
02472 int drms_commitunit(DRMS_Env_t *env, DRMS_StorageUnit_t *su)
02473 {
02474 int i;
02475 FILE *fp;
02476 char filename[DRMS_MAXPATHLEN];
02477 int actualarchive = 0;
02478 int docommit = 0;
02479 int16_t newSuRet = INT16_MIN;
02480
02481 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_PUT) && SUMS_USEMTSUMS_PUT
02482 DRMS_MtSumsRequest_t *request = NULL;
02483 DRMS_MtSumsRequest_t *reply = NULL;
02484 #else
02485 DRMS_SumRequest_t *request = NULL;
02486 DRMS_SumRequest_t *reply = NULL;
02487 #endif
02488
02489 docommit = !EmptyDir(su->sudir, 0);
02490 if (docommit)
02491 {
02492 if (env->archive != INT_MIN)
02493 {
02494 actualarchive = env->archive;
02495 }
02496 else
02497 {
02498 actualarchive = su->seriesinfo->archive;
02499 }
02500
02501
02502 if ( su->recnum )
02503 {
02504 sprintf(filename,"%s/Records.txt",su->sudir);
02505 if ((fp = fopen(filename,"w")) == NULL)
02506 {
02507 fprintf(stderr,"ERROR in drms_commitunit: Failed to open file '%s'\n",
02508 filename);
02509 return 1;
02510 }
02511
02512
02513
02514 if (actualarchive == -1)
02515 {
02516 fprintf(fp, "DELETE_SLOTS_RECORDS\n");
02517 }
02518
02519 fprintf(fp,"series=%s\n", su->seriesinfo->seriesname);
02520 if (su->nfree<su->seriesinfo->unitsize)
02521 {
02522 fprintf(fp,"slot\trecord number\n");
02523 for (i=0; i<su->seriesinfo->unitsize; i++)
02524 if (su->state[i] != DRMS_SLOT_FREE)
02525 fprintf(fp,"%d\t%lld\n", i, su->recnum[i]);
02526 }
02527 fclose(fp);
02528 }
02529
02530 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_PUT) && SUMS_USEMTSUMS_PUT
02531 request = calloc(1, sizeof(DRMS_MtSumsRequest_t));
02532 XASSERT(request);
02533 request->sunum = calloc(1, sizeof(uint64_t));
02534 XASSERT(request->sunum);
02535 request->sudir = calloc(1, sizeof(char *));
02536 XASSERT(request->sudir);
02537 #else
02538 request = malloc(sizeof(DRMS_SumRequest_t));
02539 XASSERT(request);
02540 #endif
02541 request->opcode = DRMS_SUMPUT;
02542 request->dontwait = 0;
02543 request->reqcnt = 1;
02544 request->dsname = su->seriesinfo->seriesname;
02545 request->group = su->seriesinfo->tapegroup;
02546
02547
02548
02549
02550 if (actualarchive == 1)
02551 request->mode = ARCH + TOUCH;
02552 else
02553 request->mode = TEMP + TOUCH;
02554
02555
02556
02557
02558
02559 if (env->newsuretention != INT16_MIN)
02560 {
02561
02562 request->tdays = env->newsuretention;
02563 }
02564 else if ((newSuRet = drms_series_getnewsuretention(su->seriesinfo)) != INT16_MIN)
02565 {
02566
02567 if (newSuRet == 0)
02568 {
02569
02570 request->tdays = abs(STDRETENTION);
02571 }
02572 else
02573 {
02574 request->tdays = newSuRet;
02575 }
02576 }
02577 else
02578 {
02579
02580 request->tdays = abs(STDRETENTION);
02581 }
02582
02583 request->sunum[0] = su->sunum;
02584 request->sudir[0] = su->sudir;
02585 request->comment = NULL;
02586
02587
02588 XASSERT(env->sum_thread);
02589
02590 tqueueAdd(env->sum_inbox, (long) pthread_self(), (char *)request);
02591
02592 tqueueDel(env->sum_outbox, (long) pthread_self(), (char **)&reply);
02593 if (reply->opcode != 0)
02594 {
02595 if (reply->opcode == -2)
02596 {
02597 fprintf(stderr, "Cannot access SUMS in this DRMS session - a tape read is pending.\n");
02598 free(reply);
02599 return DRMS_ERROR_PENDINGTAPEREAD;
02600 }
02601
02602 fprintf(stderr, "ERROR in drms_commitunit: SUM PUT failed with "
02603 "error code %d.\n",reply->opcode);
02604 free(reply);
02605 return 1;
02606 }
02607 free(reply);
02608
02609 su->mode = DRMS_READONLY;
02610
02611 }
02612
02613 return 0;
02614 }
02615
02616 static int CommitUnits(DRMS_Env_t *env,
02617 LinkedList_t *ll,
02618 const char *seriesName,
02619 int seriesArch,
02620 int seriesUS,
02621 int seriesTG,
02622 int16_t seriesRet)
02623 {
02624 ListNode_t *node = NULL;
02625 DRMS_StorageUnit_t *sunit = NULL;
02626 int actualarchive;
02627 char filename[DRMS_MAXPATHLEN];
02628 FILE *fp = NULL;
02629 int nsus;
02630 int statint;
02631 int isu;
02632 int islot;
02633
02634
02635 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_PUT) && SUMS_USEMTSUMS_PUT
02636 DRMS_MtSumsRequest_t *request = NULL;
02637 DRMS_MtSumsRequest_t *reply = NULL;
02638 DRMS_StorageUnit_t **punits = NULL;
02639 #else
02640 DRMS_SumRequest_t *request = NULL;
02641 DRMS_SumRequest_t *reply = NULL;
02642 DRMS_StorageUnit_t *punits[DRMS_MAX_REQCNT];
02643 #endif
02644
02645 actualarchive = 0;
02646
02647 statint = DRMS_SUCCESS;
02648
02649 if (ll->nitems > 0)
02650 {
02651
02652 if (env->archive != INT_MIN)
02653 {
02654 actualarchive = env->archive;
02655 }
02656 else
02657 {
02658 actualarchive = seriesArch;
02659 }
02660
02661 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_PUT) && SUMS_USEMTSUMS_PUT
02662 request = calloc(1, sizeof(DRMS_MtSumsRequest_t));
02663 XASSERT(request);
02664 request->sunum = calloc(ll->nitems, sizeof(uint64_t));
02665 XASSERT(request->sunum);
02666 request->sudir = calloc(ll->nitems, sizeof(char *));
02667 XASSERT(request->sudir);
02668 #else
02669 request = malloc(sizeof(DRMS_SumRequest_t));
02670 XASSERT(request);
02671 #endif
02672
02673 nsus = 0;
02674 list_llreset(ll);
02675 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_PUT) && SUMS_USEMTSUMS_PUT
02676 punits = calloc(list_llgetnitems(ll), sizeof(DRMS_StorageUnit_t *));
02677 #endif
02678 while ((node = list_llnext(ll)) != NULL)
02679 {
02680 sunit = *((DRMS_StorageUnit_t **)(node->data));
02681
02682 if (!EmptyDir(sunit->sudir, 0))
02683 {
02684 if (nsus == DRMS_MAX_REQCNT)
02685 {
02686
02687
02688 statint = DRMS_ERROR_INVALIDDATA;
02689 break;
02690 }
02691
02692
02693 if (sunit->recnum)
02694 {
02695 snprintf(filename, sizeof(filename), "%s/Records.txt", sunit->sudir);
02696 if ((fp = fopen(filename,"w")) == NULL)
02697 {
02698 fprintf(stderr,
02699 "ERROR in drms_commitunits: Failed to open file '%s'\n",
02700 filename);
02701 statint = DRMS_ERROR_FILECREATE;
02702 break;
02703 }
02704
02705
02706
02707 if (actualarchive == -1)
02708 {
02709 fprintf(fp, "DELETE_SLOTS_RECORDS\n");
02710 }
02711
02712 fprintf(fp, "series=%s\n", seriesName);
02713 if (sunit->nfree < seriesUS)
02714 {
02715 fprintf(fp,"slot\trecord number\n");
02716 for (islot = 0; islot < seriesUS; islot++)
02717 {
02718 if (sunit->state[islot] != DRMS_SLOT_FREE)
02719 {
02720 fprintf(fp,"%d\t%lld\n", islot, sunit->recnum[islot]);
02721 }
02722 }
02723 }
02724 fclose(fp);
02725 fp = NULL;
02726 }
02727
02728 request->sudir[nsus] = sunit->sudir;
02729 request->sunum[nsus] = sunit->sunum;
02730
02731
02732
02733 punits[nsus] = sunit;
02734 nsus++;
02735 }
02736 }
02737
02738 if (nsus == 0 || statint != DRMS_SUCCESS)
02739 {
02740 free(request);
02741 request = NULL;
02742 }
02743 else
02744 {
02745 request->opcode = DRMS_SUMPUT;
02746 request->dontwait = 0;
02747 request->reqcnt = nsus;
02748 request->dsname = seriesName;
02749 request->group = seriesTG;
02750 if (actualarchive == 1)
02751 {
02752 request->mode = ARCH + TOUCH;
02753 }
02754 else
02755 {
02756 request->mode = TEMP + TOUCH;
02757 }
02758
02759
02760 request->tdays = (int)seriesRet;
02761
02762 request->comment = NULL;
02763
02764
02765 XASSERT(env->sum_thread);
02766
02767
02768 tqueueAdd(env->sum_inbox, (long) pthread_self(), (char *)request);
02769
02770
02771 tqueueDel(env->sum_outbox, (long) pthread_self(), (char **)&reply);
02772
02773 if (reply->opcode != 0)
02774 {
02775 if (reply->opcode == -2)
02776 {
02777 fprintf(stderr, "Cannot access SUMS in this DRMS session - a tape read is pending.\n");
02778 statint = DRMS_ERROR_PENDINGTAPEREAD;
02779 }
02780 else
02781 {
02782 fprintf(stderr, "ERROR in drms_commitunit: SUM PUT failed with "
02783 "error code %d.\n",reply->opcode);
02784 statint = DRMS_ERROR_SUMPUT;
02785 }
02786 }
02787 else
02788 {
02789
02790 for (isu = 0; isu < nsus; isu++)
02791 {
02792 sunit = punits[isu];
02793 sunit->mode = DRMS_READONLY;
02794 }
02795 }
02796
02797 free(reply);
02798
02799 }
02800 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_PUT) && SUMS_USEMTSUMS_PUT
02801 if (punits)
02802 {
02803 free(punits);
02804 punits = NULL;
02805 }
02806 #endif
02807 }
02808
02809 return statint;
02810 }
02811
02812
02813 #endif
02814
02815
02816
02817
02818
02819 #ifndef DRMS_CLIENT
02820 int drms_commit_all_units(DRMS_Env_t *env, int *archive, int *status)
02821 {
02822 int i;
02823 HContainer_t *scon;
02824 HIterator_t hit_outer, hit_inner;
02825 DRMS_StorageUnit_t *su;
02826 int statint = 0;
02827 const char *seriesName = NULL;
02828 DRMS_Record_t *recTemp = NULL;
02829 int nsus;
02830 LinkedList_t *sulist = NULL;
02831 DRMS_SeriesInfo_t *si = NULL;
02832 int16_t newSuRetentionRaw = INT16_MIN;
02833 int16_t newSuRetention = INT16_MIN;
02834 int16_t maxNewSuRetention = INT16_MIN;
02835 int maxNoSus = 0;
02836
02837 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_PUT) && SUMS_USEMTSUMS_PUT
02838 maxNoSus = MAX_MTSUMS_NSUS;
02839 #else
02840 maxNoSus = MAXSUMREQCNT;
02841 #endif
02842
02843 XASSERT(env->session->db_direct==1);
02844 hiter_new(&hit_outer, &env->storageunit_cache);
02845 if (archive)
02846 *archive = 0;
02847
02848 nsus = 0;
02849
02850 while((scon = (HContainer_t *)hiter_extgetnext(&hit_outer, &seriesName)))
02851 {
02852
02853
02854
02855
02856 if (archive && *archive == 0 && env->archive == INT_MIN)
02857 {
02858
02859 recTemp = drms_template_record(env, seriesName, &statint);
02860
02861 if (statint != DRMS_SUCCESS)
02862 {
02863 break;
02864 }
02865
02866 if (recTemp->seriesinfo->archive)
02867 {
02868 *archive = 1;
02869 }
02870 }
02871
02872 si = NULL;
02873 maxNewSuRetention = INT16_MIN;
02874
02875
02876 hiter_new(&hit_inner, scon);
02877 while((su = (DRMS_StorageUnit_t *)hiter_getnext(&hit_inner)))
02878 {
02879 if (!si)
02880 {
02881 si = su->seriesinfo;
02882
02883 if (env->newsuretention != INT16_MIN)
02884 {
02885
02886 newSuRetention = env->newsuretention;
02887 }
02888 else if ((newSuRetentionRaw = drms_series_getnewsuretention(si)) != INT16_MIN)
02889 {
02890
02891 if (newSuRetentionRaw == 0)
02892 {
02893
02894 newSuRetention = (int16_t)abs(STDRETENTION);
02895 }
02896 else
02897 {
02898 newSuRetention = newSuRetentionRaw;
02899 }
02900 }
02901 else
02902 {
02903
02904 newSuRetention = (int16_t)abs(STDRETENTION);
02905 }
02906
02907 if (newSuRetention > maxNewSuRetention)
02908 {
02909 maxNewSuRetention = newSuRetention;
02910 }
02911 }
02912
02913 if ( su->mode == DRMS_READWRITE )
02914 {
02915
02916 for (i=0; i<(su->seriesinfo->unitsize - su->nfree); i++)
02917 {
02918 if (su->state[i] == DRMS_SLOT_FULL)
02919 {
02920 if (!sulist)
02921 {
02922
02923 sulist = list_llcreate(sizeof(DRMS_StorageUnit_t *), NULL);
02924 }
02925
02926 list_llinserttail(sulist, &su);
02927 nsus++;
02928 break;
02929 }
02930 }
02931
02932
02933
02934 if (nsus == maxNoSus)
02935 {
02936 statint = CommitUnits(env, sulist, seriesName, si->archive, si->unitsize, si->tapegroup, maxNewSuRetention);
02937 list_llfree(&sulist);
02938 nsus = 0;
02939
02940 if (statint != DRMS_SUCCESS)
02941 {
02942 break;
02943 }
02944 }
02945 }
02946 }
02947
02948 hiter_free(&hit_inner);
02949
02950
02951 if (nsus > 0)
02952 {
02953 statint = CommitUnits(env, sulist, seriesName, si->archive, si->unitsize, si->tapegroup, maxNewSuRetention);
02954 list_llfree(&sulist);
02955 nsus = 0;
02956 }
02957 }
02958
02959 hiter_free(&hit_outer);
02960
02961
02962 if (archive && *archive == 0 && env->archive == 1)
02963 *archive = 1;
02964
02965 if (status)
02966 {
02967 *status = statint;
02968 }
02969
02970
02971
02972
02973 return maxNewSuRetention < DRMS_LOG_RETENTION ? DRMS_LOG_RETENTION : maxNewSuRetention;
02974 }
02975 #endif
02976
02977
02978 DRMS_StorageUnit_t *drms_su_lookup(DRMS_Env_t *env, char *series,
02979 long long sunum, HContainer_t **scon_out)
02980 {
02981 HIterator_t hit;
02982 HContainer_t *scon;
02983 DRMS_StorageUnit_t *su;
02984 char hashkey[DRMS_MAXHASHKEYLEN];
02985
02986
02987
02988 su = NULL;
02989 scon = NULL;
02990 if (series==NULL)
02991 {
02992
02993
02994 sprintf(hashkey, DRMS_SUNUM_FORMAT, sunum);
02995 hiter_new(&hit, &env->storageunit_cache);
02996 while( (scon = (HContainer_t *)hiter_getnext(&hit)) )
02997 {
02998 if ( (su = hcon_lookup(scon, hashkey)) )
02999 break;
03000 }
03001 }
03002 else
03003 {
03004
03005 if ( (scon = hcon_lookup(&env->storageunit_cache, series)) )
03006 {
03007 sprintf(hashkey, DRMS_SUNUM_FORMAT, sunum);
03008 su = hcon_lookup(scon, hashkey);
03009 }
03010 }
03011 if (scon_out)
03012 *scon_out = scon;
03013 return su;
03014 }
03015
03016 void drms_su_freeunit(DRMS_StorageUnit_t *su)
03017 {
03018 if (su->state)
03019 {
03020 free(su->state);
03021 su->state=NULL;
03022 }
03023 if (su->recnum)
03024 {
03025 free(su->recnum);
03026 su->recnum=NULL;
03027 }
03028 }
03029
03030 void drms_freeunit(DRMS_Env_t *env, DRMS_StorageUnit_t *su)
03031 {
03032 char hashkey[DRMS_MAXHASHKEYLEN];
03033 HContainer_t *scon;
03034
03035 if ( (scon = hcon_lookup(&env->storageunit_cache, su->seriesinfo->seriesname)) )
03036 {
03037 sprintf(hashkey,DRMS_SUNUM_FORMAT, su->sunum);
03038 if (su->state)
03039 {
03040 free(su->state);
03041 su->state = NULL;
03042 }
03043 if (su->recnum)
03044 {
03045 free(su->recnum);
03046 su->recnum=NULL;
03047 }
03048 hcon_remove(scon, hashkey);
03049 }
03050 }
03051
03052
03053
03054
03055
03056
03057 int drms_su_freeslot(DRMS_Env_t *env, char *series, long long sunum,
03058 int slotnum)
03059 {
03060 DRMS_StorageUnit_t *su;
03061 char slotdir[DRMS_MAXPATHLEN];
03062 int state;
03063 state = DRMS_SLOT_FREE;
03064 if ((su = drms_su_markslot(env, series, sunum, slotnum, &state)) == NULL)
03065 return 1;
03066 else
03067 {
03068 if (state != DRMS_SLOT_FREE)
03069 {
03070
03071 CHECKSNPRINTF(snprintf(slotdir, DRMS_MAXPATHLEN, "%s/" DRMS_SLOTDIR_FORMAT,
03072 su->sudir,slotnum), DRMS_MAXPATHLEN);
03073
03074 return (RemoveDir(slotdir, 64) == 0 ? 0 : 1);
03075 }
03076 return 0;
03077 }
03078 }
03079
03080
03081
03082
03083
03084
03085
03086
03087 DRMS_StorageUnit_t *drms_su_markslot(DRMS_Env_t *env, char *series,
03088 long long sunum, int slotnum, int *state)
03089 {
03090 int oldstate;
03091 HContainer_t *scon;
03092 DRMS_StorageUnit_t *su;
03093
03094
03095 if (state==NULL)
03096 return NULL;
03097 if ((su = drms_su_lookup(env, series, sunum, &scon)) == NULL)
03098 return NULL;
03099 if ( su->mode!=DRMS_READWRITE )
03100 return NULL;
03101
03102 oldstate = su->state[slotnum];
03103 if (oldstate!=DRMS_SLOT_FREE && *state==DRMS_SLOT_FREE)
03104 su->nfree++;
03105 else if (oldstate==DRMS_SLOT_FREE && *state!=DRMS_SLOT_FREE)
03106 su->nfree--;
03107
03108 su->state[slotnum] = (char)(*state);
03109 *state = oldstate;
03110 return su;
03111 }
03112
/*
 * Returns 1 when drmssite_sunum_is_local() reports that the SUNUM is not
 * local, and 0 otherwise.
 */
int drms_su_isremotesu(long long sunum)
{
    const unsigned long long id = (unsigned long long)sunum;

    if (drmssite_sunum_is_local(id))
    {
        return 0;
    }

    return 1;
}
03117
03118 int drms_su_getexportURL(DRMS_Env_t *env, long long sunum, char *url, int size)
03119 {
03120 int ret = 0;
03121 DRMSSiteInfo_t *info = NULL;
03122
03123 #ifdef DRMS_CLIENT
03124
03125
03126
03127
03128 drms_send_commandcode(env->session->sockfd, DRMS_SITEINFO);
03129 ret = drmssite_client_info_from_sunum((unsigned long long)sunum,
03130 env->session->sockfd,
03131 &info);
03132 #else
03133 ret = drmssite_server_info_from_sunum((unsigned long long)sunum,
03134 env->session->db_handle,
03135 &info);
03136 #endif
03137
03138 if (!ret && info)
03139 {
03140 snprintf(url, size, "%s", info->request_URL);
03141 drmssite_freeinfo(&info);
03142 }
03143 else
03144 {
03145 ret = 1;
03146 }
03147
03148 return ret;
03149 }
03150
03151
03152
03153
03154 int drms_su_getexportserver(DRMS_Env_t *env,
03155 long long sunum,
03156 char *expserver,
03157 int size)
03158 {
03159 int ret = 0;
03160 DRMSSiteInfo_t *info = NULL;
03161
03162 #ifdef DRMS_CLIENT
03163 drms_send_commandcode(env->session->sockfd, DRMS_SITEINFO);
03164 ret = drmssite_client_info_from_sunum((unsigned long long)sunum,
03165 env->session->sockfd,
03166 &info);
03167 #else
03168 ret = drmssite_server_info_from_sunum((unsigned long long)sunum,
03169 env->session->db_handle,
03170 &info);
03171 #endif
03172
03173
03174
03175 if (!ret && info)
03176 {
03177 snprintf(expserver, size, "%s", info->SUMS_URL);
03178 drmssite_freeinfo(&info);
03179 }
03180 else
03181 {
03182 ret = 1;
03183 }
03184
03185 return ret;
03186 }
03187
03188 #ifndef DRMS_CLIENT
03189 int drms_su_allocsu(DRMS_Env_t *env,
03190 uint64_t size,
03191 long long sunum,
03192 char **sudir,
03193 int *tapegroup,
03194 int *status)
03195 {
03196 return drms_su_alloc2(env, size, sunum, sudir, tapegroup, status);
03197 }
03198
03199
03200
03201 int drms_su_commitsu(DRMS_Env_t *env,
03202 const char *seriesname,
03203 long long sunum,
03204 const char *sudir)
03205 {
03206 int actualarchive = 0;
03207 DRMS_SeriesInfo_t *seriesinfo = NULL;
03208 DRMS_Record_t *templaterec = NULL;
03209 int drmsst = DRMS_SUCCESS;
03210 int docommit = 0;
03211 int16_t newSuRet = INT16_MIN;
03212
03213 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_PUT) && SUMS_USEMTSUMS_PUT
03214 DRMS_MtSumsRequest_t *request = NULL;
03215 DRMS_MtSumsRequest_t *reply = NULL;
03216 #else
03217 DRMS_SumRequest_t *request = NULL;
03218 DRMS_SumRequest_t *reply = NULL;
03219 #endif
03220
03221
03222 docommit = !EmptyDir(sudir, 0);
03223 if (docommit)
03224 {
03225 templaterec = drms_template_record(env, seriesname, &drmsst);
03226
03227 if (templaterec)
03228 {
03229 seriesinfo = templaterec->seriesinfo;
03230 }
03231
03232 if (seriesinfo)
03233 {
03234 char *tmp = NULL;
03235
03236 if (env->archive != INT_MIN)
03237 {
03238 actualarchive = env->archive;
03239 }
03240 else
03241 {
03242 actualarchive = seriesinfo->archive;
03243 }
03244
03245 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_PUT) && SUMS_USEMTSUMS_PUT
03246 request = calloc(1, sizeof(DRMS_MtSumsRequest_t));
03247 XASSERT(request);
03248 request->sunum = calloc(1, sizeof(uint64_t));
03249 XASSERT(request->sunum);
03250 request->sudir = calloc(1, sizeof(char *));
03251 XASSERT(request->sudir);
03252 #else
03253 request = malloc(sizeof(DRMS_SumRequest_t));
03254 XASSERT(request);
03255 #endif
03256
03257 request->opcode = DRMS_SUMPUT;
03258 request->dontwait = 0;
03259 request->reqcnt = 1;
03260 request->dsname = seriesinfo->seriesname;
03261 request->group = seriesinfo->tapegroup;
03262
03263 if (actualarchive == 1)
03264 {
03265 request->mode = ARCH + TOUCH;
03266 }
03267 else
03268 {
03269 request->mode = TEMP + TOUCH;
03270 }
03271
03272
03273
03274
03275
03276 if (env->newsuretention != INT16_MIN)
03277 {
03278
03279 request->tdays = env->newsuretention;
03280 }
03281 else if ((newSuRet = drms_series_getnewsuretention(seriesinfo)) != INT16_MIN)
03282 {
03283
03284 if (newSuRet == 0)
03285 {
03286
03287 request->tdays = abs(STDRETENTION);
03288 }
03289 else
03290 {
03291 request->tdays = newSuRet;
03292 }
03293 }
03294 else
03295 {
03296
03297 request->tdays = abs(STDRETENTION);
03298 }
03299
03300 tmp = strdup(sudir);
03301
03302 request->sunum[0] = sunum;
03303 request->sudir[0] = tmp;
03304 request->comment = NULL;
03305
03306
03307 XASSERT(env->sum_thread);
03308
03309
03310 tqueueAdd(env->sum_inbox, (long) pthread_self(), (char *)request);
03311
03312
03313 tqueueDel(env->sum_outbox, (long) pthread_self(), (char **)&reply);
03314
03315 if (reply->opcode != 0)
03316 {
03317 if (reply->opcode == -2)
03318 {
03319 fprintf(stderr, "Cannot access SUMS in this DRMS session - a tape read is pending.\n");
03320 drmsst = DRMS_ERROR_PENDINGTAPEREAD;
03321 }
03322 else
03323 {
03324 fprintf(stderr, "ERROR in drms_commitunit: SUM PUT failed with "
03325 "error code %d.\n", reply->opcode);
03326 drmsst = DRMS_ERROR_SUMPUT;
03327 }
03328 }
03329
03330 if (tmp)
03331 {
03332 free(tmp);
03333 }
03334
03335 free(reply);
03336 }
03337 else
03338 {
03339 fprintf(stderr, "Unknown series '%s'.\n", seriesname);
03340 drmsst = DRMS_ERROR_UNKNOWNSERIES;
03341 }
03342 }
03343
03344 return drmsst;
03345 }
03346
03347 int drms_su_sumexport(DRMS_Env_t *env, SUMEXP_t *sumexpt)
03348 {
03349 int drmsst = DRMS_SUCCESS;
03350
03351 DRMS_SumRequest_t *request = NULL;
03352 DRMS_SumRequest_t *reply = NULL;
03353
03354 request = malloc(sizeof(DRMS_SumRequest_t));
03355 XASSERT(request);
03356 memset(request, 0, sizeof(DRMS_SumRequest_t));
03357
03358
03359
03360 request->opcode = DRMS_SUMEXPORT;
03361 request->comment = (char *)sumexpt;
03362
03363 drms_lock_server(env);
03364 if (!env->sum_thread)
03365 {
03366 if((drmsst = pthread_create(&env->sum_thread, NULL, &drms_sums_thread,
03367 (void *) env)))
03368 {
03369 fprintf(stderr, "Thread creation failed: %d\n", drmsst);
03370 drms_unlock_server(env);
03371 return 1;
03372 }
03373 }
03374
03375
03376 tqueueAdd(env->sum_inbox, (long) pthread_self(), (char *)request);
03377
03378 drms_unlock_server(env);
03379
03380 tqueueDel(env->sum_outbox, (long) pthread_self(), (char **)&reply);
03381
03382 if (reply->opcode)
03383 {
03384 if (reply->opcode == -2)
03385 {
03386 fprintf(stderr, "Cannot access SUMS in this DRMS session - a tape read is pending.\n");
03387 drmsst = DRMS_ERROR_PENDINGTAPEREAD;
03388 }
03389 else
03390 {
03391 fprintf(stderr,"SUM_EXPORT failed with error code %d.\n", reply->opcode);
03392 drmsst = reply->opcode;
03393 }
03394 }
03395 else
03396 {
03397 drmsst = DRMS_SUCCESS;
03398 }
03399
03400 if (reply)
03401 {
03402 free(reply);
03403 }
03404
03405 return drmsst;
03406 }
03407
03408 #if defined(JMD_IS_INSTALLED) && JMD_IS_INSTALLED
03409
03410
03411 int send_POST_request(char * postrequeststr, curl_off_t postsize, struct POSTState *ps)
03412 {
03413 CURL *curl;
03414
03415 curl_global_init(CURL_GLOBAL_ALL);
03416
03417 curl = curl_easy_init();
03418 curl_easy_setopt(curl, CURLOPT_URL, JMD_URL);
03419
03420
03421 curl_easy_setopt(curl, CURLOPT_POST, 1L);
03422
03423 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, write_data);
03424 curl_easy_setopt(curl, CURLOPT_WRITEDATA, ps);
03425 curl_easy_setopt(curl, CURLOPT_POSTFIELDS, postrequeststr);
03426 curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (curl_off_t) postsize);
03427
03428 curl_easy_perform(curl);
03429
03430
03431 curl_easy_cleanup(curl);
03432
03433 return 0;
03434 }
03435
03436 void populate_with_sunums(DRMS_Env_t *env, HContainer_t *postmap, char * seriesname, int n, long long sulist[])
03437 {
03438 int k = 0;
03439 for (k = 0; k < n; k++)
03440 {
03441 add_sunum_to_POST(env, postmap, seriesname, sulist[k]);
03442 }
03443 }
03444
03445
03446
03447
03448
03449
03450
03451
03452
03453
03454
03455
03456
03457
03458
03459
03460 void add_sunum_to_POST(DRMS_Env_t *env, HContainer_t *postmap, char *seriesname, long long sunum)
03461 {
03462 int allocsize = 1000;
03463
03464 struct PassSunumList *poststruct = NULL;
03465
03466
03467
03468 if (!hcon_member(postmap,seriesname))
03469 {
03470 poststruct = calloc(1,sizeof(struct PassSunumList));
03471 strcpy(poststruct->series,seriesname);
03472 poststruct->sunumarr = malloc(sizeof(long long)*allocsize);
03473 hcon_insert(postmap,seriesname,&poststruct);
03474 }
03475 else
03476 {
03477 struct PassSunumList **ptr;
03478 if ((ptr = hcon_lookup(postmap,seriesname))!= NULL)
03479 {
03480 poststruct = *ptr;
03481 }
03482 else
03483 {
03484
03485 fprintf(stderr, "Seriesname [%s], is unknown to hash structure in remote POST handling.\n", seriesname);
03486
03487
03488
03489 pthread_kill(env->signal_thread, SIGTERM);
03490 }
03491
03492
03493 if (((poststruct->n+1) % allocsize) == 0)
03494 {
03495
03496 if ((poststruct->sunumarr = realloc((void *) poststruct->sunumarr, sizeof(long long ) * (poststruct->n+1) + sizeof(long long ) * allocsize)) == NULL)
03497 {
03498 fprintf(stderr, "Can not allocate new memory for POST sunum array.\n");
03499
03500
03501
03502 pthread_kill(env->signal_thread, SIGTERM);
03503
03504 }
03505 else
03506 {
03507
03508 }
03509 }
03510 }
03511
03512 poststruct->sunumarr[poststruct->n] = sunum;
03513 poststruct->n++;
03514 }
03515
03516
03517 size_t create_post_msg(HContainer_t *postmap, char **ptrr)
03518 {
03519 int size_count = 0;
03520 int i = 0;
03521 struct PassSunumList **lpptr = NULL;
03522 struct PassSunumList *list = NULL;
03523
03524 char *seriesname = NULL;
03525 long estsize = 0;
03526 int totalSuCount = 0;
03527 HIterator_t hit;
03528
03529 hiter_new(&hit,postmap);
03530
03531 while ((lpptr = (struct PassSunumList **)hiter_getnext(&hit)))
03532 {
03533 list = *lpptr;
03534 estsize = estsize + 1000 + list->n * (strlen(list->series) + 20);
03535
03536 if (*ptrr == NULL)
03537 {
03538
03539 *ptrr = calloc(1,sizeof(char) * estsize);
03540 }
03541 else
03542 {
03543 *ptrr = realloc(*ptrr, sizeof(char) * estsize);
03544 }
03545
03546 char *ptr2 = *ptrr;
03547 ptr2 += size_count;
03548
03549 if (seriesname == NULL || strcmp(seriesname, list->series))
03550 {
03551 if (seriesname == NULL)
03552 {
03553 sprintf(ptr2, "type=request&series=%s", list->series);
03554 }
03555 else
03556 {
03557 sprintf(ptr2, "&series=%s", list->series);
03558 }
03559
03560 seriesname = list->series;
03561 size_count += strlen(ptr2);
03562 ptr2 = *ptrr;
03563 ptr2 += size_count;
03564 }
03565
03566
03567
03568
03569 for(i = 0; i < list->n; i++)
03570 {
03571
03572
03573 char chunk[1000];
03574
03575 sprintf(chunk, "&%s=%lld", list->series, list->sunumarr[i]);
03576 int chunklen = strlen(chunk);
03577
03578 size_count += chunklen;
03579
03580 if (size_count > estsize)
03581 {
03582 estsize = estsize + (list->n - list->ncount) * (strlen(list->series) + 40);
03583 *ptrr = realloc(*ptrr, sizeof(char) * estsize);
03584 ptr2 = *ptrr;
03585 ptr2 += (size_count - chunklen);
03586 }
03587
03588 strcpy(ptr2,chunk);
03589
03590 ptr2 = *ptrr;
03591 ptr2 += size_count;
03592 list->ncount++;
03593 totalSuCount++;
03594 }
03595 }
03596
03597 return size_count;
03598 }
03599
03600
03601
03602 size_t write_data(void *buffer, size_t size, size_t nmemb, void *userp)
03603 {
03604
03605
03606
03607
03608
03609
03610 struct POSTState *ps = (struct POSTState *)userp;
03611 int submissions = -2;
03612 char id[256] = "NO SessionID";
03613
03614
03615 sscanf(buffer, "sessionID=%s\nSummary:\nSUCCESFUL submissions:%d", id, &submissions);
03616 strcpy(ps->session_id, id);
03617 ps->no_submitted = submissions;
03618 return strlen(buffer);
03619 }
03620
03621
/*
 * curl write callback for a session-status query: extracts the "In Progress"
 * count from the response and stores it in the int passed as userp.
 *
 *   buffer      - response bytes delivered by curl; NOT NUL-terminated.
 *   size, nmemb - the chunk holds size * nmemb bytes.
 *   userp       - int receiving the in-progress count; left untouched when
 *                 the chunk does not match the expected layout.
 *
 * Returns size * nmemb, i.e. reports the whole chunk as consumed; returning
 * any other value makes curl abort the transfer.
 */
size_t parse_session_state(void *buffer, size_t size, size_t nmemb, void *userp)
{
    int *inprogress = (int *)userp;
    const size_t total = size * nmemb;
    char text[1024];
    size_t ncopy = total < sizeof(text) - 1 ? total : sizeof(text) - 1;
    int totalcount = -2;

    /* Work on a bounded, NUL-terminated copy: the data handed over by curl
     * has no terminator, so it must not be given to sscanf() directly. Only
     * the start of the response is parsed, so truncating the copy is fine. */
    if (ncopy > 0)
    {
        memcpy(text, buffer, ncopy);
    }
    text[ncopy] = '\0';

    /* The remainder of the response is not needed, so it is no longer scanned
     * into a fixed-size buffer with an unbounded %s. */
    sscanf(text, "QUERY REQUEST:\nTotal Count=%d\nIn Progress=%d", &totalcount, inprogress);

    return total;
}
03632
03633
03634
03635
03636 int session_status (char *session)
03637 {
03638 CURL *curl;
03639
03640 curl_global_init(CURL_GLOBAL_ALL);
03641 curl = curl_easy_init();
03642
03643 char url[2000];
03644
03645
03646 sprintf(url, "%s?type=query&sessionid=%s", JMD_URL, session);
03647 curl_easy_setopt(curl, CURLOPT_URL, url);
03648
03649
03650 int inprogress = -1;
03651 curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, parse_session_state);
03652 curl_easy_setopt(curl, CURLOPT_WRITEDATA, &inprogress);
03653 curl_easy_perform(curl);
03654
03655
03656 curl_easy_cleanup(curl);
03657
03658 return inprogress;
03659 }
03660
03661 void free_post_request (HContainer_t *postmap)
03662 {
03663 HIterator_t *hitmap = hiter_create(postmap);
03664 if (hitmap)
03665 {
03666 struct PassSunumList **ppoststruct = NULL;
03667 while ((ppoststruct = hiter_getnext(hitmap)) != NULL)
03668 {
03669 struct PassSunumList *poststruct = *ppoststruct;
03670 if (poststruct->sunumarr != NULL)
03671 {
03672 free(poststruct->sunumarr);
03673 poststruct->sunumarr=NULL;
03674 }
03675
03676 free(poststruct);
03677 poststruct = NULL;
03678 }
03679 }
03680
03681 hiter_destroy(&hitmap);
03682 hcon_destroy(&postmap);
03683 }
03684
03685 #endif
03686
03687 #endif // DRMS_CLIENT