Commit c830c899 authored by Jens Korinth's avatar Jens Korinth
Browse files

Improve locking behavior of MultiFileWatcher

parent 58542825
Pipeline #165 failed with stages
in 74 minutes and 40 seconds
......@@ -39,14 +39,14 @@ class MultiFileWatcher(pollInterval: Int = MultiFileWatcher.POLL_INTERVAL) exten
* Add a file to the monitoring.
* @param p Path to file to be monitored.
*/
def +=(p: Path) { open(p) }
def +=(p: Path) { _waitingFor.synchronized { _waitingFor += p }; open(p) }
@inline def addPath(p: Path) { this += p }
/**
* Add a collection of files to the monitoring.
* @param ps Collection of Paths to files to be monitored.
*/
def ++=(ps: Traversable[Path]) { ps foreach (open _) }
def ++=(ps: Traversable[Path]) { ps foreach (this += _) }
@inline def addPaths(ps: Traversable[Path]) { this ++= ps }
/**
......@@ -74,21 +74,19 @@ class MultiFileWatcher(pollInterval: Int = MultiFileWatcher.POLL_INTERVAL) exten
private def open(p: Path): Boolean = {
val res = try {
_files += p -> new BufferedReader(new FileReader(p.toString))
_files.synchronized { _files += p -> new BufferedReader(new FileReader(p.toString)) }
logger.trace("opened {} successfully", p.toString)
true
} catch { case ex: java.io.IOException =>
logger.trace("could not open {}, will retry ({})", p: Any, ex: Any)
_waitingFor.synchronized { _waitingFor += p }
false
}
startWatchThread
res
}
private def close(p: Path): Unit = if (_files.contains(p)) {
_files -= p
} else if (_waitingFor.contains(p)) {
private def close(p: Path): Unit = {
_files.synchronized { _files -= p }
_waitingFor.synchronized { _waitingFor -= p }
}
......@@ -98,31 +96,30 @@ class MultiFileWatcher(pollInterval: Int = MultiFileWatcher.POLL_INTERVAL) exten
}
private def startWatchThread: Unit = {
if (_watchThread.get.isEmpty) {
logger.trace("starting file watch thread ...")
_watchThread.set(Some(new Thread(new Runnable {
def run() {
try {
while (! _files.isEmpty || ! _waitingFor.isEmpty) {
Thread.sleep(pollInterval)
val waits = _waitingFor.synchronized { _waitingFor.toList }
waits foreach { p =>
logger.trace("waiting for {}", p)
if (open(p)) _waitingFor.synchronized { _waitingFor -= p }
}
val files = _files.toMap
files foreach { case (p, br) =>
val lines = readFrom(br)
if (lines.length > 0) {
logger.trace("read {} lines from {}", lines.length, p)
publish(LinesAdded(p, lines))
}
logger.trace("starting file watch thread ...")
if (_watchThread.compareAndSet(None, Some(new Thread(new Runnable {
def run() {
try {
while (! _files.isEmpty || ! _waitingFor.isEmpty) {
Thread.sleep(pollInterval)
val waits = _waitingFor.synchronized { _waitingFor.toList }
waits foreach { p =>
logger.trace("waiting for {}", p)
if (open(p)) _waitingFor.synchronized { _waitingFor -= p }
}
val files = _files.synchronized { _files.toMap }
files foreach { case (p, br) =>
val lines = readFrom(br)
if (lines.length > 0) {
logger.trace("read {} lines from {}", lines.length, p)
publish(LinesAdded(p, lines))
}
}
_watchThread.set(None)
} catch { case e: InterruptedException => _watchThread.set(None) }
}
})))
}
_watchThread.set(None)
} catch { case e: InterruptedException => _watchThread.set(None) }
}
})))) {
_watchThread.get map (_.start)
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment