Commit ea7c3934 authored by Jens Korinth's avatar Jens Korinth

Closes #147 - Implement maxTasks option

* now supports --maxTasks command line / JSON option to limit the number
  of parallel tasks executed by TaPaSCo
parent f8d090ae
Pipeline #150 canceled with stages in 1 minute and 20 seconds
.TH tapasco 1 2017.1 MAN(1)
.TH tapasco 1 2018.1 MAN(1)
.SH SYNOPSIS
tapasco [global option]* [job]*
or: tapasco \-h | \-\-help [TOPIC]
......@@ -80,7 +80,13 @@ Execute all jobs in parallel (careful!)
.RS
\-\-maxThreads NUM
.RS
Limit internal parallelism of activities (e.g., Vivado) to the given number of threads.
Limit internal parallelism of tasks (e.g., Vivado) to the given number of threads.
.RE
.RE
.RS
\-\-maxTasks NUM
.RS
Limit max. number of tasks executed in parallel to the given number.
.RE
.RE
.SH BULK IMPORT JOB
......@@ -612,6 +618,12 @@ any quoted or unquoted string containing additional information about the core
.RE
.RE
.RS
\-\-skipEvaluation
.RS
do not perform out\-of\-context synthesis to get resource estimates, only import
.RE
.RE
.RS
\-\-averageClockCycles N
.RS
any integer > 0; number of clock cycles in an "average" execution of the PE; used to estimate the maximal throughput
......
......@@ -68,52 +68,56 @@ object Tapasco {
// scalastyle:off cyclomatic.complexity
// scalastyle:off method.length
def main(args: Array[String]) {
implicit val tasks = new Tasks
val ok = try {
val cfgargs = if (args.length > 0 && args(0).toLowerCase.equals("itapasco")) args.drop(1) else args
// try to parse all arguments
val c = CommandLineParser(cfgargs mkString " ")
logger.debug("parsed config: {}", c)
if (c.isRight) {
// get parsed Configuration
implicit val cfg = c.right.get
// dump config and exit, if dryRun is selected
cfg.dryRun foreach (dryRun _)
// else continue ...
logger.trace("configuring FileAssetManager...")
FileAssetManager(cfg)
logger.trace("SLURM: {}", cfg.slurm)
if (cfg.slurm) Slurm.enabled = cfg.slurm
FileAssetManager.start()
logger.trace("parallel: {}", cfg.parallel)
cfg.logFile map { logfile: Path => setupLogFileAppender(logfile.toString) }
logger.info("Running with configuration: {}", cfg.toString)
implicit val exe = ExecutionContext.fromExecutor(new java.util.concurrent.ForkJoinPool(UNLIMITED_THREADS))
def get(f: Future[Boolean]): Boolean = { Await.ready(f, duration.Duration.Inf); f.value map (_ getOrElse false) getOrElse false }
val cfgargs = if (args.length > 0 && args(0).toLowerCase.equals("itapasco")) args.drop(1) else args
// try to parse all arguments
val c = CommandLineParser(cfgargs mkString " ")
logger.debug("parsed config: {}", c)
val ok = if (c.isRight) {
implicit val tasks = new Tasks(c.right.get.maxTasks)
// get parsed Configuration
implicit val cfg = c.right.get
// dump config and exit, if dryRun is selected
cfg.dryRun foreach (dryRun _)
// else continue ...
logger.trace("configuring FileAssetManager...")
FileAssetManager(cfg)
logger.trace("SLURM: {}", cfg.slurm)
if (cfg.slurm) Slurm.enabled = cfg.slurm
FileAssetManager.start()
logger.trace("parallel: {}", cfg.parallel)
cfg.logFile map { logfile: Path => setupLogFileAppender(logfile.toString) }
logger.info("Running with configuration: {}", cfg.toString)
def get(f: Future[Boolean]): Boolean = { Await.ready(f, duration.Duration.Inf); f.value map (_ getOrElse false) getOrElse false }
try {
if (cfg.parallel) {
implicit val exe = ExecutionContext.fromExecutor(new java.util.concurrent.ForkJoinPool(cfg.maxTasks getOrElse UNLIMITED_THREADS))
runGui(args) || (cfg.jobs map { j => Future { jobs.executors.execute(j) } } map (get _) fold true) (_ && _)
} else {
runGui(args) || (cfg.jobs map { jobs.executors.execute(_) } fold true) (_ && _)
}
} else {
logger.error("invalid arguments: {}", c.left.get.toString)
logger.error("run `tapasco -h` or `tapasco --help` to get more info")
} catch { case ex: Exception =>
logger.error(ex.toString)
logger.error("Stack trace: {}", ex.getStackTrace() map (_.toString) mkString "\n")
false
} finally {
FileAssetManager.stop()
tasks.stop()
}
} catch { case ex: Exception =>
logger.error(ex.toString)
logger.error("Stack trace: {}", ex.getStackTrace() map (_.toString) mkString "\n")
} else {
logger.error("invalid arguments: {}", c.left.get.toString)
logger.error("run `tapasco -h` or `tapasco --help` to get more info")
false
} finally {
FileAssetManager.stop()
tasks.stop()
}
logger.debug("active threads: {}", Thread.activeCount())
if (Thread.activeCount() > 0) {
import scala.collection.JavaConverters._
val m = Thread.getAllStackTraces().asScala
m.values foreach { strace => logger.debug(strace mkString scala.util.Properties.lineSeparator) }
}
if (! ok) {
logger.error("TaPaSCo finished with errors")
sys.exit(1)
......
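For reference, a minimal, self-contained sketch of the bounded job-parallelism pattern used in this hunk, assuming a ForkJoinPool sized by `maxTasks` (names such as `MaxTasksSketch`, `runJob`, and the fallback pool size are illustrative placeholders, not TaPaSCo API):

```scala
import java.util.concurrent.ForkJoinPool
import scala.concurrent.{Await, ExecutionContext, Future, duration}

// Sketch: the ForkJoinPool is sized by maxTasks, so at most that many jobs run
// concurrently; per-job results are folded with logical AND, as in the diff above.
object MaxTasksSketch extends App {
  val maxTasks: Option[Int] = Some(2)                    // e.g. parsed from `--maxTasks 2`
  val fallback = Runtime.getRuntime.availableProcessors  // stand-in for UNLIMITED_THREADS
  implicit val exe = ExecutionContext.fromExecutor(new ForkJoinPool(maxTasks getOrElse fallback))

  def runJob(i: Int): Boolean = { Thread.sleep(100); true }  // placeholder job

  def get(f: Future[Boolean]): Boolean = {
    Await.ready(f, duration.Duration.Inf)
    f.value map (_ getOrElse false) getOrElse false
  }

  val ok = ((1 to 4) map { i => Future { runJob(i) } } map (get _) fold true) (_ && _)
  println(s"all jobs succeeded: $ok")
}
```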
......@@ -50,6 +50,8 @@ trait Configuration {
def parallel(enabled: Boolean): Configuration
def maxThreads: Option[Int]
def maxThreads(mt: Option[Int]): Configuration
def maxTasks: Option[Int]
def maxTasks(mt: Option[Int]): Configuration
def dryRun(cfg: Option[Path]): Configuration
def dryRun: Option[Path]
def verbose(mode: Option[String]): Configuration
......
......@@ -44,6 +44,7 @@ private case class ConfigurationImpl (
slurm: Boolean = false,
parallel: Boolean = false,
maxThreads: Option[Int] = None,
maxTasks: Option[Int] = None,
dryRun: Option[Path] = None,
verbose: Option[String] = None,
jobs: Seq[Job] = Seq()
......@@ -64,6 +65,7 @@ private case class ConfigurationImpl (
def slurm(enabled: Boolean): Configuration = this.copy(slurm = enabled)
def parallel(enabled: Boolean): Configuration = this.copy(parallel = enabled)
def maxThreads(mt: Option[Int]): Configuration = this.copy(maxThreads = mt)
def maxTasks(mt: Option[Int]): Configuration = this.copy(maxTasks = mt)
def dryRun(cfg: Option[Path]): Configuration = this.copy(dryRun = cfg)
def verbose(mode: Option[String]): Configuration = this.copy(verbose = mode)
def jobs(js: Seq[Job]): Configuration = this.copy(jobs = js)
......
......@@ -83,6 +83,7 @@ private object PrettyPrinter {
"Slurm = " + c.slurm,
"Parallel = " + c.parallel,
"MaxThreads = " + (c.maxThreads getOrElse "unlimited"),
"MaxTasks = " + (c.maxTasks getOrElse "unlimited"),
"Jobs = " + c.jobs
) mkString NL
......
......@@ -293,6 +293,7 @@ package object json {
(JsPath \ "Slurm").readNullable[Boolean].map (_ getOrElse false) ~
(JsPath \ "Parallel").readNullable[Boolean].map (_ getOrElse false) ~
(JsPath \ "MaxThreads").readNullable[Int] ~
(JsPath \ "MaxTasks").readNullable[Int] ~
(JsPath \ "DryRun").readNullable[Path] ~
(JsPath \ "Verbose").readNullable[String] ~
(JsPath \ "Jobs").read[Seq[Job]]
......@@ -308,6 +309,7 @@ package object json {
(JsPath \ "Slurm").write[Boolean] ~
(JsPath \ "Parallel").write[Boolean] ~
(JsPath \ "MaxThreads").writeNullable[Int] ~
(JsPath \ "MaxTasks").writeNullable[Int] ~
(JsPath \ "DryRun").writeNullable[Path].transform((js: JsObject) => js - "DryRun") ~
(JsPath \ "Verbose").writeNullable[String] ~
(JsPath \ "Jobs").write[Seq[Job]]
......
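A small sketch of the `readNullable` behaviour the new `MaxTasks` field relies on, assuming the play-json library used by the json package (object and value names are illustrative): an absent key maps to `None`, an integer value to `Some(n)`.

```scala
import play.api.libs.json._

// Sketch: nullable "MaxTasks" key parsed into Option[Int].
object MaxTasksJsonSketch extends App {
  val maxTasksReads: Reads[Option[Int]] = (JsPath \ "MaxTasks").readNullable[Int]

  println(Json.parse("""{ "MaxTasks": 4 }""").validate(maxTasksReads))    // JsSuccess(Some(4), ...)
  println(Json.parse("""{ "Parallel": true }""").validate(maxTasksReads)) // JsSuccess(None, ...)
}
```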
......@@ -43,7 +43,8 @@ private object GlobalOptions {
longOption("logFile") |
longOption("parallel") |
longOption("slurm") |
longOption("maxThreads")
longOption("maxThreads") |
longOption("maxTasks")
).opaque("a global option")
def help: Parser[(String, String)] =
......@@ -93,8 +94,11 @@ private object GlobalOptions {
def maxThreads: Parser[(String, Int)] =
longOption("maxThreads", "MaxThreads") ~/ ws ~ posint ~ ws
def maxTasks: Parser[(String, Int)] =
longOption("maxTasks", "MaxTasks") ~/ ws ~ posint ~ ws
def globalOptionsSeq: Parser[Seq[(String, _)]] =
ws ~ (help | verbose | dirs | inputFiles | slurm | parallel | dryRun | maxThreads).rep
ws ~ (help | verbose | dirs | inputFiles | slurm | parallel | dryRun | maxThreads | maxTasks).rep
def globalOptions: Parser[Configuration] =
globalOptionsSeq map (as => mkConfig(as))
......@@ -115,6 +119,7 @@ private object GlobalOptions {
case ("ConfigFile", p: Path) => mkConfig(as, Some(loadConfigFromFile(p)))
case ("DryRun", p: Path) => mkConfig(as, Some(c getOrElse Configuration() dryRun Some(p)))
case ("MaxThreads", i: Int) => mkConfig(as, Some(c getOrElse Configuration() maxThreads Some(i)))
case ("MaxTasks", i: Int) => mkConfig(as, Some(c getOrElse Configuration() maxTasks Some(i)))
case ("Verbose", m: String) => mkConfig(as, Some(c getOrElse Configuration() verbose Some(m)))
case _ => c getOrElse Configuration()
}
......
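As an illustration of how a parsed `("MaxTasks", n)` pair ends up in the configuration, a self-contained sketch follows; `Cfg` and `mkCfg` are stand-ins for the actual `Configuration`/`mkConfig` machinery, and the fold replaces the recursion used in the real parser.

```scala
// Sketch: fold parsed (key, value) pairs into a configuration case class.
object OptionFoldSketch extends App {
  case class Cfg(maxThreads: Option[Int] = None, maxTasks: Option[Int] = None)

  def mkCfg(opts: Seq[(String, Any)], c: Cfg = Cfg()): Cfg = opts.foldLeft(c) {
    case (cfg, ("MaxThreads", i: Int)) => cfg.copy(maxThreads = Some(i))
    case (cfg, ("MaxTasks", i: Int))   => cfg.copy(maxTasks = Some(i))
    case (cfg, _)                      => cfg
  }

  println(mkCfg(Seq("MaxTasks" -> 4, "Parallel" -> true)))  // Cfg(None, Some(4))
}
```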
......@@ -82,8 +82,9 @@ configuration via `tapasco -n config.json`.
Arg("--jobsFile FILE", "Path to Json file with Jobs array") &
Arg("--slurm", "Activate SLURM cluster execution (requires sbatch)") &
Arg("--parallel", "Execute all jobs in parallel (careful!)") &
Arg("--maxThreads NUM", "Limit internal parallelism of activities (e.g., Vivado)" ~
"to the given number of threads."))
Arg("--maxThreads NUM", "Limit internal parallelism of tasks (e.g., Vivado)" ~
"to the given number of threads.") &
Arg("--maxTasks NUM", "Limit max. number tasks executed in parallel to the given number"))
private def composition() = Section("Composition Syntax",
Block("A Composition specifies the number and kind of processing elements (PEs) that" ~
......@@ -341,7 +342,7 @@ configuration via `tapasco -n config.json`.
private def shortTopics() = Section("Help Topics", Block(helpTopics.keys.toSeq.sorted.mkString(", ")))
private def overview() =
Header("tapasco", 1, "2017-07-14", "2017.1") &
Header("tapasco", 1, "2018-01-22", "2018.1") &
Synopsis(" tapasco [global option]* [job]*" &
"or: tapasco -h | --help [TOPIC]" &
"")
......
......@@ -100,12 +100,13 @@ private class GenericTask(
val licences = Map[String, Int]()
}
class Tasks extends Publisher {
class Tasks(maxParallel: Option[Int] = None) extends Publisher {
type Event = Tasks.Event
import Tasks.Events._
private[this] final val _logger = de.tu_darmstadt.cs.esa.tapasco.Logging.logger(getClass)
private[this] final implicit val _exectx = scala.concurrent.ExecutionContext.fromExecutorService(
java.util.concurrent.Executors.newCachedThreadPool()
if (maxParallel.nonEmpty) java.util.concurrent.Executors.newFixedThreadPool(maxParallel.get)
else java.util.concurrent.Executors.newCachedThreadPool()
)
override def +=(el: EventListener): Unit = {
......
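The executor selection above can be summarized in a standalone sketch (the helper and object names are illustrative, not the `Tasks` class itself): a task limit yields a fixed-size pool that caps concurrent tasks, while no limit keeps the previous cached-pool behaviour.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService}

// Sketch: bounded limit -> fixed-size pool; no limit -> cached (unbounded) pool.
object TaskPoolSketch {
  def executionContextFor(maxParallel: Option[Int]): ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(
      maxParallel map (n => Executors.newFixedThreadPool(n)) getOrElse Executors.newCachedThreadPool()
    )
}
```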