//
// Copyright (C) 2014 Jens Korinth, TU Darmstadt
//
// This file is part of Tapasco (TPC).
//
// Tapasco is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Tapasco is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with Tapasco.  If not, see <http://www.gnu.org/licenses/>.
//
package de.tu_darmstadt.cs.esa.tapasco
import  base._
import  filemgmt._
import  task._
import  itapasco.controller._
import  parser._
import  slurm._
import  java.nio.file.Path
import  scala.concurrent._

/**
 * Command line entry point of TaPaSCo.
 *
 * Parses the command line into a configuration, optionally attaches a log
 * file, then either shows the interactive GUI (first argument `itapasco`)
 * or executes all configured jobs, sequentially or in parallel.
 */
object Tapasco {
  import org.slf4j.LoggerFactory
  import ch.qos.logback.core.FileAppender
  import ch.qos.logback.classic.LoggerContext
  import ch.qos.logback.classic.encoder.PatternLayoutEncoder
  import ch.qos.logback.classic.spi.ILoggingEvent

  private[this] implicit val logger = de.tu_darmstadt.cs.esa.tapasco.Logging.logger(this.getClass)
  private[this] val logFileAppender: FileAppender[ILoggingEvent] = new FileAppender()
  /** Parallelism used for the job thread pool when the configuration sets no task limit. */
  private[this] final val UNLIMITED_THREADS = 1000

  /**
   * Attaches a file appender to the root logger.
   *
   * The file is overwritten (not appended to) and only receives events of
   * level INFO and above.
   *
   * @param file  path of the log file to write
   * @param quiet if true, `setAdditive(true)` is called on the root logger
   */
  private def setupLogFileAppender(file: String, quiet: Boolean = false): Unit = {
    val ctx = LoggerFactory.getILoggerFactory().asInstanceOf[LoggerContext]
    val ple = new PatternLayoutEncoder()
    ple.setPattern("[%d{HH:mm:ss} <%thread: %c{0}> %level] %msg%n")
    ple.setContext(ctx)
    ple.start()
    logFileAppender.setFile(file)
    logFileAppender.setAppend(false)
    logFileAppender.setEncoder(ple)
    logFileAppender.setContext(ctx)
    logFileAppender.start()
    val filter = new ch.qos.logback.classic.filter.ThresholdFilter
    filter.setLevel("INFO")
    logFileAppender.addFilter(filter)
    Logging.rootLogger.addAppender(logFileAppender)
    // NOTE(review): this only ever sets additivity to `true`; if `quiet` is
    // meant to silence other appenders this looks inverted -- confirm intent.
    if (quiet) Logging.rootLogger.setAdditive(quiet)
  }

  /**
   * Checks whether the first command line argument selects the GUI.
   * The comparison is case-insensitive and independent of the default locale.
   */
  private def isGuiRequested(args: Array[String]): Boolean =
    args.headOption.exists(_.equalsIgnoreCase("itapasco"))

  /**
   * Shows the GUI if it was requested via the first argument.
   *
   * @param args raw command line arguments
   * @param cfg  parsed configuration handed to the GUI controller
   * @return true if the GUI was shown, false otherwise
   */
  private def runGui(args: Array[String])(implicit cfg: Configuration): Boolean =
    isGuiRequested(args) && { new AppController(Some(cfg)).show; true }

  /**
   * Dumps the configuration to the given path and terminates the JVM with
   * exit code 0. If the configuration contains no jobs, the example jobs are
   * written instead.
   *
   * @param p   destination of the configuration dump
   * @param cfg configuration to dump
   */
  private def dryRun(p: Path)(implicit cfg: Configuration): Unit = {
    import base.json._
    logger.info("dry run, dumping configuration to {}", p)
    Configuration.to(if (cfg.jobs.isEmpty) cfg.jobs(jobs.JobExamples.jobs) else cfg, p)
    System.exit(0)
  }

  /** Logs an exception and its stack trace at error level. */
  private def logException(ex: Throwable): Unit = {
    logger.error(ex.toString)
    logger.error("Stack trace: {}", ex.getStackTrace() map (_.toString) mkString "\n")
  }

  /**
   * Blocks until the given job future has completed and returns its result.
   * A failed future is logged and counted as an unsuccessful job instead of
   * being dropped silently.
   */
  private def awaitJob(f: Future[Boolean]): Boolean = {
    Await.ready(f, duration.Duration.Inf)
    f.value match {
      case Some(scala.util.Success(succeeded)) => succeeded
      case Some(scala.util.Failure(ex))        => logException(ex); false
      case None                                => false
    }
  }

  /**
   * Logs the number of active threads and, if debug logging is enabled, the
   * stack traces of all live threads.
   */
  private def logActiveThreads(): Unit = {
    logger.debug("active threads: {}", Thread.activeCount())
    // collecting all stack traces is costly, only do it if it will be logged
    if (logger.isDebugEnabled()) {
      import scala.collection.JavaConverters._
      val traces = Thread.getAllStackTraces().asScala
      traces.values foreach { strace => logger.debug(strace mkString scala.util.Properties.lineSeparator) }
    }
  }

  // scalastyle:off cyclomatic.complexity
  // scalastyle:off method.length
  /**
   * Program entry point.
   *
   * Exits the JVM with code 1 if argument parsing or any job failed;
   * returns normally otherwise.
   *
   * @param args command line arguments; a leading `itapasco` selects the GUI
   *             and is not passed on to the configuration parser
   */
  def main(args: Array[String]): Unit = {
    val cfgargs = if (isGuiRequested(args)) args.drop(1) else args
    // try to parse all arguments
    val c = CommandLineParser(cfgargs mkString " ")
    logger.debug("parsed config: {}", c)
    val ok = c match {
      case Right(parsed) =>
        implicit val tasks = new Tasks(parsed.maxTasks)
        // get parsed Configuration
        implicit val cfg = parsed
        // dump config and exit, if dryRun is selected
        cfg.dryRun foreach { p => dryRun(p) }
        // else continue ...
        logger.trace("configuring FileAssetManager...")
        FileAssetManager(cfg)
        logger.trace("SLURM: {}", cfg.slurm)
        if (cfg.slurm) Slurm.enabled = cfg.slurm
        FileAssetManager.start()
        logger.trace("parallel: {}", cfg.parallel)
        cfg.logFile foreach { logfile: Path => setupLogFileAppender(logfile.toString) }
        logger.info("Running with configuration: {}", cfg.toString)

        try {
          if (cfg.parallel) {
            val pool = new java.util.concurrent.ForkJoinPool(cfg.maxTasks getOrElse UNLIMITED_THREADS)
            implicit val exe: ExecutionContext = ExecutionContext.fromExecutor(pool)
            try {
              runGui(args) || {
                // start all jobs first, then wait for each of them in turn
                val running = cfg.jobs map { j => Future { jobs.executors.execute(j) } }
                (running map (awaitJob _)).fold(true)(_ && _)
              }
            } finally {
              // all futures have been awaited at this point, release the workers
              pool.shutdown()
            }
          } else {
            runGui(args) || (cfg.jobs map { jobs.executors.execute(_) }).fold(true)(_ && _)
          }
        } catch { case ex: Exception =>
          logException(ex)
          false
        } finally {
          FileAssetManager.stop()
          tasks.stop()
        }
      case Left(error) =>
        logger.error("invalid arguments: {}", error.toString)
        logger.error("run `tapasco -h` or `tapasco --help` to get more info")
        false
    }

    logActiveThreads()

    if (! ok) {
      logger.error("TaPaSCo finished with errors")
      sys.exit(1)
    } else {
      logger.info("TaPaSCo finished successfully")
    }
  }
  // scalastyle:on method.length
  // scalastyle:on cyclomatic.complexity
}