00001 static char rcsid[] = "$Header: /home/cvsuser/cvsroot/JSOC/proj/jpe/apps/spawnds.c,v 1.1 2009/02/03 22:18:47 production Exp $";
00002
00003
00004
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pvm3.h>
#include "pe.h"
00009
00010 int find_pe_rpc(int mytid, int (*msg)(char *fmt, ...));
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 int spawn_dsds_local(char *db, int mytid, int *dsdstid, int debugflg,
00023 int (*msgf)(char *fmt, ...))
00024 {
00025 KEY *alist, *blist;
00026 struct taskinfo *taskp;
00027 char thishost[MAX_STR];
00028 char *args[2];
00029 char *dsdsdb;
00030 int i, ntask, dsds_rm_tid;
00031
00032 if(pvm_tasks(0, &ntask, &taskp)) {
00033 (*msgf)("Error on pvm_taks() call for dsds_svc info\n");
00034 return(1);
00035 }
00036 for(i=0; i<ntask; i++) {
00037 if(!strcmp(taskp[i].ti_a_out, "dsds_svc")) {
00038 *dsdstid=taskp[i].ti_tid;
00039 break;
00040 }
00041 }
00042 if(i == ntask) {
00043 gethostname(thishost,MAX_STR);
00044 args[0] = db;
00045 args[1] = NULL;
00046 if(debugflg)
00047 pvm_spawn("dsds_svc",args,PvmTaskDebug,thishost,1,dsdstid);
00048 else
00049 pvm_spawn("dsds_svc",args,PvmTaskHost,thishost,1,dsdstid);
00050 if(*dsdstid < 0) {
00051 (*msgf)("Error spawing dsds_svc on %s\n", thishost);
00052 return(1);
00053 }
00054 (*msgf)("dsds_svc tid=%x spawned on %s for db=%s\n", *dsdstid, thishost, db);
00055 }
00056 else {
00057 alist=newkeylist();
00058 if((blist = (KEY *)call_dsds(&alist, REQDBNAME, *dsdstid, mytid, msgf, debugflg)) == NULL) {
00059 (*msgf)("Error requesting database name from dsds_svc\n");
00060 return(1);
00061 }
00062 dsdsdb = GETKEY_str(blist, "dsds_dbname");
00063 (*msgf)("dsds_svc tid=%x currently running for db=%s\n",*dsdstid,dsdsdb);
00064 freekeylist(&alist); freekeylist(&blist);
00065 if(strcmp(dsdsdb, db)) {
00066 (*msgf)("The dsds_svc must be halted if you really want to connect to db=%s\n", db);
00067 return(1);
00068 }
00069 }
00070
00071 for(i=0; i<ntask; i++) {
00072 if(!strcmp(taskp[i].ti_a_out, "dsds_rm")) {
00073 dsds_rm_tid=taskp[i].ti_tid;
00074 break;
00075 }
00076 }
00077 if(i == ntask) {
00078 gethostname(thishost,MAX_STR);
00079 args[0] = db;
00080 args[1] = NULL;
00081 if(debugflg)
00082 pvm_spawn("dsds_rm",args,PvmTaskDebug,thishost,1,&dsds_rm_tid);
00083 else
00084 pvm_spawn("dsds_rm",args,PvmTaskHost,thishost,1,&dsds_rm_tid);
00085 if(dsds_rm_tid < 0) {
00086 (*msgf)("Error spawing dsds_rm on %s\n", thishost);
00087 return(1);
00088 }
00089 (*msgf)("dsds_rm tid=%x spawned on %s for db=%s\n", dsds_rm_tid, thishost, db);
00090 }
00091 else {
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108 }
00109 return(0);
00110 }
00111
00112
00113
00114
00115 int spawn_rpc_now(int *perpctid, int debugflg, int (*msgf)(char *fmt, ...))
00116 {
00117 char thishost[MAX_STR];
00118 char *args[2];
00119 int initstat, dummy;
00120
00121 gethostname(thishost,MAX_STR);
00122
00123 args[0] = NULL;
00124 args[1] = NULL;
00125 if(debugflg)
00126 pvm_spawn("pe_rpc",args,PvmTaskDebug,thishost,1,perpctid);
00127 else
00128 pvm_spawn("pe_rpc",args,PvmTaskHost,thishost,1,perpctid);
00129 if(*perpctid < 0) {
00130 (*msgf)("Error spawing pe_rpc on %s\n", thishost);
00131 return(1);
00132 }
00133 (*msgf)("pe_rpc tid=%x spawned on %s\n", *perpctid, thishost);
00134
00135 if(pvm_recv(*perpctid, MSGDSDS) < 0) {
00136 (*msgf)("Error receiving initialization response from pe_rpc\n");
00137 return(1);
00138 }
00139 pvm_upkint(&dummy, 1, 1);
00140 pvm_upkint(&initstat, 1, 1);
00141 if(initstat) {
00142 (*msgf)("pe_rpc init err %d. Most likely pe_rpc_svc not running.\nContact CM@shoom.stanford.edu\n", initstat);
00143 return(1);
00144 }
00145 return(0);
00146 }
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159 int spawn_pe_rpc(char *db, int mytid, int *perpctid, int debugflg,
00160 int (*msgf)(char *fmt, ...))
00161 {
00162 KEY *alist, *blist;
00163 struct taskinfo *taskp;
00164 char *dsdsdb;
00165 int i, ntask;
00166
00167 if(pvm_tasks(0, &ntask, &taskp)) {
00168 (*msgf)("Error on pvm_tasks() call for pe_rpc info\n");
00169 return(1);
00170 }
00171 for(i=0; i<ntask; i++) {
00172 if(!strcmp(taskp[i].ti_a_out, "pe_rpc")) {
00173 *perpctid=taskp[i].ti_tid;
00174 break;
00175 }
00176 }
00177 if(i == ntask) {
00178 if(spawn_rpc_now(perpctid, debugflg, msgf))
00179 return(1);
00180 }
00181
00182 alist=newkeylist();
00183 if((blist = (KEY *)call_dsds(&alist, REQDBNAME, *perpctid, mytid, msgf, debugflg)) == NULL) {
00184 (*msgf)("Error requesting database name from dsds_svc:\n");
00185 (*msgf)("I'm restarting pe_rpc as you may have a stale connection...\n");
00186 pvm_kill(*perpctid);
00187
00188
00189
00190
00191
00192
00193
00194 if(spawn_rpc_now(perpctid, debugflg, msgf)) return(1);
00195 if((blist = (KEY *)call_dsds(&alist, REQDBNAME, *perpctid, mytid, msgf, debugflg)) == NULL) {
00196 (*msgf)("***Error RE-requesting database name from dsds_svc\n");
00197 freekeylist(&alist);
00198 return(1);
00199 }
00200 }
00201 dsdsdb = GETKEY_str(blist, "dsds_dbname");
00202 (*msgf)("dsds_svc currently running for db=%s\n", dsdsdb);
00203 freekeylist(&alist); freekeylist(&blist);
00204 if(strcmp(dsdsdb, db)) {
00205 (*msgf)("The dsds_svc must be halted if you really want to connect to db=%s\n", db);
00206 return(1);
00207 }
00208 return(0);
00209 }
00210
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
/* Look for a pe_rpc process owned by the current user in the system
 * process list.
 *
 * Runs "ps -ef | grep pe_rpc" through system(), with the output redirected
 * to /tmp/find_pe_rpc.<mytid in hex>.log, then scans that file.  Only lines
 * that end in "pe_rpc \n" and do not contain "grep pe_rpc" are considered.
 * The first two fields of such a line are read as user name and pid, and
 * the user name is compared with $USER ("nouser" if unset).
 *
 * mytid  used only to build the log file name
 * msg    printf-style reporting callback
 *
 * Returns 0 when no matching process is found.  Returns -1 when one is
 * found, when the command fails, or when the log file cannot be opened;
 * each of these is reported through msg.  The log file is left in /tmp.
 *
 * NOTE(review): system() returns grep's exit status here, which is non-zero
 * when grep matches nothing; that case is reported as "Can't execute".
 */
int find_pe_rpc(int mytid, int (*msg)(char *fmt, ...))
{
  FILE *fplog;
  const char *thisuser;
  char lfile[64], acmd[96], line[128], uline[64];
  int rpid;

  snprintf(lfile, sizeof(lfile), "/tmp/find_pe_rpc.%x.log", mytid);
  snprintf(acmd, sizeof(acmd), "ps -ef | grep pe_rpc 1> %s 2>&1", lfile);
  if(system(acmd)) {
    (*msg)("**find_pe_rpc():Can't execute %s.\n", acmd);
    return(-1);
  }
  if((fplog=fopen(lfile, "r")) == NULL) {
    (*msg)("**find_pe_rpc():Can't open %s to find any rogue pe_rpc\n", lfile);
    return(-1);
  }
  thisuser = getenv("USER");
  if(thisuser == NULL) thisuser = "nouser";

  while(fgets(line, sizeof(line), fplog)) {
    if(!(strstr(line, "pe_rpc \n"))) continue;   /* must be the last word */
    if(strstr(line, "grep pe_rpc")) continue;    /* skip our own grep */
    /* width-limit the user field to fit uline and insist on both fields,
     * so rpid is never printed uninitialized */
    if(sscanf(line, "%63s %d", uline, &rpid) != 2) continue;
    if(!strcmp(thisuser, uline)) {
      (*msg)("***You have a rogue pe_rpc pid=%d running outside your pvm machine\n", rpid);
      fclose(fplog);
      return(-1);
    }
  }
  fclose(fplog);
  return (0);
}
00259
00260
00261
00262
00263
00264
00265
00266
00267
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286
00287
00288
00289
00290
00291
00292
00293
00294
00295
00296
00297
00298
00299
00300
00301
00302
00303
00304