1 tplarson 1.1 #define MODES(L) ((((L)+1)*(L))/2)
2 #define MINIMUM(X,Y) (((X)<(Y))?(X):(Y))
3 #define MAXIMUM(X,Y) (((X)>(Y))?(X):(Y))
4
5 int jretile_manytofew(void)
6 {
7 int newstat = 0;
8 int status = DRMS_SUCCESS;
9 int fetchstat = DRMS_SUCCESS;
10 DRMS_RecChunking_t chunkstat = kRecChunking_None;
11
12 char *inrecquery = NULL;
13 char *outseries = NULL;
14 char *segnamein = NULL;
15 char *segnameout = NULL;
16 DRMS_RecordSet_t *inrecset = NULL;
17 DRMS_RecordSet_t *outrecset = NULL;
18 int irecin, irecout, nrecsin=0, nrecsout=0, nlchunks;
19 DRMS_Record_t *inrec = NULL;
20 DRMS_Record_t *outrec = NULL;
21 DRMS_Segment_t *segin = NULL;
22 tplarson 1.1 DRMS_Segment_t *segout = NULL;
23 DRMS_Array_t *inarr = NULL;
24 DRMS_Array_t *outarr = NULL;
25 DRMS_RecLifetime_t lifetime;
26 DRMS_Type_t usetype = DRMS_TYPE_FLOAT;
27 int length[2], startind[2], endind[2], totallength[2];
28 float *inptr, *outptr;
29 long long histrecnum=-1;
30 int quality;
31 int mapmmax=-1;
32 int sinbdivs=-1;
33 double cadence=0;
34
35 TIME tnow, UNIX_epoch = -220924792.000; /* 1970.01.01_00:00:00_UTC */
36 char tstartstr[100], tscrstr[100];
37
38 double tstart, tepoch, tstep, tround, tstop, tstartin, tstopin, tstepin, tstartuse, tstopuse, nseconds, chunksecs;
39 char *ttotal, *tchunk;
40 int ndt;
41 int lmin, lmax, lminin, lmaxin, lminuse, lmaxuse, lchunk, lchunksize, lchunkfirst, lchunklast;
42 int ntimechunks, nmodes, npts, imode, itime;
43 tplarson 1.1 int out_time_offset, out_modes_offset, out_offset, in_time_offset, in_modes_offset, in_offset, out_index, in_index;
44 int iset, lminout, lmaxout;
45 double tstartout, tstopout;
46 float *arrptr;
47
48 int errbufstat=setvbuf(stderr, NULL, _IONBF, BUFSIZ);
49 int outbufstat=setvbuf(stdout, NULL, _IONBF, BUFSIZ);
50
51 double wt0, wt1, wt2, wt3, wt;
52 double ut0, ut1, ut2, ut3, ut;
53 double st0, st1, st2, st3, st;
54 double ct0, ct1, ct2, ct3, ct;
55
56 wt0=getwalltime();
57 ct0=getcputime(&ut0, &st0);
58
59 inrecquery = (char *)cmdparams_save_str(&cmdparams, "in", &newstat);
60 outseries = (char *)cmdparams_save_str(&cmdparams, "out", &newstat);
61 segnamein = (char *)cmdparams_save_str(&cmdparams, "segin", &newstat);
62 segnameout = (char *)cmdparams_save_str(&cmdparams, "segout", &newstat);
63 int seginflag = strcmp(kNOTSPECIFIED, segnamein);
64 tplarson 1.1 int segoutflag = strcmp(kNOTSPECIFIED, segnameout);
65 int verbflag = cmdparams_save_int(&cmdparams, "VERB", &newstat);
66 int permflag = cmdparams_save_int(&cmdparams, "PERM", &newstat);
67 if (permflag)
68 lifetime = DRMS_PERMANENT;
69 else
70 lifetime = DRMS_TRANSIENT;
71
72 char *histlinkname = (char *)cmdparams_save_str(&cmdparams, "histlink", &newstat);
73
74 tstart=cmdparams_save_time(&cmdparams, "TSTART", &newstat);
75 sprint_time(tstartstr, tstart, "TAI", 0);
76 ttotal=(char *)cmdparams_save_str(&cmdparams, "TTOTAL", &newstat);
77 status=drms_names_parseduration(&ttotal, &nseconds, 1);
78 if (status)
79 {
80 // newstat = newstat | CPSAVE_UNKNOWN_ERROR;
81 fprintf(stderr, "ERROR: problem parsing TTOTAL, = %s\n", ttotal);
82 return 1;
83 }
84 tchunk=(char *)cmdparams_save_str(&cmdparams, "TCHUNK", &newstat);
85 tplarson 1.1 if (strcmp(kNOTSPECIFIED, tchunk))
86 {
87 status=drms_names_parseduration(&tchunk, &chunksecs, 1);
88 if (status)
89 newstat = newstat | CPSAVE_UNKNOWN_ERROR;
90 }
91 else
92 chunksecs=0;
93
94 lmin=cmdparams_save_int(&cmdparams, "LMIN", &newstat);
95 lmax=cmdparams_save_int(&cmdparams, "LMAX", &newstat);
96 lchunksize=cmdparams_save_int(&cmdparams, "LCHUNK", &newstat);
97 if (lchunksize == 0)
98 lchunksize=lmax+1;
99
100 if (newstat)
101 {
102 fprintf(stderr, "ERROR: problem with input arguments, status = %d, diagnosis follows\n", newstat);
103 cpsave_decode_error(newstat);
104 return 1;
105 }
106 tplarson 1.1 else if (savestrlen != strlen(savestr))
107 {
108 fprintf(stderr, "ERROR: problem with savestr, savestrlen = %d, strlen(savestr) = %d\n", savestrlen, (int)strlen(savestr));
109 return 1;
110 }
111
112 DRMS_Record_t *tempoutrec = drms_create_record(drms_env,
113 outseries,
114 DRMS_TRANSIENT,
115 &status);
116
117 if (status != DRMS_SUCCESS)
118 {
119 fprintf(stderr,"ERROR: couldn't open a record in output dataseries %s, status = %d\n", outseries, status);
120 return 1;
121 }
122
123 // set up ancillary dataseries for processing metadata
124 char *cvsinfo = strdup("$Header$");
125 DRMS_Link_t *histlink = hcon_lookup_lower(&tempoutrec->links, histlinkname);
126 if (histlink != NULL)
127 tplarson 1.1 {
128 histrecnum=set_history(histlink, cvsinfo);
129 if (histrecnum < 0)
130 {
131 drms_close_record(tempoutrec, DRMS_FREE_RECORD);
132 return 1;
133 }
134 }
135 else
136 {
137 fprintf(stderr,"WARNING: could not find history link in output dataseries\n");
138 }
139
140 // these must be present in the output dataseries and variable, not links or constants
141 // now done in DoIt() that calls this function
142 /*
143 char *outchecklist[] = {"T_START", "QUALITY", "LMIN", "LMAX", "NDT"};
144 DRMS_Keyword_t *outkeytest;
145 int itest;
146 for (itest=0; itest < ARRLENGTH(outchecklist); itest++)
147 {
148 tplarson 1.1 outkeytest = hcon_lookup_lower(&tempoutrec->keywords, outchecklist[itest]);
149 if (outkeytest == NULL || outkeytest->info->islink || outkeytest->info->recscope == 1)
150 {
151 fprintf(stderr, "ERROR: output keyword %s is either missing, constant, or a link\n", outchecklist[itest]);
152 drms_close_record(tempoutrec, DRMS_FREE_RECORD);
153 return 1;
154 }
155 }
156 */
157
158 tepoch=drms_getkey_time(tempoutrec, "T_START_epoch", &status);
159 tstep=drms_getkey_float(tempoutrec, "T_START_step", &status);
160 tround=drms_getkey_float(tempoutrec, "T_START_round", &status);
161 cadence=drms_getkey_float(tempoutrec, "T_STEP", &status);
162 if (fmod(tstart-tepoch,tstep) > tround/2)
163 {
164 sprint_time(tscrstr, tepoch, "TAI", 0);
165 fprintf(stderr, "ERROR: output dataseries seems incompatible with input parameters (tstep must divide tstart-tepoch): TSTART = %s, T_START_epoch = %s, tstep = %f\n",
166 tstartstr, tscrstr, tstep);
167 drms_close_record(tempoutrec, DRMS_FREE_RECORD);
168 return 1;
169 tplarson 1.1 }
170 if (chunksecs == 0.0)
171 chunksecs = tstep;
172 else if (fmod(chunksecs,tstep))
173 {
174 fprintf(stderr, "ERROR: output dataseries seems incompatible with input parameters (tstep must divide chunksecs): chunksecs = %f, tstep = %f\n", chunksecs, tstep);
175 drms_close_record(tempoutrec, DRMS_FREE_RECORD);
176 return 1;
177 }
178 if (fmod(nseconds,chunksecs) != 0.0)
179 {
180 fprintf(stderr, "ERROR: input parameters seem incompatible (chunksecs must divide totalsecs): totalsecs = %f, chunksecs = %f\n", nseconds, chunksecs);
181 drms_close_record(tempoutrec, DRMS_FREE_RECORD);
182 return 1;
183 }
184 ntimechunks=nseconds/chunksecs;
185 ndt=chunksecs/cadence;
186 if (verbflag)
187 {
188 printf("%d timechunks, %.1f seconds per chunk\n", ntimechunks, chunksecs);
189 }
190 tplarson 1.1
191 int mapmmaxout=-1;
192 int sinbdivsout=-1;
193 DRMS_Keyword_t *outkeytest = hcon_lookup_lower(&tempoutrec->keywords, "MAPMMAX");
194 if (outkeytest != NULL && outkeytest->info->recscope == 1)
195 mapmmaxout=drms_getkey_int(tempoutrec, "MAPMMAX", &status);
196 outkeytest = hcon_lookup_lower(&tempoutrec->keywords, "SINBDIVS");
197 if (outkeytest != NULL && outkeytest->info->recscope == 1)
198 sinbdivsout=drms_getkey_int(tempoutrec, "SINBDIVS", &status);
199
200 drms_close_record(tempoutrec, DRMS_FREE_RECORD);
201
202 char *inchecklist[] = {"T_START", "QUALITY", "LMIN", "LMAX", "T_STEP"};
203 DRMS_Keyword_t *inkeytest;
204 int itest;
205 inrecset = drms_open_recordset(drms_env, inrecquery, &status);
206 // inrecset = drms_open_records(drms_env, inrecquery, &status);
207
208 if (status != DRMS_SUCCESS || inrecset == NULL)
209 {
210 fprintf(stderr, "ERROR: problem opening input recordset: status = %d\n", status);
211 tplarson 1.1 return 1;
212 }
213 nrecsin = inrecset->n;
214
215 if (verbflag)
216 printf("input recordset opened, nrecs = %d\n", nrecsin);
217
218 int noinput=0;
219 if (nrecsin == 0)
220 {
221 printf("WARNING: input recordset contains no records\n");
222 noinput=1;
223 goto skip1;
224 // return 1;
225 }
226
227 inrec = drms_recordset_fetchnext(drms_env, inrecset, &fetchstat, &chunkstat, NULL);
228 // inrec = inrecset->records[0];
229
230 for (itest=0; itest < ARRLENGTH(inchecklist); itest++)
231 {
232 tplarson 1.1 inkeytest = hcon_lookup_lower(&inrec->keywords, inchecklist[itest]);
233 if (inkeytest == NULL)
234 {
235 fprintf(stderr, "ERROR: required input keyword %s is missing\n", inchecklist[itest]);
236 drms_close_records(inrecset, DRMS_FREE_RECORD);
237 return 1;
238 }
239 }
240
241 if (cadence != drms_getkey_float(inrec, "T_STEP", &status))
242 {
243 fprintf(stderr, "ERROR: input T_STEP does not equal output T_STEP\n");
244 drms_close_records(inrecset, DRMS_FREE_RECORD);
245 return 1;
246 }
247
248 inkeytest = hcon_lookup_lower(&inrec->keywords, "MAPMMAX");
249 if (inkeytest != NULL)
250 mapmmax=drms_getkey_int(inrec, "MAPMMAX", &status);
251 if (mapmmaxout != -1 && mapmmaxout != mapmmax)
252 {
253 tplarson 1.1 fprintf(stderr, "ERROR: input MAPMMAX does not equal output MAPMMAX, in=%d, out=%d\n", mapmmax, mapmmaxout);
254 drms_close_records(inrecset, DRMS_FREE_RECORD);
255 return 1;
256 }
257
258 inkeytest = hcon_lookup_lower(&inrec->keywords, "SINBDIVS");
259 if (outkeytest != NULL)
260 sinbdivs=drms_getkey_int(inrec, "SINBDIVS", &status);
261 if (sinbdivsout != -1 && sinbdivsout != sinbdivs)
262 {
263 fprintf(stderr, "ERROR: input SINBDIVS does not equal output SINBDIVS, in=%d, out=%d\n", sinbdivs, sinbdivsout);
264 drms_close_records(inrecset, DRMS_FREE_RECORD);
265 return 1;
266 }
267
268 status=drms_stage_records(inrecset, 1, 0);
269 if (status != DRMS_SUCCESS)
270 {
271 fprintf(stderr, "ERROR: drms_stage_records returned status = %d\n", status);
272 return 1;
273 }
274 tplarson 1.1
275 skip1:
276
277 lchunkfirst = lmin/lchunksize;
278 lchunklast = lmax/lchunksize;
279
280 nlchunks = (lchunklast - lchunkfirst) + 1;
281 nrecsout = nlchunks*ntimechunks;
282 outrecset = drms_create_records(drms_env, nrecsout, outseries, lifetime, &status);
283 if (status != DRMS_SUCCESS || outrecset == NULL)
284 {
285 fprintf(stderr,"ERROR: unable to create records record in output dataseries %s, status = %d\n", outseries, status);
286 drms_close_records(inrecset, DRMS_FREE_RECORD);
287 return 1;
288 }
289
290 irecout=0;
291 for (iset=0; iset < ntimechunks; iset++)
292 {
293 tstartout=tstart + iset * chunksecs;
294 tstopout=tstartout+chunksecs;
295 tplarson 1.1
296 for (lchunk = lchunkfirst; lchunk <= lchunklast; lchunk++)
297 {
298 lminout = lchunk * lchunksize;
299 lmaxout = lminout + lchunksize - 1;
300 lminout = MAXIMUM(lminout,lmin);
301 lmaxout = MINIMUM(lmaxout,lmax);
302
303 outrec = outrecset->records[irecout];
304 if (histlink)
305 drms_setlink_static(outrec, histlinkname, histrecnum);
306 drms_setkey_int(outrec, "LMIN", lminout);
307 drms_setkey_int(outrec, "LMAX", lmaxout);
308 drms_setkey_time(outrec, "T_START", tstartout);
309 drms_setkey_time(outrec, "T_STOP", tstopout);
310 drms_setkey_time(outrec, "T_OBS", tstartout+chunksecs/2);
311 drms_setkey_int(outrec, "NDT", ndt);
312 /*
313 if (segoutflag)
314 segout = drms_segment_lookup(outrec, segnameout);
315 else
316 tplarson 1.1 segout = drms_segment_lookupnum(outrec, 0);
317 segout->axis[0]=2*ndt;
318 segout->axis[1]=lmaxout*(lmaxout+1)/2+lmaxout - lminout*(lminout+1)/2 + 1;
319 */
320 irecout++;
321 }
322 }
323
324
325 int *nskiparr=(int *)calloc(nrecsout,sizeof(int));
326 for (irecin=0; irecin < nrecsin; irecin++)
327 {
328 // move to end of loop when using drms_recordset_fetchnext(drms_env, inrecset, &fetchstat, &chunkstat, NULL);
329 // inrec = inrecset->records[irecin];
330 tstartin=drms_getkey_time(inrec, "T_START", &status);
331 tstopin=drms_getkey_time(inrec, "T_STOP", &status);
332 lminin=drms_getkey_int(inrec, "LMIN", &status);
333 lmaxin=drms_getkey_int(inrec, "LMAX", &status);
334 tstepin=tstopin-tstartin;
335
336 quality=drms_getkey_int(inrec, "QUALITY", &status);
337 tplarson 1.1 if (status != DRMS_SUCCESS || (quality & QUAL_NODATA)) //may want stricter test on quality here
338 {
339 if (verbflag > 2)
340 {
341 sprint_time(tscrstr, tstartin, "TAI", 0);
342 fprintf(stderr, "WARNING: input data not used due to quality: T_START = %s, LMIN = %d, LMAX = %d, recnum = %lld, irec = %d, status = %d, quality = %08x\n",
343 tscrstr, lminin, lmaxin, inrec->recnum, irecin, status, quality);
344 }
345 for (irecout=0; irecout < nrecsout; irecout++)
346 nskiparr[irecout]++;
347 goto continue_outer_loop;
348 // continue;
349 }
350
351 if (seginflag)
352 segin = drms_segment_lookup(inrec, segnamein);
353 else
354 segin = drms_segment_lookupnum(inrec, 0);
355 if (segin)
356 inarr = drms_segment_read(segin, usetype, &status);
357 // inarr = drms_segment_readslice(segin, usetype, startind, endind, &status);
358 tplarson 1.1 if (status != DRMS_SUCCESS || inarr == NULL || segin == NULL)
359 {
360 sprint_time(tscrstr, tstartin, "TAI", 0);
361 fprintf(stderr, "ERROR: problem reading input segment, T_START = %s, LMIN = %d, LMAX = %d, recnum = %lld, irec = %d, status = %d\n",
362 tscrstr, lminin, lmaxin, inrec->recnum, irecin, status);
363 drms_close_records(inrecset, DRMS_FREE_RECORD);
364 drms_close_records(outrecset, DRMS_FREE_RECORD);
365 return 0;
366 }
367 else
368 {
369 inptr=(float *)(inarr->data);
370 }
371
372 irecout=0;
373 for (iset=0; iset < ntimechunks; iset++)
374 {
375 tstartout=tstart + iset * chunksecs;
376 tstopout=tstartout+chunksecs;
377 sprint_time(tstartstr, tstartout, "TAI", 0);
378
379 tplarson 1.1 for (lchunk = lchunkfirst; lchunk <= lchunklast; lchunk++)
380 {
381 lminout = lchunk * lchunksize;
382 lmaxout = lminout + lchunksize - 1;
383 lminout = MAXIMUM(lminout,lmin);
384 lmaxout = MINIMUM(lmaxout,lmax);
385
386 if (tstartin >= tstopout || tstopin <= tstartout || lminin > lmaxout || lmaxin < lminout)
387 {
388 nskiparr[irecout++]++;
389 continue;
390 }
391
392 outrec = outrecset->records[irecout];
393 if (segoutflag)
394 segout = drms_segment_lookup(outrec, segnameout);
395 else
396 segout = drms_segment_lookupnum(outrec, 0);
397 tstartuse=MAXIMUM(tstartout, tstartin);
398 tstopuse= MINIMUM(tstopout, tstopin);
399 lminuse=MAXIMUM(lminout, lminin);
400 tplarson 1.1 lmaxuse=MINIMUM(lmaxout, lmaxin);
401 nmodes=MODES(lmaxuse+1)-MODES(lminuse);
402 npts=(tstopuse - tstartuse)/cadence;
403
404 out_time_offset = (tstartuse - tstartout)/cadence;
405 out_modes_offset = MODES(lminuse) - MODES(lminout);
406 // out_offset = 2 * (out_modes_offset * ndt + out_time_offset);
407 out_offset = 0; // 2 * (out_modes_offset * npts + out_time_offset);
408 in_time_offset = (tstartuse - tstartin)/cadence;
409 in_modes_offset = MODES(lminuse) - MODES(lminin);
410 in_offset = 2 * (in_modes_offset * tstepin / cadence + in_time_offset);
411
412 startind[0]=2*out_time_offset;
413 startind[1]=out_modes_offset;
414 endind[0]=2*(out_time_offset + npts) - 1;
415 endind[1]=out_modes_offset + nmodes - 1;
416 totallength[0]=2*ndt;
417 totallength[1]=lmaxout*(lmaxout+1)/2+lmaxout - lminout*(lminout+1)/2 + 1;
418
419 length[0]=2*npts;
420 length[1]=nmodes;
421 tplarson 1.1 arrptr=(float *)(calloc(length[0]*length[1],sizeof(float)));
422 outarr = drms_array_create(usetype, 2, length, arrptr, &status);
423 if (status != DRMS_SUCCESS || outarr == NULL || arrptr == NULL)
424 {
425 fprintf(stderr,"ERROR: problem creating output array: T_START = %s, LMIN = %d, LMAX = %d, length = [%d, %d], status = %d, histrecnum = %lld",
426 tstartstr, lminout, lmaxout, length[0], length[1], status, histrecnum);
427 drms_close_records(inrecset, DRMS_FREE_RECORD);
428 drms_close_records(outrecset, DRMS_FREE_RECORD);
429 return 0;
430 }
431 outptr = (float *)(outarr->data);
432
433 for (imode=0; imode<nmodes; imode++)
434 {
435 for (itime=0; itime<npts; itime++)
436 {
437 in_index=in_offset + 2*itime;
438 out_index=out_offset + 2*itime;
439 outptr[out_index] = inptr[in_index];
440 outptr[out_index+1] = inptr[in_index+1];
441 }
442 tplarson 1.1 out_offset+=2*npts; // 2*ndt;
443 in_offset+=2*tstepin/cadence;
444 }
445
446 status=drms_segment_writeslice_ext(segout, outarr, startind, endind, totallength, 0);
447 if (status != DRMS_SUCCESS)
448 {
449 fprintf(stderr, "ERROR: problem writing output segment: status = %d, T_START = %s, LMIN = %d, LMAX = %d, histrecnum = %lld\n",
450 status, tstartstr, lminout, lmaxout, histrecnum);
451 drms_close_records(inrecset, DRMS_FREE_RECORD);
452 drms_close_records(outrecset, DRMS_FREE_RECORD);
453 return 0;
454 }
455
456 drms_free_array(outarr);
457
458 irecout++;
459 } // end loop on lchunk
460 } // end loop on iset
461
462 continue_outer_loop:
463 tplarson 1.1 inrec = drms_recordset_fetchnext(drms_env, inrecset, &fetchstat, &chunkstat, NULL);
464 } // end loop on irecin
465
466 drms_close_records(inrecset, DRMS_FREE_RECORD);
467
468 int nsegments=0;
469 for (irecout=0; irecout < nrecsout; irecout++)
470 {
471 outrec=outrecset->records[irecout];
472 if (noinput || nskiparr[irecout] == nrecsin)
473 {
474 drms_setkey_int(outrec, "QUALITY", QUAL_NODATA);
475 }
476 else
477 {
478 drms_setkey_int(outrec, "QUALITY", 0);
479 nsegments++;
480 }
481
482 tnow = (double)time(NULL);
483 tnow += UNIX_epoch;
484 tplarson 1.1 drms_setkey_time(outrec, "DATE", tnow);
485 }
486
487 free(nskiparr);
488 drms_close_records(outrecset, DRMS_INSERT_RECORD);
489
490 wt=getwalltime();
491 ct=getcputime(&ut, &st);
492 if (verbflag)
493 {
494 printf("number of records created = %d\n", nrecsout);
495 printf("number of segments created = %d\n", nsegments);
496 fprintf(stdout, "total time spent: %.2f ms wall time, %.2f ms cpu time\n",
497 wt-wt0, ct-ct0);
498 }
499
500 printf("module %s successful completion\n", cmdparams.argv[0]);
501
502 return 0;
503
504 }
|