00001 #include "drms.h"
00002 #include "drms_priv.h"
00003
00004 #include "db.h"
00005
00006
00007
00008 #define DRMS_MINVERSTABLE "drms.minvers"
00009 #define DRMS_OLDCODETABLE "drms.oldcode"
00010
00011
00012 static int TableExists(DRMS_Session_t *session, const char *schema, const char *table)
00013 {
00014 int statint;
00015 int rv = 0;
00016
00017 rv = drms_query_tabexists(session, schema, table, &statint);
00018
00019 if (statint != DRMS_SUCCESS)
00020 {
00021 rv = -1;
00022 }
00023
00024 return rv;
00025 }
00026
00027
00028 static int GetDBMinVersion(DRMS_Session_t *session, char **versout)
00029 {
00030 int rv = 1;
00031 char *schema = NULL;
00032 char *table = NULL;
00033 char query[256];
00034 DB_Text_Result_t *qres = NULL;
00035
00036 if (!versout)
00037 {
00038 fprintf(stderr, "Invalid argument to GetDBMinVersion().\n");
00039 rv = 0;
00040 }
00041 else
00042 {
00043 if (get_namespace(DRMS_MINVERSTABLE, &schema, &table))
00044 {
00045 fprintf(stderr, "Out of memory in GetDBMinVersion().\n");
00046 rv = 0;
00047 }
00048 else
00049 {
00050
00051 if (!TableExists(session, schema, table))
00052 {
00053
00054
00055 *versout = strdup("0.0");
00056 }
00057 else
00058 {
00059
00060 snprintf(query, sizeof(query), "SELECT minversion FROM %s", DRMS_MINVERSTABLE);
00061
00062 if ((qres = drms_query_txt(session, query)) == NULL)
00063 {
00064
00065 fprintf(stderr, "Invalid database query: %s.\n", query);
00066 rv = 0;
00067 }
00068 else
00069 {
00070 if (qres->num_rows == 0)
00071 {
00072 fprintf(stderr, "DRMS requires a record in drms.minvers.\n");
00073 rv = 0;
00074 }
00075 else if (qres->num_rows != 1)
00076 {
00077 fprintf(stderr, "Unexpected db response to query %s\n", query);
00078 rv = 0;
00079 }
00080 else
00081 {
00082 const char *version = qres->field[0][0];
00083
00084 *versout = strdup(version);
00085 }
00086
00087 db_free_text_result(qres);
00088 }
00089 }
00090
00091 free(schema);
00092 free(table);
00093 }
00094 }
00095
00096 return rv;
00097 }
00098
00099
00100 DRMS_Session_t *drms_connect(const char *host)
00101 {
00102 struct sockaddr_in server;
00103 struct hostent *he;
00104 DRMS_Session_t *session;
00105 int denied;
00106 char *port = NULL;
00107 char *hostname = NULL;
00108 char *pport = NULL;
00109 unsigned short portnum;
00110 char *sudirrecv = NULL;
00111 struct sockaddr *serverp = NULL;
00112
00113 if (host)
00114 {
00115 hostname = strdup(host);
00116
00117 if (!hostname)
00118 {
00119 fprintf(stderr, "drms_connect(): Out of memory.\n");
00120 goto bailout1;
00121 }
00122
00123
00124 if ((pport = strchr(host, ':')) != NULL)
00125 {
00126 hostname[pport - host] = '\0';
00127 port = strdup(pport + 1);
00128
00129 if (!port)
00130 {
00131 fprintf(stderr, "drms_connect(): Out of memory.\n");
00132 goto bailout1;
00133 }
00134 }
00135 else
00136 {
00137 fprintf(stderr, "Port number missing from host.\n");
00138 goto bailout1;
00139 }
00140
00141 if (port)
00142 {
00143 sscanf(port, "%hu", &portnum);
00144 }
00145 }
00146
00147
00148 if ((he=gethostbyname(hostname)) == NULL)
00149 {
00150 herror("gethostbyname");
00151
00152 if (port)
00153 {
00154 free(port);
00155 }
00156 if (hostname)
00157 {
00158 free(hostname);
00159 }
00160
00161 fprintf(stderr, "Error calling gethostbyname().\n");
00162 return NULL;
00163 }
00164
00165 session = malloc(sizeof(DRMS_Session_t));
00166 XASSERT(session);
00167 session->db_direct = 0;
00168
00169
00170 if ((session->sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
00171 {
00172 perror("socket call failed");
00173 goto bailout1;
00174 }
00175
00176 {
00177 int rcvbuf, sndbuf;
00178 db_getsocketbufsize(session->sockfd, &sndbuf,&rcvbuf);
00179 #ifdef DEBUG
00180 printf("Original socket buffer sizes: SO_SNDBUF = %d, SO_RCVBUF = %d\n",
00181 sndbuf,rcvbuf);
00182 #endif
00183 if (sndbuf<65536)
00184 {
00185 sndbuf = 65536/2;
00186 db_setsocketbufsize(session->sockfd, sndbuf, rcvbuf/2);
00187 }
00188 db_getsocketbufsize(session->sockfd, &sndbuf,&rcvbuf);
00189 #ifdef DEBUG
00190 printf("New socket buffer sizes: SO_SNDBUF = %d, SO_RCVBUF = %d\n",
00191 sndbuf,rcvbuf);
00192 #endif
00193 }
00194
00195 memset(&server, 0, sizeof(server));
00196 server.sin_family = AF_INET;
00197 server.sin_port = htons(portnum);
00198 server.sin_addr = *((struct in_addr *)he->h_addr);
00199
00200 serverp = (struct sockaddr *)&server;
00201
00202 if ( connect(session->sockfd, serverp, sizeof(struct sockaddr_in)) == -1 )
00203 {
00204 perror("connect");
00205 fprintf(stderr,"Failed to connect to server, please check that host name, "
00206 "port number and password are correct.\n");
00207 goto bailout;
00208 }
00209 strncpy(session->hostname, hostname, DRMS_MAXHOSTNAME);
00210 session->port = portnum;
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222 denied = Readint(session->sockfd);
00223 if (denied)
00224 {
00225 fprintf(stderr, "drms_server didn't accept the attempt to connect with it.\n");
00226 goto bailout;
00227 }
00228 session->clientid = Readint(session->sockfd);
00229 session->sessionid = Readlonglong(session->sockfd);
00230 session->sessionns = receive_string(session->sockfd);
00231 session->sunum = Readlonglong(session->sockfd);
00232
00233
00234
00235
00236
00237
00238
00239
00240
00241 sudirrecv = receive_string(session->sockfd);
00242
00243 if (sudirrecv && strcasecmp(sudirrecv, kNOLOGSUDIR))
00244 {
00245 session->sudir = sudirrecv;
00246 }
00247 else
00248 {
00249 free(sudirrecv);
00250 session->sudir = NULL;
00251 }
00252
00253
00254 char *tmp = NULL;
00255
00256
00257 tmp = receive_string(session->sockfd);
00258 if (tmp)
00259 {
00260 snprintf(session->dbhost, sizeof(session->dbhost), "%s", tmp);
00261 free(tmp);
00262 tmp = NULL;
00263 }
00264
00265
00266 session->dbport = Readint(session->sockfd);
00267
00268
00269 tmp = receive_string(session->sockfd);
00270 if (tmp)
00271 {
00272 snprintf(session->dbname, sizeof(session->dbname), "%s", tmp);
00273 free(tmp);
00274 tmp = NULL;
00275 }
00276
00277
00278 tmp = receive_string(session->sockfd);
00279 if (tmp)
00280 {
00281 snprintf(session->dbuser, sizeof(session->dbuser), "%s", tmp);
00282 free(tmp);
00283 tmp = NULL;
00284 }
00285
00286 #ifdef DEBUG
00287 printf("got sudir=%s\n",session->sudir);
00288 #endif
00289 if (port)
00290 {
00291 free(port);
00292 }
00293 if (hostname)
00294 {
00295 free(hostname);
00296 }
00297
00298 return session;
00299
00300 bailout:
00301 close(session->sockfd);
00302 bailout1:
00303 free(session);
00304 if (port)
00305 {
00306 free(port);
00307 }
00308 if (hostname)
00309 {
00310 free(hostname);
00311 }
00312 return NULL;
00313 }
00314
/*
 * Send a command code to the DRMS server and wait for it to be echoed back.
 *
 * If the echoed value differs from `command`, a diagnostic is printed, the
 * socket is closed and the process exits with status 1. Returns 0 otherwise.
 */
int drms_send_commandcode (int sockfd, int command)
{
    int reply;

    Writeint(sockfd, command);
    reply = Readint(sockfd);

    if (reply != command)
    {
        printf("FATAL ERROR: The DRMS server echoed a different command code (%d)\n"
               " from the one sent (%d).\n"
               "This usually indicates that the module generated an invalid command\n"
               " that caused the DRMS server to terminate the connection.\n"
               "It can also mean that the DRMS server crashed.\nAborting.\n",
               reply, command);
        close(sockfd);
        exit(1);
    }

    return 0;
}
00331
/*
 * Send a command code to the DRMS server without reading back an echo.
 * Always returns 0.
 */
int drms_send_commandcode_noecho (int sockfd, int command)
{
    Writeint(sockfd, command);

    return 0;
}
00336
00337 #ifndef DRMS_CLIENT
00338
00339 DRMS_Session_t *drms_connect_direct(const char *dbhost, const char *dbuser,
00340 const char *dbpasswd, const char *dbname,
00341 const char *sessionns)
00342 {
00343 DRMS_Session_t *session;
00344 char *minversion = NULL;
00345 char jsocversion[128];
00346 char *defSessionNS = NULL;
00347
00348
00349 session = malloc(sizeof(DRMS_Session_t));
00350 XASSERT(session);
00351 memset(session, 0, sizeof(DRMS_Session_t));
00352 session->db_direct = 1;
00353
00354
00355 session->port = -1;
00356 session->sockfd = -1;
00357
00358
00359 if ((session->db_handle = db_connect(dbhost, dbuser, dbpasswd, dbname, 1)) == NULL)
00360 {
00361 fprintf(stderr,"Couldn't connect to database.\n");
00362 free(session);
00363 session = NULL;
00364 }
00365 else if (0)
00366 {
00367
00368 }
00369
00370
00371
00372
00373 if (session)
00374 {
00375
00376 if (!GetDBMinVersion(session, &minversion))
00377 {
00378
00379 db_disconnect(&session->db_handle);
00380 free(session);
00381 session = NULL;
00382 }
00383 else
00384 {
00385 jsoc_getversion(jsocversion, sizeof(jsocversion), NULL);
00386
00387 if (!base_isvers(jsocversion, minversion))
00388 {
00389 char *schema = NULL;
00390 char *table = NULL;
00391
00392
00393 if (get_namespace(DRMS_OLDCODETABLE, &schema, &table))
00394 {
00395 fprintf(stderr, "Out of memory in drms_connect_direct().\n");
00396 free(session);
00397 session = NULL;
00398 }
00399 else
00400 {
00401
00402
00403 if (TableExists(session, schema, table))
00404 {
00405 char binpath[PATH_MAX] = {0};
00406 char query[1024];
00407 struct timeval tv;
00408 char *date = NULL;
00409 int rowexists = 0;
00410 DB_Text_Result_t *qres = NULL;
00411
00412 if (readlink("/proc/self/exe", binpath, sizeof(binpath)) == -1)
00413 {
00414 fprintf(stderr, "Cannot obtain path to running binary.\n");
00415 free(session);
00416 session = NULL;
00417 }
00418 else
00419 {
00420
00421
00422 snprintf(query, sizeof(query), "SELECT path FROM %s WHERE path = '%s'", DRMS_OLDCODETABLE, binpath);
00423
00424 if ((qres = drms_query_txt(session, query)) == NULL)
00425 {
00426
00427 fprintf(stderr, "Invalid database query: %s.\n", query);
00428 free(session);
00429 session = NULL;
00430 }
00431 else
00432 {
00433 rowexists = qres->num_rows > 0;
00434 db_free_text_result(qres);
00435
00436 gettimeofday(&tv, NULL);
00437 date = ctime(&tv.tv_sec);
00438
00439 if (rowexists)
00440 {
00441
00442 snprintf(query, sizeof(query), "UPDATE %s set date = '%s', version = '%s' WHERE path = '%s'", DRMS_OLDCODETABLE, date, jsocversion, binpath);
00443 }
00444 else
00445 {
00446
00447 snprintf(query, sizeof(query), "INSERT INTO %s (path, date, version) VALUES ('%s', '%s', '%s')", DRMS_OLDCODETABLE, binpath, date, jsocversion);
00448 }
00449
00450 if (drms_dms(session, NULL, query))
00451 {
00452 fprintf(stderr, "Invalid database query: %s.\n", query);
00453 free(session);
00454 session = NULL;
00455 }
00456 }
00457 }
00458 }
00459
00460 free(table);
00461 free(schema);
00462 }
00463
00464 fprintf(stderr, "Your DRMS module/client code (version %s) is incompatible with the current version of DRMS (which requires version %s). Please update.\n", jsocversion, minversion);
00465
00466 db_disconnect(&session->db_handle);
00467 free(session);
00468 session = NULL;
00469 }
00470 }
00471
00472 if (minversion)
00473 {
00474 free(minversion);
00475 }
00476 }
00477
00478 if (session)
00479 {
00480 int istat = DRMS_SUCCESS;
00481 int tabExists = 0;
00482 DB_Text_Result_t *tresult = NULL;
00483 char query[1024];
00484
00485 if (sessionns && *sessionns)
00486 {
00487
00488
00489
00490 defSessionNS = strdup(sessionns);
00491 }
00492 else
00493 {
00494
00495 tabExists = drms_query_tabexists(session, "admin", "sessionns", &istat);
00496
00497 if (istat == DRMS_SUCCESS)
00498 {
00499 if (tabExists)
00500 {
00501 sprintf(query, "SELECT sessionns FROM admin.sessionns WHERE username='%s'", session->db_handle->dbuser);
00502 tresult = db_query_txt(session->db_handle, query);
00503
00504 if (tresult)
00505 {
00506 if (tresult->num_rows > 0)
00507 {
00508 defSessionNS = strdup(tresult->field[0][0]);
00509 }
00510
00511 db_free_text_result(tresult);
00512 tresult = NULL;
00513 }
00514 else
00515 {
00516 fprintf(stderr, "Failed db query: %s.\n", query);
00517 free(session);
00518 session = NULL;
00519 }
00520 }
00521 }
00522 else
00523 {
00524 fprintf(stderr, "Failed to check for admin.sessionns table existence.\n");
00525 free(session);
00526 session = NULL;
00527 }
00528 }
00529
00530 if (session)
00531 {
00532 if (defSessionNS)
00533 {
00534
00535 tabExists = drms_query_tabexists(session, defSessionNS, DRMS_SESSION_TABLE, &istat);
00536
00537 if (istat == DRMS_SUCCESS)
00538 {
00539 if (tabExists)
00540 {
00541 session->sessionns = defSessionNS;
00542 }
00543 else
00544 {
00545 fprintf(stderr, "Session table %s.%s does not exist.\n", defSessionNS, DRMS_SESSION_TABLE);
00546 free(session);
00547 session = NULL;
00548 }
00549 }
00550 else
00551 {
00552 fprintf(stderr, "Failed to check for %s.%s table existence.\n", defSessionNS, DRMS_SESSION_TABLE);
00553 free(session);
00554 session = NULL;
00555 }
00556 }
00557 }
00558 }
00559
00560 if (session)
00561 {
00562 if (!session->sessionns)
00563 {
00564 session->sessionns = strdup("");
00565 }
00566 }
00567
00568 return session;
00569 }
00570 #endif
00571
00572
00573
00574
00575
00576
00577
00578
00579
00580
00581 #ifdef DRMS_CLIENT
00582 void drms_disconnect(DRMS_Env_t *env, int abort)
00583 {
00584 drms_send_commandcode(env->session->sockfd, DRMS_DISCONNECT);
00585 Writeint(env->session->sockfd, abort);
00586 }
00587
00588 void drms_disconnect_now(DRMS_Env_t *env, int abort)
00589 {
00590 drms_send_commandcode_noecho(env->session->sockfd, DRMS_DISCONNECT);
00591 Writeint(env->session->sockfd, abort);
00592 }
00593 #endif
00594
00595 int drms_commit(DRMS_Env_t *env)
00596 {
00597 int status;
00598
00599 #ifndef DRMS_CLIENT
00600 if (env->session->db_direct)
00601 {
00602 return db_commit(env->session->db_handle);
00603 }
00604 else
00605 #else
00606 XASSERT(env->session->db_direct==0);
00607 #endif
00608 {
00609 drms_send_commandcode(env->session->sockfd, DRMS_COMMIT);
00610 status = Readint(env->session->sockfd);
00611 return status;
00612 }
00613 }
00614
00615
00616
00617 int drms_rollback(DRMS_Session_t *session)
00618 {
00619 int status;
00620
00621 #ifndef DRMS_CLIENT
00622 if (session->db_direct)
00623 {
00624 return db_rollback(session->db_handle);
00625 }
00626 else
00627 #else
00628 XASSERT(session->db_direct==0);
00629 #endif
00630 {
00631 drms_send_commandcode(session->sockfd, DRMS_ROLLBACK);
00632 status = Readint(session->sockfd);
00633 return status;
00634 }
00635 }
00636
00637 DB_Text_Result_t *drms_query_txt(DRMS_Session_t *session, char *query)
00638 {
00639 char *errmsg = NULL;
00640 DB_Text_Result_t *rv = NULL;
00641 #ifdef DEBUG
00642 printf("drms_query_txt: query = %s\n",query);
00643 #endif
00644 #ifndef DRMS_CLIENT
00645 if (session->db_direct)
00646 {
00647 return db_query_txt(session->db_handle, query);
00648 }
00649 else
00650 #else
00651 XASSERT(session->db_direct==0);
00652 #endif
00653 {
00654 drms_send_commandcode(session->sockfd, DRMS_TXTQUERY);
00655 rv = db_client_query_txt(session->sockfd, query, 0, &errmsg);
00656
00657 if (errmsg)
00658 {
00659 snprintf(session->db_handle->errmsg, sizeof(session->db_handle->errmsg), "%s", errmsg);
00660 free(errmsg);
00661 }
00662
00663 return rv;
00664 }
00665 }
00666
00667 DB_Binary_Result_t *drms_query_bin(DRMS_Session_t *session, char *query)
00668 {
00669 char *errmsg = NULL;
00670 DB_Binary_Result_t *rv = NULL;
00671 #ifdef DEBUG
00672 printf("drms_query_bin: query = %s\n",query);
00673 #endif
00674
00675 #ifndef DRMS_CLIENT
00676 if (session->db_direct)
00677 {
00678 return db_query_bin(session->db_handle, query);
00679 }
00680 else
00681 #else
00682 XASSERT(session->db_direct==0);
00683 #endif
00684 {
00685 drms_send_commandcode(session->sockfd, DRMS_BINQUERY);
00686 rv = db_client_query_bin(session->sockfd, query, 0, &errmsg);
00687
00688 if (errmsg)
00689 {
00690 snprintf(session->db_handle->errmsg, sizeof(session->db_handle->errmsg), "%s", errmsg);
00691 free(errmsg);
00692 }
00693
00694 return rv;
00695 }
00696 }
00697
00698
00699
00700 DB_Binary_Result_t *drms_query_binv(DRMS_Session_t *session, char *query,
00701 ...)
00702 {
00703 DB_Type_t intype[MAXARG];
00704 void *argin[MAXARG];
00705 int n,i;
00706 char *q;
00707 DB_Binary_Result_t *result;
00708 db_char_t tchar;
00709 db_int1_t tint1;
00710 db_int2_t tint2;
00711 db_int4_t tint4;
00712 db_int8_t tint8;
00713 db_float_t tfloat;
00714 db_double_t tdouble;
00715
00716 va_list ap;
00717 va_start(ap, query);
00718
00719
00720 #ifdef DEBUG
00721 printf("drms_query_binv: query = %s\n",query);
00722 #endif
00723
00724
00725 q = (char *)query;
00726 n = 0;
00727 while (*q)
00728 {
00729 if (*q == '?')
00730 {
00731 if (n>=MAXARG)
00732 {
00733 fprintf(stderr,"drms_query_binv: Maximum number of arguments "
00734 "exceeded.\n");
00735 result = NULL;
00736 goto failure;
00737 }
00738 n++;
00739 }
00740 q++;
00741 }
00742
00743 for (i=0; i<n; i++)
00744 {
00745 intype[i] = va_arg(ap, DB_Type_t);
00746 switch(intype[i])
00747 {
00748 case DB_CHAR:
00749 tchar = (db_char_t) va_arg(ap, int);
00750 argin[i] = alloca(sizeof(db_char_t));
00751 memcpy(argin[i], &tchar, sizeof(db_char_t));
00752 break;
00753 case DB_INT1:
00754 tint1 = (db_int1_t) va_arg(ap, int);
00755 argin[i] = alloca(sizeof(db_int1_t));
00756 memcpy(argin[i], &tint1, sizeof(db_int1_t));
00757 break;
00758 case DB_INT2:
00759 tint2 = (db_int2_t) va_arg(ap, int);
00760 argin[i] = alloca(sizeof(db_int2_t));
00761 memcpy(argin[i], &tint2, sizeof(db_int2_t));
00762 break;
00763 case DB_INT4:
00764 tint4 = va_arg(ap, db_int4_t);
00765 argin[i] = alloca(sizeof(db_int4_t));
00766 memcpy(argin[i], &tint4, sizeof(db_int4_t));
00767 break;
00768 case DB_INT8:
00769 tint8 = va_arg(ap, db_int8_t);
00770 argin[i] = alloca(sizeof(db_int8_t));
00771 memcpy(argin[i], &tint8, sizeof(db_int8_t));
00772 break;
00773 case DB_FLOAT:
00774 tfloat = (db_float_t) va_arg(ap,double);
00775 argin[i] = alloca(sizeof(db_float_t));
00776 memcpy(argin[i], &tfloat, sizeof(db_float_t));
00777 break;
00778 case DB_DOUBLE:
00779 tdouble = va_arg(ap, db_double_t);
00780 argin[i] = alloca(sizeof(db_double_t));
00781 memcpy(argin[i], &tdouble, sizeof(db_double_t));
00782 break;
00783 case DB_STRING:
00784 case DB_VARCHAR:
00785 argin[i] = va_arg(ap, db_string_t);
00786 break;
00787 }
00788 }
00789 result = drms_query_bin_array(session, query, n, intype, argin);
00790
00791 failure:
00792 va_end(ap);
00793 return result;
00794 }
00795
00796
00797
00798
00799 DB_Binary_Result_t *drms_query_bin_array(DRMS_Session_t *session, char *query,
00800 int n_args, DB_Type_t *intype,
00801 void **argin)
00802 {
00803 #ifdef DEBUG
00804 printf("drms_query_bin_array: query = %s\n",query);
00805 #endif
00806 #ifndef DRMS_CLIENT
00807 if (session->db_direct)
00808 {
00809 return db_query_bin_array(session->db_handle,query, n_args, intype,
00810 argin);
00811 }
00812 else
00813 #else
00814 XASSERT(session->db_direct==0);
00815 #endif
00816 {
00817 drms_send_commandcode(session->sockfd, DRMS_BINQUERY_ARRAY);
00818 return db_client_query_bin_array(session->sockfd, query, 0, n_args, intype,
00819 argin);
00820 }
00821 }
00822
00823
00824 DB_Binary_Result_t **drms_query_bin_ntuple(DRMS_Session_t *session, const char *stmnt, unsigned int nelems, unsigned int nargs, DB_Type_t *dbtypes, void **values)
00825 {
00826 #ifndef DRMS_CLIENT
00827
00828 XASSERT(session->db_direct == 1);
00829 return db_query_bin_ntuple(session->db_handle, stmnt, nelems, nargs, dbtypes, values);
00830 #else
00831 XASSERT(session->db_direct==0);
00832 drms_send_commandcode(session->sockfd, DRMS_BINQUERY_NTUPLE);
00833 return db_client_query_bin_ntuple(session->sockfd, stmnt, nelems, nargs, dbtypes, values);
00834 #endif
00835 }
00836
00837
00838 int drms_dms(DRMS_Session_t *session, int *row_count, char *query)
00839 {
00840 #ifndef DRMS_CLIENT
00841 if (session->db_direct)
00842 {
00843 return db_dms(session->db_handle, row_count, query);
00844 }
00845 else
00846 #else
00847 XASSERT(session->db_direct==0);
00848 #endif
00849 {
00850 drms_send_commandcode(session->sockfd, DRMS_DMS);
00851 return db_client_dms(session->sockfd, row_count, query);
00852 }
00853 }
00854
00855
00856
00857
00858 int drms_dmsv(DRMS_Session_t *session, int *row_count, char *query,
00859 int n_rows, ...)
00860 {
00861 int status=1;
00862 DB_Type_t intype[MAXARG];
00863 void *argin[MAXARG];
00864 int n,i;
00865 char *q;
00866 db_char_t tchar;
00867 db_int1_t tint1;
00868 db_int2_t tint2;
00869 db_int4_t tint4;
00870 db_int8_t tint8;
00871 db_float_t tfloat;
00872 db_double_t tdouble;
00873
00874 va_list ap;
00875 va_start(ap, n_rows);
00876
00877
00878 q = (char *)query;
00879 n = 0;
00880 while (*q)
00881 {
00882 if (*q == '?')
00883 {
00884 if (n>=MAXARG)
00885 {
00886 fprintf(stderr,"ERRO in drms_dmsv: Maximum number of arguments "
00887 "exceeded.\n");
00888 goto failure;
00889 }
00890 n++;
00891 }
00892 q++;
00893 }
00894
00895 for (i=0; i<n; i++)
00896 {
00897 intype[i] = va_arg(ap, DB_Type_t);
00898 if (n_rows == -1)
00899 {
00900 switch(intype[i])
00901 {
00902 case DB_CHAR:
00903 tchar = (db_char_t) va_arg(ap, int);
00904 argin[i] = alloca(sizeof(db_char_t));
00905 memcpy(argin[i], &tchar, sizeof(db_char_t));
00906 break;
00907 case DB_INT1:
00908 tint1 = (db_int1_t) va_arg(ap, int);
00909 argin[i] = alloca(sizeof(db_int1_t));
00910 memcpy(argin[i], &tint1, sizeof(db_int1_t));
00911 break;
00912 case DB_INT2:
00913 tint2 = (db_int2_t) va_arg(ap, int);
00914 argin[i] = alloca(sizeof(db_int2_t));
00915 memcpy(argin[i], &tint2, sizeof(db_int2_t));
00916 break;
00917 case DB_INT4:
00918 tint4 = va_arg(ap, db_int4_t);
00919 argin[i] = alloca(sizeof(db_int4_t));
00920 memcpy(argin[i], &tint4, sizeof(db_int4_t));
00921 break;
00922 case DB_INT8:
00923 tint8 = va_arg(ap, db_int8_t);
00924 argin[i] = alloca(sizeof(db_int8_t));
00925 memcpy(argin[i], &tint8, sizeof(db_int8_t));
00926 break;
00927 case DB_FLOAT:
00928 tfloat = (db_float_t) va_arg(ap,double);
00929 argin[i] = alloca(sizeof(db_float_t));
00930 memcpy(argin[i], &tfloat, sizeof(db_float_t));
00931 break;
00932 case DB_DOUBLE:
00933 tdouble = va_arg(ap, db_double_t);
00934 argin[i] = alloca(sizeof(db_double_t));
00935 memcpy(argin[i], &tdouble, sizeof(db_double_t));
00936 break;
00937 case DB_STRING:
00938 case DB_VARCHAR:
00939 argin[i] = alloca(sizeof(char **));
00940 *((char **)argin[i]) = va_arg(ap, char *);
00941 break;
00942 }
00943 }
00944 else
00945 {
00946 argin[i] = va_arg(ap, void *);
00947 }
00948 }
00949
00950 if (n_rows == -1)
00951 n_rows = 1;
00952
00953 status = drms_dms_array(session, row_count, query, n_rows, n,
00954 intype, argin );
00955 failure:
00956 va_end(ap);
00957 return status;
00958 }
00959
00960
00961 int drms_dms_array(DRMS_Session_t *session, int *row_count,
00962 char *query, int n_rows, int n_args,
00963 DB_Type_t *intype, void **argin )
00964 {
00965 #ifndef DRMS_CLIENT
00966 if (session->db_direct)
00967 {
00968 return db_dms_array(session->db_handle, row_count, query,
00969 n_rows, n_args, intype, argin );
00970 }
00971 else
00972 #else
00973 XASSERT(session->db_direct==0);
00974 #endif
00975 {
00976 drms_send_commandcode(session->sockfd, DRMS_DMS_ARRAY);
00977 return db_client_dms_array(session->sockfd, row_count, query,
00978 n_rows, n_args, intype, argin );
00979 }
00980 }
00981
00982
00983
00984 int drms_bulk_insertv(DRMS_Session_t *session, char *table,
00985 int n_rows, int n_cols, ...)
00986 {
00987 int status=1;
00988 DB_Type_t intype[MAXARG];
00989 void *argin[MAXARG];
00990 int i;
00991 db_char_t tchar;
00992 db_int1_t tint1;
00993 db_int2_t tint2;
00994 db_int4_t tint4;
00995 db_int8_t tint8;
00996 db_float_t tfloat;
00997 db_double_t tdouble;
00998
00999 va_list ap;
01000 va_start(ap, n_cols);
01001
01002 for (i=0; i<n_cols; i++)
01003 {
01004 intype[i] = va_arg(ap, DB_Type_t);
01005 if (n_rows == -1)
01006 {
01007 switch(intype[i])
01008 {
01009 case DB_CHAR:
01010 tchar = (db_char_t) va_arg(ap, int);
01011 argin[i] = alloca(sizeof(db_char_t));
01012 memcpy(argin[i], &tchar, sizeof(db_char_t));
01013 break;
01014 case DB_INT1:
01015 tint1 = (db_int1_t) va_arg(ap, int);
01016 argin[i] = alloca(sizeof(db_int1_t));
01017 memcpy(argin[i], &tint1, sizeof(db_int1_t));
01018 break;
01019 case DB_INT2:
01020 tint2 = (db_int2_t) va_arg(ap, int);
01021 argin[i] = alloca(sizeof(db_int2_t));
01022 memcpy(argin[i], &tint2, sizeof(db_int2_t));
01023 break;
01024 case DB_INT4:
01025 tint4 = va_arg(ap, db_int4_t);
01026 argin[i] = alloca(sizeof(db_int4_t));
01027 memcpy(argin[i], &tint4, sizeof(db_int4_t));
01028 break;
01029 case DB_INT8:
01030 tint8 = va_arg(ap, db_int8_t);
01031 argin[i] = alloca(sizeof(db_int8_t));
01032 memcpy(argin[i], &tint8, sizeof(db_int8_t));
01033 break;
01034 case DB_FLOAT:
01035 tfloat = (db_float_t) va_arg(ap,double);
01036 argin[i] = alloca(sizeof(db_float_t));
01037 memcpy(argin[i], &tfloat, sizeof(db_float_t));
01038 break;
01039 case DB_DOUBLE:
01040 tdouble = va_arg(ap, db_double_t);
01041 argin[i] = alloca(sizeof(db_double_t));
01042 memcpy(argin[i], &tdouble, sizeof(db_double_t));
01043 break;
01044 case DB_STRING:
01045 case DB_VARCHAR:
01046 argin[i] = alloca(sizeof(char **));
01047 *((char **)argin[i]) = va_arg(ap, char *);
01048 break;
01049 }
01050 }
01051 else
01052 {
01053 argin[i] = va_arg(ap, void *);
01054 }
01055 }
01056
01057 if (n_rows == -1)
01058 n_rows = 1;
01059
01060 status = drms_bulk_insert_array(session, table, n_rows, n_cols,
01061 intype, argin );
01062 va_end(ap);
01063 return status;
01064 }
01065
01066
01067
01068 int drms_bulk_insert_array(DRMS_Session_t *session,
01069 char *table, int n_rows, int n_args,
01070 DB_Type_t *intype, void **argin )
01071 {
01072 #ifndef DRMS_CLIENT
01073 if (session->db_direct)
01074 {
01075 return db_bulk_insert_array(session->db_handle, table,
01076 n_rows, n_args, intype, argin );
01077 }
01078 else
01079 #else
01080 XASSERT(session->db_direct==0);
01081 #endif
01082 {
01083 drms_send_commandcode(session->sockfd, DRMS_BULK_INSERT_ARRAY);
01084 return db_client_bulk_insert_array(session->sockfd,table,
01085 n_rows, n_args, intype, argin );
01086 }
01087 }
01088
01089
01090 int drms_sequence_drop(DRMS_Session_t *session, char *table)
01091 {
01092 #ifndef DRMS_CLIENT
01093 if (session->db_direct)
01094 {
01095 return db_sequence_drop(session->db_handle, table);
01096 }
01097 else
01098 #else
01099 XASSERT(session->db_direct==0);
01100 #endif
01101 {
01102 drms_send_commandcode(session->sockfd, DRMS_SEQUENCE_DROP);
01103 return db_client_sequence_drop(session->sockfd, table);
01104 }
01105 }
01106
01107 int drms_sequence_create(DRMS_Session_t *session, char *table)
01108 {
01109 #ifndef DRMS_CLIENT
01110 if (session->db_direct)
01111 {
01112 return db_sequence_create(session->db_handle, table);
01113 }
01114 else
01115 #else
01116 XASSERT(session->db_direct==0);
01117 #endif
01118 {
01119 drms_send_commandcode(session->sockfd, DRMS_SEQUENCE_CREATE);
01120 return db_client_sequence_create(session->sockfd, table);
01121 }
01122 }
01123
01124
01125 long long *drms_sequence_getnext(DRMS_Session_t *session, char *table, int n)
01126 {
01127 #ifndef DRMS_CLIENT
01128 if (session->db_direct)
01129 {
01130 return db_sequence_getnext_n(session->db_handle, table, n);
01131 }
01132 else
01133 #else
01134 XASSERT(session->db_direct==0);
01135 #endif
01136 {
01137 drms_send_commandcode(session->sockfd, DRMS_SEQUENCE_GETNEXT);
01138 return db_client_sequence_getnext_n(session->sockfd, table, n);
01139 }
01140 }
01141
01142
01143
01144
01145 long long drms_sequence_getcurrent(DRMS_Session_t *session, char *table)
01146 {
01147 #ifndef DRMS_CLIENT
01148 if (session->db_direct)
01149 {
01150 return db_sequence_getcurrent(session->db_handle, table);
01151 }
01152 else
01153 #else
01154 XASSERT(session->db_direct==0);
01155 #endif
01156 {
01157 drms_send_commandcode(session->sockfd, DRMS_SEQUENCE_GETCURRENT);
01158 return db_client_sequence_getcurrent(session->sockfd, table);
01159 }
01160 }
01161
01162
01163
01164 long long drms_sequence_getlast(DRMS_Session_t *session, char *table)
01165 {
01166 #ifndef DRMS_CLIENT
01167 if (session->db_direct)
01168 {
01169 return db_sequence_getlast(session->db_handle, table);
01170 }
01171 else
01172 #else
01173 XASSERT(session->db_direct==0);
01174 #endif
01175 {
01176 drms_send_commandcode(session->sockfd, DRMS_SEQUENCE_GETLAST);
01177 return db_client_sequence_getlast(session->sockfd, table);
01178 }
01179 }
01180
01181
01182
01183
01184 long long *drms_alloc_recnum(DRMS_Env_t *env, char *series,
01185 DRMS_RecLifetime_t lifetime, int n)
01186 {
01187 int i, status;
01188 long long *seqnums;
01189 struct iovec vec[4];
01190 int tmp[2], len;
01191
01192 #ifndef DRMS_CLIENT
01193 if (env->session->db_direct)
01194 {
01195 seqnums = db_sequence_getnext_n(env->session->db_handle, series, n);
01196 if (lifetime == DRMS_TRANSIENT)
01197 {
01198 drms_server_transient_records(env, series, n, seqnums);
01199 }
01200 return seqnums;
01201 }
01202 else
01203 #else
01204 XASSERT(env->session->db_direct==0);
01205 #endif
01206 {
01207 int sockfd = env->session->sockfd;
01208 drms_send_commandcode(sockfd, DRMS_ALLOC_RECNUM);
01209 vec[1].iov_len = strlen(series);
01210 vec[1].iov_base = series;
01211 len = htonl(vec[1].iov_len);
01212 vec[0].iov_len = sizeof(len);
01213 vec[0].iov_base = &len;
01214 tmp[0] = htonl(n);
01215 vec[2].iov_len = sizeof(tmp[0]);
01216 vec[2].iov_base = &tmp[0];
01217 tmp[1] = htonl((int)lifetime);
01218 vec[3].iov_len = sizeof(tmp[1]);
01219 vec[3].iov_base = &tmp[1];
01220 Writevn(sockfd, vec, 4);
01221
01222 status = Readint(sockfd);
01223 if (status==0)
01224 {
01225 seqnums = malloc(n*sizeof(long long));
01226 XASSERT(seqnums);
01227 for (i=0; i<n; i++)
01228 seqnums[i] = (long long)Readlonglong(sockfd);
01229 return seqnums;
01230 }
01231 else
01232 return NULL;
01233 }
01234 }
01235
01236
01237
01238
01239
01240
01241
01242
01243
01244
01245
01246 DRMS_StorageUnit_t *drms_getunit_internal(DRMS_Env_t *env, char *series,
01247 long long sunum, int retrieve,
01248 int gotosums, int *status)
01249 {
01250 HContainer_t *scon=NULL;
01251 int stat;
01252 DRMS_StorageUnit_t *su;
01253 char hashkey[DRMS_MAXHASHKEYLEN];
01254 DRMS_Record_t *template;
01255 #ifdef DRMS_CLIENT
01256 char *sudir;
01257 XASSERT(env->session->db_direct==0);
01258 #endif
01259
01260 if ((su = drms_su_lookup(env, series, sunum, &scon)) == NULL)
01261 {
01262
01263
01264 if (!scon)
01265 {
01266 scon = hcon_allocslot(&env->storageunit_cache,series);
01267 hcon_init(scon, sizeof(DRMS_StorageUnit_t), DRMS_MAXHASHKEYLEN,
01268 (void (*)(const void *)) drms_su_freeunit, NULL);
01269 }
01270
01271 sprintf(hashkey,DRMS_SUNUM_FORMAT, sunum);
01272 su = hcon_allocslot(scon,hashkey);
01273 #ifdef DEBUG
01274 printf("getunit: Got su = %p. Now has %d slots from '%s'\n",su,
01275 hcon_size(scon), series);
01276 #endif
01277
01278
01279 su->sunum = sunum;
01280 su->mode = DRMS_READONLY;
01281
01282 su->nfree = 0;
01283 su->state = NULL;
01284 su->recnum = NULL;
01285 su->refcount = 0;
01286
01287 if ((template = drms_template_record(env, series,&stat)) == NULL)
01288 goto bailout;
01289 su->seriesinfo = template->seriesinfo;
01290
01291 if (gotosums)
01292 {
01293
01294 #ifndef DRMS_CLIENT
01295
01296
01297 stat = drms_su_getsudir(env, su, retrieve);
01298 #ifdef DEBUG
01299 printf("drms_getunit: Got sudir from SUMS = '%s'\n",su->sudir);
01300 #endif
01301 #else
01302 {
01303 int len, retrieve_tmp;
01304 long long sunum_tmp;
01305 struct iovec vec[4];
01306
01307 drms_send_commandcode(env->session->sockfd, DRMS_GETUNIT);
01308
01309
01310 vec[1].iov_len = strlen(series);
01311 vec[1].iov_base = series;
01312 len = htonl(vec[1].iov_len);
01313 vec[0].iov_len = sizeof(len);
01314 vec[0].iov_base = &len;
01315 sunum_tmp = htonll(sunum);
01316 vec[2].iov_len = sizeof(sunum_tmp);
01317 vec[2].iov_base = &sunum_tmp;
01318 retrieve_tmp = htonl(retrieve);
01319 vec[3].iov_len = sizeof(retrieve_tmp);
01320 vec[3].iov_base = &retrieve_tmp;
01321 Writevn(env->session->sockfd, vec, 4);
01322
01323 stat = Readint(env->session->sockfd);
01324 if (stat == DRMS_SUCCESS)
01325 {
01326 sudir = receive_string(env->session->sockfd);
01327 #ifdef DEBUG
01328 printf("drms_getunit: Got sudir from DRMS server = '%s', stat = %d\n",
01329 sudir,stat);
01330 #endif
01331 strncpy(su->sudir, sudir, sizeof(su->sudir));
01332 free(sudir);
01333 }
01334 }
01335 #endif
01336
01337 if (!strlen(su->sudir))
01338 {
01339
01340
01341
01342
01343
01344
01345
01346
01347
01348
01349
01350
01351
01352 if (!retrieve)
01353 {
01354
01355
01356
01357 hcon_remove(scon, hashkey);
01358 }
01359
01360 su = NULL;
01361 }
01362 }
01363
01364 if (stat)
01365 {
01366 hcon_remove(scon, hashkey);
01367 su = NULL;
01368 }
01369 }
01370 else
01371 {
01372
01373
01374
01375 if (*su->sudir == '\0')
01376 {
01377 su = NULL;
01378 }
01379 stat = DRMS_SUCCESS;
01380 }
01381
01382 bailout:
01383 if (status)
01384 *status = stat;
01385 return su;
01386 }
01387
01388 DRMS_StorageUnit_t *drms_getunit(DRMS_Env_t *env, char *series,
01389 long long sunum, int retrieve, int *status)
01390 {
01391
01392 return drms_getunit_internal(env, series, sunum, retrieve, 1, status);
01393 }
01394
01395 DRMS_StorageUnit_t *drms_getunit_nosums(DRMS_Env_t *env, char *series,
01396 long long sunum, int *status)
01397 {
01398
01399 return drms_getunit_internal(env, series, sunum, 0, 0, status);
01400 }
01401
01402
01403
01404
01405 int drms_getunits_internal(DRMS_Env_t *env,
01406 int n,
01407 DRMS_SuAndSeries_t *suandseries,
01408 int retrieve,
01409 int dontwait)
01410 {
01411 HContainer_t *scon=NULL;
01412 int stat = DRMS_SUCCESS;
01413 DRMS_StorageUnit_t *su;
01414 char hashkey[DRMS_MAXHASHKEYLEN];
01415 DRMS_Record_t *template;
01416 #ifdef DRMS_CLIENT
01417 char *sudir;
01418 XASSERT(env->session->db_direct==0);
01419 #endif
01420
01421 DRMS_StorageUnit_t **su_nc;
01422 int cnt;
01423
01424 #ifdef DRMS_CLIENT
01425 char **tosend = (char **)malloc(n * sizeof(char *));
01426 #endif
01427
01428
01429 dontwait = 0;
01430
01431 su_nc = malloc(n*sizeof(DRMS_StorageUnit_t *));
01432 XASSERT(su_nc);
01433 #ifdef DEBUG
01434 printf("getunit: Called, n=%d, series=%s\n", n, series);
01435 #endif
01436
01437
01438 cnt = 0;
01439 for (int i = 0; i < n; i++) {
01440 if ((template = drms_template_record(env, suandseries[i].series, &stat)) == NULL)
01441 goto bailout;
01442
01443 if ((su = drms_su_lookup(env, suandseries[i].series, suandseries[i].sunum, &scon)) == NULL) {
01444 if (!scon)
01445 {
01446 scon = hcon_allocslot(&env->storageunit_cache, suandseries[i].series);
01447 hcon_init(scon, sizeof(DRMS_StorageUnit_t), DRMS_MAXHASHKEYLEN,
01448 (void (*)(const void *)) drms_su_freeunit, NULL);
01449 }
01450
01451 sprintf(hashkey,DRMS_SUNUM_FORMAT, suandseries[i].sunum);
01452 su = hcon_allocslot(scon,hashkey);
01453 #ifdef DEBUG
01454 printf("getunit: Got su = %p. Now has %d slots from '%s'\n",su,
01455 hcon_size(scon), suandseries[i].series);
01456 #endif
01457
01458
01459 su->sunum = suandseries[i].sunum;
01460 su->sudir[0] = '\0';
01461 su->mode = DRMS_READONLY;
01462
01463 su->nfree = 0;
01464 su->state = NULL;
01465 su->recnum = NULL;
01466 su->refcount = 0;
01467 su->seriesinfo = template->seriesinfo;
01468 su_nc[cnt] = su;
01469
01470 #ifdef DRMS_CLIENT
01471
01472
01473 tosend[cnt] = suandseries[i].series;
01474 #endif
01475
01476 cnt++;
01477 }
01478 }
01479
01480 #ifndef DRMS_CLIENT
01481
01482 if (cnt) {
01483 stat = drms_su_getsudirs(env, cnt, su_nc, retrieve, dontwait);
01484 }
01485 #else
01486 int icnt;
01487 if (cnt) {
01488 long long *sunum_tmp;
01489 struct iovec *vec;
01490
01491 drms_send_commandcode(env->session->sockfd, DRMS_GETUNITS);
01492
01493
01494
01495
01496 Writeint(env->session->sockfd, cnt);
01497
01498
01499 for (icnt = 0; icnt < cnt; icnt++)
01500 {
01501 send_string(env->session->sockfd, tosend[icnt]);
01502 }
01503
01504
01505 vec = malloc(cnt*sizeof(struct iovec));
01506 XASSERT(vec);
01507 sunum_tmp = malloc(cnt*sizeof(long long));
01508 XASSERT(sunum_tmp);
01509 for (int i = 0; i < cnt; i++) {
01510 sunum_tmp[i] = htonll(su_nc[i]->sunum);
01511 vec[i].iov_len = sizeof(sunum_tmp[i]);
01512 vec[i].iov_base = &sunum_tmp[i];
01513 }
01514 Writevn(env->session->sockfd, vec, cnt);
01515 free(sunum_tmp);
01516 free(vec);
01517
01518
01519 Writeint(env->session->sockfd, retrieve);
01520
01521
01522 Writeint(env->session->sockfd, dontwait);
01523
01524 stat = Readint(env->session->sockfd);
01525 if (stat == DRMS_SUCCESS) {
01526 if (!dontwait) {
01527 for (int i = 0; i < cnt; i++) {
01528 sudir = receive_string(env->session->sockfd);
01529 strncpy(su_nc[i]->sudir, sudir, sizeof(su->sudir));
01530 free(sudir);
01531 }
01532 }
01533 }
01534 }
01535 #endif
01536 for (int i = 0; i < cnt; i++) {
01537 if (stat || !strlen(su_nc[i]->sudir)) {
01538 drms_su_lookup(env, su_nc[i]->seriesinfo->seriesname, su_nc[i]->sunum, &scon);
01539 sprintf(hashkey,DRMS_SUNUM_FORMAT, su_nc[i]->sunum);
01540 hcon_remove(scon, hashkey);
01541 }
01542 }
01543
01544 bailout:
01545 free(su_nc);
01546
01547 return stat;
01548 }
01549
01550 int drms_getunits(DRMS_Env_t *env,
01551 char *series,
01552 int n,
01553 long long *sunum,
01554 int retrieve,
01555 int dontwait)
01556 {
01557 DRMS_SuAndSeries_t *arr = malloc(n * sizeof(DRMS_SuAndSeries_t));
01558 int isu;
01559 char *dup = strdup(series);
01560 int ret = 0;
01561
01562 if (arr && dup)
01563 {
01564
01565 dontwait = 0;
01566
01567 for (isu = 0; isu < n; isu++)
01568 {
01569 arr[isu].sunum = sunum[isu];
01570 arr[isu].series = dup;
01571 }
01572
01573 ret = drms_getunits_internal(env, n, arr, retrieve, dontwait);
01574
01575 free(dup);
01576 free(arr);
01577 }
01578
01579 return ret;
01580 }
01581
01582
01583 int drms_getunits_ex(DRMS_Env_t *env,
01584 int num,
01585 DRMS_SuAndSeries_t *suandseries,
01586 int retrieve,
01587 int dontwait)
01588 {
01589
01590 dontwait = 0;
01591
01592 return drms_getunits_internal(env, num, suandseries, retrieve, dontwait);
01593 }
01594
01595 int drms_getsuinfo(DRMS_Env_t *env, long long *sunums, int nReqs, SUM_info_t **infostructs)
01596 {
01597 int status = DRMS_SUCCESS;
01598
01599 #ifndef DRMS_CLIENT
01600
01601 status = drms_su_getinfo(env, sunums, nReqs, infostructs);
01602 #else
01603 DB_Type_t type;
01604
01605 SUM_info_t *info = NULL;
01606
01607 int isunum;
01608 long long onesunum;
01609 long long sunum;
01610 char *online_loc = NULL;
01611 char *online_status = NULL;
01612 char *archive_status = NULL;
01613 char *offsite_ack = NULL;
01614 char *history_comment= NULL;
01615 char *owning_series = NULL;
01616 int storage_group;
01617 double *pbytes = NULL;
01618 double bytes;
01619 char *creat_date = NULL;
01620 char *username = NULL;
01621 char *arch_tape = NULL;
01622 int arch_tape_fn;
01623 char *arch_tape_date = NULL;
01624 char *safe_tape = NULL;
01625 int safe_tape_fn;
01626 char *safe_tape_date = NULL;
01627 int pa_status;
01628 int pa_substatus;
01629 char *effective_date = NULL;
01630
01631 drms_send_commandcode(env->session->sockfd, DRMS_GETSUINFO);
01632
01633 Writeint(env->session->sockfd, nReqs);
01634 for (isunum = 0; isunum < nReqs; isunum++)
01635 {
01636 onesunum = sunums[isunum];
01637 Writelonglong(env->session->sockfd, onesunum);
01638 }
01639
01640 status = Readint(env->session->sockfd);
01641
01642 if (status == DRMS_SUCCESS)
01643 {
01644 for (isunum = 0; isunum < nReqs; isunum++)
01645 {
01646
01647 sunum = Readlonglong(env->session->sockfd);
01648 online_loc = receive_string(env->session->sockfd);
01649 online_status = receive_string(env->session->sockfd);
01650 archive_status = receive_string(env->session->sockfd);
01651 offsite_ack = receive_string(env->session->sockfd);
01652 history_comment = receive_string(env->session->sockfd);
01653 owning_series = receive_string(env->session->sockfd);
01654 storage_group = Readint(env->session->sockfd);
01655 pbytes = (double *)Read_dbtype(&type, env->session->sockfd);
01656 bytes = *pbytes;
01657 creat_date = receive_string(env->session->sockfd);
01658 username = receive_string(env->session->sockfd);
01659 arch_tape = receive_string(env->session->sockfd);
01660 arch_tape_fn = Readint(env->session->sockfd);
01661 arch_tape_date = receive_string(env->session->sockfd);
01662 safe_tape = receive_string(env->session->sockfd);
01663 safe_tape_fn = Readint(env->session->sockfd);
01664 safe_tape_date = receive_string(env->session->sockfd);
01665 pa_status = Readint(env->session->sockfd);
01666 pa_substatus = Readint(env->session->sockfd);
01667 effective_date = receive_string(env->session->sockfd);
01668
01669 infostructs[isunum] = (SUM_info_t *)malloc(sizeof(SUM_info_t));
01670 info = infostructs[isunum];
01671
01672 info->sunum = sunum;
01673 snprintf(info->online_loc, sizeof(info->online_loc), "%s", online_loc);
01674 snprintf(info->online_status, sizeof(info->online_status), "%s", online_status);
01675 snprintf(info->archive_status, sizeof(info->archive_status), "%s", archive_status);
01676 snprintf(info->offsite_ack, sizeof(info->offsite_ack), "%s", offsite_ack);
01677 snprintf(info->history_comment, sizeof(info->history_comment), "%s", history_comment);
01678 snprintf(info->owning_series, sizeof(info->owning_series), "%s", owning_series);
01679 info->storage_group = storage_group;
01680 info->bytes = bytes;
01681 snprintf(info->creat_date, sizeof(info->creat_date), "%s", creat_date);
01682 snprintf(info->username, sizeof(info->username), "%s", username);
01683 snprintf(info->arch_tape, sizeof(info->arch_tape), "%s", arch_tape);
01684 info->arch_tape_fn = arch_tape_fn;
01685 snprintf(info->arch_tape_date, sizeof(info->arch_tape_date), "%s", arch_tape_date);
01686 snprintf(info->safe_tape, sizeof(info->safe_tape), "%s", safe_tape);
01687 info->safe_tape_fn = safe_tape_fn;
01688 snprintf(info->safe_tape_date, sizeof(info->safe_tape_date), "%s", safe_tape_date);
01689 info->pa_status = pa_status;
01690 info->pa_substatus = pa_substatus;
01691 snprintf(info->effective_date, sizeof(info->effective_date), "%s", effective_date);
01692
01693 free(online_loc);
01694 free(online_status);
01695 free(archive_status);
01696 free(offsite_ack);
01697 free(history_comment);
01698 free(owning_series);
01699 free(pbytes);
01700 free(creat_date);
01701 free(username);
01702 free(arch_tape);
01703 free(arch_tape_date);
01704 free(safe_tape);
01705 free(safe_tape_date);
01706 free(effective_date);
01707
01708
01709 }
01710 }
01711 #endif
01712
01713 return status;
01714 }
01715
01716 int drms_getsudir(DRMS_Env_t *env, DRMS_StorageUnit_t *su, int retrieve)
01717 {
01718 int status = DRMS_SUCCESS;
01719
01720 #ifdef DRMS_CLIENT
01721 XASSERT(env->session->db_direct==0);
01722 #endif
01723
01724 #ifndef DRMS_CLIENT
01725
01726 status = drms_su_getsudir(env, su, retrieve);
01727 #else
01728 {
01729 char *sudir;
01730
01731 drms_send_commandcode(env->session->sockfd, DRMS_GETSUDIR);
01732
01733
01734
01735
01736
01737
01738
01739
01740
01741
01742
01743
01744 Writelonglong(env->session->sockfd, su->sunum);
01745 Writeint(env->session->sockfd, retrieve);
01746
01747 sudir = receive_string(env->session->sockfd);
01748 status = Readint(env->session->sockfd);
01749
01750 if (status == DRMS_SUCCESS)
01751 {
01752 snprintf(su->sudir, DRMS_MAXPATHLEN, "%s", sudir);
01753 }
01754 else if (status == DRMS_REMOTESUMS_TRYLATER)
01755 {
01756 *(su->sudir) = '\0';
01757 }
01758 }
01759 #endif
01760
01761 return status;
01762 }
01763
01764 int drms_getsudirs(DRMS_Env_t *env, DRMS_StorageUnit_t **su, int num, int retrieve, int dontwait)
01765 {
01766 int status = DRMS_SUCCESS;
01767
01768
01769 dontwait = 0;
01770
01771 #ifdef DRMS_CLIENT
01772 XASSERT(env->session->db_direct==0);
01773 #endif
01774
01775 #ifndef DRMS_CLIENT
01776
01777 status = drms_su_getsudirs(env, num, su, retrieve, dontwait);
01778 #else
01779 {
01780 char *sudir;
01781 DRMS_StorageUnit_t *onesu = NULL;
01782 int isu;
01783
01784 drms_send_commandcode(env->session->sockfd, DRMS_GETSUDIRS);
01785
01786
01787
01788
01789
01790
01791
01792
01793
01794
01795
01796 Writeint(env->session->sockfd, num);
01797 for (isu = 0; isu < num; isu++)
01798 {
01799 onesu = su[isu];
01800 Writelonglong(env->session->sockfd, onesu->sunum);
01801 }
01802
01803 Writeint(env->session->sockfd, retrieve);
01804 Writeint(env->session->sockfd, dontwait);
01805
01806 status = Readint(env->session->sockfd);
01807
01808 for (isu = 0; isu < num; isu++)
01809 {
01810 onesu = su[isu];
01811 sudir = receive_string(env->session->sockfd);
01812
01813 if (status == DRMS_SUCCESS)
01814 {
01815 snprintf(onesu->sudir, DRMS_MAXPATHLEN, "%s", sudir);
01816 }
01817 else if (status == DRMS_REMOTESUMS_TRYLATER)
01818 {
01819 *(onesu->sudir) = '\0';
01820 }
01821 }
01822 }
01823 #endif
01824
01825 return status;
01826 }
01827
01828
01829
01830 int drms_newslots_internal(DRMS_Env_t *env, int n, char *series, long long *recnum,
01831 DRMS_RecLifetime_t lifetime, int *slotnum,
01832 DRMS_StorageUnit_t **su,
01833 int createslotdirs,
01834 int gotosums)
01835 {
01836 int status, i;
01837 long long sunum;
01838 DRMS_Record_t *template;
01839 char *sudir;
01840 char hashkey[DRMS_MAXHASHKEYLEN];
01841 HContainer_t *scon=NULL;
01842
01843 if (!slotnum || !su || !series)
01844 return 1;
01845
01846 #ifndef DRMS_CLIENT
01847 if (env->session->db_direct)
01848 {
01849 if (gotosums)
01850 {
01851
01852 return drms_su_newslots(env, n, series, recnum, lifetime, slotnum, su, createslotdirs);
01853 }
01854 else
01855 {
01856
01857 return drms_su_newslots_nosums(env, n, series, recnum, lifetime, slotnum, su, createslotdirs);
01858 }
01859 }
01860 else
01861 #else
01862 XASSERT(env->session->db_direct==0);
01863 #endif
01864 {
01865 int tmp[6], *t;
01866 long long *ltmp, *lt;
01867 struct iovec *vec, *v;
01868
01869 drms_send_commandcode(env->session->sockfd, DRMS_NEWSLOTS);
01870 if (n==0)
01871 {
01872 send_string(env->session->sockfd, series);
01873 Writeint(env->session->sockfd, n);
01874 return 0;
01875 }
01876
01877 ltmp = malloc(n*sizeof(long long));
01878 XASSERT(ltmp);
01879 vec = malloc((n+6)*sizeof(struct iovec));
01880 XASSERT(vec);
01881
01882
01883
01884 t = tmp; v = vec;
01885 net_packstring(series, t++, v); v+=2;
01886 net_packint(n, t++, v++);
01887 net_packint((int)lifetime, t++, v++);
01888 net_packint(createslotdirs, t++, v++);
01889
01890
01891 net_packint(gotosums, t++, v++);
01892
01893 lt = ltmp;
01894 for (i=0; i<n; i++)
01895 net_packlonglong(recnum[i], lt++, v++);
01896 Writevn(env->session->sockfd, vec, n+6);
01897 free(vec);
01898 free(ltmp);
01899
01900 status = Readint(env->session->sockfd);
01901 if (status==DRMS_SUCCESS)
01902 {
01903 if ((template = drms_template_record(env, series, &status))==NULL)
01904 return status;
01905 for (i=0; i<n; i++)
01906 {
01907 sunum = Readlonglong(env->session->sockfd);
01908 sudir = receive_string(env->session->sockfd);
01909 slotnum[i] = Readint(env->session->sockfd);
01910
01911
01912 scon = NULL;
01913 if ((su[i] = drms_su_lookup(env, series, sunum, &scon)) == NULL)
01914 {
01915
01916
01917 if (!scon)
01918 {
01919 scon = hcon_allocslot(&env->storageunit_cache,series);
01920 hcon_init(scon, sizeof(DRMS_StorageUnit_t), DRMS_MAXHASHKEYLEN,
01921 (void (*)(const void *)) drms_su_freeunit, NULL);
01922 }
01923
01924 sprintf(hashkey,DRMS_SUNUM_FORMAT, sunum);
01925 su[i] = hcon_allocslot(scon,hashkey);
01926
01927
01928 su[i]->sunum = sunum;
01929 strncpy(su[i]->sudir, sudir, sizeof(su[i]->sudir));
01930 su[i]->seriesinfo = template->seriesinfo;
01931 su[i]->mode = DRMS_READWRITE;
01932 su[i]->nfree = 0;
01933 su[i]->state = NULL;
01934 su[i]->recnum = NULL;
01935 su[i]->refcount = 0;
01936 }
01937 else
01938 {
01939 if (strcmp(su[i]->sudir, sudir))
01940 {
01941 fprintf(stderr,"ERROR: Storage unit %s:#%lld seen with different "
01942 "sudirs: '%s' != '%s'",
01943 series, sunum, su[i]->sudir, sudir);
01944 return 1;
01945 }
01946 }
01947 #ifdef DEBUG
01948 printf("Client received sunum = %lld\n",su[i]->sunum);
01949 printf("Client received sudir = '%s'\n",sudir);
01950 printf("Client: su->sudir = '%s'\n",su[i]->sudir);
01951 #endif
01952 free(sudir);
01953 }
01954 }
01955 else
01956 for (i=0; i<n; i++)
01957 su[i] = NULL;
01958 }
01959
01960 return status;
01961 }
01962
01963 int drms_newslots(DRMS_Env_t *env, int n, char *series, long long *recnum,
01964 DRMS_RecLifetime_t lifetime, int *slotnum,
01965 DRMS_StorageUnit_t **su,
01966 int createslotdirs)
01967 {
01968
01969 return drms_newslots_internal(env, n, series, recnum, lifetime, slotnum, su, createslotdirs, 1);
01970 }
01971
01972 int drms_newslots_nosums(DRMS_Env_t *env, int n, char *series, long long *recnum,
01973 DRMS_RecLifetime_t lifetime, int *slotnum,
01974 DRMS_StorageUnit_t **su,
01975 int createslotdirs)
01976 {
01977
01978 return drms_newslots_internal(env, n, series, recnum, lifetime, slotnum, su, createslotdirs, 1);
01979 }
01980
01981
01982
01983
01984 int drms_slot_setstate(DRMS_Env_t *env, char *series, long long sunum,
01985 int slotnum, int state)
01986 {
01987
01988 #ifndef DRMS_CLIENT
01989 if (env->session->db_direct)
01990 {
01991 if (state == DRMS_SLOT_FREE)
01992 return drms_su_freeslot(env, series, sunum, slotnum);
01993 else
01994 return drms_su_markslot(env, series, sunum, slotnum, &state) != NULL;
01995 }
01996 else
01997 #else
01998 XASSERT(env->session->db_direct==0);
01999 #endif
02000 {
02001 int len,tmp[3];
02002 long long sunum_tmp;
02003 struct iovec vec[5];
02004
02005 drms_send_commandcode(env->session->sockfd, DRMS_SLOT_SETSTATE);
02006
02007
02008 vec[1].iov_len = strlen(series);
02009 vec[1].iov_base = series;
02010 len = htonl(vec[1].iov_len);
02011 vec[0].iov_len = sizeof(len);
02012 vec[0].iov_base = &len;
02013 sunum_tmp = htonll(sunum);
02014 vec[2].iov_len = sizeof(sunum_tmp);
02015 vec[2].iov_base = &sunum_tmp;
02016 tmp[1] = htonl(slotnum);
02017 vec[3].iov_len = sizeof(tmp[1]);
02018 vec[3].iov_base = &tmp[1];
02019 tmp[2] = htonl(state);
02020 vec[4].iov_len = sizeof(tmp[2]);
02021 vec[4].iov_base = &tmp[2];
02022 Writevn(env->session->sockfd, vec, 5);
02023
02024 return Readint(env->session->sockfd);
02025 }
02026 }
02027
02028 int drms_dropseries(DRMS_Env_t *env, const char *series, DRMS_Array_t *vec)
02029 {
02030 #ifndef DRMS_CLIENT
02031 return drms_server_dropseries_su(env, series, vec);
02032 #else
02033 XASSERT(env->session->db_direct==0);
02034
02035 if (!env->session->db_direct)
02036 {
02037
02038
02039
02040 drms_send_commandcode(env->session->sockfd, DRMS_DROPSERIES);
02041 send_string(env->session->sockfd, series);
02042
02043
02044
02045 int irow = -1;
02046 long long *data = (long long *)vec->data;
02047
02048
02049 Writeint(env->session->sockfd, vec->axis[1]);
02050
02051
02052 for (irow = 0; irow < vec->axis[1]; irow++)
02053 {
02054 Writelonglong(env->session->sockfd, data[irow]);
02055 }
02056 }
02057
02058 return 0;
02059 #endif
02060 }
02061
02062
02063
02064 int drms_create_series(DRMS_Record_t *rec, int perms)
02065 {
02066 int status;
02067 DRMS_Record_t *template;
02068 DRMS_Env_t *env;
02069 char *series;
02070
02071 series = rec->seriesinfo->seriesname;
02072 env = rec->env;
02073
02074
02075
02076 if (drms_template_record(env, series, &status) != NULL)
02077 {
02078 fprintf(stderr,"drms_create_series(): "
02079 "ERROR: Cannot create series '%s' because it already exists.\n", series);
02080 return 1;
02081 }
02082 template = (DRMS_Record_t *)hcon_allocslot_lower(&env->series_cache, series);
02083 drms_copy_record_struct(template, rec);
02084
02085
02086 for (int i = 0; i < template->seriesinfo->pidx_num; i++) {
02087 template->seriesinfo->pidx_keywords[i] = drms_keyword_lookup(template, rec->seriesinfo->pidx_keywords[i]->info->name, 0);
02088 }
02089
02090 status = drms_insert_series(env->session, 0, template, perms);
02091 if (!status && !env->session->db_direct)
02092 {
02093 drms_send_commandcode(env->session->sockfd, DRMS_NEWSERIES);
02094 send_string(env->session->sockfd, series);
02095 }
02096 return status;
02097 }
02098
02099
02100
02101 int drms_update_series(DRMS_Record_t *rec, int perms)
02102 {
02103 int status;
02104 DRMS_Record_t *template;
02105 DRMS_Env_t *env;
02106 char *series;
02107
02108 series = rec->seriesinfo->seriesname;
02109 env = rec->env;
02110
02111
02112
02113 template = drms_template_record(env, series, &status);
02114
02115 if (!template)
02116 {
02117 fprintf(stderr,"ERROR: Cannot update series '%s' because it does not "
02118 "exists.\n", series);
02119 return 1;
02120 }
02121 drms_copy_record_struct(template, rec);
02122 status = drms_insert_series(env->session, 1, template, perms);
02123
02124
02125
02126 if (!status && !env->session->db_direct)
02127 {
02128 drms_send_commandcode(env->session->sockfd, DRMS_NEWSERIES);
02129 send_string(env->session->sockfd, series);
02130 }
02131 return status;
02132 }
02133
02134
02135
02136
02137
02138 int drms_create_series_fromprototype(DRMS_Record_t **prototype,
02139 const char *outSeriesName,
02140 int perms)
02141 {
02142 int status = DRMS_SUCCESS;
02143
02144 if (prototype && *prototype && strlen(outSeriesName) < DRMS_MAXSERIESNAMELEN)
02145 {
02146 char *user = getenv("USER");
02147 DRMS_Record_t *proto = *prototype;
02148
02149 strcpy(proto->seriesinfo->seriesname, outSeriesName);
02150
02151
02152 if (proto->env->session->db_direct &&
02153 proto->env->session->db_handle->dbuser &&
02154 strlen(proto->env->session->db_handle->dbuser))
02155 {
02156 strcpy(proto->seriesinfo->owner, proto->env->session->db_handle->dbuser);
02157 }
02158 else if (!proto->seriesinfo->owner || strlen(proto->seriesinfo->owner) == 0)
02159 {
02160 if (user && strlen(user) < DRMS_MAXOWNERLEN)
02161 {
02162 strcpy(proto->seriesinfo->owner, user);
02163 }
02164 else
02165 {
02166 strcpy(proto->seriesinfo->owner, "unknown");
02167 }
02168 }
02169
02170 status = drms_create_series(proto, perms);
02171 }
02172 else
02173 {
02174 status = DRMS_ERROR_INVALIDDATA;
02175 }
02176
02177 if (prototype)
02178 {
02179 drms_destroy_recproto(prototype);
02180 }
02181
02182 return status;
02183 }
02184
02185 int drms_series_hastableprivs(DRMS_Env_t *env, const char *schema, const char *table, const char *priv)
02186 {
02187 char query[DRMS_MAXQUERYLEN];
02188 int result = 0;
02189 DB_Text_Result_t *qres = NULL;
02190
02191 sprintf(query, "SELECT has_table_privilege('%s.%s', '%s')", schema, table, priv);
02192
02193 if ((qres = drms_query_txt(env->session, query)) != NULL && qres->num_rows == 1 && qres->num_cols == 1)
02194 {
02195 if (*(qres->field[0][0]) == 't')
02196 {
02197 result = 1;
02198 }
02199 }
02200
02201 if (qres)
02202 {
02203 db_free_text_result(qres);
02204 }
02205
02206 return result;
02207 }
02208
02209 static int drms_series_candosomething(DRMS_Env_t *env, const char *series, const char *perm)
02210 {
02211 int result = 0;
02212 char *namespace = NULL;
02213 char *sname = NULL;
02214
02215 if (!get_namespace(series, &namespace, &sname))
02216 {
02217 result = drms_series_hastableprivs(env, namespace, sname, perm);
02218 }
02219
02220 if (namespace)
02221 {
02222 free(namespace);
02223 }
02224
02225 if (sname)
02226 {
02227 free(sname);
02228 }
02229
02230 return result;
02231 }
02232
02233 int drms_series_cancreaterecord(DRMS_Env_t *env, const char *series)
02234 {
02235 return drms_series_candosomething(env, series, "INSERT");
02236 }
02237
02238 int drms_series_candeleterecord(DRMS_Env_t *env, const char *series)
02239 {
02240 return drms_series_candosomething(env, series, "DELETE");
02241 }
02242
02243 int drms_series_canupdaterecord(DRMS_Env_t *env, const char *series)
02244 {
02245 return drms_series_candosomething(env, series, "UPDATE");
02246 }
02247
02248
02249 int drms_query_tabexists(DRMS_Session_t *session, const char *ns, const char *tab, int *status)
02250 {
02251 int result = 0;
02252 char query[DRMS_MAXQUERYLEN];
02253 char schema[64];
02254 char table[64];
02255
02256 DB_Text_Result_t *qres = NULL;
02257
02258 if (status)
02259 {
02260 *status = DRMS_SUCCESS;
02261 }
02262
02263
02264 if (!ns || !tab)
02265 {
02266 return 0;
02267 }
02268
02269 snprintf(schema, sizeof(schema), "%s", ns);
02270 snprintf(table, sizeof(table), "%s", tab);
02271
02272 strtolower(schema);
02273 strtolower(table);
02274
02275 snprintf(query, sizeof(query), "SELECT n.nspname, c.relname FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE n.nspname = '%s' AND c.relname = '%s';", schema, table);
02276
02277 if ((qres = drms_query_txt(session, query)) == NULL)
02278 {
02279 if (status)
02280 {
02281 *status = DRMS_ERROR_BADDBQUERY;
02282 }
02283
02284 fprintf(stderr, "Bad database query '%s'\n", query);
02285 }
02286 else if (qres->num_rows > 0)
02287 {
02288 result = 1;
02289 }
02290
02291 if (qres)
02292 {
02293 db_free_text_result(qres);
02294 qres = NULL;
02295 }
02296
02297 return result;
02298 }
02299
02300
02301
02302
02303
02304
02305 int drms_series_isdbowner(DRMS_Env_t *env, const char *series, int *status)
02306 {
02307 int isowner = 0;
02308 char *nspace = NULL;
02309 char *relname = NULL;
02310 int istat = DRMS_SUCCESS;
02311 DB_Text_Result_t *qres = NULL;
02312
02313 if (!get_namespace(series, &nspace, &relname))
02314 {
02315 char query[1024];
02316 strtolower(nspace);
02317 strtolower(relname);
02318
02319 snprintf(query, sizeof(query), "SELECT pg_catalog.pg_get_userbyid(T1.relowner) AS owner FROM pg_catalog.pg_class AS T1, (SELECT oid FROM pg_catalog.pg_namespace WHERE nspname = '%s') AS T2 WHERE T1.relnamespace = T2.oid AND T1.relname = '%s'", nspace, relname);
02320
02321 if ((qres = drms_query_txt(env->session, query)) != NULL)
02322 {
02323 if (qres->num_cols == 1 && qres->num_rows == 1)
02324 {
02325 char *dbuser = NULL;
02326
02327 #ifndef DRMS_CLIENT
02328 dbuser = env->session->db_handle->dbuser;
02329 #else
02330 drms_send_commandcode(env->session->sockfd, DRMS_GETDBUSER);
02331 dbuser = receive_string(env->session->sockfd);
02332 #endif
02333 if (strcmp(qres->field[0][0], dbuser) == 0)
02334 {
02335 isowner = 1;
02336 }
02337
02338 #ifdef DRMS_CLIENT
02339 free(dbuser);
02340 dbuser = NULL;
02341 #endif
02342
02343 }
02344 else
02345 {
02346 fprintf(stderr, "Unexpected database response to query '%s'.\n", query);
02347 istat = DRMS_ERROR_BADDBQUERY;
02348 }
02349
02350 db_free_text_result(qres);
02351 qres = NULL;
02352 }
02353 else
02354 {
02355 fprintf(stderr, "Invalid query '%s'.\n", query);
02356 istat = DRMS_ERROR_BADDBQUERY;
02357 }
02358
02359 free(nspace);
02360 free(relname);
02361 }
02362 else
02363 {
02364 istat = DRMS_ERROR_OUTOFMEMORY;
02365 }
02366
02367 if (status)
02368 {
02369 *status = istat;
02370 }
02371
02372 return isowner;
02373 }
02374
02375
02376 int drms_client_isproduser(DRMS_Env_t *env, int *status)
02377 {
02378 static int isproduser = -1;
02379 char query[1024];
02380 char *dbuser = NULL;
02381 char *dbhost = NULL;
02382 char *pc = NULL;
02383 DB_Text_Result_t *qres = NULL;
02384 int istat = DRMS_SUCCESS;
02385 int forceconn = 0;
02386 DB_Handle_t *dbh = NULL;
02387
02388 if (isproduser == -1)
02389 {
02390 isproduser = 0;
02391 }
02392 else
02393 {
02394 return isproduser;
02395 }
02396
02397 #if defined(PRODUSER_DBHOST) && defined(PRODUSER_DBNAME) && defined(PRODUSER_PRODTAB) && defined(PRODUSER_COLUSER)
02398
02399 dbhost = strdup(PRODUSER_DBHOST);
02400
02401 if (!dbhost)
02402 {
02403 istat = DRMS_ERROR_OUTOFMEMORY;
02404 }
02405 else
02406 {
02407 pc = strchr(dbhost, ':');
02408 if (pc)
02409 {
02410 *pc = '\0';
02411 }
02412 }
02413
02414 if (istat == DRMS_SUCCESS)
02415 {
02416 #ifndef DRMS_CLIENT
02417 dbuser = env->session->db_handle->dbuser;
02418
02419
02420 forceconn = (strcasecmp(dbhost, env->session->db_handle->dbhost) != 0);
02421 if (env->verbose)
02422 {
02423 printf("Environment db host - %s, Query db host - %s.\n", env->session->db_handle->dbhost, dbhost);
02424 }
02425 #else
02426 drms_send_commandcode(env->session->sockfd, DRMS_GETDBUSER);
02427 dbuser = receive_string(env->session->sockfd);
02428
02429 if (!dbuser)
02430 {
02431 istat = DRMS_ERROR_OUTOFMEMORY;
02432 }
02433
02434
02435
02436
02437
02438
02439 forceconn = 1;
02440 #endif
02441 }
02442
02443 if (istat == DRMS_SUCCESS)
02444 {
02445 if (forceconn)
02446 {
02447
02448
02449
02450
02451
02452 if ((dbh = db_connect(PRODUSER_DBHOST, dbuser, NULL, PRODUSER_DBNAME, 1)) == NULL)
02453 {
02454 fprintf(stderr,"Couldn't connect to %s database on %s.\n", PRODUSER_DBNAME, PRODUSER_DBHOST);
02455 istat = DRMS_ERROR_CANTCONNECTTODB;
02456 }
02457 }
02458 else
02459 {
02460 dbh = env->session->db_handle;
02461 }
02462 }
02463
02464 if (istat == DRMS_SUCCESS)
02465 {
02466
02467
02468 int tabexists = 0;
02469 char *schema = NULL;
02470 char *table = NULL;
02471
02472 if (get_namespace(PRODUSER_PRODTAB, &schema, &table))
02473 {
02474 fprintf(stderr, "Out of memory in drms_client_isproduser().\n");
02475 istat = DRMS_ERROR_OUTOFMEMORY;
02476 }
02477 else
02478 {
02479 snprintf(query, sizeof(query), "SELECT n.nspname, c.relname FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE n.nspname = '%s' AND c.relname = '%s';", schema, table);
02480 if ((qres = db_query_txt(dbh, query)) != NULL)
02481 {
02482 if (qres->num_rows > 0)
02483 {
02484 tabexists = 1;
02485 }
02486
02487 db_free_text_result(qres);
02488 qres = NULL;
02489 }
02490
02491 free(schema);
02492 free(table);
02493 }
02494
02495 if (istat == DRMS_SUCCESS && tabexists)
02496 {
02497 snprintf(query, sizeof(query), "SELECT %s FROM %s WHERE %s = '%s'", PRODUSER_COLUSER, PRODUSER_PRODTAB, PRODUSER_COLUSER, dbuser);
02498
02499 if ((qres = db_query_txt(dbh, query)) != NULL)
02500 {
02501 if (qres->num_cols == 1 && qres->num_rows == 1)
02502 {
02503 isproduser = 1;
02504 }
02505
02506 db_free_text_result(qres);
02507 }
02508 else
02509 {
02510 fprintf(stderr, "Unexpected database response to query '%s'.\n", query);
02511 }
02512 }
02513 }
02514
02515 if (forceconn && dbh)
02516 {
02517
02518 db_disconnect(&dbh);
02519 }
02520
02521 #ifdef DRMS_CLIENT
02522 if (dbuser)
02523 {
02524 free(dbuser);
02525 dbuser = NULL;
02526 }
02527 #endif
02528
02529 if (dbhost)
02530 {
02531 free(dbhost);
02532 }
02533
02534 if (status)
02535 {
02536 *status = istat;
02537 }
02538 #endif
02539
02540 return isproduser;
02541 }
02542
02543 int drms_setretention(DRMS_Env_t *env, int16_t newRetention, int nsus, long long *sunums)
02544 {
02545 #ifndef DRMS_CLIENT
02546 return drms_su_setretention(env, newRetention, nsus, sunums);
02547 #else
02548 long long *sunumlist = NULL;
02549 struct iovec *vec = NULL;
02550 int count;
02551 int isunum;
02552 long long oneSunum;
02553 int istat;
02554
02555 istat = DRMS_SUCCESS;
02556
02557 drms_send_commandcode(env->session->sockfd, DRMS_SETRETENTION);
02558
02559
02560 vec = calloc(sizeof(struct iovec), nsus);
02561 XASSERT(vec);
02562 sunumlist = calloc(sizeof(long long), nsus);
02563 XASSERT(sunumlist);
02564
02565 for (isunum = 0, count = 0; isunum < nsus; isunum++)
02566 {
02567 oneSunum = sunums[isunum];
02568 if (oneSunum >= 0)
02569 {
02570 sunumlist[isunum] = htonll(oneSunum);
02571 vec[isunum].iov_len = sizeof(sunumlist[isunum]);
02572 vec[isunum].iov_base = &sunumlist[isunum];
02573 count++;
02574 }
02575 }
02576
02577
02578
02579
02580 Writeshort(env->session->sockfd, newRetention);
02581
02582
02583 Writeint(env->session->sockfd, count);
02584
02585
02586 Writevn(env->session->sockfd, vec, count);
02587
02588 free(sunumlist);
02589 sunumlist = NULL;
02590 free(vec);
02591 vec = NULL;
02592
02593
02594 istat = Readint(env->session->sockfd);
02595
02596 return istat;
02597 #endif
02598 }
02599
02600 int drms_makewritable(DRMS_Env_t *env)
02601 {
02602 #ifndef DRMS_CLIENT
02603 return drms_session_setwrite(env);
02604 #else
02605 int drmsStat = DRMS_SUCCESS;
02606
02607 drms_send_commandcode(env->session->sockfd, DRMS_MAKESESSIONWRITABLE);
02608
02609
02610 drmsStat = Readint(env->session->sockfd);
02611
02612 if (drmsStat != DRMS_SUCCESS)
02613 {
02614 fprintf(stderr, "Unable to make transaction writable.\n");
02615 }
02616 #endif
02617 }
02618
02619 #ifdef DRMS_CLIENT
02620
02621 sem_t *gShutdownsem = NULL;
02622
02623 DRMS_Shutdown_State_t gShutdown;
02624
02625
02626 void drms_lock_client(DRMS_Env_t *env)
02627 {
02628 if (env->drms_lock == NULL)
02629 {
02630 return;
02631 }
02632 else
02633 {
02634 pthread_mutex_lock(env->drms_lock);
02635 }
02636 }
02637
02638 void drms_unlock_client(DRMS_Env_t *env)
02639 {
02640 if (env->drms_lock == NULL)
02641 {
02642 return;
02643 }
02644 else
02645 {
02646 pthread_mutex_unlock( env->drms_lock );
02647 }
02648 }
02649
02650
02651 int drms_trylock_client(DRMS_Env_t *env)
02652 {
02653 if (env->drms_lock == NULL)
02654 {
02655 return 0;
02656 }
02657 else
02658 {
02659 return pthread_mutex_trylock(env->drms_lock);
02660 }
02661 }
02662
02663 sem_t *drms_client_getsdsem(void)
02664 {
02665 return gShutdownsem;
02666 }
02667
02668 void drms_client_initsdsem(void)
02669 {
02670 if (!gShutdownsem)
02671 {
02672
02673 gShutdownsem = malloc(sizeof(sem_t));
02674
02675
02676 sem_init(gShutdownsem, 0, 1);
02677
02678
02679 gShutdown = kSHUTDOWN_UNINITIATED;
02680 }
02681 }
02682
02683 void drms_client_destroysdsem(void)
02684 {
02685 if (gShutdownsem)
02686 {
02687 sem_destroy(gShutdownsem);
02688 free(gShutdownsem);
02689 gShutdownsem = NULL;
02690 }
02691 }
02692
02693 DRMS_Shutdown_State_t drms_client_getsd(void)
02694 {
02695 return gShutdown;
02696 }
02697
02698 void drms_client_setsd(DRMS_Shutdown_State_t st)
02699 {
02700 gShutdown = st;
02701 }
02702
02703 int drms_client_registercleaner(DRMS_Env_t *env, CleanerData_t *data)
02704 {
02705 int gotlock = 0;
02706 int ok = 1;
02707
02708 gotlock = (drms_trylock_client(env) == 0);
02709
02710 if (gotlock)
02711 {
02712 if (env->cleaners == NULL)
02713 {
02714
02715 env->cleaners = list_llcreate(sizeof(CleanerData_t), NULL);
02716 if (!env->cleaners)
02717 {
02718 fprintf(stderr, "Can't register cleaner.\n");
02719 ok = 0;
02720 }
02721 }
02722
02723 if (ok)
02724 {
02725 if (!list_llinserttail(env->cleaners, data))
02726 {
02727 fprintf(stderr, "Can't register cleaner.\n");
02728 ok = 0;
02729 }
02730 }
02731
02732 drms_unlock_client(env);
02733 }
02734 else
02735 {
02736 fprintf(stderr, "Can't register doit cleaner function. Unable to obtain mutex.\n");
02737 }
02738
02739 return ok;
02740 }
02741
02742 static void ArrivederciBaby(DRMS_Env_t *env)
02743 {
02744 if (gShutdownsem)
02745 {
02746 sem_wait(gShutdownsem);
02747 }
02748
02749 gShutdown = kSHUTDOWN_ABORT;
02750
02751 if (gShutdownsem)
02752 {
02753 sem_post(gShutdownsem);
02754 }
02755
02756
02757
02758 drms_abort_now(env);
02759
02760
02761 exit(1);
02762 }
02763
02764
02765 void *drms_signal_thread(void *arg)
02766 {
02767 int status, signo;
02768 DRMS_Env_t *env = (DRMS_Env_t *) arg;
02769 int doexit = 0;
02770
02771 #ifdef DEBUG
02772 printf("drms_signal_thread started.\n");
02773 fflush(stdout);
02774 #endif
02775
02776
02777
02778
02779
02780 if( (status = pthread_sigmask(SIG_BLOCK, &env->signal_mask, NULL)))
02781 {
02782 fprintf(stderr,"pthread_sigmask call failed with status = %d\n", status);
02783 exit(1);
02784 }
02785
02786 for (;;)
02787 {
02788 if ((status = sigwait(&env->signal_mask, &signo)))
02789 {
02790 if (status == EINTR)
02791 {
02792 fprintf(stderr, "sigwait error, errcode=%d.\n", status);
02793 continue;
02794 }
02795 else
02796 {
02797 fprintf(stderr,"sigwait error, errcode=%d.\n", status);
02798 signo = SIGTERM;
02799 }
02800 }
02801
02802 switch(signo)
02803 {
02804 case SIGINT:
02805 fprintf(stderr,"WARNING: jsoc_main received SIGINT...exiting.\n");
02806 break;
02807 case SIGTERM:
02808 fprintf(stderr,"WARNING: jsoc_main received SIGTERM...exiting.\n");
02809 break;
02810 case SIGQUIT:
02811 fprintf(stderr,"WARNING: jsoc_main received SIGQUIT...exiting.\n");
02812 break;
02813 case SIGUSR2:
02814 if (env->verbose)
02815 {
02816 fprintf(stdout,"signal thread received SIGUSR2 (main shutting down)...exiting.\n");
02817 }
02818 pthread_exit(NULL);
02819 break;
02820 default:
02821 fprintf(stderr,"WARNING: DRMS server received signal no. %d...exiting.\n",
02822 signo);
02823
02824 signo = SIGTERM;
02825 break;
02826 }
02827
02828 switch(signo)
02829 {
02830 case SIGINT:
02831 case SIGTERM:
02832 case SIGQUIT:
02833
02834
02835
02836 if (gShutdownsem)
02837 {
02838
02839 sem_wait(gShutdownsem);
02840
02841 if (gShutdown == kSHUTDOWN_UNINITIATED)
02842 {
02843
02844 gShutdown = kSHUTDOWN_ABORTINITIATED;
02845
02846 fprintf(stderr, "Shutdown initiated.\n");
02847
02848
02849
02850
02851 if (env->cleaners)
02852 {
02853 ListNode_t *lnode = NULL;
02854 CleanerData_t *acleaner = NULL;
02855
02856 list_llreset(env->cleaners);
02857 while ((lnode = list_llnext(env->cleaners)) != NULL)
02858 {
02859 acleaner = lnode->data;
02860 (*(acleaner->cb))(acleaner->data);
02861 list_llremove(env->cleaners, lnode);
02862 list_llfreenode(&lnode);
02863 }
02864 }
02865
02866
02867 sem_post(gShutdownsem);
02868
02869 doexit = 1;
02870 }
02871 else
02872 {
02873
02874 sem_post(gShutdownsem);
02875 }
02876
02877
02878
02879 if (doexit)
02880 {
02881 ArrivederciBaby(env);
02882 return NULL;
02883 }
02884 }
02885 else
02886 {
02887 ArrivederciBaby(env);
02888 return NULL;
02889 }
02890
02891 break;
02892 }
02893 }
02894 }
02895
02896 #endif