//
// Copyright (C) 2014 Jens Korinth, TU Darmstadt
//
// This file is part of Tapasco (TPC).
//
// Tapasco is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Tapasco is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with Tapasco.  If not, see <http://www.gnu.org/licenses/>.
//
package de.tu_darmstadt.cs.esa.tapasco
import  base._
import  filemgmt._
import  task._
import  itapasco.controller._
import  parser._
import  slurm._
import  java.nio.file.Path
import  scala.concurrent._
import  java.util.Locale

/** Command line entry point of TaPaSCo.
  *
  * Parses the command line into a configuration, optionally dumps that
  * configuration to a file (dry run), optionally shows the interactive GUI,
  * and otherwise executes all configured jobs either sequentially or in
  * parallel.
  */
object Tapasco {
  import org.slf4j.LoggerFactory
  import ch.qos.logback.core.FileAppender
  import ch.qos.logback.classic.LoggerContext
  import ch.qos.logback.classic.encoder.PatternLayoutEncoder
  import ch.qos.logback.classic.spi.ILoggingEvent

  private[this] implicit val logger = de.tu_darmstadt.cs.esa.tapasco.Logging.logger(this.getClass)
  /** Appender that mirrors log output into the user-selected log file. */
  private[this] val logFileAppender: FileAppender[ILoggingEvent] = new FileAppender()
  /** Pool parallelism used when the configuration does not limit the number of tasks. */
  private[this] final val UNLIMITED_THREADS = 1000

  /** Attaches a file appender to the root logger.
    *
    * The file is truncated on start (append mode is switched off) and only
    * receives events of level INFO and above.
    *
    * @param file  path of the log file to write
    * @param quiet if true, `setAdditive(true)` is called on the root logger
    */
  private def setupLogFileAppender(file: String, quiet: Boolean = false): Unit = {
    val ctx = LoggerFactory.getILoggerFactory().asInstanceOf[LoggerContext]

    val ple = new PatternLayoutEncoder()
    ple.setPattern("[%d{HH:mm:ss} <%thread: %c{0}> %level] %msg%n")
    ple.setContext(ctx)
    ple.start()

    val filter = new ch.qos.logback.classic.filter.ThresholdFilter
    filter.setLevel("INFO")
    // a logback filter that was never started does not filter anything, so the
    // INFO threshold only takes effect once the filter has been started
    filter.start()

    logFileAppender.setFile(file)
    logFileAppender.setAppend(false)
    logFileAppender.setEncoder(ple)
    logFileAppender.setContext(ctx)
    logFileAppender.addFilter(filter)
    logFileAppender.start()

    Logging.rootLogger.addAppender(logFileAppender)
    // NOTE(review): presumably `Logging.rootLogger` is a logback Logger, whose
    // additivity already defaults to true - if so, this call does not silence
    // anything; confirm the intended effect of `quiet`.
    if (quiet) Logging.rootLogger.setAdditive(quiet)
  }

  /** Shows the GUI if the first command line argument is `itapasco` (case-insensitive).
    *
    * @param args raw command line arguments
    * @param cfg  parsed configuration, handed to the GUI controller
    * @return true if the GUI was shown, false otherwise (including empty `args`)
    */
  private def runGui(args: Array[String])(implicit cfg: Configuration): Boolean =
    args.headOption exists { firstArg =>
      (firstArg.toLowerCase equals "itapasco") && { new AppController(Some(cfg)).show; true }
    }

  /** Dumps the configuration to the given path and terminates the JVM with exit code 0.
    *
    * If the configuration contains no jobs, the example jobs from
    * `jobs.JobExamples.jobs` are substituted before writing.
    *
    * @param p   file to write the configuration to
    * @param cfg configuration to dump
    */
  private def dryRun(p: Path)(implicit cfg: Configuration): Unit = {
    import base.json._
    logger.info("dry run, dumping configuration to {}", p)
    Configuration.to(if (cfg.jobs.isEmpty) cfg.jobs(jobs.JobExamples.jobs) else cfg, p)
    System.exit(0)
  }

  // scalastyle:off cyclomatic.complexity
  // scalastyle:off method.length
  /** Program entry point.
    *
    * Exits the JVM with status 1 if argument parsing fails, any job reports
    * failure, or an exception is thrown while running jobs; otherwise it
    * returns normally.
    *
    * @param args command line arguments; a leading `itapasco` selects the GUI
    *             and is not passed on to the command line parser
    */
  def main(args: Array[String]): Unit = {
    Locale.setDefault(Locale.US)

    // the GUI selector is not part of the configuration grammar, strip it
    val cfgargs = if (args.length > 0 && args(0).toLowerCase.equals("itapasco")) args.drop(1) else args
    // try to parse all arguments
    val c = CommandLineParser(cfgargs mkString " ")
    logger.debug("parsed config: {}", c)
    val ok = c match {
      case Right(parsed) =>
        implicit val tasks = new Tasks(parsed.maxTasks)
        // get parsed Configuration
        implicit val cfg = parsed
        // dump config and exit, if dryRun is selected
        cfg.dryRun foreach (dryRun _)
        // else continue ...
        logger.trace("configuring FileAssetManager...")
        FileAssetManager(cfg)
        logger.trace("SLURM: {}", cfg.slurm)
        if (cfg.slurm) Slurm.enabled = cfg.slurm
        FileAssetManager.start()
        logger.trace("parallel: {}", cfg.parallel)
        cfg.logFile foreach { logfile: Path => setupLogFileAppender(logfile.toString) }
        logger.info("Running with configuration: {}", cfg.toString)

        // blocks until the future has completed; a failed future counts as `false`
        def get(f: Future[Boolean]): Boolean = {
          Await.ready(f, duration.Duration.Inf)
          f.value map (_ getOrElse false) getOrElse false
        }

        try {
          if (cfg.parallel) {
            implicit val exe = ExecutionContext.fromExecutor(new java.util.concurrent.ForkJoinPool(cfg.maxTasks getOrElse UNLIMITED_THREADS))
            // all futures are created first, then awaited one after another
            runGui(args) || (cfg.jobs map { j => Future { jobs.executors.execute(j) } } map (get _) fold true) (_ && _)
          } else {
            runGui(args) || (cfg.jobs map { jobs.executors.execute(_) } fold true) (_ && _)
          }
        } catch { case ex: Exception =>
          logger.error(ex.toString)
          logger.error("Stack trace: {}", ex.getStackTrace() map (_.toString) mkString "\n")
          false
        } finally {
          FileAssetManager.stop()
          tasks.stop()
        }
      case Left(err) =>
        logger.error("invalid arguments: {}", err.toString)
        logger.error("run `tapasco -h` or `tapasco --help` to get more info")
        false
    }

    // debugging aid: log the stack traces of all threads that are still alive
    logger.debug("active threads: {}", Thread.activeCount())
    if (Thread.activeCount() > 0) {
      import scala.collection.JavaConverters._
      val m = Thread.getAllStackTraces().asScala
      m.values foreach { strace => logger.debug(strace mkString scala.util.Properties.lineSeparator) }
    }

    if (! ok) {
      logger.error("TaPaSCo finished with errors")
      sys.exit(1)
    } else {
      logger.info("TaPaSCo finished successfully")
    }
  }
  // scalastyle:on method.length
  // scalastyle:on cyclomatic.complexity
}