The idea is to develop a Scala program that monitors log files and publishes newly appended lines to remote subscribers. Either ZeroMQ (zmq) or Redis could be used for pub/sub; here I choose Redis. For details about using Redis for pub/sub, see "PubSub with Redis and Akka Actors".
import java.io._
import com.redis.{RedisClient, PubSubMessage, S, U, M}
import akka.persistence.redis._
import akka.actor.Actor._
/**
 * Polls a set of log files once per second and publishes every newly
 * completed line to a Redis pub/sub channel named after the file's id.
 */
object LogMonitor {
  /**
   * Publisher used by `pubNewLines`. It is `null` until `publish` has
   * validated its input files and created the publisher, so calling
   * `pubNewLines` before that point fails with a NullPointerException.
   */
  var pub: MyPublisher = null

  /**
   * Retained only so existing references to `LogMonitor.line` keep compiling.
   * Line assembly now uses a per-call byte buffer inside `pubNewLines`.
   */
  val line = new StringBuffer(1024)

  /**
   * Validates the given files and then monitors them forever.
   *
   * If any file does not exist, the first missing one (in the map's iteration
   * order) is reported on stdout and the method returns without creating a
   * publisher. Otherwise this method never returns: it polls every file,
   * sleeps one second, and repeats.
   *
   * @param files map from channel id to the path of the log file to watch
   * @param port  port of the Redis server on localhost
   */
  def publish(files: Map[String, String], port: Int): Unit = {
    val candidates = files.toList.map { entry =>
      new LogFile(entry._1, new File(entry._2))
    }
    // Validate up front instead of using `return` inside a closure, which
    // is a non-local return implemented by throwing a control exception.
    candidates.find(candidate => !candidate.file.exists) match {
      case Some(missing) =>
        println("Error: " + missing.file)
      case None =>
        // The original built this list by prepending, so files were polled
        // in reverse map order; keep that order.
        val fileInfo = candidates.reverse
        pub = new MyPublisher("localhost", port)
        while (true) {
          fileInfo.foreach(pubNewLines)
          Thread.sleep(1000) // sleep 1s
        }
    }
  }

  /**
   * Publishes every complete line appended to `file` since the last call.
   *
   * Reading starts at `file.offset`. Each line terminated by '\n' is
   * published (newline included) on the channel `file.id`, and `file.offset`
   * advances by that line's byte length. A trailing partial line is not
   * published and not counted, so it is re-read on the next call.
   *
   * Bytes are decoded as UTF-8 -- this assumes the log files are UTF-8
   * (or plain ASCII); confirm against the real log sources.
   *
   * @param file the tracked log file; its `offset` is updated in place
   */
  def pubNewLines(file: LogFile): Unit = {
    val newline = '\n'.toByte
    val rand = new RandomAccessFile(file.file, "r")
    try {
      // The file is shorter than what was already consumed, so it was
      // truncated or rotated: start again from the beginning.
      if (rand.length() < file.offset) {
        file.offset = 0
      }
      rand.seek(file.offset)

      val pending = new ByteArrayOutputStream(1024)
      val chunk = new Array[Byte](8192)
      var count = rand.read(chunk)
      // read returns -1 at end of file, so no EOFException is needed
      // to terminate the loop.
      while (count != -1) {
        var i = 0
        while (i < count) {
          val b = chunk(i)
          pending.write(b.toInt)
          if (b == newline) {
            file.offset += pending.size
            // Decode the whole line at once so multi-byte characters
            // are not mangled byte by byte.
            pub.publish(file.id, pending.toString("UTF-8"))
            pending.reset()
          }
          i += 1
        }
        count = rand.read(chunk)
      }
    } finally {
      // Close the handle even when reading or publishing throws.
      rand.close()
    }
  }

  /**
   * A monitored log file.
   *
   * @param id     channel name the file's lines are published on
   * @param file   the file being watched
   * @param offset byte position up to which lines have been published
   *               (an Int, so it cannot address files beyond 2 GiB)
   */
  class LogFile(
    var id: String,
    var file: File,
    var offset: Int = 0)

  /**
   * Creates a Redis client for `host:port` and starts a Publisher actor on
   * it; both happen, along with two stdout messages, at construction time.
   *
   * @param host Redis host name
   * @param port Redis port
   */
  class MyPublisher(val host: String, val port: Int) {
    println("starting publishing service ...")
    println(host + ":" + port)
    val r = new RedisClient(host, port)
    val p = actorOf(new Publisher(r))
    p.start

    /** Sends a Publish(channel, message) to the publisher actor (fire-and-forget). */
    def publish(channel: String, message: String): Unit = {
      p ! Publish(channel, message)
    }
  }
}