1 |
# Copyright 1999-2006 University of Chicago |
2 |
# |
3 |
# Licensed under the Apache License, Version 2.0 (the "License"); |
4 |
# you may not use this file except in compliance with the License. |
5 |
# You may obtain a copy of the License at |
6 |
# |
7 |
# http://www.apache.org/licenses/LICENSE-2.0 |
8 |
# |
9 |
# Unless required by applicable law or agreed to in writing, software |
10 |
# distributed under the License is distributed on an "AS IS" BASIS, |
11 |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 |
# See the License for the specific language governing permissions and |
13 |
# limitations under the License. |
14 |
|
15 |
use Globus::GRAM::Error; |
16 |
use Globus::GRAM::JobState; |
17 |
use Globus::GRAM::JobManager; |
18 |
use Globus::Core::Paths; |
19 |
use Globus::Core::Config; |
20 |
|
21 |
# NOTE: This package name must match the name of the .pm file!! |
22 |
package Globus::GRAM::JobManager::pbs;

# This back end specialises the generic GRAM job manager.
our @ISA = qw(Globus::GRAM::JobManager);

# Site settings shared by every sub in this file; filled in once, at compile
# time, by the BEGIN block below.
my ($mpirun, $mpiexec, $qsub, $qstat, $qdel, $cluster, $cpu_per_node, $remote_shell, $vomsinfo);

# The SoftEnv directory has always lived in a package variable rather than in
# the lexical list above; it is declared explicitly here and nothing else
# about it changes.
our $softenv_dir;

# Load globus-pbs.conf and derive the settings used by submit/poll/cancel.
BEGIN
{
    my $settings = Globus::Core::Config->new(
        '${sysconfdir}/globus/globus-pbs.conf');

    # Fetch an optional launcher path. A missing value, or a path that is not
    # executable, collapses to the sentinel string 'no'.
    my $launcher_or_no = sub {
        my $path = $settings->get_attribute($_[0]) || 'no';
        return ($path ne 'no' && ! -x $path) ? 'no' : $path;
    };

    $mpiexec = $launcher_or_no->('mpiexec');
    $mpirun  = $launcher_or_no->('mpirun');

    # The PBS client commands are taken as configured, without an -x test.
    $qsub  = $settings->get_attribute('qsub')  || 'no';
    $qstat = $settings->get_attribute('qstat') || 'no';
    $qdel  = $settings->get_attribute('qdel')  || 'no';

    # 'cluster' is a flag: unset, false, or the literal 'no' all mean undef.
    $cluster = $settings->get_attribute('cluster') || undef;
    $cluster = undef if $cluster eq 'no';

    # Export PBS_DEFAULT only when the site configured a value for it.
    my $default_server = $settings->get_attribute('pbs_default') || '';
    $ENV{PBS_DEFAULT} = $default_server if $default_server ne '';

    $cpu_per_node = $settings->get_attribute('cpu_per_node') || 1;
    $remote_shell = $settings->get_attribute('remote_shell') || undef;
    $softenv_dir  = $settings->get_attribute('softenv_dir')  || '';
    $vomsinfo     = $settings->get_attribute('vomsinfo')     || undef;
}
62 |
|
63 |
# Round a number up to the nearest integer (ceiling).
#
# A value within 1E-12 of its truncation is treated as already integral, which
# absorbs floating-point noise from divisions such as count / cpu_per_node.
sub myceil ($)
{
    my ($value) = @_;
    my $truncated = int($value);

    # Close enough to an integer: return that integer unchanged.
    return $truncated if abs($value - $truncated) < 1E-12;

    # int() truncates toward zero, so a negative non-integer is already at its
    # ceiling; a positive one has to be bumped by one before truncating.
    return $value < 0 ? $truncated : int($value + 1.0);
}
68 |
|
69 |
# Build a PBS batch script from the job description and submit it with qsub.
#
# Steps, in order:
#   1. validate the description (streaming policy, jobtype, directory,
#      executable, stdin);
#   2. write <job_dir>/scheduler_pbs_job_script: #PBS directives, exported
#      environment, optional VO information, and the command line(s) for the
#      requested jobtype (for mpi / multi-host "multiple" jobs a helper script
#      <job_dir>/scheduler_pbs_cmd_script is written as well);
#   3. run qsub on the script and log the outcome through logger(1).
#
# Returns:
#   on success  a hash ref { JOB_ID => <qsub output>, JOB_STATE => PENDING };
#   on failure  a Globus::GRAM::Error value. Script-writing failures and qsub
#               failures additionally send a GT3_FAILURE_MESSAGE via respond.
sub submit
{
    my $self = shift;
    my $description = $self->{JobDescription};
    my $pbs_job_script_name;
    my $pbs_qsub_err_name;
    my $errfile = '';
    my $job_id;
    my $rsh_env;
    my @arguments;
    my $email_when = '';
    my $args = '';
    my $soft_msc = "$softenv_dir/bin/soft-msc";
    my $softenv_load = "$softenv_dir/etc/softenv-load.sh";

    # These used to be undeclared package globals; they are only meaningful
    # within a single submit call.
    my ($cpu_time, $wall_time, $total_cpu_time, $max_memory, $rc);

    # NAME="value" pairs (minus values containing commas). Collected as
    # before, but nothing in this file reads the list.
    my @new_env;

    # Report a failed operation on one of the generated scripts. Must be
    # called immediately after the failing call so that $! is still valid.
    my $script_failure = sub {
        my ($operation, $file) = @_;
        return $self->respond_with_failure_extension(
            "$operation: $file: $!",
            Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
    };
    my $job_print_failed = sub {
        return $script_failure->('print', $pbs_job_script_name);
    };

    # Process count used for scaling limits and for launching:
    # totalprocesses when positive, otherwise count.
    my $process_total = sub {
        return $description->totalprocesses() > 0
            ? $description->totalprocesses()
            : $description->count();
    };

    $self->log("Entering pbs submit");

    # Reject jobs that want streaming, if so configured
    if ($description->streamingrequested() &&
        $description->streamingdisabled())
    {
        $self->log("Streaming is not allowed.");
        return Globus::GRAM::Error::OPENING_STDOUT();
    }

    # check jobtype
    if (defined($description->jobtype()))
    {
        if ($description->jobtype !~ /^(mpi|single|multiple)$/)
        {
            return Globus::GRAM::Error::JOBTYPE_NOT_SUPPORTED();
        }
    }
    if ($description->directory eq '')
    {
        return Globus::GRAM::Error::RSL_DIRECTORY();
    }
    # A relative directory is interpreted relative to $HOME.
    if ($description->directory() =~ m|^[^/]|)
    {
        $description->add("directory",
                          $ENV{HOME} . '/' . $description->directory());
    }
    chdir($description->directory()) or
        return Globus::GRAM::Error::BAD_DIRECTORY();

    $self->nfssync( $description->executable() )
        unless $description->executable() eq '';
    $self->nfssync( $description->stdin() )
        unless $description->stdin() eq '';
    if ($description->executable eq '')
    {
        return Globus::GRAM::Error::RSL_EXECUTABLE();
    }
    #elsif(! -f $description->executable())
    #{
    #    return Globus::GRAM::Error::EXECUTABLE_NOT_FOUND();
    #}
    #elsif(! -x $description->executable())
    #{
    #    return Globus::GRAM::Error::EXECUTABLE_PERMISSIONS();
    #}
    elsif ($description->stdin() eq '')
    {
        return Globus::GRAM::Error::RSL_STDIN();
    }
    #elsif(! -r $description->stdin())
    #{
    #    return Globus::GRAM::Error::STDIN_NOT_FOUND();
    #}

    # CPU time limit: max_cpu_time wins; max_time is used as a CPU limit only
    # when the site is not configured as a cluster. 0 means "queue default".
    $self->log("Determining job max time cpu from job description");
    if (defined($description->max_cpu_time()))
    {
        $cpu_time = $description->max_cpu_time();
        $self->log("   using maxcputime of $cpu_time");
    }
    elsif (! $cluster && defined($description->max_time()))
    {
        $cpu_time = $description->max_time();
        $self->log("   using maxtime of $cpu_time");
    }
    else
    {
        $cpu_time = 0;
        $self->log('   using queue default');
    }

    # Wall time limit: max_wall_time wins; on a cluster max_time is used as
    # the wall limit instead. 0 means "queue default".
    $self->log("Determining job max wall time limit from job description");
    if (defined($description->max_wall_time()))
    {
        $wall_time = $description->max_wall_time();
        $self->log("   using maxwalltime of $wall_time");
    }
    elsif ($cluster && defined($description->max_time()))
    {
        $wall_time = $description->max_time();
        $self->log("   using maxtime of $wall_time");
    }
    else
    {
        $wall_time = 0;
        $self->log('   using queue default');
    }

    $self->log('Building job script');

    $pbs_job_script_name = $self->job_dir() . '/scheduler_pbs_job_script';

    # JOB stays a localized bareword glob because it is handed to
    # setup_softenv as *JOB below.
    local(*JOB);
    $rc = open(JOB, '>', $pbs_job_script_name);
    if (!$rc)
    {
        return $script_failure->('open', $pbs_job_script_name);
    }

    $rc = print JOB<<"EOF";
#! /bin/sh
# PBS batch job script built by Globus job manager
#
#PBS -S /bin/sh
EOF
    return $job_print_failed->() if !$rc;

    if ($description->name() ne '')
    {
        $rc = print JOB '#PBS -N ', $description->name(), "\n";
        return $job_print_failed->() if !$rc;
    }
    if ($description->email_address() ne '')
    {
        $rc = print JOB '#PBS -M ', $description->email_address(), "\n";
        return $job_print_failed->() if !$rc;
    }

    # Mail events: a(bort), b(egin), e(nd); 'n' when none was requested.
    $email_when .= 'a' if $description->emailonabort() eq 'yes';
    $email_when .= 'b' if $description->emailonexecution() eq 'yes';
    $email_when .= 'e' if $description->emailontermination() eq 'yes';
    $email_when = 'n' if $email_when eq '';

    $rc = print JOB "#PBS -m $email_when\n";
    return $job_print_failed->() if !$rc;

    if ($description->queue() ne '')
    {
        $rc = print JOB '#PBS -q ', $description->queue(), "\n";
        return $job_print_failed->() if !$rc;
    }
    if ($description->project() ne '')
    {
        $rc = print JOB '#PBS -A ', $description->project(), "\n";
        return $job_print_failed->() if !$rc;
    }

    if ($cpu_time != 0)
    {
        # pcput is per process; for 'multiple' jobs cput is scaled by the
        # number of processes.
        if ($description->jobtype() eq 'multiple')
        {
            $total_cpu_time = $cpu_time * $process_total->();
        }
        else
        {
            $total_cpu_time = $cpu_time;
        }
        $rc = print JOB "#PBS -l pcput=${cpu_time}:00\n"
                      . "#PBS -l cput=${total_cpu_time}:00\n";
        return $job_print_failed->() if !$rc;
    }

    if ($wall_time != 0)
    {
        $rc = print JOB "#PBS -l walltime=${wall_time}:00\n";
        return $job_print_failed->() if !$rc;
    }

    if ($description->max_memory() != 0)
    {
        # Memory is scaled by the process count for 'multiple' jobs.
        if ($description->jobtype() eq 'multiple')
        {
            $max_memory = $description->max_memory() * $process_total->();
        }
        else
        {
            $max_memory = $description->max_memory();
        }
        $rc = print JOB "#PBS -l mem=${max_memory}mb\n";
        return $job_print_failed->() if !$rc;
    }

    $rc = print JOB '#PBS -o ', $description->stdout(), "\n",
                    '#PBS -e ', $description->stderr(), "\n";
    return $job_print_failed->() if !$rc;

    # Node request, in order of preference: explicit nodes specification,
    # host_count, then (cluster only) count / cpu_per_node rounded up.
    if (defined $description->nodes())
    {
        #Generated by ExtensionsHandler.pm from resourceAllocationGroup elements
        $rc = print JOB '#PBS -l nodes=', $description->nodes(), "\n";
    }
    elsif ($description->host_count() != 0)
    {
        $rc = print JOB '#PBS -l nodes=', $description->host_count(), "\n";
    }
    elsif ($cluster && $cpu_per_node != 0)
    {
        $rc = print JOB '#PBS -l nodes=',
                        myceil($description->count() / $cpu_per_node), "\n";
    }
    return $job_print_failed->() if !$rc;

    ### SoftEnv extension ###
    if ($softenv_dir ne '')
    {
        $rc = $self->setup_softenv(
            $self->job_dir() . '/pbs_softenv_job_script',
            $soft_msc,
            $softenv_load,
            *JOB);

        if ($rc != 0)
        {
            return $self->respond_with_failure_extension(
                "setup_softenv: $rc",
                Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
        }
    }
    #########################

    $rsh_env = '';

    foreach my $tuple ($description->environment())
    {
        if (!ref($tuple) || scalar(@$tuple) != 2)
        {
            return Globus::GRAM::Error::RSL_ENVIRONMENT();
        }

        # Unset any variable whose value contains a comma,
        # because OpenPBS cannot handle those.  EDG bug 1208.
        push(@new_env, $tuple->[0] . '="' . $tuple->[1] . '"')
            unless $tuple->[1] =~ /,/;

        # Escape for use inside a double-quoted /bin/sh string. This edits
        # the tuples in place, as the original code did.
        $tuple->[0] =~ s/\\/\\\\/g;
        $tuple->[0] =~ s/\$/\\\$/g;
        $tuple->[0] =~ s/"/\\\"/g; #"
        $tuple->[0] =~ s/`/\\\`/g; #`

        $tuple->[1] =~ s/\\/\\\\/g;
        $tuple->[1] =~ s/\$/\\\$/g;
        $tuple->[1] =~ s/"/\\\"/g; #"
        $tuple->[1] =~ s/`/\\\`/g; #`

        $rsh_env .= $tuple->[0] . '="' . $tuple->[1] . "\";\n"
                 .  'export ' . $tuple->[0] . ";\n";
    }

    ##########################################################################
    # Extract accounting and VO data name for later use
    ##########################################################################
    my ($proxylocation, $voname, $gridid, $peeraddr, $jmid);
    my @fqans;
    foreach my $tuple ($description->environment())
    {
        $proxylocation = $tuple->[1] if $tuple->[0] eq "X509_USER_PROXY";
        $gridid        = $tuple->[1] if $tuple->[0] eq "GRID_ID";
        $peeraddr      = $tuple->[1] if $tuple->[0] eq "GATEKEEPER_PEER";
        $jmid          = $tuple->[1] if $tuple->[0] eq "GATEKEEPER_JM_ID";
    }
    if (defined($proxylocation) and defined($vomsinfo) and $vomsinfo)
    {
        print JOB '# Inspecting proxy ' . $proxylocation . "\n";

        if (-x $vomsinfo)
        {
            chomp($voname = `$vomsinfo -file "$proxylocation" -vo 2>/dev/null`);
            @fqans = map { chomp($_); $_; }
                     `$vomsinfo -file "$proxylocation" -fqan 2>/dev/null`;

            ### EXAMPLE for as long as long proxies kill the jobmanager
            # NOTE(review): these two assignments discard what vomsinfo just
            # returned, so every job is recorded as VO "dteam". Behaviour is
            # kept as found; confirm whether the override is still wanted.
            $voname = "dteam";
            @fqans = ( '/dteam/Role=NULL/Capability=NULL' );

            if ($voname =~ /[a-zA-Z][\w\d\.]*/)
            {
                print JOB "# VO name from proxy is $voname\n";
                $rsh_env .= 'VONAME="' . $voname . "\";\n"
                         .  'export ' . "VONAME" . ";\n";
                push(@new_env, "VONAME" . '="' . $voname . '"');
            }
            else
            {
                print JOB "# VO name $voname from proxy is NOT valid\n";
            }
            if ($#fqans >= 0)
            {
                print JOB "# FQAN order in proxy:\n";
                # The loop variable was misspelled ($qfan) while the body
                # printed $fqan, so only empty comment lines were written.
                foreach my $fqan (@fqans)
                {
                    print JOB "#   $fqan\n";
                }
            }
        }
        else
        {
            print JOB "# $vomsinfo not available, ignored\n";
        }
    }
    else
    {
        print JOB "# Proxy location undefined or no vomsinfo requested\n";
    }
    ##########################################################################

    $rc = print JOB "$rsh_env\n"
                  . "#Change to directory requested by user\n"
                  . 'cd ' . $description->directory() . "\n";
    return $job_print_failed->() if !$rc;

    @arguments = $description->arguments();

    foreach my $argument (@arguments)
    {
        if (ref($argument))
        {
            return Globus::GRAM::Error::RSL_ARGUMENTS();
        }
    }
    # Quote every argument for /bin/sh; $args stays '' when there are none.
    foreach my $argument (@arguments)
    {
        $self->log("Transforming argument \"$argument\"\n");
        $argument =~ s/\\/\\\\/g;
        $argument =~ s/\$/\\\$/g;
        $argument =~ s/"/\\\"/g; #"
        $argument =~ s/`/\\\`/g; #`
        $self->log("Transformed to \"$argument\"\n");

        $args .= '"' . $argument . '" ';
    }

    # #####################################################################
    # tmpdir patch: a job on a single node can use $TMPDIR unless the user
    # has given a directory explicitly.
    # refer back to JobManager.pm, but currently it seems that
    # $self->make_scratchdir uses "gram_scratch_" as a component
    my $emit_tmpdir_patch = sub {
        my ($job_kind) = @_;

        print JOB '# tmpdir patch: ' . $job_kind . ' on single node' . "\n";
        if ( ( $description->directory() =~ /(\/tmp|.*gram_scratch_.*)/ ||
               $description->directory() eq $ENV{HOME} ) and
             ( $description->host_count() <= 1 ) and
             ( $description->count <= 1 ) )
        {
            print JOB '# user ended in a scratch directory, reset to TMPDIR' . "\n";
            print JOB '[ x"$TMPDIR" != x"" ] && cd $TMPDIR' . "\n";
        }
        else
        {
            print JOB '# user requested this specific directory' . "\n";
            print JOB '# DIR = ' . $description->directory() . "\n";
        }
        return;
    };
    # #####################################################################

    #if ($description->executable() =~ m|^[^/]|)
    #{
    #    $description->add('executable', './' . $description->executable());
    #}
    if ($description->jobtype() eq 'multiple' &&
        (($description->count() == 1) || !$cluster))
    {
        # Run all processes as background children of the job script and
        # exit with the first non-zero status among them.
        my $process_count = $process_total->();

        $emit_tmpdir_patch->('multiple-cpu job');

        $rc = print JOB "pids=''\n"
                      . "exit_code=0\n";
        return $job_print_failed->() if !$rc;

        for (my $i = 0; $i < $process_count; $i++)
        {
            $rc = print JOB $description->executable(), " $args <",
                            $description->stdin(), "&\n", "pids=\"\$pids \$!\"\n";
            return $job_print_failed->() if !$rc;
        }
        $rc = print JOB <<EOF;
for x in \$pids; do
wait \$x
tmp_exit_code=\$?
if [ \$exit_code = 0 -a \$tmp_exit_code != 0 ]; then
exit_code=\$tmp_exit_code
fi
done
exit \$exit_code
EOF
        return $job_print_failed->() if !$rc;
    }
    # mpi and multiple only honoured on multi-core jobs
    elsif ($description->jobtype() eq 'mpi' ||
           (($description->jobtype() eq 'multiple') and
            ($description->count > 1)))
    {
        my $count = $process_total->();
        my $stdin = $description->stdin();
        my $cmd_script_name = $self->job_dir() . '/scheduler_pbs_cmd_script';

        # The helper script changes directory, exports the environment and
        # runs the executable; it is what mpirun/mpiexec or the remote shell
        # start on each slot.
        local(*CMD);
        if (open(CMD, '>', $cmd_script_name))
        {
            $rc = print CMD "#!/bin/sh\n";
            return $script_failure->('print', $cmd_script_name) if !$rc;

            ### SoftEnv extension ###
            # $softenv_load was missing from this call, which shifted *CMD
            # into the wrong argument position compared with the call made
            # for the job script above.
            $rc = $self->setup_softenv(
                $self->job_dir() . '/pbs_softenv_cmd_script',
                $soft_msc,
                $softenv_load,
                *CMD);
            if ($rc != 0)
            {
                return $self->respond_with_failure_extension(
                    "setup_softenv: $rc",
                    Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
            }
            #########################

            $rc = print CMD 'cd ', $description->directory(), "\n",
                            "$rsh_env\n",
                            $description->executable(), " $args\n";
            return $script_failure->('print', $cmd_script_name) if !$rc;

            $rc = close(CMD);
            return $script_failure->('close', $cmd_script_name) if !$rc;

            chmod 0700, $cmd_script_name;

            $self->nfssync( $cmd_script_name );
        }
        else
        {
            return $script_failure->('open', $cmd_script_name);
        }

        if ($description->jobtype() eq "mpi")
        {
            if ($mpiexec ne 'no')
            {
                my $machinefilearg = "";
                if ($cluster)
                {
                    $machinefilearg = ' -machinefile $PBS_NODEFILE';
                }
                $rc = print JOB "$mpiexec $machinefilearg -n " . $count;
                return $job_print_failed->() if !$rc;
            }
            else
            {
                $rc = print JOB "$mpirun -np " . $count;
                return $job_print_failed->() if !$rc;

                if ($cluster)
                {
                    $rc = print JOB ' -machinefile $PBS_NODEFILE';
                    return $job_print_failed->() if !$rc;
                }
            }

            $rc = print JOB " $cmd_script_name < " . $description->stdin() . "\n";
            return $job_print_failed->() if !$rc;
        }
        else
        {
            # 'multiple' across hosts: start one copy per slot through the
            # remote shell, record each exit status in <job_dir>/exit.N and
            # exit with the first non-zero one.
            my $exit_prefix = $self->job_dir() . '/exit';

            $rc = print JOB <<"EOF";

hosts=\`cat \$PBS_NODEFILE\`;
counter=0
while test \$counter -lt $count; do
for host in \$hosts; do
if test \$counter -lt $count; then
$remote_shell \$host "/bin/sh $cmd_script_name; echo \\\$? > $exit_prefix.\$counter" < $stdin &
counter=\`expr \$counter + 1\`
else
break
fi
done
done
wait

counter=0
exit_code=0
while test \$counter -lt $count; do
/bin/touch $exit_prefix.\$counter;

read tmp_exit_code < $exit_prefix.\$counter
if [ \$exit_code = 0 -a \$tmp_exit_code != 0 ]; then
exit_code=\$tmp_exit_code
fi
counter=\`expr \$counter + 1\`
done

exit \$exit_code
EOF
            return $job_print_failed->() if !$rc;
        }
    }
    else
    {
        # Plain single-process job.
        $emit_tmpdir_patch->('unknown job');

        $rc = print JOB $description->executable(), " $args <",
                        $description->stdin(), "\n";
        return $job_print_failed->() if !$rc;
    }

    # Buffered write errors surface here; this was previously reported with
    # the label "print".
    $rc = close(JOB);
    return $script_failure->('close', $pbs_job_script_name) if !$rc;

    $pbs_qsub_err_name = $self->job_dir() . '/scheduler_pbs_submit_stderr';
    $errfile = "2>$pbs_qsub_err_name";

    $self->nfssync( $pbs_job_script_name );
    $self->nfssync( $pbs_qsub_err_name );
    $self->nfssync( $description->stdout, 1 );
    $self->nfssync( $description->stderr, 1 );
    $self->log("submitting job -- $qsub < $pbs_job_script_name $errfile");

    # The trailing 2>&1 overrides $errfile, so qsub's stderr is captured in
    # $job_id together with its stdout and the stderr file stays empty.
    chomp($job_id = `$qsub < $pbs_job_script_name $errfile 2>&1`);
    my $qsub_status = $?;    # saved before anything else can change $?

    # #####################################################################
    # Parsing job submission status and log accounting information
    my $qsubstatus = defined($job_id) ? $job_id : '';
    $qsubstatus =~ s/[^-a-zA-Z0-9\.:@\/]/_/gs;
    # #####################################################################

    my $user_name  = getpwuid($<);
    my $group_name = getgrgid($();

    if ($qsub_status == 0)
    {
        # #####################################################################
        # produce CREAM-like accounting record in syslog
        require POSIX;    # do not rely on another module having loaded it
        my $todaylocal = POSIX::strftime("%Y%m%d", localtime);
        my $accountinglog = "jobmanagement accounting;";
        $accountinglog .= " REMOTE_REQUEST_ADDRESS=$peeraddr;"
            if defined $peeraddr;
        $accountinglog .= " USER_DN=$gridid;" if defined $gridid;
        if ($#fqans >= 0)
        {
            $accountinglog .= " USER_FQAN={";
            foreach my $fqan (@fqans)
            {
                $accountinglog .= " $fqan;";
            }
            $accountinglog .= " };";
        }
        $accountinglog .= " USER_VO=$voname;" if defined $voname;
        $accountinglog .= " JOB_REPOSITORY_ID=$jmid;" if defined $jmid;
        $accountinglog .= " CMD_NAME=JOB_START;";
        $accountinglog .= " uid=" . $user_name . ";";
        $accountinglog .= " gid=" . $group_name . ";";
        $accountinglog .= " jobId=" . $description->uniqid() . ";";
        $accountinglog .= " lrmsAbsJobId=pbs/$todaylocal/$qsubstatus;";

        $accountinglog .= "\nqsub success (" . $user_name . ":" . $group_name .
                          ") $pbs_job_script_name: $qsubstatus\"\n";

        # Best effort: a logger that cannot be started is silently skipped.
        # The list form keeps the shell away from the "[pid]" tag.
        if (open(my $accounting, '|-', 'logger', '-p', 'daemon.info',
                 '-t', "jobmanager-pbs[$$]"))
        {
            print {$accounting} $accountinglog;
            close($accounting);
        }
        # #####################################################################

        $self->log("job submission successful, setting state to PENDING");
        return {JOB_ID => $job_id,
                JOB_STATE => Globus::GRAM::JobState::PENDING };
    }
    else
    {
        my $stderr = '';
        {
            local(*ERR);
            local $/;    # slurp the whole file
            if (open(ERR, '<', $pbs_qsub_err_name))
            {
                my $text = <ERR>;
                $stderr = $text if defined $text;
                close(ERR);
            }
        }
        # The stderr file is normally empty (see the qsub call above), so
        # fall back to the captured qsub output for the failure message.
        if ($stderr eq '' && defined($job_id))
        {
            $stderr = $job_id;
        }

        # #####################################################################
        # error log in syslog
        # List form: the script path never passes through a shell.
        system('logger', '-p', 'daemon.err', '-t', 'jobmanager-pbs', '-i', '--',
               "qsub error (" . $user_name . ":" . $group_name . ") " .
               " $pbs_job_script_name: $qsubstatus");
        # #####################################################################

        $self->log("qsub returned " . (defined($job_id) ? $job_id : ''));
        $self->log("qsub stderr $stderr");

        {
            local(*ERR);
            if (open(ERR, '>', $description->stderr()))
            {
                print ERR $stderr;
                close(ERR);
            }
        }

        $stderr =~ s/\n/\\n/g;

        $self->respond({GT3_FAILURE_MESSAGE => $stderr });
    }

    return Globus::GRAM::Error::JOB_EXECUTION_FAILED();
}
858 |
|
859 |
# Query PBS for the state of the job named in the job description.
#
# Runs "qstat -f <jobid>" through pipe_out_cmd and maps the job_state letter
# to a GRAM job state.
#
# Returns:
#   { JOB_STATE => <state> }  when the state could be determined;
#   {}                        when qstat gave no recognisable job_state, which
#                             tells the job manager to ignore this poll.
sub poll
{
    my $self = shift;
    my $description = $self->{JobDescription};
    my $job_id = $description->jobid();
    my $state;
    my $status_line;
    my $exit_code;

    # Flush the job's output files once it is known to be finished.
    my $sync_output = sub {
        $self->nfssync( $description->stdout() )
            if $description->stdout() ne '';
        $self->nfssync( $description->stderr() )
            if $description->stderr() ne '';
        return;
    };

    $self->log("polling job $job_id");

    # Get the job_state line from the full qstat output. A lexical is used
    # here; assigning to the global $_ would clobber the caller's $_.
    $status_line = (grep(/job_state/, $self->pipe_out_cmd($qstat, '-f', $job_id)))[0];
    # get the exit code of the qstat command.  for info search $CHILD_ERROR
    # in perlvar documentation.
    $exit_code = $? >> 8;

    $self->log("qstat job_state line is: "
               . (defined($status_line) ? $status_line : ''));

    # return code 153 = "Unknown Job Id".
    # verifying that the job is no longer there.
    # NOTE(review): rc 35 is treated the same way; its meaning is not
    # documented in this file.
    if ($exit_code == 153 || $exit_code == 35)
    {
        # Report the code actually seen instead of always claiming 153.
        $self->log("qstat rc is $exit_code == Unknown Job ID == DONE");
        $state = Globus::GRAM::JobState::DONE;
        $sync_output->();
    }
    else
    {
        # Get 3rd field (after = ); the line looks like "  job_state = R",
        # so splitting on whitespace leaves the letter at index 3.
        my $state_code = defined($status_line)
            ? (split(/\s+/, $status_line))[3]
            : undef;
        $state_code = '' unless defined $state_code;

        if ($state_code =~ /Q|W|T/)
        {
            $state = Globus::GRAM::JobState::PENDING;
        }
        elsif ($state_code =~ /S|H/)
        {
            $state = Globus::GRAM::JobState::SUSPENDED;
        }
        elsif ($state_code =~ /R|E/)
        {
            $state = Globus::GRAM::JobState::ACTIVE;
        }
        elsif ($state_code =~ /C/)
        {
            $state = Globus::GRAM::JobState::DONE;
            $sync_output->();
        }
        else
        {
            # This else is reached by an unknown response from pbs.
            # It could be that PBS was temporarily unavailable, but that it
            # can recover and the submitted job is fine.
            # So, we want the JM to ignore this poll and keep the same state
            # as the previous state.  Returning an empty hash below will tell
            # the JM to ignore the response.
            $self->log("qstat returned an unknown response.  Telling JM to ignore this poll");
            return {};
        }
    }

    return {JOB_STATE => $state};
}
930 |
|
931 |
# Cancel the job named in the job description by running qdel on it.
#
# Returns { JOB_STATE => FAILED } when qdel exits with status 0, and the
# JOB_CANCEL_FAILED error otherwise.
sub cancel
{
    my ($self) = @_;
    my $job_description = $self->{JobDescription};
    my $pbs_job_id = $job_description->jobid();

    $self->log("cancel job $pbs_job_id");

    $self->fork_and_exec_cmd( $qdel, $pbs_job_id );

    # $? holds the status left behind by the qdel invocation above.
    unless ($? == 0)
    {
        return Globus::GRAM::Error::JOB_CANCEL_FAILED();
    }

    return { JOB_STATE => Globus::GRAM::JobState::FAILED };
}
948 |
|
949 |
# Send a failure message to the client, then hand back the given return value.
#
# Arguments: the failure text, and the value (normally a Globus::GRAM::Error)
# that the caller wants to return. The text is sent via respond as
# GT3_FAILURE_MESSAGE; the second argument is returned untouched.
sub respond_with_failure_extension
{
    my ($self, $failure_text, $result) = @_;

    my %extension = (GT3_FAILURE_MESSAGE => $failure_text);
    $self->respond(\%extension);

    return $result;
}
958 |
|
959 |
1; |