/*
 * (CVS web viewer banner, kept for reference)
 *
 * (file) Return to ingest_tlm.c CVS log (file) (dir) Up to [Development] / JSOC / proj / datacapture / apps
 *
 * File: [Development] / JSOC / proj / datacapture / apps / ingest_tlm.c (download)
 * Revision: 1.25, Wed Jan 18 19:38:20 2012 UTC (11 years, 4 months ago) by arta
 * Branch: MAIN
 * CVS Tags: Ver_9-1, Ver_9-0, Ver_8-8, Ver_8-7, Ver_8-6, Ver_8-5, Ver_8-4, Ver_8-3, Ver_8-2, Ver_8-12, Ver_8-11, Ver_8-10, Ver_8-1, Ver_8-0, Ver_7-1, Ver_7-0, Ver_6-4, Ver_6-3, Ver_6-2
 * Changes since 1.24: +8 -19 lines
 * Revert changes Jim accidentally made to this file.
 */

/*-----------------------------------------------------------------------------
 * cvs/jsoc/src/datacapture/ingest_tlm.c
 * NOTE: old hmi_lev0.c on hmi0
 *-----------------------------------------------------------------------------
 *
 * DESCRIPTION:
 * The ingest_tlm program is started by the DDS_SOC program when it is first
 * run. This program has the SUMS API interface to get and put SUM storage. 
 * The ingest_tlm is given the input dir to monitor as an argument when
 * it is invoked. This dir is normally the $DIRSOC2SOC directory that the 
 * DDS files are moved into after they are validated. Ingest_tlm is also
 * given the dir to copy files to after it is done ingesting them into SUMS.
 * This is normally the $DIRSOC2PIPE directory that the files are staged to
 * for pickup by the pipeline processing backend system.
 * When ingest_tlm sees a 
 * new .qac file, it will confirm the .tlm file and allocate disk storage via 
 * SUM_Allocate() and copy the current files into this storage and do a 
 * SUM_Put() to make the storage archivable by SUMS. When the files have been 
 * ingested into SUMS the program then moves them from $DIRSOC2SOC to 
 * $DIRSOC2PIPE for the backend pipeline system to begin processing.
 * 
 * Ingest_tlm is also passed the pipeline to soc dir that the pipeline backend
 * system writes .parc file to, to indicate which storage units have been 
 * archived in the backend. We will read these files and update the sum_main
 * table for the given storage unit with the safe_tape info provided.
 *
 * Called:
 *ingest_tlm [-v] -pVC02 /dir_to_ingest /dir_to_pipeline /dir_from_pipeline [log]
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h> /* for va_list used by msg() and h0log() */
#include <string.h> /* for strcpy(3), strcmp(3), strlen(3), strtok(3)... */
#include <ctype.h>
#include <signal.h>
#include <strings.h>
#include <errno.h>
#include <time.h> /* for time(2), localtime(3) */
#include <sum_rpc.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/stat.h> /* for umask(2) */
#include <dirent.h>
#include <unistd.h> /* for alarm(2) among other things... */
#include <printk.h>
#include "egsehmicomp.h"


#define LEV0FILEON "/usr/local/logs/soc/LEV0FILEON" //touch to turnon lev0
#define H0LOGFILE "/tmp/h0.%s.%d.log"
#define DIRDDS "/egse/ssim2soc"
#define IMAGEDIR "/tmp/jim"	/* dir to put the last IMAGEDIRCNT images */
#define IMAGEDIRSMALL "/tmp/jim/small"
#define IMAGEDIRCNT 100		/* # of images to save */
#define SEC1970TO2004 1072828800 /* approx #of secs from 1970 to 2004 */
#define PKTSZ 1788		/* size of VCDU pkt */
#define DEFAULTDB "jsocdc"	/* the default db to connect to */
//#define MAXFILES 512		/* max # of file can handle in tlmdir */
#define MAXFILES 8192		/* max # of file can handle in tlmdir */
#define NUMTIMERS 10		/* number of seperate timers avail */
#define TESTAPPID 0x199		/* appid of test pattern packet */
#define TESTVALUE 0xc0b             /* first value in test pattern packet */
  	                                 /* previous used 0xc0b */
FILE *h0logfp;                  /* fp for h0 ouput log for this run */

static char datestr[32];
static struct timeval first[NUMTIMERS], second[NUMTIMERS];
static float tsum[NUMTIMERS];

/* Declarations for static functions */
static void open_sum(void);
/* static double du_dir(void);*/
static time_t call_time(void);
static void now_do_alrm_sig();

extern int numcontexts;
extern Decompress_Context_t *Context[];
extern int errno;

unsigned int fsn = 0;
unsigned int fsn_prev = 0;
unsigned int fid = 0;
SUM_t *sum;
SUMID_t uid = 0;
char **cptr;
uint64_t *dsixpt;
uint64_t alloc_index;
char alloc_wd[64];

long long vcdu_seq_num;
long long vcdu_seq_num_next;
long long total_missing_im_pdu;
unsigned int vcdu_24_cnt, vcdu_24_cnt_next;
int verbose;			/* set by get_cmd() */
int lev0_on_flag;
double reqbytes;		/* # of bytes requested */
double dsize;			/* # of bytes used */
double bytes_used;		/* total #of bytes for all cataloged output */
int total_tlm_vcdu;
int total_missing_vcdu;
int dsds_tid;			/* originally the tid of dsds_svc */
				/* now the tid of pe_rpc that gets us to dsds*/
int abort_active;		/* set while doing an abort */
int sigalrmflg = 0;		/* set on signal so prog will know */
int sigtermflg = 0;		/* set on signal so prog will know */
int tlmactive = 0;              /* set when tlm lev0 processing is active */
int pflg = 0;
int imagedircnt = 0;            /* inc each time write an image to IMAGEDIR */
int ALRMSEC = 60;               /* seconds for alarm signal */
int dbxflg;			/* user defined while running dbx, else N/A */
int debugflg;			/* run all pvm servers in the debug mode */
				/* also do keyiterate. Don't use w/tae */ 
char database[MAX_STR];
char pdshost[MAX_STR];
char pchan[8];			/* primary channel to listen to e.g. VC02 */
char rchan[8];			/* redundant channel to listen to e.g. VC10 */
char prependfits[8];		/* VC02 or VC10 for fits file name */
char *dbname = DEFAULTDB;	/* !!TBD pass this in as an arg */
char *username;			/* from getenv("USER") */
char *tlmdir;			/* tlm dir name passed in as argv[0] */
char *pipedir;			/* to pipeline backend dir in as argv[1] */
char *frompipedir;		/* from pipeline backend dir as in argv[2] */
char *logfile;			/* optional log name passed in as argv[3] */
struct p_r_chans {
  char *pchan;
  char *rchan;
};
typedef struct p_r_chans P_R_CHANS;

P_R_CHANS p_r_chan_pairs[] = {
{"VC01", "VC09"},		/* AIA */
{"VC04", "VC12"},		/* AIA */
{"VC02", "VC10"},		/* HMI */
{"VC05", "VC13"},		/* HMI */
{"n/a", "n/a"}
};

/* Element of the array that do_ingest() fills with the file names found in
 * tlmdir and then sorts with qsort()/compare_names().
 */
struct namesort {		/* sorted file names in tlmdir */
  char *name;			/* strdup()'d directory entry name */
};
typedef struct namesort NAMESORT;

/* Singly linked list of open images, keyed by fsn.
 * Entries are added by setopenimg(), looked up by getopenimg() and
 * removed by remopenimg().
 */
struct openimg {
  struct openimg *next;		/* next entry, or NULL at the end of the list */
  time_t sec;			/* time stamp recorded when the entry was added */
  unsigned int fsn;		/* fsn of the open image */
};
typedef struct openimg OPENIMG;
OPENIMG *openimg_hdr = NULL;	/* linked list of open images */
OPENIMG *openimg_ptr;		/* current entry */

/* NOTE: This may be vestigial. Now we can only have one open image per
 * process. !!CHECK
 *
 * Push a new entry holding (sec, fsn) onto the front of an OPENIMG list.
 *   list - address of the list head pointer; updated to the new entry
 *   sec  - time stamp to store in the entry
 *   fsn  - fsn to store in the entry
 * If the entry cannot be allocated the list is left unchanged and a
 * message is written to stderr.
 */
void setopenimg(OPENIMG **list, time_t sec, unsigned int fsn)
{
  OPENIMG *newone = malloc(sizeof *newone);

  if(newone == NULL) {		/* out of memory: don't deref NULL */
    fprintf(stderr, "**setopenimg: can't malloc entry for fsn=%u\n", fsn);
    return;
  }
  newone->next = *list;
  newone->sec = sec;
  newone->fsn = fsn;
  *list = newone;
}

/* Unlink and free the first OPENIMG entry whose fsn matches.
 *   list - address of the list head pointer (updated if the head is removed)
 *   fsn  - fsn of the entry to drop
 * Nothing happens if no entry carries that fsn.
 */
void remopenimg(OPENIMG **list, unsigned int fsn)
{
  OPENIMG **link;		/* points at the pointer that leads to the entry */
  OPENIMG *victim;

  for(link = list; *link != NULL; link = &(*link)->next) {
    if((*link)->fsn == fsn) {
      victim = *link;
      *link = victim->next;
      free(victim);
      return;			/* only the first match is removed */
    }
  }
}

/* Look up the OPENIMG entry for a given fsn.
 * Returns a pointer to the first matching entry, or NULL if the list
 * holds no entry with that fsn.
 */
OPENIMG *getopenimg(OPENIMG *list, unsigned int fsn)
{
  OPENIMG *cur;

  for(cur = list; cur != NULL; cur = cur->next) {
    if(cur->fsn == fsn)
      break;
  }
  return cur;			/* NULL when we ran off the end */
}

/* Set the local time in the datestr[] global variable, formatted as
 * YYYY.MM.DD_hh:mm:ss. If the local time cannot be determined, datestr
 * is set to the empty string.
 */
void get_date() 
{
  time_t tval;
  struct tm *t_ptr;

  tval = time(NULL);
  t_ptr = localtime(&tval);
  if(t_ptr == NULL) {		/* localtime() can fail; don't deref NULL */
    datestr[0] = '\0';
    return;
  }
  /* bounded write so an unexpected field width can't overrun datestr */
  snprintf(datestr, sizeof(datestr), "%d.%02d.%02d_%02d:%02d:%02d", 
	  (t_ptr->tm_year+1900), (t_ptr->tm_mon+1),
	  t_ptr->tm_mday, t_ptr->tm_hour, t_ptr->tm_min, t_ptr->tm_sec);
}


/* Record the current time of day as the start time of timer n.
 * n indexes the first[] array and is not range checked.
 */
void StartTimer(int n)
{
  struct timeval *slot = &first[n];

  gettimeofday(slot, NULL);
}

/* Return the seconds elapsed since StartTimer(n) was called.
 * The stop time is stored in second[n]; when its microsecond field is
 * smaller than the start's, one second is borrowed so the microsecond
 * difference is not negative. n is not range checked.
 */
float StopTimer(int n)
{
  struct timeval *begin = &first[n];
  struct timeval *end = &second[n];

  gettimeofday(end, NULL);
  if(end->tv_usec < begin->tv_usec) {	/* borrow a second */
    end->tv_sec--;
    end->tv_usec += 1000000;
  }
  return (float) (end->tv_sec - begin->tv_sec) +
    (float) (end->tv_usec - begin->tv_usec)/1000000.0;
}

/* Output a printf type formatted msg string to stdout and flush it.
 *   fmt - printf style format string, followed by its arguments
 * The formatted text is limited to the size of the local buffer (longer
 * output is truncated) and is written verbatim, so any '%' characters
 * produced by the arguments are not interpreted a second time.
 * Always returns 0.
 */
int msg(char *fmt, ...)
{
  va_list args;
  char string[32768];

  va_start(args, fmt);
  vsnprintf(string, sizeof(string), fmt, args);
  va_end(args);
  fputs(string, stdout);	/* not printf(string): text is data, not a format */
  fflush(stdout);
  return(0);
}

/* Open the h0 log file for this run and store the stream in h0logfp.
 *   filename - path of the log file
 *   type     - fopen() mode, e.g. write for a new file or append for an
 *              existing one
 * On failure h0logfp is NULL and a message is written to stderr.
 */
void open_h0log(char *filename, char *type)
{
  h0logfp = fopen(filename, type);
  if(h0logfp != NULL)
    return;
  fprintf(stderr, "**Can't open the log file %s\n", filename);
}

/* Outputs the variable format message (re: printf) to the h0 log file,
 * or to stdout if the log file could not be opened (h0logfp is NULL).
 *   fmt - printf style format string, followed by its arguments
 * The formatted text is limited to the size of the local buffer (longer
 * output is truncated) and is written verbatim, so '%' characters that
 * come from the arguments (file names etc.) are not interpreted again.
 * Always returns 0.
 */
int h0log(const char *fmt, ...)
{
  va_list args;
  char string[32768];
  FILE *out;

  va_start(args, fmt);
  vsnprintf(string, sizeof(string), fmt, args);
  va_end(args);
  out = h0logfp ? h0logfp : stdout;	/* couldn't open log -> stdout */
  fputs(string, out);
  fflush(out);
  return(0);
}

/* Translate an error status from decompress_next_vcdu() into its symbolic
 * name.
 *   status - the status value to translate
 *   text   - caller's buffer that receives the NUL terminated name
 * A status that matches none of the known codes yields
 * "UNKNOWN_ERROR_STATUS(<status>)" so that text is always a valid string.
 */
void decompress_next_vcdu_err_str(int status, char *text)
{
  if(status == ERROR_BADOFFSET) sprintf(text, "ERROR_BADOFFSET");
  else if(status == ERROR_CORRUPTDATA) sprintf(text, "ERROR_CORRUPTDATA");
  else if(status == ERROR_BADHEADER) sprintf(text, "ERROR_BADHEADER");
  else if(status == ERROR_CTXOVERFLOW) sprintf(text, "ERROR_CTXOVERFLOW");
  else if(status == ERROR_INVALIDID) sprintf(text, "ERROR_INVALIDID");
  else if(status == ERROR_TOOMANYPIXELS) sprintf(text, "ERROR_TOOMANYPIXELS");
  else if(status == ERROR_NOSUCHIMAGE) sprintf(text, "ERROR_NOSUCHIMAGE");
  else if(status == ERROR_PARTIALOVERWRITE) sprintf(text,
"ERROR_PARTIALOVERWRITE");
  else if(status == ERROR_WRONGPACKET) sprintf(text, "ERROR_WRONGPACKET");
  else if(status == ERROR_MISSING_FSN) sprintf(text, "ERROR_MISSING_FSN");
  else if(status == ERROR_MISSING_FID) sprintf(text, "ERROR_MISSING_FID");
  else if(status == ERROR_INVALIDCROPID) sprintf(text, "ERROR_INVALIDCROPID");
  else if(status == ERROR_NODATA) sprintf(text, "ERROR_NODATA");
  else if(status == ERROR_HK_UNKNOWN_APID) sprintf(text,
"ERROR_HK_UNKNOWN_APID");
  else if(status == ERROR_HK_CANNOT_FIND_VER_NUM) sprintf(text,
"ERROR_HK_CANNOT_FIND_VER_NUM");
  else if(status == ERROR_HK_CANNOT_LOAD_HK_VALUES) sprintf(text,
"ERROR_HK_CANNOT_LOAD_HK_VALUES");
  else if(status == ERROR_HK_CANNOT_LOAD_ENGR_VALUES) sprintf(text,
"ERROR_HK_CANNOT_LOAD_ENGR_VALUES");
  else if(status == ERROR_HK_INVALID_BITFIELD_LENGTH) sprintf(text,
"ERROR_HK_INVALID_BITFIELD_LENGTH");
  else if(status == ERROR_HK_UNHANDLED_TYPE) sprintf(text,
"ERROR_HK_UNHANDLED_TYPE");
  /* never leave the caller's buffer uninitialized: it gets printed */
  else sprintf(text, "UNKNOWN_ERROR_STATUS(%d)", status);
}

/* Fatal error exit, for use any time after we may have registered with
 * SUMS. Deregisters and closes with SUMS as appropriate, reports the exit
 * status via printk() and msg(), closes the h0 log if it is open and
 * terminates the process with exit(stat). Does not return.
 */
void abortit(int stat)
{
  printk("***Abort in progress ...\n");
  if(uid != 0)			/* we've registered with SUMS */
    SUM_close(sum, h0log);
  printk("**Exit ingest_tlm w/ status = %d\n", stat);
  msg("Exit ingest_tlm w/ status = %d\n\n", stat);
  if(h0logfp != NULL)
    fclose(h0logfp);
  exit(stat);
}

/* SIGALRM handler: called every ALRMSEC seconds to check for a timeout on
 * opened images.
 * While tlm processing is active only a flag is set, as the libhmicomp
 * lib is not reentrant; the main program checks the flag when it is safe
 * and then calls now_do_alrm_sig() itself. When tlm processing is not
 * active the timeout processing is run directly from here.
 * Nothing is done unless lev0 processing is turned on.
 * NOTE(review): the direct call runs now_do_alrm_sig() in signal context.
 */
void alrm_sig(int sig)
{
  signal(SIGALRM, alrm_sig);	/* re-install the handler */
  if(lev0_on_flag) {
    if(tlmactive)
      sigalrmflg = 1;		/* defer to the main program */
    else
      now_do_alrm_sig();
  }
}

/* Do the timeout processing for opened images.
 * For every image that decompress_status_all() reports:
 *  - if there is no OPENIMG entry for its fsn yet, add one stamped with
 *    the current call_time();
 *  - if there is an entry and it is at least ALRMSEC seconds old, flush
 *    the partial image, write it as a fits file in IMAGEDIR (plus a
 *    reduced copy in IMAGEDIRSMALL via bin256), prune the oldest saved
 *    image once IMAGEDIRCNT images exist, and drop the entry.
 * Finally clears sigalrmflg and re-arms the alarm for ALRMSEC seconds.
 * Sets the fsn, fid and openimg_ptr globals as a side effect.
 */
void now_do_alrm_sig()
{
  Image_t *image;
  Decompress_Stat_t *decomp_stat = NULL;  /* so free() below is always safe */
  int nx, i, status;
  char imgfile[128], cmd[128], bcmd[192];

  nx = decompress_status_all(&decomp_stat);
  for(i=0; i < nx; i++) {
    fsn = ID2FSN(decomp_stat[i].ID);
    fsn = fsn & 0x3fffffff;         /* make sure 30bits */
    fid = ID2FID(decomp_stat[i].ID);
    openimg_ptr = (OPENIMG *)getopenimg(openimg_hdr, fsn);
    if(openimg_ptr != NULL) {
      /* time_t is not int: print it as long */
      h0log("#Timeout: Found an open image fsn=%u sec=%ld call_time=%ld\n",
            openimg_ptr->fsn, (long)openimg_ptr->sec, (long)call_time());
      if((openimg_ptr->sec + ALRMSEC) <= call_time()) { /* image timed out */
        h0log("decmpress_print_status() says:\n");
        decompress_print_status(&decomp_stat[i]);
        h0log("Write partial image for timed out image.\n");
        image = NULL;		/* known value if the flush sets nothing */
        status = decompress_flush_image(fsn, fid, &image);
        if(status == SUCCESS) {
          h0log("*SUCCESS FLUSH of partial image fsn=%u\n", fsn);
          snprintf(imgfile, sizeof(imgfile), "%s/%s_%09u.%d.fits", 
			IMAGEDIR, prependfits, fsn, imagedircnt);
          if(decompress_writefitsimage(imgfile, image, 0)) {
            h0log("Error on output of %s\n", imgfile);
          }
          else {
            snprintf(bcmd, sizeof(bcmd),
			"/usr/local/bin/bin256 %s %s/%s_%09u.%d.fits",
			imgfile, IMAGEDIRSMALL, prependfits, fsn, imagedircnt);
            system(bcmd);
          }
          imagedircnt++;
          decompress_free_images(image);
          /* only keep the last n images (same >= test as the other
           * places that prune IMAGEDIR, so image 0 is pruned too) */
          if(imagedircnt >= IMAGEDIRCNT) {
            snprintf(cmd, sizeof(cmd), "/bin/rm -f %s/%s_*.%d.fits",
			IMAGEDIR,prependfits,imagedircnt-IMAGEDIRCNT);
            h0log("%s\n", cmd);
            system(cmd);
          }
          h0log("*SUCCESS FLUSH of partial image %s for fsn=%u\n",
			imgfile, fsn);
        }
        else {
          h0log("Can't flush image for fsn = %u\n", fsn);
          if(image != NULL)	/* only free what the flush handed back */
            decompress_free_images(image);
        }
        remopenimg(&openimg_hdr, fsn);
      }
    }
    else {                      /* add an entry for this fsn */
      setopenimg(&openimg_hdr, call_time(), fsn);
    }
  }
  free(decomp_stat);
  sigalrmflg = 0;
  alarm(ALRMSEC);
  return;
}

/* Termination signal handler.
 * Only sets sigtermflg; the main processing checks the flag and calls
 * now_do_term_sig() when it is safe to do so.
 *   sig - the signal number (not used)
 */
void sighandler(int sig)
{
  (void)sig;
  sigtermflg = 1;
}

/* Do the termination processing after a termination signal was flagged.
 * Every image that decompress_status_all() reports as open is flushed and
 * written as a partial fits image to IMAGEDIR (plus a reduced copy in
 * IMAGEDIRSMALL via bin256), pruning the oldest saved image once
 * IMAGEDIRCNT images exist. Then the program exits through abortit(2),
 * so this function does not return.
 * Sets the fsn and fid globals as a side effect.
 */
void now_do_term_sig()
{
  Image_t *image;
  Decompress_Stat_t *decomp_stat = NULL;  /* so free() below is always safe */
  int nx, i, status;
  char imgfile[128], cmd[128], bcmd[192];

  printk("\n***ingest_tlm received a termination signal\n");
  msg("\n***ingest_tlm received a termination signal\n");
  nx = decompress_status_all(&decomp_stat);
  msg("# of currently opened images to be flushed = %d\n", nx);
  if(nx) msg("(see log for more details)\n");
  h0log("# of currently opened images to be flushed = %d\n", nx);
  for(i=0; i < nx; i++) {
    h0log("decmpress_print_status() says:\n");
    decompress_print_status(&decomp_stat[i]);
    h0log("Write partial image.\n");
    fsn = ID2FSN(decomp_stat[i].ID);
    fsn = fsn & 0x3fffffff;         /* make sure 30bits */
    fid = ID2FID(decomp_stat[i].ID);
    image = NULL;		/* known value if the flush sets nothing */
    status = decompress_flush_image(fsn, fid, &image);
    if(status == SUCCESS) {
      snprintf(imgfile, sizeof(imgfile), "%s/%s_%09u.%d.fits", 
			IMAGEDIR,prependfits,fsn,imagedircnt);
      if(decompress_writefitsimage(imgfile, image, 0)) {
        h0log("Error on output of %s\n", imgfile);
      }
      else {
        snprintf(bcmd, sizeof(bcmd),
			"/usr/local/bin/bin256 %s %s/%s_%09u.%d.fits",
			imgfile, IMAGEDIRSMALL, prependfits, fsn, imagedircnt);
        system(bcmd);
      }
      imagedircnt++;
      decompress_free_images(image);
      /* only keep the last n images */
      if(imagedircnt >= IMAGEDIRCNT) {
        snprintf(cmd, sizeof(cmd), "/bin/rm -f %s/%s_*.%d.fits",
			IMAGEDIR,prependfits,imagedircnt-IMAGEDIRCNT);
        h0log("%s\n", cmd);
        system(cmd);
      }
      h0log("*SUCCESS FLUSH of partial image %s for fsn=%u\n", imgfile, fsn);
    }
    else {
      h0log("Can't flush image for fsn = %u. Status=%d\n", fsn, status);
      if(image != NULL)		/* only free what the flush handed back */
        decompress_free_images(image);
    }
  }
  free(decomp_stat);
  abortit(2);
}

/* Print the usage message and abort with status 1 (does not return).
 * The text lists the three directories that get_cmd() reads from the
 * command line: the ingest dir, the to-pipeline dir and the
 * from-pipeline dir.
 */
void usage()
{
  msg("Usage:\ningest_tlm [-v] -pVCnn dir_in dir_out dir_from_pipeline [log_file]\n");
  msg("where: -v = verbose\n");
  msg("where: -p = primary channel to listen to e.g. VC02\n");
  msg(" dir_in = directory containing the files to ingest.\n");
  msg(" dir_out = directory to move the files to after the ingest.\n");
  msg(" dir_from_pipeline = directory the pipeline backend writes .parc files to.\n");
  msg(" log_file = optional log file name. Will create one if not given.\n");
  abortit(1);
}


/* Gets the command line and reads the switches.
 */
void get_cmd(int argc, char *argv[])
{
  char *cptr;
  int c, i;

  while(--argc > 0 && ((*++argv)[0] == '-')) {
    while((c = *++argv[0]))
      switch(c) {
      case 'd':
        debugflg=1;
        break;
      case 'v':
        verbose=1;
        break;
      case 'p':			/* primary chan e.g. VC02 */
        if(*++argv[0] != NULL) {
          cptr = argv[0];
          strcpy(pchan, cptr);
          for(i=0; ; i++) {	/* ck for valid and get redundant chan */
            if(!strcmp(p_r_chan_pairs[i].pchan, pchan)) {
              strcpy(rchan, p_r_chan_pairs[i].rchan);
              break;
            }
            if(!strcmp(p_r_chan_pairs[i].pchan, "n/a")) {
              printk("!!ERROR: Invalid VCid (%s) specified\n", pchan);
              usage();
            }
          }
          pflg = 1;
        }
        while(*++argv[0] != NULL);
        --argv[0];
        break;
      default:
        usage();
        break;
      }
  }
  if(!pflg) usage();
  if(argc != 4) usage();
  tlmdir = argv[0];
  pipedir = argv[1];
  frompipedir = argv[2];
  logfile = argv[3];
  gethostname(pdshost, MAX_STR);
  if((cptr = index(pdshost, '.'))) 
    *cptr = 0;                       /* remove any .Stanford.EDU */
}

/* Return the current time in seconds counted from an epoch of
 * (approximately) 2004, i.e. time(NULL) less SEC1970TO2004.
 */
time_t call_time() 
{
  time_t now = time(NULL);

  now -= (time_t)SEC1970TO2004;	/* make the epoch 2004 */
  return now;
}

/* Open a session with the sum_svc.
 * Stores the handle returned by SUM_open() in the global sum and the uid
 * assigned to this open in the global uid. Aborts with status 3 if the
 * open fails.
 */
void open_sum()
{
  sum = SUM_open(NULL, NULL, h0log);
  if(sum == 0) {
    printk("***Failed on SUM_open()\n");
    abortit(3);
  }
  uid = sum->uid;		/* uid assigned to this open */
  printk("*Opened with sum_svc as uid=%ld\n", uid); 
}

/* qsort() comparison function for NAMESORT entries.
 * Orders by file name, ignoring the first 4 characters of each name
 * (the VC02/VC05 style prefix).
 */
int compare_names(const void *a, const void *b)
{
  char *left = ((NAMESORT *)a)->name;
  char *right = ((NAMESORT *)b)->name;

  return strcmp(left + 4, right + 4);	/* skip the 4 char prefix */
}

/* Assemble a 16 bit value from two consecutive bytes, first byte most
 * significant. Works byte by byte, so the result is machine independent.
 */
unsigned short MDI_getshort (unsigned char *c)
{
  unsigned short hi = (unsigned short) c[0] << 8;
  unsigned short lo = (unsigned short) c[1];

  return hi | lo;
}

/* Process the tlm file to validate and optionally to extract the 
 * lev0 image.
*/
int get_tlm(char *file)
{
  FILE *fpin;
  Image_t *images, *sav_images, *im, *partimg;
  CCSDS_Packet_t *hk_packets;
  unsigned char cbuf[PKTSZ];
  char errtxt[128], imgfile[128], cmd[128], bcmd[192];
  long long gap_42_cnt;
  int status, rstatus, fpkt_cnt, i, j, sync_bad_cnt, nx;
  int imagecnt, appid, datval, eflg, first;
  unsigned int cnt1, cnt2, cnt3, fsnx, fidx, gap_24_cnt;
  int zero_pn;
  unsigned short pksync1, pksync2;
  float ftmp;

  StartTimer(1);			/* time tlm file processing */
  if(!(fpin = fopen(file, "r"))) {	/* open the tlm input */
    h0log("*Can't open tlm file %s\n", file);
    return(1);
  }
  get_date();
  h0log("%s\n", datestr);
  h0log("*Processing tlm file %s\n", file);
  fpkt_cnt = sync_bad_cnt = imagecnt = eflg = 0;
  zero_pn = gap_24_cnt = gap_42_cnt = 0;
  first = 1; 
  /*StartTimer(2);			/* time each image */
  /* read a VCDU packet */
  while((status = fread(cbuf,sizeof(char),PKTSZ,fpin) ) == PKTSZ) {
    pksync1 = MDI_getshort(cbuf);
    pksync2 = MDI_getshort(cbuf+2);
    if((pksync1 == 0) && (pksync2 == 0)) { /* skip 0 pn code */
      if(!zero_pn) {			/* give msg for 1st one only */
        h0log("*0 PN code at pkt# %d\n", fpkt_cnt);
        h0log("*Subsequent ones will be ignored until non-0 again\n");
        zero_pn = 1;
      }
      fpkt_cnt++;			/* count # of pkts found */
      continue;
    }
    if((pksync1 != 0x1acf) || (pksync2 != 0xfc1d)) {
      h0log("*Lost sync at VCDU pkt# %d. pksync1=%x pksync2=%x\n", 
		fpkt_cnt, pksync1, pksync2);
      fpkt_cnt++;			/* count # of pkts found */
      eflg++;
      if(sync_bad_cnt++ > 4) {
        h0log("**Too many out of sync packets.\n");
        return(1);
      }
      h0log("  Will attempt to press on...\n");
      zero_pn = 0;
      continue;
    }
    if(first) {			/* print first good sync found */
      h0log("*VCDU pkt# %d sync = %x %x\n", fpkt_cnt, pksync1, pksync2);
    }
    fpkt_cnt++;			/* count # of pkts found */
    /* get 24 bit VCDU counter */
    cnt1 = MDI_getshort(cbuf+6);
    cnt2 = MDI_getshort(cbuf+8);
    cnt2 = (cnt2 >> 8)& 0xFF;
    cnt2 = ((cnt1 << 8)& 0xFF00) + cnt2;
    cnt1 = (cnt1 >> 8)& 0xFF;
    vcdu_24_cnt = (cnt1*65536) + cnt2;
    if(vcdu_24_cnt_next != vcdu_24_cnt) {
      h0log("*VCDU 24bit seq num out of sequence. exp: %u  rec: %u\n", 
	    vcdu_24_cnt_next, vcdu_24_cnt);
      if(vcdu_24_cnt_next > vcdu_24_cnt) {
        h0log("*NOTE: VCDU 24 bit counter retarded\n"); /*cntr does go thru 0*/
        h0log("*NOTE: gap report will be inaccurate (tbd)\n");
      }
      if(!first) {		/* don't count gap across .tlm files */
        gap_24_cnt += vcdu_24_cnt - vcdu_24_cnt_next;
      }
    }
    vcdu_24_cnt_next = vcdu_24_cnt + 1;
    /* now get the 42bit IM_PDU counter */
    cnt1 = MDI_getshort(cbuf+10);
    cnt2 = MDI_getshort(cbuf+12);
    cnt3 = MDI_getshort(cbuf+14);
    cnt1 = cnt1 & 0x03ff;
    vcdu_seq_num = (cnt1*4294967296) + (cnt2*65536) + cnt3;
    /* h0log("vcdu_seq_num = %lld\n", vcdu_seq_num); */
    if(vcdu_seq_num_next != vcdu_seq_num) {
      h0log("*IM_PDU seq num out of sequence. exp: %lld  rec: %lld\n", 
	    vcdu_seq_num_next, vcdu_seq_num);
      if(vcdu_seq_num_next > vcdu_seq_num) {
        h0log("*NOTE: IM_PDU 42 bit counter retarded\n");
        h0log("*NOTE: gap report will be inaccurate\n");
      }
      if(!first) {		/* don't count gap across .tlm files */
        gap_42_cnt += vcdu_seq_num - vcdu_seq_num_next;
      }
      eflg++;
    }
    first = 0;
    vcdu_seq_num_next = vcdu_seq_num + 1;
    /* get the App ID. Low 11 bit of short at buf+18 */
    appid = MDI_getshort(cbuf+18);
    appid = appid & 0x07ff;
    if(appid == TESTAPPID) {	/* appid of test pattern */

  	       /*continue;                 /* !!TEMP just go to next pkt */

      h0log("*Test ApID of %0x found for IM_PDU Cntr = %lld\n", 
			TESTAPPID, vcdu_seq_num);
      for(i=0, j=TESTVALUE; i < 877; i=i+2, j++) {
        datval = MDI_getshort(cbuf+32+i);	/* next data value */
        if(datval != j) {
          h0log("*Test data value=%0x, expected=%0x for IM_PDU Cntr=%lld\n", 
		datval, j, vcdu_seq_num);
          eflg++;
          break;		/* skip the rest of this packet */
        }
      }
      continue; 		/* go on to next packet */
    }

    /* Parse tlm packet headers. */
  if(lev0_on_flag) {
    rstatus = decompress_next_vcdu((unsigned short *)(cbuf+10), 
			&images, &hk_packets);
  }
  else {
    /*goto BYPASS;*/ 	     rstatus = SUCCESS;
    goto BYPASS;
  }

    /*h0log("decompress_next_vcdu() rstatus = %d\n", rstatus); /* !!TEMP */
    switch(rstatus) {
    case SUCCESS:
      /* 0: A science data VCDU was successfully decoded
      /* The coresponding image is not yet complete.
      /* On return, *image and *hk_packets are untouched. 
      */
      cnt1 = MDI_getshort(cbuf+32);
      cnt2 = MDI_getshort(cbuf+34);
      fsnx = (unsigned int)(cnt1<<16)+(unsigned int)(cnt2);
      fsnx = fsnx & 0x3fffffff;         //low 30bits for fsn */
      if(fsnx == 0) {
	h0log("Found fsn=0. Ignore.\n");
        continue;			//a 0 fsn is not acceptable
      }
      if(fsnx != fsn_prev) {            /* the fsn has changed */
	h0log("*FSN has changed from %u to %u\n", fsn_prev, fsnx);
        /* close the image of the prev fsn if not 0 */
        if(fsn_prev != 0) {
          for(nx=0; nx < numcontexts; nx++) {
            fsn = ID2FSN(Context[nx]->ID);
            fsn = fsn & 0x3fffffff;         //make sure 30bits
            if(fsn == fsn_prev) {
              im = Context[nx]->image;
              if(im->keywords != NULL) {  /* the ISP is in */
                h0log("*New fsn. ISP in for prev fsn. Flush its image\n");
              }
              else {
                h0log("*New fsn. ISP NOT in for prev fsn. Flush its image\n");
              }
                fid = ID2FID(Context[nx]->ID);
                status = decompress_flush_image(fsn, fid, &partimg);
                if(status == SUCCESS) {
		  h0log("*SUCCESS FLUSH of partial image fsn=%u\n", fsn);
                  imagecnt++;
                  sprintf(imgfile, "%s/%s_%09u.%d.fits",
                        IMAGEDIR, prependfits, fsn, imagedircnt);
                  if(decompress_writefitsimage(imgfile, partimg, 0)) {
                    h0log("Error on output of %s\n", imgfile);
                  }
                  else {
                    sprintf(bcmd, "/usr/local/bin/bin256 %s %s/%s_%09u.%d.fits", imgfile, IMAGEDIRSMALL, prependfits, fsn, imagedircnt);
                    system(bcmd);
                  }
                  imagedircnt++;
                  decompress_free_images(partimg);
                  /* only keep the last n images */
                  if(imagedircnt >= IMAGEDIRCNT) {
                    sprintf(cmd,"/bin/rm -f %s/%s_*.%d.fits",
                          IMAGEDIR,prependfits,imagedircnt-IMAGEDIRCNT);
                    h0log("%s\n", cmd);
                    system(cmd);
                  }
                }
                else {
		  h0log("*FAILURE to flush prev fsn=%u\n", fsn);
                  decompress_free_images(partimg);
                }
                remopenimg(&openimg_hdr, fsn); /* remove in case it's there */
            }
          }
        }
        fsn_prev = fsnx;
      }
      break;
    case SUCCESS_IMAGECOMPLETE: case SUCCESS_HKCOMPLETE:
      /* 1: A science data VCDU was successfully decoded.
      /* The corresponding image is complete, including its image status
      /* packet keywords. On return *image will contain a pointer to an
      /* image structure holding the completely reconstructed image and
      /* its image status info.
      /* 3: A housekeeping VCDU was successfully decoded, and as a result one
      /* or more images had their image status info attached. On return
      /* *hk_packets will point to the head of a linked list of
      /* CCSDS_Packet_t structs containing the decoded contents of
      /* each housekeeping packet. On return *image will contain a
      /* pointer to the head of a linked list of image structures holding
      /* completely reconstructed images and their image status info.
      */
      sav_images = images;
      while(images) {
        imagecnt++;
        fsn = IMAGE_FSN(images);
        fsn = fsn & 0x3fffffff;         //make sure 30bits
        fid = IMAGE_FID(images);
/***********************************************************************
        ftmp = StopTimer(2);
        h0log("Image %d: Time since last IMAGECOMPLETE = %fsec\n", 
		fsn, ftmp);
        tsum[2] += ftmp;
***********************************************************************/
        sprintf(imgfile, "%s/%s_%09u.%d.fits",
			IMAGEDIR,prependfits,fsn,imagedircnt);
        if(decompress_writefitsimage(imgfile, images, 0)) {
          h0log("Error on output of %s\n", imgfile);
        }
        else {
          sprintf(bcmd, "/usr/local/bin/bin256 %s %s/%s_%09u.%d.fits", imgfile, IMAGEDIRSMALL, prependfits, fsn, imagedircnt);
          system(bcmd);
        }
        imagedircnt++;
        /* only keep the last n images */
        if(imagedircnt >= IMAGEDIRCNT) {
          sprintf(cmd, "/bin/rm -f %s/%s_*.%d.fits",
			IMAGEDIR,prependfits,imagedircnt-IMAGEDIRCNT);
          h0log("%s\n", cmd);
          system(cmd);
        }
        h0log("*SUCCESS_IMAGECOMPLETE %s\n", imgfile);
        /*StartTimer(2);			/* time next image */
        images = images->next;
      }
      decompress_free_images(sav_images);
      alarm(ALRMSEC);			/* reset timer */
      break;
    case SUCCESS_HK:
      /* 2: A housekeeping VCDU was successfully decoded. On return
      /* *hk_packets will point to the head of a linked list of
      /* CCSDS_Packet_t structs containing the decoded contents of
      /* each housekeeping packet.
      */
      break;
    default:
      /* < 0: An error occured. Consult hmi_compression.h for macro
      /* definitions of negative error codes.
      */
      h0log("*decompress_next_vcdu() returns err status = %d:\n", rstatus);
      decompress_next_vcdu_err_str(rstatus, errtxt);
      h0log("%s\n\n", errtxt);
      break;
    }
    if(rstatus > 1) {		/* !!TBD just free hk pkts for now */
      decompress_free_hk(hk_packets);
    }
BYPASS:
    if(sigtermflg) { now_do_term_sig(); }
    if(sigalrmflg) { now_do_alrm_sig(); }
  }
  if(!eflg) {
    h0log("*No errors in tlm file\n");
  }
  /* !!TBD ck for incomplete pkt */
  fclose(fpin);
  ftmp = StopTimer(1);
  h0log("**Processed %s with\n**complete images %d and %d VCDUs in %f sec\n\n",
	file, imagecnt, fpkt_cnt, ftmp);
  if(fpkt_cnt != total_tlm_vcdu) {
    h0log("**WARNING: Found #vcdu=%d; expected=%d\n", fpkt_cnt, total_tlm_vcdu);
  }
  if(gap_24_cnt != total_missing_vcdu) {
    h0log("**WARNING: VCDU 24bit cntr gaps=%d; expected=%d\n",
	 gap_24_cnt, total_missing_vcdu);
  }
  if(gap_42_cnt != total_missing_im_pdu) {
    h0log("**WARNING: IM_PDU 42bit cntr gaps=%lld; expected=%lld\n",
	 gap_42_cnt, total_missing_im_pdu);
  }
  return(0);
}

/* This is called from the main loop to check if any .parc files are in
 * the pipeline to soc dir ($DIRPIPE2SOC).
 * The .parc file is sent to /dds/pipe2soc/aia or /dds/pipe2soc/hmi
 * every time the pipeline back end system does a tapearc. 
 * Actually by this cron job on d02:
 *   0 0 * * * /home/production/cvs/JSOC/base/sums/scripts/build_parc_file.pl
 * (during development the .parc files are sent by the cron job 
 * /home/jim/cvs/jsoc/scripts/pipefe_rm on d00.)
 * The .parc file
 * has info on storage units that were archived successfully by the backend,
 * and so can be marked as such in the data capture sum_main table.
 * The sum_main table is updated for its safe_tape info from the .parc.
 * A .parc file looks like:
 * dcs0.jsoc:/dds/pipe2soc/aia> t AIA_2007_131_11_56.parc 
 * #owning_series_name                         tape_id  fn  date
 * VC01_2007_131_11_51_39_0123456789A_FFFFF_00 000000S1 666 2007-04-12 17:15:45
 * VC04_2007_131_11_52_09_0123456789A_FFFFF_00 000000S1 666 2007-04-12 17:15:45
 *
 * storage_unit_name pipeline_tape_id_archived_on tape_fn date
*/
/* Read every .parc file found in frompipedir and update the safe tape
 * info for each storage unit it lists via SUMLIB_SafeTapeUpdate().
 * Only the processes started for VC01 or VC02 do this work; for any other
 * primary channel the function returns at once.
 * A .parc file whose lines were all applied is removed; one with a failed
 * update or a malformed line is moved to frompipedir/err/.
 * Aborts with status 3 if the DB connection or the opendir() fails.
 * NOTE(review): the rm/mv commands are run through system() with the file
 * name pasted in, so names with shell metacharacters are not handled.
 */
void do_pipe2soc() {
  DIR *dfd;
  struct dirent *dp;
  FILE *fp;
  int ptape_fn, complete, len;
  char line[128], fname[1024], cmd[2304];
  char *su_name, *ptape_id, *ptape_fn_str, *ptape_date;

  /* only run this on the primary channel ingest_tlm process */
  if(strcmp(pchan, "VC01") && strcmp(pchan, "VC02")) { return; }

  if(DS_ConnectDB_Q(dbname)) {
    printk("**Can't connect to DB %s\n", dbname);
    abortit(3);
  }

  if((dfd=opendir(frompipedir)) == NULL) {
    printk("**Can't opendir(%s) to find files\n", frompipedir);
    abortit(3);
  }
  while((dp=readdir(dfd)) != NULL) {
    if(!strstr(dp->d_name, ".parc")) continue;
    len = snprintf(fname, sizeof(fname), "%s/%s", frompipedir, dp->d_name);
    if(len < 0 || (size_t)len >= sizeof(fname)) {
      printk("***File name too long: %s/%s\n", frompipedir, dp->d_name);
      continue;
    }
    if(!(fp=fopen(fname, "r"))) {
      printk("***Can't open %s\n", fname);
      continue;
    }
    printk("Found parc file: %s\n", fname);
    complete = 1;
    while(fgets(line, sizeof(line), fp)) {    /* get .parc file lines */
      if(line[0] == '#' || line[0] == '\n') continue;
      printk("%s", line);
      /* line is: su_name tape_id tape_fn date */
      su_name = strtok(line, " ");
      ptape_id = strtok(NULL, " ");
      ptape_fn_str = strtok(NULL, " ");
      ptape_date = strtok(NULL, "\n");
      if(!su_name || !ptape_id || !ptape_fn_str || !ptape_date) {
        /* a short line would otherwise hand NULL to atoi() */
        printk("**ERROR: malformed line in %s\n", fname);
        complete = 0;
        continue;
      }
      ptape_fn = atoi(ptape_fn_str);
      if(SUMLIB_SafeTapeUpdate(su_name,ptape_id,ptape_fn,ptape_date)) {
        printk("**ERROR in SUMLIB_SafeTapeUpdate(%s...)\n", su_name);
        complete = 0;
      }
    }
    fclose(fp);
    if(complete) {
      len = snprintf(cmd, sizeof(cmd), "/bin/rm -f %s", fname);
    }
    else {
      len = snprintf(cmd, sizeof(cmd), "/bin/mv -f %s %s/err/",
			fname, frompipedir);
    }
    if(len < 0 || (size_t)len >= sizeof(cmd)) {
      /* never run a truncated command */
      printk("***Command too long for %s\n", fname);
      continue;
    }
    printk("%s\n", cmd);
    system(cmd);
  }
  closedir(dfd);
  DS_DisConnectDB_Q();
}

/* This is the main loop that gets the .qac and .tlm files and 
 * ingests them into SUMS.
 */
void do_ingest()
{
  FILE *fp;
  FILE *dolev0fp;
  DIR *dfd;
  NAMESORT *nameptr;
  struct dirent *dp;
  float ttmp;
  int found, i, j, status;
  char name[128], line[128], mvname[128], tlmfile[128], tlmname[96];
  char cmd[128], xxname[128], tlmsize[80];
  char *token;

  /* init summary timers */
  for(i=0; i < NUMTIMERS; i++) {
    tsum[i] = 0.0;
  }
  if((dfd=opendir(tlmdir)) == NULL) {
    printk("**Can't opendir(%s) to find files\n", tlmdir);
    abortit(3);
  }
  found = 0; i = 0;
  if((nameptr = (NAMESORT *)malloc(MAXFILES * sizeof(NAMESORT))) == NULL) {
    printk("***Can't alloc memory for file name sort\n");
    abortit(3);
  }

  while((dp=readdir(dfd)) != NULL) {
    /* printk("%s\n", dp->d_name) ; continue;*/ /* !!TEMP */
    /* Only accept our files. */
    if(strstr(dp->d_name, pchan) || strstr(dp->d_name, rchan) || strstr(dp->d_name, ".dsf")) {
      nameptr[i++].name = strdup(dp->d_name);
      if(i >= MAXFILES) {
        printk("***Fatal error. Too many (%d) files in %s\n", MAXFILES, tlmdir);
        abortit(3);
      }
    }
  }
  closedir(dfd);
  qsort(nameptr, i, sizeof(NAMESORT), &compare_names);

  for(j=0; j < i; j++) {
    /*printk("####QSORT FILES: %s\n", nameptr[j].name); /* !!TEMP */
    /* OLD. the .dsf file is now moved by dds_soc program to pipedir */
    /********************
    if(strstr(nameptr[j].name, ".dsf")) {
      sprintf(cmd, "/bin/mv -f %s/%s %s", tlmdir, nameptr[j].name, pipedir);
      printk("*mv dsf file to %s\n", pipedir);
      printk("%s\n", cmd);
      if(system(cmd)) {
        printk("***Error on: %s\n", cmd);
      }
    }
    *********************/
    if(!strstr(nameptr[j].name, ".qac")) {	/* can be .qac or .qacx */
      free(nameptr[j].name);
      continue;
    }
    if(strstr(nameptr[j].name, pchan)) strcpy(prependfits, pchan); 
    if(strstr(nameptr[j].name, rchan)) strcpy(prependfits, rchan); 
    StartTimer(NUMTIMERS-1);
    sprintf(name, "%s/%s", tlmdir, nameptr[j].name);
    printk("\n*Found qac file:\n* %s\n", name);
    if(!(fp=fopen(name, "r"))) {
      printk("***Can't open %s\n", name);
      free(nameptr[j].name);
      continue;
    }
    found = 1;
    /* NOTE: the qac file is already verified by the caller of ingest_tlm */
    while(fgets(line, 256, fp)) {	/* get qac file lines */
      if(line[0] == '#' || line[0] == '\n') continue;
      if(strstr(line, "TLM_FILE_NAME=")) {
        token = (char *)strtok(line, "=");
        token = (char *)strtok(NULL, "\n");
        printk("tlm file is %s\n", token);
        sprintf(tlmfile, "%s/%s", tlmdir, token);
        sprintf(tlmname, "%s", token);
      }
      else if(strstr(line, "TLM_FILE_SIZE=")) {
        token = (char *)strtok(line, "=");
        token = (char *)strtok(NULL, "=");
        printk("*tlm file size is %s", token);
        sprintf(tlmsize, "%s", token);
        tlmsize[strlen(token)-1] = 0;
        reqbytes = (double)atol(token);
        /*reqbytes += (double)1000000;*/	/* add some overhead */
      }
      else if(strstr(line, "TOTAL_TLM_IM_PDU=")) {
        token = (char *)strtok(line, "=");
        token = (char *)strtok(NULL, "\n");
        total_tlm_vcdu = atoi(token);
      }
      else if(strstr(line, "TOTAL_MISSING_VCDU=")) {
        token = (char *)strtok(line, "=");
        token = (char *)strtok(NULL, "\n");
        total_missing_vcdu = atoi(token);
      }
      else if(strstr(line, "TOTAL_MISSING_IM_PDU=")) {
        token = (char *)strtok(line, "=");
        token = (char *)strtok(NULL, "\n");
        total_missing_im_pdu = atol(token);
        break;
      }
    }
    fclose(fp);

    /* get storage to ingest the files */
      sum->bytes = reqbytes;
      sum->reqcnt = 1;
      if(status = SUM_alloc(sum, h0log)) { /* allocate a data segment */
	printk("***Can't allocate %g bytes in SUM_alloc. Error code = %d\n", 
		reqbytes, status);
	abortit(3);
      }
      cptr = sum->wd;
      dsixpt = sum->dsix_ptr;
      alloc_index = *dsixpt;
      strcpy(alloc_wd, *cptr);
      printk("*Alloc %g bytes at %s dsindex=%ld\n",
                        sum->bytes, *cptr, alloc_index);

    /* cp the files to the ingest dir */
    sprintf(mvname, "%s/%s", alloc_wd, nameptr[j].name);
    free(nameptr[j].name);

            /*StartTimer(7);*/
	    /*sprintf(cmd, "cp -p %s %s", name, mvname);*/
	    sprintf(cmd, "cp -p %s %s", name, alloc_wd);
            printk("*cp qac to %s\n", alloc_wd);
	    printk("%s\n", cmd);
	    if(status = system(cmd)) {
		printk("***Error %d on: %s\n", status, cmd);
                printk("errno = %d\n", errno);
		//abortit(1);
		continue;
	    }
	    sprintf(cmd, "cp -p %s %s", tlmfile, alloc_wd);
            printk("*cp tlm to %s\n", alloc_wd);
	    printk("%s\n", cmd);
	    if(system(cmd)) {
		printk("***Error on: %s\n", cmd);
                printk("errno = %d\n", errno);
		//abortit(1);
		continue;
	    }
            /*tsum[7] += StopTimer(7);*/

    /* now catalog the ingested files with the database */
    sum->mode = ARCH;
    token = (char *)rindex(tlmname, '.');
    *token = 0;			/* elim .tlm for ds name */
    sum->dsname = tlmname;
    sum->group = 1;		/* !!TBD ck if always use group 1 */
    sum->storeset = 0;		/* always use set 0 for datacapture */
    sum->reqcnt = 1;
    if(SUM_put(sum, h0log)) {   /* save the data segment for archiving */
      printk("**Error: on SUM_put()\n");
    }
    else {
      printk("*SUM_put() successfull for wd = %s\n", *sum->wd);
      printk("Marked for archive data unit ds_index=%ld\n", *dsixpt);
    }

    /*StartTimer(7);*/
    //must move .tlm file first
    sprintf(cmd, "/bin/mv -f %s %s", tlmfile, pipedir);
    printk("*mv tlm file to %s\n", pipedir);
    printk("%s\n", cmd);
    if(system(cmd)) {
      printk("***Error on: %s\n", cmd);
    }
    sprintf(cmd, "/bin/mv -f %s %s", name, pipedir);
    printk("*mv qac file to %s\n", pipedir);
    printk("%s\n", cmd);
    if(system(cmd)) {
      printk("***Error on: %s\n", cmd);
    }
    /*tsum[7] += StopTimer(7);*/
    /*printk("Time for cp and mv of raw files = %fsec\n", tsum[7]);*/

    /* new stuff to do lev0 below !!!!TBD check !!! */
    sprintf(xxname, "%s/%s.tlm", alloc_wd, tlmname);
    tlmactive = 1;              /* set active for sigalrm */
    if(get_tlm(xxname)) {       /* lev0 extraction of image */
      h0log("***Error in lev0 extraction for %s\n", xxname);
    }
    tlmactive = 0;
    ttmp = StopTimer(NUMTIMERS-1);
    printk("Rate tlm %s bytes in %f sec\n", tlmsize, ttmp);
    //see if we should now process to lev0
    if((dolev0fp=fopen(LEV0FILEON, "r")) != NULL) {
      if(!lev0_on_flag) {
	printk("Found file: %s. Lev0 processing now active.\n", LEV0FILEON);
	lev0_on_flag = 1;
      }
      fclose(dolev0fp);
    }
    else {
      if(lev0_on_flag) {
	printk("Not Found: %s. Lev0 processing not active.\n", LEV0FILEON);
	lev0_on_flag = 0;
      }
    }
  }
  free(nameptr);
  /*if(!found) { printk("No .qac files found in %s\n", DIRDDS); }*/
}

/* Initial setup stuff called when main is first entered.
 *
 * Installs sighandler for SIGINT/SIGTERM (unless they were already being
 * ignored) and alrm_sig for SIGALRM, opens the log (a new H0LOGFILE-named
 * file when no logfile was given on the command line, otherwise the given
 * file in append mode), logs the date, the cwd, the command line, pid and
 * user, sets the umask to 002 and calls open_sum().
 *
 * argc, argv: the program's command line; only echoed to the log here.
 * The echoed text is limited to the size of idstr and is truncated, not
 * overflowed, when the command line is longer.
 */
void setup(int argc, char *argv[])
{
  int i;
  size_t used;
  char logname[128], cwdbuf[128], idstr[256];

  /* keep SIGINT/SIGTERM ignored if the parent had them ignored */
  if (signal(SIGINT, SIG_IGN) != SIG_IGN)
    signal(SIGINT, sighandler);
  if (signal(SIGTERM, SIG_IGN) != SIG_IGN)
    signal(SIGTERM, sighandler);
  signal(SIGALRM, alrm_sig);

  get_date();
  if(!(username = (char *)getenv("USER"))) username = "nouser";
  if(!logfile) {		/* no logfile given on command line */
    snprintf(logname, sizeof logname, H0LOGFILE, username, getpid());
    open_h0log(logname, "w");	/* open new file for write */
  } else {
    open_h0log(logfile, "a");   /* open given file for append */
  }
  printk_set(h0log, h0log);	/* set for printk calls */
  printk("%s\n", datestr);
  if(getcwd(cwdbuf, sizeof cwdbuf) == NULL) {
    /* cwdbuf is undefined on failure; log a placeholder instead */
    snprintf(cwdbuf, sizeof cwdbuf, "%s", "(unknown)");
  }
  snprintf(idstr, sizeof idstr, "Cwd: %s\nCall: ", cwdbuf);
  for(i=0; i < argc; i++) { 	/* echo cmd line */
    used = strlen(idstr);	/* always < sizeof idstr */
    snprintf(idstr + used, sizeof idstr - used, "%s%s",
             argv[i], (i < argc-1) ? " " : "");
  }
  used = strlen(idstr);
  snprintf(idstr + used, sizeof idstr - used,
           "\ningest_tlm started as pid=%d user=%s\n", getpid(), username);
  printk("*%s", idstr);
  /*printk("*%s\n", datestr);*/
  umask(002);			/* allow group write */
  open_sum();			/* open with sum_svc */
}

/* Program entry: parse the command line, do the one-time setup, arm the
 * alarm timer, then poll forever. Each pass ingests new files, processes
 * .parc files from the pipeline backend, sleeps 4 seconds and then
 * services a pending termination request. The loop itself never exits.
 */
int main(int argc, char *argv[])
{
  get_cmd(argc, argv);		/* validate the calling sequence */
  setup(argc, argv);		/* signals, log file, SUMS connection */
  alarm(ALRMSEC);		/* first SIGALRM in ALRMSEC seconds */

  for(;;) {
    do_ingest();		/* take files from the input dir */
    do_pipe2soc();		/* take .parc files from the backend */
    sleep(4);
    if(sigtermflg) {
      now_do_term_sig();
    }
  }
}


Karen Tian
Powered by
ViewCVS 0.9.4