/[pdpsoft]/trunk/nl.nikhef.ndpf.tools/globus-gram-job-manager-pbs-nikhef/fork.pm.cin
ViewVC logotype

Contents of /trunk/nl.nikhef.ndpf.tools/globus-gram-job-manager-pbs-nikhef/fork.pm.cin

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2548 - (show annotations) (download)
Wed May 2 07:53:48 2012 UTC (10 years ago) by davidg
File size: 24987 byte(s)
Added modded fork job manager as well

1 # Copyright 1999-2006 University of Chicago
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 # Globus::GRAM::JobManager::fork package
16 #
17 # CVS Information:
18 # $Source: /home/globdev/CVS/globus-packages/gram/jobmanager/lrms/fork/source/fork.pm,v $
19 # $Date: 2011/08/19 14:33:18 $
20 # $Revision: 1.3 $
21 # $Author: bester $
22
23 use Globus::GRAM::Error;
24 use Globus::GRAM::JobState;
25 use Globus::GRAM::JobManager;
26 use Globus::GRAM::StdioMerger;
27 use Globus::Core::Paths;
28 use Globus::Core::Config;
29
30 use Config;
31 use IPC::Open2;
32
33 package Globus::GRAM::JobManager::fork;
34
35 @ISA = qw(Globus::GRAM::JobManager);
36
37 my ($mpirun, $mpiexec, $log_path);
38 my ($starter_in, $starter_out, $starter_index) = (undef, undef, 0);
39 my %signo;
40
41 BEGIN
42 {
43 my $i = 0;
44
45 foreach (split(' ', $Config::Config{sig_name}))
46 {
47 $signo{$_} = $i++;
48 }
49
50 my $config = new Globus::Core::Config(
51 '${sysconfdir}/globus/globus-fork.conf');
52
53 $mpirun = $config->get_attribute("mpirun") || "no";
54 if ($mpirun ne "no" && ! -x $mpirun)
55 {
56 $mpirun = "no";
57 }
58 $mpiexec = $config->get_attribute("mpiexec") || "no";
59 if ($mpiexec ne "no" && ! -x $mpiexec)
60 {
61 $mpiexec = "no";
62 }
63 $softenv_dir = $config->get_attribute("softenv_dir") || "";
64 $log_path = $config->get_attribute("log_path") || "/dev/null";
65 }
66
67 sub new
68 {
69 my $proto = shift;
70 my $class = ref($proto) || $proto;
71 my $self = $class->SUPER::new(@_);
72 my $description = $self->{JobDescription};
73 my $stdout = $description->stdout();
74 my $stderr = $description->stderr();
75
76 if($description->jobtype() eq 'multiple' && $description->count > 1)
77 {
78 $self->{STDIO_MERGER} =
79 new Globus::GRAM::StdioMerger($self->job_dir(), $stdout, $stderr);
80 }
81 else
82 {
83 $self->{STDIO_MERGER} = 0;
84 }
85
86 return $self;
87 }
88
89 sub submit
90 {
91 my $self = shift;
92 my $cmd;
93 my $pid;
94 my $pgid;
95 my @job_id;
96 my $count;
97 my $multi_output = 0;
98 my $description = $self->{JobDescription};
99 my $pipe;
100 my @cmdline;
101 my @environment;
102 my @arguments;
103 my $fork_starter = "$Globus::Core::Paths::sbindir/globus-fork-starter";
104 my $streamer = "$Globus::Core::Paths::sbindir/globus-gram-streamer";
105 my $fork_conf = "$Globus::Core::Paths::sysconfdir/globus-fork.conf";
106 my $is_grid_monitor = 0;
107 my $soft_msc = "$softenv_dir/bin/soft-msc";
108 my $softenv_load = "$softenv_dir/etc/softenv-load.sh";
109
110 $starter_index++;
111
112 if(!defined($description->directory()))
113 {
114 return Globus::GRAM::Error::RSL_DIRECTORY;
115 }
116 if ($description->directory() =~ m|^[^/]|) {
117 $description->add("directory",
118 $ENV{HOME} . '/' . $description->directory());
119 }
120 chdir $description->directory() or
121 return Globus::GRAM::Error::BAD_DIRECTORY;
122
123 @environment = $description->environment();
124 foreach $tuple ($description->environment())
125 {
126 if(!ref($tuple) || scalar(@$tuple) != 2)
127 {
128 return Globus::GRAM::Error::RSL_ENVIRONMENT();
129 }
130 $CHILD_ENV{$tuple->[0]} = $tuple->[1];
131 }
132
133 if(ref($description->count()) ||
134 $description->count() != int($description->count()))
135 {
136 return Globus::GRAM::Error::INVALID_COUNT();
137 }
138 if($description->jobtype() eq 'multiple')
139 {
140 $count = $description->count();
141 $multi_output = 1 if $count > 1;
142 }
143 elsif($description->jobtype() eq 'single')
144 {
145 $count = 1;
146 }
147 elsif($description->jobtype() eq 'mpi' && $mpiexec ne 'no')
148 {
149 $count = 1;
150 @cmdline = ($mpiexec, '-n', $description->count());
151 }
152 elsif($description->jobtype() eq 'mpi' && $mpirun ne 'no')
153 {
154 $count = 1;
155 @cmdline = ($mpirun, '-np', $description->count());
156 }
157 else
158 {
159 return Globus::GRAM::Error::JOBTYPE_NOT_SUPPORTED();
160 }
161 if( $description->executable eq "")
162 {
163 return Globus::GRAM::Error::RSL_EXECUTABLE();
164 }
165 #elsif(! -e $description->executable())
166 #{
167 # return Globus::GRAM::Error::EXECUTABLE_NOT_FOUND();
168 #}
169 #elsif( (! -x $description->executable())
170 # || (! -f $description->executable()))
171 #{
172 # return Globus::GRAM::Error::EXECUTABLE_PERMISSIONS();
173 #}
174 elsif( $description->stdin() eq "")
175 {
176 return Globus::GRAM::Error::RSL_STDIN;
177 }
178 elsif(! -r $description->stdin())
179 {
180 return Globus::GRAM::Error::STDIN_NOT_FOUND();
181 }
182
183 # ugly auth hack here
184 my ($gridid, $peeraddr);
185 foreach my $tuple ($description->environment()) {
186 $peeraddr = $tuple->[1] if $tuple->[0] eq "GATEKEEPER_PEER";
187 $gridid = $tuple->[1] if $tuple->[0] eq "GRID_ID";
188 }
189 # example, but DO NOT USE in real production as it breaks the LCG-CE i/f
190 #if ( $gridid !~ m:^/O=dutchgrid/O=users/O=nikhef/CN=: and
191 # $gridid !~ m:^/DC=org/DC=terena/DC=tcs/C=NL/O=Nikhef/CN=:
192 # ) {
193 # return Globus::GRAM::Error::AUTHORIZATION_DENIED();
194 #}
195
196 #if ($description->executable() =~ m:^(/|\.):) {
197 push(@cmdline, $description->executable());
198 #} else {
199 # push(@cmdline,
200 # $description->directory()
201 # . '/'
202 # . $description->executable());
203 #}
204
205 # Check if this is the Condor-G grid monitor
206 my $exec = $description->executable();
207 my $file_out = `/usr/bin/file $exec`;
208 if ( $file_out =~ /script/ || $file_out =~ /text/ ||
209 $file_out =~ m|/usr/bin/env| ) {
210 open( EXEC, "<$exec" ) or
211 return Globus::GRAM::Error::EXECUTABLE_PERMISSIONS();
212 while( <EXEC> ) {
213 if ( /Sends results from the grid_manager_monitor_agent back to a/ ) {
214 $is_grid_monitor = 1;
215 }
216 }
217 close( EXEC );
218 }
219
220 # Reject jobs that want streaming, if so configured, but not for
221 # grid monitor jobs
222 if ( $description->streamingrequested() &&
223 $description->streamingdisabled() && !$is_grid_monitor ) {
224
225 $self->log("Streaming is not allowed.");
226 return Globus::GRAM::Error::OPENING_STDOUT;
227 }
228
229 @arguments = $description->arguments();
230 foreach(@arguments)
231 {
232 if(ref($_))
233 {
234 return Globus::GRAM::Error::RSL_ARGUMENTS;
235 }
236 }
237 if ($#arguments >= 0)
238 {
239 push(@cmdline, @arguments);
240 }
241
242 if ($description->use_fork_starter() && -x $fork_starter)
243 {
244 if (!defined($starter_in) && !defined($starter_out))
245 {
246 $pid = IPC::Open2::open2($starter_out, $starter_in,
247 "$fork_starter $log_path");
248 my $oldfh = select $starter_out;
249 $|=1;
250 select $oldfh;
251 }
252
253 print $starter_in "100;perl-fork-start-$$-$starter_index;";
254
255 print $starter_in 'directory='.
256 &escape_for_starter($description->directory()) . ';';
257
258 if (keys %CHILD_ENV > 0) {
259 print $starter_in 'environment='.
260 join(',', map { &escape_for_starter($_)
261 .'='.&escape_for_starter($CHILD_ENV{$_})
262 } (keys %CHILD_ENV)) . ';';
263 }
264
265 print $starter_in "count=$count;";
266
267 my @softenv = $description->softenv();
268 my $enable_default_software_environment
269 = $description->enable_default_software_environment();
270 if ( ($softenv_dir ne '')
271 && (@softenv || $enable_default_software_environment))
272 {
273 ### SoftEnv extension ###
274 $cmd_script_name = $self->job_dir() . '/scheduler_fork_cmd_script';
275 local(*CMD);
276 open( CMD, '>' . $cmd_script_name );
277
278 print CMD "#!/bin/sh\n";
279
280 $self->setup_softenv(
281 $self->job_dir() . '/fork_softenv_cmd_script',
282 $soft_msc,
283 $softenv_load,
284 *CMD);
285
286 print CMD 'cd ', $description->directory(), "\n";
287 print CMD "@cmdline\n";
288 print CMD "exit \$?\n";
289
290 close(CMD);
291 chmod 0700, $cmd_script_name;
292
293 print $starter_in 'executable=' .
294 &escape_for_starter($cmd_script_name). ';';
295 print $starter_in 'arguments=;';
296 #########################
297 }
298 else
299 {
300 print $starter_in 'executable=' .
301 &escape_for_starter($cmdline[0]). ';';
302 shift @cmdline;
303 if ($#cmdline >= 0)
304 {
305 print $starter_in 'arguments=' .
306 join(',', map {&escape_for_starter($_)} @cmdline) .
307 ';';
308 }
309 }
310
311 my @job_stdout;
312 my @job_stderr;
313
314 for ($i = 0; $i < $count; $i++) {
315 if($multi_output)
316 {
317 push(@job_stdout, $self->{STDIO_MERGER}->add_file('out'));
318 push(@job_stderr, $self->{STDIO_MERGER}->add_file('err'));
319 }
320 else
321 {
322 if (defined($description->stdout)) {
323 push(@job_stdout, $description->stdout());
324 } else {
325 push(@job_stdout, '/dev/null');
326 }
327
328 if (defined($description->stderr)) {
329 push(@job_stderr, $description->stderr());
330 } else {
331 push(@job_stderr, '/dev/null');
332 }
333 }
334 }
335
336 print $starter_in "stdin=" . &escape_for_starter($description->stdin()).
337 ';';
338 print $starter_in "stdout=" .
339 join(',', map {&escape_for_starter($_)} @job_stdout) . ';';
340 print $starter_in "stderr=" .
341 join(',', map {&escape_for_starter($_)} @job_stderr) . "\n";
342
343 while (<$starter_out>) {
344 chomp;
345 my @res = split(/;/, $_);
346
347 if ($res[1] ne "perl-fork-start-$$-$starter_index") {
348 next;
349 }
350 if ($res[0] == '101') {
351 @job_id = split(',', $res[2]);
352 last;
353 } elsif ($res[0] == '102') {
354 $self->respond({GT3_FAILURE_MESSAGE => "starter: $res[3]" });
355 return new Globus::GRAM::Error($res[2]);
356 }
357 }
358
359 if ($is_grid_monitor && -x $streamer)
360 {
361 my $streamer_startup='';
362
363 $starter_index++;
364 $streamer_startup .= "100;perl-fork-start-$$-$starter_index;";
365 $streamer_startup .= 'directory='.$self->job_dir().';';
366
367 if (keys %CHILD_ENV > 0) {
368 $streamer_startup .= 'environment='.
369 join(',', map { &escape_for_starter($_)
370 .'='.&escape_for_starter($CHILD_ENV{$_})
371 } (keys %CHILD_ENV)) . ';';
372 }
373
374 $streamer_startup .= "count=1;";
375
376 $streamer_startup .= 'executable=' . &escape_for_starter($streamer)
377 . ';';
378 @cmdline = ('-s', $description->state_file());
379 foreach my $p (@job_id)
380 {
381 my $q = $p;
382 # strip leading uuid
383 $q =~ s/.*://;
384 push(@cmdline, '-p', $q);
385 }
386 $streamer_startup .= 'arguments=' .
387 join(',', map {&escape_for_starter($_)} @cmdline) .
388 ';';
389
390 $streamer_startup .= "stdin=/dev/null;";
391 $streamer_startup .= "stdout=gram_streamer_out;";
392 $streamer_startup .= "stderr=gram_streamer_err\n";
393
394 $self->log("streamer_startup is $streamer_startup");
395 print $starter_in $streamer_startup;
396
397 while (<$starter_out>) {
398 chomp;
399 my @res = split(/;/, $_);
400
401 if ($res[1] ne "perl-fork-start-$$-$starter_index") {
402 next;
403 }
404 if ($res[0] == '101') {
405 @job_id = (@job_id, split(',', $res[2]));
406 last;
407 } elsif ($res[0] == '102') {
408 $self->respond({GT3_FAILURE_MESSAGE => "starter: $res[3]" });
409 return new Globus::GRAM::Error($res[2]);
410 }
411 }
412 }
413 $description->add('jobid', join(',', @job_id));
414 return { JOB_STATE => Globus::GRAM::JobState::ACTIVE,
415 JOB_ID => join(',', @job_id) };
416 } else {
417 my $starter_pid;
418 local(*READER,*WRITER); # always use local on perl FDs
419 pipe(READER, WRITER);
420
421 $starter_pid = fork();
422
423 if (! defined($starter_pid))
424 {
425 $failure_code = "fork:$!";
426 $self->respond({GT3_FAILURE_MESSAGE => $failure_code });
427 return Globus::GRAM::Error::JOB_EXECUTION_FAILED;
428 }
429 elsif ($starter_pid == 0)
430 {
431 # Starter Process
432 close(READER);
433
434 local(*JOB_READER, *JOB_WRITER);
435 pipe(JOB_READER, JOB_WRITER);
436
437 for(my $i = 0; $i < $count; $i++)
438 {
439 if($multi_output)
440 {
441 $job_stdout = $self->{STDIO_MERGER}->add_file('out');
442 $job_stderr = $self->{STDIO_MERGER}->add_file('err');
443 }
444 else
445 {
446 $job_stdout = $description->stdout();
447 $job_stderr = $description->stderr();
448 }
449
450 # obtain plain old pipe into temporary variables
451 local $^F = 2; # assure close-on-exec for pipe FDs
452 select((select(WRITER),$|=1)[$[]);
453
454 if( ($pid=fork()) == 0)
455 {
456 close(JOB_READER);
457
458 # forked child
459 %ENV = %CHILD_ENV;
460
461 close(STDIN);
462 close(STDOUT);
463 close(STDERR);
464
465 open(STDIN, '<' . $description->stdin());
466 open(STDOUT, ">>$job_stdout");
467 open(STDERR, ">>$job_stderr");
468
469 # the below should never fail since we just forked
470 setpgrp(0,$$);
471
472 if ( ! exec (@cmdline) )
473 {
474 my $err = "$!\n";
475 $SIG{PIPE} = 'IGNORE';
476 print JOB_WRITER "$err";
477 close(JOB_WRITER);
478 exit(1);
479 }
480 }
481 else
482 {
483 my $error_code = '';
484
485 if ($pid == undef)
486 {
487 $self->log("fork failed\n");
488 $failure_code = "fork: $!";
489 }
490 close(JOB_WRITER);
491
492 $_ = <JOB_READER>;
493 close(JOB_READER);
494
495 if($_ ne '')
496 {
497 chomp($_);
498 $self->log("exec failed\n");
499 $failure_code = "exec: $_";
500 }
501
502 if ($failure_code ne '')
503 {
504 # fork or exec failed. kill rest of job and return an error
505 $failure_code =~ s/\n/\\n/g;
506 foreach(@job_id)
507 {
508 $pgid = getpgrp($_);
509
510 $pgid == -1 ? kill($signo{TERM}, $_) :
511 kill(-$signo{TERM}, $pgid);
512
513 sleep(5);
514
515 $pgid == -1 ? kill($signo{KILL}, $_) :
516 kill(-$signo{KILL}, $pgid);
517
518 }
519
520 local(*ERR);
521 open(ERR, '>' . $description->stderr());
522 print ERR "$failure_code\n";
523 close(ERR);
524
525 print WRITER "FAIL:$failure_code\n";
526 exit(1);
527 }
528 push(@job_id, $pid);
529 }
530 }
531 if ($is_grid_monitor)
532 {
533 # Create an extra process to stream output for grid monitor
534
535 # obtain plain old pipe into temporary variables
536 local $^F = 2; # assure close-on-exec for pipe FDs
537 select((select(WRITER),$|=1)[$[]);
538
539 if( ($pid=fork()) == 0)
540 {
541 close(JOB_READER);
542
543 # forked child
544 %ENV = %CHILD_ENV;
545
546 close(STDIN);
547 close(STDOUT);
548 close(STDERR);
549
550 chdir $self->job_dir();
551
552 open(STDIN, '<' . $description->stdin());
553 open(STDOUT, '>gram_streamer_out');
554 open(STDERR, '>gram_streamer_error');
555 select STDERR; $| = 1; # make unbuffered
556 select STDOUT; $| = 1; # make unbuffered
557
558
559 # the below should never fail since we just forked
560 setpgrp(0,$$);
561
562 @cmdline = ($streamer, '-s', $description->state_file());
563 foreach my $p (@job_id)
564 {
565 push(@cmdline, '-p', $p);
566 }
567
568 if ( ! exec (@cmdline) )
569 {
570 my $err = "$!\n";
571 $SIG{PIPE} = 'IGNORE';
572 print JOB_WRITER "$err";
573 close(JOB_WRITER);
574 exit(1);
575 }
576 }
577 else
578 {
579 my $error_code = '';
580
581 if ($pid == undef)
582 {
583 $self->log("fork failed\n");
584 $failure_code = "fork: $!";
585 }
586 close(JOB_WRITER);
587
588 $_ = <JOB_READER>;
589 close(JOB_READER);
590
591 if($_ ne '')
592 {
593 chomp($_);
594 $self->log("exec failed\n");
595 $failure_code = "exec: $_";
596 }
597
598 if ($failure_code ne '')
599 {
600 # fork or exec failed. kill rest of job and return an error
601 $failure_code =~ s/\n/\\n/g;
602 foreach(@job_id)
603 {
604 $pgid = getpgrp($_);
605
606 $pgid == -1 ? kill($signo{TERM}, $_) :
607 kill(-$signo{TERM}, $pgid);
608
609 sleep(5);
610
611 $pgid == -1 ? kill($signo{KILL}, $_) :
612 kill(-$signo{KILL}, $pgid);
613
614 }
615
616 local(*ERR);
617 open(ERR, '>' . $description->stderr());
618 print ERR "$failure_code\n";
619 close(ERR);
620
621 print WRITER "FAIL:$failure_code\n";
622 exit(1);
623 }
624 push(@job_id, $pid);
625 }
626 }
627 print WRITER "SUCCESS:" . join(',', @job_id) . "\n";
628 exit(0);
629 }
630 else
631 {
632 my ($res, $value);
633 close(WRITER);
634 $_ = <READER>;
635 close(READER);
636 chomp($_);
637 waitpid($starter_pid, 0);
638 ($res, $value) = split(/:/, $_, 2);
639
640 if ($res eq 'SUCCESS')
641 {
642 $description->add('jobid', $value);
643 return { JOB_STATE => Globus::GRAM::JobState::ACTIVE,
644 JOB_ID => $value };
645 }
646 elsif ($res eq 'FAIL')
647 {
648 $self->respond({GT3_FAILURE_MESSAGE => "$value" });
649 return Globus::GRAM::Error::JOB_EXECUTION_FAILED;
650 }
651 else
652 {
653 return Globus::GRAM::Error::JOB_EXECUTION_FAILED;
654 }
655 }
656 }
657 }
658
659 sub poll
660 {
661 my $self = shift;
662 my $description = $self->{JobDescription};
663 my $state;
664
665 my $jobid = $description->jobid();
666
667 if(!defined $jobid)
668 {
669 $self->log("poll: job id defined!");
670 return { JOB_STATE => Globus::GRAM::JobState::FAILED };
671 }
672
673 $self->log("polling job " . $jobid);
674 $_ = kill(0, split(/,/, $jobid));
675
676 if($_ > 0)
677 {
678 $state = Globus::GRAM::JobState::ACTIVE;
679 }
680 else
681 {
682 $state = Globus::GRAM::JobState::DONE;
683 }
684 if($self->{STDIO_MERGER})
685 {
686 $self->{STDIO_MERGER}->poll($state == Globus::GRAM::JobState::DONE);
687 }
688
689 return { JOB_STATE => $state };
690 }
691
692 sub cancel
693 {
694 my $self = shift;
695 my $description = $self->{JobDescription};
696 my $pgid;
697 my $jobid = $description->jobid();
698
699 if(!defined $jobid)
700 {
701 $self->log("cancel: no jobid defined!");
702 return { JOB_STATE => Globus::GRAM::JobState::FAILED };
703 }
704
705 $self->log("cancel job " . $jobid);
706
707 foreach (split(/,/,$jobid))
708 {
709 s/..*://;
710 $pgid = getpgrp($_);
711
712 $pgid == -1 ? kill($signo{TERM}, $_) :
713 kill(-$signo{TERM}, $pgid);
714
715 sleep(5);
716
717 $pgid == -1 ? kill($signo{KILL}, $_) :
718 kill(-$signo{KILL}, $pgid);
719 }
720
721 return { JOB_STATE => Globus::GRAM::JobState::FAILED };
722 }
723
724 sub stage_out
725 {
726 my $self = shift;
727 my $description = $self->{JobDescription};
728 my $is_grid_monitor = 0;
729 my $job_dir = $self->job_dir();
730 my $rc;
731 my $line;
732 my @fields;
733 my $stream_out;
734
735 $self->log("Fork stage out");
736 # Check if this is the Condor-G grid monitor
737 my $exec = $description->executable();
738 my $file_out = `/usr/bin/file $exec`;
739 if ( $file_out =~ /script/ || $file_out =~ /text/ ||
740 $file_out =~ m|/usr/bin/env| ) {
741 open( EXEC, "<$exec" ) or
742 return Globus::GRAM::Error::EXECUTABLE_PERMISSIONS();
743 while( <EXEC> ) {
744 if ( /Sends results from the grid_manager_monitor_agent back to a/ ) {
745 $is_grid_monitor = 1;
746 }
747 }
748 close( EXEC );
749 }
750
751 if ($is_grid_monitor)
752 {
753 $self->log("Fork stage out is grid monitor");
754 local(*STREAMER_ERROR, *STREAMER_OUTPUT);
755
756 $rc = open(STREAMER_ERROR, "<$job_dir/gram_streamer_error");
757 if (!$rc)
758 {
759 $self->log("Error opening gram_streamer_error: $!");
760 }
761 chomp($line = <STREAMER_ERROR>);
762 if ($line eq '')
763 {
764 $self->log("No error from streamer");
765 }
766 else
767 {
768 $self->log("Error from streamer $line");
769 @fields = split(':', $line, 2);
770
771 $self->respond({GT3_FAILURE_MESSAGE => "$fields[0]" });
772 return new Globus::GRAM::Error($fields[1]);
773 }
774 close(STREAMER_ERROR);
775
776 $stream_out = $description->get('file_stream_out');
777 $rc = open(STREAMER_OUTPUT, "<$job_dir/gram_streamer_out");
778 if (!$rc)
779 {
780 $self->log("Error opening gram_streamer_output: $!");
781 }
782 else
783 {
784 while ($line = <STREAMER_OUTPUT>)
785 {
786 chomp($line);
787 $self->log("Streamer output: $line");
788 my ($from, $to) = split(' ', $line);
789
790 for (my $i = 0; $i < scalar(@{$stream_out}); $i++)
791 {
792 my $pair = $stream_out->[$i];
793 if ($pair->[0] eq $from && $pair->[1] eq $to)
794 {
795 splice(@{$stream_out}, $i, 1);
796 last;
797 }
798 }
799
800 $self->respond({'STAGED_STREAM' => "$from $to"});
801 }
802 $description->add('filestreamout', $stream_out);
803 }
804 }
805 return $self->SUPER::stage_out();
806 }
807
808 sub escape_for_starter
809 {
810 my $str = shift;
811
812 $str =~ s/\\/\\\\/g;
813 $str =~ s/;/\\;/g;
814 $str =~ s/,/\\,/g;
815 $str =~ s/\n/\\n/g;
816 $str =~ s/=/\\=/g;
817
818 return $str;
819 }
820
821 END
822 {
823 if (defined($starter_in))
824 {
825 close($starter_in);
826 }
827 if (defined($starter_out))
828 {
829 close($starter_out);
830 }
831 }
832 1;

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28