(file) Return to sum_pe.c CVS log (file) (dir) Up to [Development] / JSOC / proj / dsdsmigr / apps

File: [Development] / JSOC / proj / dsdsmigr / apps / sum_pe.c (download)
Revision: 1.5, Tue Aug 4 01:29:17 2009 UTC (13 years, 10 months ago) by arta
Branch: MAIN
CVS Tags: Ver_LATEST, Ver_9-5, Ver_9-41, Ver_9-4, Ver_9-3, Ver_9-2, Ver_9-1, Ver_9-0, Ver_8-8, Ver_8-7, Ver_8-6, Ver_8-5, Ver_8-4, Ver_8-3, Ver_8-2, Ver_8-12, Ver_8-11, Ver_8-10, Ver_8-1, Ver_8-0, Ver_7-1, Ver_7-0, Ver_6-4, Ver_6-3, Ver_6-2, Ver_6-1, Ver_6-0, Ver_5-9, Ver_5-8, Ver_5-7, Ver_5-6, Ver_5-5, Ver_5-3, Ver_5-2, Ver_5-14, Ver_5-13, Ver_5-12, Ver_5-11, Ver_5-10, HEAD
Changes since 1.4: +1 -0 lines
Make warning #266 an error if the compiler is icc - not declaring a function can cause very subtle and hard to find bugs

/*
 * sum_pe - When an old MDI pe/peq type program gets a DRMS type ds name
 *           it will call sum_pe_svc. The sum_pe_svc spawns us (sum_pe) which
 *	runs with a drms_server and is given the keylist that the peq 
 *	generated. The sum_pe  gets the wd of the ds and returns
 *	it to sum_pe_svc which returns it to the original caller.
 * Sample call:
 *	sum_pe server=d00 keyfile=/tmp/keylist_10871857.log jsoc
 */

#include "jsoc_main.h"
#include "drms.h"
#include "drms_names.h"
#include <SUM.h>
#include <soi_error.h>
#include <sys/errno.h>
#include <rpc/rpc.h>
#include <rpc/pmap_clnt.h>
#include <signal.h>
#include <sum_rpc.h>
#include <printk.h>

ModuleArgs_t module_args[] =
{ 
  {ARG_STRING, "server", "", "host with sum_pe_svc"},
  {ARG_STRING, "keyfile", "", "file with peq keylist"},
  {ARG_STRING, "logdir", "/usr/local/logs/SUM", "dir for log file"},
  {ARG_FLAG, "v", "0", "verbose flag"},
  {ARG_FLAG, "h", "0", "help - print usage info"},
  {ARG_END}
};

int file2keylist(char *filename, KEY **list); /* must be declared */
void logkey();
KEY *getsumpe(KEY *params);
struct timeval TIMEOUT = { 20, 0 };

static KEY *retlist;            /* must be static for svc dispatch rte */
char *module_name = "sum_pe";
char *logdir, *db, *keyfile, *server;
char datestr[32];
char logname[MAX_STR];
int pid;
int dellog = 1;
uint32_t rinfo;
FILE *logfp;
SVCXPRT *glb_transp;
CLIENT *current_client;

int open_log(char *filename)
{
  if((logfp=fopen(filename, "w")) == NULL) {
    fprintf(stderr, "Can't open the log file %s\n", filename);
    return(1);
  }
  return(0);
}

/* Outputs the variable format message (re: printf) to the log file.
*/
int write_log(const char *fmt, ...)
{
  va_list args;
  char string[4096];

  va_start(args, fmt);
  vsprintf(string, fmt, args);
  if(logfp) {
    fprintf(logfp, string);
    fflush(logfp);
  }
  else
    fprintf(stderr, string);
  va_end(args);
  return(0);
}

/* Return ptr to "mmm dd hh:mm:ss". Uses global datestr[]. */
static char *datestring()
{
  struct timeval tvalr;
  struct tm *t_ptr;

  gettimeofday(&tvalr, NULL);
  t_ptr = localtime((const time_t *)&tvalr);
  sprintf(datestr, "%s", asctime(t_ptr));
  datestr[19] = (char)NULL;
  return(&datestr[4]);          /* isolate the mmm dd hh:mm:ss */
}

void sighandler(int sig)
{
  if(sig == SIGTERM) {
    printk("*** %s sum_pe_svc got SIGTERM. Exiting.\n", datestring());
    exit(1);
  }
  if(sig == SIGINT) {
    printk("*** %s sum_pe_svc got SIGINT. Exiting.\n", datestring());
    exit(1);
  }
  printk("*** %s sum_pe_svc got an illegal signal %d, ignoring...\n",
                        datestring(), sig);
  if (signal(SIGINT, SIG_IGN) != SIG_IGN)
      signal(SIGINT, sighandler);
  if (signal(SIGALRM, SIG_IGN) != SIG_IGN)
      signal(SIGALRM, sighandler);
}


int setup () {
  pid = getpid();
  sprintf(logname, "%s/sum_pe_%d.log", logdir, pid);
  if(open_log(logname)) return(1);
  printk_set(write_log, write_log);
  printk("%s\nStarting sum_pe for database = %s\nserver=%s\nkeyfile = %s\nlogfile = %s\n\n", datestring(), db, server, keyfile, logname);
  if (signal(SIGINT, SIG_IGN) != SIG_IGN)
      signal(SIGINT, sighandler);
  if (signal(SIGTERM, SIG_IGN) != SIG_IGN)
      signal(SIGTERM, sighandler);
  if (signal(SIGALRM, SIG_IGN) != SIG_IGN)
      signal(SIGALRM, sighandler);
  return(0);
}

int nice_intro(int usage) {
  if (usage)
    {
    printf ("Usage:\nsum_pe [-v] server=hostname keyfile=file "
	"[logdir=<dir for log file>] dbname\n"
        "  details are:\n"
	"  -v: verbose mode\n"
        "server=host where sum_pe_svc runs\n"
        "keyfile=filename containing peq generated keylist\n"
	"logdir=dir to put the log file in\n"
        "dbname is the DRMS data base to connect to, e.g. jsoc\n");
    return(1);
    }
  return (0);
}

void drms_print_query_rec(DRMS_Record_t *rec)
  {
  int iprime, nprime;
  DRMS_Keyword_t *rec_key, *key, **prime_keys;
  printk("%s\n",rec->seriesinfo->seriesname);
  nprime = rec->seriesinfo->pidx_num;
  prime_keys = rec->seriesinfo->pidx_keywords;
  printk("!!!!TEMP in drms_print_query_rec()\n");
  if (nprime > 0)
    {
    for (iprime = 0; iprime < nprime; iprime++)
      {
      key = prime_keys[iprime];
      rec_key = drms_keyword_lookup (rec, key->info->name, 1);
      printk("[");
      if (key->info->type != DRMS_TYPE_STRING)
        drms_keyword_printval (rec_key);
      else
        {
        printk("\"");
        drms_keyword_printval (rec_key);
        printk("\"");
        }
    printk("]\n");
    }
  }
else
printk("[:#%lld]",rec->recnum);
}



/* Module main function. */
int DoIt(void) {
  int status = 0;
  uint32_t sumpeback;
  char cmd[128];
  char *call_err;
  KEY *list=newkeylist();
  CLIENT *clntsumpesvc;

  /* Get command line arguments */
  /*int verbose = cmdparams_get_int (&cmdparams, "v", NULL);*/
  int usage = cmdparams_get_int (&cmdparams, "h", NULL);
  logdir = cmdparams_get_str (&cmdparams, "logdir", NULL);
  server = cmdparams_get_str (&cmdparams, "server", NULL);
  keyfile = cmdparams_get_str (&cmdparams, "keyfile", NULL);
  if(nice_intro(usage)) return (0);
  if(cmdparams_numargs(&cmdparams) >= 1 && (db = cmdparams_getarg (&cmdparams, 1))) {
    /*printf("Starting sum_pe for database = %s.\n",db);*/
  }
  else {
    nice_intro(1);
    return(0);
  }
  if(setup()) return(1);
  if(file2keylist(keyfile, &list)) {	/* convert input file to keylist */
    printk("Error in file2keylist() for %s\n", keyfile);
    return(1);
  }
  sprintf(cmd, "/bin/rm -f %s", keyfile);
  system(cmd);
  keyiterate(logkey, list);            /* !!!TEMP */
  /* Create client handle used for calling the sum_pe_svc */
  clntsumpesvc = clnt_create(server, SUMPEPROG, SUMPEVERS, "tcp");
  if(!clntsumpesvc) {                 /* server not there */
    clnt_pcreateerror("Can't get client handle to sum_pe_svc");
    printk("sum_pe_svc not there on %s\n", server);
    return(1);
  }

  retlist = getsumpe(list);
  /* now send answer back to sum_pe_svc */
    status = clnt_call(clntsumpesvc,SUMPEACK, (xdrproc_t)xdr_Rkey, 
	(char *)retlist, (xdrproc_t)xdr_uint32_t, (char *)&sumpeback, TIMEOUT);
    if(status != RPC_SUCCESS) {
      call_err = clnt_sperror(clntsumpesvc, "Err clnt_call for SUMPEACK");
      printk("%s %s\n", datestring(), call_err);
      if(status != RPC_TIMEDOUT) {
        return(status);
      }
    }
    if(sumpeback != 0) {
      printk("Error status= %d from clnt_call to SUMPEACK = %d\n", sumpeback);
      status = 1;
    }
  if(dellog) {
    sprintf(cmd, "/bin/rm -f %s", logname);
    system(cmd);
  }
  return(0);
}

/* Called to get a SUMS wd for a dataset.
 * The input keylist is the  normal expansion keylist of the pe/peq call. 
 * For example:
 * dsds_uid:       KEYTYP_LONG     9638582
 * arg_data_in_0:  KEYTYP_STRING   in
 * in_0_basename:  KEYTYP_STRING
 * in_0_wd:        KEYTYP_STRING   .
 * in_basename:    KEYTYP_STRING
 * in_wd:  KEYTYP_STRING   .
 * in_0_series_sn: KEYTYP_INT      60000
 * in_0_fmt:       KEYTYP_STRING   -1073754624
 * in_0_incr:      KEYTYP_INT      1
 * in_0_lsn:       KEYTYP_INT      -1
 * in_0_fsn:       KEYTYP_INT      0
 * in_0_data:      KEYTYP_STRING   prog:mdi,level:lev1.5,series:fd_V_01h[60000]
 * in_0_prog:      KEYTYP_STRING   mdi
 * in_0_level:     KEYTYP_STRING   lev1.5
 * in_0_series:    KEYTYP_STRING   fd_V_01h
 * in_0_series_range:      KEYTYP_STRING   60000
 * in_fmt: KEYTYP_STRING   -1073754624
 * in_incr:        KEYTYP_INT      1
 * in_lsn: KEYTYP_INT      -1
 * in_fsn: KEYTYP_INT      0
 * in_data:        KEYTYP_STRING   prog:mdi,level:lev1.5,series:fd_V_01h[60000]
 * in_prog:        KEYTYP_STRING   mdi
 * in_level:       KEYTYP_STRING   lev1.5
 * in_series:      KEYTYP_STRING   fd_V_01h
 * in_series_sn:   KEYTYP_INT      60000
 * in_series_range:        KEYTYP_STRING   60000
 * in_nsets:       KEYTYP_INT      1
 * in:     KEYTYP_STRING   prog:mdi,level:lev1.5,series:fd_V_01h[60000]
 *
 * This routine will then query the drms for the datasets and return the 
 * answer keylist to to calling sum_pe_svc which will return it to the 
 * original peq.
*/
KEY *getsumpe(KEY *params)
{
  int reqcnt;
  int status;
  DRMS_RecordSet_t *recordset;
  DRMS_Record_t *rec;
  int xdirflg = 0;
  int first_rec, last_rec, nrecs, irec, retrieve_flg, i;
  char name[80], value[80], cmd[80], xdir[128], errmsg[128];
  char path[DRMS_MAXPATHLEN];
  /*char *in = "ds_mdi.fd_V_01h_lev1_8[121903-121943]"; /* !!TEMP */
  char *in, *cptr;
  FILE *infile;

  /*printk("!!Keylist in sumpedo_1() is:\n");	/* !!!TEMP */
  /*keyiterate(logkey, params);*/
  retlist = newkeylist();
  add_keys(params, &retlist);           /* NOTE:does not do fileptr */
  setkey_fileptr(&retlist, "current_client", getkey_fileptr(params, "current_client"));
  setkey_int(&retlist, "REQCODE", PEPEQRESPDO);
  reqcnt = getkey_int(params, "in_nsets");
  for(i=0; i < reqcnt; i++) {
    in = getkey_str(params, "in");
    /* if special ds that has the extra dir due to the import script.. */
    if(strstr(in, "ds_mdi.fd_V_01h_lev1_8") || 
	strstr(in, "ds_mdi.fd_V_30s_01h_lev1_8") ||
	strstr(in, "ds_mdi.fd_M_96m_01d_lev1_8") ||
	strstr(in, "ds_mdi.vw_V_06h_lev1_8") ||
        strstr(in, "ds_mdi.fd_M_01h_lev1_8")) {
      xdirflg = 1;
    }
    retrieve_flg = getkey_int(params, "retrieve_flg");
    printk("%s\nretrieve_flg in sumpedo_1 = %d\n", datestring(), retrieve_flg);
    /* Open record_set */
    recordset = drms_open_records (drms_env, in, &status);
    if (status) {
      printk("drms_open_records failed, in=%s, status=%d.\n", in, status);
      setkey_int(&retlist, "STATUS", 1); /* tell orig caller error */
      sprintf(errmsg, "Err: see on sum host: %s\n", logname);
      dellog = 0;		//don't del the sum_pe log file
      setkey_str(&retlist, "ERRMSG", errmsg);
      return(retlist);  
    }
    /* recordset now points to a struct with  count of records found ("n"), 
     * and a pointer to an array of record pointers ("records");
    */
    nrecs = recordset->n;
    printk("#of records in %s = %d\n", in, nrecs); /* !!!TEMP */
    if (nrecs == 0) {
      printk ("** No records in selected data set **\n");
      setkey_int(&retlist, "STATUS", 1); /* tell orig caller error */
      sprintf(errmsg, "** No records in selected data set **\n");
      setkey_str(&retlist, "ERRMSG", errmsg);
      return(retlist);  
    }
    last_rec = nrecs - 1;
    first_rec = 0;
    for (irec = first_rec; irec <= last_rec; irec++) {
      rec = recordset->records[irec];  /* pointer to current record */
      /*drms_print_query_rec(rec);	/* !!!TEMP */
      drms_record_directory (rec, path, retrieve_flg);
      if(xdirflg && strcmp(path, "")) {/* special ds with the extra dir */
        sprintf(cmd, "/bin/ls %s", path);
        if(!(infile = popen(cmd, "r"))) {
          printk("Can't do popen() for %s\n", cmd);
          setkey_int(&retlist, "STATUS", 1); /* tell orig caller error */
          sprintf(errmsg, "Err: Can't do popen() for %s\n", cmd);
          setkey_str(&retlist, "ERRMSG", errmsg);
          return(retlist);  
        }
        if(!fgets(xdir, 128, infile)) {
          printk("Can't get the extra dir for special ds\n");
          setkey_int(&retlist, "STATUS", 1); /* tell orig caller error */
          sprintf(errmsg, "Err: Can't get the extra dir for special ds\n");
          setkey_str(&retlist, "ERRMSG", errmsg);
          return(retlist);  
        }
        if(cptr = rindex(xdir, '\n')) *cptr = NULL; /* elim term. CR */
        sprintf(path, "%s/%s", path, xdir);
        pclose(infile);
      }
      sprintf(name, "in_%d_wd", irec);
      setkey_str(&retlist, name, path);
      printk("%s %s\n", name, path);
      sprintf(name, "inname_%d", irec);
      sprintf(value, "in_%d", irec);
      setkey_str(&retlist, name, value);
      if(!strcmp(path, "")) {
        sprintf(name, "status_%d", irec);
        setkey_int(&retlist, name, DS_ARCHIVE);
      }
      else {
        sprintf(name, "status_%d", irec);
        setkey_int(&retlist, name, 0);
      }
    }
    drms_close_records(recordset, DRMS_FREE_RECORD);
  }
  setkey_int(&retlist, "in_nsets", nrecs);
  setkey_int(&retlist, "STATUS", 0);
  return(retlist);  
}

Karen Tian
Powered by
ViewCVS 0.9.4