00001 #include <stdio.h>
00002 #include <stdlib.h>
00003 #include <string.h>
00004 #include <unistd.h>
00005 #include <stdarg.h>
00006 #include <assert.h>
00007 #include <ctype.h>
00008 #include <netinet/in.h>
00009 #include <libpq-fe.h>
00010 #include "db.h"
00011 #include "xassert.h"
00012 #include "xmem.h"
00013 #include "util.h"
00014
00015
/* PostgreSQL built-in type OIDs (pg_type.oid) that this backend maps to and
 * from DB_Type_t values. */
#define PG_CHAR_OID    (18)    /* "char"  */
#define PG_INT2_OID    (21)    /* int2    */
#define PG_INT4_OID    (23)    /* int4    */
#define PG_INT8_OID    (20)    /* int8    */
#define PG_FLOAT_OID   (700)   /* float4  */
#define PG_DOUBLE_OID  (701)   /* float8  */
#define PG_STRING_OID  (25)    /* text    */
#define PG_VARCHAR_OID (1043)  /* varchar */
00024
00025
00026
00027
00028
00029 static int db2pgsql_type(DB_Type_t dbtype)
00030 {
00031 switch(dbtype)
00032 {
00033 case DB_CHAR:
00034 return PG_CHAR_OID;
00035 case DB_INT1:
00036 return PG_INT2_OID;
00037 case DB_INT2:
00038 return PG_INT2_OID;
00039 case DB_INT4:
00040 return PG_INT4_OID;
00041 case DB_INT8:
00042 return PG_INT8_OID;
00043 case DB_FLOAT:
00044 return PG_FLOAT_OID;
00045 case DB_DOUBLE:
00046 return PG_DOUBLE_OID;
00047 case DB_STRING:
00048 return PG_STRING_OID;
00049 case DB_VARCHAR:
00050 return PG_VARCHAR_OID;
00051 default:
00052 return PG_STRING_OID;
00053 }
00054 }
00055
00056 static DB_Type_t pgsql2db_type(int pgtype)
00057 {
00058 switch(pgtype)
00059 {
00060 case PG_CHAR_OID:
00061 return DB_CHAR;
00062 case PG_INT2_OID:
00063 return DB_INT2;
00064
00065 case PG_INT4_OID:
00066 return DB_INT4;
00067 case PG_INT8_OID:
00068 return DB_INT8;
00069 case PG_FLOAT_OID:
00070 return DB_FLOAT;
00071 case PG_DOUBLE_OID:
00072 return DB_DOUBLE;
00073 case PG_STRING_OID:
00074 return DB_STRING;
00075 case PG_VARCHAR_OID:
00076 return DB_VARCHAR;
00077 default:
00078 return DB_STRING;
00079 }
00080 }
00081
00082
00083
00084
00085 DB_Handle_t *db_connect(const char *host, const char *user,
00086 const char *passwd, const char *db_name,
00087 const int lock)
00088 {
00089 DB_Handle_t *handle;
00090 PGconn *db;
00091 char *p,conninfo[8192]={0};
00092 char *port = NULL;
00093 char *hostname = NULL;
00094 char *pport = NULL;
00095
00096
00097
00098 p = conninfo;
00099 if (host)
00100 {
00101 hostname = strdup(host);
00102
00103
00104 if ((pport = strchr(host, ':')) != NULL)
00105 {
00106 hostname[pport - host] = '\0';
00107 port = strdup(pport + 1);
00108 }
00109
00110 if (isdigit(hostname[0]))
00111 p += sprintf(p,"hostaddr = %s ",hostname);
00112 else
00113 p += sprintf(p,"host = %s ",hostname);
00114
00115 if (port)
00116 {
00117 p += sprintf(p, "port = %s ", port);
00118 }
00119 }
00120
00121 if (db_name)
00122 p += sprintf(p,"dbname = %s ",db_name);
00123 if (user)
00124 p += sprintf(p,"user = %s ",user);
00125 if (passwd)
00126 p += sprintf(p,"password = %s ",passwd);
00127
00128
00129
00130 db = PQconnectdb(conninfo);
00131 if (PQstatus(db) != CONNECTION_OK)
00132 {
00133 fprintf(stderr, "Connection to database '%s' failed.\n", PQdb(db));
00134 fprintf(stderr, "%s", PQerrorMessage(db));
00135 PQfinish(db);
00136
00137 if (port)
00138 {
00139 free(port);
00140 }
00141
00142 if (hostname)
00143 {
00144 free(hostname);
00145 }
00146
00147 return NULL;
00148 }
00149
00150
00151 handle = malloc(sizeof(DB_Handle_t));
00152 XASSERT(handle);
00153
00154 strncpy(handle->dbhost,PQhost(db),1024);
00155 snprintf(handle->dbport, sizeof(handle->dbport), "%s", PQport(db));
00156 strncpy(handle->dbname,PQdb(db),1024);
00157 strncpy(handle->dbuser,PQuser(db),1024);
00158 handle->db_connection = (void *) db;
00159 handle->abort_now = 0;
00160 handle->stmt_num = 0;
00161 handle->isolation_level = DB_TRANS_READCOMMIT;
00162 if (lock)
00163 {
00164 handle->db_lock = malloc(sizeof(pthread_mutex_t));
00165 XASSERT(handle->db_lock);
00166 pthread_mutex_init(handle->db_lock, NULL);
00167 }
00168 else
00169 handle->db_lock = NULL;
00170
00171 memset(handle->errmsg, 0, sizeof(handle->errmsg));
00172
00173 if (port)
00174 {
00175 free(port);
00176 }
00177
00178 if (hostname)
00179 {
00180 free(hostname);
00181 }
00182
00183 return handle;
00184 }
00185
00186 void db_disconnect(DB_Handle_t **db)
00187 {
00188 if (db && *db)
00189 {
00190 DB_Handle_t *dbin = *db;
00191 db_lock(dbin);
00192
00193 PQfinish(dbin->db_connection);
00194 dbin->db_connection = NULL;
00195
00196
00197 db_unlock(dbin);
00198
00199 if (dbin->db_lock)
00200 {
00201 pthread_mutex_destroy(dbin->db_lock);
00202 free(dbin->db_lock);
00203 dbin->db_lock = NULL;
00204 }
00205
00206 free(dbin);
00207 *db = NULL;
00208 }
00209 }
00210
00211 DB_Text_Result_t *db_query_txt(DB_Handle_t *dbin, char *query_string)
00212 {
00213 PGconn *db;
00214 PGresult *res;
00215 DB_Text_Result_t *result=NULL;
00216 int buffer_length, colname_length, width, buflen, n;
00217 unsigned int i,j;
00218 char *p;
00219
00220 if (dbin==NULL)
00221 return NULL;
00222 db = dbin->db_connection;
00223
00224
00225 db_lock(dbin);
00226 if (dbin->abort_now)
00227 goto failure;
00228
00229 #ifdef DEBUG
00230 printf("db_query_txt: query = %s\n",query_string);
00231 #endif
00232 DB_ResetErrmsg(dbin);
00233 res = PQexec(db, query_string);
00234 if (PQresultStatus(res) != PGRES_TUPLES_OK)
00235 {
00236 DB_SetErrmsg(dbin, PQerrorMessage(db));
00237 fprintf(stderr, "query failed: %s", DB_GetErrmsg(dbin));
00238 PQclear(res);
00239 goto failure;
00240 }
00241
00242
00243 result = (DB_Text_Result_t *)malloc(sizeof(DB_Text_Result_t));
00244 XASSERT(result);
00245 memset(result,0,sizeof(DB_Text_Result_t));
00246 result->num_rows = PQntuples(res);
00247 result->num_cols = PQnfields(res);
00248
00249 if (result->num_rows>0)
00250 {
00251 colname_length = 0;
00252 result->column_name = (char **)malloc(result->num_cols*sizeof(char *));
00253 XASSERT(result->column_name);
00254 result->column_width = (int *)malloc(result->num_cols*sizeof(int));
00255 XASSERT(result->column_width);
00256
00257 buffer_length = 0;
00258 for(i = 0; i < result->num_cols; i++)
00259 {
00260 colname_length += strlen(PQfname(res, i))+1;
00261 result->column_width[i] = 0;
00262 for(j = 0; j < result->num_rows; j++)
00263 {
00264 width = PQgetlength(res,j,i);
00265 buffer_length += width+1;
00266 if (width > result->column_width[i])
00267 result->column_width[i] = width;
00268 }
00269 }
00270
00271 buflen = (3+result->num_cols)*sizeof(int) + colname_length + buffer_length;
00272 result->buffer = malloc( buflen );
00273 XASSERT(result->buffer);
00274
00275 p = result->buffer;
00276
00277
00278 *((int *) p) = htonl(buflen);
00279 p += sizeof(int);
00280 *((int *) p) = htonl(result->num_rows);
00281 p += sizeof(int);
00282 *((int *) p) = htonl(result->num_cols);
00283 p += sizeof(int);
00284 for (i=0; i<result->num_cols; i++)
00285 {
00286 *((int *) p) = htonl(result->column_width[i]);
00287 p += sizeof(int);
00288 }
00289
00290
00291 for (i=0; i<result->num_cols; i++)
00292 {
00293 result->column_name[i] = p;
00294 n = strlen(PQfname(res, i))+1;
00295 p = memccpy(p, PQfname(res, i), 0, n);
00296 XASSERT(p!=NULL);
00297 #ifdef DEBUG
00298 printf("name = %s, copy = %s, n = %d.\n",PQfname(res, i),result->column_name[i], n);
00299 #endif
00300 }
00301
00302
00303
00304 result->field = (char ***) malloc(result->num_rows*sizeof(char **) + result->num_rows*result->num_cols*sizeof(char *));
00305 XASSERT(result->field);
00306 for (i=0; i<result->num_rows; i++)
00307 result->field[i] = (char **) &result->field[result->num_rows +
00308 i*result->num_cols];
00309
00310
00311 for (j=0; j < result->num_rows; j++)
00312 {
00313 for(i = 0; i < result->num_cols; i++)
00314 {
00315 result->field[j][i] = p;
00316 p = memccpy(p,PQgetvalue(res,j,i),0,PQgetlength(res,j,i)+1);
00317 XASSERT(p!=NULL);
00318 }
00319 }
00320 }
00321 PQclear(res);
00322 failure:
00323 if (!result)
00324 QUERY_ERROR(query_string);
00325 db_unlock(dbin);
00326 return result;
00327 }
00328
00329
00330
00331 DB_Binary_Result_t *db_query_bin(DB_Handle_t *dbin, char *query_string)
00332 {
00333 PGconn *db;
00334 PGresult *res;
00335 DB_Binary_Result_t *db_res;
00336 int colname_length;
00337 unsigned int i,j, width;
00338
00339 if (dbin==NULL)
00340 return NULL;
00341 db = dbin->db_connection;
00342
00343
00344 db_lock(dbin);
00345 if (dbin->abort_now)
00346 goto failure;
00347
00348 #ifdef DEBUG
00349 printf("db_query_bin: query = %s\n",query_string);
00350 #endif
00351 res = PQexecParams(db,query_string, 0, NULL,
00352 NULL, NULL, NULL, 1);
00353
00354 DB_ResetErrmsg(dbin);
00355 if (PQresultStatus(res) != PGRES_TUPLES_OK)
00356 {
00357 DB_SetErrmsg(dbin, PQerrorMessage(db));
00358 fprintf(stderr, "query failed: %s", DB_GetErrmsg(dbin));
00359 PQclear(res);
00360 goto failure;
00361 }
00362
00363
00364 db_res = (DB_Binary_Result_t *)malloc(sizeof(DB_Binary_Result_t));
00365 XASSERT(db_res);
00366 memset(db_res,0,sizeof(DB_Binary_Result_t));
00367 db_res->num_rows = PQntuples(res);
00368 db_res->num_cols = PQnfields(res);
00369
00370 if (db_res->num_cols>0)
00371 {
00372 db_res->column = (DB_Column_t *)malloc(db_res->num_cols * sizeof(DB_Column_t));
00373 XASSERT(db_res->column);
00374
00375 for (i=0; i<db_res->num_cols; i++)
00376 {
00377
00378 colname_length = strlen(PQfname(res,i))+1;
00379 db_res->column[i].column_name = malloc(colname_length);
00380 XASSERT(db_res->column[i].column_name);
00381 strcpy(db_res->column[i].column_name,PQfname(res,i));
00382
00383 db_res->column[i].type = pgsql2db_type(PQftype(res,i));
00384
00385 db_res->column[i].num_rows = db_res->num_rows;
00386 db_res->column[i].is_null = malloc(db_res->num_rows*sizeof(int));
00387 XASSERT(db_res->column[i].is_null);
00388 db_res->column[i].size = 0;
00389
00390
00391 for (j=0; j<db_res->num_rows; j++)
00392 {
00393 width = PQgetlength(res,j,i);
00394 if (width>db_res->column[i].size)
00395 db_res->column[i].size = width;
00396 }
00397
00398
00399
00400 if ( db_res->column[i].type == DB_STRING ||
00401 db_res->column[i].type == DB_VARCHAR )
00402 (db_res->column[i].size)++;
00403
00404 if (db_res->column[i].size == 0)
00405 {
00406
00407
00408
00409
00410
00411
00412
00413
00414 db_res->column[i].size = db_sizeof(db_res->column[i].type);
00415 }
00416
00417 db_res->column[i].data = malloc(db_res->num_rows * db_res->column[i].size);
00418 XASSERT(db_res->column[i].data);
00419
00420
00421
00422
00423 memset(db_res->column[i].data, 0, db_res->num_rows * db_res->column[i].size);
00424
00425 #ifdef DEBUG
00426 printf("sizeof column %d = %d\n",i,db_res->column[i].size);
00427 #endif
00428 }
00429
00430 for (i=0; i<db_res->num_cols; i++)
00431 {
00432 for (j=0; j<db_res->num_rows; j++)
00433 {
00434 db_res->column[i].is_null[j] = PQgetisnull(res,j,i);
00435 if (!db_res->column[i].is_null[j])
00436 memcpy(&db_res->column[i].data[j*db_res->column[i].size],
00437 PQgetvalue(res,j,i), PQgetlength(res,j,i));
00438 }
00439 #if __BYTE_ORDER == __LITTLE_ENDIAN
00440 db_byteswap(db_res->column[i].type, db_res->num_rows,
00441 db_res->column[i].data);
00442 #endif
00443 }
00444 }
00445 PQclear(res);
00446 db_unlock(dbin);
00447 return db_res;
00448 failure:
00449 QUERY_ERROR(query_string);
00450 db_unlock(dbin);
00451 return NULL;
00452 }
00453
00454
00455
00456
00457
00458
00459
00460
00461
00462
00463
00464
00465
00466
00467
00468
00469 DB_Binary_Result_t *db_query_bin_array(DB_Handle_t *dbin,
00470 char *query, int n_args,
00471 DB_Type_t *intype, void **argin )
00472 {
00473 PGconn *db;
00474 PGresult *res;
00475 DB_Binary_Result_t *db_res;
00476 int colname_length, buflen;
00477 unsigned int i,j, width;
00478 char *p, *pquery, *op,*q;
00479 int paramLengths[MAXARG], paramFormats[MAXARG],n;
00480 Oid paramTypes[MAXARG];
00481 char *paramValues[MAXARG];
00482 void *paramBuf;
00483
00484 if (dbin==NULL)
00485 return NULL;
00486 db = dbin->db_connection;
00487
00488 pquery = malloc(6*strlen(query));
00489 XASSERT(pquery);
00490
00491
00492 db_lock(dbin);
00493 if (dbin->abort_now)
00494 goto failure;
00495
00496
00497
00498 op = pquery;
00499 q = (char *)query;
00500 n = 1;
00501 while (*q)
00502 {
00503 if (*q == '?')
00504 {
00505 q++;
00506 op += sprintf(op,"$%d",n);
00507 n++;
00508 }
00509 else
00510 *op++ = *q++;
00511 }
00512 *op = '\0';
00513 n--;
00514
00515 #ifdef DEBUG
00516 printf("modified query string = '%s'.\n",pquery);
00517 #endif
00518
00519 if (n_args!=n)
00520 {
00521 fprintf(stderr,"Wrong number (%d) of parameters given for '%s'.\n",
00522 n_args,query);
00523 goto failure;
00524 }
00525 if (n_args>=MAXARG)
00526 {
00527 fprintf(stderr,"Maximum number of arguments exceeded (%d>=%d).\n",
00528 n_args,MAXARG);
00529 goto failure;
00530 }
00531
00532
00533 buflen = 0;
00534 for(i=0; i<n_args; i++)
00535 {
00536 if (intype[i] == DB_STRING || intype[i] == DB_VARCHAR )
00537 paramLengths[i] = strlen((char *) argin[i]);
00538 else
00539 paramLengths[i] = db_sizeof(intype[i]);
00540 paramTypes[i] = db2pgsql_type(intype[i]);
00541 paramFormats[i] = 1;
00542 buflen += paramLengths[i];
00543 }
00544 #ifdef DEBUG
00545 printf("n_args = %d, buflen = %d\n",n_args,buflen);
00546 #endif
00547 paramBuf = malloc(buflen);
00548 XASSERT(paramBuf);
00549 p = paramBuf;
00550 for(i=0; i<n_args; i++)
00551 {
00552 paramValues[i] = p;
00553 memcpy(p, argin[i], paramLengths[i]);
00554 #if __BYTE_ORDER == __LITTLE_ENDIAN
00555 db_byteswap(intype[i],1,p);
00556 #endif
00557 p += paramLengths[i];
00558 }
00559
00560 res = PQexecParams(db, pquery, n_args, paramTypes,
00561 (const char * const *)paramValues, paramLengths, paramFormats, 1);
00562 free(paramBuf);
00563
00564 if (PQresultStatus(res) != PGRES_TUPLES_OK)
00565 {
00566 fprintf(stderr, "query failed: %s", PQerrorMessage(db));
00567 PQclear(res);
00568 goto failure;
00569 }
00570
00571
00572 db_res = (DB_Binary_Result_t *)malloc(sizeof(DB_Binary_Result_t));
00573 XASSERT(db_res);
00574 memset(db_res,0,sizeof(DB_Binary_Result_t));
00575 db_res->num_rows = PQntuples(res);
00576 db_res->num_cols = PQnfields(res);
00577
00578 if (db_res->num_cols>0)
00579 {
00580 db_res->column = (DB_Column_t *)malloc(db_res->num_cols * sizeof(DB_Column_t));
00581 XASSERT(db_res->column);
00582
00583 for (i=0; i<db_res->num_cols; i++)
00584 {
00585
00586 colname_length = strlen(PQfname(res,i))+1;
00587 db_res->column[i].column_name = malloc(colname_length);
00588 XASSERT(db_res->column[i].column_name);
00589 strcpy(db_res->column[i].column_name,PQfname(res,i));
00590
00591 db_res->column[i].type = pgsql2db_type(PQftype(res,i));
00592
00593 db_res->column[i].num_rows = db_res->num_rows;
00594 db_res->column[i].is_null = malloc(db_res->num_rows*sizeof(int));
00595 XASSERT(db_res->column[i].is_null);
00596 db_res->column[i].size = 0;
00597 for (j=0; j<db_res->num_rows; j++)
00598 {
00599 width = PQgetlength(res,j,i);
00600 if (width>db_res->column[i].size)
00601 db_res->column[i].size = width;
00602 }
00603
00604
00605
00606 if ( db_res->column[i].type == DB_STRING ||
00607 db_res->column[i].type == DB_VARCHAR )
00608 {
00609 (db_res->column[i].size)++;
00610 }
00611 else
00612 {
00613
00614 if (db_res->column[i].size == 0)
00615 {
00616 db_res->column[i].size = db_sizeof(db_res->column[i].type);
00617 }
00618 }
00619
00620 db_res->column[i].data = malloc(db_res->num_rows * db_res->column[i].size);
00621 XASSERT(db_res->column[i].data);
00622 #ifdef DEBUG
00623 printf("sizeof column %d = %d\n",i,db_res->column[i].size);
00624 #endif
00625 }
00626
00627 for (i=0; i<db_res->num_cols; i++)
00628 {
00629 for (j=0; j<db_res->num_rows; j++)
00630 {
00631 db_res->column[i].is_null[j] = PQgetisnull(res,j,i);
00632 if (!db_res->column[i].is_null[j])
00633 memcpy(&db_res->column[i].data[j*db_res->column[i].size],
00634 PQgetvalue(res,j,i), db_res->column[i].size);
00635 else
00636 memset(&db_res->column[i].data[j*db_res->column[i].size], 0,
00637 db_res->column[i].size);
00638 }
00639 #if __BYTE_ORDER == __LITTLE_ENDIAN
00640 db_byteswap(db_res->column[i].type, db_res->num_rows,
00641 db_res->column[i].data);
00642 #endif
00643 }
00644 }
00645 free(pquery);
00646 PQclear(res);
00647 db_unlock(dbin);
00648 return db_res;
00649 failure:
00650 QUERY_ERROR(query);
00651 free(pquery);
00652 db_unlock(dbin);
00653 return NULL;
00654 }
00655
00656
00657
00658
00659
00660
00661
00662
00663
00664
00665
00666
00667
00668
00669
00670
00671
00672
00673
00674
00675
00676
00677
00678
00679
00680
00681
00682 DB_Binary_Result_t **db_query_bin_ntuple(DB_Handle_t *dbin, const char *stmnt, unsigned int nelems, unsigned int nargs, DB_Type_t *dbtypes, void **values)
00683 {
00684 PGconn *dbconn = NULL;
00685 int ielem;
00686 int iarg;
00687 char *prepareStmnt = NULL;
00688 char *prepareStmntClause = NULL;
00689 int counter;
00690 size_t stmntsz = 2048;
00691 char buf[2048];
00692 const char *pin = NULL;
00693 char *pParamBuf = NULL;
00694 char stmntName[64] = {0};
00695 int buflen;
00696 int paramLengths[MAXARG];
00697 int paramFormats[MAXARG];
00698 char *paramValues[MAXARG];
00699 void *paramBuf = NULL;
00700 PGresult *pgres = NULL;
00701 DB_Binary_Result_t *dbres = NULL;
00702 DB_Binary_Result_t **dbresults = NULL;
00703 int icol;
00704 int irow;
00705 int colnameLength;
00706 unsigned int width;
00707 int err;
00708
00709 err = 0;
00710
00711 if (dbin)
00712 {
00713
00714 db_lock(dbin);
00715 if (!dbin->abort_now)
00716 {
00717 if (nargs > MAXARG)
00718 {
00719 fprintf(stderr,"Maximum number of arguments exceeded (%d > %d).\n", nargs, MAXARG);
00720 err = 1;
00721 }
00722
00723 if (!err)
00724 {
00725 dbconn = dbin->db_connection;
00726 prepareStmntClause = calloc(1, stmntsz);
00727
00728 if (!prepareStmntClause)
00729 {
00730
00731 err = 1;
00732 }
00733 }
00734
00735 if (!err)
00736 {
00737 pin = stmnt;
00738
00739
00740 counter = 1;
00741
00742 while (*pin)
00743 {
00744 if (*pin == '?')
00745 {
00746 snprintf(buf, sizeof(buf), "$%d", counter++);
00747 }
00748 else
00749 {
00750 snprintf(buf, sizeof(buf), "%c", *pin);
00751 }
00752
00753 pin++;
00754 prepareStmntClause = base_strcatalloc(prepareStmntClause, buf, &stmntsz);
00755 if (!prepareStmntClause)
00756 {
00757
00758 err = 1;
00759 break;
00760 }
00761 }
00762 }
00763
00764
00765 if (!err)
00766 {
00767 stmntsz = 2048;
00768 prepareStmnt = calloc(1, stmntsz);
00769
00770 if (!prepareStmnt)
00771 {
00772
00773 err = 1;
00774 }
00775 }
00776
00777 if (!err)
00778 {
00779
00780 for (iarg = 0; iarg < nargs; iarg++)
00781 {
00782 if (iarg == 0)
00783 {
00784 snprintf(stmntName, sizeof(stmntName), "db_tmp_stmt_%u", dbin->stmt_num);
00785
00786
00787 ++dbin->stmt_num;
00788
00789 snprintf(buf, sizeof(buf), "PREPARE %s(%s", stmntName, db_type_string(dbtypes[iarg]));
00790 }
00791 else
00792 {
00793 snprintf(buf, sizeof(buf), ", %s", db_type_string(dbtypes[iarg]));
00794 }
00795
00796 prepareStmnt = base_strcatalloc(prepareStmnt, buf, &stmntsz);
00797
00798 if (!prepareStmnt)
00799 {
00800
00801 err = 1;
00802 }
00803 }
00804 }
00805
00806 if (!err)
00807 {
00808 prepareStmnt = base_strcatalloc(prepareStmnt, ") AS ", &stmntsz);
00809
00810 if (prepareStmnt)
00811 {
00812 prepareStmnt = base_strcatalloc(prepareStmnt, prepareStmntClause, &stmntsz);
00813 if (!prepareStmnt)
00814 {
00815
00816 err = 1;
00817 }
00818
00819 free(prepareStmntClause);
00820 prepareStmntClause = NULL;
00821 }
00822 else
00823 {
00824
00825 err = 1;
00826 }
00827 }
00828
00829 if (!err)
00830 {
00831
00832 db_unlock(dbin);
00833
00834
00835 if (db_dms(dbin, NULL, prepareStmnt))
00836 {
00837
00838 err = 1;
00839 }
00840
00841 db_lock(dbin);
00842
00843 free(prepareStmnt);
00844 prepareStmnt = NULL;
00845 }
00846
00847 if (!err)
00848 {
00849 void *tuple = NULL;
00850 char *strAddr = NULL;
00851
00852 for (ielem = 0; ielem < nelems; ielem++)
00853 {
00854
00855 for (iarg = 0, buflen = 0; iarg < nargs; iarg++)
00856 {
00857 tuple = values[iarg];
00858
00859 if (dbtypes[iarg] == DB_STRING || dbtypes[iarg] == DB_VARCHAR)
00860 {
00861 paramLengths[iarg] = strlen(((char **)tuple)[ielem]);
00862 }
00863 else
00864 {
00865 paramLengths[iarg] = db_sizeof(dbtypes[iarg]);
00866 }
00867
00868 paramFormats[iarg] = 1;
00869 buflen += paramLengths[iarg];
00870 }
00871
00872 paramBuf = calloc(1, buflen);
00873 if (!paramBuf)
00874 {
00875
00876 err = 1;
00877 break;
00878 }
00879
00880 pParamBuf = paramBuf;
00881 for (iarg = 0; iarg < nargs; iarg++)
00882 {
00883 tuple = values[iarg];
00884 paramValues[iarg] = pParamBuf;
00885 if (dbtypes[iarg] == DB_STRING || dbtypes[iarg] == DB_VARCHAR)
00886 {
00887 strAddr = *((char **)tuple + ielem);
00888 memcpy(pParamBuf, strAddr, paramLengths[iarg]);
00889 }
00890 else
00891 {
00892
00893 memcpy(pParamBuf, (uint8_t *)tuple + ielem * paramLengths[iarg], paramLengths[iarg]);
00894 }
00895 #if __BYTE_ORDER == __LITTLE_ENDIAN
00896 db_byteswap(dbtypes[iarg], 1, pParamBuf);
00897 #endif
00898 pParamBuf += paramLengths[iarg];
00899 }
00900
00901
00902 pgres = PQexecPrepared(dbconn, stmntName, nargs, (const char * const *)paramValues, paramLengths, paramFormats, 1);
00903 free(paramBuf);
00904 paramBuf = NULL;
00905
00906 if (PQresultStatus(pgres) == PGRES_TUPLES_OK)
00907 {
00908
00909 dbres = (DB_Binary_Result_t *)calloc(1, sizeof(DB_Binary_Result_t));
00910 if (!dbres)
00911 {
00912
00913 err = 1;
00914 break;
00915 }
00916
00917 dbres->num_rows = PQntuples(pgres);
00918 dbres->num_cols = PQnfields(pgres);
00919
00920 if (dbres->num_cols > 0)
00921 {
00922 dbres->column = (DB_Column_t *)malloc(dbres->num_cols * sizeof(DB_Column_t));
00923 if (!dbres->column)
00924 {
00925
00926 err = 1;
00927 break;
00928 }
00929
00930 for (icol = 0; icol < dbres->num_cols; icol++)
00931 {
00932 colnameLength = strlen(PQfname(pgres, icol)) + 1;
00933 dbres->column[icol].column_name = malloc(colnameLength);
00934 if (!dbres->column[icol].column_name)
00935 {
00936
00937 err = 1;
00938 break;
00939 }
00940
00941 strcpy(dbres->column[icol].column_name, PQfname(pgres,icol));
00942 dbres->column[icol].type = pgsql2db_type(PQftype(pgres, icol));
00943 dbres->column[icol].num_rows = dbres->num_rows;
00944 dbres->column[icol].is_null = malloc(dbres->num_rows * sizeof(int));
00945 if (!dbres->column[icol].is_null)
00946 {
00947
00948 err = 1;
00949 break;
00950 }
00951
00952
00953
00954
00955
00956
00957
00958
00959
00960
00961
00962
00963
00964
00965
00966
00967
00968
00969
00970
00971
00972
00973
00974
00975
00976
00977
00978
00979
00980 dbres->column[icol].size = 0;
00981 for (irow = 0; irow < dbres->num_rows; irow++)
00982 {
00983 width = PQgetlength(pgres, irow, icol);
00984 if (width > dbres->column[icol].size)
00985 {
00986 dbres->column[icol].size = width;
00987 }
00988 }
00989
00990
00991
00992 if (dbres->column[icol].type == DB_STRING || dbres->column[icol].type == DB_VARCHAR)
00993 {
00994 (dbres->column[icol].size)++;
00995 }
00996 else
00997 {
00998
00999 if (dbres->column[icol].size == 0)
01000 {
01001 dbres->column[icol].size = db_sizeof(dbres->column[icol].type);
01002 }
01003 }
01004
01005 dbres->column[icol].data = malloc(dbres->num_rows * dbres->column[icol].size);
01006 if (!dbres->column[icol].data)
01007 {
01008
01009 err = 1;
01010 break;
01011 }
01012 }
01013
01014 for (icol = 0; icol < dbres->num_cols; icol++)
01015 {
01016 for (irow = 0; irow < dbres->num_rows; irow++)
01017 {
01018 dbres->column[icol].is_null[irow] = PQgetisnull(pgres, irow, icol);
01019 if (!dbres->column[icol].is_null[irow])
01020 {
01021 memcpy(&dbres->column[icol].data[irow * dbres->column[icol].size], PQgetvalue(pgres, irow, icol), dbres->column[icol].size);
01022 }
01023 else
01024 {
01025 memset(&dbres->column[icol].data[irow * dbres->column[icol].size], 0, dbres->column[icol].size);
01026 }
01027 }
01028 #if __BYTE_ORDER == __LITTLE_ENDIAN
01029 db_byteswap(dbres->column[icol].type, dbres->num_rows, dbres->column[icol].data);
01030 #endif
01031 }
01032 }
01033 }
01034 else
01035 {
01036 fprintf(stderr, "query failed: %s", PQerrorMessage(dbconn));
01037 }
01038
01039 PQclear(pgres);
01040 if (!dbresults)
01041 {
01042 dbresults = (DB_Binary_Result_t **)calloc(nelems, sizeof(DB_Binary_Result_t *));
01043 }
01044
01045 dbresults[ielem] = dbres;
01046 dbres = NULL;
01047
01048 }
01049 }
01050
01051
01052 if (*stmntName)
01053 {
01054
01055 snprintf(buf, sizeof(buf), "DEALLOCATE %s", stmntName);
01056
01057
01058 db_unlock(dbin);
01059
01060 db_dms(dbin, NULL, buf);
01061
01062 db_lock(dbin);
01063 }
01064 }
01065
01066 db_unlock(dbin);
01067 }
01068
01069 if (err)
01070 {
01071 if (dbresults)
01072 {
01073 db_free_binary_result_tuple(&dbresults, nelems);
01074 }
01075 }
01076
01077 return dbresults;
01078 }
01079
01080
01081 int db_dms(DB_Handle_t *dbin, int *row_count, char *query_string)
01082 {
01083 PGconn *db;
01084 PGresult *res;
01085 char *str;
01086
01087 if (dbin==NULL)
01088 return 1;
01089 db = dbin->db_connection;
01090
01091
01092 db_lock(dbin);
01093 if (dbin->abort_now)
01094 goto failure;
01095 #ifdef DEBUG
01096 printf("db_dms: query = %s\n",query_string);
01097 #endif
01098 res = PQexec(db, query_string);
01099 if (PQresultStatus(res) != PGRES_COMMAND_OK)
01100 {
01101 fprintf(stderr, "query failed: %s", PQerrorMessage(db));
01102 PQclear(res);
01103 goto failure;
01104 }
01105 if (PQntuples(res) != 0)
01106 {
01107 fprintf(stderr, "Queries returning results are not allowed in query_dms.\n");
01108 PQclear(res);
01109 goto failure;
01110 }
01111 if (row_count)
01112 {
01113 str = PQcmdTuples(res);
01114 if (*str==0)
01115 *row_count = 0;
01116 else
01117 *row_count = atol(str);
01118 }
01119 PQclear(res);
01120 db_unlock(dbin);
01121 return 0;
01122 failure:
01123 QUERY_ERROR(query_string);
01124 db_unlock(dbin);
01125 return 1;
01126 }
01127
01128
/* Upper bound on the number of parameters bound to one statement.
 * Functions earlier in this file already use MAXARG, so a header is expected
 * to define it; the guard keeps one consistent value for the whole file and
 * avoids a conflicting redefinition. */
#ifndef MAXARG
#define MAXARG 1024
#endif
01130
01131
01132
01133
01134
01135
01136
01137
01138
01139
01140
01141
01142
01143
01144 int db_dms_array(DB_Handle_t *dbin, int *row_count,
01145 char *query, int n_rows,
01146 int n_args, DB_Type_t *intype, void **argin )
01147 {
01148 PGconn *db;
01149 PGresult *res;
01150 int n,i,j;
01151 char *q, *str, *op;
01152 int paramLengths[MAXARG], paramFormats[MAXARG];
01153 Oid paramTypes[MAXARG];
01154 char preparestring[8192],*p, *pquery = 0;
01155 char *paramValues[MAXARG];
01156 char stmtname[20], dallocstmt[30];
01157
01158 #ifdef DEBUG
01159 printf("Entering db_dms_array.\n");
01160 #endif
01161 if (dbin==NULL)
01162 return 1;
01163 if (n_args>=MAXARG)
01164 {
01165 fprintf(stderr,"Maximum number of arguments exceeded.\n");
01166 goto failure;
01167 }
01168
01169
01170 db = dbin->db_connection;
01171 pquery = malloc(6*strlen(query));
01172 XASSERT(pquery);
01173
01174
01175 op = pquery;
01176 q = (char *)query;
01177 n = 1;
01178 while (*q)
01179 {
01180 if (*q == '?')
01181 {
01182 q++;
01183 op += sprintf(op,"$%d",n);
01184 n++;
01185 }
01186 else
01187 *op++ = *q++;
01188 }
01189 *op = '\0';
01190
01191 #ifdef DEBUG
01192 printf("modified query string = '%s'.\n",pquery);
01193 #endif
01194
01195
01196 for(i=0; i<n_args; i++)
01197 {
01198 paramLengths[i] = db_sizeof(intype[i]);
01199 paramTypes[i] = db2pgsql_type(intype[i]);
01200 paramFormats[i] = 1;
01201
01202
01203 #if __BYTE_ORDER == __LITTLE_ENDIAN
01204 db_byteswap(intype[i],n_rows,argin[i]);
01205 #endif
01206 }
01207
01208 if (n_rows>1)
01209 {
01210 db_lock(dbin);
01211 if (dbin->abort_now) {
01212 db_unlock(dbin);
01213 goto failure;
01214 }
01215
01216 sprintf(stmtname,"db_tmp_stmt_%u",dbin->stmt_num);
01217 sprintf(dallocstmt,"deallocate %s",stmtname);
01218
01219 ++dbin->stmt_num;
01220
01221 p = preparestring;
01222
01223 if (n_args>0)
01224 {
01225 p += sprintf(p,"prepare %s(%s",stmtname,db_type_string(intype[0]));
01226 for(i=1; i<n_args; i++)
01227 p += sprintf(p,", %s",db_type_string(intype[i]));
01228 p += sprintf(p,") as %s",pquery);
01229 *p = 0;
01230 }
01231 else
01232 p += sprintf(p,"prepare %s as %s",stmtname,pquery);
01233
01234 #ifdef DEBUG
01235 printf("PREPARE STRING= '%s'\n",preparestring);
01236 #endif
01237 db_unlock(dbin);
01238
01239
01240 if (db_dms(dbin, NULL, preparestring))
01241 goto failure;
01242
01243
01244 if (row_count)
01245 *row_count=0;
01246
01247 db_lock(dbin);
01248 if (dbin->abort_now) {
01249 db_unlock(dbin);
01250 goto failure;
01251 }
01252 for (i=0; i<n_rows; i++)
01253 {
01254 for (j=0; j<n_args; j++)
01255 {
01256 p = argin[j];
01257 if (intype[j] == DB_STRING || intype[j] == DB_VARCHAR )
01258 {
01259 paramValues[j] = ((char **) argin[j])[i];
01260 paramLengths[j] = strlen(paramValues[j]);
01261
01262
01263 }
01264 else
01265 paramValues[j] = p + i*paramLengths[j];
01266 }
01267
01268 res = PQexecPrepared(db, stmtname, n_args,
01269 (const char * const *)paramValues, paramLengths, paramFormats, 0);
01270
01271 if (PQresultStatus(res) != PGRES_COMMAND_OK)
01272 {
01273 fprintf(stderr, "query failed: %s", PQerrorMessage(db));
01274 PQclear(res);
01275 goto failure1;
01276 }
01277 if (row_count)
01278 {
01279 str = PQcmdTuples(res);
01280 if (*str != '\0')
01281 *row_count += atol(str);
01282 }
01283 PQclear(res);
01284 }
01285 db_unlock(dbin);
01286
01287 db_dms(dbin, NULL, dallocstmt);
01288 }
01289 else
01290 {
01291
01292
01293 if (row_count)
01294 *row_count=0;
01295 for (j=0; j<n_args; j++)
01296 {
01297 p = argin[j];
01298 if (intype[j] == DB_STRING || intype[j] == DB_VARCHAR )
01299 {
01300 paramValues[j] = ((char **) argin[j])[0];
01301 paramLengths[j] = strlen(paramValues[j]);
01302
01303
01304 }
01305 else
01306 paramValues[j] = p;
01307 }
01308 db_lock(dbin);
01309 if (dbin->abort_now) {
01310 db_unlock(dbin);
01311 goto failure;
01312 }
01313 res = PQexecParams(db,pquery, n_args, paramTypes,
01314 (const char * const *)paramValues, paramLengths, paramFormats, 1);
01315 db_unlock(dbin);
01316 if (PQresultStatus(res) != PGRES_COMMAND_OK)
01317 {
01318 fprintf(stderr, "query failed: %s", PQerrorMessage(db));
01319 PQclear(res);
01320 goto failure;
01321 }
01322 PQclear(res);
01323 }
01324
01325 #if __BYTE_ORDER == __LITTLE_ENDIAN
01326 for (i=0; i<n_args; i++)
01327 db_byteswap(intype[i],n_rows,argin[i]);
01328 #endif
01329 free(pquery);
01330 #ifdef DEBUG
01331 printf("Exiting db_dms_array status=0\n");
01332 #endif
01333 return 0;
01334
01335 failure1:
01336 db_unlock(dbin);
01337 db_dms(dbin, NULL, dallocstmt);
01338 failure:
01339 QUERY_ERROR(query);
01340 free(pquery);
01341 #ifdef DEBUG
01342 printf("Exiting db_dms_array status=1\n");
01343 #endif
01344 return 1;
01345 }
01346
01347
01348
01349 int db_bulk_insert_array(DB_Handle_t *dbin, char *table, int n_rows,
01350 int n_args, DB_Type_t *intype, void **argin )
01351 {
01352 PGconn *db;
01353 PGresult *res;
01354 int n,i,j;
01355 int bufsize;
01356 int *paramLengths[MAXARG];
01357 char *query=NULL,*p;
01358 char *buf=NULL;
01359 char header[20] = "PGCOPY\n\377\r\n\0\0\0\0\0\0\0\0\0";
01360 unsigned short nfield;
01361 unsigned int len;
01362 int status = 0;
01363
01364 #ifdef DEBUG
01365 FILE *fp;
01366 printf("Entering db_bulk_insert_array.\n");
01367 #endif
01368
01369 if (dbin==NULL)
01370 return 1;
01371 db = dbin->db_connection;
01372 if (n_args>=MAXARG)
01373 {
01374 fprintf(stderr,"Maximum number of arguments exceeded.\n");
01375 status = 1;
01376 goto failure;
01377 }
01378
01379 #if __BYTE_ORDER == __LITTLE_ENDIAN
01380 for(i=0; i<n_args; i++)
01381 db_byteswap(intype[i],n_rows,argin[i]);
01382 #endif
01383 query = malloc(strlen(table)+1000);
01384 XASSERT(query);
01385 bufsize = 19 + 2*n_rows + 4*n_rows*n_args + 2;
01386 for (j=0; j<n_args; j++)
01387 {
01388 if (intype[j] == DB_STRING || intype[j] == DB_VARCHAR )
01389 {
01390 paramLengths[j] = malloc(n_rows*sizeof(int));
01391 XASSERT(paramLengths[j]);
01392 for (i=0; i<n_rows; i++)
01393 {
01394 paramLengths[j][i] = strlen(((char **) argin[j])[i]);
01395 bufsize += paramLengths[j][i];
01396 #ifdef DEBUG
01397 printf("len('%s') = %d\n",((char **) argin[j])[i],paramLengths[j][i]);
01398 #endif
01399 }
01400 }
01401 else
01402 bufsize += n_rows*db_sizeof(intype[j]);
01403 }
01404
01405 buf = malloc(bufsize);
01406 XASSERT(buf);
01407 p = buf;
01408 memcpy(p, header, 19);
01409 p += 19;
01410 nfield = htons((short)n_args);
01411
01412
01413 for (i=0; i<n_rows; i++)
01414 {
01415 memcpy(p, &nfield, sizeof(short));
01416 p += sizeof(short);
01417 for (j=0; j<n_args; j++)
01418 {
01419 if (intype[j] == DB_STRING || intype[j] == DB_VARCHAR )
01420 {
01421 len = htonl(paramLengths[j][i]);
01422 memcpy(p, &len, sizeof(int));
01423 p += sizeof(int);
01424 memcpy(p, ((char **) argin[j])[i], paramLengths[j][i]);
01425 p += paramLengths[j][i];
01426 }
01427 else
01428 {
01429 n = db_sizeof(intype[j]);
01430 len = htonl(n);
01431 memcpy(p, &len, sizeof(int));
01432 p += sizeof(int);
01433 memcpy(p, ((char *)argin[j]) + i * n, n);
01434 p += n;
01435 }
01436 }
01437 }
01438 nfield = htons((short)-1);
01439 memcpy(p, &nfield, sizeof(short));
01440 p += sizeof(short);
01441
01442 #ifdef DEBUG
01443 fp = fopen("test.bin","w");
01444 fwrite(buf,bufsize,1,fp);
01445 fclose(fp);
01446 #endif
01447 sprintf(query,"copy %s from stdin binary", table);
01448
01449
01450 db_lock(dbin);
01451 if (dbin->abort_now) {
01452 status = 1;
01453 goto failure;
01454 }
01455
01456 res = PQexec(db, query);
01457 if (PQresultStatus(res) != PGRES_COPY_IN)
01458 {
01459 fprintf(stderr, "query failed: %s", PQerrorMessage(db));
01460 PQclear(res);
01461 status = 1;
01462 goto failure;
01463 }
01464 PQclear(res);
01465
01466 if (PQputCopyData(db, buf, bufsize) == -1)
01467 {
01468 fprintf(stderr, "query failed: %s", PQerrorMessage(db));
01469 status = 1;
01470 goto failure;
01471 }
01472
01473 if (PQputCopyEnd(db, NULL) == -1)
01474 {
01475 fprintf(stderr, "query failed: %s", PQerrorMessage(db));
01476 status = 1;
01477 goto failure;
01478 }
01479
01480 res = PQgetResult(db);
01481 if (PQresultStatus(res) != PGRES_COMMAND_OK)
01482 {
01483 fprintf(stderr, "query failed: %s", PQerrorMessage(db));
01484 PQclear(res);
01485 status = 1;
01486 goto failure;
01487 }
01488 PQclear(res);
01489 #if __BYTE_ORDER == __LITTLE_ENDIAN
01490 for (i=0; i<n_args; i++)
01491 db_byteswap(intype[i],n_rows,argin[i]);
01492 #endif
01493 #ifdef DEBUG
01494 printf("Exiting db_dms_array status=0\n");
01495 #endif
01496
01497 failure:
01498 db_unlock(dbin);
01499 for (j=0; j<n_args; j++)
01500 {
01501 if (intype[j] == DB_STRING || intype[j] == DB_VARCHAR )
01502 {
01503 free(paramLengths[j]);
01504 }
01505 }
01506 free(buf);
01507 free(query);
01508 #ifdef DEBUG
01509 printf("Exiting db_dms_array status=%d\n", status);
01510 #endif
01511 return status;
01512 }
01513
01514
01515
01516
01517
01518
01519 int db_isolation_level(DB_Handle_t *dbin, int level)
01520 {
01521 if (dbin == NULL)
01522 return 1;
01523
01524 switch(level)
01525 {
01526 case DB_TRANS_READCOMMIT:
01527 dbin->isolation_level = DB_TRANS_READCOMMIT;
01528 return db_dms(dbin, NULL, "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED");
01529 break;
01530 case DB_TRANS_SERIALIZABLE:
01531 dbin->isolation_level = DB_TRANS_SERIALIZABLE;
01532 return db_dms(dbin, NULL, "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE");
01533 break;
01534 case DB_TRANS_READONLY:
01535 dbin->isolation_level = DB_TRANS_READONLY;
01536 return db_dms(dbin, NULL, "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ ONLY");
01537 break;
01538 default:
01539 fprintf(stderr,"db_isolation_level: Invalid isolation level (%d) specified. Legal values are 0 = read commited, 1 = serializable, 2 = read only.\n", level);
01540 return 1;
01541 break;
01542 }
01543 }
01544
01545
01546 int db_start_transaction(DB_Handle_t *db)
01547 {
01548 return db_dms(db,NULL,"BEGIN");
01549 }
01550
01551
01552
01553 int db_commit(DB_Handle_t *db)
01554 {
01555 return db_dms(db,NULL,"COMMIT");
01556 }
01557
01558
01559 int db_rollback(DB_Handle_t *db)
01560 {
01561 return db_dms(db,NULL,"ROLLBACK");
01562 }
01563
01564
01565
01566
01567 long long db_sequence_getnext(DB_Handle_t *db, char *tablename)
01568 {
01569 DB_Text_Result_t *res;
01570 char query[1024];
01571 long long val;
01572
01573 sprintf(query, "SELECT nextval('%s_seq')",tablename);
01574 res = db_query_txt(db, query);
01575 if (res==NULL || res->num_rows != 1 || res->num_cols !=1)
01576 val = (long long)-1;
01577 else
01578 val = strtoll(res->field[0][0], NULL, 10);
01579 if (res)
01580 db_free_text_result(res);
01581
01582 return val;
01583 }
01584
01585
01586 #define PG_MAXARGS (1600)
01587 long long *db_sequence_getnext_n(DB_Handle_t *db, char *tablename, int n)
01588 {
01589 DB_Binary_Result_t *res;
01590 int len,count,chunk, i;
01591 char *query,*p;
01592 long long *val;
01593
01594 XASSERT(n>0);
01595 len = strlen(tablename)+16;
01596 query = malloc(len*n+10);
01597 XASSERT(query);
01598 val = malloc(sizeof(long long)*n);
01599 XASSERT(val);
01600
01601 count = 0;
01602 while (count < n)
01603 {
01604 p = query;
01605 p += sprintf(p, "SELECT ");
01606 chunk = (n-count)<PG_MAXARGS ? (n-count) : PG_MAXARGS;
01607 for (i=0; i<chunk-1; i++)
01608 p += sprintf(p, "nextval('%s_seq'),",tablename);
01609 p += sprintf(p, "nextval('%s_seq')",tablename);
01610 *p = 0;
01611 res = db_query_bin(db, query);
01612 if (res==NULL || res->num_rows != 1 || res->num_cols != chunk)
01613 {
01614 free(val);
01615 val = NULL;
01616 if (res)
01617 db_free_binary_result(res);
01618 break;
01619 }
01620 else
01621 {
01622 for (i=0; i<chunk; i++)
01623 val[count++] = *((long long *)res->column[i].data);
01624 }
01625 if (res)
01626 db_free_binary_result(res);
01627 }
01628 free(query);
01629
01630 return val;
01631 }
01632
01633
01634 long long db_sequence_getcurrent(DB_Handle_t *db, char *tablename)
01635 {
01636 DB_Text_Result_t *res;
01637 char query[512];
01638 unsigned int val;
01639
01640 sprintf(query, "SELECT currval('%s_seq')",tablename);
01641 res = db_query_txt(db, query);
01642 if (res==NULL || res->num_rows != 1 || res->num_cols !=1)
01643 val = (long long)-1;
01644 else
01645 val = strtoll(res->field[0][0], NULL, 10);
01646 if (res)
01647 db_free_text_result(res);
01648
01649 return val;
01650 }
01651
01652
01653
01654 long long db_sequence_getlast(DB_Handle_t *db, char *tablename)
01655 {
01656 DB_Text_Result_t *res;
01657 char query[512];
01658 unsigned int val;
01659
01660 sprintf(query, "SELECT last_value from %s_seq",tablename);
01661 res = db_query_txt(db, query);
01662 if (res==NULL || res->num_rows != 1 || res->num_cols !=1)
01663 val = (long long)-1;
01664 else
01665 val = strtoll(res->field[0][0], NULL, 10);
01666 if (res)
01667 db_free_text_result(res);
01668
01669 return val;
01670 }
01671
01672 int db_sequence_create(DB_Handle_t *db, char *tablename)
01673 {
01674 char query[512];
01675 sprintf(query, "CREATE SEQUENCE %s_seq",tablename);
01676 return db_dms(db,NULL,query);
01677 }
01678
01679 int db_sequence_drop(DB_Handle_t *db, char *tablename)
01680 {
01681 char query[512];
01682 sprintf(query, "DROP SEQUENCE %s_seq",tablename);
01683 return db_dms(db,NULL,query);
01684 }
01685
01686
01687 int db_cancel(DB_Handle_t *db, char *effbuf, int size)
01688 {
01689 PGcancel *readonly = PQgetCancel((PGconn *)db->db_connection);
01690 int rv = 0;
01691
01692 if (readonly)
01693 {
01694 rv = PQcancel(readonly, effbuf, size);
01695 PQfreeCancel(readonly);
01696 }
01697
01698 return rv;
01699 }
01700
01701
01702 int db_settimeout(DB_Handle_t *db, unsigned int timeoutval)
01703 {
01704 char stmnt[128];
01705
01706 snprintf(stmnt, sizeof(stmnt), "SET statement_timeout TO %u", timeoutval);
01707 return db_dms(db, NULL, stmnt);
01708 }
01709
01710
01711 int db_setutf8clientencoding(DB_Handle_t *db)
01712 {
01713 char stmnt[128];
01714
01715 snprintf(stmnt, sizeof(stmnt), "SET client_encoding TO 'UTF8'");
01716 return db_dms(db, NULL, stmnt);
01717 }