Commit 1e24bf6e authored by Jens Korinth's avatar Jens Korinth
Browse files

Slurm fixes

parent 37331e13
...@@ -6,13 +6,9 @@ ...@@ -6,13 +6,9 @@
#SBATCH --mem-per-cpu=@@MEM_PER_CPU@@ #SBATCH --mem-per-cpu=@@MEM_PER_CPU@@
#SBATCH -n @@CPUS@@ #SBATCH -n @@CPUS@@
#SBATCH -t @@TIMELIMIT@@ #SBATCH -t @@TIMELIMIT@@
#SBATCh --comment="@@COMMENT@@" #SBATCH --comment="@@COMMENT@@"
export TAPASCO_HOME=@@TAPASCO_HOME@@ source @@TAPASCO_HOME@@/setup.sh
source ~/vivado.sh
pushd $TAPASCO_HOME
source ./setup.sh
popd
# user commands begin here # user commands begin here
echo "SLURM job #$SLURM_JOB_ID started at $(date)" echo "SLURM job #$SLURM_JOB_ID started at $(date)"
......
...@@ -113,14 +113,15 @@ final object Slurm extends Publisher { ...@@ -113,14 +113,15 @@ final object Slurm extends Publisher {
* @param script Job script file to schedule via `sbatch`. * @param script Job script file to schedule via `sbatch`.
* @return Either a positive integer (SLURM id), or an Exception. * @return Either a positive integer (SLURM id), or an Exception.
**/ **/
def apply(script: Path, retries: Int = 3): Option[Int] = catchAllDefault[Option[Int]](None, "Slurm scheduling failed: ") { def apply(script: Path, retries: Int = 10): Option[Int] = catchAllDefault[Option[Int]](None, "Slurm scheduling failed: ") {
val cmd = "sbatch %s".format(script.toAbsolutePath().normalize().toString) val cmd = "sbatch %s".format(script.toAbsolutePath().normalize().toString)
logger.debug("running slurm batch job: '%s'".format(cmd)) logger.debug("running slurm batch job: '%s'".format(cmd))
val res = cmd.!! val res = cmd.!!
val id = slurmSubmissionAck.findFirstMatchIn(res) map (_ group (1) toInt) val id = slurmSubmissionAck.findFirstMatchIn(res) map (_ group (1) toInt)
if (id.isEmpty) { if (id.isEmpty) {
if (retries > 0) { if (retries > 0) {
Thread.sleep(slurmRetryDelay) // wait 10 secs // wait for 10 secs + random up to 5 secs to avoid congestion
Thread.sleep(slurmRetryDelay + scala.util.Random.nextInt() % (slurmRetryDelay / 2))
apply(script, retries - 1) apply(script, retries - 1)
} else { throw new SlurmException(script.toString, res) } } else { throw new SlurmException(script.toString, res) }
} else { } else {
......
...@@ -28,7 +28,7 @@ class ComposeTask(composition: Composition, ...@@ -28,7 +28,7 @@ class ComposeTask(composition: Composition,
private[this] implicit val _logger = de.tu_darmstadt.cs.esa.tapasco.Logging.logger(getClass) private[this] implicit val _logger = de.tu_darmstadt.cs.esa.tapasco.Logging.logger(getClass)
private[this] val _slurm = Slurm.enabled private[this] val _slurm = Slurm.enabled
private[this] var _composerResult: Option[Composer.Result] = None private[this] var _composerResult: Option[Composer.Result] = None
private[this] val _outDir = cfg.outputDir(composition, target, designFrequency) private[this] val _outDir = cfg.outputDir(composition, target, designFrequency, features getOrElse Seq())
private[this] val _logFile = logFile getOrElse "%s/%s.log".format(_outDir, "tapasco") private[this] val _logFile = logFile getOrElse "%s/%s.log".format(_outDir, "tapasco")
private[this] val _errorLogFile = Paths.get(_logFile).resolveSibling("slurm-compose.errors.log") private[this] val _errorLogFile = Paths.get(_logFile).resolveSibling("slurm-compose.errors.log")
...@@ -85,13 +85,13 @@ class ComposeTask(composition: Composition, ...@@ -85,13 +85,13 @@ class ComposeTask(composition: Composition,
) )
// define SLURM job // define SLURM job
val job = Slurm.Job( val job = Slurm.Job(
name = "compose-%s-%s-%s-%1.2f".format(composition.id, target.ad.name, target.pd.name, designFrequency), name = elementdesc,
slurmLog = slgFile.toString, slurmLog = slgFile.toString,
errorLog = _errorLogFile.toString, errorLog = _errorLogFile.toString,
consumer = this, consumer = this,
maxHours = ComposeTask.MAX_COMPOSE_HOURS, maxHours = ComposeTask.MAX_COMPOSE_HOURS,
commands = Seq("tapasco --configFile %s".format(cfgFile.toString)), commands = Seq("tapasco --configFile %s".format(cfgFile.toString)),
comment = Some("%s".format(composition.composition map (ce => "%s % d".format(ce.kernel, ce.count)) mkString ", ")) comment = Some(_outDir.toString)
) )
// generate non-SLURM config with single job // generate non-SLURM config with single job
val newCfg = cfg val newCfg = cfg
...@@ -144,7 +144,7 @@ class ComposeTask(composition: Composition, ...@@ -144,7 +144,7 @@ class ComposeTask(composition: Composition,
object ComposeTask { object ComposeTask {
import scala.io._ import scala.io._
import de.tu_darmstadt.cs.esa.tapasco.reports._ import de.tu_darmstadt.cs.esa.tapasco.reports._
private final val MAX_COMPOSE_HOURS = 48 private final val MAX_COMPOSE_HOURS = 23
private final val RE_RESULT = """compose run .*result: ([^,]+)""".r.unanchored private final val RE_RESULT = """compose run .*result: ([^,]+)""".r.unanchored
private final val RE_LOG = """compose run .*result: \S+.*logfile: '([^']+)'""".r.unanchored private final val RE_LOG = """compose run .*result: \S+.*logfile: '([^']+)'""".r.unanchored
private final val RE_TIMING = """compose run .*result: \S+.*timing report: '([^']+)'""".r.unanchored private final val RE_TIMING = """compose run .*result: \S+.*timing report: '([^']+)'""".r.unanchored
......
...@@ -3,7 +3,6 @@ import de.tu_darmstadt.cs.esa.tapasco.util.Publisher ...@@ -3,7 +3,6 @@ import de.tu_darmstadt.cs.esa.tapasco.util.Publisher
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.concurrent.Future import scala.concurrent.Future
import scala.util.{Failure, Success} import scala.util.{Failure, Success}
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.LinkedBlockingQueue
import java.time.LocalDateTime import java.time.LocalDateTime
...@@ -87,6 +86,9 @@ class Tasks extends Publisher { ...@@ -87,6 +86,9 @@ class Tasks extends Publisher {
type Event = Tasks.Event type Event = Tasks.Event
import Tasks.Events._ import Tasks.Events._
private[this] final val _logger = de.tu_darmstadt.cs.esa.tapasco.Logging.logger(getClass) private[this] final val _logger = de.tu_darmstadt.cs.esa.tapasco.Logging.logger(getClass)
private[this] final implicit val _exectx = scala.concurrent.ExecutionContext.fromExecutorService(
java.util.concurrent.Executors.newCachedThreadPool()
)
override def +=(el: EventListener): Unit = { override def +=(el: EventListener): Unit = {
super.+=(el) super.+=(el)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment