# Copyright 1999-2006 University of Chicago
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Globus::GRAM::JobManager::fork package
#
# CVS Information:
# $Source: /home/globdev/CVS/globus-packages/gram/jobmanager/lrms/fork/source/fork.pm,v $
# $Date: 2011/08/19 14:33:18 $
# $Revision: 1.3 $
# $Author: bester $

23 |
|
|
use Globus::GRAM::Error; |
24 |
|
|
use Globus::GRAM::JobState; |
25 |
|
|
use Globus::GRAM::JobManager; |
26 |
|
|
use Globus::GRAM::StdioMerger; |
27 |
|
|
use Globus::Core::Paths; |
28 |
|
|
use Globus::Core::Config; |
29 |
|
|
|
30 |
|
|
use Config; |
31 |
|
|
use IPC::Open2; |
32 |
|
|
|
33 |
|
|
package Globus::GRAM::JobManager::fork;

# This job manager specialises the generic GRAM job manager.
our @ISA = qw(Globus::GRAM::JobManager);

# Site configuration read from globus-fork.conf by the BEGIN block.
# Deliberately declared without initialisers: a runtime assignment here
# would overwrite the values the BEGIN block stores at compile time.
my ($mpirun, $mpiexec, $log_path);

# Pipe handles to the shared globus-fork-starter process (undef until it
# is started) and the counter used to tag requests sent to it.
my ($starter_in, $starter_out) = (undef, undef);
my $starter_index = 0;

# Signal name => signal number, filled in by the BEGIN block.
my %signo;
40 |
|
|
|
41 |
|
|
BEGIN
{
    # Signal name => signal number.  $Config{sig_num} lists the number of
    # every entry in $Config{sig_name}, so the two lists are paired
    # directly.  Counting positions in sig_name is only right for names
    # stored at their own number's position, not for appended aliases.
    my @signal_names   = split(' ', $Config::Config{sig_name});
    my @signal_numbers = split(' ', $Config::Config{sig_num});
    @signo{@signal_names} = @signal_numbers;

    # NOTE(review): '${sysconfdir}' is single-quoted, so it reaches
    # Globus::Core::Config literally; presumably that class expands it.
    # TODO confirm.
    my $config = Globus::Core::Config->new(
        '${sysconfdir}/globus/globus-fork.conf');

    # MPI launchers are "no" unless configured and executable.
    $mpirun = $config->get_attribute("mpirun") || "no";
    $mpirun = "no" if $mpirun ne "no" && ! -x $mpirun;

    $mpiexec = $config->get_attribute("mpiexec") || "no";
    $mpiexec = "no" if $mpiexec ne "no" && ! -x $mpiexec;

    # Package global read by submit(); "" disables SoftEnv support.
    $softenv_dir = $config->get_attribute("softenv_dir") || "";

    # Passed to globus-fork-starter as its log file argument.
    $log_path = $config->get_attribute("log_path") || "/dev/null";
}
66 |
|
|
|
67 |
|
|
# Create a fork job manager object.
#
# Every argument after the class (or object) is passed unchanged to
# Globus::GRAM::JobManager::new.  A 'multiple' job with count > 1 gets a
# Globus::GRAM::StdioMerger in $self->{STDIO_MERGER} that merges the
# per-process stdout/stderr files; any other job gets 0 there.
sub new
{
    my $proto = shift;
    my $class = ref($proto) || $proto;
    my $self = $class->SUPER::new(@_);
    my $description = $self->{JobDescription};
    my ($stdout, $stderr) = ($description->stdout(), $description->stderr());
    my $merge_output = $description->jobtype() eq 'multiple'
                    && $description->count() > 1;

    $self->{STDIO_MERGER} = $merge_output
        ? Globus::GRAM::StdioMerger->new($self->job_dir(), $stdout, $stderr)
        : 0;

    return $self;
}
88 |
|
|
|
89 |
|
|
# Submit the job described by $self->{JobDescription}.
#
# Validates the directory, environment, count, jobtype, executable, stdin
# and arguments attributes and builds the command line.  For 'mpi' jobs the
# command is wrapped in the configured mpiexec or mpirun.  The processes
# are then started through the shared globus-fork-starter helper (when
# use_fork_starter() is true and the helper is executable) or by forking
# them directly.  Condor-G grid monitor jobs also get a
# globus-gram-streamer process.
#
# Side effects: changes the current directory of this process to the job
# directory and records the job id with $description->add('jobid', ...).
#
# Returns { JOB_STATE => ACTIVE, JOB_ID => "id,id,..." } on success or a
# Globus::GRAM::Error object on failure.
sub submit
{
    my $self = shift;
    my $description = $self->{JobDescription};
    my $fork_starter = "$Globus::Core::Paths::sbindir/globus-fork-starter";
    my $streamer = "$Globus::Core::Paths::sbindir/globus-gram-streamer";
    my $count;
    my $multi_output = 0;
    my @cmdline;
    # Lexical rather than a package global, so that environment variables
    # from an earlier submit() in this process cannot leak into this job.
    my %child_env;

    if (!defined($description->directory()))
    {
        return Globus::GRAM::Error::RSL_DIRECTORY;
    }
    if ($description->directory() =~ m|^[^/]|)
    {
        # A relative directory is taken relative to $HOME.
        $description->add("directory",
                          $ENV{HOME} . '/' . $description->directory());
    }
    chdir $description->directory() or
        return Globus::GRAM::Error::BAD_DIRECTORY;

    foreach my $tuple ($description->environment())
    {
        if (!ref($tuple) || scalar(@$tuple) != 2)
        {
            return Globus::GRAM::Error::RSL_ENVIRONMENT();
        }
        $child_env{$tuple->[0]} = $tuple->[1];
    }

    if (ref($description->count()) ||
        $description->count() != int($description->count()))
    {
        return Globus::GRAM::Error::INVALID_COUNT();
    }
    if ($description->jobtype() eq 'multiple')
    {
        $count = $description->count();
        $multi_output = 1 if $count > 1;
    }
    elsif ($description->jobtype() eq 'single')
    {
        $count = 1;
    }
    elsif ($description->jobtype() eq 'mpi' && $mpiexec ne 'no')
    {
        $count = 1;
        @cmdline = ($mpiexec, '-n', $description->count());
    }
    elsif ($description->jobtype() eq 'mpi' && $mpirun ne 'no')
    {
        $count = 1;
        @cmdline = ($mpirun, '-np', $description->count());
    }
    else
    {
        return Globus::GRAM::Error::JOBTYPE_NOT_SUPPORTED();
    }

    # The executable's existence and permissions are not checked here; a
    # program that cannot be run is reported when its process is started.
    if ($description->executable() eq "")
    {
        return Globus::GRAM::Error::RSL_EXECUTABLE();
    }
    elsif ($description->stdin() eq "")
    {
        return Globus::GRAM::Error::RSL_STDIN;
    }
    elsif (! -r $description->stdin())
    {
        return Globus::GRAM::Error::STDIN_NOT_FOUND();
    }

    # ugly auth hack here: site hook based on the GRID_ID and
    # GATEKEEPER_PEER job environment entries.
    my ($gridid, $peeraddr);
    foreach my $tuple ($description->environment())
    {
        $peeraddr = $tuple->[1] if $tuple->[0] eq "GATEKEEPER_PEER";
        $gridid = $tuple->[1] if $tuple->[0] eq "GRID_ID";
    }
    # example, but DO NOT USE in real production as it breaks the LCG-CE i/f
    #if ( $gridid !~ m:^/O=dutchgrid/O=users/O=nikhef/CN=: and
    #     $gridid !~ m:^/DC=org/DC=terena/DC=tcs/C=NL/O=Nikhef/CN=:
    #   ) {
    #    return Globus::GRAM::Error::AUTHORIZATION_DENIED();
    #}

    push(@cmdline, $description->executable());

    # Check if this is the Condor-G grid monitor.  file(1) is run through a
    # list-form pipe open, so the user-supplied executable name is never
    # interpreted by a shell.
    my $exec = $description->executable();
    my $is_grid_monitor = 0;
    my $file_out = '';
    if (open(my $file_fh, '-|', '/usr/bin/file', $exec))
    {
        local $/;
        my $output = <$file_fh>;
        close($file_fh);
        $file_out = $output if defined($output);
    }
    if ($file_out =~ /script/ || $file_out =~ /text/ ||
        $file_out =~ m|/usr/bin/env|)
    {
        open(my $exec_fh, '<', $exec) or
            return Globus::GRAM::Error::EXECUTABLE_PERMISSIONS();
        while (my $line = <$exec_fh>)
        {
            if ($line =~
                /Sends results from the grid_manager_monitor_agent back to a/)
            {
                $is_grid_monitor = 1;
                last;
            }
        }
        close($exec_fh);
    }

    # Reject jobs that want streaming, if so configured, but not for
    # grid monitor jobs.
    if ($description->streamingrequested() &&
        $description->streamingdisabled() && !$is_grid_monitor)
    {
        $self->log("Streaming is not allowed.");
        return Globus::GRAM::Error::OPENING_STDOUT;
    }

    my @arguments = $description->arguments();
    foreach my $argument (@arguments)
    {
        if (ref($argument))
        {
            return Globus::GRAM::Error::RSL_ARGUMENTS;
        }
    }
    push(@cmdline, @arguments);

    my %job = (
        cmdline         => \@cmdline,
        count           => $count,
        multi_output    => $multi_output,
        environment     => \%child_env,
        is_grid_monitor => $is_grid_monitor,
        streamer        => $streamer,
    );

    if ($description->use_fork_starter() && -x $fork_starter)
    {
        return $self->_submit_via_starter($fork_starter, \%job);
    }
    return $self->_submit_via_fork(\%job);
}

# Start the job through the shared globus-fork-starter process, then (for
# grid monitor jobs) a streamer process the same way.  $job is the hash
# built by submit().  Returns the same values as submit().
sub _submit_via_starter
{
    my ($self, $fork_starter, $job) = @_;
    my $description = $self->{JobDescription};
    my @cmdline = @{$job->{cmdline}};
    my $count = $job->{count};
    my @softenv = $description->softenv();
    my $enable_default_software_environment
        = $description->enable_default_software_environment();

    # The whole request is built before anything is sent, so an early
    # error return cannot leave a partial request in the starter pipe.
    my $request = 'directory='
                . escape_for_starter($description->directory()) . ';'
                . _starter_environment($job->{environment})
                . "count=$count;";

    if (($softenv_dir ne '')
        && (@softenv || $enable_default_software_environment))
    {
        ### SoftEnv extension ###
        # Run the command from a generated shell script that first sets up
        # the SoftEnv environment.
        my $cmd_script_name = $self->job_dir() . '/scheduler_fork_cmd_script';
        local(*CMD);
        open(CMD, '>', $cmd_script_name) or
            return Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED();

        print CMD "#!/bin/sh\n";

        $self->setup_softenv(
            $self->job_dir() . '/fork_softenv_cmd_script',
            "$softenv_dir/bin/soft-msc",
            "$softenv_dir/etc/softenv-load.sh",
            *CMD);

        # Quote every word so the script runs exactly the argument vector
        # the non-SoftEnv path passes (no word splitting or globbing).
        print CMD 'cd ', _shell_quote($description->directory()), "\n";
        print CMD join(' ', map { _shell_quote($_) } @cmdline), "\n";
        print CMD "exit \$?\n";

        close(CMD);
        chmod 0700, $cmd_script_name;

        $request .= 'executable=' . escape_for_starter($cmd_script_name) . ';'
                  . 'arguments=;';
    }
    else
    {
        my ($executable, @args) = @cmdline;
        $request .= 'executable=' . escape_for_starter($executable) . ';';
        if (@args)
        {
            $request .= 'arguments='
                      . join(',', map { escape_for_starter($_) } @args)
                      . ';';
        }
    }

    my (@job_stdout, @job_stderr);
    for (my $i = 0; $i < $count; $i++)
    {
        if ($job->{multi_output})
        {
            push(@job_stdout, $self->{STDIO_MERGER}->add_file('out'));
            push(@job_stderr, $self->{STDIO_MERGER}->add_file('err'));
        }
        else
        {
            push(@job_stdout, defined($description->stdout())
                              ? $description->stdout() : '/dev/null');
            push(@job_stderr, defined($description->stderr())
                              ? $description->stderr() : '/dev/null');
        }
    }
    $request .= 'stdin=' . escape_for_starter($description->stdin()) . ';'
              . 'stdout='
              . join(',', map { escape_for_starter($_) } @job_stdout) . ';'
              . 'stderr='
              . join(',', map { escape_for_starter($_) } @job_stderr);

    my ($ids, $error) = $self->_starter_request($fork_starter, $request);
    return $error if defined($error);
    my @job_id = @$ids;

    if ($job->{is_grid_monitor} && -x $job->{streamer})
    {
        # Starter job ids carry a leading "<uuid>:"; the streamer wants
        # plain PIDs.
        my @streamer_args = ('-s', $description->state_file());
        foreach my $id (@job_id)
        {
            my $pid = $id;
            $pid =~ s/.*://;
            push(@streamer_args, '-p', $pid);
        }

        # stderr goes to gram_streamer_error, the same file the direct
        # fork path uses and stage_out() reads.
        my $streamer_request = 'directory='
            . escape_for_starter($self->job_dir()) . ';'
            . _starter_environment($job->{environment})
            . 'count=1;'
            . 'executable=' . escape_for_starter($job->{streamer}) . ';'
            . 'arguments='
            . join(',', map { escape_for_starter($_) } @streamer_args) . ';'
            . 'stdin=/dev/null;'
            . 'stdout=gram_streamer_out;'
            . 'stderr=gram_streamer_error';

        $self->log("streamer_startup is $streamer_request");

        ($ids, $error) = $self->_starter_request($fork_starter,
                                                 $streamer_request);
        return $error if defined($error);
        push(@job_id, @$ids);
    }

    my $jobid = join(',', @job_id);
    $description->add('jobid', $jobid);
    return { JOB_STATE => Globus::GRAM::JobState::ACTIVE,
             JOB_ID => $jobid };
}

# Send one request to globus-fork-starter, starting the starter on first
# use, and wait for the reply carrying this request's tag.  Replies with
# other tags are skipped.
# Returns (\@job_ids, undef) on success or (undef, $error) on failure.
sub _starter_request
{
    my ($self, $fork_starter, $request) = @_;

    if (!defined($starter_in) && !defined($starter_out))
    {
        # List form: no shell, and paths containing spaces are preserved.
        my $started = eval {
            IPC::Open2::open2($starter_out, $starter_in,
                              $fork_starter, $log_path);
            1;
        };
        if (!$started)
        {
            my $reason = $@;
            chomp($reason);
            ($starter_in, $starter_out) = (undef, undef);
            $self->respond({GT3_FAILURE_MESSAGE => "starter: $reason" });
            return (undef, Globus::GRAM::Error::JOB_EXECUTION_FAILED());
        }
        my $oldfh = select($starter_in);
        $| = 1;
        select($oldfh);
    }

    # Set only after the starter is spawned, so the starter does not
    # inherit an ignored SIGPIPE.  A dead starter must not kill this
    # process; its death is detected when its output reaches EOF below.
    local $SIG{PIPE} = 'IGNORE';

    $starter_index++;
    my $tag = "perl-fork-start-$$-$starter_index";
    print $starter_in "100;$tag;$request\n";

    while (my $reply = <$starter_out>)
    {
        chomp($reply);
        my @res = split(/;/, $reply);

        next if !defined($res[1]) || $res[1] ne $tag;

        if ($res[0] eq '101')
        {
            return ([split(',', defined($res[2]) ? $res[2] : '')], undef);
        }
        elsif ($res[0] eq '102')
        {
            $self->respond({GT3_FAILURE_MESSAGE => "starter: $res[3]" });
            return (undef, Globus::GRAM::Error->new($res[2]));
        }
    }

    # The starter exited without answering.  Forget it so the next request
    # starts a fresh one instead of writing to a dead pipe.
    close($starter_in);
    close($starter_out);
    ($starter_in, $starter_out) = (undef, undef);
    $self->respond({GT3_FAILURE_MESSAGE => "starter: no reply" });
    return (undef, Globus::GRAM::Error::JOB_EXECUTION_FAILED());
}

# Format the "environment=NAME=VALUE,...;" field of a starter request, or
# return '' when the hash referenced by $env is empty.
sub _starter_environment
{
    my $env = shift;

    return '' if !%$env;
    return 'environment='
         . join(',', map { escape_for_starter($_) . '='
                           . escape_for_starter($env->{$_}) } keys %$env)
         . ';';
}

# Quote one word for /bin/sh using single quotes.
sub _shell_quote
{
    my $word = shift;

    $word =~ s/'/'\\''/g;
    return "'$word'";
}

# Start the job without the starter helper.  An intermediate child process
# forks the job processes and reports "SUCCESS:<pid>,..." or
# "FAIL:<message>" through a pipe.  $job is the hash built by submit().
# Returns the same values as submit().
sub _submit_via_fork
{
    my ($self, $job) = @_;
    my $description = $self->{JobDescription};
    local(*READER, *WRITER);    # always use local on perl FDs

    if (!pipe(READER, WRITER))
    {
        $self->respond({GT3_FAILURE_MESSAGE => "pipe:$!" });
        return Globus::GRAM::Error::JOB_EXECUTION_FAILED;
    }

    my $starter_pid = fork();
    if (!defined($starter_pid))
    {
        my $failure_code = "fork:$!";
        close(READER);
        close(WRITER);
        $self->respond({GT3_FAILURE_MESSAGE => $failure_code });
        return Globus::GRAM::Error::JOB_EXECUTION_FAILED;
    }
    if ($starter_pid == 0)
    {
        # Intermediate starter process; exits here and never returns.
        close(READER);
        select((select(WRITER), $| = 1)[0]);
        my ($ok, $message) = $self->_fork_job_processes($job);
        print WRITER $message;
        exit($ok ? 0 : 1);
    }

    close(WRITER);
    my $reply = <READER>;
    close(READER);
    waitpid($starter_pid, 0);

    $reply = '' if !defined($reply);
    chomp($reply);
    my ($res, $value) = split(/:/, $reply, 2);
    $res = '' if !defined($res);

    if ($res eq 'SUCCESS')
    {
        $description->add('jobid', $value);
        return { JOB_STATE => Globus::GRAM::JobState::ACTIVE,
                 JOB_ID => $value };
    }
    elsif ($res eq 'FAIL')
    {
        $self->respond({GT3_FAILURE_MESSAGE => "$value" });
    }
    return Globus::GRAM::Error::JOB_EXECUTION_FAILED;
}

# Runs in the intermediate starter process.  Forks one process per count,
# plus the streamer for grid monitor jobs.  On any failure, kills the
# processes already started and also writes the message to the job's stderr
# file.  Returns (1, "SUCCESS:<pid>,...\n") or (0, "FAIL:<message>\n").
sub _fork_job_processes
{
    my ($self, $job) = @_;
    my $description = $self->{JobDescription};
    my @job_id;
    my $failure_code = '';

    for (my $i = 0; $i < $job->{count}; $i++)
    {
        my ($job_stdout, $job_stderr);
        if ($job->{multi_output})
        {
            $job_stdout = $self->{STDIO_MERGER}->add_file('out');
            $job_stderr = $self->{STDIO_MERGER}->add_file('err');
        }
        else
        {
            # Same /dev/null default as the starter path.
            $job_stdout = defined($description->stdout())
                        ? $description->stdout() : '/dev/null';
            $job_stderr = defined($description->stderr())
                        ? $description->stderr() : '/dev/null';
        }

        my ($pid, $failure) = $self->_spawn_process(
            cmdline => $job->{cmdline},
            env     => $job->{environment},
            stdin   => $description->stdin(),
            stdout  => $job_stdout,
            stderr  => $job_stderr,
            mode    => '>>');
        if (!defined($pid))
        {
            $failure_code = $failure;
            last;
        }
        push(@job_id, $pid);
    }

    if ($failure_code eq '' && $job->{is_grid_monitor} && -x $job->{streamer})
    {
        # Create an extra process to stream output for the grid monitor.
        my @streamer_cmd = ($job->{streamer}, '-s', $description->state_file());
        foreach my $pid (@job_id)
        {
            push(@streamer_cmd, '-p', $pid);
        }

        my ($pid, $failure) = $self->_spawn_process(
            cmdline => \@streamer_cmd,
            env     => $job->{environment},
            dir     => $self->job_dir(),
            stdin   => '/dev/null',
            stdout  => 'gram_streamer_out',
            stderr  => 'gram_streamer_error',
            mode    => '>');
        if (defined($pid))
        {
            push(@job_id, $pid);
        }
        else
        {
            $failure_code = $failure;
        }
    }

    if ($failure_code ne '')
    {
        # fork or exec failed: kill the rest of the job and return an error.
        $failure_code =~ s/\n/\\n/g;
        _kill_process_groups(@job_id);

        if (defined($description->stderr())
            && open(my $err_fh, '>', $description->stderr()))
        {
            print $err_fh "$failure_code\n";
            close($err_fh);
        }
        return (0, "FAIL:$failure_code\n");
    }

    return (1, "SUCCESS:" . join(',', @job_id) . "\n");
}

# Fork one process that runs @{$opt{cmdline}}.  The process gets its own
# process group and has %ENV replaced by %{$opt{env}}.  It optionally
# changes into $opt{dir}, reads $opt{stdin}, and writes $opt{stdout} /
# $opt{stderr} (opened with $opt{mode}, '>' or '>>').  An exec failure is
# reported back through a close-on-exec pipe.
# Returns ($pid, '') on success or (undef, $message) on failure.
sub _spawn_process
{
    my ($self, %opt) = @_;
    my @cmdline = @{$opt{cmdline}};

    # Descriptors above $^F are close-on-exec, so a successful exec closes
    # the child's copy of JOB_WRITER and the read below sees EOF.  Each
    # process gets its own pipe.
    local $^F = 2;
    local(*JOB_READER, *JOB_WRITER);
    pipe(JOB_READER, JOB_WRITER) or return (undef, "pipe: $!");

    my $pid = fork();
    if (!defined($pid))
    {
        my $failure = "fork: $!";
        $self->log("fork failed\n");
        close(JOB_READER);
        close(JOB_WRITER);
        return (undef, $failure);
    }

    if ($pid == 0)
    {
        # forked child
        close(JOB_READER);
        %ENV = %{$opt{env}};

        close(STDIN);
        close(STDOUT);
        close(STDERR);

        chdir($opt{dir}) if defined($opt{dir});

        open(STDIN, '<', $opt{stdin});
        open(STDOUT, $opt{mode}, $opt{stdout});
        open(STDERR, $opt{mode}, $opt{stderr});

        # the below should never fail since we just forked
        setpgrp(0, $$);

        # The indirect-object form never goes through the shell, even for a
        # one-word command, matching what the starter path executes.
        exec { $cmdline[0] } @cmdline;

        my $err = "$!\n";
        $SIG{PIPE} = 'IGNORE';
        print JOB_WRITER $err;
        close(JOB_WRITER);
        exit(1);
    }

    close(JOB_WRITER);
    my $err_line = <JOB_READER>;
    close(JOB_READER);

    if (defined($err_line) && $err_line ne '')
    {
        chomp($err_line);
        $self->log("exec failed\n");
        return (undef, "exec: $err_line");
    }
    return ($pid, '');
}

# Send SIGTERM to each PID's process group (or to the PID itself when the
# group cannot be determined or is our own), wait five seconds once, then
# send SIGKILL the same way.
sub _kill_process_groups
{
    my @pids = @_;
    return if !@pids;

    my $own_pgid = getpgrp(0);
    my @targets;
    foreach my $pid (@pids)
    {
        my $pgid = getpgrp($pid);
        # Never signal the process group this process belongs to.
        $pgid = -1 if $pgid == $own_pgid;
        push(@targets, [$pid, $pgid]);
    }

    foreach my $target (@targets)
    {
        my ($pid, $pgid) = @$target;
        $pgid == -1 ? kill($signo{TERM}, $pid) : kill(-$signo{TERM}, $pgid);
    }
    sleep(5);
    foreach my $target (@targets)
    {
        my ($pid, $pgid) = @$target;
        $pgid == -1 ? kill($signo{KILL}, $pid) : kill(-$signo{KILL}, $pgid);
    }
}
658 |
|
|
|
659 |
|
|
# Report the state of a running job.
#
# Returns { JOB_STATE => ACTIVE } while kill(0, ...) can signal at least
# one of the job's processes, DONE otherwise, and FAILED when no job id has
# been recorded.  When a stdio merger exists it is polled as well, with a
# true argument once the job is DONE.
sub poll
{
    my $self = shift;
    my $description = $self->{JobDescription};
    my $jobid = $description->jobid();
    my $state;

    if (!defined $jobid)
    {
        $self->log("poll: no job id defined!");
        return { JOB_STATE => Globus::GRAM::JobState::FAILED };
    }

    $self->log("polling job " . $jobid);

    # Job ids from globus-fork-starter look like "<uuid>:<pid>"; keep only
    # the PID.  Non-numeric or zero entries are dropped, because they would
    # become kill(0, 0), which probes our own process group and would always
    # report the job as running.
    my @pids = grep { /^\d+$/ && $_ > 0 }
               map  { my $pid = $_; $pid =~ s/.*://; $pid }
               split(/,/, $jobid);

    if (@pids && kill(0, @pids) > 0)
    {
        $state = Globus::GRAM::JobState::ACTIVE;
    }
    else
    {
        $state = Globus::GRAM::JobState::DONE;
    }

    if ($self->{STDIO_MERGER})
    {
        $self->{STDIO_MERGER}->poll($state == Globus::GRAM::JobState::DONE);
    }

    return { JOB_STATE => $state };
}
691 |
|
|
|
692 |
|
|
# Cancel the job.
#
# Each process listed in the job id ("<uuid>:" prefixes are stripped) gets
# SIGTERM sent to its process group, or to the PID itself when the group
# cannot be determined.  After a single five-second grace period, SIGKILL
# is sent the same way.
# Returns { JOB_STATE => FAILED }.
sub cancel
{
    my $self = shift;
    my $description = $self->{JobDescription};
    my $jobid = $description->jobid();

    if (!defined $jobid)
    {
        $self->log("cancel: no jobid defined!");
        return { JOB_STATE => Globus::GRAM::JobState::FAILED };
    }

    $self->log("cancel job " . $jobid);

    my $own_pgid = getpgrp(0);
    my @targets;
    foreach my $id (split(/,/, $jobid))
    {
        my $pid = $id;
        $pid =~ s/..*://;

        # Anything that is not a positive PID would turn into getpgrp(0)
        # and make us signal the job manager's own process group.
        next unless $pid =~ /^\d+$/ && $pid > 0;

        my $pgid = getpgrp($pid);
        $pgid = -1 if $pgid == $own_pgid;
        push(@targets, [$pid, $pgid]);
    }

    foreach my $target (@targets)
    {
        my ($pid, $pgid) = @$target;
        $pgid == -1 ? kill($signo{TERM}, $pid) : kill(-$signo{TERM}, $pgid);
    }

    # One grace period for all processes instead of five seconds each.
    sleep(5) if @targets;

    foreach my $target (@targets)
    {
        my ($pid, $pgid) = @$target;
        $pgid == -1 ? kill($signo{KILL}, $pid) : kill(-$signo{KILL}, $pgid);
    }

    return { JOB_STATE => Globus::GRAM::JobState::FAILED };
}
723 |
|
|
|
724 |
|
|
# Stage out job output.
#
# For Condor-G grid monitor jobs, an error line in the streamer's
# gram_streamer_error file (format "<message>:<error code>") is reported
# and returned as an error.  Otherwise each "<from> <to>" line of
# gram_streamer_out is sent as a STAGED_STREAM response and removed from
# file_stream_out.  The base class stage_out() then does the rest.
sub stage_out
{
    my $self = shift;
    my $description = $self->{JobDescription};
    my $job_dir = $self->job_dir();
    my $is_grid_monitor = 0;

    $self->log("Fork stage out");

    # Check if this is the Condor-G grid monitor.  file(1) is run through a
    # list-form pipe open, so the user-supplied executable name is never
    # interpreted by a shell.
    my $exec = $description->executable();
    my $file_out = '';
    if (open(my $file_fh, '-|', '/usr/bin/file', $exec))
    {
        local $/;
        my $output = <$file_fh>;
        close($file_fh);
        $file_out = $output if defined($output);
    }
    if ($file_out =~ /script/ || $file_out =~ /text/ ||
        $file_out =~ m|/usr/bin/env|)
    {
        open(my $exec_fh, '<', $exec) or
            return Globus::GRAM::Error::EXECUTABLE_PERMISSIONS();
        while (my $line = <$exec_fh>)
        {
            if ($line =~
                /Sends results from the grid_manager_monitor_agent back to a/)
            {
                $is_grid_monitor = 1;
                last;
            }
        }
        close($exec_fh);
    }

    if ($is_grid_monitor)
    {
        $self->log("Fork stage out is grid monitor");

        my $line = '';
        if (open(my $error_fh, '<', "$job_dir/gram_streamer_error"))
        {
            $line = <$error_fh>;
            close($error_fh);
            $line = '' if !defined($line);
            chomp($line);
        }
        else
        {
            $self->log("Error opening gram_streamer_error: $!");
        }

        if ($line eq '')
        {
            $self->log("No error from streamer");
        }
        else
        {
            $self->log("Error from streamer $line");
            my @fields = split(':', $line, 2);

            $self->respond({GT3_FAILURE_MESSAGE => "$fields[0]" });
            return Globus::GRAM::Error->new($fields[1]);
        }

        my $stream_out = $description->get('file_stream_out');
        if (open(my $output_fh, '<', "$job_dir/gram_streamer_out"))
        {
            while (my $output_line = <$output_fh>)
            {
                chomp($output_line);
                $self->log("Streamer output: $output_line");
                my ($from, $to) = split(' ', $output_line);

                # Remove the first matching (from, to) pair: that stream has
                # already been staged by the streamer.
                if (ref($stream_out) eq 'ARRAY')
                {
                    for (my $i = 0; $i < scalar(@{$stream_out}); $i++)
                    {
                        my $pair = $stream_out->[$i];
                        if ($pair->[0] eq $from && $pair->[1] eq $to)
                        {
                            splice(@{$stream_out}, $i, 1);
                            last;
                        }
                    }
                }

                $self->respond({'STAGED_STREAM' => "$from $to"});
            }
            close($output_fh);
            $description->add('filestreamout', $stream_out);
        }
        else
        {
            $self->log("Error opening gram_streamer_output: $!");
        }
    }

    return $self->SUPER::stage_out();
}
807 |
|
|
|
808 |
|
|
# Escape a value for a globus-fork-starter request line.
#
# Backslash, ';', ',' and '=' are prefixed with a backslash, and a newline
# becomes the two characters "\n".  Every character is mapped exactly once,
# so escapes inserted here are never escaped again.
sub escape_for_starter
{
    my ($text) = @_;
    my %escape_of = (
        "\\" => "\\\\",
        ";"  => "\\;",
        ","  => "\\,",
        "\n" => "\\n",
        "="  => "\\=",
    );

    $text =~ s/([\\;,\n=])/$escape_of{$1}/g;

    return $text;
}
820 |
|
|
|
821 |
|
|
# At interpreter exit, close whichever pipe handles to the shared
# globus-fork-starter process were opened (the input side first).
END
{
    foreach my $handle ($starter_in, $starter_out)
    {
        close($handle) if defined($handle);
    }
}

1;