/[pdpsoft]/trunk/nl.nikhef.ndpf.tools/globus-gram-job-manager-pbs-nikhef/pbs.pm.cin
ViewVC logotype

Contents of /trunk/nl.nikhef.ndpf.tools/globus-gram-job-manager-pbs-nikhef/pbs.pm.cin

Parent Directory Parent Directory | Revision Log Revision Log


Revision 2536 - (show annotations) (download)
Fri Apr 27 08:28:45 2012 UTC (9 years, 8 months ago) by davidg
File size: 31232 byte(s)
Remove example VOMS info

1 # Copyright 1999-2006 University of Chicago
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 use Globus::GRAM::Error;
16 use Globus::GRAM::JobState;
17 use Globus::GRAM::JobManager;
18 use Globus::Core::Paths;
19 use Globus::Core::Config;
20
21 # NOTE: This package name must match the name of the .pm file!!
22 package Globus::GRAM::JobManager::pbs;
23
24 @ISA = qw(Globus::GRAM::JobManager);
25
# File-scoped settings, filled in once at compile time by the BEGIN block
# below and read by the submit/poll/cancel methods.
my ($mpirun, $mpiexec, $qsub, $qstat, $qdel,
    $cluster, $cpu_per_node, $remote_shell, $vomsinfo);

# Read the site's PBS settings from globus-pbs.conf.
#
# Note that $softenv_dir is not part of the lexical list above, so it is
# stored as a package variable of Globus::GRAM::JobManager::pbs.
BEGIN
{
    my $config = Globus::Core::Config->new(
        '${sysconfdir}/globus/globus-pbs.conf');

    # MPI launchers: anything missing, or not executable, collapses to 'no'.
    foreach my $launcher ([ \$mpiexec, 'mpiexec' ], [ \$mpirun, 'mpirun' ])
    {
        my ($slot, $attribute) = @$launcher;
        my $path = $config->get_attribute($attribute) || 'no';

        $path = 'no' unless $path eq 'no' || -x $path;
        $$slot = $path;
    }

    # Batch system commands; 'no' marks an unconfigured command.
    $qsub  = $config->get_attribute('qsub')  || 'no';
    $qstat = $config->get_attribute('qstat') || 'no';
    $qdel  = $config->get_attribute('qdel')  || 'no';

    # An empty value and the literal 'no' both mean "not a cluster".
    my $cluster_setting = $config->get_attribute('cluster');
    $cluster = ($cluster_setting && $cluster_setting ne 'no')
        ? $cluster_setting
        : undef;

    # Export the default PBS server only when one is configured.
    my $pbs_default = $config->get_attribute('pbs_default') || '';
    $ENV{PBS_DEFAULT} = $pbs_default if $pbs_default ne '';

    $cpu_per_node = $config->get_attribute('cpu_per_node') || 1;
    $remote_shell = $config->get_attribute('remote_shell') || undef;
    $softenv_dir  = $config->get_attribute('softenv_dir')  || '';
    $vomsinfo     = $config->get_attribute('vomsinfo')     || undef;
}
62
# Ceiling function that tolerates floating point noise: a value within
# 1E-12 of its truncated integer part is treated as that integer.
sub myceil ($)
{
    my $value = shift;
    my $truncated = int($value);

    # Close enough to a whole number: do not round up any further.
    return $truncated if abs($value - $truncated) < 1E-12;

    # int() truncates toward zero, which already is the ceiling for
    # negative values; non-negative values are bumped by one first.
    return $truncated if $value < 0;
    return int($value + 1.0);
}
68
# Build a PBS batch script from the job description, submit it with qsub
# and report the result.
#
# Returns:
#   - a hash ref { JOB_ID => ..., JOB_STATE => PENDING } when qsub exits 0;
#   - a Globus::GRAM::Error value when the job description is rejected,
#     when a script file cannot be written, or when qsub fails (in the
#     last case a GT3_FAILURE_MESSAGE response is sent first).
#
# Relies on the file-scoped configuration ($qsub, $cluster, $cpu_per_node,
# $mpirun, $mpiexec, $remote_shell, $vomsinfo) and on the package variable
# $softenv_dir, all set in the BEGIN block of this file.
sub submit
{
    my $self = shift;
    my $description = $self->{JobDescription};
    my $pbs_job_script_name;
    my $pbs_qsub_err_name;
    my $errfile = '';
    my $job_id;
    my $rsh_env;
    my @arguments;
    my @new_env;    # collected below but not consumed anywhere in this file
    my $email_when = '';
    my $args = '';
    my ($cpu_time, $wall_time, $total_cpu_time, $max_memory);
    my $soft_msc = "$softenv_dir/bin/soft-msc";
    my $softenv_load = "$softenv_dir/etc/softenv-load.sh";

    # Escape a string so it can sit inside double quotes in a /bin/sh script.
    my $escape = sub {
        my $text = shift;
        $text =~ s/\\/\\\\/g;
        $text =~ s/\$/\\\$/g;
        $text =~ s/"/\\\"/g;
        $text =~ s/`/\\\`/g;
        return $text;
    };

    # Number of processes to start: totalprocesses when positive, else count.
    my $effective_count = sub {
        my $total = $description->totalprocesses();
        return $total > 0 ? $total : $description->count();
    };

    $self->log("Entering pbs submit");

    # Reject jobs that want streaming, if so configured
    if ($description->streamingrequested()
        && $description->streamingdisabled())
    {
        $self->log("Streaming is not allowed.");
        return Globus::GRAM::Error::OPENING_STDOUT;
    }

    # check jobtype
    if (defined($description->jobtype())
        && $description->jobtype !~ /^(mpi|single|multiple)$/)
    {
        return Globus::GRAM::Error::JOBTYPE_NOT_SUPPORTED;
    }

    if ($description->directory eq '')
    {
        return Globus::GRAM::Error::RSL_DIRECTORY();
    }
    # Relative directories are taken relative to the user's home.
    if ($description->directory() =~ m|^[^/]|)
    {
        $description->add("directory",
            $ENV{HOME} . '/' . $description->directory());
    }
    chdir $description->directory() or
        return Globus::GRAM::Error::BAD_DIRECTORY();

    $self->nfssync( $description->executable() )
        unless $description->executable() eq '';
    $self->nfssync( $description->stdin() )
        unless $description->stdin() eq '';

    # Only presence is validated here; the existence/permission checks on
    # the executable and on stdin were already disabled in this job manager.
    if ($description->executable eq '')
    {
        return Globus::GRAM::Error::RSL_EXECUTABLE();
    }
    if ($description->stdin() eq '')
    {
        return Globus::GRAM::Error::RSL_STDIN;
    }

    # On a non-cluster setup the generic maxtime limits CPU time; on a
    # cluster it limits wall time instead (see the next section).
    $self->log("Determining job max time cpu from job description");
    if (defined($description->max_cpu_time()))
    {
        $cpu_time = $description->max_cpu_time();
        $self->log(" using maxcputime of $cpu_time");
    }
    elsif (! $cluster && defined($description->max_time()))
    {
        $cpu_time = $description->max_time();
        $self->log(" using maxtime of $cpu_time");
    }
    else
    {
        $cpu_time = 0;
        $self->log(' using queue default');
    }

    $self->log("Determining job max wall time limit from job description");
    if (defined($description->max_wall_time()))
    {
        $wall_time = $description->max_wall_time();
        $self->log(" using maxwalltime of $wall_time");
    }
    elsif ($cluster && defined($description->max_time()))
    {
        $wall_time = $description->max_time();
        $self->log(" using maxtime of $wall_time");
    }
    else
    {
        $wall_time = 0;
        $self->log(' using queue default');
    }

    $self->log('Building job script');

    $pbs_job_script_name = $self->job_dir() . '/scheduler_pbs_job_script';

    # Shared failure path for every write to the job script. $! must still
    # hold the error of the failed operation when this is called.
    my $job_write_failed = sub {
        my $operation = shift;
        return $self->respond_with_failure_extension(
            "$operation: $pbs_job_script_name: $!",
            Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
    };

    # JOB stays a glob because it is handed to setup_softenv as *JOB.
    local(*JOB);
    open(JOB, '>', $pbs_job_script_name)
        or return $job_write_failed->('open');

    print JOB <<'EOF' or return $job_write_failed->('print');
#! /bin/sh
# PBS batch job script built by Globus job manager
#
#PBS -S /bin/sh
EOF

    if ($description->name() ne '')
    {
        print JOB '#PBS -N ', $description->name(), "\n"
            or return $job_write_failed->('print');
    }
    if ($description->email_address() ne '')
    {
        print JOB '#PBS -M ', $description->email_address(), "\n"
            or return $job_write_failed->('print');
    }

    # Mail events: a(bort), b(egin), e(nd); 'n' means no mail at all.
    $email_when .= 'a' if $description->emailonabort() eq 'yes';
    $email_when .= 'b' if $description->emailonexecution() eq 'yes';
    $email_when .= 'e' if $description->emailontermination() eq 'yes';
    $email_when = 'n' if $email_when eq '';
    print JOB "#PBS -m $email_when\n"
        or return $job_write_failed->('print');

    if ($description->queue() ne '')
    {
        print JOB '#PBS -q ', $description->queue(), "\n"
            or return $job_write_failed->('print');
    }
    if ($description->project() ne '')
    {
        print JOB '#PBS -A ', $description->project(), "\n"
            or return $job_write_failed->('print');
    }

    if ($cpu_time != 0)
    {
        # For 'multiple' jobs the total CPU limit scales with the number
        # of processes; the per-process limit does not.
        if ($description->jobtype() eq 'multiple')
        {
            $total_cpu_time = $cpu_time * $effective_count->();
        }
        else
        {
            $total_cpu_time = $cpu_time;
        }
        print JOB "#PBS -l pcput=${cpu_time}:00\n"
            . "#PBS -l cput=${total_cpu_time}:00\n"
            or return $job_write_failed->('print');
    }

    if ($wall_time != 0)
    {
        print JOB "#PBS -l walltime=${wall_time}:00\n"
            or return $job_write_failed->('print');
    }

    if ($description->max_memory() != 0)
    {
        if ($description->jobtype() eq 'multiple')
        {
            $max_memory = $description->max_memory() * $effective_count->();
        }
        else
        {
            $max_memory = $description->max_memory();
        }
        print JOB "#PBS -l mem=${max_memory}mb\n"
            or return $job_write_failed->('print');
    }

    print JOB '#PBS -o ', $description->stdout(), "\n",
              '#PBS -e ', $description->stderr(), "\n"
        or return $job_write_failed->('print');

    if (defined $description->nodes())
    {
        #Generated by ExtensionsHandler.pm from resourceAllocationGroup elements
        print JOB '#PBS -l nodes=', $description->nodes(), "\n"
            or return $job_write_failed->('print');
    }
    elsif ($description->host_count() != 0)
    {
        print JOB '#PBS -l nodes=', $description->host_count(), "\n"
            or return $job_write_failed->('print');
    }
    elsif ($cluster && $cpu_per_node != 0)
    {
        print JOB '#PBS -l nodes=',
                  myceil($description->count() / $cpu_per_node), "\n"
            or return $job_write_failed->('print');
    }

    ### SoftEnv extension ###
    if ($softenv_dir ne '')
    {
        my $softenv_rc = $self->setup_softenv(
            $self->job_dir() . '/pbs_softenv_job_script',
            $soft_msc,
            $softenv_load,
            *JOB);

        if ($softenv_rc != 0)
        {
            return $self->respond_with_failure_extension(
                "setup_softenv: $softenv_rc",
                Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
        }
    }
    #########################

    $rsh_env = '';

    foreach my $tuple ($description->environment())
    {
        if (!ref($tuple) || scalar(@$tuple) != 2)
        {
            return Globus::GRAM::Error::RSL_ENVIRONMENT();
        }

        # Unset any variable whose value contains a comma,
        # because OpenPBS cannot handle those. EDG bug 1208.
        push(@new_env, $tuple->[0] . '="' . $tuple->[1] . '"')
            unless $tuple->[1] =~ /,/;

        # Escape name and value in place (the tuple itself is modified,
        # as before) so they survive double quoting in the job script.
        foreach my $field (@$tuple)
        {
            $field = $escape->($field);
        }

        $rsh_env .= $tuple->[0] . '="' . $tuple->[1] . "\";\n"
                  . 'export ' . $tuple->[0] . ";\n";
    }

    ##########################################################################
    # Extract accounting and VO data name for later use
    ##########################################################################
    my ($proxylocation, $voname, $gridid, $peeraddr, $jmid);
    my @fqans;
    foreach my $tuple ($description->environment())
    {
        $proxylocation = $tuple->[1] if $tuple->[0] eq "X509_USER_PROXY";
        $gridid        = $tuple->[1] if $tuple->[0] eq "GRID_ID";
        $peeraddr      = $tuple->[1] if $tuple->[0] eq "GATEKEEPER_PEER";
        $jmid          = $tuple->[1] if $tuple->[0] eq "GATEKEEPER_JM_ID";
    }
    if (defined($proxylocation) and defined($vomsinfo) and $vomsinfo)
    {
        print JOB '# Inspecting proxy ' . $proxylocation . "\n";

        if (-x $vomsinfo)
        {
            chomp($voname = `$vomsinfo -file "$proxylocation" -vo 2>/dev/null`);
            chomp(@fqans = `$vomsinfo -file "$proxylocation" -fqan 2>/dev/null`);

            # NOTE(review): this pattern is unanchored, so any value that
            # contains a letter is accepted; confirm whether stricter
            # validation is wanted before tightening it.
            if ($voname =~ /[a-zA-Z][\w\d\.]*/)
            {
                print JOB "# VO name from proxy is $voname\n";
                $rsh_env .= 'VONAME="' . $voname . "\";\n"
                          . 'export ' . "VONAME" . ";\n";
                push(@new_env, "VONAME" . '="' . $voname . '"');
            }
            else
            {
                print JOB "# VO name $voname from proxy is NOT valid\n";
            }
            if (@fqans)
            {
                print JOB "# FQAN order in proxy:\n";
                # The loop variable used to be misspelled ($qfan), so the
                # FQAN lines were written without their value.
                foreach my $fqan (@fqans)
                {
                    print JOB "# $fqan\n";
                }
            }
        }
        else
        {
            print JOB "# $vomsinfo not available, ignored\n";
        }
    }
    else
    {
        print JOB "# Proxy location undefined or no vomsinfo requested\n";
    }
    ##########################################################################

    print JOB "$rsh_env\n"
        . "#Change to directory requested by user\n"
        . 'cd ' . $description->directory() . "\n"
        or return $job_write_failed->('print');

    @arguments = $description->arguments();

    # Every argument must be a plain string.
    foreach my $argument (@arguments)
    {
        return Globus::GRAM::Error::RSL_ARGUMENTS() if ref($argument);
    }
    foreach my $argument (@arguments)
    {
        $self->log("Transforming argument \"$argument\"\n");
        $argument = $escape->($argument);
        $self->log("Transformed to \"$argument\"\n");

        $args .= '"' . $argument . '" ';
    }

    # #####################################################################
    # Single-node jobs can use $TMPDIR unless the user has given a
    # directory explicitly.
    # refer back to JobManager.pm, but currently it seems that
    # $self->make_scratchdir uses "gram_scratch_" as a component
    my $emit_tmpdir_patch = sub {
        my $job_kind = shift;

        print JOB '# tmpdir patch: ' . $job_kind . "\n";
        if ( ( $description->directory() =~ /(\/tmp|.*gram_scratch_.*)/ ||
               $description->directory() eq $ENV{HOME} ) and
             ( $description->host_count() <= 1 ) and
             ( $description->count <= 1 ) )
        {
            print JOB '# user ended in a scratch directory, reset to TMPDIR' . "\n";
            print JOB '[ x"$TMPDIR" != x"" ] && cd $TMPDIR' . "\n";
        }
        else
        {
            print JOB '# user requested this specific directory' . "\n";
            print JOB '# DIR = ' . $description->directory() . "\n";
        }
    };
    # #####################################################################

    if ($description->jobtype() eq 'multiple'
        && (($description->count() == 1) || !$cluster))
    {
        # Start every process in the background on this node, then wait
        # for all of them and report the first non-zero exit code.
        my $process_count = $effective_count->();

        $emit_tmpdir_patch->('multiple-cpu job on single node');

        print JOB "pids=''\n"
            . "exit_code=0\n"
            or return $job_write_failed->('print');

        for (my $i = 0; $i < $process_count; $i++)
        {
            print JOB $description->executable(), " $args <",
                      $description->stdin(), "&\n", "pids=\"\$pids \$!\"\n"
                or return $job_write_failed->('print');
        }

        print JOB <<'EOF' or return $job_write_failed->('print');
for x in $pids; do
    wait $x
    tmp_exit_code=$?
    if [ $exit_code = 0 -a $tmp_exit_code != 0 ]; then
        exit_code=$tmp_exit_code
    fi
done
exit $exit_code
EOF
    }
    # mpi and multiple only honoured on multi-core jobs
    elsif ( $description->jobtype() eq 'mpi' ||
            ( ($description->jobtype() eq 'multiple') and
              ($description->count > 1 ) ) )
    {
        my $count = $effective_count->();
        my $stdin = $description->stdin();
        my $cmd_script_name = $self->job_dir() . '/scheduler_pbs_cmd_script';

        # Shared failure path for writes to the per-process command script.
        my $cmd_write_failed = sub {
            my $operation = shift;
            return $self->respond_with_failure_extension(
                "$operation: $cmd_script_name: $!",
                Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
        };

        local(*CMD);
        open(CMD, '>', $cmd_script_name)
            or return $cmd_write_failed->('open');

        print CMD "#!/bin/sh\n"
            or return $cmd_write_failed->('print');

        ### SoftEnv extension ###
        # NOTE(review): this call used to omit $softenv_load and was not
        # guarded by $softenv_dir, unlike the job-script call above. It is
        # aligned with that call here; confirm against the definition of
        # setup_softenv, which is not part of this file.
        if ($softenv_dir ne '')
        {
            my $softenv_rc = $self->setup_softenv(
                $self->job_dir() . '/pbs_softenv_cmd_script',
                $soft_msc,
                $softenv_load,
                *CMD);

            if ($softenv_rc != 0)
            {
                return $self->respond_with_failure_extension(
                    "setup_softenv: $softenv_rc",
                    Globus::GRAM::Error::TEMP_SCRIPT_FILE_FAILED());
            }
        }
        #########################

        print CMD 'cd ', $description->directory(), "\n",
                  "$rsh_env\n",
                  $description->executable(), " $args\n"
            or return $cmd_write_failed->('print');

        close(CMD)
            or return $cmd_write_failed->('close');

        chmod 0700, $cmd_script_name;

        $self->nfssync( $cmd_script_name );

        if ($description->jobtype() eq "mpi")
        {
            # Prefer mpiexec when configured, otherwise fall back to mpirun.
            if ($mpiexec ne 'no')
            {
                my $machinefilearg = $cluster
                    ? ' -machinefile $PBS_NODEFILE'
                    : "";

                print JOB "$mpiexec $machinefilearg -n " . $count
                    or return $job_write_failed->('print');
            }
            else
            {
                print JOB "$mpirun -np " . $count
                    or return $job_write_failed->('print');

                if ($cluster)
                {
                    print JOB ' -machinefile $PBS_NODEFILE'
                        or return $job_write_failed->('print');
                }
            }

            print JOB " $cmd_script_name < " . $stdin . "\n"
                or return $job_write_failed->('print');
        }
        else
        {
            # 'multiple' on a cluster: start the command script through the
            # remote shell on the allocated hosts, round robin, and collect
            # each exit code from a per-process file.
            my $exit_prefix = $self->job_dir() . '/exit';

            print JOB <<"EOF" or return $job_write_failed->('print');

hosts=\`cat \$PBS_NODEFILE\`;
counter=0
while test \$counter -lt $count; do
    for host in \$hosts; do
        if test \$counter -lt $count; then
            $remote_shell \$host "/bin/sh $cmd_script_name; echo \\\$? > $exit_prefix.\$counter" < $stdin &
            counter=\`expr \$counter + 1\`
        else
            break
        fi
    done
done
wait

counter=0
exit_code=0
while test \$counter -lt $count; do
    /bin/touch $exit_prefix.\$counter;

    read tmp_exit_code < $exit_prefix.\$counter
    if [ \$exit_code = 0 -a \$tmp_exit_code != 0 ]; then
        exit_code=\$tmp_exit_code
    fi
    counter=\`expr \$counter + 1\`
done

exit \$exit_code
EOF
        }
    }
    else
    {
        $emit_tmpdir_patch->('unknown job on single node');

        print JOB $description->executable(), " $args <",
                  $description->stdin(), "\n"
            or return $job_write_failed->('print');
    }

    # Buffered write errors surface here; report them as a close failure.
    close(JOB)
        or return $job_write_failed->('close');

    $pbs_qsub_err_name = $self->job_dir() . '/scheduler_pbs_submit_stderr';
    $errfile = "2>$pbs_qsub_err_name";

    $self->nfssync( $pbs_job_script_name );
    $self->nfssync( $pbs_qsub_err_name );
    $self->nfssync( $description->stdout, 1 );
    $self->nfssync( $description->stderr, 1 );
    $self->log("submitting job -- $qsub < $pbs_job_script_name $errfile");

    # Because "2>&1" follows the "2>file" redirection, the shell ends up
    # sending qsub's stderr into the captured output; the stderr file is
    # created but normally stays empty.
    chomp($job_id = `$qsub < $pbs_job_script_name $errfile 2>&1`);
    # Remember qsub's status before anything else can overwrite $?.
    my $qsub_exit = $?;

    # #####################################################################
    # Parsing job submission status and log accounting information
    my $qsubstatus = $job_id;
    $qsubstatus =~ s/[^-a-zA-Z0-9\.:@\/]/_/gs;
    my $user_name  = getpwuid($<);
    my $group_name = getgrgid($();
    # #####################################################################

    if ($qsub_exit == 0)
    {
        # #####################################################################
        # produce CREAM-like accounting record in syslog
        require POSIX;
        my $todaylocal = POSIX::strftime("%Y%m%d", localtime);
        my $accountinglog = "jobmanagement accounting;";
        $accountinglog .= " REMOTE_REQUEST_ADDRESS=$peeraddr;"
            if defined $peeraddr;
        $accountinglog .= " USER_DN=$gridid;" if defined $gridid;
        if (@fqans)
        {
            $accountinglog .= " USER_FQAN={";
            foreach my $fqan (@fqans)
            {
                $accountinglog .= " $fqan;";
            }
            $accountinglog .= " };";
        }
        $accountinglog .= " USER_VO=$voname;" if defined $voname;
        $accountinglog .= " JOB_REPOSITORY_ID=$jmid;" if defined $jmid;
        $accountinglog .= " CMD_NAME=JOB_START;";
        $accountinglog .= " uid=" . $user_name . ";";
        $accountinglog .= " gid=" . $group_name . ";";
        $accountinglog .= " jobId=" . $description->uniqid() . ";";
        $accountinglog .= " lrmsAbsJobId=pbs/$todaylocal/$qsubstatus;";

        $accountinglog .= "\nqsub success (" . $user_name . ":" . $group_name .
                          ") $pbs_job_script_name: $qsubstatus\"\n";

        # List-form pipe open: logger is started without a shell, so the
        # brackets in the tag cannot be treated as a glob pattern.
        # Accounting is best effort; a failure here does not fail the job.
        if (open(my $accounting, '|-', 'logger', '-p', 'daemon.info',
                 '-t', 'jobmanager-pbs[' . $$ . ']'))
        {
            print $accounting $accountinglog;
            close($accounting);
        }
        # #####################################################################

        $self->log("job submission successful, setting state to PENDING");
        return {JOB_ID => $job_id,
                JOB_STATE => Globus::GRAM::JobState::PENDING };
    }

    # qsub failed: collect whatever error text is available.
    my $stderr;
    if (open(my $err_in, '<', $pbs_qsub_err_name))
    {
        local $/;
        $stderr = <$err_in>;
        close($err_in);
    }
    $stderr = '' unless defined $stderr;
    # The stderr file is normally empty (see the qsub invocation above), so
    # fall back to the captured qsub output to keep the failure message
    # meaningful.
    if ($stderr eq '' && defined $job_id && $job_id ne '')
    {
        $stderr = "$job_id\n";
    }

    # #####################################################################
    # error log in syslog (list form: no shell involved)
    system('logger', '-p', 'daemon.err', '-t', 'jobmanager-pbs', '-i', '--',
           "qsub error (" . $user_name . ":" . $group_name . ") " .
           " $pbs_job_script_name: $qsubstatus");
    # #####################################################################

    $self->log("qsub returned $job_id");
    $self->log("qsub stderr $stderr");

    # Make the error visible in the job's own stderr file when possible.
    if (open(my $err_out, '>', $description->stderr()))
    {
        print $err_out $stderr;
        close($err_out);
    }

    $stderr =~ s/\n/\\n/g;

    $self->respond({GT3_FAILURE_MESSAGE => $stderr });

    return Globus::GRAM::Error::JOB_EXECUTION_FAILED();
}
858
# Ask qstat for the state of the job and translate it into a GRAM job state.
#
# Returns { JOB_STATE => ... } when the state could be determined, or an
# empty hash ref when the qstat answer was not understood, which tells the
# job manager to ignore this poll and keep the previous state.
sub poll
{
    my $self = shift;
    my $description = $self->{JobDescription};
    my $job_id = $description->jobid();
    my $state;

    # Sync the job's output files once the job is known to be finished.
    my $sync_output = sub {
        $self->nfssync( $description->stdout() )
            if $description->stdout() ne '';
        $self->nfssync( $description->stderr() )
            if $description->stderr() ne '';
    };

    $self->log("polling job $job_id");

    # Pick the job_state line from the full qstat output. A named lexical
    # is used so the caller's $_ is left untouched.
    my ($status_line) =
        grep { /job_state/ } $self->pipe_out_cmd($qstat, '-f', $job_id);
    # get the exit code of the qstat command. for info search $CHILD_ERROR
    # in perlvar documentation.
    my $exit_code = $? >> 8;

    $status_line = '' unless defined $status_line;
    $self->log("qstat job_state line is: $status_line");

    # return code 153 = "Unknown Job Id".
    # verifying that the job is no longer there.
    # NOTE(review): exit code 35 is handled the same way; its meaning is
    # not documented in this file.
    if ($exit_code == 153 || $exit_code == 35)
    {
        $self->log("qstat rc is $exit_code == Unknown Job ID == DONE");
        $state = Globus::GRAM::JobState::DONE;
        $sync_output->();
        return {JOB_STATE => $state};
    }

    # Get 3rd field (after = ); the line starts with whitespace, so the
    # first element of the split is empty.
    my $pbs_state = (split(/\s+/, $status_line))[3];
    $pbs_state = '' unless defined $pbs_state;

    if ($pbs_state =~ /Q|W|T/)
    {
        $state = Globus::GRAM::JobState::PENDING;
    }
    elsif ($pbs_state =~ /S|H/)
    {
        $state = Globus::GRAM::JobState::SUSPENDED;
    }
    elsif ($pbs_state =~ /R|E/)
    {
        $state = Globus::GRAM::JobState::ACTIVE;
    }
    elsif ($pbs_state =~ /C/)
    {
        $state = Globus::GRAM::JobState::DONE;
        $sync_output->();
    }
    else
    {
        # This else is reached by an unknown response from pbs.
        # It could be that PBS was temporarily unavailable, but that it
        # can recover and the submitted job is fine.
        # So, we want the JM to ignore this poll and keep the same state
        # as the previous state. Returning an empty hash below will tell
        # the JM to ignore the response.
        $self->log("qstat returned an unknown response. Telling JM to ignore this poll");
        return {};
    }

    return {JOB_STATE => $state};
}
930
# Cancel the job by running qdel on its job id.
#
# Returns { JOB_STATE => FAILED } when qdel exits with status 0 and the
# JOB_CANCEL_FAILED error otherwise.
sub cancel
{
    my $self = shift;
    my $job_id = $self->{JobDescription}->jobid();

    $self->log("cancel job $job_id");

    $self->fork_and_exec_cmd( $qdel, $job_id );

    # $? carries the status left behind by the qdel invocation above.
    unless ($? == 0)
    {
        return Globus::GRAM::Error::JOB_CANCEL_FAILED();
    }

    return { JOB_STATE => Globus::GRAM::JobState::FAILED };
}
948
# Send $msg to the client as GT3_FAILURE_MESSAGE and hand back $rc
# unchanged, so callers can write "return $self->respond_with_...(...)".
sub respond_with_failure_extension
{
    my ($self, $msg, $rc) = @_;

    my %failure = (GT3_FAILURE_MESSAGE => $msg);
    $self->respond(\%failure);

    return $rc;
}
958
959 1;

grid.support@nikhef.nl
ViewVC Help
Powered by ViewVC 1.1.28