1 |
# Copyright 1999-2006 University of Chicago |
2 |
# |
3 |
# Licensed under the Apache License, Version 2.0 (the "License"); |
4 |
# you may not use this file except in compliance with the License. |
5 |
# You may obtain a copy of the License at |
6 |
# |
7 |
# http://www.apache.org/licenses/LICENSE-2.0 |
8 |
# |
9 |
# Unless required by applicable law or agreed to in writing, software |
10 |
# distributed under the License is distributed on an "AS IS" BASIS, |
11 |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 |
# See the License for the specific language governing permissions and |
13 |
# limitations under the License. |
14 |
|
15 |
# Globus::GRAM::JobManager::fork package |
16 |
# |
17 |
# CVS Information: |
18 |
# $Source: /home/globdev/CVS/globus-packages/gram/jobmanager/lrms/fork/source/fork.pm,v $ |
19 |
# $Date: 2011/08/19 14:33:18 $ |
20 |
# $Revision: 1.3 $ |
21 |
# $Author: bester $ |
22 |
|
23 |
use Globus::GRAM::Error; |
24 |
use Globus::GRAM::JobState; |
25 |
use Globus::GRAM::JobManager; |
26 |
use Globus::GRAM::StdioMerger; |
27 |
use Globus::Core::Paths; |
28 |
use Globus::Core::Config; |
29 |
|
30 |
use Config; |
31 |
use IPC::Open2; |
32 |
|
33 |
package Globus::GRAM::JobManager::fork;

# This job manager specializes the generic GRAM job manager.
our @ISA = ('Globus::GRAM::JobManager');

# Site settings read from globus-fork.conf by the BEGIN block.  They are
# declared WITHOUT an initializer on purpose: BEGIN assigns them at compile
# time, and a runtime assignment here would overwrite those values.
my $mpirun;
my $mpiexec;
my $log_path;

# Shared globus-fork-starter co-process: its input/output handles (undef
# until first used) and a counter that tags each request sent to it.
my $starter_in;
my $starter_out;
my $starter_index = 0;

# Signal name => signal number, filled in by the BEGIN block.
my %signo;
40 |
|
41 |
BEGIN
{
    # Signal name => number.  $Config{sig_name} lists the signal names in
    # signal-number order, so each name's position is its number.
    my @signal_names = split(' ', $Config::Config{sig_name});
    @signo{@signal_names} = (0 .. $#signal_names);

    my $config = Globus::Core::Config->new(
        '${sysconfdir}/globus/globus-fork.conf');

    # An MPI launcher is usable only when it is configured and executable.
    # The value "no" marks it as unavailable.
    $mpirun = $config->get_attribute("mpirun") || "no";
    $mpirun = "no" unless $mpirun eq "no" || -x $mpirun;

    $mpiexec = $config->get_attribute("mpiexec") || "no";
    $mpiexec = "no" unless $mpiexec eq "no" || -x $mpiexec;

    # Empty string disables SoftEnv support.
    $softenv_dir = $config->get_attribute("softenv_dir") || "";
    # Log file handed to globus-fork-starter.
    $log_path = $config->get_attribute("log_path") || "/dev/null";
}
66 |
|
67 |
# Construct the job manager through the parent class.  For a "multiple" job
# with more than one process, attach a Globus::GRAM::StdioMerger so that
# per-process output can be merged into the job's stdout/stderr.
# Otherwise STDIO_MERGER is set to the false value 0.
sub new
{
    my $proto = shift;
    my $class = ref($proto) || $proto;
    my $self = $class->SUPER::new(@_);
    my $desc = $self->{JobDescription};

    # Fetched separately, in scalar context.
    my $out_path = $desc->stdout();
    my $err_path = $desc->stderr();

    my $needs_merger = $desc->jobtype() eq 'multiple' && $desc->count > 1;
    $self->{STDIO_MERGER} = $needs_merger
        ? Globus::GRAM::StdioMerger->new($self->job_dir(), $out_path, $err_path)
        : 0;

    return $self;
}
88 |
|
89 |
# Start the job described by $self->{JobDescription}.
#
# Validation comes first, for the directory, environment, count, jobtype,
# executable, stdin and arguments attributes.  The job is then started in
# one of two ways:
#   * through the shared globus-fork-starter co-process, when the
#     description asks for it and the program is executable;
#   * otherwise by forking an intermediate process that forks the job
#     processes and reports their pids back over a pipe.
# For the Condor-G grid monitor, a globus-gram-streamer process is also
# started.
#
# Returns { JOB_STATE => ACTIVE, JOB_ID => "id[,id...]" } on success or a
# Globus::GRAM::Error on failure.
sub submit
{
    my $self = shift;
    my $description = $self->{JobDescription};
    my $fork_starter = "$Globus::Core::Paths::sbindir/globus-fork-starter";
    my $streamer = "$Globus::Core::Paths::sbindir/globus-gram-streamer";
    my $soft_msc = "$softenv_dir/bin/soft-msc";
    my $softenv_load = "$softenv_dir/etc/softenv-load.sh";
    my $multi_output = 0;
    my $count;
    my @cmdline;
    my @job_id;

    $starter_index++;

    if(!defined($description->directory()))
    {
        return Globus::GRAM::Error::RSL_DIRECTORY;
    }
    # A relative directory is taken relative to $HOME.
    if ($description->directory() =~ m|^[^/]|)
    {
        $description->add("directory",
            $ENV{HOME} . '/' . $description->directory());
    }
    chdir $description->directory() or
        return Globus::GRAM::Error::BAD_DIRECTORY;

    # %CHILD_ENV is a package global.  Clear it so that a job manager that
    # submits several jobs does not leak one job's environment into the
    # next one.
    %CHILD_ENV = ();
    foreach my $tuple ($description->environment())
    {
        if(!ref($tuple) || scalar(@$tuple) != 2)
        {
            return Globus::GRAM::Error::RSL_ENVIRONMENT();
        }
        $CHILD_ENV{$tuple->[0]} = $tuple->[1];
    }

    if(ref($description->count()) ||
       $description->count() != int($description->count()))
    {
        return Globus::GRAM::Error::INVALID_COUNT();
    }
    my $jobtype = $description->jobtype();
    if ($jobtype eq 'multiple')
    {
        $count = $description->count();
        $multi_output = 1 if $count > 1;
    }
    elsif ($jobtype eq 'single')
    {
        $count = 1;
    }
    elsif ($jobtype eq 'mpi' && $mpiexec ne 'no')
    {
        # One launcher process starts all MPI ranks.
        $count = 1;
        @cmdline = ($mpiexec, '-n', $description->count());
    }
    elsif ($jobtype eq 'mpi' && $mpirun ne 'no')
    {
        $count = 1;
        @cmdline = ($mpirun, '-np', $description->count());
    }
    else
    {
        return Globus::GRAM::Error::JOBTYPE_NOT_SUPPORTED();
    }

    if ($description->executable eq "")
    {
        return Globus::GRAM::Error::RSL_EXECUTABLE();
    }
    # Existence and permissions of the executable are not checked here;
    # failures to run it are reported when the job is started.
    if ($description->stdin() eq "")
    {
        return Globus::GRAM::Error::RSL_STDIN;
    }
    if (! -r $description->stdin())
    {
        return Globus::GRAM::Error::STDIN_NOT_FOUND();
    }

    # ugly auth hack here
    my ($gridid, $peeraddr);
    foreach my $tuple ($description->environment()) {
        $peeraddr = $tuple->[1] if $tuple->[0] eq "GATEKEEPER_PEER";
        $gridid = $tuple->[1] if $tuple->[0] eq "GRID_ID";
    }
    # example, but DO NOT USE in real production as it breaks the LCG-CE i/f
    #if ( $gridid !~ m:^/O=dutchgrid/O=users/O=nikhef/CN=: and
    #     $gridid !~ m:^/DC=org/DC=terena/DC=tcs/C=NL/O=Nikhef/CN=:
    #   ) {
    #    return Globus::GRAM::Error::AUTHORIZATION_DENIED();
    #}

    push(@cmdline, $description->executable());

    # Check if this is the Condor-G grid monitor
    my $is_grid_monitor =
        _fork_detect_grid_monitor($description->executable());
    if (!defined($is_grid_monitor))
    {
        return Globus::GRAM::Error::EXECUTABLE_PERMISSIONS();
    }

    # Reject jobs that want streaming, if so configured, but not for
    # grid monitor jobs
    if ($description->streamingrequested() &&
        $description->streamingdisabled() && !$is_grid_monitor)
    {
        $self->log("Streaming is not allowed.");
        return Globus::GRAM::Error::OPENING_STDOUT;
    }

    my @arguments = $description->arguments();
    foreach my $arg (@arguments)
    {
        if (ref($arg))
        {
            return Globus::GRAM::Error::RSL_ARGUMENTS;
        }
    }
    push(@cmdline, @arguments);

    if ($description->use_fork_starter() && -x $fork_starter)
    {
        if (!defined($starter_in) && !defined($starter_out))
        {
            # open2 dies if the starter cannot be launched.
            eval {
                IPC::Open2::open2($starter_out, $starter_in,
                    "$fork_starter $log_path");
            };
            if ($@)
            {
                $self->log("Unable to start $fork_starter: $@");
                ($starter_in, $starter_out) = (undef, undef);
                return Globus::GRAM::Error::JOB_EXECUTION_FAILED;
            }
            # Each request must reach the starter before we block on its
            # reply, so autoflush the handle we WRITE to.
            my $oldfh = select $starter_in;
            $| = 1;
            select $oldfh;
        }

        my $tag = "perl-fork-start-$$-$starter_index";
        my $request = "100;$tag;";
        $request .= 'directory=' .
            escape_for_starter($description->directory()) . ';';
        $request .= _fork_environment_field();
        $request .= "count=$count;";

        my @softenv = $description->softenv();
        my $enable_default_software_environment
            = $description->enable_default_software_environment();
        if ( ($softenv_dir ne '')
            && (@softenv || $enable_default_software_environment))
        {
            ### SoftEnv extension ###
            # Run the job through a generated /bin/sh script that first
            # sets up the requested software environment.
            my $cmd_script_name =
                $self->job_dir() . '/scheduler_fork_cmd_script';
            local(*CMD);
            if (!open(CMD, '>', $cmd_script_name))
            {
                $self->log("Unable to create $cmd_script_name: $!");
                return Globus::GRAM::Error::JOB_EXECUTION_FAILED;
            }

            print CMD "#!/bin/sh\n";

            $self->setup_softenv(
                $self->job_dir() . '/fork_softenv_cmd_script',
                $soft_msc,
                $softenv_load,
                *CMD);

            # Quote every word so that arguments keep their boundaries and
            # are not expanded by the shell, as in the non-SoftEnv path.
            print CMD 'cd ', _fork_shell_quote($description->directory()), "\n";
            print CMD join(' ', map { _fork_shell_quote($_) } @cmdline), "\n";
            print CMD "exit \$?\n";

            close(CMD);
            chmod 0700, $cmd_script_name;

            $request .= 'executable=' .
                escape_for_starter($cmd_script_name) . ';';
            $request .= 'arguments=;';
            #########################
        }
        else
        {
            my ($program, @args) = @cmdline;
            $request .= 'executable=' . escape_for_starter($program) . ';';
            if (@args)
            {
                $request .= 'arguments=' .
                    join(',', map { escape_for_starter($_) } @args) . ';';
            }
        }

        # One stdout/stderr destination per process.
        my @job_stdout;
        my @job_stderr;
        for (my $i = 0; $i < $count; $i++)
        {
            if ($multi_output)
            {
                push(@job_stdout, $self->{STDIO_MERGER}->add_file('out'));
                push(@job_stderr, $self->{STDIO_MERGER}->add_file('err'));
            }
            else
            {
                if (defined($description->stdout)) {
                    push(@job_stdout, $description->stdout());
                } else {
                    push(@job_stdout, '/dev/null');
                }

                if (defined($description->stderr)) {
                    push(@job_stderr, $description->stderr());
                } else {
                    push(@job_stderr, '/dev/null');
                }
            }
        }

        $request .= "stdin=" . escape_for_starter($description->stdin()) .
            ';';
        $request .= "stdout=" .
            join(',', map { escape_for_starter($_) } @job_stdout) . ';';
        $request .= "stderr=" .
            join(',', map { escape_for_starter($_) } @job_stderr) . "\n";

        my @reply = _fork_starter_transaction($tag, $request);
        if (!@reply)
        {
            $self->log("globus-fork-starter exited without answering $tag");
            return Globus::GRAM::Error::JOB_EXECUTION_FAILED;
        }
        if ($reply[0] == 102)
        {
            $self->respond({GT3_FAILURE_MESSAGE => "starter: $reply[3]" });
            return new Globus::GRAM::Error($reply[2]);
        }
        @job_id = split(',', $reply[2]);

        if ($is_grid_monitor && -x $streamer)
        {
            $starter_index++;
            my $streamer_tag = "perl-fork-start-$$-$starter_index";

            my @streamer_args = ('-s', $description->state_file());
            foreach my $id (@job_id)
            {
                my $pid = $id;
                # strip leading uuid
                $pid =~ s/.*://;
                push(@streamer_args, '-p', $pid);
            }

            my $streamer_startup = "100;$streamer_tag;";
            $streamer_startup .= 'directory=' .
                escape_for_starter($self->job_dir()) . ';';
            $streamer_startup .= _fork_environment_field();
            $streamer_startup .= "count=1;";
            $streamer_startup .= 'executable=' .
                escape_for_starter($streamer) . ';';
            $streamer_startup .= 'arguments=' .
                join(',', map { escape_for_starter($_) } @streamer_args) .
                ';';
            $streamer_startup .= "stdin=/dev/null;";
            $streamer_startup .= "stdout=gram_streamer_out;";
            # stage_out reads streamer errors from gram_streamer_error.
            $streamer_startup .= "stderr=gram_streamer_error\n";

            $self->log("streamer_startup is $streamer_startup");
            my @streamer_reply =
                _fork_starter_transaction($streamer_tag, $streamer_startup);
            if (!@streamer_reply)
            {
                # The job itself is already running; report it anyway.
                $self->log("globus-fork-starter exited without answering "
                    . $streamer_tag);
            }
            elsif ($streamer_reply[0] == 102)
            {
                $self->respond(
                    {GT3_FAILURE_MESSAGE => "starter: $streamer_reply[3]" });
                return new Globus::GRAM::Error($streamer_reply[2]);
            }
            else
            {
                push(@job_id, split(',', $streamer_reply[2]));
            }
        }
        $description->add('jobid', join(',', @job_id));
        return { JOB_STATE => Globus::GRAM::JobState::ACTIVE,
                 JOB_ID => join(',', @job_id) };
    }

    # No starter: fork an intermediate process.  It starts every job
    # process and reports "SUCCESS:pid,..." or "FAIL:reason" over a pipe.
    local(*READER, *WRITER);    # always use local on perl FDs
    if (!pipe(READER, WRITER))
    {
        $self->respond({GT3_FAILURE_MESSAGE => "pipe:$!" });
        return Globus::GRAM::Error::JOB_EXECUTION_FAILED;
    }

    my $starter_pid = fork();
    if (!defined($starter_pid))
    {
        $self->respond({GT3_FAILURE_MESSAGE => "fork:$!" });
        return Globus::GRAM::Error::JOB_EXECUTION_FAILED;
    }

    if ($starter_pid == 0)
    {
        # Intermediate process
        close(READER);
        select((select(WRITER), $| = 1)[0]);

        my $stdin_path = $description->stdin();
        my @pids;
        my $failure = '';

        for (my $i = 0; $i < $count; $i++)
        {
            my ($job_stdout, $job_stderr);
            if ($multi_output)
            {
                $job_stdout = $self->{STDIO_MERGER}->add_file('out');
                $job_stderr = $self->{STDIO_MERGER}->add_file('err');
            }
            else
            {
                $job_stdout = $description->stdout();
                $job_stderr = $description->stderr();
            }
            # Same default as the starter path.
            $job_stdout = '/dev/null' if !defined($job_stdout);
            $job_stderr = '/dev/null' if !defined($job_stderr);

            my ($pid, $error) = _fork_spawn_child(sub {
                close(STDIN);
                close(STDOUT);
                close(STDERR);
                open(STDIN, '<', $stdin_path);
                open(STDOUT, '>>', $job_stdout);
                open(STDERR, '>>', $job_stderr);
                return @cmdline;
            });
            if (!defined($pid))
            {
                $failure = $error;
                last;
            }
            push(@pids, $pid);
        }

        if ($failure eq '' && $is_grid_monitor)
        {
            # Extra process that streams output for the grid monitor.
            my ($pid, $error) = _fork_spawn_child(sub {
                close(STDIN);
                close(STDOUT);
                close(STDERR);
                chdir $self->job_dir();
                open(STDIN, '<', $stdin_path);
                open(STDOUT, '>', 'gram_streamer_out');
                open(STDERR, '>', 'gram_streamer_error');
                select STDERR; $| = 1;    # make unbuffered
                select STDOUT; $| = 1;    # make unbuffered
                return ($streamer, '-s', $description->state_file(),
                        map { ('-p', $_) } @pids);
            });
            if (defined($pid))
            {
                push(@pids, $pid);
            }
            else
            {
                $failure = $error;
            }
        }

        if ($failure ne '')
        {
            # fork or exec failed: kill the rest of the job and report it.
            $self->log($failure =~ /^exec/ ? "exec failed\n" : "fork failed\n");
            $failure =~ s/\n/\\n/g;
            _fork_kill_job(@pids);

            my $stderr_path = $description->stderr();
            if (defined($stderr_path))
            {
                local(*ERR);
                if (open(ERR, '>', $stderr_path))
                {
                    print ERR "$failure\n";
                    close(ERR);
                }
            }

            print WRITER "FAIL:$failure\n";
            exit(1);
        }
        print WRITER "SUCCESS:" . join(',', @pids) . "\n";
        exit(0);
    }

    # Job manager: wait for the intermediate process's verdict.
    close(WRITER);
    my $reply = <READER>;
    close(READER);
    waitpid($starter_pid, 0);
    $reply = '' if !defined($reply);
    chomp($reply);
    my ($res, $value) = split(/:/, $reply, 2);

    if (defined($res) && $res eq 'SUCCESS')
    {
        $description->add('jobid', $value);
        return { JOB_STATE => Globus::GRAM::JobState::ACTIVE,
                 JOB_ID => $value };
    }
    if (defined($res) && $res eq 'FAIL')
    {
        $self->respond({GT3_FAILURE_MESSAGE => "$value" });
        return Globus::GRAM::Error::JOB_EXECUTION_FAILED;
    }
    return Globus::GRAM::Error::JOB_EXECUTION_FAILED;
}

# Return 1 if $exec is the Condor-G grid monitor, 0 if it is not, or undef
# if file(1) reports a script/text file that cannot be opened for reading.
sub _fork_detect_grid_monitor
{
    my $exec = shift;
    my $file_out = '';

    local(*FILE_CMD, *EXEC);
    # List-form pipe open: the path comes from the job description and
    # must not be interpreted by a shell.
    if (open(FILE_CMD, '-|', '/usr/bin/file', '--', $exec))
    {
        local $/;
        my $text = <FILE_CMD>;
        close(FILE_CMD);
        $file_out = $text if defined($text);
    }
    return 0 unless $file_out =~ /script|text|\/usr\/bin\/env/;

    open(EXEC, '<', $exec) or return;
    my $found = 0;
    while (my $line = <EXEC>)
    {
        if ($line =~ /Sends results from the grid_manager_monitor_agent back to a/)
        {
            $found = 1;
            last;
        }
    }
    close(EXEC);
    return $found;
}

# The "environment=k=v,...;" field of a starter request, built from
# %CHILD_ENV.  Returns '' when the environment is empty.
sub _fork_environment_field
{
    return '' if !%CHILD_ENV;
    return 'environment=' .
        join(',', map { escape_for_starter($_) . '=' .
                        escape_for_starter($CHILD_ENV{$_}) }
                      keys %CHILD_ENV) . ';';
}

# Quote one word for /bin/sh using single quotes.
sub _fork_shell_quote
{
    my $word = shift;
    $word =~ s/'/'\\''/g;
    return "'" . $word . "'";
}

# Send one request to globus-fork-starter and wait for the reply that
# carries the same tag.  Returns the reply split on ';'.  Field 0 is the
# code: 101 success, 102 failure.  If the starter cannot be written to, or
# closes its output first, the handles are dropped so the next submit
# launches a new starter, and an empty list is returned.
sub _fork_starter_transaction
{
    my ($tag, $request) = @_;

    {
        # A dead starter must not kill the job manager with SIGPIPE.
        local $SIG{PIPE} = 'IGNORE';
        if (print $starter_in $request)
        {
            while (defined(my $line = <$starter_out>))
            {
                chomp($line);
                my @res = split(/;/, $line);
                next if !defined($res[1]) || $res[1] ne $tag;
                return @res if $res[0] == 101 || $res[0] == 102;
            }
        }
    }
    close($starter_in);
    close($starter_out);
    ($starter_in, $starter_out) = (undef, undef);
    return ();
}

# Fork one job process.  In the child, $setup is called to redirect stdio
# (and chdir if needed); it returns the argv to exec.  An exec failure is
# reported over a close-on-exec pipe.  Returns ($pid, '') on success or
# (undef, "fork: ...") / (undef, "exec: ...") on failure.
sub _fork_spawn_child
{
    my $setup = shift;
    local $^F = 2;    # descriptors above 2 (this pipe) are close-on-exec
    my ($err_reader, $err_writer);
    pipe($err_reader, $err_writer) or return (undef, "fork: pipe: $!");

    my $pid = fork();
    if (!defined($pid))
    {
        my $reason = "fork: $!";
        close($err_reader);
        close($err_writer);
        return (undef, $reason);
    }
    if ($pid == 0)
    {
        # forked child
        close($err_reader);
        %ENV = %CHILD_ENV;
        my @argv = $setup->();
        # the below should never fail since we just forked
        setpgrp(0, $$);
        # Indirect-object exec: never hand a single word to the shell.
        { exec { $argv[0] } @argv; }
        my $reason = "$!\n";
        $SIG{PIPE} = 'IGNORE';
        print $err_writer $reason;
        close($err_writer);
        exit(1);
    }

    close($err_writer);
    my $reason = <$err_reader>;    # EOF without data means exec succeeded
    close($err_reader);
    if (defined($reason) && $reason ne '')
    {
        chomp($reason);
        return (undef, "exec: $reason");
    }
    return ($pid, '');
}

# Send SIGTERM to each process, or to its process group when it has one.
# Wait once for five seconds, then send SIGKILL the same way.
sub _fork_kill_job
{
    my @pids = @_;
    return if !@pids;

    my @pgids = map { getpgrp($_) } @pids;
    my $signal_all = sub {
        my $signal = shift;
        for my $i (0 .. $#pids)
        {
            if ($pgids[$i] == -1) { kill($signal, $pids[$i]); }
            else                  { kill(-$signal, $pgids[$i]); }
        }
    };
    $signal_all->($signo{TERM});
    sleep(5);
    $signal_all->($signo{KILL});
}
658 |
|
659 |
# Report whether the job is still running.
#
# The job is ACTIVE while at least one of its processes can be signalled
# with signal 0, and DONE otherwise.  Ids created by globus-fork-starter
# have the form "uuid:pid"; only the pid part is used.  Entries that are
# not positive integers are ignored: a pid of 0 would address our own
# process group and make the job look alive forever.  When a StdioMerger
# is attached, it is polled and told whether the job is done.
#
# Returns { JOB_STATE => ... }.  The state is FAILED when no job id is
# known.
sub poll
{
    my $self = shift;
    my $description = $self->{JobDescription};
    my $jobid = $description->jobid();

    if (!defined $jobid)
    {
        $self->log("poll: no job id defined!");
        return { JOB_STATE => Globus::GRAM::JobState::FAILED };
    }

    $self->log("polling job " . $jobid);

    my @pids = grep { /\A\d+\z/ && $_ > 0 }
               map  { my $pid = $_; $pid =~ s/.*://; $pid }
               split(/,/, $jobid);
    my $alive = @pids ? kill(0, @pids) : 0;

    my $state = $alive > 0
        ? Globus::GRAM::JobState::ACTIVE
        : Globus::GRAM::JobState::DONE;

    if ($self->{STDIO_MERGER})
    {
        $self->{STDIO_MERGER}->poll($state == Globus::GRAM::JobState::DONE);
    }

    return { JOB_STATE => $state };
}
691 |
|
692 |
# Cancel the job.
#
# Every job process gets SIGTERM, sent to its process group when it has
# one.  After a single five-second grace period for the whole job, SIGKILL
# is sent the same way.  Ids of the form "uuid:pid" (from
# globus-fork-starter) are reduced to the pid.  Entries that are not
# positive integers are skipped: getpgrp(0) would return our own process
# group and the job manager would kill itself.
#
# Always returns { JOB_STATE => FAILED }.
sub cancel
{
    my $self = shift;
    my $description = $self->{JobDescription};
    my $jobid = $description->jobid();

    if (!defined $jobid)
    {
        $self->log("cancel: no jobid defined!");
        return { JOB_STATE => Globus::GRAM::JobState::FAILED };
    }

    $self->log("cancel job " . $jobid);

    my @pids = grep { /\A\d+\z/ && $_ > 0 }
               map  { my $pid = $_; $pid =~ s/.*://; $pid }
               split(/,/, $jobid);

    if (@pids)
    {
        # Look process groups up before signalling; an exited process no
        # longer has one.
        my @pgids = map { getpgrp($_) } @pids;
        my $signal_job = sub {
            my $signal = shift;
            for my $i (0 .. $#pids)
            {
                if ($pgids[$i] == -1) { kill($signal, $pids[$i]); }
                else                  { kill(-$signal, $pgids[$i]); }
            }
        };

        $signal_job->($signo{TERM});
        sleep(5);    # one grace period per job, not per process
        $signal_job->($signo{KILL});
    }

    return { JOB_STATE => Globus::GRAM::JobState::FAILED };
}
723 |
|
724 |
# Post-job processing, then the parent class's generic stage-out.
#
# For the Condor-G grid monitor job:
#   * the first line of <job_dir>/gram_streamer_error, if non-empty, is
#     returned as an error;
#   * each "from to" line of <job_dir>/gram_streamer_out is reported with a
#     STAGED_STREAM response, and the matching pair is removed from the
#     description's file_stream_out list so it is not staged again.
sub stage_out
{
    my $self = shift;
    my $description = $self->{JobDescription};
    my $job_dir = $self->job_dir();
    my $is_grid_monitor = 0;

    $self->log("Fork stage out");

    # Check if this is the Condor-G grid monitor
    my $exec = $description->executable();
    my $file_out = '';
    local(*FILE_CMD, *EXEC);
    # List-form pipe open: the executable path comes from the job
    # description and must not be interpreted by a shell.
    if (open(FILE_CMD, '-|', '/usr/bin/file', '--', $exec))
    {
        local $/;
        my $text = <FILE_CMD>;
        close(FILE_CMD);
        $file_out = $text if defined($text);
    }
    if ($file_out =~ /script|text|\/usr\/bin\/env/)
    {
        open(EXEC, '<', $exec) or
            return Globus::GRAM::Error::EXECUTABLE_PERMISSIONS();
        while (my $exec_line = <EXEC>)
        {
            if ($exec_line =~ /Sends results from the grid_manager_monitor_agent back to a/)
            {
                $is_grid_monitor = 1;
                last;
            }
        }
        close(EXEC);
    }

    if ($is_grid_monitor)
    {
        $self->log("Fork stage out is grid monitor");
        local(*STREAMER_ERROR, *STREAMER_OUTPUT);

        if (!open(STREAMER_ERROR, '<', "$job_dir/gram_streamer_error"))
        {
            $self->log("Error opening gram_streamer_error: $!");
        }
        else
        {
            my $line = <STREAMER_ERROR>;
            close(STREAMER_ERROR);
            $line = '' if !defined($line);
            chomp($line);
            if ($line eq '')
            {
                $self->log("No error from streamer");
            }
            else
            {
                $self->log("Error from streamer $line");
                # NOTE(review): assumes the streamer writes
                # "message:code" -- confirm against globus-gram-streamer.
                my @fields = split(':', $line, 2);
                $self->respond({GT3_FAILURE_MESSAGE => "$fields[0]" });
                return new Globus::GRAM::Error($fields[1]);
            }
        }

        my $stream_out = $description->get('file_stream_out');
        if (!open(STREAMER_OUTPUT, '<', "$job_dir/gram_streamer_out"))
        {
            $self->log("Error opening gram_streamer_output: $!");
        }
        else
        {
            while (my $line = <STREAMER_OUTPUT>)
            {
                chomp($line);
                $self->log("Streamer output: $line");
                my ($from, $to) = split(' ', $line);

                # Remove the first matching (from, to) pair: it has been
                # streamed already.
                if (defined($stream_out))
                {
                    for (my $i = 0; $i < scalar(@{$stream_out}); $i++)
                    {
                        my $pair = $stream_out->[$i];
                        if ($pair->[0] eq $from && $pair->[1] eq $to)
                        {
                            splice(@{$stream_out}, $i, 1);
                            last;
                        }
                    }
                }

                $self->respond({'STAGED_STREAM' => "$from $to"});
            }
            close(STREAMER_OUTPUT);
            $description->add('filestreamout', $stream_out);
        }
    }
    return $self->SUPER::stage_out();
}
807 |
|
808 |
# Escape a value for a globus-fork-starter request.  In these requests ';'
# ends a field, ',' separates list items and '=' separates a key from its
# value.  Those characters and the backslash itself get a backslash
# prefix, and a newline becomes the two characters "\n".
sub escape_for_starter
{
    my ($text) = @_;

    $text =~ s/([\\;,=])/\\$1/g;
    $text =~ s/\n/\\n/g;

    return $text;
}
820 |
|
821 |
END
{
    # Close the globus-fork-starter pipe handles, if a starter was started.
    close($starter_in)  if defined($starter_in);
    close($starter_out) if defined($starter_out);
}

1;