//
// Copyright (C) 2017 Jens Korinth, TU Darmstadt
//
// This file is part of Tapasco (TPC).
//
// Tapasco is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Tapasco is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with Tapasco.  If not, see <http://www.gnu.org/licenses/>.
//
package de.tu_darmstadt.cs.esa.tapasco.filemgmt
import  de.tu_darmstadt.cs.esa.tapasco.util._
import scala.collection.mutable.{ArrayBuffer, Map}
import java.nio.file.Path
import java.io.{BufferedReader, FileReader}
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

/**
 * MultiFileWatcher monitors the contents of multiple files at once.
 * Content is polled regularly (pollInterval), changes in the content are
 * published as Events (using the [[util.Publisher]] methods).
 *
 * Files that cannot be opened yet are retried on every poll. Files that are
 * removed get one last read on the next poll, after which their reader is
 * closed. A single background thread does the polling; it is started on
 * demand and terminates by itself once nothing is left to watch.
 * @param pollInterval Polling interval in ms (default: [[MultiFileWatcher.POLL_INTERVAL]]).
 **/
class MultiFileWatcher(pollInterval: Int = MultiFileWatcher.POLL_INTERVAL) extends Publisher {
  type Event = MultiFileWatcher.Event
  import MultiFileWatcher.Events._

  /**
   * Add a file to the monitoring.
   * If the file cannot be opened right now, opening is retried on every poll.
   * Adding a file that is already being monitored has no effect.
   * @param p Path to file to be monitored.
   */
  def +=(p: Path): Unit = {
    // Queue the path before opening, so the watch thread started by open()
    // sees pending work even if the file does not exist yet.
    _waitingFor.synchronized { if (!_waitingFor.contains(p)) { _waitingFor += p } }
    // Opened right away: take it off the retry list again, otherwise the watch
    // thread would try to open it a second time.
    if (open(p)) _waitingFor.synchronized { _waitingFor -= p }
  }
  @inline def addPath(p: Path): Unit = this += p

  /**
   * Add a collection of files to the monitoring.
   * @param ps Collection of Paths to files to be monitored.
   */
  def ++=(ps: Traversable[Path]): Unit = ps foreach (this += _)
  @inline def addPaths(ps: Traversable[Path]): Unit = this ++= ps

  /**
   * Remove a file from the monitoring.
   * @param p Path to file to be removed.
   */
  def -=(p: Path): Unit = close(p)
  @inline def remPath(p: Path): Unit = this -= p

  /**
   * Remove a collection of files from the monitoring.
   * @param ps Collection of Paths to files to be removed.
   */
  def --=(ps: Traversable[Path]): Unit = ps foreach (close _)
  @inline def remPaths(ps: Traversable[Path]): Unit = this --= ps

  /**
   * Remove and close all files.
   * The watch thread is not stopped forcibly: it performs a final read of the
   * removed files on its next poll, closes them and then terminates by itself.
   */
  def closeAll(): Unit = {
    _waitingFor.synchronized { _waitingFor.clear() }
    val dropped = _files.synchronized {
      val entries = _files.toList
      _files.clear()
      entries
    }
    retire(dropped)
  }

  /** Paths which could not be opened yet; guarded by its own monitor. */
  private[this] val _waitingFor: ArrayBuffer[Path] = ArrayBuffer()
  /** Readers removed from monitoring which await their final read and close; guarded by its own monitor. */
  private[this] val _retired: ArrayBuffer[(Path, BufferedReader)] = ArrayBuffer()
  /** The polling thread, if one is currently registered. */
  private[this] val _watchThread: AtomicReference[Option[Thread]] =
    new AtomicReference[Option[Thread]](None)
  /** Currently monitored files and their readers. */
  private[this] val _files: Map[Path, BufferedReader] =
    new ConcurrentHashMap[Path, BufferedReader]().asScala
  private[this] val logger = de.tu_darmstadt.cs.esa.tapasco.Logging.logger(getClass)

  /**
   * Opens a reader for p, unless one is registered already, and makes sure
   * the watch thread is running.
   * @param p Path to file to open.
   * @return true, if p is being monitored now, false if it could not be opened.
   */
  private def open(p: Path): Boolean = {
    val res = try {
      _files.synchronized {
        // never replace a registered reader: the old one would leak and the
        // new one would publish the whole file a second time
        if (!_files.contains(p)) {
          _files.update(p, new BufferedReader(new FileReader(p.toString)))
        }
      }
      logger.trace("opened {} successfully", p.toString)
      true
    } catch { case ex: java.io.IOException =>
      logger.trace("could not open {}, will retry ({})", p: Any, ex: Any)
      false
    }
    startWatchThread()
    res
  }

  /**
   * Removes p from monitoring and from the retry list. An open reader is
   * handed to the watch thread for a final read and close.
   * @param p Path to file to remove.
   */
  private def close(p: Path): Unit = {
    _waitingFor.synchronized { _waitingFor -= p }
    val reader = _files.synchronized { _files.remove(p) }
    retire(reader.toList map (br => p -> br))
  }

  /**
   * Queues readers for a final read and close by the watch thread.
   * Starts a watch thread if none is registered, so the queue always gets drained.
   */
  private def retire(readers: List[(Path, BufferedReader)]): Unit = if (readers.nonEmpty) {
    _retired.synchronized { _retired ++= readers }
    startWatchThread()
  }

  /**
   * Reads all lines currently available from br.
   * Reading stops at the first null line or read error (e.g., reader closed).
   * @param br Reader to read from.
   * @param ls Lines read so far (accumulator).
   * @return ls followed by the newly read lines.
   */
  @scala.annotation.tailrec
  private def readFrom(br: BufferedReader, ls: Seq[String] = Vector()): Seq[String] =
    scala.util.Try(Option(br.readLine())).toOption.flatten match {
      case Some(line) => readFrom(br, ls :+ line)
      case None       => ls
    }

  /** True, if there are monitored files, files waiting to be opened or readers waiting to be closed. */
  private def hasPendingWork: Boolean =
    _files.nonEmpty ||
    _waitingFor.synchronized { _waitingFor.nonEmpty } ||
    _retired.synchronized { _retired.nonEmpty }

  /**
   * Performs one polling cycle: sleep, retry pending opens, read and publish
   * new lines, close retired readers.
   * @return true, if no file was read in this cycle.
   */
  private def pollOnce(): Boolean = {
    val waits = _waitingFor.synchronized { _waitingFor.toList }
    Thread.sleep(pollInterval)
    waits foreach { p =>
      logger.trace("waiting for {}", p)
      // the snapshot may be stale: skip paths that were removed or opened during the sleep
      val stillWanted = _waitingFor.synchronized { _waitingFor contains p }
      if (stillWanted && open(p)) _waitingFor.synchronized { _waitingFor -= p }
    }
    val retired = _retired.synchronized {
      val entries = _retired.toList
      _retired.clear()
      entries
    }
    try {
      val active = _files.synchronized { _files.toMap }
      // a path that was removed and re-added is read via its new reader only
      val finalReads = retired filterNot { case (p, _) => active contains p }
      val all_files = active.toList ++ finalReads
      logger.trace("reading from files: {}", all_files)
      all_files foreach { case (p, br) =>
        val lines = readFrom(br)
        if (lines.nonEmpty) {
          logger.trace("read {} lines from {}", lines.length, p)
          publish(LinesAdded(p, lines))
        }
      }
      all_files.isEmpty
    } finally {
      // close even if a subscriber throws, otherwise the drained readers would leak
      retired foreach { case (_, br) => scala.util.Try(br.close()) }
    }
  }

  /**
   * Body of the watch thread: polls until there is nothing left to do and the
   * last cycle read no file, then unregisters the thread.
   */
  private def watchLoop(): Unit = {
    val completed =
      try {
        var lastWasEmpty = false
        while (hasPendingWork || !lastWasEmpty) lastWasEmpty = pollOnce()
        true
      } catch {
        case _: InterruptedException => false
      } finally {
        // always unregister, so a new thread can be started after any kind of exit
        _watchThread.set(None)
      }
    // A file may have been added after the last loop check, but before this
    // thread was unregistered; its start request was ignored, so make up for it.
    if (completed && hasPendingWork) startWatchThread()
  }

  /** Starts the watch thread, unless one is registered already. */
  private def startWatchThread(): Unit = if (_watchThread.get.isEmpty) {
    val thread = new Thread(new Runnable { def run(): Unit = watchLoop() })
    if (_watchThread.compareAndSet(None, Some(thread))) {
      logger.trace("starting file watch thread ...")
      thread.start()
      // NOTE(review): pause kept from the original implementation, presumably
      // to give the new thread time to start - confirm whether callers rely on it
      Thread.sleep(100)
    }
  }
}

/** MultiFileWatcher companion object: event types and defaults. */
object MultiFileWatcher {
  /** Base type of the events published by a [[MultiFileWatcher]]. */
  sealed trait Event
  object Events {
    /** Lines ls have been added to file at src. **/
    final case class LinesAdded(src: Path, ls: Traversable[String]) extends Event
  }
  /** Default polling interval for files: once every 2 seconds. **/
  final val POLL_INTERVAL = 2000 // 2sec
}