Sunday, November 13, 2011

Log Publisher with Scala

The idea is to develop a Scala program to monitor log files and publish updated lines to remote subscribers. We could use zmq or Redis to do pub/sub. Here, I choose Redis, for details about Redis as PubSub, please check PubSub with Redis and Akka Actors.

import java.io._
import com.redis.{RedisClient, PubSubMessage, S, U, M}
import akka.persistence.redis._
import akka.actor.Actor._

object LogMonitor {
  var pub: MyPublisher = null
  val line = new StringBuffer(1024)

  def publish(files: Map[String, String], port: Int) {
    var fileInfo = List[LogFile]()
    files.foreach(f => {
      var lf = new LogFile(f._1, new File(f._2))
      if (!lf.file.exists) {
        println("Error: " + lf.file)
        return
      }   
      fileInfo = lf :: fileInfo
    })  

    pub = new MyPublisher("localhost", port)

    while (true) {
      fileInfo.foreach(pubNewLines)
      Thread.sleep(1000) // sleep 1s
    }   
  }

  def pubNewLines(file: LogFile) {
    val rand = new RandomAccessFile(file.file, "r")
    rand.seek(file.offset)
    try {
      while (true) {
        val b = rand.readByte()
        line.append(b.toChar)
        if (b == '\n') {
          file.offset += line.length
          //println(file.id, line.toString)
          pub.publish(file.id, line.toString)
          line.delete(0, line.length)
        }
      }
    } catch {
      case _: EOFException =>
    }
    rand.close
    line.delete(0, line.length)
  }

  class LogFile(
    var id: String,
    var file: File,
    var offset: Int=0)

  class MyPublisher(val host: String, val port: Int) {
    println("starting publishing service ...")
    println(host + ":" + port)
    val r = new RedisClient(host, port)
    val p = actorOf(new Publisher(r))
    p.start

    def publish(channel: String, message: String) = {
      p ! Publish(channel, message)
    }
  }
}