/[pdpsoft]/trunk/nl.nikhef.ndpf.tools/globus-gram-job-manager-pbs-nikhef/pbs.pm.cin
ViewVC logotype

Contents of /trunk/nl.nikhef.ndpf.tools/globus-gram-job-manager-pbs-nikhef/pbs.pm.cin

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2518 - (show annotations) (download)
Tue Apr 24 15:56:43 2012 UTC (10 years, 5 months ago) by davidg
File size: 31024 byte(s)
Add accounting trickery

1 # Copyright 1999-2006 University of Chicago
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 use Globus::GRAM::Error;
16 use Globus::GRAM::JobState;
17 use Globus::GRAM::JobManager;
18 use Globus::Core::Paths;
19 use Globus::Core::Config;
20
# NOTE: This package name must match the name of the .pm file!!
package Globus::GRAM::JobManager::pbs;

@ISA = qw(Globus::GRAM::JobManager);

# Site configuration shared by every method in this file.  The values are
# filled in by the BEGIN block below, at compile time.
my ($mpirun, $mpiexec, $qsub, $qstat, $qdel, $cluster, $cpu_per_node, $remote_shell);

# Declared separately so that it is a file lexical like the others instead
# of an implicit package global.
my $softenv_dir;

# Read globus-pbs.conf once, when the module is compiled, and derive the
# tool paths and cluster settings from it.
#
#   mpiexec / mpirun   path of the MPI launcher, or 'no' when the attribute
#                      is missing or the path is not executable
#   qsub, qstat, qdel  PBS client commands, 'no' when not configured
#   cluster            left undefined when not configured or set to 'no'
#   pbs_default        exported as $PBS_DEFAULT when non-empty
#   cpu_per_node       defaults to 1
#   remote_shell       left undefined when not configured
#   softenv_dir        defaults to '' (the SoftEnv extension is then skipped
#                      for the main job script)
BEGIN
{
    # '${sysconfdir}' is kept literally: this is a .cin template, so the
    # placeholder is presumably substituted when the package is built.
    my $config = Globus::Core::Config->new(
        '${sysconfdir}/globus/globus-pbs.conf');

    $mpiexec = $config->get_attribute('mpiexec') || 'no';
    if ($mpiexec ne 'no' && ! -x $mpiexec)
    {
        $mpiexec = 'no';
    }

    $mpirun = $config->get_attribute('mpirun') || 'no';
    if ($mpirun ne 'no' && ! -x $mpirun)
    {
        $mpirun = 'no';
    }

    $qsub  = $config->get_attribute('qsub')  || 'no';
    $qstat = $config->get_attribute('qstat') || 'no';
    $qdel  = $config->get_attribute('qdel')  || 'no';

    # An explicit 'no' means the same as "not configured".  The defined()
    # guard avoids comparing undef with a string.
    $cluster = $config->get_attribute('cluster') || undef;
    if (defined($cluster) && $cluster eq 'no')
    {
        $cluster = undef;
    }

    my $pbs_default = $config->get_attribute('pbs_default') || '';
    if ($pbs_default ne '')
    {
        $ENV{PBS_DEFAULT} = $pbs_default;
    }

    $cpu_per_node = $config->get_attribute('cpu_per_node') || 1;
    $remote_shell = $config->get_attribute('remote_shell') || undef;
    $softenv_dir  = $config->get_attribute('softenv_dir')  || '';
}
61
# Ceiling function for a single number.
#
# A value that lies within 1E-12 of its truncated integer part is treated
# as that integer.  Otherwise the result is the smallest integer not below
# the argument.
sub myceil ($)
{
    my ($value) = @_;
    my $truncated = int($value);

    # Already integral (within tolerance): nothing to round.
    return $truncated if abs($value - $truncated) < 1E-12;

    # int() truncates toward zero, which is already the ceiling of a
    # negative non-integer.
    return $truncated if $value < 0;

    # Positive non-integer: bump past the next whole number and truncate.
    return int($value + 1.0);
}
67
# Build a PBS batch script from the job description, submit it with qsub
# and report the result.
#
# Steps, in order:
#   1. validate the request (streaming, jobtype, directory, executable,
#      stdin) and chdir into the job directory;
#   2. derive the CPU and wall time limits;
#   3. write <job_dir>/scheduler_pbs_job_script: #PBS directives, the
#      exported environment, VO information taken from the proxy, and the
#      command line(s) for the requested job type;
#   4. run qsub on the script and write a syslog record through logger.
#
# Returns a hash reference with JOB_ID and JOB_STATE (PENDING) when qsub
# exits with status 0, otherwise a Globus::GRAM::Error value.  Failures
# while writing the scripts are also reported through
# respond_with_failure_extension().
sub submit
{
    my $self = shift;
    my $description = $self->{JobDescription};
    my $pbs_job_script_name;
    my $pbs_qsub_err_name;
    my $errfile = '';
    my $job_id;
    my $rsh_env;
    my @arguments;
    my $email_when = '';
    my $args;
    my $soft_msc = "$softenv_dir/bin/soft-msc";
    my $softenv_load = "$softenv_dir/etc/softenv-load.sh";

    # Scratch values; declared here so they do not leak into the package.
    my ($rc, $cpu_time, $wall_time, $total_cpu_time, $max_memory);

    # Environment entries without a comma in the value.  Nothing in this
    # file reads the list back.
    my @new_env;

    $self->log("Entering pbs submit");

    # Reject jobs that want streaming, if so configured
    if ( $description->streamingrequested() &&
         $description->streamingdisabled() ) {

        $self->log("Streaming is not allowed.");
        return Globus::GRAM::Error::OPENING_STDOUT;
    }

    # check jobtype
    if(defined($description->jobtype()))
    {
        if($description->jobtype !~ /^(mpi|single|multiple)$/)
        {
            return Globus::GRAM::Error::JOBTYPE_NOT_SUPPORTED;
        }
    }
    if( $description->directory eq '')
    {
        return Globus::GRAM::Error::RSL_DIRECTORY();
    }
    # A relative directory is taken relative to the user's home.
    if ($description->directory() =~ m|^[^/]|) {
        $description->add("directory",
                          $ENV{HOME} . '/' . $description->directory());
    }
    chdir $description->directory() or
        return Globus::GRAM::Error::BAD_DIRECTORY();

    $self->nfssync( $description->executable() )
        unless $description->executable() eq '';
    $self->nfssync( $description->stdin() )
        unless $description->stdin() eq '';
    if( $description->executable eq '')
    {
        return Globus::GRAM::Error::RSL_EXECUTABLE();
    }
    #elsif(! -f $description->executable())
    #{
    #    return Globus::GRAM::Error::EXECUTABLE_NOT_FOUND();
    #}
    #elsif(! -x $description->executable())
    #{
    #    return Globus::GRAM::Error::EXECUTABLE_PERMISSIONS();
    #}
    elsif( $description->stdin() eq '')
    {
        return Globus::GRAM::Error::RSL_STDIN;
    }
    #elsif(! -r $description->stdin())
    #{
    #    return Globus::GRAM::Error::STDIN_NOT_FOUND();
    #}

    # maxtime counts as CPU time on a non-cluster setup and as wall time
    # on a cluster; 0 means "leave it to the queue default".
    $self->log("Determining job max time cpu from job description");
    if(defined($description->max_cpu_time()))
    {
        $cpu_time = $description->max_cpu_time();
        $self->log(" using maxcputime of $cpu_time");
    }
    elsif(! $cluster && defined($description->max_time()))
    {
        $cpu_time = $description->max_time();
        $self->log(" using maxtime of $cpu_time");
    }
    else
    {
        $cpu_time = 0;
        $self->log(' using queue default');
    }

    $self->log("Determining job max wall time limit from job description");
    if(defined($description->max_wall_time()))
    {
        $wall_time = $description->max_wall_time();
        $self->log(" using maxwalltime of $wall_time");
    }
    elsif($cluster && defined($description->max_time()))
    {
        $wall_time = $description->max_time();
        $self->log(" using maxtime of $wall_time");
    }
    else
    {
        $wall_time = 0;
        $self->log(' using queue default');
    }

    $self->log('Building job script');

    $pbs_job_script_name = $self->job_dir() . '/scheduler_pbs_job_script';

    # JOB stays a (localized) bareword handle because it is handed to
    # setup_softenv() as a glob further down.
    local(*JOB);
    $rc = open( JOB, '>', $pbs_job_script_name );

    if (!$rc)
    {
        return $self->respond_with_failure_extension(
            "open: $pbs_job_script_name: $!",
            Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
    }
    $rc = print JOB <<"EOF";
#! /bin/sh
# PBS batch job script built by Globus job manager
#
#PBS -S /bin/sh
EOF
    if (!$rc)
    {
        return $self->respond_with_failure_extension(
            "print: $pbs_job_script_name: $!",
            Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
    }

    if($description->name() ne '')
    {
        $rc = print JOB '#PBS -N ', $description->name(), "\n";
        if (!$rc)
        {
            return $self->respond_with_failure_extension(
                "print: $pbs_job_script_name: $!",
                Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
        }
    }
    if($description->email_address() ne '')
    {
        $rc = print JOB '#PBS -M ', $description->email_address(), "\n";
        if (!$rc)
        {
            return $self->respond_with_failure_extension(
                "print: $pbs_job_script_name: $!",
                Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
        }
    }
    # Mail events: a(bort), b(egin), e(nd); n = no mail at all.
    if($description->emailonabort() eq 'yes')
    {
        $email_when .= 'a';
    }
    if($description->emailonexecution() eq 'yes')
    {
        $email_when .= 'b';
    }
    if($description->emailontermination() eq 'yes')
    {
        $email_when .= 'e';
    }
    if($email_when eq '')
    {
        $email_when = 'n';
    }
    $rc = print JOB "#PBS -m $email_when\n";
    if (!$rc)
    {
        return $self->respond_with_failure_extension(
            "print: $pbs_job_script_name: $!",
            Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
    }

    if($description->queue() ne '')
    {
        $rc = print JOB '#PBS -q ', $description->queue(), "\n";
        if (!$rc)
        {
            return $self->respond_with_failure_extension(
                "print: $pbs_job_script_name: $!",
                Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
        }
    }
    if($description->project() ne '')
    {
        $rc = print JOB '#PBS -A ', $description->project(), "\n";
        if (!$rc)
        {
            return $self->respond_with_failure_extension(
                "print: $pbs_job_script_name: $!",
                Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
        }
    }

    if($cpu_time != 0)
    {
        # pcput is the per-process limit, cput the limit for the whole
        # job; for 'multiple' jobs the latter scales with the process count.
        if($description->jobtype() eq 'multiple')
        {
            if ($description->totalprocesses() > 0)
            {
                $total_cpu_time = $cpu_time * $description->totalprocesses();
            }
            else
            {
                $total_cpu_time = $cpu_time * $description->count();
            }
        }
        else
        {
            $total_cpu_time = $cpu_time;
        }
        $rc = print JOB "#PBS -l pcput=${cpu_time}:00\n"
                      . "#PBS -l cput=${total_cpu_time}:00\n";
        if (!$rc)
        {
            return $self->respond_with_failure_extension(
                "print: $pbs_job_script_name: $!",
                Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
        }
    }

    if($wall_time != 0)
    {
        $rc = print JOB "#PBS -l walltime=${wall_time}:00\n";
        if (!$rc)
        {
            return $self->respond_with_failure_extension(
                "print: $pbs_job_script_name: $!",
                Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
        }
    }

    if($description->max_memory() != 0)
    {
        if($description->jobtype() eq 'multiple')
        {
            if ($description->totalprocesses() > 0)
            {
                $max_memory = $description->max_memory()
                            * $description->totalprocesses();
            }
            else
            {
                $max_memory = $description->max_memory()
                            * $description->count();
            }
        }
        else
        {
            $max_memory = $description->max_memory();
        }
        $rc = print JOB "#PBS -l mem=${max_memory}mb\n";
        if (!$rc)
        {
            return $self->respond_with_failure_extension(
                "print: $pbs_job_script_name: $!",
                Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
        }
    }
    $rc = print JOB '#PBS -o ', $description->stdout(), "\n" ,
                    '#PBS -e ', $description->stderr(), "\n";
    if (!$rc)
    {
        return $self->respond_with_failure_extension(
            "print: $pbs_job_script_name: $!",
            Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
    }

    if (defined $description->nodes())
    {
        #Generated by ExtensionsHandler.pm from resourceAllocationGroup elements
        $rc = print JOB '#PBS -l nodes=', $description->nodes(), "\n";
    }
    elsif($description->host_count() != 0)
    {
        $rc = print JOB '#PBS -l nodes=', $description->host_count(), "\n";
    }
    elsif($cluster && $cpu_per_node != 0)
    {
        $rc = print JOB '#PBS -l nodes=',
                        myceil($description->count() / $cpu_per_node), "\n";
    }
    # When none of the branches above ran, $rc still holds the (true)
    # result of the previous print.
    if (!$rc)
    {
        return $self->respond_with_failure_extension(
            "print: $pbs_job_script_name: $!",
            Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
    }

    ### SoftEnv extension ###
    if ($softenv_dir ne '')
    {
        $rc = $self->setup_softenv(
            $self->job_dir() . '/pbs_softenv_job_script',
            $soft_msc,
            $softenv_load,
            *JOB);

        if ($rc != 0)
        {
            return $self->respond_with_failure_extension(
                "setup_softenv: $rc",
                Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
        }
    }
    #########################

    $rsh_env = '';

    foreach my $tuple ($description->environment())
    {
        if(!ref($tuple) || scalar(@$tuple) != 2)
        {
            return Globus::GRAM::Error::RSL_ENVIRONMENT();
        }

        # Keep the pair as submitted; the tuple itself is escaped in place
        # below for use inside shell double quotes.
        my ($raw_name, $raw_value) = @$tuple;

        # Unset any variable whose value contains a comma,
        # because OpenPBS cannot handle those. EDG bug 1208.

        push(@new_env, $tuple->[0] . '="' . $tuple->[1] . '"') unless $tuple->[1] =~ /,/;

        $tuple->[0] =~ s/\\/\\\\/g;
        $tuple->[0] =~ s/\$/\\\$/g;
        $tuple->[0] =~ s/"/\\\"/g; #"
        $tuple->[0] =~ s/`/\\\`/g; #`

        $tuple->[1] =~ s/\\/\\\\/g;
        $tuple->[1] =~ s/\$/\\\$/g;
        $tuple->[1] =~ s/"/\\\"/g; #"
        $tuple->[1] =~ s/`/\\\`/g; #`

        $rsh_env .= $tuple->[0] . '="' . $tuple->[1] . "\";\n"
                 . 'export ' . $tuple->[0] . ";\n";

        # preserve GRID_ID, GATEKEEPER_PEER, and others locally
        # The alternatives are grouped so that the anchor applies to all
        # of them, and the unescaped value is stored: it is written to the
        # accounting record below, not to a shell.
        if ( $raw_name =~ /^(?:GRID_ID|GATEKEEPER_PEER|GATEKEEPER_JM_ID)/ ) {
            $ENV{$raw_name} = $raw_value;
        }
    }

    ###############################################################################
    # Extract VO name for later use in variable definition
    ###############################################################################
    my $proxylocation;
    my $voname;
    my @fqans;
    my $vpinfo = '/usr/bin/voms-proxy-info';
    foreach my $tuple ($description->environment()) {
        # The value was escaped by the loop above, which is the form
        # needed inside the double-quoted shell argument below.
        $proxylocation = $tuple->[1] if $tuple->[0] eq "X509_USER_PROXY";
    }
    if ( defined ($proxylocation) ) {
        print JOB '# Inspecting proxy '.$proxylocation."\n";

        # Only query the proxy when voms-proxy-info is really installed.
        if ( -x $vpinfo ) {
            # Use the first line of output only, so that a multi-line
            # answer can never spill out of the comment and the VONAME
            # assignment written to the job script.
            my @vo_lines = `$vpinfo -file "$proxylocation" -vo 2>/dev/null`;
            chomp(@vo_lines);
            $voname = @vo_lines ? $vo_lines[0] : '';

            # Whole-string match: letter first, then word characters,
            # dots or dashes.
            if ( $voname =~ /\A[a-zA-Z][\w.\-]*\z/ ) {
                print JOB "# VO name from proxy is $voname\n";
                $rsh_env .= 'VONAME="' . $voname . "\";\n"
                         . 'export ' . "VONAME" . ";\n";
                push(@new_env, "VONAME" . '="' . $voname . '"');
            } else {
                print JOB "# VO name $voname from proxy is NOT valid\n";
            }
            @fqans = map { chomp($_); $_; }
                     `$vpinfo -file "$proxylocation" -fqan 2>/dev/null`;
            if ( $#fqans >= 0 ) {
                print JOB "# FQAN order in proxy:\n";
                foreach my $fqan ( @fqans ) { print JOB "# $fqan\n"; }
            }
        }
    } else {
        print JOB "# Proxy location undefined, VO not set\n";
    }
    foreach my $e ( keys %ENV ) {
        # A value may contain newlines (an exported shell function, for
        # instance); flatten them so the text stays inside the comment.
        my $shown = $ENV{$e};
        $shown =~ s/\r?\n/\\n/g;
        print JOB "# SYSTEM ENV $e = ".$shown."\n";
    }
    ###############################################################################


    $rc = print JOB "$rsh_env\n"
                  . "#Change to directory requested by user\n"
                  . 'cd ' . $description->directory() . "\n";
    if (!$rc)
    {
        return $self->respond_with_failure_extension(
            "print: $pbs_job_script_name: $!",
            Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
    }

    @arguments = $description->arguments();

    foreach(@arguments)
    {
        if(ref($_))
        {
            return Globus::GRAM::Error::RSL_ARGUMENTS();
        }
    }
    if($#arguments >= 0)
    {
        # Quote every argument for use inside shell double quotes.
        foreach(@arguments)
        {
            $self->log("Transforming argument \"$_\"\n");
            $_ =~ s/\\/\\\\/g;
            $_ =~ s/\$/\\\$/g;
            $_ =~ s/"/\\\"/g; #"
            $_ =~ s/`/\\\`/g; #`
            $self->log("Transformed to \"$_\"\n");

            $args .= '"' . $_ . '" ';
        }
    }
    else
    {
        $args = '';
    }

    #if ($description->executable() =~ m|^[^/]|)
    #{
    #    $description->add('executable', './' . $description->executable());
    #}
    if($description->jobtype() eq 'multiple' && (($description->count()==1) || !$cluster))
    {
        my $process_count;
        if ($description->totalprocesses() > 0)
        {
            $process_count = $description->totalprocesses();
        }
        else
        {
            $process_count = $description->count();
        }

        # #####################################################################
        # this is a simple multi-processor job on one node that can use $TMPDIR
        # unless the user has given one explicitly
        # refer back to JobManager.pm, but currently it seems that
        # $self->make_scratchdir uses "gram_scratch_" as a component
        print JOB '# tmpdir patch: multiple-cpu job on single node'."\n";
        if ( ( $description->directory() =~ /(\/tmp|.*gram_scratch_.*)/ ||
               $description->directory() eq $ENV{HOME} ) and
             ( $description->host_count() <= 1 ) and
             ( $description->count <= 1 )
           ) {
            print JOB '# user ended in a scratch directory, reset to TMPDIR'."\n";
            print JOB '[ x"$TMPDIR" != x"" ] && cd $TMPDIR'."\n";
        } else {
            print JOB '# user requested this specific directory'."\n";
            print JOB '# DIR = '.$description->directory()."\n";
        }
        # #####################################################################


        $rc = print JOB "pids=''\n"
                      . "exit_code=0\n";
        if (!$rc)
        {
            return $self->respond_with_failure_extension(
                "print: $pbs_job_script_name: $!",
                Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
        }
        # Start every process in the background and remember its pid.
        for(my $i = 0; $i < $process_count; $i++)
        {
            $rc = print JOB $description->executable(), " $args <",
                            $description->stdin(), "&\n", "pids=\"\$pids \$!\"\n";
            if (!$rc)
            {
                return $self->respond_with_failure_extension(
                    "print: $pbs_job_script_name: $!",
                    Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
            }
        }
        # The job exits with the first non-zero status among its processes.
        $rc = print JOB <<"EOF";
for x in \$pids; do
wait \$x
tmp_exit_code=\$?
if [ \$exit_code = 0 -a \$tmp_exit_code != 0 ]; then
exit_code=\$tmp_exit_code
fi
done
exit \$exit_code
EOF
        if (!$rc)
        {
            return $self->respond_with_failure_extension(
                "print: $pbs_job_script_name: $!",
                Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
        }
    }
    # mpi and multiple only honoured on multi-core jobs
    elsif( $description->jobtype() eq 'mpi' ||
           ( ($description->jobtype() eq 'multiple') and
             ($description->count > 1 ) ) )
    {
        my $count;
        if ($description->totalprocesses() > 0)
        {
            $count = $description->totalprocesses();
        }
        else
        {
            $count = $description->count();
        }
        my $stdin = $description->stdin();
        my $cmd_script_name = $self->job_dir() . '/scheduler_pbs_cmd_script';

        # The per-process command goes into its own script, which is then
        # started through the MPI launcher or the remote shell.
        local(*CMD);
        if ( !open( CMD, '>', $cmd_script_name ) )
        {
            return $self->respond_with_failure_extension(
                "open: $cmd_script_name: $!",
                Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
        }

        $rc = print CMD "#!/bin/sh\n";
        if (!$rc)
        {
            return $self->respond_with_failure_extension(
                "print: $cmd_script_name: $!",
                Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
        }

        ### SoftEnv extension ###
        # NOTE(review): setup_softenv() is defined outside this file.  The
        # argument list now matches the call made for the main job script
        # (script name, soft-msc, softenv-load, handle); $softenv_load used
        # to be missing here -- confirm against the helper's signature.
        $rc = $self->setup_softenv(
            $self->job_dir() . '/pbs_softenv_cmd_script',
            $soft_msc,
            $softenv_load,
            *CMD);
        if ($rc != 0)
        {
            return $self->respond_with_failure_extension(
                "setup_softenv: $rc",
                Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
        }
        #########################

        $rc = print CMD 'cd ', $description->directory(), "\n",
                        "$rsh_env\n",
                        $description->executable(), " $args\n";
        if (!$rc)
        {
            return $self->respond_with_failure_extension(
                "print: $cmd_script_name: $!",
                Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
        }
        $rc = close(CMD);
        if (!$rc)
        {
            return $self->respond_with_failure_extension(
                "close: $cmd_script_name: $!",
                Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
        }
        chmod 0700, $cmd_script_name;

        $self->nfssync( $cmd_script_name );

        if ($description->jobtype() eq "mpi")
        {
            # mpiexec is preferred over mpirun when it is configured.
            if ($mpiexec ne 'no')
            {
                my $machinefilearg = "";
                if ($cluster)
                {
                    $machinefilearg = ' -machinefile $PBS_NODEFILE';
                }
                if ($description->totalprocesses() > 0)
                {
                    $rc = print JOB "$mpiexec $machinefilearg -n "
                                  . $description->totalprocesses();
                }
                else
                {
                    $rc = print JOB "$mpiexec $machinefilearg -n "
                                  . $description->count();
                }
                if (!$rc)
                {
                    return $self->respond_with_failure_extension(
                        "print: $pbs_job_script_name: $!",
                        Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
                }
            }
            else
            {
                if ($description->totalprocesses() > 0)
                {
                    $rc = print JOB "$mpirun -np " . $description->totalprocesses();
                }
                else
                {
                    $rc = print JOB "$mpirun -np " . $description->count();
                }
                if (!$rc)
                {
                    return $self->respond_with_failure_extension(
                        "print: $pbs_job_script_name: $!",
                        Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
                }
                if ($cluster)
                {
                    $rc = print JOB ' -machinefile $PBS_NODEFILE';
                    if (!$rc)
                    {
                        return $self->respond_with_failure_extension(
                            "print: $pbs_job_script_name: $!",
                            Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
                    }
                }
            }

            $rc = print JOB " $cmd_script_name < ".$description->stdin() . "\n";
            if (!$rc)
            {
                return $self->respond_with_failure_extension(
                    "print: $pbs_job_script_name: $!",
                    Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
            }
        }
        else
        {
            # 'multiple' on a cluster: start $count copies round-robin over
            # the hosts in $PBS_NODEFILE through the remote shell; each copy
            # leaves its exit status in <job_dir>/exit.<n>.
            my $exit_prefix=$self->job_dir() . '/exit';

            $rc = print JOB <<"EOF";

hosts=\`cat \$PBS_NODEFILE\`;
counter=0
while test \$counter -lt $count; do
for host in \$hosts; do
if test \$counter -lt $count; then
$remote_shell \$host "/bin/sh $cmd_script_name; echo \\\$? > $exit_prefix.\$counter" < $stdin &
counter=\`expr \$counter + 1\`
else
break
fi
done
done
wait

counter=0
exit_code=0
while test \$counter -lt $count; do
/bin/touch $exit_prefix.\$counter;

read tmp_exit_code < $exit_prefix.\$counter
if [ \$exit_code = 0 -a \$tmp_exit_code != 0 ]; then
exit_code=\$tmp_exit_code
fi
counter=\`expr \$counter + 1\`
done

exit \$exit_code
EOF
            if (!$rc)
            {
                return $self->respond_with_failure_extension(
                    "print: $pbs_job_script_name: $!",
                    Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
            }
        }
    }
    else
    {
        # #####################################################################
        # this is a simple single-node job that can use $TMPDIR
        # unless the user has given one explicitly
        # refer back to JobManager.pm, but currently it seems that
        # $self->make_scratchdir uses "gram_scratch_" as a component
        print JOB '# tmpdir patch: unknown job on single node'."\n";
        if ( ( $description->directory() =~ /(\/tmp|.*gram_scratch_.*)/ ||
               $description->directory() eq $ENV{HOME} ) and
             ( $description->host_count() <= 1 ) and
             ( $description->count <= 1 )
           ) {
            print JOB '# user ended in a scratch directory, reset to TMPDIR'."\n";
            print JOB '[ x"$TMPDIR" != x"" ] && cd $TMPDIR'."\n";
        } else {
            print JOB '# user requested this specific directory'."\n";
            print JOB '# DIR = '.$description->directory()."\n";
        }
        # #####################################################################

        $rc = print JOB $description->executable(), " $args <",
                        $description->stdin(), "\n";
        if (!$rc)
        {
            return $self->respond_with_failure_extension(
                "print: $pbs_job_script_name: $!",
                Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
        }
    }
    $rc = close(JOB);
    if (!$rc)
    {
        return $self->respond_with_failure_extension(
            "close: $pbs_job_script_name: $!",
            Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
    }

    $pbs_qsub_err_name = $self->job_dir() . '/scheduler_pbs_submit_stderr';
    $errfile = "2>$pbs_qsub_err_name";

    $self->nfssync( $pbs_job_script_name );
    $self->nfssync( $pbs_qsub_err_name );
    $self->nfssync( $description->stdout, 1 );
    $self->nfssync( $description->stderr, 1 );
    $self->log("submitting job -- $qsub < $pbs_job_script_name $errfile");

    # The trailing 2>&1 is processed after the 2>file redirection, so
    # qsub's stderr ends up in $job_id and the stderr file stays empty.
    chomp($job_id = `$qsub < $pbs_job_script_name $errfile 2>&1`);

    # Save the exit status before anything else can overwrite $?.
    my $qsub_exit = $?;

    # #####################################################################
    # Parsing job submission status and log accounting information
    my $qsubstatus = defined($job_id) ? $job_id : '';
    $qsubstatus =~ s/[^-a-zA-Z0-9\.:@\/]/_/gs;
    # #####################################################################

    if($qsub_exit == 0)
    {
        # #####################################################################
        # produce CREAM-like accounting record in syslog
        require POSIX;
        my $todaylocal = POSIX::strftime("%Y%m%d", localtime);
        my $accountinglog = "jobmanagement accounting;";
        $accountinglog .= " REMOTE_REQUEST_ADDRESS=".$ENV{"GATEKEEPER_PEER"}.";";
        $accountinglog .= " USER_DN=".$ENV{"GRID_ID"}.";";
        if ( $#fqans >= 0 ) {
            $accountinglog .= " USER_FQAN={";
            foreach my $fqan ( @fqans ) {
                $accountinglog .= " $fqan;";
            }
            $accountinglog .= " };";
        }
        # $voname is undefined when no proxy could be inspected.
        $accountinglog .= " USER_VO=".(defined($voname) ? $voname : '').";";
        $accountinglog .= " JOB_REPOSITORY_ID=".$ENV{"JOB_REPOSITORY_ID"}.";"
            if defined $ENV{"JOB_REPOSITORY_ID"};
        $accountinglog .= " CMD_NAME=JOB_START;";
        $accountinglog .= " uid=".getpwuid($<).";";
        $accountinglog .= " gid=".getgrgid($().";";
        $accountinglog .= " jobId=".$description->uniqid().";";
        $accountinglog .= " lrmsAbsJobId=pbs/$todaylocal/$qsubstatus;";

        $accountinglog .= "\nqsub success (".getpwuid($<).":".getgrgid($().
                          ") $pbs_job_script_name: $qsubstatus\"\n";

        # List-form pipe: logger gets its arguments verbatim, no shell
        # is involved.
        if (open(my $accounting, '|-', 'logger', '-p', 'daemon.info',
                 '-t', 'jobmanager-pbs[' . $$ . ']'))
        {
            print $accounting $accountinglog;
            close($accounting);
        }
        # #####################################################################

        $self->log("job submission successful, setting state to PENDING");
        return {JOB_ID => $job_id,
                JOB_STATE => Globus::GRAM::JobState::PENDING };
    }
    else
    {
        my $stderr = '';
        if (open(my $err_in, '<', $pbs_qsub_err_name))
        {
            local $/;
            $stderr = <$err_in>;
            close($err_in);
        }
        $stderr = '' unless defined $stderr;

        # The stderr file is empty because of the 2>&1 above; fall back to
        # the captured qsub output so that the client gets a message.
        if ($stderr eq '' && defined($job_id))
        {
            $stderr = $job_id;
        }

        # #####################################################################
        # error log in syslog
        system('logger', '-p', 'daemon.err', '-t', 'jobmanager-pbs', '-i', '--',
               "qsub error (".getpwuid($<).":".getgrgid($().")".
               " $pbs_job_script_name: $qsubstatus");
        # #####################################################################

        $self->log("qsub returned $job_id");
        $self->log("qsub stderr $stderr");

        # Make the message visible in the job's own stderr file as well.
        if (open(my $err_out, '>', $description->stderr()))
        {
            print $err_out $stderr;
            close($err_out);
        }

        $stderr =~ s/\n/\\n/g;

        $self->respond({GT3_FAILURE_MESSAGE => $stderr });
    }

    return Globus::GRAM::Error::JOB_EXECUTION_FAILED();
}
858
# Ask PBS for the state of the job and translate it into a GRAM job state.
#
# Runs "qstat -f <jobid>" and looks at the first output line containing
# "job_state".  Returns { JOB_STATE => ... } or, when the answer cannot be
# interpreted, an empty hash reference so that the job manager keeps the
# previous state.
sub poll
{
    my $self = shift;
    my $description = $self->{JobDescription};
    my $job_id = $description->jobid();
    my $state;

    $self->log("polling job $job_id");

    # Get job id from the full qstat output.
    # A lexical is used instead of $_ so the caller's $_ is left alone.
    my $status_line =
        (grep(/job_state/, $self->pipe_out_cmd($qstat, '-f', $job_id)))[0];
    # get the exit code of the qstat command. for info search $CHILD_ERROR
    # in perlvar documentation.
    my $exit_code = $? >> 8;

    # No job_state line at all: treat it as an empty answer.
    $status_line = '' unless defined $status_line;

    $self->log("qstat job_state line is: $status_line");

    # return code 153 = "Unknown Job Id".
    # verifying that the job is no longer there.
    # Exit code 35 is handled the same way.
    if($exit_code == 153 || $exit_code == 35)
    {
        $self->log("qstat rc is $exit_code == Unknown Job ID == DONE");
        $state = Globus::GRAM::JobState::DONE;
        $self->nfssync( $description->stdout() )
            if $description->stdout() ne '';
        $self->nfssync( $description->stderr() )
            if $description->stderr() ne '';
    }
    else
    {
        # Get 3rd field (after = )
        # The line starts with whitespace, so the split yields an empty
        # leading field: ('', 'job_state', '=', <state>).
        my $state_code = (split(/\s+/, $status_line))[3];
        $state_code = '' unless defined $state_code;

        if($state_code =~ /[QWT]/)
        {
            $state = Globus::GRAM::JobState::PENDING;
        }
        elsif($state_code =~ /[SH]/)
        {
            $state = Globus::GRAM::JobState::SUSPENDED;
        }
        elsif($state_code =~ /[RE]/)
        {
            $state = Globus::GRAM::JobState::ACTIVE;
        }
        elsif($state_code =~ /C/)
        {
            $state = Globus::GRAM::JobState::DONE;
            $self->nfssync( $description->stdout() )
                if $description->stdout() ne '';
            $self->nfssync( $description->stderr() )
                if $description->stderr() ne '';
        }
        else
        {
            # This else is reached by an unknown response from pbs.
            # It could be that PBS was temporarily unavailable, but that it
            # can recover and the submitted job is fine.
            # So, we want the JM to ignore this poll and keep the same state
            # as the previous state. Returning an empty hash below will tell
            # the JM to ignore the response.
            $self->log("qstat returned an unknown response. Telling JM to ignore this poll");
            return {};
        }
    }

    return {JOB_STATE => $state};
}
930
# Cancel the job by running qdel on its id.
#
# Returns { JOB_STATE => FAILED } when qdel exits with status 0, and the
# JOB_CANCEL_FAILED error otherwise.
sub cancel
{
    my ($self) = @_;
    my $pbs_id = $self->{JobDescription}->jobid();

    $self->log("cancel job $pbs_id");

    $self->fork_and_exec_cmd( $qdel, $pbs_id );

    # $? carries the status left behind by the qdel run.
    unless ($? == 0)
    {
        return Globus::GRAM::Error::JOB_CANCEL_FAILED();
    }

    return { JOB_STATE => Globus::GRAM::JobState::FAILED };
}
948
# Send a failure message back through respond() and hand the given error
# code straight back, so that a caller can write
#     return $self->respond_with_failure_extension($message, $error);
#
#   $msg  text sent as GT3_FAILURE_MESSAGE
#   $rc   value returned unchanged
sub respond_with_failure_extension
{
    my ($self, $msg, $rc) = @_;

    my %reply = ( GT3_FAILURE_MESSAGE => $msg );
    $self->respond(\%reply);

    return $rc;
}
958
959 1;

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28