00001
00002
00003 #define __DRMS_SERVER_C
00004 #include "drms.h"
00005 #include "drms_priv.h"
00006 #include "tee.h"
00007 #undef __DRMS_SERVER_C
00008 #include "xmem.h"
00009 #include <pwd.h>
00010 #ifdef __linux__
00011 #include <sched.h>
00012 #endif
00013 #include <dirent.h>
00014 #include "printk.h"
00015
00016
00017 #ifdef DRMS_CLIENT
00018 #define DEFS_CLIENT
00019 #endif
00020 #include "drmssite_info.h"
00021
00022 #ifdef DEFS_CLIENT
00023 #undef DEFS_CLIENT
00024 #endif
00025
00026 #define kDELSERFILE "thefile.txt"
00027 #define kDelSerChunk 10000
00028 #define kMaxSleep (90)
00029 #define kSUMSDead -2
00030 #define kBrokenPipe -99
00031 #define kTooManySumsOpen -98
00032
00033 sem_t *gShutdownsem = NULL;
00034
00035 DRMS_Shutdown_State_t gShutdown;
00036
00037
00038 pthread_mutex_t *gSUMSbusyMtx = NULL;
00039 int gSUMSbusy = 0;
00040
00041
00042 static volatile sig_atomic_t gGotPipe = 0;
00043
00044
00045
00046
00047
00048
00049
00050 HContainer_t *gSgPending = NULL;
00051
00052
00053 static void drms_delete_temporaries(DRMS_Env_t *env);
00054
00055
00056 static int IsMTSums(int opcode)
00057 {
00058 int isMTSums = 0;
00059
00060
00061
00062
00063 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS
00064 if (opcode == DRMS_SUMOPEN || opcode == DRMS_SUMCLOSE)
00065 {
00066 #if defined(SUMS_USEMTSUMS_CONNECTION) && SUMS_USEMTSUMS_CONNECTION
00067 isMTSums = 1;
00068 #endif
00069 }
00070 else if (opcode == DRMS_SUMINFO)
00071 {
00072 #if defined(SUMS_USEMTSUMS_INFO) && SUMS_USEMTSUMS_INFO
00073 isMTSums = 1;
00074 #endif
00075 }
00076 else if (opcode == DRMS_SUMGET)
00077 {
00078 #if defined(SUMS_USEMTSUMS_GET) && SUMS_USEMTSUMS_GET
00079 isMTSums = 1;
00080 #endif
00081 }
00082 else if (opcode == DRMS_SUMALLOC || opcode == DRMS_SUMALLOC2)
00083 {
00084 #if defined(SUMS_USEMTSUMS_ALLOC) && SUMS_USEMTSUMS_ALLOC
00085 isMTSums = 1;
00086 #endif
00087 }
00088 else if (opcode == DRMS_SUMPUT)
00089 {
00090 #if defined(SUMS_USEMTSUMS_PUT) && SUMS_USEMTSUMS_PUT
00091 isMTSums = 1;
00092 #endif
00093 }
00094 else if (opcode == DRMS_SUMDELETESERIES)
00095 {
00096 #if defined(SUMS_USEMTSUMS_DELETESUS) && SUMS_USEMTSUMS_DELETESUS
00097 isMTSums = 1;
00098 #endif
00099 }
00100 else
00101 {
00102
00103
00104
00105 printkerr("Invalid opcode %d.\n", opcode);
00106 }
00107
00108 #endif
00109
00110 return isMTSums;
00111 }
00112
00113
00114
00115 static int MakeSumsCall(DRMS_Env_t *env, int calltype, SUM_t **sumt, int (*history)(const char *fmt, ...), ...)
00116 {
00117 int opcode = 0;
00118 va_list ap;
00119 static int nsumsconn = 0;
00120
00121 if (calltype != DRMS_SUMCLOSE)
00122 {
00123
00124 drms_lock_server(env);
00125 if (!env->sumssafe)
00126 {
00127 fprintf(stderr, "Unable to call SUMS - SUMS is still processing a tape-read.\n");
00128 opcode = kSUMSDead;
00129 }
00130 drms_unlock_server(env);
00131 }
00132
00133 if (opcode == kSUMSDead)
00134 {
00135 return opcode;
00136 }
00137
00138 gGotPipe = 0;
00139
00140 switch (calltype)
00141 {
00142 case DRMS_SUMOPEN:
00143 {
00144
00145 va_start(ap, history);
00146 char *server = va_arg(ap, char *);
00147 char *db = va_arg(ap, char *);
00148
00149
00150
00151 if (nsumsconn >= MAXSUMOPEN)
00152 {
00153 *sumt = NULL;
00154 fprintf(stderr, "Attempting to exceed maximum number of available SUM_open() calls per process.\n");
00155 opcode = kTooManySumsOpen;
00156 }
00157 else
00158 {
00159 *sumt = SUM_open(server, db, history);
00160 opcode = -1;
00161 }
00162
00163 if (*sumt)
00164 {
00165 ++nsumsconn;
00166 }
00167
00168 va_end(ap);
00169 }
00170 break;
00171 case DRMS_SUMALLOC:
00172 {
00173 opcode = SUM_alloc(*sumt, history);
00174 }
00175 break;
00176 case DRMS_SUMGET:
00177 {
00178 opcode = SUM_get(*sumt, history);
00179 }
00180 break;
00181 case DRMS_SUMPUT:
00182 {
00183 opcode = SUM_put(*sumt, history);
00184 }
00185 break;
00186 case DRMS_SUMCLOSE:
00187 {
00188 opcode = SUM_close(*sumt, history);
00189 nsumsconn = 0;
00190 }
00191 break;
00192 case DRMS_SUMDELETESERIES:
00193 {
00194 va_start(ap, history);
00195 char *fpath = va_arg(ap, char *);
00196 char *series = va_arg(ap, char *);
00197
00198 opcode = SUM_delete_series(*sumt, fpath, series, history);
00199
00200 va_end(ap);
00201 }
00202 break;
00203 case DRMS_SUMALLOC2:
00204 {
00205 va_start(ap, history);
00206 uint64_t sunum = va_arg(ap, uint64_t);
00207
00208 opcode = SUM_alloc2(*sumt, sunum, history);
00209
00210 va_end(ap);
00211 }
00212 break;
00213 case DRMS_SUMEXPORT:
00214 {
00215 va_start(ap, history);
00216 SUMEXP_t *sumexpt = va_arg(ap, SUMEXP_t *);
00217
00218 opcode = SUM_export(sumexpt, history);
00219
00220 va_end(ap);
00221 }
00222 break;
00223 case DRMS_SUMINFO:
00224 {
00225 va_start(ap, history);
00226 uint64_t *dxarray = va_arg(ap, uint64_t *);
00227 int reqcnt = va_arg(ap, int);
00228
00229 opcode = SUM_infoArray(*sumt, dxarray, reqcnt, history);
00230
00231 va_end(ap);
00232 }
00233 break;
00234 default:
00235 fprintf(stderr, "Invalid SUMS call type '%d'.\n", calltype);
00236
00237 }
00238
00239 if (gGotPipe)
00240 {
00241
00242 fprintf(stderr, "Received a SIGPIPE signal; error calling SUMS call %d.\n", calltype);
00243 opcode = kBrokenPipe;
00244 }
00245
00246 return opcode;
00247 }
00248
00249 static DRMS_SumRequest_t *drms_process_sums_request(DRMS_Env_t *env,
00250 SUM_t **sum,
00251 DRMS_SumRequest_t *request,
00252 int opcode,
00253 int mtRequest);
00254
00255 static void GettingSleepier(int *sleepiness)
00256 {
00257 *sleepiness *= 2;
00258 if (*sleepiness > kMaxSleep)
00259 {
00260 *sleepiness = kMaxSleep;
00261 }
00262 }
00263
00264 sem_t *drms_server_getsdsem(void)
00265 {
00266 return gShutdownsem;
00267 }
00268
00269 void drms_server_initsdsem(void)
00270 {
00271
00272 gShutdownsem = malloc(sizeof(sem_t));
00273
00274
00275 sem_init(gShutdownsem, 0, 1);
00276
00277
00278 gShutdown = kSHUTDOWN_UNINITIATED;
00279 }
00280
00281 void drms_server_destroysdsem(void)
00282 {
00283 if (gShutdownsem)
00284 {
00285 sem_destroy(gShutdownsem);
00286 free(gShutdownsem);
00287 gShutdownsem = NULL;
00288 }
00289 }
00290
00291 DRMS_Shutdown_State_t drms_server_getsd(void)
00292 {
00293 return gShutdown;
00294 }
00295
00296 void drms_server_setsd(DRMS_Shutdown_State_t st)
00297 {
00298 gShutdown = st;
00299 }
00300
00301 int drms_server_authenticate(int sockfd, DRMS_Env_t *env, int clientid)
00302 {
00303 int status = 0;
00304 int tmp[8],*t;
00305 long long ltmp[2];
00306 struct iovec vec[16],*v;
00307 int port = -1;
00308 char *endptr = NULL;
00309 long long ival = 0;
00310
00311
00312 t=tmp; v=vec;
00313 net_packint(status, t++, v++);
00314 net_packint(clientid, t++, v++);
00315 net_packlonglong(env->session->sessionid, <mp[0], v++);
00316 net_packstring(env->session->sessionns, t++, v);
00317 v += 2;
00318 net_packlonglong(env->session->sunum, <mp[1], v++);
00319 net_packstring(env->session->sudir ? env->session->sudir : kNOLOGSUDIR, t++, v);
00320 v += 2;
00321
00322
00323 net_packstring(env->session->db_handle->dbhost, t++, v);
00324 v += 2;
00325
00326
00327
00328 ival = strtoll(env->session->db_handle->dbport, &endptr, 10);
00329 if (ival != 0 || endptr != env->session->db_handle->dbport)
00330 {
00331 port = (int)ival;
00332 }
00333
00334 net_packint(port, t++, v++);
00335
00336 net_packstring(env->session->db_handle->dbname, t++, v);
00337 v += 2;
00338
00339 net_packstring(env->session->db_handle->dbuser, t++, v);
00340 v += 2;
00341
00342 Writevn(sockfd, vec, 15);
00343
00344 return status;
00345 }
00346
00347
00348
00349
00350
00351
00352
00353 int drms_server_begin_transaction(DRMS_Env_t *env) {
00354
00355 int notfirst = 0;
00356
00357
00358
00359
00360 if (db_start_transaction(env->session->db_handle)) {
00361 fprintf(stderr,"Couldn't start database transaction.\n");
00362
00363
00364 pthread_kill(env->signal_thread, SIGTERM);
00365 }
00366 else
00367 {
00368 if (drms_session_setread(env) != DRMS_SUCCESS)
00369 {
00370 goto bailout;
00371 }
00372
00373
00374 drms_lock_server(env);
00375 env->transrunning = 1;
00376 drms_unlock_server(env);
00377
00378
00379
00380 if (env->dbtimeout != INT_MIN)
00381 {
00382 if (db_settimeout(env->session->db_handle, env->dbtimeout))
00383 {
00384 fprintf(stderr, "Failed to modify db-statement time-out to %d.\n", env->dbtimeout);
00385 }
00386 }
00387
00388
00389 if (env->dbutf8clientencoding != 0)
00390 {
00391 if (db_setutf8clientencoding(env->session->db_handle))
00392 {
00393 fprintf(stderr, "failed to set UTF8 client encoding\n");
00394 }
00395
00396 if (env->verbose)
00397 {
00398 printf("set UTF8 database client encoding\n");
00399 }
00400 }
00401 }
00402
00403
00404
00405
00406 if (env->drms_lock)
00407 {
00408 notfirst = 1;
00409 drms_lock_server(env);
00410 }
00411
00412 if (!env->transinit)
00413 {
00414 env->sum_inbox = tqueueInit (100);
00415 env->sum_outbox = tqueueInit (100);
00416
00417
00418
00419 env->drms_lock = malloc(sizeof(pthread_mutex_t));
00420 XASSERT(env->drms_lock);
00421 pthread_mutex_init (env->drms_lock, NULL);
00422
00423
00424 env->clientlock = malloc(sizeof(pthread_mutex_t));
00425 XASSERT(env->clientlock);
00426 pthread_mutex_init(env->clientlock, NULL);
00427
00428 drms_lock_server(env);
00429 if (drms_cache_init(env))
00430 {
00431 drms_unlock_server(env);
00432 goto bailout;
00433 }
00434
00435 env->transinit = 1;
00436 drms_unlock_server(env);
00437 }
00438
00439
00440
00441 if (!notfirst)
00442 {
00443 drms_lock_server(env);
00444 }
00445
00446 if (drms_server_open_session(env))
00447 {
00448 drms_unlock_server(env);
00449 return 1;
00450 }
00451
00452 env->sessionrunning = 1;
00453 drms_unlock_server(env);
00454
00455 return 0;
00456 bailout:
00457 drms_free_env(env, 1);
00458 return 1;
00459 }
00460
00461 void drms_server_end_transaction(DRMS_Env_t *env, int abort, int final) {
00462 if (abort) {
00463 drms_server_abort(env, final);
00464 } else {
00465 drms_server_commit(env, final);
00466 }
00467 }
00468
00469 int drms_session_setread(DRMS_Env_t *env)
00470 {
00471 int rv = DRMS_SUCCESS;
00472 char stmt[DRMS_MAXQUERYLEN];
00473
00474
00475 drms_lock_server(env);
00476
00477 if (env && env->session)
00478 {
00479
00480
00481
00482 env->session->readonly = 1;
00483
00484
00485 #if 0
00486
00487 snprintf(stmt, sizeof(stmt), "SET TRANSACTION READ ONLY");
00488
00489 if (db_dms(env->session->db_handle, NULL, stmt))
00490 {
00491 fprintf(stderr, "Failed to set transaction mode to READ WRITE.\n");
00492 rv = DRMS_ERROR_MODDBTRANS;
00493 }
00494 #endif
00495 }
00496 else
00497 {
00498 fprintf(stderr, "No session - cannot set it READ ONLY.\n");
00499 rv = DRMS_ERROR_MODDBTRANS;
00500 }
00501
00502 drms_unlock_server(env);
00503
00504 return rv;
00505 }
00506
00507 int drms_session_setwrite(DRMS_Env_t *env)
00508 {
00509 int rv = DRMS_SUCCESS;
00510 char stmt[DRMS_MAXQUERYLEN];
00511 pid_t pid;
00512 struct passwd *pwd = NULL;
00513
00514
00515 drms_lock_server(env);
00516
00517 pid = getpid();
00518 pwd = getpwuid(geteuid());
00519
00520
00521
00522
00523
00524
00525
00526
00527
00528
00529
00530 if (env->session->readonly != 1 || env->session->sessionid != 0 || *env->session->startTime == '\0' || env->session->sunum != 0 || env->session->sudir != NULL)
00531 {
00532 drms_unlock_server(env);
00533 return DRMS_SUCCESS;
00534 }
00535
00536 if (!env->session->sessionns || *env->session->sessionns == '\0')
00537 {
00538
00539 fprintf(stderr, "User %s has read-only access to DRMS, but is attempting to modify the DRMS database.\n", env->session->db_handle->dbuser);
00540 rv = DRMS_ERROR_MODDBTRANS;
00541 }
00542 else
00543 {
00544
00545 char hostbuf[1024];
00546
00547 snprintf(hostbuf, sizeof(hostbuf), "%s:%s", env->session->db_handle->dbhost, env->session->db_handle->dbport);
00548
00549 if ((env->session->stat_conn = db_connect(hostbuf, env->session->db_handle->dbuser, env->dbpasswd, env->session->db_handle->dbname, 1)) == NULL)
00550 {
00551 fprintf(stderr,"Error: Couldn't establish database connection for write-session logging.\n");
00552 pthread_kill(env->signal_thread, SIGTERM);
00553 rv = DRMS_ERROR_CANTCONNECTTODB;
00554 }
00555 }
00556
00557 if (rv == DRMS_SUCCESS)
00558 {
00559
00560 char *sn = malloc(sizeof(char) * strlen(env->session->sessionns) + 16);
00561
00562 if (sn)
00563 {
00564 sprintf(sn, "%s.drms_sessionid", env->session->sessionns);
00565 env->session->sessionid = db_sequence_getnext(env->session->stat_conn, sn);
00566 free(sn);
00567 }
00568 else
00569 {
00570 fprintf(stderr, "drms_session_setwrite(): out of memory.\n");
00571 rv = DRMS_ERROR_OUTOFMEMORY;
00572 }
00573 }
00574
00575 if (rv == DRMS_SUCCESS)
00576 {
00577 if (env->dolog)
00578 {
00579
00580
00581
00582
00583 int tg = 1;
00584 long long sunum = -1;
00585 int sumsStat = 0;
00586
00587
00588 drms_unlock_server(env);
00589 sunum = drms_su_alloc(env, 1<<20, &env->session->sudir, &tg, &sumsStat);
00590 drms_lock_server(env);
00591 env->session->sunum = sunum;
00592
00593 if (sumsStat)
00594 {
00595 fprintf(stderr,"SUMS failed to allocate a storage unit for the write-log files: %d.\n", sumsStat);
00596 rv = DRMS_ERROR_SUMALLOC;
00597 }
00598 }
00599 }
00600
00601 if (rv == DRMS_SUCCESS)
00602 {
00603
00604 if (env->dolog)
00605 {
00606 char filename_e[DRMS_MAXPATHLEN];
00607 char filename_o[DRMS_MAXPATHLEN];
00608
00609
00610
00611
00612
00613 snprintf(stmt, DRMS_MAXQUERYLEN, "INSERT INTO %s."DRMS_SESSION_TABLE
00614 "(sessionid, hostname, port, pid, username, starttime, sunum, "
00615 "sudir, status, clients,lastcontact,sums_thread_status,jsoc_version) VALUES "
00616 "(?,?,?,?,?,?,?,?,'idle',0,LOCALTIMESTAMP(0),"
00617 "'starting', '%s(%d)')",
00618 env->session->sessionns,
00619 jsoc_version,
00620 jsoc_vers_num);
00621
00622 if (db_dmsv(env->session->stat_conn, NULL, stmt, -1,
00623 DB_INT8, env->session->sessionid,
00624 DB_STRING, env->session->hostname,
00625 DB_INT4, (int)env->session->port,
00626 DB_INT4, (int)pid,
00627 DB_STRING, pwd->pw_name,
00628 DB_STRING, env->session->startTime,
00629 DB_INT8, env->session->sunum,
00630 DB_STRING, env->session->sudir))
00631 {
00632 fprintf(stderr,"Error: Couldn't register session.\n");
00633 rv = DRMS_ERROR_BADDBQUERY;
00634 }
00635
00636 if (rv == DRMS_SUCCESS)
00637 {
00638
00639
00640 if (save_stdeo())
00641 {
00642 fprintf(stderr, "Can't save stdout and stderr.\n");
00643 rv = DRMS_ERROR_MODDBTRANS;
00644 }
00645 else
00646 {
00647 if (!env->quiet)
00648 {
00649
00650 CHECKSNPRINTF(snprintf(filename_e, DRMS_MAXPATHLEN, "%s/%s.stderr.gz", env->session->sudir, env->logfile_prefix), DRMS_MAXPATHLEN);
00651 CHECKSNPRINTF(snprintf(filename_o, DRMS_MAXPATHLEN, "%s/%s.stdout.gz", env->session->sudir, env->logfile_prefix), DRMS_MAXPATHLEN);
00652
00653 if ((env->tee_pid = tee_stdio(filename_o, 0644, filename_e, 0644)) < 0)
00654 {
00655 pthread_kill(env->signal_thread, SIGTERM);
00656 rv = DRMS_ERROR_MODDBTRANS;
00657 }
00658 }
00659 else
00660 {
00661
00662 CHECKSNPRINTF(snprintf(filename_e, DRMS_MAXPATHLEN, "%s/%s.stderr", env->session->sudir, env->logfile_prefix), DRMS_MAXPATHLEN);
00663 CHECKSNPRINTF(snprintf(filename_o, DRMS_MAXPATHLEN, "%s/%s.stdout", env->session->sudir, env->logfile_prefix), DRMS_MAXPATHLEN);
00664
00665 if (redirect_stdio(filename_o, 0644, filename_e, 0644))
00666 {
00667 pthread_kill(env->signal_thread, SIGTERM);
00668 rv = DRMS_ERROR_MODDBTRANS;
00669 }
00670 }
00671 }
00672 }
00673
00674 if (rv == DRMS_SUCCESS)
00675 {
00676 printf("DRMS server connected to database '%s' on host '%s' as user '%s'.\n",
00677 env->session->db_handle->dbname,
00678 env->session->db_handle->dbhost,
00679 env->session->db_handle->dbuser);
00680
00681 printf("DRMS_HOST = %s\n"
00682 "DRMS_PORT = %hu\n"
00683 "DRMS_PID = %lu\n"
00684 "DRMS_SESSIONID = %lld\n"
00685 "DRMS_SESSIONNS = %s\n",
00686 env->session->hostname,
00687 env->session->port,
00688 (unsigned long)pid,
00689 env->session->sessionid,
00690 env->session->sessionns);
00691 fflush(stdout);
00692 }
00693 }
00694 else
00695 {
00696
00697
00698
00699
00700 snprintf(stmt, DRMS_MAXQUERYLEN, "INSERT INTO %s."DRMS_SESSION_TABLE
00701 "(sessionid, hostname, port, pid, username, starttime, "
00702 "status, clients,lastcontact,sums_thread_status,jsoc_version) VALUES "
00703 "(?,?,?,?,?,?,'idle',0,LOCALTIMESTAMP(0),"
00704 "'starting', '%s(%d)')",
00705 env->session->sessionns,
00706 jsoc_version,
00707 jsoc_vers_num);
00708
00709 if (db_dmsv(env->session->stat_conn, NULL, stmt, -1,
00710 DB_INT8, env->session->sessionid,
00711 DB_STRING, env->session->hostname,
00712 DB_INT4, (int)env->session->port,
00713 DB_INT4, (int)pid,
00714 DB_STRING, pwd->pw_name,
00715 DB_STRING, env->session->startTime))
00716 {
00717 fprintf(stderr, "Error: Couldn't register session.\n");
00718 rv = DRMS_ERROR_MODDBTRANS;
00719 }
00720 }
00721 }
00722
00723 if (rv == DRMS_SUCCESS)
00724 {
00725
00726 env->session->readonly = 0;
00727 }
00728
00729 drms_unlock_server(env);
00730
00731 return rv;
00732 }
00733
00734
00735
00736
00737 int drms_server_open_session(DRMS_Env_t *env)
00738 {
00739 #ifdef DEBUG
00740 printf("In drms_server_open_session()\n");
00741 #endif
00742
00743
00744 time_t secs = time(NULL);
00745 struct tm *ltime = localtime(&secs);
00746 char tbuf[128];
00747
00748 strftime(tbuf, sizeof(tbuf), "%Y-%m-%d %T", ltime);
00749 snprintf(env->session->startTime, sizeof(env->session->startTime), "%s", tbuf);
00750 pid_t pid = getpid();
00751
00752 if (!strcmp(env->logfile_prefix, "drms_server"))
00753 {
00754
00755
00756 printf("DRMS server connected to database '%s' on host '%s' as user '%s'.\n",
00757 env->session->db_handle->dbname,
00758 env->session->db_handle->dbhost,
00759 env->session->db_handle->dbuser);
00760
00761 printf("DRMS_HOST = %s\n"
00762 "DRMS_PORT = %hu\n"
00763 "DRMS_PID = %lu\n"
00764 "__ENDSELFSTART__",
00765 env->session->hostname,
00766 env->session->port,
00767 (unsigned long)pid);
00768 fflush(stdout);
00769 }
00770
00771 return 0;
00772 }
00773
00774
00775
00776
00777 int drms_server_close_session(DRMS_Env_t *env, char *stat_str, int clients,
00778 int log_retention, int archive_log)
00779 {
00780 DRMS_StorageUnit_t su;
00781 DIR *dp;
00782 struct dirent *dirp;
00783 int emptydir = 1;
00784
00785
00786 fflush(stdout);
00787 fflush(stderr);
00788
00789 if (env->tee_pid > 0) {
00790 int status = 0;
00791
00792
00793
00794
00795 close(STDERR_FILENO);
00796 close(STDOUT_FILENO);
00797
00798 waitpid(env->tee_pid, &status, 0);
00799
00800
00801
00802 if (status) printf("Problem returning from tee\n");
00803 }
00804
00805
00806
00807 if (!env->session->readonly && env->dolog)
00808 {
00809 if (restore_stdeo())
00810 {
00811 printf("Can't restore stderr and stdout\n");
00812 }
00813 }
00814
00815 if (env->session->sudir) {
00816 if ((dp = opendir(env->session->sudir)) == NULL) {
00817 fprintf(stderr, "Can't open %s\n", env->session->sudir);
00818 return 1;
00819 }
00820
00821 while ((dirp = readdir(dp)) != NULL) {
00822 if (!strcmp(dirp->d_name, ".") ||
00823 !strcmp(dirp->d_name, ".."))
00824 continue;
00825
00826
00827
00828
00829 if (strncmp(dirp->d_name+strlen(dirp->d_name)-3, ".gz", 3)) {
00830 char command[DRMS_MAXPATHLEN];
00831 sprintf(command,"/bin/gzip %s/%s", env->session->sudir, dirp->d_name);
00832 system(command);
00833 emptydir = 0;
00834 } else {
00835 emptydir = 0;
00836 }
00837 }
00838 closedir(dp);
00839
00840
00841 memset(&su,0,sizeof(su));
00842 su.seriesinfo = malloc(sizeof(DRMS_SeriesInfo_t));
00843 XASSERT(su.seriesinfo);
00844 strcpy(su.seriesinfo->seriesname,DRMS_LOG_DSNAME);
00845 su.seriesinfo->tapegroup = DRMS_LOG_TAPEGROUP;
00846 su.seriesinfo->archive = archive_log;
00847 if (emptydir)
00848 su.seriesinfo->archive = 0;
00849 if (log_retention <= 0 )
00850 su.seriesinfo->retention = 1;
00851 else
00852 su.seriesinfo->retention = log_retention;
00853 strcpy(su.sudir, env->session->sudir);
00854 su.sunum = env->session->sunum;
00855 su.mode = DRMS_READWRITE;
00856 su.state = NULL;
00857 su.recnum = NULL;
00858 su.seriesinfo->hasshadow = 0;
00859 su.seriesinfo->createshadow = 0;
00860
00861
00862 drms_unlock_server(env);
00863 if (drms_commitunit(env, &su))
00864 fprintf(stderr,"Error: Couldn't commit log storage unit to SUMS.\n");
00865 drms_lock_server(env);
00866 free(su.seriesinfo);
00867 }
00868
00869 if (!env->session->readonly)
00870 {
00871
00872 char stmt[1024];
00873
00874 sprintf(stmt, "UPDATE %s."DRMS_SESSION_TABLE
00875 " SET sudir=NULL,status=?,clients=?,lastcontact=LOCALTIMESTAMP(0),"
00876 "endtime=LOCALTIMESTAMP(0) WHERE sessionid=?", env->session->sessionns);
00877
00878 if (db_dmsv(env->session->stat_conn, NULL, stmt, -1,
00879 DB_STRING, stat_str,
00880 DB_INT4, clients,
00881 DB_INT8, env->session->sessionid))
00882 {
00883
00884 return 1;
00885 }
00886 }
00887
00888 return 0;
00889 }
00890
00891
00892
00893
00894
00895
00896
00897
00898
00899 void drms_server_abort(DRMS_Env_t *env, int final)
00900 {
00901 drms_lock_server(env);
00902 if (env->verbose)
00903 fprintf(stderr,"WARNING: DRMS is aborting...\n");
00904
00905
00906 if (env->transrunning)
00907 {
00908 db_rollback(env->session->db_handle);
00909 env->transrunning = 0;
00910 }
00911
00912
00913 if (env->sessionrunning)
00914 {
00915 drms_server_close_session(env, abortstring, env->clientcounter, 7, 0);
00916 env->sessionrunning = 0;
00917 }
00918
00919
00920
00921
00922
00923
00924
00925 if (env->sum_thread) {
00926
00927
00928 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_CONNECTION) && SUMS_USEMTSUMS_CONNECTION
00929 DRMS_MtSumsRequest_t *request = NULL;
00930 request = calloc(1, sizeof(DRMS_MtSumsRequest_t));
00931
00932 #else
00933 DRMS_SumRequest_t *request = NULL;
00934 request = malloc(sizeof(DRMS_SumRequest_t));
00935 #endif
00936 XASSERT(request);
00937
00938 if (env->sum_tag)
00939 {
00940
00941
00942
00943
00944
00945
00946
00947 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_CONNECTION) && SUMS_USEMTSUMS_CONNECTION
00948 DRMS_MtSumsRequest_t *reply = NULL;
00949 reply = calloc(1, sizeof(DRMS_MtSumsRequest_t));
00950 #else
00951 DRMS_SumRequest_t *reply;
00952 reply = malloc(sizeof(DRMS_SumRequest_t));
00953 #endif
00954 XASSERT(reply);
00955 reply->opcode = DRMS_ERROR_ABORT;
00956 tqueueAdd(env->sum_outbox, env->sum_tag, (char *)reply);
00957 request->opcode = DRMS_SUMABORT;
00958 }
00959 else
00960 {
00961 request->opcode = DRMS_SUMCLOSE;
00962 }
00963
00964
00965
00966
00967 tqueueAdd(env->sum_inbox, (long)pthread_self(), (char *)request);
00968
00969 pthread_join(env->sum_thread, NULL);
00970 env->sum_thread = 0;
00971 }
00972
00973 db_disconnect(&env->session->stat_conn);
00974
00975
00976 db_disconnect(&env->session->db_handle);
00977
00978
00979
00980
00981
00982 if (env->server_wait)
00983 {
00984 if (env->verbose)
00985 fprintf(stderr, "WARNING: DRMS server aborting in %d seconds...\n",
00986 DRMS_ABORT_SLEEP);
00987 sleep(DRMS_ABORT_SLEEP);
00988 }
00989
00990
00991 if (final) {
00992
00993
00994 #ifndef DRMS_CLIENT
00995 drms_unlock_server(env);
00996 #endif
00997
00998 drms_free_env(env, 1);
00999 } else {
01000
01001 drms_unlock_server(env);
01002 }
01003
01004
01005 #ifdef DEBUG
01006 printf("Exiting drms_server_abort()\n");
01007 #endif
01008 }
01009
01010
01011
01012
01013
01014
01015
01016
01017
01018
01019 void drms_server_commit(DRMS_Env_t *env, int final)
01020 {
01021 int log_retention, archive_log;
01022 int status = 0;
01023
01024 if (env->verbose)
01025 printf("DRMS is committing...stand by...\n");
01026
01027 drms_lock_server(env);
01028
01029 if (env->verbose)
01030 printf("Deleting temporary records from DRMS.\n");
01031 drms_delete_temporaries(env);
01032
01033
01034 if (env->verbose)
01035 printf("Commiting changes to SUMS.\n");
01036
01037
01038
01039
01040
01041
01042
01043
01044
01045 if (env->session->db_direct == 1)
01046 {
01047 drms_fitsrw_term(env->verbose);
01048 }
01049
01050
01051 drms_unlock_server(env);
01052 log_retention = drms_commit_all_units(env, &archive_log, &status);
01053 drms_lock_server(env);
01054
01055
01056 if (env->verbose)
01057 printf("Unregistering DRMS session.\n");
01058
01059 if (env->sessionrunning)
01060 {
01061 drms_server_close_session(env, "finished", 0, log_retention, archive_log);
01062 env->sessionrunning = 0;
01063 }
01064
01065 if (env->sum_thread) {
01066
01067
01068 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_CONNECTION) && SUMS_USEMTSUMS_CONNECTION
01069 DRMS_MtSumsRequest_t *request = NULL;
01070 request = calloc(1, sizeof(DRMS_MtSumsRequest_t));
01071 #else
01072 DRMS_SumRequest_t *request = NULL;
01073 request = malloc(sizeof(DRMS_SumRequest_t));
01074 #endif
01075 XASSERT(request);
01076
01077 request->opcode = DRMS_SUMCLOSE;
01078 tqueueAdd(env->sum_inbox, (long) pthread_self(), (char *)request);
01079
01080
01081
01082
01083
01084 drms_unlock_server(env);
01085 pthread_join(env->sum_thread, NULL);
01086 drms_lock_server(env);
01087 env->sum_thread = 0;
01088 }
01089
01090 db_disconnect(&env->session->stat_conn);
01091
01092
01093 if (env->transrunning)
01094 {
01095 if (status)
01096 {
01097 db_rollback(env->session->db_handle);
01098 }
01099 else
01100 {
01101 db_commit(env->session->db_handle);
01102 }
01103
01104 env->transrunning = 0;
01105 }
01106
01107 if (final) {
01108 db_disconnect(&env->session->db_handle);
01109 }
01110
01111
01112 if (env->server_wait) {
01113 if (env->verbose)
01114 fprintf(stderr, "WARNING: DRMS server stopping in approximately %d "
01115 "seconds...\n", DRMS_ABORT_SLEEP);
01116 sleep(DRMS_ABORT_SLEEP);
01117 }
01118
01119
01120 if (final)
01121 {
01122 drms_free_env(env, 1);
01123 }
01124 else
01125 {
01126 drms_unlock_server(env);
01127 }
01128
01129 #ifdef DEBUG
01130 printf("Exiting drms_server_commit()\n");
01131 #endif
01132 }
01133
01134
01135
01136
01137
01138
01139 void *drms_server_thread(void *arg)
01140 {
01141 DRMS_ThreadInfo_t *tinfo;
01142 struct sockaddr_in client;
01143 int noshare, tnum, sockfd;
01144 unsigned int client_size = sizeof(client);
01145 int command,status,disconnect;
01146 DRMS_Env_t *env;
01147 DB_Handle_t *db_handle;
01148
01149
01150 if (pthread_detach(pthread_self()))
01151 {
01152 printf("Thread detach failed\n");
01153 return NULL;
01154 }
01155
01156
01157 tinfo = (DRMS_ThreadInfo_t *) arg;
01158 sockfd = tinfo->sockfd;
01159 tnum = tinfo->threadnum;
01160 env = tinfo->env;
01161 noshare = tinfo->noshare;
01162 free(tinfo);
01163 XASSERT(env->session->db_direct == 1);
01164 db_handle = env->session->db_handle;
01165
01166
01167 if( (status = pthread_sigmask(SIG_BLOCK, &env->signal_mask, NULL)))
01168 {
01169 printf("pthread_sigmask call failed with status = %d\n", status);
01170
01171
01172
01173 pthread_kill(env->signal_thread, SIGTERM);
01174 goto bail;
01175 }
01176
01177 if ( getpeername(sockfd, (struct sockaddr *)&client, &client_size) == -1 )
01178 {
01179 perror("accept call failed.");
01180 goto bail;
01181 }
01182
01183 printf("thread %d: handling new connection from %s:%d.\n",
01184 tnum, inet_ntoa(client.sin_addr),
01185 ntohs(client.sin_port));
01186
01187
01188 if (noshare)
01189 {
01190 if (db_start_transaction(db_handle))
01191 {
01192 fprintf(stderr,"thread %d: Couldn't start database transaction.\n",tnum);
01193 goto bail;
01194 }
01195
01196 if (drms_session_setread(env) != DRMS_SUCCESS)
01197 {
01198 goto bail;
01199 }
01200 }
01201
01202
01203 if (env->verbose)
01204 printf("thread %d: Waiting for command.\n", tnum);
01205
01206 disconnect = 0; status = 0;
01207 while ( !status && !disconnect && readint(sockfd, &command) == sizeof(int))
01208 {
01209
01210 Writeint(sockfd, command);
01211
01212 switch(command)
01213 {
01214 case DRMS_DISCONNECT:
01215 if (env->verbose)
01216 printf("thread %d: Executing DRMS_DISCONNECT.\n", tnum);
01217 disconnect = 1;
01218 status = Readint(sockfd);
01219 break;
01220 case DRMS_ROLLBACK:
01221 if (env->verbose)
01222 printf("thread %d: Executing DRMS_ROLLBACK.\n", tnum);
01223 pthread_mutex_lock(env->clientlock);
01224 status = db_rollback(db_handle);
01225
01226 if (!status)
01227 {
01228 status = db_start_transaction(db_handle);
01229 }
01230
01231 if (!status)
01232 {
01233 status = drms_session_setread(env);
01234 }
01235
01236 pthread_mutex_unlock(env->clientlock);
01237 Writeint(sockfd, status);
01238 break;
01239 case DRMS_COMMIT:
01240 if (env->verbose)
01241 printf("thread %d: Executing DRMS_COMMIT.\n", tnum);
01242 pthread_mutex_lock(env->clientlock);
01243 status = db_commit(db_handle);
01244
01245 if (!status)
01246 {
01247 status = db_start_transaction(db_handle);
01248 }
01249
01250 if (!status)
01251 {
01252 status = drms_session_setread(env);
01253 }
01254
01255 pthread_mutex_unlock(env->clientlock);
01256 Writeint(sockfd, status);
01257 break;
01258 case DRMS_TXTQUERY:
01259 if (env->verbose)
01260 printf("thread %d: Executing DRMS_TXTQUERY.\n",tnum);
01261 status = db_server_query_txt(sockfd, db_handle);
01262 break;
01263 case DRMS_BINQUERY:
01264 if (env->verbose)
01265 printf("thread %d: Executing DRMS_BINQUERY.\n",tnum);
01266 status = db_server_query_bin(sockfd, db_handle);
01267 break;
01268 case DRMS_DMS:
01269 if (env->verbose)
01270 printf("thread %d: Executing DRMS_DMS.\n",tnum);
01271 status = db_server_dms(sockfd, db_handle);
01272 break;
01273 case DRMS_DMS_ARRAY:
01274 if (env->verbose)
01275 printf("thread %d: Executing DRMS_DMS_ARRAY.\n",tnum);
01276 status = db_server_dms_array(sockfd, db_handle);
01277 break;
01278 case DRMS_BULK_INSERT_ARRAY:
01279 if (env->verbose)
01280 printf("thread %d: Executing DRMS_BULK_INSERT_ARRAY.\n",tnum);
01281 status = db_server_bulk_insert_array(sockfd, db_handle);
01282 break;
01283 case DRMS_SEQUENCE_DROP:
01284 if (env->verbose)
01285 printf("thread %d: Executing DRMS_SEQUENCE_DROP.\n",tnum);
01286 status = db_server_sequence_drop(sockfd, db_handle);
01287 break;
01288 case DRMS_SEQUENCE_CREATE:
01289 if (env->verbose)
01290 printf("thread %d: Executing DRMS_SEQUENCE_CREATE.\n",tnum);
01291 status = db_server_sequence_create(sockfd, db_handle);
01292 break;
01293 case DRMS_SEQUENCE_GETNEXT:
01294 if (env->verbose)
01295 printf("thread %d: Executing DRMS_SEQUENCE_GETNEXT.\n",tnum);
01296 status = db_server_sequence_getnext_n(sockfd, db_handle);
01297 break;
01298 case DRMS_SEQUENCE_GETCURRENT:
01299 if (env->verbose)
01300 printf("thread %d: Executing DRMS_SEQUENCE_GETCURRENT.\n",tnum);
01301 status = db_server_sequence_getcurrent(sockfd, db_handle);
01302 break;
01303 case DRMS_SEQUENCE_GETLAST:
01304 if (env->verbose)
01305 printf("thread %d: Executing DRMS_SEQUENCE_GETLAST.\n",tnum);
01306 status = db_server_sequence_getlast(sockfd, db_handle);
01307 break;
01308 case DRMS_BINQUERY_ARRAY:
01309 if (env->verbose)
01310 printf("thread %d: Executing DRMS_BINQUERY_ARRAY.\n",tnum);
01311 status = db_server_query_bin_array(sockfd, db_handle);
01312 break;
01313 case DRMS_BINQUERY_NTUPLE:
01314 {
01315 if (env->verbose)
01316 {
01317 printf("thread %d: Executing DRMS_BINQUERY_NTUPLE.\n", tnum);
01318 }
01319
01320 status = db_server_query_bin_ntuple(sockfd, db_handle);
01321 break;
01322 }
01323 case DRMS_ALLOC_RECNUM:
01324 if (env->verbose)
01325 printf("thread %d: Executing DRMS_ALLOC_RECNUM.\n",tnum);
01326 status = drms_server_alloc_recnum(env, sockfd);
01327 break;
01328 case DRMS_NEWSLOTS:
01329 if (env->verbose)
01330 printf("thread %d: Executing DRMS_NEWSLOTS.\n",tnum);
01331 pthread_mutex_lock(env->clientlock);
01332 status = drms_server_newslots(env, sockfd);
01333 pthread_mutex_unlock(env->clientlock);
01334 break;
01335 case DRMS_SLOT_SETSTATE:
01336 if (env->verbose)
01337 printf("thread %d: Executing DRMS_SLOT_SETSTATE.\n",tnum);
01338 pthread_mutex_lock(env->clientlock);
01339 status = drms_server_slot_setstate(env, sockfd);
01340 pthread_mutex_unlock(env->clientlock);
01341 break;
01342 case DRMS_GETUNIT:
01343 if (env->verbose)
01344 printf("thread %d: Executing DRMS_GETUNIT.\n",tnum);
01345 pthread_mutex_lock(env->clientlock);
01346 status = drms_server_getunit(env, sockfd);
01347 pthread_mutex_unlock(env->clientlock);
01348 break;
01349 case DRMS_GETUNITS:
01350 if (env->verbose)
01351 printf("thread %d: Executing DRMS_GETUNITS.\n",tnum);
01352
01353 pthread_mutex_lock(env->clientlock);
01354 status = drms_server_getunits(env, sockfd);
01355 pthread_mutex_unlock(env->clientlock);
01356 break;
01357 case DRMS_GETSUDIR:
01358 if (env->verbose)
01359 printf("thread %d: Executing DRMS_GETSUDIR.\n",tnum);
01360
01361
01362 pthread_mutex_lock(env->clientlock);
01363 status = drms_server_getsudir(env, sockfd);
01364 pthread_mutex_unlock(env->clientlock);
01365 break;
01366 case DRMS_GETSUDIRS:
01367 if (env->verbose)
01368 printf("thread %d: Executing DRMS_GETSUDIRS.\n",tnum);
01369
01370 pthread_mutex_lock(env->clientlock);
01371 status = drms_server_getsudirs(env, sockfd);
01372 pthread_mutex_unlock(env->clientlock);
01373 break;
01374 case DRMS_NEWSERIES:
01375 if (env->verbose)
01376 printf("thread %d: Executing DRMS_NEWSERIES.\n",tnum);
01377 pthread_mutex_lock(env->clientlock);
01378 status = drms_server_newseries(env, sockfd);
01379 pthread_mutex_unlock(env->clientlock);
01380 break;
01381 case DRMS_DROPSERIES:
01382 if (env->verbose)
01383 printf("thread %d: Executing DRMS_DROPSERIES.\n",tnum);
01384 pthread_mutex_lock(env->clientlock);
01385 status = drms_server_dropseries(env, sockfd);
01386 pthread_mutex_unlock(env->clientlock);
01387 break;
01388 case DRMS_GETTMPGUID:
01389 pthread_mutex_lock(env->clientlock);
01390 drms_server_gettmpguid(&sockfd);
01391 pthread_mutex_unlock(env->clientlock);
01392 break;
01393 case DRMS_SITEINFO:
01394 pthread_mutex_lock(env->clientlock);
01395 drmssite_server_siteinfo(sockfd, db_handle);
01396 pthread_mutex_unlock(env->clientlock);
01397 break;
01398 case DRMS_LOCALSITEINFO:
01399 pthread_mutex_lock(env->clientlock);
01400 drmssite_server_localsiteinfo(sockfd, db_handle);
01401 pthread_mutex_unlock(env->clientlock);
01402 break;
01403 case DRMS_GETSUINFO:
01404 pthread_mutex_lock(env->clientlock);
01405 drms_server_getsuinfo(env, sockfd);
01406 pthread_mutex_unlock(env->clientlock);
01407 break;
01408 case DRMS_GETDBUSER:
01409 pthread_mutex_lock(env->clientlock);
01410 drms_server_getdbuser(env, sockfd);
01411 pthread_mutex_unlock(env->clientlock);
01412 break;
01413 case DRMS_SETRETENTION:
01414 pthread_mutex_lock(env->clientlock);
01415 status = drms_server_setretention(env, sockfd);
01416 pthread_mutex_unlock(env->clientlock);
01417 break;
01418 case DRMS_MAKESESSIONWRITABLE:
01419 pthread_mutex_lock(env->clientlock);
01420 status = drms_server_session_setwrite(env, sockfd);
01421 pthread_mutex_unlock(env->clientlock);
01422 break;
01423 default:
01424 fprintf(stderr,"Error: Unknown command code '%d'\n",command);
01425 }
01426 if (status)
01427 if (env->verbose)
01428 printf("thread %d: WARNING: Last command failed with status = %d\n",
01429 tnum,status);
01430
01431
01432 #ifdef DEBUG
01433 xmem_check_all_guardwords(stdout, 100);
01434 #endif
01435
01436 #ifdef __linux__
01437 sched_yield();
01438
01439 #endif
01440 }
01441
01442
01443
01444
01445
01446
01447
01448
01449
01450
01451
01452
01453
01454
01455
01456
01457
01458
01459
01460
01461
01462
01463
01464
01465
01466
01467
01468
01469
01470 drms_lock_server(env);
01471 --(env->clientcounter);
01472 drms_unlock_server(env);
01473
01474 if (noshare)
01475 {
01476
01477 drms_lock_server(env);
01478
01479 if (disconnect && !status)
01480 {
01481 if (env->verbose)
01482 printf("thread %d: Performing COMMIT.\n",tnum);
01483 if(db_commit(db_handle))
01484 fprintf(stderr,"thread %d: COMMIT failed.\n",tnum);
01485 }
01486 else
01487 {
01488 if (env->verbose)
01489 printf("thread %d: Performing ROLLBACK.\n",tnum);
01490 if(db_rollback(db_handle))
01491 fprintf(stderr,"thread %d: ROLLBACK failed.\n",tnum);
01492 }
01493
01494 if(db_start_transaction(db_handle))
01495 {
01496 fprintf(stderr,"thread %d: START TRANSACTION failed.\n",tnum);
01497 }
01498 else
01499 {
01500
01501 drms_unlock_server(env);
01502 if (drms_session_setread(env) != DRMS_SUCCESS)
01503 {
01504
01505 fprintf(stderr, "thread %d: SET READ ONLY failed.\n", tnum);
01506 }
01507 drms_lock_server(env);
01508 }
01509
01510 drms_unlock_server(env);
01511 }
01512 else
01513 {
01514
01515
01516
01517
01518
01519
01520
01521
01522 if (!status && disconnect)
01523 {
01524
01525 drms_lock_server(env);
01526
01527 drms_unlock_server(env);
01528 }
01529 else
01530 {
01531 if (!status && !disconnect)
01532 fprintf(stderr,"thread %d: The client module seems to have crashed "
01533 "or aborted.\nSocket connection was lost. Aborting.\n",tnum);
01534 else if (status && !disconnect)
01535 fprintf(stderr,"thread %d: An error occured in server thread, "
01536 "status=%d.\nAborting.\n",tnum,status);
01537 else if (status && disconnect)
01538 fprintf(stderr,"thread %d: The client module disconnected and set "
01539 "the abort flag.\nAborting.\n",tnum);
01540
01541
01542
01543
01544
01545
01546
01547
01548
01549
01550
01551
01552
01553
01554
01555
01556
01557
01558
01559
01560 pthread_kill(env->signal_thread, SIGTERM);
01561 }
01562 }
01563
01564 printf("thread %d: Exiting with disconnect = %d and status = %d.\n",
01565 tnum,disconnect,status);
01566
01567 close(sockfd);
01568 return NULL;
01569
01570 bail:
01571 drms_lock_server(env);
01572 --(env->clientcounter);
01573 drms_unlock_server(env);
01574 close(sockfd);
01575 return NULL;
01576 }
01577
01578
01579
01580
01581
01582 int drms_server_newslots(DRMS_Env_t *env, int sockfd)
01583 {
01584 int status,n,i;
01585 char *series;
01586 int *slotnum;
01587 long long *recnum;
01588 DRMS_StorageUnit_t **su;
01589 DRMS_RecLifetime_t lifetime;
01590 int createslotdirs;
01591 int gotosums;
01592
01593 status = DRMS_SUCCESS;
01594 series = receive_string(sockfd);
01595
01596 n = Readint(sockfd);
01597 if (n>0)
01598 {
01599 lifetime = (DRMS_RecLifetime_t) Readint(sockfd);
01600 createslotdirs = Readint(sockfd);
01601
01602
01603 gotosums = Readint(sockfd);
01604
01605 su = malloc(n*sizeof(DRMS_StorageUnit_t *));
01606 XASSERT(su);
01607 slotnum = malloc(n*sizeof(int));
01608 XASSERT(slotnum);
01609 recnum = malloc(n*sizeof(long long));
01610 XASSERT(recnum);
01611 for (i=0; i<n; i++)
01612 recnum[i] = Readlonglong(sockfd);
01613
01614 #if defined(DEBUG)
01615 fprintf(stdout, "series = '%s'\nn = '%d'\nlifetime = '%d'\ncreateslotdirs = '%d'\n",
01616 series, n, lifetime, createslotdirs);
01617 #endif
01618
01619 if (gotosums)
01620 {
01621 status = drms_su_newslots(env, n, series, recnum, lifetime, slotnum, su, createslotdirs);
01622 }
01623 else
01624 {
01625 status = drms_su_newslots_nosums(env, n, series, recnum, lifetime, slotnum, su, createslotdirs);
01626 }
01627
01628 if (status==DRMS_SUCCESS)
01629 {
01630 int *tmp, *t;
01631 long long *ltmp, *lt;
01632 struct iovec *vec, *v;
01633
01634 tmp = malloc((2*n+1)*sizeof(int));
01635 XASSERT(tmp);
01636 ltmp = malloc((n+1)*sizeof(long long));
01637 XASSERT(ltmp);
01638 vec = malloc((4*n+1)*sizeof(struct iovec));
01639 XASSERT(vec);
01640 t=tmp; v=vec; lt=ltmp;
01641 net_packint(status, t++, v++);
01642 for (i=0; i<n; i++)
01643 {
01644 net_packlonglong(su[i]->sunum, lt++, v++);
01645 net_packstring(su[i]->sudir, t++, v); v+=2;
01646 net_packint(slotnum[i], t++, v++);
01647 }
01648 Writevn(sockfd, vec, (4*n+1));
01649 free(tmp);
01650 free(ltmp);
01651 free(vec);
01652 }
01653 else
01654 Writeint(sockfd, status);
01655 free(slotnum);
01656 free(su);
01657 free(recnum);
01658 }
01659 free(series);
01660 return status;
01661 }
01662
01663
01664
01665
01666
01667 int drms_server_getunit(DRMS_Env_t *env, int sockfd)
01668 {
01669 int status;
01670 long long sunum;
01671 char *series;
01672 int retrieve;
01673 DRMS_StorageUnit_t *su;
01674
01675 series = receive_string(sockfd);
01676 sunum = Readlonglong(sockfd);
01677 retrieve = Readint(sockfd);
01678 su = drms_getunit(env, series, sunum, retrieve, &status);
01679 if (status==DRMS_SUCCESS)
01680 {
01681 int tmp,len;
01682 struct iovec vec[3];
01683 tmp = htonl(status);
01684 vec[0].iov_len = sizeof(tmp);
01685 vec[0].iov_base = &tmp;
01686 if (su) {
01687 vec[2].iov_len = strlen(su->sudir);
01688 vec[2].iov_base = su->sudir;
01689 len = htonl(vec[2].iov_len);
01690 vec[1].iov_len = sizeof(len);
01691 vec[1].iov_base = &len;
01692 } else {
01693 vec[2].iov_len = 0;
01694 vec[2].iov_base = "\0";
01695 len = htonl(vec[2].iov_len);
01696 vec[1].iov_len = sizeof(len);
01697 vec[1].iov_base = &len;
01698 }
01699 Writevn(sockfd, vec, 3);
01700 }
01701 else
01702 Writeint(sockfd,status);
01703 free(series);
01704 return status;
01705 }
01706
01707
01708
01709
01710 int drms_server_getunits(DRMS_Env_t *env, int sockfd)
01711 {
01712 int status;
01713 int n;
01714
01715 int retrieve, dontwait;
01716 DRMS_StorageUnit_t *su;
01717 DRMS_SuAndSeries_t *suandseries = NULL;
01718 int icnt;
01719
01720 n = Readint(sockfd);
01721
01722 suandseries = (DRMS_SuAndSeries_t *)malloc(sizeof(DRMS_SuAndSeries_t) * n);
01723
01724 for (icnt = 0; icnt < n; icnt++)
01725 {
01726 suandseries[icnt].series = receive_string(sockfd);
01727 }
01728
01729 for (icnt = 0; icnt < n; icnt++)
01730 {
01731 suandseries[icnt].sunum = Readlonglong(sockfd);
01732 }
01733
01734 retrieve = Readint(sockfd);
01735 dontwait = Readint(sockfd);
01736
01737
01738 dontwait = 0;
01739
01740 status = drms_getunits_ex(env, n, suandseries, retrieve, dontwait);
01741 Writeint(sockfd,status);
01742 if (status==DRMS_SUCCESS) {
01743 if (!dontwait) {
01744 int *len;
01745 struct iovec *vec;
01746
01747 len = malloc(n*sizeof(int));
01748 XASSERT(len);
01749 vec = malloc(2*n*sizeof(struct iovec));
01750 XASSERT(vec);
01751 for (int i = 0; i < n; i++) {
01752 HContainer_t *scon = NULL;
01753 su = drms_su_lookup(env, suandseries[i].series, suandseries[i].sunum, &scon);
01754 if (su) {
01755 vec[2*i+1].iov_len = strlen(su->sudir);
01756 vec[2*i+1].iov_base = su->sudir;
01757 len[i] = htonl(vec[2*i+1].iov_len);
01758 vec[2*i].iov_len = sizeof(len[i]);
01759 vec[2*i].iov_base = &len[i];
01760 } else {
01761 vec[2*i+1].iov_len = 0;
01762 vec[2*i+1].iov_base = "\0";
01763 len[i] = htonl(vec[2*i+1].iov_len);
01764 vec[2*i].iov_len = sizeof(len[i]);
01765 vec[2*i].iov_base = &len[i];
01766 }
01767 }
01768 Writevn(sockfd, vec, 2*n);
01769 free(len);
01770 free(vec);
01771 }
01772 }
01773
01774 for (icnt = 0; icnt < n; icnt++)
01775 {
01776 if (suandseries[icnt].series)
01777 {
01778 free(suandseries[icnt].series);
01779 }
01780 }
01781
01782 free(suandseries);
01783
01784 return status;
01785 }
01786
01787 int drms_server_getsudir(DRMS_Env_t *env, int sockfd)
01788 {
01789 int retrieve;
01790 DRMS_StorageUnit_t su;
01791 int status;
01792
01793 su.sudir[0] = '\0';
01794 su.mode = DRMS_READONLY;
01795 su.nfree = 0;
01796 su.state = NULL;
01797 su.recnum = NULL;
01798 su.refcount = 0;
01799 su.seriesinfo = NULL;
01800
01801
01802 su.sunum = Readlonglong(sockfd);
01803 retrieve = Readint(sockfd);
01804
01805 status = drms_su_getsudir(env, &su, retrieve);
01806
01807 send_string(sockfd, su.sudir);
01808 Writeint(sockfd, status);
01809
01810 if (status == DRMS_REMOTESUMS_TRYLATER)
01811 {
01812 return DRMS_SUCCESS;
01813 }
01814 else
01815 {
01816 return status;
01817 }
01818 }
01819
01820 int drms_server_getsudirs(DRMS_Env_t *env, int sockfd)
01821 {
01822 DRMS_StorageUnit_t **su = NULL;
01823 DRMS_StorageUnit_t *onesu = NULL;
01824 int num;
01825 int retrieve;
01826 int dontwait;
01827 int isu;
01828 int status;
01829
01830 num = Readint(sockfd);
01831
01832 su = malloc(sizeof(DRMS_StorageUnit_t *) * num);
01833
01834 for (isu = 0; isu < num; isu++)
01835 {
01836 su[isu] = (DRMS_StorageUnit_t *)malloc(sizeof(DRMS_StorageUnit_t));
01837 onesu = su[isu];
01838 onesu->sunum = Readlonglong(sockfd);
01839 onesu->sudir[0] = '\0';
01840 onesu->mode = DRMS_READONLY;
01841 onesu->nfree = 0;
01842 onesu->state = NULL;
01843 onesu->recnum = NULL;
01844 onesu->refcount = 0;
01845 onesu->seriesinfo = NULL;
01846 }
01847
01848 retrieve = Readint(sockfd);
01849 dontwait = Readint(sockfd);
01850
01851
01852 dontwait = 0;
01853
01854 status = drms_su_getsudirs(env, num, su, retrieve, dontwait);
01855
01856 for (isu = 0; isu < num; isu++)
01857 {
01858 onesu = su[isu];
01859 send_string(sockfd, onesu->sudir);
01860 }
01861
01862 if (su)
01863 {
01864 for (isu = 0; isu < num; isu++)
01865 {
01866 if (su[isu])
01867 {
01868 free(su[isu]);
01869 }
01870 }
01871
01872 free(su);
01873 }
01874
01875 if (status == DRMS_REMOTESUMS_TRYLATER)
01876 {
01877 return DRMS_SUCCESS;
01878 }
01879 else
01880 {
01881 return status;
01882 }
01883 }
01884
01885
01886 int drms_server_getsuinfo(DRMS_Env_t *env, int sockfd)
01887 {
01888 int status = DRMS_SUCCESS;
01889 int nReqs;
01890 int isunum;
01891 long long *sunums = NULL;
01892 SUM_info_t **infostructs = NULL;
01893 SUM_info_t *info = NULL;
01894
01895 nReqs = Readint(sockfd);
01896
01897 sunums = (long long *)malloc(sizeof(long long) * nReqs);
01898 infostructs = (SUM_info_t **)malloc(sizeof(SUM_info_t *) * nReqs);
01899
01900 for (isunum = 0; isunum < nReqs; isunum++)
01901 {
01902 sunums[isunum] = Readlonglong(sockfd);
01903 }
01904
01905 status = drms_su_getinfo(env, sunums, nReqs, infostructs);
01906
01907 Writeint(sockfd, status);
01908
01909 if (status == DRMS_SUCCESS)
01910 {
01911
01912 for (isunum = 0; isunum < nReqs; isunum++)
01913 {
01914 info = infostructs[isunum];
01915
01916 Writelonglong(sockfd, info->sunum);
01917 send_string(sockfd, info->online_loc);
01918 send_string(sockfd, info->online_status);
01919 send_string(sockfd, info->archive_status);
01920 send_string(sockfd, info->offsite_ack);
01921 send_string(sockfd, info->history_comment);
01922 send_string(sockfd, info->owning_series);
01923 Writeint(sockfd, info->storage_group);
01924 Write_dbtype(DB_DOUBLE, (void *)&(info->bytes), sockfd);
01925 send_string(sockfd, info->creat_date);
01926 send_string(sockfd, info->username);
01927 send_string(sockfd, info->arch_tape);
01928 Writeint(sockfd, info->arch_tape_fn);
01929 send_string(sockfd, info->arch_tape_date);
01930 send_string(sockfd, info->safe_tape);
01931 Writeint(sockfd, info->safe_tape_fn);
01932 send_string(sockfd, info->safe_tape_date);
01933 Writeint(sockfd, info->pa_status);
01934 Writeint(sockfd, info->pa_substatus);
01935 send_string(sockfd, info->effective_date);
01936 }
01937 }
01938
01939 if (infostructs)
01940 {
01941 free(infostructs);
01942 infostructs = NULL;
01943 }
01944
01945 return status;
01946 }
01947
01948 int drms_server_getdbuser(DRMS_Env_t *env, int sockfd)
01949 {
01950 send_string(sockfd, env->session->db_handle->dbuser);
01951 return DRMS_SUCCESS;
01952 }
01953
01954
01955
01956
01957
01958 int drms_server_setretention(DRMS_Env_t *env, int sockfd)
01959 {
01960 int16_t newRetention = -1;
01961 int num = -1;
01962 int status;
01963 long long *sunums = NULL;
01964 int isu;
01965
01966 newRetention = Readshort(sockfd);
01967 num = Readint(sockfd);
01968 sunums = (long long *)malloc(num * sizeof(long long));
01969
01970 if (!sunums)
01971 {
01972 fprintf(stderr, "drms_server_setretention(): out of memory.\n");
01973 return DRMS_ERROR_OUTOFMEMORY;
01974 }
01975
01976 for (isu = 0; isu < num; isu++)
01977 {
01978 sunums[isu] = Readlonglong(sockfd);
01979 }
01980
01981 status = drms_su_setretention(env, newRetention, num, sunums);
01982 Writeint(sockfd, status);
01983
01984 return status;
01985 }
01986
01987 int drms_server_session_setwrite(DRMS_Env_t *env, int sockfd)
01988 {
01989 int status = DRMS_SUCCESS;
01990
01991 status = drms_session_setwrite(env);
01992 Writeint(sockfd, status);
01993
01994 return status;
01995 }
01996
01997
01998 int drms_server_alloc_recnum(DRMS_Env_t *env, int sockfd)
01999 {
02000 int status, n, i;
02001 char *series;
02002 DRMS_RecLifetime_t lifetime;
02003 long long *recnums;
02004 int tmp;
02005 struct iovec vec[3];
02006
02007
02008 series = receive_string(sockfd);
02009 n = Readint(sockfd);
02010 lifetime = (DRMS_RecLifetime_t) Readint(sockfd);
02011
02012 if ((recnums = db_sequence_getnext_n(env->session->db_handle, series, n)) == NULL)
02013 {
02014 status = 1;
02015 Writeint(sockfd, status);
02016 }
02017 else
02018 {
02019 status = 0;
02020
02021
02022 if (lifetime == DRMS_TRANSIENT)
02023 {
02024 drms_lock_server(env);
02025 drms_server_transient_records(env, series, n, recnums);
02026 drms_unlock_server(env);
02027 }
02028
02029
02030
02031 tmp = htonl(status);
02032 vec[0].iov_len = sizeof(int);
02033 vec[0].iov_base = &tmp;
02034 for (i=0; i<n; i++)
02035 recnums[i] = htonll( recnums[i] );
02036 vec[1].iov_len = n*sizeof(long long);
02037 vec[1].iov_base = recnums;
02038 Writevn(sockfd, vec, 2);
02039 free(recnums);
02040 }
02041 free(series);
02042 return status;
02043 }
02044
02045 void drms_server_transient_records(DRMS_Env_t *env, char *series, int n, long long *recnums) {
02046 DS_node_t *ds=(DS_node_t *) NULL;
02047
02048 if (!env->templist)
02049 {
02050 env->templist = ds = malloc(sizeof(DS_node_t));
02051 XASSERT(env->templist);
02052 ds->series = strdup(series);
02053 ds->nmax = 512;
02054 ds->n = 0;
02055 ds->recnums = malloc(ds->nmax*sizeof(long long));
02056 XASSERT(ds->recnums);
02057 ds->next = NULL;
02058 }
02059 else
02060 {
02061 if (ds == (DS_node_t *) NULL) ds = env->templist;
02062 while(1)
02063 {
02064 if (!strcmp(ds->series,series))
02065 break;
02066 else
02067 {
02068 if (!ds->next)
02069 {
02070 ds->next = malloc(sizeof(DS_node_t));
02071 XASSERT(ds->next);
02072 ds = ds->next;
02073 ds->series = strdup(series);
02074 ds->nmax = 512;
02075 ds->n = 0;
02076 ds->recnums = malloc(ds->nmax*sizeof(long long));
02077 XASSERT(ds->recnums);
02078 ds->next = NULL;
02079 break;
02080 }
02081 ds = ds->next;
02082 }
02083 }
02084 }
02085 if ( ds->n+n >= ds->nmax )
02086 {
02087 ds->nmax *= 2;
02088 ds->recnums = realloc(ds->recnums, ds->nmax*sizeof(long long));
02089 XASSERT(ds->recnums);
02090 }
02091 for (int i=0; i<n; i++)
02092 ds->recnums[(ds->n)++] = recnums[i];
02093 }
02094
02095
02096
02097 int drms_server_slot_setstate(DRMS_Env_t *env, int sockfd)
02098 {
02099 int status, state;
02100 char *series;
02101 int slotnum;
02102 long long sunum;
02103
02104 series = receive_string(sockfd);
02105 sunum = Readlonglong(sockfd);
02106 slotnum = Readint(sockfd);
02107 state = Readint(sockfd);
02108 if (state == DRMS_SLOT_FREE)
02109 status = drms_su_freeslot(env, series, sunum, slotnum);
02110 else
02111 status = (drms_su_markslot(env, series, sunum, slotnum, &state) != NULL);
02112 Writeint(sockfd,status);
02113 free(series);
02114 return status;
02115 }
02116
02117
02118
02119
02120
02121 int drms_server_newseries(DRMS_Env_t *env, int sockfd)
02122 {
02123 char *series;
02124 DRMS_Record_t *template;
02125
02126 series = receive_string(sockfd);
02127
02128
02129
02130 template = (DRMS_Record_t *)hcon_allocslot_lower(&env->series_cache, series);
02131 memset(template,0,sizeof(DRMS_Record_t));
02132 template->init = 0;
02133
02134 free(series);
02135 return 0;
02136 }
02137
02138
02139
02140
02141 int drms_server_dropseries(DRMS_Env_t *env, int sockfd)
02142 {
02143 char *series_lower;
02144 char *tn;
02145 DRMS_Array_t *vec = NULL;
02146 int nrows = -1;
02147 int irow = -1;
02148 int dims[2];
02149 int status = 0;
02150 int8_t *val = NULL;
02151
02152 series_lower = receive_string(sockfd);
02153 tn = strdup(series_lower);
02154 strtolower(series_lower);
02155 nrows = Readint(sockfd);
02156 dims[0] = 1;
02157 dims[1] = nrows;
02158
02159 vec = drms_array_create(DRMS_TYPE_LONGLONG, 2, dims, NULL, &status);
02160
02161 if (!status)
02162 {
02163 val = (int8_t *)(vec->data);
02164
02165 for (irow = 0; irow < nrows; irow++)
02166 {
02167 *(long long *)val = Readlonglong(sockfd);
02168 val += drms_sizeof(DRMS_TYPE_LONGLONG);
02169 }
02170
02171 drms_server_dropseries_su(env, tn, vec);
02172
02173
02174
02175 hcon_remove(&env->series_cache, series_lower);
02176 }
02177
02178 if (vec)
02179 {
02180 drms_free_array(vec);
02181 }
02182
02183 free(series_lower);
02184 free(tn);
02185 return status;
02186 }
02187
02188
02189
02190 int drms_server_dropseries_su(DRMS_Env_t *env, const char *tn, DRMS_Array_t *array) {
02191 int status = 0;
02192 int drmsstatus = DRMS_SUCCESS;
02193
02194 if (!env->sum_thread) {
02195 if((status = pthread_create(&env->sum_thread, NULL, &drms_sums_thread,
02196 (void *) env))) {
02197 fprintf(stderr,"Thread creation failed: %d\n", status);
02198 return 1;
02199 }
02200 }
02201
02202 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_DELETESUS) && SUMS_USEMTSUMS_DELETESUS
02203 char commentbuf[DRMS_MAXSERIESNAMELEN * 2];
02204 DRMS_MtSumsRequest_t *request = NULL;
02205 DRMS_MtSumsRequest_t *reply = NULL;
02206
02207 request = calloc(1, sizeof(DRMS_MtSumsRequest_t));
02208 XASSERT(request);
02209
02210 request->opcode = DRMS_SUMDELETESERIES;
02211
02212 snprintf(commentbuf, sizeof(commentbuf), "NONE,%s", tn);
02213 request->comment = commentbuf;
02214
02215 tqueueAdd(env->sum_inbox, (long)pthread_self(), (char *)request);
02216 tqueueDel(env->sum_outbox, (long)pthread_self(), (char **)&reply);
02217
02218
02219
02220 if (reply->opcode)
02221 {
02222 fprintf(stderr, "SUM_delete_series() returned with error code '%d'.\n", reply->opcode);
02223 status = DRMS_ERROR_SUMDELETESERIES;
02224 }
02225
02226
02227 if (reply)
02228 {
02229 free(reply);
02230 }
02231 #else
02232
02233
02234 if (array &&
02235 array->naxis == 2 &&
02236 array->axis[0] == 1 &&
02237 array->axis[1] > 0 &&
02238 array->type == DRMS_TYPE_LONGLONG)
02239 {
02240
02241
02242 char *sudir = NULL;
02243 long long sunum = -1;
02244 int lastrow = 0;
02245 int irow = 0;
02246
02247 while (irow < array->axis[1])
02248 {
02249
02250 sunum = drms_su_alloc(env, 1048576, &sudir, NULL, &drmsstatus);
02251
02252 if (!drmsstatus && sunum >= 0 && sudir)
02253 {
02254
02255 char fbuf[PATH_MAX];
02256 snprintf(fbuf, sizeof(fbuf), "%s/%s", sudir, kDELSERFILE);
02257 FILE *fptr = fopen(fbuf, "w");
02258
02259 if (fptr)
02260 {
02261
02262 long long *data = (long long *)array->data;
02263 char obuf[64];
02264
02265 lastrow += kDelSerChunk < array->axis[1] - lastrow ? kDelSerChunk : array->axis[1] - lastrow;
02266
02267
02268 for (; irow < lastrow; irow++)
02269 {
02270 snprintf(obuf, sizeof(obuf), "%lld\n", data[irow]);
02271 fwrite(obuf, strlen(obuf), 1, fptr);
02272 }
02273
02274 if (!fclose(fptr))
02275 {
02276
02277
02278
02279
02280
02281 DRMS_SumRequest_t *putreq = NULL;
02282 DRMS_SumRequest_t *putrep = NULL;
02283
02284 putreq = malloc(sizeof(DRMS_SumRequest_t));
02285 XASSERT(putreq);
02286 memset(putreq, 0, sizeof(DRMS_SumRequest_t));
02287 putreq->opcode = DRMS_SUMPUT;
02288 putreq->dontwait = 0;
02289 putreq->reqcnt = 1;
02290 putreq->dsname = "deleteseriestemp";
02291 putreq->group = 0;
02292 putreq->mode = TEMP + TOUCH;
02293 putreq->tdays = 2;
02294 putreq->sunum[0] = sunum;
02295 putreq->sudir[0] = sudir;
02296 putreq->comment = NULL;
02297
02298
02299 XASSERT(env->sum_thread);
02300
02301
02302 tqueueAdd(env->sum_inbox, (long) pthread_self(), (char *)putreq);
02303
02304
02305 tqueueDel(env->sum_outbox, (long) pthread_self(), (char **)&putrep);
02306
02307 if (putrep->opcode != 0)
02308 {
02309 fprintf(stderr, "ERROR in drms_server_dropseries_su(): SUM PUT failed with "
02310 "error code %d.\n", putrep->opcode);
02311 status = DRMS_ERROR_SUMPUT;
02312 }
02313
02314 if (putrep)
02315 {
02316 free(putrep);
02317 }
02318
02319 if (!status)
02320 {
02321 char sunumstr[64];
02322 char commentbuf[DRMS_MAXPATHLEN * 2];
02323 DRMS_SumRequest_t *request = NULL;
02324 DRMS_SumRequest_t *reply = NULL;
02325
02326 request = calloc(1, sizeof(DRMS_SumRequest_t));
02327 XASSERT(request);
02328
02329 snprintf(sunumstr, sizeof(sunumstr), "%lld", sunum);
02330 request->opcode = DRMS_SUMDELETESERIES;
02331
02332 snprintf(commentbuf, sizeof(commentbuf), "%s,%s", fbuf, tn);
02333 request->comment = commentbuf;
02334
02335 tqueueAdd(env->sum_inbox, (long)pthread_self(), (char *)request);
02336 tqueueDel(env->sum_outbox, (long)pthread_self(), (char **)&reply);
02337
02338
02339
02340 if (reply->opcode)
02341 {
02342 fprintf(stderr,
02343 "SUM_delete_series() returned with error code '%d'.\n",
02344 reply->opcode);
02345 status = DRMS_ERROR_SUMDELETESERIES;
02346 }
02347
02348
02349 if (reply)
02350 {
02351 free(reply);
02352 }
02353 }
02354 }
02355 else
02356 {
02357
02358 fprintf(stderr, "Couldn't close file '%s'.\n", fbuf);
02359 status = DRMS_ERROR_FILECREATE;
02360 }
02361 }
02362 else
02363 {
02364
02365 fprintf(stderr, "Couldn't open file '%s'.\n", fbuf);
02366 status = DRMS_ERROR_FILECREATE;
02367 }
02368
02369 free(sudir);
02370 }
02371 else
02372 {
02373 fprintf(stderr, "SUMALLOC failed in drms_server_dropseries_su().\n");
02374 status = DRMS_ERROR_SUMALLOC;
02375 }
02376
02377 }
02378 }
02379 else
02380 {
02381 status = DRMS_ERROR_INVALIDDATA;
02382 fprintf(stderr, "Unexpected array passed to drms_server_dropseries_su().\n");
02383 }
02384 #endif
02385 return status;
02386 }
02387
02388 void drms_lock_server(DRMS_Env_t *env)
02389 {
02390 if ( env->drms_lock == NULL )
02391 return;
02392 else
02393 {
02394 pthread_mutex_lock( env->drms_lock );
02395 }
02396 }
02397
02398 void drms_unlock_server(DRMS_Env_t *env)
02399 {
02400 if ( env->drms_lock == NULL )
02401 return;
02402 else
02403 {
02404 pthread_mutex_unlock( env->drms_lock );
02405 }
02406 }
02407
02408
02409 int drms_trylock_server(DRMS_Env_t *env)
02410 {
02411 if (env->drms_lock == NULL)
02412 {
02413 return 0;
02414 }
02415 else
02416 {
02417 return pthread_mutex_trylock(env->drms_lock);
02418 }
02419 }
02420
02421 long long drms_server_gettmpguid(int *sockfd)
02422 {
02423 static long long GUID = 1;
02424
02425 if (sockfd)
02426 {
02427 Writelonglong(*sockfd, GUID);
02428 }
02429
02430 GUID++;
02431 return GUID - 1;
02432 }
02433
02434
02435
02436 void drms_delete_temporaries(DRMS_Env_t *env)
02437 {
02438 int i, status;
02439 char *command, *p;
02440 DS_node_t *ds;
02441
02442 XASSERT(env->session->db_direct==1);
02443
02444 ds = env->templist;
02445 while (ds)
02446 {
02447 if (ds->n>0)
02448 {
02449 command = malloc(strlen(ds->series) + 40 + 21*ds->n);
02450 XASSERT(command);
02451 p = command;
02452 p += sprintf(p, "delete from %s where recnum in (",ds->series);
02453 for (i=0; i<ds->n-1; i++)
02454 p += sprintf(p, "%lld,",ds->recnums[i]);
02455 p += sprintf(p, "%lld)",ds->recnums[ds->n-1]);
02456 #ifdef DEBUG
02457 printf("drms_delete_temporaries: command = \n%s\n",command);
02458 #endif
02459 status = drms_dms(env->session, NULL, command);
02460 if (status)
02461 {
02462 fprintf(stderr,"ERROR in drms_delete_temporaries: drms_dms failed "
02463 "with status=%d\n",status);
02464 free(command);
02465 return;
02466 }
02467 free(command);
02468 }
02469 ds = ds->next;
02470 }
02471 }
02472
02473
02474
02475 static void SigPipeHndlr(int signum)
02476 {
02477 gGotPipe = 1;
02478 }
02479
02480
02481
02482
02483
02484
02485
02486
02487
02488 void *drms_sums_thread(void *arg)
02489 {
02490 int status;
02491 DRMS_Env_t *env;
02492 SUM_t *sum=NULL;
02493
02494
02495
02496
02497 void *request = NULL;
02498 void *reply = NULL;
02499
02500 long stop, empty;
02501 int connected = 0;
02502 char *ptmp;
02503 TIMER_t *timer = NULL;
02504 int tryagain;
02505 int sleepiness;
02506 int tryconnect;
02507 int shuttingdown;
02508 sem_t *sdsem = drms_server_getsdsem();
02509 int sumscallret;
02510 int rv;
02511 int mtRequest = 0;
02512 int opcode;
02513 int dontwait;
02514 int reqcnt;
02515
02516
02517 env = (DRMS_Env_t *) arg;
02518
02519
02520
02521
02522
02523 if( (status = pthread_sigmask(SIG_BLOCK, &env->signal_mask, NULL)))
02524 {
02525 fprintf(stderr,"pthread_sigmask call failed with status = %d\n", status);
02526 Exit(1);
02527 }
02528
02529
02530
02531 struct sigaction nact;
02532
02533 memset(&nact, 0, sizeof(struct sigaction));
02534 nact.sa_handler = SigPipeHndlr;
02535 nact.sa_flags = SA_RESTART;
02536
02537
02538
02539 sigaction(SIGPIPE, &nact, NULL);
02540
02541 #ifdef DEBUG
02542 printf("drms_sums_thread started.\n");
02543 fflush(stdout);
02544 #endif
02545
02546 if (!gSUMSbusyMtx)
02547 {
02548 gSUMSbusyMtx = malloc(sizeof(pthread_mutex_t));
02549 XASSERT(gSUMSbusyMtx);
02550 pthread_mutex_init(gSUMSbusyMtx, NULL);
02551 }
02552
02553
02554 stop = 0;
02555 empty = 0;
02556 tryconnect = 1;
02557
02558 while ( !stop || !empty)
02559 {
02560
02561
02562
02563
02564
02565
02566
02567
02568
02569
02570
02571
02572
02573
02574
02575
02576
02577
02578 env->sum_tag = 0;
02579 rv = tqueueDelAny(env->sum_inbox, &env->sum_tag, &ptmp );
02580
02581 if (stop == 1)
02582 {
02583 empty = rv;
02584 }
02585
02586
02587
02588 opcode = ((DRMS_SumRequest_t *)ptmp)->opcode;
02589
02590 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS
02591
02592 mtRequest = IsMTSums(opcode);
02593
02594 if (mtRequest)
02595 {
02596 request = (DRMS_MtSumsRequest_t *)ptmp;
02597 dontwait = ((DRMS_MtSumsRequest_t *)request)->dontwait;
02598 reqcnt = ((DRMS_MtSumsRequest_t *)request)->reqcnt;
02599 }
02600 else
02601 {
02602 #endif
02603
02604 mtRequest = 0;
02605
02606 request = (DRMS_SumRequest_t *)ptmp;
02607 dontwait = ((DRMS_SumRequest_t *)request)->dontwait;
02608 reqcnt = ((DRMS_SumRequest_t *)request)->reqcnt;
02609
02610 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS
02611 }
02612 #endif
02613
02614 if (env->verbose)
02615 {
02616 printf("sums thread: Got request %d from thread %ld.\n", opcode, env->sum_tag);
02617 }
02618
02619 if (tryconnect && !connected && opcode!=DRMS_SUMCLOSE)
02620 {
02621 int firstSumOpen = 1;
02622
02623
02624 tryagain = 1;
02625 sleepiness = 1;
02626
02627 while (tryagain)
02628 {
02629 tryagain = 0;
02630
02631 if (!sum)
02632 {
02633
02634 if (env->verbose)
02635 {
02636 timer = CreateTimer();
02637 }
02638
02639 if (env->verbose)
02640 {
02641 printf("sums thread: No SUMS connection, calling SUM_open(). First SUM_open()?: %d.\n", firstSumOpen);
02642 }
02643
02644 sumscallret = MakeSumsCall(env, DRMS_SUMOPEN, &sum, printkerr, NULL, NULL);
02645
02646 firstSumOpen = 0;
02647
02648 if (sumscallret == kBrokenPipe || sumscallret == kSUMSDead || sumscallret == kTooManySumsOpen)
02649 {
02650
02651 if (env->verbose)
02652 {
02653 printf("sums thread: SUM_open() failed. MakeSumsCall() returned %d.\n", sumscallret);
02654 }
02655
02656 sum = NULL;
02657 }
02658
02659 if (env->verbose && timer)
02660 {
02661 fprintf(stdout, "to call SUM_open: %f seconds.\n", GetElapsedTime(timer));
02662 DestroyTimer(&timer);
02663 }
02664
02665 if (!sum)
02666 {
02667 if (env->loopconn && sumscallret != kSUMSDead && sumscallret != kTooManySumsOpen)
02668 {
02669 fprintf(stderr, "Failed to connect to SUMS; trying again in %d seconds.\n", sleepiness);
02670 tryagain = 1;
02671 sleep(sleepiness);
02672 GettingSleepier(&sleepiness);
02673 }
02674 else
02675 {
02676
02677
02678 stop = 1;
02679 tryconnect = 0;
02680 empty = tqueueCork(env->sum_inbox);
02681
02682 fprintf(stderr,"Failed to connect to SUMS; terminating.\n");
02683 fflush(stdout);
02684
02685 sleep(1);
02686 pthread_kill(env->signal_thread, SIGTERM);
02687
02688
02689 }
02690 }
02691 else
02692 {
02693 connected = 1;
02694 }
02695 }
02696
02697
02698 if (sdsem)
02699 {
02700 sem_wait(sdsem);
02701 shuttingdown = (drms_server_getsd() != kSHUTDOWN_UNINITIATED);
02702 sem_post(sdsem);
02703 }
02704
02705 if (shuttingdown)
02706 {
02707 if (env->verbose)
02708 {
02709 printf("sums thread: Shutting down.\n");
02710 }
02711
02712 tryagain = 0;
02713 tryconnect = 0;
02714 }
02715
02716 }
02717
02718 #ifdef DEBUG
02719 printf("drms_sums_thread connected to SUMS. SUMID = %llu\n",sum->uid);
02720 fflush(stdout);
02721 #endif
02722 }
02723
02724
02725 if (opcode == DRMS_SUMCLOSE)
02726 {
02727 if (env->verbose)
02728 {
02729 printf("sums thread: DRMS_SUMCLOSE requested.\n");
02730 }
02731
02732 if (!stop)
02733 {
02734 if (env->verbose)
02735 {
02736 printf("sums thread: stopping request loop.\n");
02737 }
02738
02739
02740
02741 stop = 1;
02742
02743
02744
02745
02746
02747
02748
02749
02750
02751
02752
02753
02754
02755 empty = tqueueCork(env->sum_inbox);
02756
02757
02758 }
02759 }
02760 else if (opcode == DRMS_SUMABORT)
02761 {
02762 if (env->verbose)
02763 {
02764 printf("sums thread: DRMS_SUMABORT requested.\n");
02765 }
02766
02767 break;
02768 }
02769 else
02770 {
02771 int requestID = opcode;
02772
02773 if (env->verbose)
02774 {
02775 printf("sums thread: Processing a non-close/abort DRMS request. request (ID): %d\n", requestID);
02776 }
02777
02778 if (connected)
02779 {
02780 if (env->verbose)
02781 {
02782 printf("sums thread: Already connected to SUMS.\n");
02783 }
02784
02785
02786
02787
02788
02789
02790 reply = drms_process_sums_request(env, &sum, (DRMS_SumRequest_t *)request, opcode, mtRequest);
02791 if (!reply)
02792 {
02793 fprintf(stderr, "sums thread: drms_process_sums_request() returned NULL. Thread making the request (ID): %ld. Request (ID): %d\n", env->sum_tag, requestID);
02794 }
02795
02796 XASSERT(reply);
02797
02798 }
02799 else
02800 {
02801 if (env->verbose)
02802 {
02803 printf("sums thread: Not connected to SUMS. Replying to main thread with a DRMS_ERROR_SUMOPEN code.\n");
02804 }
02805
02806
02807 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS
02808 if (mtRequest)
02809 {
02810 reply = calloc(1, sizeof(DRMS_MtSumsRequest_t));
02811 XASSERT(reply);
02812 ((DRMS_MtSumsRequest_t *)reply)->opcode = DRMS_ERROR_SUMOPEN;
02813 }
02814 else
02815 {
02816 #endif
02817 reply = malloc(sizeof(DRMS_SumRequest_t));
02818 XASSERT(reply);
02819 ((DRMS_SumRequest_t *)reply)->opcode = DRMS_ERROR_SUMOPEN;
02820 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS
02821 }
02822 #endif
02823 }
02824
02825 if (reply)
02826 {
02827 if (env->verbose)
02828 {
02829
02830 printf("sums thread: A reply was returned from SUMS, return code %d.\n", ((DRMS_SumRequest_t *)reply)->opcode);
02831 }
02832
02833 if (!dontwait)
02834 {
02835
02836 if (env->verbose)
02837 {
02838 printf("sums thread: Requestor wants the reply, queueing reply.\n");
02839 }
02840
02841
02842 tqueueAdd(env->sum_outbox, env->sum_tag, (char *) reply);
02843 }
02844 else
02845 {
02846 if (env->verbose)
02847 {
02848 printf("sums thread: Requestor does not want the reply.\n");
02849 }
02850
02851
02852
02853 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS
02854 if (mtRequest)
02855 {
02856 if (((DRMS_MtSumsRequest_t *)reply)->sunum)
02857 {
02858 free(((DRMS_MtSumsRequest_t *)reply)->sunum);
02859 ((DRMS_MtSumsRequest_t *)reply)->sunum = NULL;
02860 }
02861
02862 for (int i = 0; i < reqcnt; i++)
02863 {
02864 if ((((DRMS_MtSumsRequest_t *)reply)->sudir)[i])
02865 {
02866 free((((DRMS_MtSumsRequest_t *)reply)->sudir)[i]);
02867 }
02868 }
02869
02870 if (((DRMS_MtSumsRequest_t *)reply)->sudir)
02871 {
02872 free(((DRMS_MtSumsRequest_t *)reply)->sudir);
02873 ((DRMS_MtSumsRequest_t *)reply)->sudir = NULL;
02874 }
02875 }
02876 else
02877 {
02878 #endif
02879 for (int i = 0; i < reqcnt; i++)
02880 {
02881 if ((((DRMS_SumRequest_t *)reply)->sudir)[i])
02882 {
02883 free((((DRMS_SumRequest_t *)reply)->sudir)[i]);
02884 }
02885 }
02886 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS
02887 }
02888 #endif
02889
02890 free(reply);
02891 }
02892 }
02893 else
02894 {
02895 if (env->verbose)
02896 {
02897 printf("sums thread: A reply was NOT returned from SUMS.\n");
02898 }
02899
02900 if (!dontwait)
02901 {
02902
02903 fprintf(stderr, "ERROR: The client has requested a reply from SUMS, but no such reply is available.\n");
02904 }
02905 }
02906
02907 env->sum_tag = 0;
02908 }
02909
02910
02911
02912 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS
02913 if (mtRequest)
02914 {
02915 if (((DRMS_MtSumsRequest_t *)request)->sunum)
02916 {
02917
02918
02919 free(((DRMS_MtSumsRequest_t *)request)->sunum);
02920 ((DRMS_MtSumsRequest_t *)request)->sunum = NULL;
02921 }
02922
02923 if (((DRMS_MtSumsRequest_t *)request)->sudir)
02924 {
02925
02926 free(((DRMS_MtSumsRequest_t *)request)->sudir);
02927 ((DRMS_MtSumsRequest_t *)request)->sudir = NULL;
02928 }
02929 }
02930 #endif
02931
02932 free(request);
02933 }
02934
02935 if (connected && sum)
02936 {
02937
02938 sumscallret = MakeSumsCall(env, DRMS_SUMCLOSE, &sum, printkerr);
02939 if (sumscallret == kBrokenPipe)
02940 {
02941 fprintf(stderr, "Unable to call SUM_close(); broken pipe; not retrying.\n");
02942 }
02943 }
02944
02945 if (gSUMSbusyMtx)
02946 {
02947 pthread_mutex_destroy(gSUMSbusyMtx);
02948 }
02949
02950 return NULL;
02951 }
02952
02953 int SUMExptErr(const char *fmt, ...)
02954 {
02955 char string[4096];
02956
02957 va_list ap;
02958 va_start(ap, fmt);
02959 vsnprintf(string, sizeof(string), fmt, ap);
02960 va_end (ap);
02961 return fprintf(stderr, "%s", string);
02962 }
02963
02964 static int IsSgPending(SUMID_t id, uint64_t *sunums, int nsunums)
02965 {
02966 int ans;
02967 int isunum;
02968 char idstr[32];
02969 char sunumstr[32];
02970 HContainer_t *idH = NULL;
02971
02972 ans = 0;
02973
02974 if (gSgPending)
02975 {
02976 snprintf(idstr, sizeof(idstr), "%u", id);
02977 if ((idH = hcon_lookup(gSgPending, idstr)) != NULL)
02978 {
02979 for (isunum = 0; idH && isunum < nsunums; isunum++)
02980 {
02981 snprintf(sunumstr, sizeof(sunumstr), "%llu", (unsigned long long)sunums[isunum]);
02982 if (hcon_member(idH, sunumstr))
02983 {
02984 fprintf(stderr, "A SUM_get() request for the SU identified by SUNUM %s is pending.\n", sunumstr);
02985 ans = 1;
02986 break;
02987 }
02988 }
02989 }
02990 }
02991
02992 return ans;
02993 }
02994
02995 static int SetSgPending(SUMID_t id, uint64_t *sunums, int nsunums)
02996 {
02997 int err;
02998 int isunum;
02999 char idstr[32];
03000 char sunumstr[32];
03001 HContainer_t *idH = NULL;
03002
03003 err = 0;
03004
03005
03006 snprintf(idstr, sizeof(idstr), "%u", id);
03007 if (gSgPending == NULL)
03008 {
03009 err = ((gSgPending = hcon_create(sizeof(HContainer_t), sizeof(idstr), NULL, NULL, NULL, NULL, 0)) == NULL);
03010 }
03011 else
03012 {
03013 idH = hcon_lookup(gSgPending, idstr);
03014 }
03015
03016 if (!err)
03017 {
03018 if (!idH)
03019 {
03020
03021 err = ((idH = hcon_create(sizeof(char), sizeof(sunumstr), NULL, NULL, NULL, NULL, 0)) == NULL);
03022
03023 if (!err)
03024 {
03025
03026 err = hcon_insert(gSgPending, idstr, gSgPending);
03027 }
03028 }
03029 }
03030
03031 if (!err)
03032 {
03033 char val = 't';
03034
03035 for (isunum = 0; isunum < nsunums; isunum++)
03036 {
03037 snprintf(sunumstr, sizeof(sunumstr), "%llu", (unsigned long long)sunums[isunum]);
03038
03039
03040 if (hcon_member(idH, sunumstr))
03041 {
03042 fprintf(stderr, "Cannot flag this SU (sunum %s) as pending - it is already pending.\n", sunumstr);
03043 err = 1;
03044 break;
03045 }
03046
03047 err = hcon_insert(idH, sunumstr, &val);
03048 }
03049 }
03050
03051 return err;
03052 }
03053
03054 static int UnsetSgPending(SUMID_t id)
03055 {
03056 int err;
03057 char idstr[32];
03058
03059 err = 0;
03060
03061 if (gSgPending)
03062 {
03063
03064 snprintf(idstr, sizeof(idstr), "%u", id);
03065 hcon_remove(gSgPending, idstr);
03066
03067 if (hcon_member(gSgPending, idstr))
03068 {
03069 err = 1;
03070 }
03071
03072 if (!err)
03073 {
03074 if (hcon_size(gSgPending) == 0)
03075 {
03076 hcon_destroy(&gSgPending);
03077
03078 if (gSgPending)
03079 {
03080 err = 1;
03081 }
03082 }
03083 }
03084 }
03085
03086 return err;
03087 }
03088
03089 static DRMS_SumRequest_t *drms_process_sums_request(DRMS_Env_t *env,
03090 SUM_t **suminout,
03091 DRMS_SumRequest_t *request,
03092 int opcode,
03093 int mtRequest)
03094 {
03095 int i;
03096 DRMS_SumRequest_t *reply = NULL;
03097 int shuttingdown = 0;
03098 sem_t *sdsem = drms_server_getsdsem();
03099 SUM_t *sum = NULL;
03100 int tryagain;
03101 int nosums;
03102
03103 int sumscrashed;
03104 int sleepiness;
03105 int sumnoop;
03106 int sumscallret;
03107 int maxNoSus;
03108
03109 if (suminout && *suminout)
03110 {
03111 sum = *suminout;
03112 }
03113
03114 if (!sum)
03115 {
03116 fprintf(stderr , "Error in drms_process_sums_request(): No SUMS connection.\n");
03117 return NULL;
03118 }
03119
03120 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS
03121 if (mtRequest)
03122 {
03123 reply = calloc(1, sizeof(DRMS_MtSumsRequest_t));
03124 maxNoSus = MAX_MTSUMS_NSUS;
03125 }
03126 else
03127 {
03128 #endif
03129 reply = malloc(sizeof(DRMS_SumRequest_t));
03130 maxNoSus = MAXSUMREQCNT;
03131 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS
03132 }
03133 #endif
03134
03135 XASSERT(reply);
03136
03137 switch(opcode)
03138 {
03139 case DRMS_SUMALLOC:
03140 {
03141 int replyOp = 0;
03142 #ifdef DEBUG
03143 printf("Processing SUMALLOC request.\n");
03144 #endif
03145
03146 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_ALLOC) && SUMS_USEMTSUMS_ALLOC
03147 if (((DRMS_MtSumsRequest_t *)request)->reqcnt != 1)
03148 {
03149 fprintf(stderr, "SUM thread: Invalid reqcnt (%d) in SUMALLOC request.\n", ((DRMS_MtSumsRequest_t *)request)->reqcnt);
03150 ((DRMS_MtSumsRequest_t *)reply)->opcode = DRMS_ERROR_SUMALLOC;
03151 break;
03152 }
03153 #else
03154 if (request->reqcnt != 1)
03155 {
03156 fprintf(stderr, "SUM thread: Invalid reqcnt (%d) in SUMALLOC request.\n", request->reqcnt);
03157 reply->opcode = DRMS_ERROR_SUMALLOC;
03158 break;
03159 }
03160 #endif
03161
03162 nosums = 0;
03163 tryagain = 1;
03164 sleepiness = 1;
03165
03166 while (tryagain)
03167 {
03168 tryagain = 0;
03169 sumscrashed = 0;
03170
03171 if (!sum)
03172 {
03173
03174 sumscallret = MakeSumsCall(env, DRMS_SUMOPEN, &sum, printkerr, NULL, NULL);
03175 if (sumscallret == kBrokenPipe)
03176 {
03177
03178 sum = NULL;
03179 }
03180 else if (sumscallret == kSUMSDead)
03181 {
03182 sum = NULL;
03183 nosums = 1;
03184 replyOp = sumscallret;
03185 break;
03186 }
03187
03188 if (!sum)
03189 {
03190 fprintf(stderr, "Failed to connect to SUMS; trying again in %d seconds.\n", sleepiness);
03191 tryagain = 1;
03192 sleep(sleepiness);
03193 GettingSleepier(&sleepiness);
03194 continue;
03195 }
03196 else
03197 {
03198 *suminout = sum;
03199 }
03200 }
03201
03202 sum->reqcnt = 1;
03203
03204 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_ALLOC) && SUMS_USEMTSUMS_ALLOC
03205 sum->bytes = ((DRMS_MtSumsRequest_t *)request)->bytes;
03206 sum->group = ((DRMS_MtSumsRequest_t *)request)->group;
03207 #else
03208 sum->bytes = request->bytes;
03209 sum->group = request->group;
03210 #endif
03211
03212
03213
03214 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_ALLOC) && SUMS_USEMTSUMS_ALLOC
03215 if (((DRMS_MtSumsRequest_t *)request)->group < 0)
03216 {
03217
03218 sum->storeset = 0;
03219 }
03220 else
03221 {
03222 sum->storeset = ((DRMS_MtSumsRequest_t *)request)->group / kExtTapegroupSlot;
03223 }
03224 #else
03225 if (request->group < 0)
03226 {
03227
03228 sum->storeset = 0;
03229 }
03230 else
03231 {
03232 sum->storeset = request->group / kExtTapegroupSlot;
03233 }
03234 #endif
03235
03236 if (sum->storeset > kExtTapegroupMaxStoreset)
03237 {
03238 fprintf(stderr, "SUM thread: storeset '%d' out of range.\n", sum->storeset);
03239 replyOp = DRMS_ERROR_SUMALLOC;
03240 nosums = 1;
03241 break;
03242 }
03243
03244
03245
03246
03247
03248 replyOp = MakeSumsCall(env, DRMS_SUMALLOC, &sum, printkerr);
03249
03250 if (replyOp != 0)
03251 {
03252 sumscrashed = (replyOp == 4 || replyOp == kBrokenPipe);
03253 if (sumscrashed)
03254 {
03255
03256 sum = NULL;
03257 fprintf(stderr, "SUMS no longer there; trying again in %d seconds.\n", sleepiness);
03258 tryagain = 1;
03259 sleep(sleepiness);
03260 GettingSleepier(&sleepiness);
03261 }
03262 else
03263 {
03264 fprintf(stderr,"SUM thread: SUM_alloc RPC call failed with error code %d\n", replyOp);
03265 nosums = 1;
03266 break;
03267 }
03268 }
03269 }
03270
03271 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_ALLOC) && SUMS_USEMTSUMS_ALLOC
03272 ((DRMS_MtSumsRequest_t *)reply)->opcode = replyOp;
03273
03274 if (!nosums)
03275 {
03276 ((DRMS_MtSumsRequest_t *)reply)->sunum = calloc(1, sizeof(uint64_t));
03277 ((DRMS_MtSumsRequest_t *)reply)->sudir = calloc(1, sizeof(char *));
03278
03279 ((DRMS_MtSumsRequest_t *)reply)->sunum[0] = sum->dsix_ptr[0];
03280 ((DRMS_MtSumsRequest_t *)reply)->sudir[0] = strdup(sum->wd[0]);
03281 free(sum->wd[0]);
03282 #ifdef DEBUG
03283 printf("SUM_alloc returned sunum=%llu, sudir=%s.\n", ((DRMS_MtSumsRequest_t *)reply)->sunum[0], ((DRMS_MtSumsRequest_t *)reply)->sudir[0]);
03284 #endif
03285 }
03286 #else
03287 reply->opcode = replyOp;
03288
03289 if (!nosums)
03290 {
03291 reply->sunum[0] = sum->dsix_ptr[0];
03292 reply->sudir[0] = strdup(sum->wd[0]);
03293 free(sum->wd[0]);
03294 #ifdef DEBUG
03295 printf("SUM_alloc returned sunum=%llu, sudir=%s.\n", reply->sunum[0], reply->sudir[0]);
03296 #endif
03297 }
03298 #endif
03299
03300 }
03301 break;
03302
03303 case DRMS_SUMGET:
03304 {
03305 int replyOp = 0;
03306 #ifdef DEBUG
03307 printf("Processing SUMGET request.\n");
03308 #endif
03309
03310 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_GET) && SUMS_USEMTSUMS_GET
03311 if (((DRMS_MtSumsRequest_t *)request)->reqcnt < 1 || ((DRMS_MtSumsRequest_t *)request)->reqcnt > maxNoSus)
03312 {
03313 fprintf(stderr,"SUM thread: Invalid reqcnt (%d) in SUMGET request.\n", ((DRMS_MtSumsRequest_t *)request)->reqcnt);
03314 ((DRMS_MtSumsRequest_t *)reply)->opcode = -5;
03315 break;
03316 }
03317
03318
03319 if (IsSgPending(sum->uid, ((DRMS_MtSumsRequest_t *)request)->sunum, ((DRMS_MtSumsRequest_t *)request)->reqcnt))
03320 {
03321 fprintf(stderr, "Unable to process a SUM_get() request. One or more requested storage units being requested are pending.n");
03322 ((DRMS_MtSumsRequest_t *)reply)->opcode = -2;
03323 break;
03324 }
03325 #else
03326 if (request->reqcnt < 1 || request->reqcnt > maxNoSus)
03327 {
03328 fprintf(stderr,"SUM thread: Invalid reqcnt (%d) in SUMGET request.\n", request->reqcnt);
03329 reply->opcode = -5;
03330 break;
03331 }
03332
03333
03334 if (IsSgPending(sum->uid, request->sunum, request->reqcnt))
03335 {
03336 fprintf(stderr, "Unable to process a SUM_get() request. One or more requested storage units being requested are pending.n");
03337 reply->opcode = -2;
03338 break;
03339 }
03340 #endif
03341
03342
03343 #ifdef DEBUG
03344 printf("SUM thread: calling SUM_get\n");
03345 #endif
03346
03347 nosums = 0;
03348 tryagain = 1;
03349 sleepiness = 1;
03350
03351 while (tryagain)
03352 {
03353 tryagain = 0;
03354 sumscrashed = 0;
03355
03356 if (!sum)
03357 {
03358
03359 sumscallret = MakeSumsCall(env, DRMS_SUMOPEN, &sum, printkerr, NULL, NULL);
03360 if (sumscallret == kBrokenPipe)
03361 {
03362
03363 sum = NULL;
03364 }
03365 else if (sumscallret == kSUMSDead)
03366 {
03367 sum = NULL;
03368 nosums = 1;
03369 replyOp = sumscallret;
03370 break;
03371 }
03372
03373 if (!sum)
03374 {
03375 fprintf(stderr, "Failed to connect to SUMS; trying again in %d seconds.\n", sleepiness);
03376 tryagain = 1;
03377 sleep(sleepiness);
03378 GettingSleepier(&sleepiness);
03379 continue;
03380 }
03381 else
03382 {
03383 *suminout = sum;
03384 }
03385 }
03386
03387 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_GET) && SUMS_USEMTSUMS_GET
03388 sum->reqcnt = ((DRMS_MtSumsRequest_t *)request)->reqcnt;
03389 sum->mode = ((DRMS_MtSumsRequest_t *)request)->mode;
03390 sum->tdays = ((DRMS_MtSumsRequest_t *)request)->tdays;
03391
03392
03393 if (sum->dsix_ptr)
03394 {
03395 free(sum->dsix_ptr);
03396 sum->dsix_ptr = NULL;
03397 }
03398
03399 sum->dsix_ptr = (uint64_t *)calloc(((DRMS_MtSumsRequest_t *)request)->reqcnt, sizeof(uint64_t));
03400
03401 for (i = 0; i < ((DRMS_MtSumsRequest_t *)request)->reqcnt; i++)
03402 {
03403 sum->dsix_ptr[i] = ((DRMS_MtSumsRequest_t *)request)->sunum[i];
03404 }
03405 #else
03406 sum->reqcnt = request->reqcnt;
03407 sum->mode = request->mode;
03408 sum->tdays = request->tdays;
03409
03410
03411 if (sum->dsix_ptr)
03412 {
03413 free(sum->dsix_ptr);
03414 sum->dsix_ptr = NULL;
03415 }
03416
03417 sum->dsix_ptr = (uint64_t *)calloc(SUMARRAYSZ, sizeof(uint64_t));
03418
03419 for (i=0; i<request->reqcnt; i++)
03420 {
03421 sum->dsix_ptr[i] = request->sunum[i];
03422 }
03423 #endif
03424
03425
03426 replyOp = MakeSumsCall(env, DRMS_SUMGET, &sum, printkerr);
03427
03428 #ifdef DEBUG
03429 printf("SUM thread: SUM_get returned %d\n", replyOp);
03430 #endif
03431
03432 if (replyOp == RESULT_PEND)
03433 {
03434
03435
03436
03437 int pollrv = 0;
03438 int naptime = 1;
03439 int nloop = 10;
03440 int maxloop = 21600;
03441
03442
03443 if (SetSgPending(sum->uid, sum->dsix_ptr, sum->reqcnt))
03444 {
03445 replyOp = -3;
03446 nosums = 1;
03447 break;
03448 }
03449
03450
03451 while (1)
03452 {
03453 if (maxloop <= 0)
03454 {
03455
03456 fprintf(stderr, "Tape read has not completed; try again later.\n");
03457 nosums = 1;
03458
03459 drms_lock_server(env);
03460
03461
03462 env->sumssafe = 0;
03463 drms_unlock_server(env);
03464 break;
03465 }
03466
03467 if (sdsem)
03468 {
03469 sem_wait(sdsem);
03470 shuttingdown = (drms_server_getsd() != kSHUTDOWN_UNINITIATED);
03471 sem_post(sdsem);
03472 }
03473
03474 if (shuttingdown)
03475 {
03476 replyOp = 0;
03477 break;
03478 }
03479
03480 if (gSUMSbusyMtx)
03481 {
03482 pthread_mutex_lock(gSUMSbusyMtx);
03483 gSUMSbusy = 1;
03484 pthread_mutex_unlock(gSUMSbusyMtx);
03485 }
03486
03487
03488
03489 if (nloop <= 0)
03490 {
03491 pollrv = SUM_poll(sum);
03492 nloop = 10;
03493 }
03494 else
03495 {
03496 if (gSUMSbusyMtx)
03497 {
03498 pthread_mutex_lock(gSUMSbusyMtx);
03499 gSUMSbusy = 0;
03500 pthread_mutex_unlock(gSUMSbusyMtx);
03501 }
03502
03503 nloop--;
03504 maxloop--;
03505 sleep(1);
03506 continue;
03507 }
03508
03509 if (env->verbose)
03510 {
03511 fprintf(stdout, "SUM_poll() returned %d.\n", pollrv);
03512 }
03513
03514 if (gSUMSbusyMtx)
03515 {
03516 pthread_mutex_lock(gSUMSbusyMtx);
03517 gSUMSbusy = 0;
03518 pthread_mutex_unlock(gSUMSbusyMtx);
03519 }
03520
03521 if (pollrv == 0)
03522 {
03523
03524
03525 if (UnsetSgPending(sum->uid))
03526 {
03527
03528
03529 replyOp = -4;
03530 }
03531 break;
03532 }
03533 else
03534 {
03535
03536
03537
03538
03539
03540
03541
03542
03543
03544
03545
03546
03547 sumnoop = SUM_nop(sum, printkerr);
03548
03549 if (env->verbose)
03550 {
03551 fprintf(stdout, "SUM_nop() returned %d.\n", sumnoop);
03552 }
03553
03554 sumscrashed = (sumnoop == 4);
03555 if (sumnoop >= 4)
03556 {
03557
03558 if (sumscrashed)
03559 {
03560
03561 sum = NULL;
03562 if (UnsetSgPending((sum->uid)))
03563 {
03564
03565
03566 replyOp = -4;
03567 break;
03568 }
03569
03570 fprintf(stderr, "sum_svc no longer there; trying SUM_open() then SUM_get() again in %d seconds.\n", sleepiness);
03571 }
03572 else
03573 {
03574 fprintf(stderr, "Either tape_svc or driveX_svc died; trying SUM_get() again in %d seconds.\n", sleepiness);
03575 }
03576
03577
03578 tryagain = 1;
03579 sleep(sleepiness);
03580 GettingSleepier(&sleepiness);
03581 break;
03582 }
03583 else
03584 {
03585
03586 if (env->verbose)
03587 {
03588 fprintf(stdout, "Tape fetch has not completed, waiting for %d seconds.\n", naptime);
03589 }
03590 sleep(naptime);
03591
03592
03593 }
03594 }
03595 }
03596
03597 if (!shuttingdown && !tryagain)
03598 {
03599 replyOp = pollrv;
03600
03601 if (replyOp || sum->status)
03602 {
03603 fprintf(stderr,"SUM thread: Last SUM_poll call returned error code = %d, sum->status = %d.\n", replyOp, sum->status);
03604 nosums = 1;
03605 break;
03606 }
03607 }
03608 }
03609 else if (replyOp == 1)
03610 {
03611
03612
03613
03614
03615 fprintf(stderr, "SUMS thread: SUM_get() failure. Terminating DRMS module.\n");
03616 nosums = 1;
03617 break;
03618 }
03619 else if (replyOp != 0)
03620 {
03621 sumnoop = SUM_nop(sum, printkerr);
03622 sumscrashed = (sumnoop == 4 || replyOp == kBrokenPipe);
03623 if (sumnoop >= 4 || replyOp == kBrokenPipe)
03624 {
03625 if (sumscrashed)
03626 {
03627 sum = NULL;
03628 fprintf(stderr, "sum_svc no longer there; trying SUM_open() then SUM_get() again in %d seconds.\n", sleepiness);
03629 }
03630 else
03631 {
03632 fprintf(stderr, "Either tape_svc or driveX_svc died; trying SUM_get() again in %d seconds.\n", sleepiness);
03633 }
03634
03635 tryagain = 1;
03636 sleep(sleepiness);
03637 GettingSleepier(&sleepiness);
03638 }
03639 else
03640 {
03641 fprintf(stderr, "SUM thread: SUM_get RPC call failed with error code %d\n", replyOp);
03642 nosums = 1;
03643 break;
03644 }
03645 }
03646 }
03647
03648 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_GET) && SUMS_USEMTSUMS_GET
03649 ((DRMS_MtSumsRequest_t *)reply)->opcode = replyOp;
03650
03651 if (!nosums)
03652 {
03653 ((DRMS_MtSumsRequest_t *)reply)->sudir = calloc(((DRMS_MtSumsRequest_t *)request)->reqcnt, sizeof(char *));
03654
03655 for (i = 0; i < ((DRMS_MtSumsRequest_t *)request)->reqcnt; i++)
03656 {
03657 if (!shuttingdown)
03658 {
03659 ((DRMS_MtSumsRequest_t *)reply)->sudir[i] = strdup(sum->wd[i]);
03660 free(sum->wd[i]);
03661 #ifdef DEBUG
03662 printf("SUM thread: got sudir[%d] = %s = %s\n", i, ((DRMS_MtSumsRequest_t *)reply)->sudir[i], sum->wd[i]);
03663 #endif
03664 }
03665 else
03666 {
03667 ((DRMS_MtSumsRequest_t *)reply)->sudir[i] = strdup("NA (shuttingdown)");
03668 }
03669 }
03670
03671 reply->reqcnt = request->reqcnt;
03672 }
03673 else
03674 {
03675 reply->reqcnt = 0;
03676 }
03677 #else
03678 reply->opcode = replyOp;
03679
03680 if (!nosums)
03681 {
03682 for (i = 0; i < request->reqcnt; i++)
03683 {
03684 if (!shuttingdown)
03685 {
03686 reply->sudir[i] = strdup(sum->wd[i]);
03687 free(sum->wd[i]);
03688 #ifdef DEBUG
03689 printf("SUM thread: got sudir[%d] = %s = %s\n", i, reply->sudir[i], sum->wd[i]);
03690 #endif
03691 }
03692 else
03693 {
03694 reply->sudir[i] = strdup("NA (shuttingdown)");
03695 }
03696 }
03697
03698 reply->reqcnt = request->reqcnt;
03699 }
03700 else
03701 {
03702 reply->reqcnt = 0;
03703 }
03704 #endif
03705
03706
03707 }
03708 break;
03709
03710 case DRMS_SUMPUT:
03711 {
03712 int replyOp = 0;
03713 #ifdef DEBUG
03714 printf("Processing SUMPUT request.\n");
03715 #endif
03716
03717 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_PUT) && SUMS_USEMTSUMS_PUT
03718 if (((DRMS_MtSumsRequest_t *)request)->reqcnt < 1 || ((DRMS_MtSumsRequest_t *)request)->reqcnt > maxNoSus)
03719 {
03720 fprintf(stderr,"SUM thread: Invalid reqcnt (%d) in SUMPUT request.\n", ((DRMS_MtSumsRequest_t *)request)->reqcnt);
03721 ((DRMS_MtSumsRequest_t *)reply)->opcode = DRMS_ERROR_SUMPUT;
03722 break;
03723 }
03724 #else
03725 if (request->reqcnt < 1 || request->reqcnt > maxNoSus)
03726 {
03727 fprintf(stderr,"SUM thread: Invalid reqcnt (%d) in SUMPUT request.\n", request->reqcnt);
03728 reply->opcode = DRMS_ERROR_SUMPUT;
03729 break;
03730 }
03731 #endif
03732
03733 #ifdef DEBUG2
03734 #define LOGDIR "/tmp22/production/"
03735 #define LOGFILE "sumputlog.txt"
03736 struct stat stBuf;
03737 FILE *fptr = NULL;
03738
03739 if (stat(LOGDIR, &stBuf) == 0 && S_ISDIR(stBuf.st_mode))
03740 {
03741 fptr = fopen(LOGDIR"/"LOGFILE, "a");
03742 if (fptr)
03743 {
03744 char tbuf[128];
03745 struct tm *ltime = NULL;
03746 time_t secs = time(NULL);
03747
03748 ltime = localtime(&secs);
03749 strftime(tbuf, sizeof(tbuf), "%Y.%m.%d_%T", ltime);
03750
03751 fprintf(fptr, "SUM_put() call for SUNUMs for series %s at %s:\n", request->dsname, tbuf);
03752 }
03753 }
03754 #endif
03755
03756 tryagain = 1;
03757 sleepiness = 1;
03758
03759 while (tryagain)
03760 {
03761 tryagain = 0;
03762 sumscrashed = 0;
03763
03764 if (!sum)
03765 {
03766
03767 sumscallret = MakeSumsCall(env, DRMS_SUMOPEN, &sum, printkerr, NULL, NULL);
03768 if (sumscallret == kBrokenPipe)
03769 {
03770
03771 sum = NULL;
03772 }
03773 else if (sumscallret == kSUMSDead)
03774 {
03775 sum = NULL;
03776 nosums = 1;
03777 replyOp = sumscallret;
03778 break;
03779 }
03780
03781 if (!sum)
03782 {
03783 fprintf(stderr, "Failed to connect to SUMS; trying again in %d seconds.\n", sleepiness);
03784 tryagain = 1;
03785 sleep(sleepiness);
03786 GettingSleepier(&sleepiness);
03787 continue;
03788 }
03789 else
03790 {
03791 *suminout = sum;
03792 }
03793 }
03794
03795
03796 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_PUT) && SUMS_USEMTSUMS_PUT
03797 sum->dsname = ((DRMS_MtSumsRequest_t *)request)->dsname;
03798 sum->group = ((DRMS_MtSumsRequest_t *)request)->group;
03799 sum->mode = ((DRMS_MtSumsRequest_t *)request)->mode;
03800 sum->tdays = ((DRMS_MtSumsRequest_t *)request)->tdays;
03801 sum->reqcnt = ((DRMS_MtSumsRequest_t *)request)->reqcnt;
03802 sum->history_comment = ((DRMS_MtSumsRequest_t *)request)->comment;
03803
03804
03805 if (sum->dsix_ptr)
03806 {
03807 free(sum->dsix_ptr);
03808 sum->dsix_ptr = NULL;
03809 }
03810
03811 sum->dsix_ptr = (uint64_t *)calloc(((DRMS_MtSumsRequest_t *)request)->reqcnt, sizeof(uint64_t));
03812
03813 if (sum->wd)
03814 {
03815 free(sum->wd);
03816 sum->wd = NULL;
03817 }
03818
03819 sum->wd = (char **)calloc(((DRMS_MtSumsRequest_t *)request)->reqcnt, sizeof(char *));
03820
03821 for (i = 0; i < ((DRMS_MtSumsRequest_t *)request)->reqcnt; i++)
03822 {
03823 sum->dsix_ptr[i] = ((DRMS_MtSumsRequest_t *)request)->sunum[i];
03824 sum->wd[i] = ((DRMS_MtSumsRequest_t *)request)->sudir[i];
03825 #ifdef DEBUG
03826 printf("putting SU with sunum=%lld and sudir='%s' to SUMS.\n", ((DRMS_MtSumsRequest_t *)request)->sunum[i], ((DRMS_MtSumsRequest_t *)request)->sudir[i]);
03827 #endif
03828
03829 #ifdef DEBUG2
03830 if (fptr)
03831 {
03832 fprintf(fptr, " %lld\n", (long long)((DRMS_MtSumsRequest_t *)request)->sunum[i]);
03833 }
03834 #endif
03835 }
03836 #else
03837 sum->dsname = request->dsname;
03838 sum->group = request->group;
03839 sum->mode = request->mode;
03840 sum->tdays = request->tdays;
03841 sum->reqcnt = request->reqcnt;
03842 sum->history_comment = request->comment;
03843
03844
03845 if (sum->dsix_ptr)
03846 {
03847 free(sum->dsix_ptr);
03848 sum->dsix_ptr = NULL;
03849 }
03850
03851 sum->dsix_ptr = (uint64_t *)calloc(SUMARRAYSZ, sizeof(uint64_t));
03852
03853 if (sum->wd)
03854 {
03855 free(sum->wd);
03856 sum->wd = NULL;
03857 }
03858
03859 sum->wd = (char **)calloc(SUMARRAYSZ, sizeof(char *));
03860
03861 for (i=0; i<request->reqcnt; i++)
03862 {
03863 sum->dsix_ptr[i] = request->sunum[i];
03864 sum->wd[i] = request->sudir[i];
03865 #ifdef DEBUG
03866 printf("putting SU with sunum=%lld and sudir='%s' to SUMS.\n",
03867 request->sunum[i],request->sudir[i]);
03868 #endif
03869
03870 #ifdef DEBUG2
03871 if (fptr)
03872 {
03873 fprintf(fptr, " %lld\n", (long long)request->sunum[i]);
03874 }
03875 #endif
03876 }
03877 #endif
03878
03879 replyOp = MakeSumsCall(env, DRMS_SUMPUT, &sum, printkerr);
03880
03881 if (replyOp != 0)
03882 {
03883 sumscrashed = (replyOp == 4 || replyOp == kBrokenPipe);
03884 if (sumscrashed)
03885 {
03886 sum = NULL;
03887 fprintf(stderr, "SUMS no longer there; trying again in %d seconds.\n", sleepiness);
03888 tryagain = 1;
03889 sleep(sleepiness);
03890 GettingSleepier(&sleepiness);
03891 }
03892 else
03893 {
03894 fprintf(stderr, "SUM thread: SUM_put call failed with stat=%d.\n", replyOp);
03895 break;
03896 }
03897 }
03898 }
03899
03900 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_PUT) && SUMS_USEMTSUMS_PUT
03901 ((DRMS_MtSumsRequest_t *)reply)->opcode = replyOp;
03902 #else
03903 reply->opcode = replyOp;
03904 #endif
03905
03906 #ifdef DEBUG2
03907 if (fptr)
03908 {
03909 fclose(fptr);
03910 fptr = NULL;
03911 }
03912 #endif
03913 }
03914 break;
03915 case DRMS_SUMDELETESERIES:
03916 {
03917 const char *intComment = NULL;
03918 int replyOp = 0;
03919
03920 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_DELETESUS) && SUMS_USEMTSUMS_DELETESUS
03921 intComment = ((DRMS_MtSumsRequest_t *)request)->comment;
03922 #else
03923 intComment = request->comment;
03924 #endif
03925
03926 if (intComment)
03927 {
03928
03929 char *comment = strdup(intComment);
03930 char *fpath = NULL;
03931 char *series = NULL;
03932 char *sep = NULL;
03933
03934 if (comment)
03935 {
03936 if ((sep = strchr(comment, ',')) != NULL)
03937 {
03938 *sep = '\0';
03939 fpath = comment;
03940 series = sep + 1;
03941
03942 nosums = 0;
03943 tryagain = 1;
03944 sleepiness = 1;
03945
03946 while (tryagain)
03947 {
03948 tryagain = 0;
03949 sumscrashed = 0;
03950
03951 if (!sum)
03952 {
03953
03954 sumscallret = MakeSumsCall(env, DRMS_SUMOPEN, &sum, printkerr, NULL, NULL);
03955 if (sumscallret == kBrokenPipe)
03956 {
03957
03958 sum = NULL;
03959 }
03960 else if (sumscallret == kSUMSDead)
03961 {
03962 sum = NULL;
03963 nosums = 1;
03964 replyOp = sumscallret;
03965 break;
03966 }
03967
03968 if (!sum)
03969 {
03970 fprintf(stderr, "Failed to connect to SUMS; trying again in %d seconds.\n", sleepiness);
03971 tryagain = 1;
03972 sleep(sleepiness);
03973 GettingSleepier(&sleepiness);
03974 continue;
03975 }
03976 else
03977 {
03978 *suminout = sum;
03979 }
03980 }
03981
03982 replyOp = MakeSumsCall(env, DRMS_SUMDELETESERIES, &sum, printkerr, fpath, series);
03983
03984 if (replyOp != 0)
03985 {
03986 sumscrashed = (replyOp == 4 || replyOp == kBrokenPipe);
03987 if (sumscrashed)
03988 {
03989 sum = NULL;
03990 fprintf(stderr, "SUMS no longer there; trying again in %d seconds.\n", sleepiness);
03991 tryagain = 1;
03992 sleep(sleepiness);
03993 GettingSleepier(&sleepiness);
03994 }
03995 else
03996 {
03997 fprintf(stderr, "SUM thread: SUM_delete_series call failed with stat=%d.\n", replyOp);
03998 nosums = 1;
03999 break;
04000 }
04001 }
04002 }
04003
04004 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_DELETESUS) && SUMS_USEMTSUMS_DELETESUS
04005 ((DRMS_MtSumsRequest_t *)reply)->opcode = replyOp;
04006 #else
04007 reply->opcode = replyOp;
04008 #endif
04009 }
04010
04011 free(comment);
04012 }
04013 }
04014 }
04015 break;
04016 case DRMS_SUMALLOC2:
04017 {
04018
04019
04020
04021 int replyOp = 0;
04022
04023 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_ALLOC) && SUMS_USEMTSUMS_ALLOC
04024 if (((DRMS_MtSumsRequest_t *)request)->reqcnt!=1)
04025 {
04026 fprintf(stderr,"SUM thread: Invalid reqcnt (%d) in SUMALLOC request.\n", ((DRMS_MtSumsRequest_t *)request)->reqcnt);
04027 ((DRMS_MtSumsRequest_t *)reply)->opcode = DRMS_ERROR_SUMALLOC;
04028 break;
04029 }
04030 #else
04031 if (request->reqcnt!=1)
04032 {
04033 fprintf(stderr,"SUM thread: Invalid reqcnt (%d) in SUMALLOC request.\n", request->reqcnt);
04034 reply->opcode = DRMS_ERROR_SUMALLOC;
04035 break;
04036 }
04037 #endif
04038
04039 nosums = 0;
04040 tryagain = 1;
04041 sleepiness = 1;
04042
04043 while (tryagain)
04044 {
04045 tryagain = 0;
04046 sumscrashed = 0;
04047
04048 if (!sum)
04049 {
04050
04051 sumscallret = MakeSumsCall(env, DRMS_SUMOPEN, &sum, printkerr, NULL, NULL);
04052 if (sumscallret == kBrokenPipe)
04053 {
04054
04055 sum = NULL;
04056 }
04057 else if (sumscallret == kSUMSDead)
04058 {
04059 sum = NULL;
04060 nosums = 1;
04061 replyOp = sumscallret;
04062 break;
04063 }
04064
04065 if (!sum)
04066 {
04067 fprintf(stderr, "Failed to connect to SUMS; trying again in %d seconds.\n", sleepiness);
04068 tryagain = 1;
04069 sleep(sleepiness);
04070 GettingSleepier(&sleepiness);
04071 continue;
04072 }
04073 else
04074 {
04075 *suminout = sum;
04076 }
04077 }
04078
04079 sum->reqcnt = 1;
04080
04081 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_ALLOC) && SUMS_USEMTSUMS_ALLOC
04082 sum->bytes = ((DRMS_MtSumsRequest_t *)request)->bytes;
04083 sum->group = ((DRMS_MtSumsRequest_t *)request)->group;
04084
04085
04086
04087
04088
04089
04090
04091 if (((DRMS_MtSumsRequest_t *)request)->group < 0)
04092 {
04093
04094 sum->storeset = 0;
04095 }
04096 else
04097 {
04098 sum->storeset = ((DRMS_MtSumsRequest_t *)request)->group / kExtTapegroupSlot;
04099 }
04100 #else
04101 sum->bytes = request->bytes;
04102 sum->group = request->group;
04103
04104
04105
04106
04107
04108
04109
04110 if (request->group < 0)
04111 {
04112
04113 sum->storeset = 0;
04114 }
04115 else
04116 {
04117 sum->storeset = request->group / kExtTapegroupSlot;
04118 }
04119 #endif
04120
04121 if (sum->storeset > kExtTapegroupMaxStoreset)
04122 {
04123 fprintf(stderr, "SUM thread: storeset '%d' out of range.\n", sum->storeset);
04124 replyOp = DRMS_ERROR_SUMALLOC;
04125 nosums = 1;
04126 break;
04127 }
04128
04129
04130 replyOp = MakeSumsCall(env, DRMS_SUMALLOC2, &sum, printkerr, request->sunum[0]);
04131
04132 if (replyOp != 0)
04133 {
04134 sumscrashed = (replyOp == 4 || replyOp == kBrokenPipe);
04135 if (sumscrashed)
04136 {
04137 sum = NULL;
04138 fprintf(stderr, "SUMS no longer there; trying again in %d seconds.\n", sleepiness);
04139 tryagain = 1;
04140 sleep(sleepiness);
04141 GettingSleepier(&sleepiness);
04142 }
04143 else
04144 {
04145 fprintf(stderr, "SUM thread: SUM_alloc2 RPC call failed with error code %d\n", replyOp);
04146 nosums = 1;
04147 break;
04148 }
04149 }
04150 }
04151
04152 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_ALLOC) && SUMS_USEMTSUMS_ALLOC
04153 ((DRMS_MtSumsRequest_t *)reply)->opcode = replyOp;
04154
04155 if (!nosums)
04156 {
04157 ((DRMS_MtSumsRequest_t *)reply)->sudir = calloc(1, sizeof(char *));
04158 ((DRMS_MtSumsRequest_t *)reply)->sudir[0] = strdup(sum->wd[0]);
04159 free(sum->wd[0]);
04160 }
04161 #else
04162 reply->opcode = replyOp;
04163
04164 if (!nosums)
04165 {
04166 reply->sudir[0] = strdup(sum->wd[0]);
04167 free(sum->wd[0]);
04168 }
04169 #endif
04170 }
04171 break;
04172 case DRMS_SUMEXPORT:
04173 {
04174
04175
04176 SUMEXP_t *sumexpt = (SUMEXP_t *)request->comment;
04177
04178
04179 sumexpt->uid = sum->uid;
04180
04181
04182 if ((reply->opcode = MakeSumsCall(env, DRMS_SUMEXPORT, &sum, SUMExptErr, sumexpt)))
04183 {
04184 fprintf(stderr,"SUM thread: SUM_export RPC call failed with "
04185 "error code %d\n", reply->opcode);
04186 break;
04187 }
04188
04189
04190
04191 }
04192 break;
04193 case DRMS_SUMINFO:
04194 {
04195 HContainer_t *map = NULL;
04196 int isunum;
04197 char key[128];
04198 SUM_info_t *nulladdr = NULL;
04199 uint64_t dxarray[MAXSUNUMARRAY];
04200 int replyOp = 0;
04201
04202 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_INFO) && SUMS_USEMTSUMS_INFO
04203 if (((DRMS_MtSumsRequest_t *)request)->reqcnt < 1 || ((DRMS_MtSumsRequest_t *)request)->reqcnt > maxNoSus)
04204 {
04205 fprintf(stderr,"SUM thread: Invalid reqcnt (%d) in SUMINFO request.\n", ((DRMS_MtSumsRequest_t *)request)->reqcnt);
04206 ((DRMS_MtSumsRequest_t *)reply)->opcode = DRMS_ERROR_SUMINFO;
04207 break;
04208 }
04209 #else
04210 if (request->reqcnt < 1 || request->reqcnt > maxNoSus)
04211 {
04212 fprintf(stderr,"SUM thread: Invalid reqcnt (%d) in SUMINFO request.\n", request->reqcnt);
04213 reply->opcode = DRMS_ERROR_SUMINFO;
04214 break;
04215 }
04216 #endif
04217
04218 nosums = 0;
04219 tryagain = 1;
04220 sleepiness = 1;
04221
04222 while (tryagain)
04223 {
04224 tryagain = 0;
04225 sumscrashed = 0;
04226
04227 if (!sum)
04228 {
04229
04230 sumscallret = MakeSumsCall(env, DRMS_SUMOPEN, &sum, printkerr, NULL, NULL);
04231 if (sumscallret == kBrokenPipe)
04232 {
04233
04234 sum = NULL;
04235 }
04236 else if (sumscallret == kSUMSDead)
04237 {
04238 sum = NULL;
04239 nosums = 1;
04240
04241 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_INFO) && SUMS_USEMTSUMS_INFO
04242 ((DRMS_MtSumsRequest_t *)reply)->opcode = sumscallret;
04243 #else
04244 reply->opcode = sumscallret;
04245 #endif
04246 break;
04247 }
04248
04249 if (!sum)
04250 {
04251 fprintf(stderr, "Failed to connect to SUMS; trying again in %d seconds.\n", sleepiness);
04252 tryagain = 1;
04253 sleep(sleepiness);
04254 GettingSleepier(&sleepiness);
04255 continue;
04256 }
04257 else
04258 {
04259 *suminout = sum;
04260 }
04261 }
04262
04263
04264
04265
04266 map = hcon_create(sizeof(SUM_info_t *), 128, NULL, NULL, NULL, NULL, 0);
04267
04268 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_INFO) && SUMS_USEMTSUMS_INFO
04269 for (i = 0, isunum = 0; i < ((DRMS_MtSumsRequest_t *)request)->reqcnt; i++)
04270 {
04271
04272
04273
04274
04275 if (((DRMS_MtSumsRequest_t *)request)->sunum[i] == ULLONG_MAX)
04276 {
04277
04278 continue;
04279 }
04280
04281 snprintf(key, sizeof(key), "%llu", (unsigned long long)((DRMS_MtSumsRequest_t *)request)->sunum[i]);
04282 if (!hcon_member(map, key))
04283 {
04284 dxarray[isunum] = ((DRMS_MtSumsRequest_t *)request)->sunum[i];
04285 hcon_insert(map, key, &nulladdr);
04286 isunum++;
04287 }
04288 }
04289 #else
04290 for (i = 0, isunum = 0; i < request->reqcnt; i++)
04291 {
04292
04293
04294
04295
04296 if (request->sunum[i] == ULLONG_MAX)
04297 {
04298
04299 continue;
04300 }
04301
04302 snprintf(key, sizeof(key), "%llu", (unsigned long long)request->sunum[i]);
04303 if (!hcon_member(map, key))
04304 {
04305 dxarray[isunum] = request->sunum[i];
04306 hcon_insert(map, key, &nulladdr);
04307 isunum++;
04308 }
04309 }
04310 #endif
04311
04312 sum->reqcnt = isunum;
04313 sum->sinfo = NULL;
04314
04315 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_INFO) && SUMS_USEMTSUMS_INFO
04316 ((DRMS_MtSumsRequest_t *)reply)->opcode = MakeSumsCall(env, DRMS_SUMINFO, &sum, printkerr, dxarray, isunum);
04317
04318 if (((DRMS_MtSumsRequest_t *)reply)->opcode != 0)
04319 {
04320 sumscrashed = (((DRMS_MtSumsRequest_t *)reply)->opcode == 4 || ((DRMS_MtSumsRequest_t *)reply)->opcode == kBrokenPipe);
04321 if (sumscrashed)
04322 {
04323 sum = NULL;
04324 fprintf(stderr, "SUMS no longer there; trying again in %d seconds.\n", sleepiness);
04325 tryagain = 1;
04326 hcon_destroy(&map);
04327 sleep(sleepiness);
04328 GettingSleepier(&sleepiness);
04329 }
04330 else
04331 {
04332 fprintf(stderr,"SUM thread: SUM_infoArray RPC call failed with error code %d\n", ((DRMS_MtSumsRequest_t *)reply)->opcode);
04333 nosums = 1;
04334 break;
04335 }
04336 }
04337 #else
04338
04339 reply->opcode = MakeSumsCall(env, DRMS_SUMINFO, &sum, printkerr, dxarray, isunum);
04340
04341 if (reply->opcode != 0)
04342 {
04343 sumscrashed = (reply->opcode == 4 || reply->opcode == kBrokenPipe);
04344 if (sumscrashed)
04345 {
04346 sum = NULL;
04347 fprintf(stderr, "SUMS no longer there; trying again in %d seconds.\n", sleepiness);
04348 tryagain = 1;
04349 hcon_destroy(&map);
04350 sleep(sleepiness);
04351 GettingSleepier(&sleepiness);
04352 }
04353 else
04354 {
04355 fprintf(stderr,"SUM thread: SUM_infoArray RPC call failed with error code %d\n", reply->opcode);
04356 nosums = 1;
04357 break;
04358 }
04359 }
04360 #endif
04361 }
04362
04363 if (!nosums)
04364 {
04365
04366
04367
04368 SUM_info_t *psinfo = sum->sinfo;
04369 SUM_info_t **pinfo = NULL;
04370
04371
04372 replyOp = 0;
04373 for (i = 0; i < isunum; i++)
04374 {
04375 snprintf(key, sizeof(key), "%llu", (unsigned long long)(dxarray[i]));
04376
04377 if (!psinfo)
04378 {
04379 fprintf(stderr, "SUMS did not return a SUM_info_t struct for sunum %s.\n", key);
04380 replyOp = 99;
04381 break;
04382 }
04383
04384
04385
04386 if ((pinfo = hcon_lookup(map, key)) != NULL)
04387 {
04388 *pinfo = psinfo;
04389
04390
04391 (*pinfo)->sunum = dxarray[i];
04392 }
04393 else
04394 {
04395 fprintf(stderr, "Information returned for sunum '%s' unknown to DRMS.\n", key);
04396 replyOp = 99;
04397 break;
04398 }
04399
04400 psinfo = psinfo->next;
04401 }
04402
04403 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS && defined(SUMS_USEMTSUMS_INFO) && SUMS_USEMTSUMS_INFO
04404 ((DRMS_MtSumsRequest_t *)reply)->opcode = replyOp;
04405
04406 if (replyOp == 0)
04407 {
04408
04409 ((DRMS_MtSumsRequest_t *)reply)->sudir = calloc(((DRMS_MtSumsRequest_t *)request)->reqcnt, sizeof(char *));
04410
04411 for (i = 0; i < ((DRMS_MtSumsRequest_t *)request)->reqcnt; i++)
04412 {
04413
04414
04415
04416 if (((DRMS_MtSumsRequest_t *)request)->sunum[i] == ULLONG_MAX)
04417 {
04418
04419 ((DRMS_MtSumsRequest_t *)reply)->sudir[i] = (char *)malloc(sizeof(SUM_info_t));
04420 memset(((DRMS_MtSumsRequest_t *)reply)->sudir[i], 0, sizeof(SUM_info_t));
04421 ((SUM_info_t *)(((DRMS_MtSumsRequest_t *)reply)->sudir[i]))->sunum = ULLONG_MAX;
04422
04423 continue;
04424 }
04425
04426 snprintf(key, sizeof(key), "%llu", (unsigned long long)(((DRMS_MtSumsRequest_t *)request)->sunum[i]));
04427 if ((pinfo = hcon_lookup(map, key)) != NULL)
04428 {
04429 ((DRMS_MtSumsRequest_t *)reply)->sudir[i] = (char *)malloc(sizeof(SUM_info_t));
04430 *((SUM_info_t *)(((DRMS_MtSumsRequest_t *)reply)->sudir[i])) = **pinfo;
04431 ((SUM_info_t *)(((DRMS_MtSumsRequest_t *)reply)->sudir[i]))->next = NULL;
04432 }
04433 else
04434 {
04435 fprintf(stderr, "Information returned for sunum '%s' unknown to DRMS.\n", key);
04436 ((DRMS_MtSumsRequest_t *)reply)->opcode = 99;
04437 break;
04438 }
04439 }
04440
04441 reply->reqcnt = request->reqcnt;
04442 }
04443 else
04444 {
04445 reply->reqcnt = 0;
04446 }
04447 #else
04448 reply->opcode = replyOp;
04449
04450 if (replyOp == 0)
04451 {
04452 for (i = 0; i < request->reqcnt; i++)
04453 {
04454
04455
04456
04457 if (request->sunum[i] == ULLONG_MAX)
04458 {
04459
04460 reply->sudir[i] = (char *)malloc(sizeof(SUM_info_t));
04461 memset(reply->sudir[i], 0, sizeof(SUM_info_t));
04462 ((SUM_info_t *)(reply->sudir[i]))->sunum = ULLONG_MAX;
04463
04464 continue;
04465 }
04466
04467 snprintf(key, sizeof(key), "%llu", (unsigned long long)(request->sunum[i]));
04468 if ((pinfo = hcon_lookup(map, key)) != NULL)
04469 {
04470 reply->sudir[i] = (char *)malloc(sizeof(SUM_info_t));
04471 *((SUM_info_t *)(reply->sudir[i])) = **pinfo;
04472 ((SUM_info_t *)(reply->sudir[i]))->next = NULL;
04473 }
04474 else
04475 {
04476 fprintf(stderr, "Information returned for sunum '%s' unknown to DRMS.\n", key);
04477 reply->opcode = 99;
04478 break;
04479 }
04480 }
04481
04482 reply->reqcnt = request->reqcnt;
04483 }
04484 else
04485 {
04486 reply->reqcnt = 0;
04487 }
04488 #endif
04489 SUM_infoArray_free(sum);
04490 }
04491
04492
04493 hcon_destroy(&map);
04494 }
04495 break;
04496 default:
04497
04498 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS
04499 if (mtRequest)
04500 {
04501 fprintf(stderr, "SUM thread: Invalid command code (%d) in request.\n", ((DRMS_MtSumsRequest_t *)request)->opcode);
04502 ((DRMS_MtSumsRequest_t *)reply)->opcode = DRMS_ERROR_SUMBADOPCODE;
04503 }
04504 else
04505 {
04506 #endif
04507 fprintf(stderr,"SUM thread: Invalid command code (%d) in request.\n", request->opcode);
04508 reply->opcode = DRMS_ERROR_SUMBADOPCODE;
04509 #if defined(SUMS_USEMTSUMS) && SUMS_USEMTSUMS
04510 }
04511 #endif
04512
04513 break;
04514 }
04515
04516 return reply;
04517 }
04518
04519
04520 int drms_server_registercleaner(DRMS_Env_t *env, CleanerData_t *data)
04521 {
04522 int gotlock = 0;
04523 int ok = 1;
04524
04525 gotlock = (drms_trylock_server(env) == 0);
04526
04527 if (gotlock)
04528 {
04529 if (env->cleaners == NULL)
04530 {
04531
04532 env->cleaners = list_llcreate(sizeof(CleanerData_t), NULL);
04533 if (!env->cleaners)
04534 {
04535 fprintf(stderr, "Can't register cleaner.\n");
04536 ok = 0;
04537 }
04538 }
04539
04540 if (ok)
04541 {
04542 if (!list_llinserttail(env->cleaners, data))
04543 {
04544 fprintf(stderr, "Can't register cleaner.\n");
04545 ok = 0;
04546 }
04547 }
04548
04549 drms_unlock_server(env);
04550 }
04551 else
04552 {
04553 fprintf(stderr, "Can't register doit cleaner function. Unable to obtain mutex.\n");
04554 }
04555
04556 return ok;
04557 }
04558
04559 static void HastaLaVistaBaby(DRMS_Env_t *env, int signo)
04560 {
04561 char errbuf[256];
04562
04563 if (gShutdownsem)
04564 {
04565 sem_wait(gShutdownsem);
04566 }
04567
04568 if (signo == SIGUSR1)
04569 {
04570 gShutdown = kSHUTDOWN_COMMIT;
04571 }
04572 else
04573 {
04574 gShutdown = kSHUTDOWN_ABORT;
04575 }
04576
04577 if (gShutdownsem)
04578 {
04579 sem_post(gShutdownsem);
04580 }
04581
04582 drms_lock_server(env);
04583
04584
04585
04586
04587 if (!db_cancel(env->session->db_handle, errbuf, sizeof(errbuf)))
04588 {
04589 if (!env->selfstart)
04590 {
04591 fprintf(stderr, "Unsuccessful attempt to cancel current db command: %s\n", errbuf);
04592 }
04593 }
04594
04595 drms_unlock_server(env);
04596
04597
04598
04599
04600
04601
04602
04603
04604
04605 drms_server_end_transaction(env, signo != SIGUSR1, 0);
04606
04607
04608
04609
04610
04611
04612 drms_lock_server(env);
04613 db_disconnect(&env->session->db_handle);
04614 drms_unlock_server(env);
04615
04616
04617 drms_free_env(env, 1);
04618
04619
04620 _exit(signo != SIGUSR1);
04621 }
04622
04623 void *drms_signal_thread(void *arg)
04624 {
04625 int status, signo;
04626 DRMS_Env_t *env = (DRMS_Env_t *) arg;
04627 int doexit = 0;
04628
04629 #ifdef DEBUG
04630 printf("drms_signal_thread started.\n");
04631 fflush(stdout);
04632 #endif
04633
04634
04635
04636
04637
04638 if( (status = pthread_sigmask(SIG_BLOCK, &env->signal_mask, NULL)))
04639 {
04640 fprintf(stderr,"pthread_sigmask call failed with status = %d\n", status);
04641 Exit(1);
04642 }
04643
04644 for (;;)
04645 {
04646 if ((status = sigwait(&env->signal_mask, &signo)))
04647 {
04648 if (status == EINTR)
04649 {
04650 fprintf(stderr,"sigwait error, errcode=%d.\n",status);
04651 continue;
04652 }
04653 else
04654 {
04655 fprintf(stderr,"sigwait error, errcode=%d.\n",status);
04656 Exit(1);
04657 }
04658 }
04659
04660 switch(signo)
04661 {
04662 case SIGINT:
04663 fprintf(stderr,"WARNING: DRMS server received SIGINT...exiting.\n");
04664 break;
04665 case SIGTERM:
04666 fprintf(stderr,"WARNING: DRMS server received SIGTERM...exiting.\n");
04667 break;
04668 case SIGQUIT:
04669 fprintf(stderr,"WARNING: DRMS server received SIGQUIT...exiting.\n");
04670 break;
04671 case SIGUSR1:
04672 if (env->verbose) {
04673 fprintf(stderr,"DRMS server received SIGUSR1...commiting data & stopping.\n");
04674 printf("DRMS server received SIGUSR1...commiting data & stopping.\n");
04675 }
04676 break;
04677 case SIGUSR2:
04678 if (env->verbose)
04679 {
04680 fprintf(stdout,"signal thread received SIGUSR2 (main shutting down)...exiting.\n");
04681 }
04682
04683 pthread_exit(NULL);
04684 break;
04685 default:
04686 fprintf(stderr,"WARNING: DRMS server received signal no. %d...exiting.\n",
04687 signo);
04688
04689 Exit(1);
04690 break;
04691 }
04692
04693 switch(signo)
04694 {
04695 case SIGINT:
04696 case SIGTERM:
04697 case SIGQUIT:
04698 case SIGUSR1:
04699
04700
04701
04702 if (gShutdownsem)
04703 {
04704
04705 sem_wait(gShutdownsem);
04706
04707 if (gShutdown == kSHUTDOWN_UNINITIATED)
04708 {
04709
04710 gShutdown = (signo == SIGUSR1) ?
04711 kSHUTDOWN_COMMITINITIATED :
04712 kSHUTDOWN_ABORTINITIATED;
04713
04714 if (!env->selfstart || env->verbose || signo == SIGTERM || signo == SIGINT)
04715 {
04716 fprintf(stderr, "Shutdown initiated.\n");
04717 if (gSUMSbusyMtx)
04718 {
04719 pthread_mutex_lock(gSUMSbusyMtx);
04720
04721 if (gSUMSbusy)
04722 {
04723 fprintf(stderr, "SUMS is busy fetching from tape - please wait until operation completes.\n");
04724 }
04725
04726 pthread_mutex_unlock(gSUMSbusyMtx);
04727 }
04728 }
04729
04730
04731
04732
04733 if (env->cleaners)
04734 {
04735 ListNode_t *lnode = NULL;
04736 CleanerData_t *acleaner = NULL;
04737
04738 list_llreset(env->cleaners);
04739 while ((lnode = list_llnext(env->cleaners)) != NULL)
04740 {
04741 acleaner = lnode->data;
04742 (*(acleaner->cb))(acleaner->data);
04743 list_llremove(env->cleaners, lnode);
04744 list_llfreenode(&lnode);
04745 }
04746 }
04747
04748
04749 sem_post(gShutdownsem);
04750
04751 doexit = 1;
04752 }
04753 else
04754 {
04755
04756 sem_post(gShutdownsem);
04757 }
04758
04759
04760
04761 if (doexit)
04762 {
04763 HastaLaVistaBaby(env, signo);
04764 return NULL;
04765 }
04766 }
04767 else
04768 {
04769 HastaLaVistaBaby(env, signo);
04770 return NULL;
04771 }
04772
04773 break;
04774 }
04775 }
04776 }