00001
00002
00003
00004
00005
00006
00007
#include "postgres.h"

#include <ctype.h>
#include <string.h>

#include "catalog/pg_type.h"
#include "executor/spi.h"
#include "fmgr.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
00010
00011 PG_MODULE_MAGIC;
00012
00013 #define kShadowSuffix "_shadow"
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 static int PGCtxTabexists(const char *ns, const char *tab, int *status)
00026 {
00027 int result = -1;
00028 int istat = 0;
00029 char query[2048];
00030 DB_Text_Result_t *qres = NULL;
00031
00032 snprintf(query, sizeof(query), "SELECT * FROM pg_tables WHERE schemaname ILIKE '%s' AND tablename ILIKE '%s'", ns, tab);
00033
00034
00035 istat = SPI_execute(query, true, 0);
00036
00037 if (istat == SPI_OK_SELECT)
00038 {
00039 if (SPI_processed == 0)
00040 {
00041
00042 result = 0;
00043 }
00044 else if (SPI_processed == 1)
00045 {
00046
00047 result = 1;
00048 }
00049 else
00050 {
00051
00052 istat = 1;
00053 }
00054 }
00055 else
00056 {
00057
00058 istat = 1;
00059 }
00060
00061 if (status)
00062 {
00063 *status = istat;
00064 }
00065
00066 return result;
00067 }
00068
00069
00070
00071
00072 static int PGCtxShadowExists(const char *ns, const char *tab, int *status)
00073 {
00074 int istat = 0;
00075 int tabexists = 0;
00076 char shadowtable[64];
00077
00078 snprintf(shadowtable, sizeof(shadowtable), "%s%s", tab, kShadowSuffix);
00079 tabexists = PGCtxTabexists(namespace, shadowtable, &istat);
00080
00081 if (status)
00082 {
00083 *status = istat;
00084 }
00085
00086 return tabexists;
00087 }
00088
/*
 * PGCtxStrlcat - bounded string append.
 *
 * Appends src to the NUL-terminated string in dst, writing at most
 * size - strlen(dst) - 1 characters plus the terminating NUL, where size
 * is the total capacity of dst in bytes.
 *
 * If dst already occupies size bytes or more (including the case
 * size == 0), nothing is written. The previous unsigned subtraction
 * wrapped around in that case and allowed a write past the buffer.
 *
 * Returns strlen(dst) (before the call) + strlen(src), i.e. the length
 * the result would have with unlimited space; a return value >= size
 * means the result was truncated.
 */
static size_t PGCtxStrlcat(char *dst, const char *src, size_t size)
{
    size_t start = strlen(dst);
    size_t srclen = strlen(src);

    /* Room for at least one character in addition to the terminator? */
    if (size > start + 1)
    {
        snprintf(dst + start, size - start, "%s", src);
    }

    return start + srclen;
}
00101
/*
 * PGCtxStrcatalloc - append src to dst, growing the buffer when needed.
 *
 * dst     - NUL-terminated string held in a buffer of *sizedst bytes.
 * src     - NUL-terminated string to append.
 * sizedst - in/out: capacity of the buffer; updated when a larger buffer
 *           is allocated.
 *
 * Returns the buffer holding the concatenation: dst itself when it was
 * large enough, otherwise a new palloc'd buffer. The caller must continue
 * with the returned pointer.
 *
 * The capacity is doubled repeatedly until the result fits. A single
 * doubling, as done previously, could still be too small for a long src
 * and silently truncated the result.
 *
 * The old buffer is not freed here because this function cannot know how
 * the caller obtained it; when it came from palloc it is reclaimed with
 * its memory context.
 */
static void *PGCtxStrcatalloc(char *dst, const char *src, size_t *sizedst)
{
    size_t srclen = strlen(src);
    size_t dstlen = strlen(dst);
    size_t needed = dstlen + srclen + 1;    /* +1 for the terminating NUL */
    char *retstr = dst;

    if (needed > *sizedst)
    {
        size_t newsize = (*sizedst > 0) ? *sizedst : 1;

        while (newsize < needed)
        {
            if (newsize > SIZE_MAX / 2)
            {
                /* Doubling would overflow; fall back to the exact size. */
                newsize = needed;
                break;
            }

            newsize *= 2;
        }

        /* palloc() reports out-of-memory itself and never returns NULL. */
        retstr = palloc(newsize);
        memcpy(retstr, dst, dstlen);
        *sizedst = newsize;
    }

    /* Copies src including its terminator; capacity is guaranteed above. */
    memcpy(retstr + dstlen, src, srclen + 1);

    return retstr;
}
00131
/*
 * PGCtxStrtolower - convert a NUL-terminated string to lower case in place.
 *
 * str - string to modify; a NULL pointer is ignored.
 *
 * Each byte is passed to tolower() as an unsigned char, because handing a
 * negative char value to a <ctype.h> function is undefined behaviour.
 */
static void PGCtxStrtolower(char *str)
{
    char *p;

    if (str == NULL)
    {
        return;
    }

    for (p = str; *p != '\0'; p++)
    {
        *p = (char) tolower((unsigned char) *p);
    }
}
00144
00145
00146 static int PGCtxIsGroupNew(long long recnum,
00147 const char *ns,
00148 const char *tab
00149 char **pkeynames,
00150 int ncols,
00151 int *status)
00152 {
00153 char *query = NULL;
00154 size_t stsz = 8192;
00155 char scolnum[8];
00156 char srecnum[32];
00157 int ans = -1;
00158 int icol;
00159 char *lcseries = NULL;
00160 int istat = 0;
00161
00162 snprintf(srecnum, sizeof(srecnum), "%lld", recnum);
00163
00164 lcseries = palloc((strlen(ns) + strlen(tab) + 2) * sizeof(char));
00165 snprintf(lcseries, (strlen(ns) + strlen(tab) + 2) * sizeof(char), "%s.%s", ns, tab);
00166
00167 if (lcseries)
00168 {
00169 PGCtxStrtolower(lcseries);
00170 query = palloc(stsz);
00171 if (query)
00172 {
00173
00174
00175
00176 *query = '\0';
00177
00178 query = PGCtxStrcatalloc(query, "SELECT count(*) FROM ", &stsz);
00179 query = PGCtxStrcatalloc(query, lcseries, &stsz);
00180 query = PGCtxStrcatalloc(query, " AS T1, (SELECT ", &stsz);
00181
00182 for (icol = 0; icol < ncols; icol++)
00183 {
00184 snprintf(scolnum, sizeof(scolnum), "%d", icol);
00185 query = PGCtxStrcatalloc(query, pkeynames[icol], &stsz);
00186 query = PGCtxStrcatalloc(query, " AS p", &stsz);
00187 query = PGCtxStrcatalloc(query, scolnum, &stsz);
00188
00189 if (icol < ncols - 1)
00190 {
00191 query = PGCtxStrcatalloc(query, ", ", &stsz);
00192 }
00193 }
00194
00195 query = PGCtxStrcatalloc(query, " FROM ", &stsz);
00196 query = PGCtxStrcatalloc(query, lcseries, &stsz);
00197 query = PGCtxStrcatalloc(query, " WHERE recnum = ", &stsz);
00198 query = PGCtxStrcatalloc(query, srecnum, &stsz);
00199 query = PGCtxStrcatalloc(query, ") AS T2 WHERE ", &stsz);
00200
00201 for (icol = 0; icol < ncols; icol++)
00202 {
00203 snprintf(scolnum, sizeof(scolnum), "%d", icol);
00204 query = PGCtxStrcatalloc(query, "T1.", &stsz);
00205 query = PGCtxStrcatalloc(query, pkeynames[icol], &stsz);
00206 query = PGCtxStrcatalloc(query, " = T2.p", &stsz);
00207 query = PGCtxStrcatalloc(query, scolnum, &stsz);
00208
00209 if (icol < ncols - 1)
00210 {
00211 query = PGCtxStrcatalloc(query, " AND ", &stsz);
00212 }
00213 }
00214 }
00215 else
00216 {
00217 istat = 1;
00218 }
00219 }
00220 else
00221 {
00222 istat = 1;
00223 }
00224
00225
00226 if (istat == 0)
00227 {
00228 istat = SPI_execute(query, true, 0);
00229
00230 if (istat == SPI_OK_SELECT)
00231 {
00232 if (SPI_processed == 0)
00233 {
00234
00235
00236 fprintf(stderr, "Unexpected record count %lld; should be at least one.\n", num);
00237 istat = 1;
00238 }
00239 else if (SPI_processed == 1)
00240 {
00241
00242 ans = 1;
00243 }
00244 else
00245 {
00246
00247
00248 ans = 0;
00249 }
00250 }
00251 else
00252 {
00253
00254 istat = 1;
00255 }
00256
00257 tres = drms_query_txt(env->session, query);
00258 }
00259
00260 if (status)
00261 {
00262 *status = istat;
00263 }
00264
00265 return ans;
00266 }
00267
00268
00269
00270 PG_FUNCTION_INFO_V1(UpdateShadow);
00271
00272
00273
00274
00275
00276
00277
00278
00279
00280
00281
00282
00283
00284
00285 Datum UpdateShadow(PG_FUNCTION_ARGS)
00286 {
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300
00301 int status = 0;
00302 int shadowexists = 0;
00303 size_t lsz = 0;
00304 char recnumstr[64];
00305 int isnew;
00306 text *nstext = NULL;
00307 text *tabletext = NULL;
00308 int added;
00309 long long recnum = -1;
00310 ArrayType *pkeynames = NULL;
00311 char *ns = NULL;
00312 char *tab = NULL;
00313
00314 if (SPI_connect() == SPI_OK_CONNECT)
00315 {
00316 nstext = PG_GETARG_TEXT_P(0);
00317 tabletext = PG_GETARG_TEXT_P(1);
00318 added = (strcmp(PG_GETARG_INT32(2), "INSERT") ? 1 : 0);
00319 recnum = PG_GETARG_INT64(3);
00320 pkeynames = PG_GETARG_ARRAYTYPE_P(4);
00321
00322 ns = palloc(nstext->length + 1);
00323 table = palloc(tabletext->length + 1);
00324
00325 if (!ns || !table)
00326 {
00327 PG_RETURN_INT32(kPGCtxErrNoMemory);
00328 }
00329
00330 memcpy(ns, nstext->data, nstext->length);
00331 *(ns + nstext->length) = '\0';
00332
00333 memcpy(table, tabletext->data, tabletext->length);
00334 *(table + tabletext->length) = '\0';
00335
00336 get_typlenbyvalalign(i_eltype, &i_typlen, &i_typbyval, &i_typalign);
00337 deconstruct_array(pkeynames, TEXTOID, i_typelen, i_typebyval, i_typealign, &idata, nulls, &num );
00338
00339 shadowexists = PGCtxShadowExists(ns, table, &status);
00340
00341 if (status != 0)
00342 {
00343
00344 PG_RETURN_INT32(kPGCtxErrShadowCheck);
00345 }
00346
00347 if (shadowexists)
00348 {
00349 snprintf(recnumstr, sizeof(recnumstr), "%lld", recnum);
00350
00351 if (added)
00352 {
00353
00354
00355 isnew = IsGroupNew(recnum, ns, tab, pkeynames, ncols, &status);
00356
00357 if (isnew == 0)
00358 {
00359
00360 status = UpdateShadow(ns, tab, pkeynames, ncols, recnum, 1);
00361 }
00362 else if (isnew == 1)
00363 {
00364
00365
00366
00367 status = InsertIntoShadow(ns, tab, pkeynames, ncols, recnum);
00368 }
00369 else
00370 {
00371
00372 }
00373 }
00374 else
00375 {
00376
00377
00378 int wasdel;
00379
00380 wasdel = WasGroupDeleted(recnum, ns, tab, pkeynames, ncols, &status);
00381 if (wasdel)
00382 {
00383
00384
00385 status = DeleteFromShadow(ns, tab, pkeynames, ncols, recnum);
00386 }
00387 else
00388 {
00389
00390
00391
00392
00393 status = UpdateShadow(ns, tab, pkeynames, ncols, recnum, 0);
00394 }
00395 }
00396
00397 }
00398
00399
00400
00401 PG_RETURN_INT32(kPGCtxSuccess);
00402
00403 SPI_finish();
00404 }
00405 }