1 arta 1.1 #!/home/jsoc/bin/linux_x86_64/activeperl
2
3 # The Gatekeeper iterates over all gates. If a gate's status needs updating, it spawns a task to update the gate's status.
4 # If a gate has tickets, it spawns tasks to process the tickets. Each gate is created with an "add script". An add script
5 # first makes a gate (a subdirectory in the parent gates directory), then it makes a task (a gate-specific directory
6 # in the parent tasks directory). The Gatekeeper assumes that every gate has an associated task directory.
7 #
8 # To process a ticket, the Gatekeeper spawns a task, which creates a task instance (and subdirectory) in the gate-specific
9 # task directory.
10
use strict;
use warnings;
use Data::Dumper;
use FindBin qw($Bin);
use IO::Dir;
use POSIX qw(strftime);
use threads;
use threads::shared;
use lib "$Bin/../../../base/libs/perl";
use drmsNetLocks;
use drmsArgs;
use drmsSysRun;
21
22 arta 1.1 # Install signal handler (so we can ctrl-c to end this app).
23 use sigtrap qw/handler Sighandler INT/;
24
# Script command-line parameters
use constant kArgDataDir => "ddir";       # The directory containing pipeline data and log files.
use constant kArgCodeDir => "cdir";       # The directory containing the pipeline scripts and apps.
use constant kArgVerbose => "verbose";    # The verbosity level.
use constant kArgDebug => "debug";        # The debug level.

# Hardcoded parameters
use constant kSleepShort => 10;
use constant kSleepLong => 60;
use constant kLockFile => "/home/jsoc/locks/gatekeeperlck.txt";
use constant kDefDataDir => "/home/jsoc/pipeline";
use constant kDefCodeDir => "/home/jsoc/cvs/Development/JSOC/proj/workflow";
use constant kGateDir => "gates";
use constant kLogDir => "/home/jsoc/jsoclogs/pipeline";
use constant kRestartLog => "restartlog.txt";
use constant kRunLog => "runlog.txt";
use constant kGKOwner => "Gatekeeper_owner";
use constant kKeepRunningFile => "Keep_running";
use constant kEvWorkflowCode => "WORKFLOW_ROOT";
use constant kEvWorkflowData => "WORKFLOW_DATA";
use constant kGateStatusDoneTO => 14400;  # 4 hours - Time interval to wait for all gates' status tasks to complete.
use constant kGateStatusTO => 120;        # 2 minutes - Time interval to wait for one gate's status task to complete.

# Flag files
use constant kFlagGKVerbose => "GATEKEEPER_VERBOSE";
use constant kFlagGKDebug => "GATEKEEPER_DEBUG";

# Return values (used as the process exit status)
use constant kRetSuccess => 0;
use constant kRetNoLock => 1;
use constant kRetInvalidArgs => 2;
use constant kRetFileIO => 3;
use constant kRetConfig => 4;
use constant kRetLog => 5;
use constant kRetUnexpectedTerm => 6;
use constant kRetStatusTask => 7;
61
# Global variables
my($gTerminate);                  # Set to 1 by Sighandler() / LoopSleepOrBreak() to request a shutdown.
my $gGateStatusMutex :shared;     # Mutex for condition variable that synchronizes gate status-task completion.
my $gNThreads :shared = 0;        # Total number of threads (i.e., number of gates) spawned for gate status tasks.
                                  # Locked by $gGateStatusMutex; it must be shared so that all threads observe
                                  # the same counter.

# Local variables
my($opts);
my($cfgH);
my($lock);
my($busy);
my($restartlog);
my($runlog);
my($odir); # original directory
my($fh);
my($sleepdur);
my($gatesH); # A reference to a hash whose key is the name of a gate, and whose value is a Gate object.
my(@gatedirs);
80
# Acquire lock - this guarantees that no two gatekeepers can run simultaneously.
# The constructor retries a number of times; if it never acquires the lock it
# returns undef.
$lock = drmsNetLocks->new(&kLockFile);

if (!defined($lock))
{
    # ShutDown() tolerates an undefined lock. Exit with the no-lock status rather
    # than falling through to the "quit unexpectedly" handler at the bottom.
    ShutDown(\$busy, \$lock, \$runlog, "The gatekeeper is already running; bailing out.", &kRetNoLock);
}

# NOTE: ShutDown() never returns (it calls exit()), so the start-up steps below
# run strictly in sequence; no status variable is needed to chain them.

# Read-in command-line arguments.
$opts = GetOpts();
if (!defined($opts))
{
    ShutDown(\$busy, \$lock, \$runlog, "Invalid command-line arguments.", &kRetInvalidArgs);
}

# Read-in the configuration, which comes from a number of sources. Some are environment variables,
# and some are flag files in the pipeline data directory. Returns undef if there was a problem
# reading required configuration information.
$cfgH = ReadConfig($opts);
if (!defined($cfgH))
{
    ShutDown(\$busy, \$lock, \$runlog, "Unable to read the gatekeeper configuration.", &kRetConfig);
}

# Append the PID to the restart log.
$restartlog = WriteLog->new(&kLogDir . "/" . &kRestartLog, 1);
if (!defined($restartlog))
{
    ShutDown(\$busy, \$lock, \$runlog, "Unable to create log file; bailing out.", &kRetLog);
}
$restartlog->Write($$, 1);
$restartlog->Close();

# Write the name of the user who is running the gatekeeper to &kGKOwner.
if (WriteToFile($cfgH->{&kCfgDataDir}, &kGKOwner, ">", $ENV{'USER'}))
{
    ShutDown(\$busy, \$lock, \$runlog, "Unable to write " . &kGKOwner . ".", &kRetFileIO);
}

# Write the name of the gatekeeper host machine and the PID of the gatekeeper to &kKeepRunningFile.
if (WriteToFile($cfgH->{&kCfgDataDir}, &kKeepRunningFile, ">", $ENV{'HOST'} . "\." . $$))
{
    ShutDown(\$busy, \$lock, \$runlog, "Unable to write " . &kKeepRunningFile . ".", &kRetFileIO);
}

# Append to the run log.
$runlog = WriteLog->new(&kLogDir . "/" . &kRunLog, 1);
if (!defined($runlog))
{
    ShutDown(\$busy, \$lock, \$runlog, "Unable to create run log " . &kLogDir . "/" . &kRunLog, &kRetLog);
}

# Main gate-checking loop
$gTerminate = 0;

MAINLOOP: while (1)
{
    # Check to see if it is time to shutdown.
    if ($gTerminate)
    {
        ShutDown(\$busy, \$lock, \$runlog, "Shutting down per user request.", &kRetSuccess);
    }

    $runlog->Write(); # Write just the timestamp.
    $runlog->Write("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", 1); # Don't write timestamp.

    # Ensure that DRMS/SUMS are functional.
    if (drmsSysRun($cfgH->{&kCfgCodeDir} . "/scripts/checkDRMSnSUMS.csh"))
    {
        # Cannot talk to DRMS or SUMS or both.
        $runlog->Write("GATEKEEPER, DRMS and/or SUMS is down, try again in a minute.");
        $sleepdur = &kSleepLong;

        # $busy is always undef here. LoopSleepOrBreak() does not return: it
        # jumps to the next iteration of MAINLOOP.
        LoopSleepOrBreak(\$busy, $cfgH->{&kCfgDataDir}, $sleepdur);
    }
    else
    {
        $sleepdur = &kSleepShort;
    }

    # While the gate lock is being held, no other program can modify the gate contents.
    $busy = gateLock->new();

    if (defined($busy))
    {
        my($timefmt) = "%Y.%m.%d_%H:%M:%S_%Z";
        my(@allgates);    # Array of all gate names
        my($gateO);       # Gate object currently being examined
        my($st);          # Status task spawned for a gate
        my($badinit);
        my($low);
        my($high);
        my($lowtime);
        my($hightime);
        my($nextupdateO); # JSOCTime object
        my($nextupdateS); # Time string
        my($nowS);        # Time string
        my($nowO);        # JSOCTime object
        my($sum);
        my($deadline);    # seconds since machine epoch

        if (!defined($gatesH))
        {
            if (CreateGates($cfgH, $runlog, \$gatesH))
            {
                ShutDown(\$busy, \$lock, \$runlog, "Unable to read gates directory.", &kRetFileIO);
            }
        }
        else
        {
            # Reload the state for each gate. Also, an operator could have added a new gate, so check for that.
            if (ReloadGates($cfgH, $runlog, \$gatesH))
            {
                ShutDown(\$busy, \$lock, \$runlog, "Unable to read gates directory.", &kRetFileIO);
            }
        }

        # The gate names drive every loop below; refresh them after each (re)load.
        @allgates = defined($gatesH) ? sort(keys(%$gatesH)) : ();

        # Reset the count of outstanding status tasks.
        {
            lock($gGateStatusMutex);
            $gNThreads = 0;
        } # unlock $gGateStatusMutex

        # Iterate through the gates, checking for work to be done for each. They are in the gates subdirectory
        # in the data directory. Each gate is implemented as a subdirectory. We have to initialize the gates
        # on each iteration of the main loop, since the state of each gate may change while the Gatekeeper is
        # running.
        foreach my $gatename (@allgates)
        {
            $gateO = $gatesH->{$gatename};

            if ($gateO->OnHold())
            {
                if ($cfgH->{&kCfgVerbose})
                {
                    $runlog->Write("GATEKEEPER Gate: " . $gateO->Name() . " on HOLD, skip this gate.");
                }
                next;
            }

            $runlog->Write("starting " . $gateO->Name());

            if ($cfgH->{&kCfgVerbose})
            {
                $runlog->Write("GATEKEEPER Gate: " . $gateO->Name() . ", starting to check for update times expired.");
            }

            # strftime() needs the broken-down time list, so call localtime() in list context.
            $nowS = POSIX::strftime($timefmt, localtime());
            $nowO = JSOCTime->new($nowS, $timefmt);
            $nextupdateO = JSOCTime->new($gateO->Get(&Gate::kStateFNextUpdate), $timefmt);

            if ($cfgH->{&kCfgVerbose})
            {
                $runlog->Write("GATEKEEPER Gate: " . $gateO->Name() . ", now = $nowS");
            }

            # A gate whose low/high values are missing or NaN has not been initialized yet.
            # NOTE(review): the original read these from the configuration hash under keys that
            # are not defined in this file; the gate's own low/high state is assumed to be intended.
            $low = $gateO->Get(&Gate::kStateFLow);
            $high = $gateO->Get(&Gate::kStateFHigh);
            $badinit = !defined($low) || length($low) == 0 || $low eq "NaN" ||
                       !defined($high) || length($high) == 0 || $high eq "NaN";

            # A gate with no valid next-update time is treated as due now.
            if (!defined($nextupdateO) || $nowO->GT($nextupdateO) || $badinit)
            {
                if ($cfgH->{&kCfgVerbose})
                {
                    if ($badinit)
                    {
                        $runlog->Write("GATEKEEPER Gate " . $gateO->Name() . " not properly initialized, try to init");
                    }
                }

                # Next update = now + the gate's update delta. Work on a copy so $nowO is untouched.
                # NOTE(review): JSOCTime::Add() is not visible here; this accepts both an in-place
                # Add() and one that returns a new object - confirm against JSOCTime.
                $nextupdateO = $nowO->copy();
                $sum = $nextupdateO->Add($gateO->Get(&Gate::kStateFUpdateDelta));
                if (ref($sum))
                {
                    $nextupdateO = $sum;
                }

                # Record the NEW next-update time (not the one read from the gate above).
                $nextupdateS = $nextupdateO->TimeString();
                $gateO->Set(&Gate::kStateFNextUpdate, $nextupdateS);
                $gateO->Set(&Gate::kStateFLastUpdate, $nowS);

                # Spawn a thread that will process this gate. No gate should currently be
                # busy. After we have spawned all threads, we wait until they have all
                # completed before re-visiting the gates (there is a timeout so that
                # we don't wait forever if a thread doesn't complete).
                # We spawn such a thread by creating a StatusTask object and we will
                # poll for completed gate processing. Originally, there was a flag file,
                # statusbusy, that lived in the gate directory. It was created by the Gatekeeper
                # and deleted by the statustask for that gate. The file was used to
                # know when the statustask completed. However, by using threads, the
                # Gatekeeper knows when the statustask has completed.
                #
                # NOTE(review): nothing in this file increments $gNThreads when a task is
                # spawned, so the wait below currently falls straight through - confirm
                # where the counter is meant to be maintained.
                $st = $gateO->StatusTask();
                if (!defined($st))
                {
                    # Fatal error.
                    ShutDown(\$busy, \$lock, \$runlog, "Unable to run status task for gate " . $gateO->Name() . "\.", &kRetStatusTask);
                }
            }
        } # end loop over gates

        # Wait for the status tasks to complete, giving up kGateStatusDoneTO seconds from now.
        {
            lock($gGateStatusMutex);
            $deadline = time() + &kGateStatusDoneTO;

            while ($gNThreads != 0 && time() < $deadline)
            {
                # cond_timedwait() takes an ABSOLUTE time (epoch seconds), not a duration.
                if (!cond_timedwait($gGateStatusMutex, time() + &kGateStatusTO))
                {
                    # Timeout occurred. Loop around so that the global timeout gets checked.
                    $runlog->Write("WARNING: No active status task has completed in the last " . &kGateStatusTO . " seconds.");
                }
            }

            if ($gNThreads != 0)
            {
                # Error - a thread is still running, bail.
                $runlog->Write("ERROR: At least one status task has not completed within " . &kGateStatusDoneTO . " seconds. Bailing out");
                ShutDown(\$busy, \$lock, \$runlog, "Timed out waiting for gate status tasks to complete.", &kRetStatusTask);
            }
        } # unlock $gGateStatusMutex

        # The status tasks may have modified certain flag files (like low, high). Also, the gatestatus flag file may have
        # been modified by an operator. Need to reload these three values for all gates since their values are used
        # downstream.
        foreach my $gatename (@allgates)
        {
            $gateO = $gatesH->{$gatename};

            $gateO->Reload(&Gate::kStateFLow);
            $gateO->Reload(&Gate::kStateFHigh);
            $gateO->Reload(&Gate::kStateFGateStatus);

            if ($gateO->Get(&Gate::kStateFType) =~ /time/i)
            {
                # TODO: time-type gates need low/high converted with time_convert. The original
                # left these assignments blank; the raw values are used until that is implemented.
                $lowtime = $gateO->Get(&Gate::kStateFLow);
                $hightime = $gateO->Get(&Gate::kStateFHigh);
            }
            else
            {
                $lowtime = $gateO->Get(&Gate::kStateFLow);
                $hightime = $gateO->Get(&Gate::kStateFHigh);
            }

            # Run the tasks requested by the tickets at the gate. This code first asynchronously runs the tasks that the tickets request
            # be run, then it waits for all the tasks to complete.
            $gateO->ProcessTickets();
        } # end loop over gates

        # The gatekeeper will have modified some gate attributes that may be used by code outside of the gatekeeper, so
        # we need to flush the attributes back to disk. This is done while the gate lock is still held.
        foreach my $gatename (@allgates)
        {
            $gateO = $gatesH->{$gatename};
            $gateO->Flush();
        } # end loop over gates
    } # end busy block

    # Release the busy lock and allow other waiting programs to modify the gate information.
    LoopSleepOrBreak(\$busy, $cfgH->{&kCfgDataDir}, $sleepdur);
} # end main gate-checking loop

# Safety net: the main loop only ends through ShutDown(), so reaching this point is an error.
ShutDown(\$busy, \$lock, \$runlog, "The Gatekeeper quit unexpectedly!", &kRetUnexpectedTerm);
387
# Builds the command-line argument object for this script.
# Declares the four supported options (data dir, code dir, verbose, debug) with
# their drmsArgs type codes and hands them to the drmsArgs constructor, whose
# result is returned unchanged.
sub GetOpts
{
    my(%spec) =
    (
        &kArgDataDir => 's',
        &kArgCodeDir => 's',
        &kArgVerbose => 'i',
        &kArgDebug   => 'i'
    );

    return drmsArgs->new(\%spec, 0);
}
402
# Signal handler (installed for INT via sigtrap). It only raises the global
# termination flag; the main loop checks that flag at the top of each iteration.
sub Sighandler
{
    return ($gTerminate = 1);
}
408
# Placeholder: takes no arguments and always returns undef.
sub ReadArgs
{
    return undef;
}
415
# Returns the first line (newline removed) of the flag file $path.
# $default is returned when the file does not exist, cannot be opened, or is empty.
sub ReadFlagFile
{
    my($path, $default) = @_;
    my($fh);
    my($value);

    if (-e $path && open($fh, "<", $path))
    {
        $value = <$fh>;
        close($fh);
    }

    if (!defined($value))
    {
        return $default;
    }

    chomp($value);
    return $value;
}

# Assembles the gatekeeper configuration.
#
#   $opts - argument object returned by GetOpts(); values are read with Get().
#
# Each directory is resolved in this order: command-line argument, environment
# variable, hard-coded default. The verbose and debug levels come from the command
# line, else from the first line of the matching flag file in the data directory,
# else they default to 1.
#
# Returns the object built by Cfg->new() from the collected values, or undef
# (after printing a message to STDERR) if either directory does not exist.
sub ReadConfig
{
    my($opts) = @_;
    my($datadir);
    my($codedir);
    my($verbose);
    my($debug);
    my($cfgH) = {};

    $datadir = $opts->Get(&kArgDataDir);
    if (!defined($datadir))
    {
        $datadir = $ENV{&kEvWorkflowData};
    }
    if (!defined($datadir))
    {
        $datadir = &kDefDataDir;
    }

    $codedir = $opts->Get(&kArgCodeDir);
    if (!defined($codedir))
    {
        $codedir = $ENV{&kEvWorkflowCode};
    }
    if (!defined($codedir))
    {
        $codedir = &kDefCodeDir;
    }

    # The defaults guarantee both values are defined, so the real failure mode is a
    # path that is not an existing directory; say so, and say which paths were tried.
    if (!defined($datadir) || !defined($codedir) || !(-d $datadir && -d $codedir))
    {
        print STDERR "Workflow data directory ($datadir) and/or code directory ($codedir) is undefined or not a directory.\n";
        return undef;
    }

    $verbose = $opts->Get(&kArgVerbose);
    if (!defined($verbose))
    {
        $verbose = ReadFlagFile("$datadir/" . &kFlagGKVerbose, 1);
    }

    $debug = $opts->Get(&kArgDebug);
    if (!defined($debug))
    {
        $debug = ReadFlagFile("$datadir/" . &kFlagGKDebug, 1);
    }

    $cfgH->{&kCfgDataDir} = $datadir;
    $cfgH->{&kCfgCodeDir} = $codedir;
    $cfgH->{&kCfgVerbose} = $verbose;
    $cfgH->{&kCfgDebug} = $debug;

    return Cfg->new($cfgH);
}
501
# Writes $content to the file $datadir/$file.
#
#   $datadir - directory containing the file
#   $file    - file name inside $datadir
#   $mode    - open() mode, e.g. ">" to truncate or ">>" to append
#   $content - text to write; no newline is added, and undef is written as an empty string
#
# Returns 0 on success, and 1 (after printing a message to STDERR) if the file
# cannot be opened, written or closed.
sub WriteToFile
{
    my($datadir, $file, $mode, $content) = @_;
    my($path) = "$datadir/$file";
    my($fh);
    my($ok);

    if (!defined($content))
    {
        $content = "";
    }

    # Three-argument open keeps the mode separate from the path, so unusual
    # characters in the path cannot be misread as part of the mode.
    if (!open($fh, $mode, $path))
    {
        print STDERR "Cannot open $path for writing: $!\n";
        return 1;
    }

    $ok = (print {$fh} $content) ? 1 : 0;

    # Buffered write errors can surface only at close(), so check it as well.
    if (!close($fh))
    {
        $ok = 0;
    }

    if (!$ok)
    {
        print STDERR "Cannot write to $path: $!\n";
        return 1;
    }

    return 0;
}
522
# Releases all resources held by the gatekeeper and terminates the process.
#
#   $busyR   - reference to the gate-lock variable (the lock may be undef)
#   $lockR   - reference to the net-lock variable (the lock may be undef)
#   $runlogR - reference to the run-log variable (the log may be undef)
#   $msg     - final message; written to the run log if there is one, otherwise
#              printed to STDOUT (success) or STDERR (failure)
#   $retval  - process exit status
#
# This function never returns; it always calls exit($retval).
sub ShutDown
{
    my($busyR, $lockR, $runlogR, $msg, $retval) = @_;

    if (!defined($msg))
    {
        $msg = "";
    }

    # Turn-off the busy if it is set.
    if (defined($busyR) && defined($$busyR))
    {
        $$busyR->ReleaseLock();
        $$busyR = undef;
    }

    # Print the message to the log file, then destroy the run log.
    if (defined($runlogR) && defined($$runlogR))
    {
        $$runlogR->Write("$msg");
        $$runlogR->Close();
        $$runlogR = undef;
    }
    elsif (length($msg) > 0)
    {
        # No run log yet; fall back to the terminal. Empty messages are skipped
        # so that callers with nothing to say do not emit blank lines.
        if ($retval == &kRetSuccess)
        {
            print "$msg\n";
        }
        else
        {
            print STDERR "$msg\n";
        }
    }

    # Release the concurrency lock.
    if (defined($lockR) && defined($$lockR))
    {
        $$lockR->ReleaseLock();
    }

    exit($retval);
}
558
# Unset busy flag (release the "busy" lock) and check for the presence of the Keep_running file. If it is missing, then
# set the global variable that indicates it is time to shutdown. On the next loop iteration, the shutdown will happen.
# There is no concurrency code handling Keep_running, and there is none necessary. All concurrency is handled with the
# net lock and the gate lock.
#
#   $busy     - reference to the gate-lock variable; the lock is released and the variable set to undef
#   $datadir  - pipeline data directory, which holds the Keep_running file
#   $sleepdur - number of seconds to sleep when the gatekeeper should keep running
#
# This function does not return to its caller: it ends with "next MAINLOOP", which
# starts the next iteration of the caller's main loop. It must therefore only be
# called from inside the loop labelled MAINLOOP.
sub LoopSleepOrBreak
{
    my($busy, $datadir, $sleepdur) = @_;

    # Leaving a subroutine with "next" is intentional here; silence the
    # "Exiting subroutine via next" warning it would otherwise emit on every call.
    no warnings 'exiting';

    if (defined($$busy))
    {
        $$busy->ReleaseLock();
        $$busy = undef;
    }

    if (!(-e "$datadir/" . &kKeepRunningFile))
    {
        # keeprunning flag file removed, terminate.
        $gTerminate = 1;
    }
    else
    {
        # Don't terminate, but sleep to allow other scripts/programs access to gates.
        sleep($sleepdur);
    }

    next MAINLOOP;
}
586
# Synchronizes the in-memory set of Gate objects with the gates directory on disk.
#
#   $cfgH      - configuration; the data-directory entry locates the gates directory
#   $runlog    - run log, passed to each newly created Gate
#   $gatesHref - reference to the caller's variable holding the gates hash reference
#                (gate name => Gate object). If that variable is undef, a new hash
#                is created and stored in it.
#
# For every subdirectory of the gates directory: a gate that is already known is
# reloaded, and an unknown one is created and added. Gates whose directory no longer
# exists are dropped from the hash.
#
# Returns 0 on success, 1 if $gatesHref is not a usable reference, the gates
# directory cannot be read, or a Gate cannot be created.
sub CreateOrUpdateGates
{
    my($cfgH, $runlog, $gatesHref) = @_;
    my($gatesdir);
    my(%dirents);
    my(@entries);
    my(%seen);
    my($gateO);

    # \$var is a SCALAR ref while $var is undef and a REF once it holds the hash ref.
    if (ref($gatesHref) ne "SCALAR" && ref($gatesHref) ne "REF")
    {
        return 1;
    }

    $gatesdir = $cfgH->{&kCfgDataDir} . "/" . &kGateDir;

    if (!defined(tie(%dirents, "IO::Dir", $gatesdir)))
    {
        return 1;
    }

    # Copy the entry names, then release the gates-directory object right away.
    @entries = keys(%dirents);
    untie(%dirents);

    if (!defined($$gatesHref))
    {
        $$gatesHref = {};
    }

    foreach my $gatedir (@entries)
    {
        # Skip the "." and ".." entries, and anything that is not a directory
        # (each gate is implemented as a subdirectory).
        if ($gatedir eq "." || $gatedir eq ".." || !(-d "$gatesdir/$gatedir"))
        {
            next;
        }

        # NOTE(review): gates are keyed by Gate->Name(); the lookup below presumes that
        # equals the directory name. If it does not, the gate is simply re-created.
        if (exists($$gatesHref->{$gatedir}))
        {
            # Known gate - re-initialize its state from disk.
            $$gatesHref->{$gatedir}->Reload();
            $seen{$gatedir} = 1;
        }
        else
        {
            # New gate (first pass, or added by an operator since the last pass).
            $gateO = Gate->new($gatesdir, $gatedir, $runlog);
            if (!defined($gateO))
            {
                return 1;
            }

            $$gatesHref->{$gateO->Name()} = $gateO;
            $seen{$gateO->Name()} = 1;
        }
    }

    # Drop gates whose directory has disappeared.
    foreach my $name (keys(%{$$gatesHref}))
    {
        if (!exists($seen{$name}))
        {
            delete($$gatesHref->{$name});
        }
    }

    return 0;
}
657
# Builds the gates hash from scratch.
#
#   $cfgH   - configuration, passed through to CreateOrUpdateGates()
#   $runlog - run log, passed through to CreateOrUpdateGates()
#   $gatesH - reference to the caller's variable that will receive the gates hash reference
#
# Returns whatever CreateOrUpdateGates() returns (false on success).
sub CreateGates
{
    my($cfgH, $runlog, $gatesH) = @_;

    # Clear the CALLER's variable through the reference so that any old gates are
    # discarded. The reference itself must stay intact because
    # CreateOrUpdateGates() stores the new hash through it.
    if (defined($gatesH))
    {
        $$gatesH = undef;
    }

    return CreateOrUpdateGates($cfgH, $runlog, $gatesH);
}
666
# Refreshes an existing set of gates. This is a thin wrapper: the three arguments
# (configuration, run log, reference to the gates-hash variable) are forwarded
# unchanged to CreateOrUpdateGates(), and its result is returned.
sub ReloadGates
{
    my($config, $log, $gatesRef) = @_;
    my($status) = CreateOrUpdateGates($config, $log, $gatesRef);

    return $status;
}
|