/[pdpsoft]/trunk/nl.nikhef.ndpf.tools/globus-gram-job-manager-pbs-nikhef/fork.pm.cin
ViewVC logotype

Annotation of /trunk/nl.nikhef.ndpf.tools/globus-gram-job-manager-pbs-nikhef/fork.pm.cin

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2548 - (hide annotations) (download)
Wed May 2 07:53:48 2012 UTC (10 years ago) by davidg
File size: 24987 byte(s)
Added modded fork job manager as well

1 davidg 2548 # Copyright 1999-2006 University of Chicago
2     #
3     # Licensed under the Apache License, Version 2.0 (the "License");
4     # you may not use this file except in compliance with the License.
5     # You may obtain a copy of the License at
6     #
7     # http://www.apache.org/licenses/LICENSE-2.0
8     #
9     # Unless required by applicable law or agreed to in writing, software
10     # distributed under the License is distributed on an "AS IS" BASIS,
11     # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12     # See the License for the specific language governing permissions and
13     # limitations under the License.
14    
15     # Globus::GRAM::JobManager::fork package
16     #
17     # CVS Information:
18     # $Source: /home/globdev/CVS/globus-packages/gram/jobmanager/lrms/fork/source/fork.pm,v $
19     # $Date: 2011/08/19 14:33:18 $
20     # $Revision: 1.3 $
21     # $Author: bester $
22    
23     use Globus::GRAM::Error;
24     use Globus::GRAM::JobState;
25     use Globus::GRAM::JobManager;
26     use Globus::GRAM::StdioMerger;
27     use Globus::Core::Paths;
28     use Globus::Core::Config;
29    
30     use Config;
31     use IPC::Open2;
32    
33     package Globus::GRAM::JobManager::fork;
34    
35     @ISA = qw(Globus::GRAM::JobManager);
36    
37     my ($mpirun, $mpiexec, $log_path);
38     my ($starter_in, $starter_out, $starter_index) = (undef, undef, 0);
39     my %signo;
40    
41     BEGIN
42     {
43     my $i = 0;
44    
45     foreach (split(' ', $Config::Config{sig_name}))
46     {
47     $signo{$_} = $i++;
48     }
49    
50     my $config = new Globus::Core::Config(
51     '${sysconfdir}/globus/globus-fork.conf');
52    
53     $mpirun = $config->get_attribute("mpirun") || "no";
54     if ($mpirun ne "no" && ! -x $mpirun)
55     {
56     $mpirun = "no";
57     }
58     $mpiexec = $config->get_attribute("mpiexec") || "no";
59     if ($mpiexec ne "no" && ! -x $mpiexec)
60     {
61     $mpiexec = "no";
62     }
63     $softenv_dir = $config->get_attribute("softenv_dir") || "";
64     $log_path = $config->get_attribute("log_path") || "/dev/null";
65     }
66    
67     sub new
68     {
69     my $proto = shift;
70     my $class = ref($proto) || $proto;
71     my $self = $class->SUPER::new(@_);
72     my $description = $self->{JobDescription};
73     my $stdout = $description->stdout();
74     my $stderr = $description->stderr();
75    
76     if($description->jobtype() eq 'multiple' && $description->count > 1)
77     {
78     $self->{STDIO_MERGER} =
79     new Globus::GRAM::StdioMerger($self->job_dir(), $stdout, $stderr);
80     }
81     else
82     {
83     $self->{STDIO_MERGER} = 0;
84     }
85    
86     return $self;
87     }
88    
89     sub submit
90     {
91     my $self = shift;
92     my $cmd;
93     my $pid;
94     my $pgid;
95     my @job_id;
96     my $count;
97     my $multi_output = 0;
98     my $description = $self->{JobDescription};
99     my $pipe;
100     my @cmdline;
101     my @environment;
102     my @arguments;
103     my $fork_starter = "$Globus::Core::Paths::sbindir/globus-fork-starter";
104     my $streamer = "$Globus::Core::Paths::sbindir/globus-gram-streamer";
105     my $fork_conf = "$Globus::Core::Paths::sysconfdir/globus-fork.conf";
106     my $is_grid_monitor = 0;
107     my $soft_msc = "$softenv_dir/bin/soft-msc";
108     my $softenv_load = "$softenv_dir/etc/softenv-load.sh";
109    
110     $starter_index++;
111    
112     if(!defined($description->directory()))
113     {
114     return Globus::GRAM::Error::RSL_DIRECTORY;
115     }
116     if ($description->directory() =~ m|^[^/]|) {
117     $description->add("directory",
118     $ENV{HOME} . '/' . $description->directory());
119     }
120     chdir $description->directory() or
121     return Globus::GRAM::Error::BAD_DIRECTORY;
122    
123     @environment = $description->environment();
124     foreach $tuple ($description->environment())
125     {
126     if(!ref($tuple) || scalar(@$tuple) != 2)
127     {
128     return Globus::GRAM::Error::RSL_ENVIRONMENT();
129     }
130     $CHILD_ENV{$tuple->[0]} = $tuple->[1];
131     }
132    
133     if(ref($description->count()) ||
134     $description->count() != int($description->count()))
135     {
136     return Globus::GRAM::Error::INVALID_COUNT();
137     }
138     if($description->jobtype() eq 'multiple')
139     {
140     $count = $description->count();
141     $multi_output = 1 if $count > 1;
142     }
143     elsif($description->jobtype() eq 'single')
144     {
145     $count = 1;
146     }
147     elsif($description->jobtype() eq 'mpi' && $mpiexec ne 'no')
148     {
149     $count = 1;
150     @cmdline = ($mpiexec, '-n', $description->count());
151     }
152     elsif($description->jobtype() eq 'mpi' && $mpirun ne 'no')
153     {
154     $count = 1;
155     @cmdline = ($mpirun, '-np', $description->count());
156     }
157     else
158     {
159     return Globus::GRAM::Error::JOBTYPE_NOT_SUPPORTED();
160     }
161     if( $description->executable eq "")
162     {
163     return Globus::GRAM::Error::RSL_EXECUTABLE();
164     }
165     #elsif(! -e $description->executable())
166     #{
167     # return Globus::GRAM::Error::EXECUTABLE_NOT_FOUND();
168     #}
169     #elsif( (! -x $description->executable())
170     # || (! -f $description->executable()))
171     #{
172     # return Globus::GRAM::Error::EXECUTABLE_PERMISSIONS();
173     #}
174     elsif( $description->stdin() eq "")
175     {
176     return Globus::GRAM::Error::RSL_STDIN;
177     }
178     elsif(! -r $description->stdin())
179     {
180     return Globus::GRAM::Error::STDIN_NOT_FOUND();
181     }
182    
183     # ugly auth hack here
184     my ($gridid, $peeraddr);
185     foreach my $tuple ($description->environment()) {
186     $peeraddr = $tuple->[1] if $tuple->[0] eq "GATEKEEPER_PEER";
187     $gridid = $tuple->[1] if $tuple->[0] eq "GRID_ID";
188     }
189     # example, but DO NOT USE in real production as it breaks the LCG-CE i/f
190     #if ( $gridid !~ m:^/O=dutchgrid/O=users/O=nikhef/CN=: and
191     # $gridid !~ m:^/DC=org/DC=terena/DC=tcs/C=NL/O=Nikhef/CN=:
192     # ) {
193     # return Globus::GRAM::Error::AUTHORIZATION_DENIED();
194     #}
195    
196     #if ($description->executable() =~ m:^(/|\.):) {
197     push(@cmdline, $description->executable());
198     #} else {
199     # push(@cmdline,
200     # $description->directory()
201     # . '/'
202     # . $description->executable());
203     #}
204    
205     # Check if this is the Condor-G grid monitor
206     my $exec = $description->executable();
207     my $file_out = `/usr/bin/file $exec`;
208     if ( $file_out =~ /script/ || $file_out =~ /text/ ||
209     $file_out =~ m|/usr/bin/env| ) {
210     open( EXEC, "<$exec" ) or
211     return Globus::GRAM::Error::EXECUTABLE_PERMISSIONS();
212     while( <EXEC> ) {
213     if ( /Sends results from the grid_manager_monitor_agent back to a/ ) {
214     $is_grid_monitor = 1;
215     }
216     }
217     close( EXEC );
218     }
219    
220     # Reject jobs that want streaming, if so configured, but not for
221     # grid monitor jobs
222     if ( $description->streamingrequested() &&
223     $description->streamingdisabled() && !$is_grid_monitor ) {
224    
225     $self->log("Streaming is not allowed.");
226     return Globus::GRAM::Error::OPENING_STDOUT;
227     }
228    
229     @arguments = $description->arguments();
230     foreach(@arguments)
231     {
232     if(ref($_))
233     {
234     return Globus::GRAM::Error::RSL_ARGUMENTS;
235     }
236     }
237     if ($#arguments >= 0)
238     {
239     push(@cmdline, @arguments);
240     }
241    
242     if ($description->use_fork_starter() && -x $fork_starter)
243     {
244     if (!defined($starter_in) && !defined($starter_out))
245     {
246     $pid = IPC::Open2::open2($starter_out, $starter_in,
247     "$fork_starter $log_path");
248     my $oldfh = select $starter_out;
249     $|=1;
250     select $oldfh;
251     }
252    
253     print $starter_in "100;perl-fork-start-$$-$starter_index;";
254    
255     print $starter_in 'directory='.
256     &escape_for_starter($description->directory()) . ';';
257    
258     if (keys %CHILD_ENV > 0) {
259     print $starter_in 'environment='.
260     join(',', map { &escape_for_starter($_)
261     .'='.&escape_for_starter($CHILD_ENV{$_})
262     } (keys %CHILD_ENV)) . ';';
263     }
264    
265     print $starter_in "count=$count;";
266    
267     my @softenv = $description->softenv();
268     my $enable_default_software_environment
269     = $description->enable_default_software_environment();
270     if ( ($softenv_dir ne '')
271     && (@softenv || $enable_default_software_environment))
272     {
273     ### SoftEnv extension ###
274     $cmd_script_name = $self->job_dir() . '/scheduler_fork_cmd_script';
275     local(*CMD);
276     open( CMD, '>' . $cmd_script_name );
277    
278     print CMD "#!/bin/sh\n";
279    
280     $self->setup_softenv(
281     $self->job_dir() . '/fork_softenv_cmd_script',
282     $soft_msc,
283     $softenv_load,
284     *CMD);
285    
286     print CMD 'cd ', $description->directory(), "\n";
287     print CMD "@cmdline\n";
288     print CMD "exit \$?\n";
289    
290     close(CMD);
291     chmod 0700, $cmd_script_name;
292    
293     print $starter_in 'executable=' .
294     &escape_for_starter($cmd_script_name). ';';
295     print $starter_in 'arguments=;';
296     #########################
297     }
298     else
299     {
300     print $starter_in 'executable=' .
301     &escape_for_starter($cmdline[0]). ';';
302     shift @cmdline;
303     if ($#cmdline >= 0)
304     {
305     print $starter_in 'arguments=' .
306     join(',', map {&escape_for_starter($_)} @cmdline) .
307     ';';
308     }
309     }
310    
311     my @job_stdout;
312     my @job_stderr;
313    
314     for ($i = 0; $i < $count; $i++) {
315     if($multi_output)
316     {
317     push(@job_stdout, $self->{STDIO_MERGER}->add_file('out'));
318     push(@job_stderr, $self->{STDIO_MERGER}->add_file('err'));
319     }
320     else
321     {
322     if (defined($description->stdout)) {
323     push(@job_stdout, $description->stdout());
324     } else {
325     push(@job_stdout, '/dev/null');
326     }
327    
328     if (defined($description->stderr)) {
329     push(@job_stderr, $description->stderr());
330     } else {
331     push(@job_stderr, '/dev/null');
332     }
333     }
334     }
335    
336     print $starter_in "stdin=" . &escape_for_starter($description->stdin()).
337     ';';
338     print $starter_in "stdout=" .
339     join(',', map {&escape_for_starter($_)} @job_stdout) . ';';
340     print $starter_in "stderr=" .
341     join(',', map {&escape_for_starter($_)} @job_stderr) . "\n";
342    
343     while (<$starter_out>) {
344     chomp;
345     my @res = split(/;/, $_);
346    
347     if ($res[1] ne "perl-fork-start-$$-$starter_index") {
348     next;
349     }
350     if ($res[0] == '101') {
351     @job_id = split(',', $res[2]);
352     last;
353     } elsif ($res[0] == '102') {
354     $self->respond({GT3_FAILURE_MESSAGE => "starter: $res[3]" });
355     return new Globus::GRAM::Error($res[2]);
356     }
357     }
358    
359     if ($is_grid_monitor && -x $streamer)
360     {
361     my $streamer_startup='';
362    
363     $starter_index++;
364     $streamer_startup .= "100;perl-fork-start-$$-$starter_index;";
365     $streamer_startup .= 'directory='.$self->job_dir().';';
366    
367     if (keys %CHILD_ENV > 0) {
368     $streamer_startup .= 'environment='.
369     join(',', map { &escape_for_starter($_)
370     .'='.&escape_for_starter($CHILD_ENV{$_})
371     } (keys %CHILD_ENV)) . ';';
372     }
373    
374     $streamer_startup .= "count=1;";
375    
376     $streamer_startup .= 'executable=' . &escape_for_starter($streamer)
377     . ';';
378     @cmdline = ('-s', $description->state_file());
379     foreach my $p (@job_id)
380     {
381     my $q = $p;
382     # strip leading uuid
383     $q =~ s/.*://;
384     push(@cmdline, '-p', $q);
385     }
386     $streamer_startup .= 'arguments=' .
387     join(',', map {&escape_for_starter($_)} @cmdline) .
388     ';';
389    
390     $streamer_startup .= "stdin=/dev/null;";
391     $streamer_startup .= "stdout=gram_streamer_out;";
392     $streamer_startup .= "stderr=gram_streamer_err\n";
393    
394     $self->log("streamer_startup is $streamer_startup");
395     print $starter_in $streamer_startup;
396    
397     while (<$starter_out>) {
398     chomp;
399     my @res = split(/;/, $_);
400    
401     if ($res[1] ne "perl-fork-start-$$-$starter_index") {
402     next;
403     }
404     if ($res[0] == '101') {
405     @job_id = (@job_id, split(',', $res[2]));
406     last;
407     } elsif ($res[0] == '102') {
408     $self->respond({GT3_FAILURE_MESSAGE => "starter: $res[3]" });
409     return new Globus::GRAM::Error($res[2]);
410     }
411     }
412     }
413     $description->add('jobid', join(',', @job_id));
414     return { JOB_STATE => Globus::GRAM::JobState::ACTIVE,
415     JOB_ID => join(',', @job_id) };
416     } else {
417     my $starter_pid;
418     local(*READER,*WRITER); # always use local on perl FDs
419     pipe(READER, WRITER);
420    
421     $starter_pid = fork();
422    
423     if (! defined($starter_pid))
424     {
425     $failure_code = "fork:$!";
426     $self->respond({GT3_FAILURE_MESSAGE => $failure_code });
427     return Globus::GRAM::Error::JOB_EXECUTION_FAILED;
428     }
429     elsif ($starter_pid == 0)
430     {
431     # Starter Process
432     close(READER);
433    
434     local(*JOB_READER, *JOB_WRITER);
435     pipe(JOB_READER, JOB_WRITER);
436    
437     for(my $i = 0; $i < $count; $i++)
438     {
439     if($multi_output)
440     {
441     $job_stdout = $self->{STDIO_MERGER}->add_file('out');
442     $job_stderr = $self->{STDIO_MERGER}->add_file('err');
443     }
444     else
445     {
446     $job_stdout = $description->stdout();
447     $job_stderr = $description->stderr();
448     }
449    
450     # obtain plain old pipe into temporary variables
451     local $^F = 2; # assure close-on-exec for pipe FDs
452     select((select(WRITER),$|=1)[$[]);
453    
454     if( ($pid=fork()) == 0)
455     {
456     close(JOB_READER);
457    
458     # forked child
459     %ENV = %CHILD_ENV;
460    
461     close(STDIN);
462     close(STDOUT);
463     close(STDERR);
464    
465     open(STDIN, '<' . $description->stdin());
466     open(STDOUT, ">>$job_stdout");
467     open(STDERR, ">>$job_stderr");
468    
469     # the below should never fail since we just forked
470     setpgrp(0,$$);
471    
472     if ( ! exec (@cmdline) )
473     {
474     my $err = "$!\n";
475     $SIG{PIPE} = 'IGNORE';
476     print JOB_WRITER "$err";
477     close(JOB_WRITER);
478     exit(1);
479     }
480     }
481     else
482     {
483     my $error_code = '';
484    
485     if ($pid == undef)
486     {
487     $self->log("fork failed\n");
488     $failure_code = "fork: $!";
489     }
490     close(JOB_WRITER);
491    
492     $_ = <JOB_READER>;
493     close(JOB_READER);
494    
495     if($_ ne '')
496     {
497     chomp($_);
498     $self->log("exec failed\n");
499     $failure_code = "exec: $_";
500     }
501    
502     if ($failure_code ne '')
503     {
504     # fork or exec failed. kill rest of job and return an error
505     $failure_code =~ s/\n/\\n/g;
506     foreach(@job_id)
507     {
508     $pgid = getpgrp($_);
509    
510     $pgid == -1 ? kill($signo{TERM}, $_) :
511     kill(-$signo{TERM}, $pgid);
512    
513     sleep(5);
514    
515     $pgid == -1 ? kill($signo{KILL}, $_) :
516     kill(-$signo{KILL}, $pgid);
517    
518     }
519    
520     local(*ERR);
521     open(ERR, '>' . $description->stderr());
522     print ERR "$failure_code\n";
523     close(ERR);
524    
525     print WRITER "FAIL:$failure_code\n";
526     exit(1);
527     }
528     push(@job_id, $pid);
529     }
530     }
531     if ($is_grid_monitor)
532     {
533     # Create an extra process to stream output for grid monitor
534    
535     # obtain plain old pipe into temporary variables
536     local $^F = 2; # assure close-on-exec for pipe FDs
537     select((select(WRITER),$|=1)[$[]);
538    
539     if( ($pid=fork()) == 0)
540     {
541     close(JOB_READER);
542    
543     # forked child
544     %ENV = %CHILD_ENV;
545    
546     close(STDIN);
547     close(STDOUT);
548     close(STDERR);
549    
550     chdir $self->job_dir();
551    
552     open(STDIN, '<' . $description->stdin());
553     open(STDOUT, '>gram_streamer_out');
554     open(STDERR, '>gram_streamer_error');
555     select STDERR; $| = 1; # make unbuffered
556     select STDOUT; $| = 1; # make unbuffered
557    
558    
559     # the below should never fail since we just forked
560     setpgrp(0,$$);
561    
562     @cmdline = ($streamer, '-s', $description->state_file());
563     foreach my $p (@job_id)
564     {
565     push(@cmdline, '-p', $p);
566     }
567    
568     if ( ! exec (@cmdline) )
569     {
570     my $err = "$!\n";
571     $SIG{PIPE} = 'IGNORE';
572     print JOB_WRITER "$err";
573     close(JOB_WRITER);
574     exit(1);
575     }
576     }
577     else
578     {
579     my $error_code = '';
580    
581     if ($pid == undef)
582     {
583     $self->log("fork failed\n");
584     $failure_code = "fork: $!";
585     }
586     close(JOB_WRITER);
587    
588     $_ = <JOB_READER>;
589     close(JOB_READER);
590    
591     if($_ ne '')
592     {
593     chomp($_);
594     $self->log("exec failed\n");
595     $failure_code = "exec: $_";
596     }
597    
598     if ($failure_code ne '')
599     {
600     # fork or exec failed. kill rest of job and return an error
601     $failure_code =~ s/\n/\\n/g;
602     foreach(@job_id)
603     {
604     $pgid = getpgrp($_);
605    
606     $pgid == -1 ? kill($signo{TERM}, $_) :
607     kill(-$signo{TERM}, $pgid);
608    
609     sleep(5);
610    
611     $pgid == -1 ? kill($signo{KILL}, $_) :
612     kill(-$signo{KILL}, $pgid);
613    
614     }
615    
616     local(*ERR);
617     open(ERR, '>' . $description->stderr());
618     print ERR "$failure_code\n";
619     close(ERR);
620    
621     print WRITER "FAIL:$failure_code\n";
622     exit(1);
623     }
624     push(@job_id, $pid);
625     }
626     }
627     print WRITER "SUCCESS:" . join(',', @job_id) . "\n";
628     exit(0);
629     }
630     else
631     {
632     my ($res, $value);
633     close(WRITER);
634     $_ = <READER>;
635     close(READER);
636     chomp($_);
637     waitpid($starter_pid, 0);
638     ($res, $value) = split(/:/, $_, 2);
639    
640     if ($res eq 'SUCCESS')
641     {
642     $description->add('jobid', $value);
643     return { JOB_STATE => Globus::GRAM::JobState::ACTIVE,
644     JOB_ID => $value };
645     }
646     elsif ($res eq 'FAIL')
647     {
648     $self->respond({GT3_FAILURE_MESSAGE => "$value" });
649     return Globus::GRAM::Error::JOB_EXECUTION_FAILED;
650     }
651     else
652     {
653     return Globus::GRAM::Error::JOB_EXECUTION_FAILED;
654     }
655     }
656     }
657     }
658    
659     sub poll
660     {
661     my $self = shift;
662     my $description = $self->{JobDescription};
663     my $state;
664    
665     my $jobid = $description->jobid();
666    
667     if(!defined $jobid)
668     {
669     $self->log("poll: job id defined!");
670     return { JOB_STATE => Globus::GRAM::JobState::FAILED };
671     }
672    
673     $self->log("polling job " . $jobid);
674     $_ = kill(0, split(/,/, $jobid));
675    
676     if($_ > 0)
677     {
678     $state = Globus::GRAM::JobState::ACTIVE;
679     }
680     else
681     {
682     $state = Globus::GRAM::JobState::DONE;
683     }
684     if($self->{STDIO_MERGER})
685     {
686     $self->{STDIO_MERGER}->poll($state == Globus::GRAM::JobState::DONE);
687     }
688    
689     return { JOB_STATE => $state };
690     }
691    
692     sub cancel
693     {
694     my $self = shift;
695     my $description = $self->{JobDescription};
696     my $pgid;
697     my $jobid = $description->jobid();
698    
699     if(!defined $jobid)
700     {
701     $self->log("cancel: no jobid defined!");
702     return { JOB_STATE => Globus::GRAM::JobState::FAILED };
703     }
704    
705     $self->log("cancel job " . $jobid);
706    
707     foreach (split(/,/,$jobid))
708     {
709     s/..*://;
710     $pgid = getpgrp($_);
711    
712     $pgid == -1 ? kill($signo{TERM}, $_) :
713     kill(-$signo{TERM}, $pgid);
714    
715     sleep(5);
716    
717     $pgid == -1 ? kill($signo{KILL}, $_) :
718     kill(-$signo{KILL}, $pgid);
719     }
720    
721     return { JOB_STATE => Globus::GRAM::JobState::FAILED };
722     }
723    
724     sub stage_out
725     {
726     my $self = shift;
727     my $description = $self->{JobDescription};
728     my $is_grid_monitor = 0;
729     my $job_dir = $self->job_dir();
730     my $rc;
731     my $line;
732     my @fields;
733     my $stream_out;
734    
735     $self->log("Fork stage out");
736     # Check if this is the Condor-G grid monitor
737     my $exec = $description->executable();
738     my $file_out = `/usr/bin/file $exec`;
739     if ( $file_out =~ /script/ || $file_out =~ /text/ ||
740     $file_out =~ m|/usr/bin/env| ) {
741     open( EXEC, "<$exec" ) or
742     return Globus::GRAM::Error::EXECUTABLE_PERMISSIONS();
743     while( <EXEC> ) {
744     if ( /Sends results from the grid_manager_monitor_agent back to a/ ) {
745     $is_grid_monitor = 1;
746     }
747     }
748     close( EXEC );
749     }
750    
751     if ($is_grid_monitor)
752     {
753     $self->log("Fork stage out is grid monitor");
754     local(*STREAMER_ERROR, *STREAMER_OUTPUT);
755    
756     $rc = open(STREAMER_ERROR, "<$job_dir/gram_streamer_error");
757     if (!$rc)
758     {
759     $self->log("Error opening gram_streamer_error: $!");
760     }
761     chomp($line = <STREAMER_ERROR>);
762     if ($line eq '')
763     {
764     $self->log("No error from streamer");
765     }
766     else
767     {
768     $self->log("Error from streamer $line");
769     @fields = split(':', $line, 2);
770    
771     $self->respond({GT3_FAILURE_MESSAGE => "$fields[0]" });
772     return new Globus::GRAM::Error($fields[1]);
773     }
774     close(STREAMER_ERROR);
775    
776     $stream_out = $description->get('file_stream_out');
777     $rc = open(STREAMER_OUTPUT, "<$job_dir/gram_streamer_out");
778     if (!$rc)
779     {
780     $self->log("Error opening gram_streamer_output: $!");
781     }
782     else
783     {
784     while ($line = <STREAMER_OUTPUT>)
785     {
786     chomp($line);
787     $self->log("Streamer output: $line");
788     my ($from, $to) = split(' ', $line);
789    
790     for (my $i = 0; $i < scalar(@{$stream_out}); $i++)
791     {
792     my $pair = $stream_out->[$i];
793     if ($pair->[0] eq $from && $pair->[1] eq $to)
794     {
795     splice(@{$stream_out}, $i, 1);
796     last;
797     }
798     }
799    
800     $self->respond({'STAGED_STREAM' => "$from $to"});
801     }
802     $description->add('filestreamout', $stream_out);
803     }
804     }
805     return $self->SUPER::stage_out();
806     }
807    
808     sub escape_for_starter
809     {
810     my $str = shift;
811    
812     $str =~ s/\\/\\\\/g;
813     $str =~ s/;/\\;/g;
814     $str =~ s/,/\\,/g;
815     $str =~ s/\n/\\n/g;
816     $str =~ s/=/\\=/g;
817    
818     return $str;
819     }
820    
821     END
822     {
823     if (defined($starter_in))
824     {
825     close($starter_in);
826     }
827     if (defined($starter_out))
828     {
829     close($starter_out);
830     }
831     }
832     1;

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28