Friday, December 30, 2011

Boost Asio Multicast

Just tried the example, it did not work. Following is the working version.

  
// Build the local listening endpoint (0.0.0.0:port) and open the socket.
udp::endpoint listen_endpoint(udp::v4(), port);
socket_.open(listen_endpoint.protocol());
// Allow several processes to bind the same multicast port.
socket_.set_option(socket_base::reuse_address(true));
socket_.bind(listen_endpoint);
// Join the group on an explicit local interface address — specifying the
// interface (second argument) is what made this example work.
socket_.set_option(multicast::join_group(address::from_string(group).to_v4(),
        address::from_string(interface).to_v4()));

Thursday, December 29, 2011

Virtual function, template(CRTP) or branch?

Just posted this question on http://stackoverflow.com/.

I have been confronted with a C++ design problem: choosing between virtual functions, templates, and branches. The three implementations are listed as follows. I eventually chose the second one, which looks tricky but has the best performance for a low-latency design.

virtual function implementation:
{
  void packet(...) { for (...) message(...); }
  virtual void message(...)=0;
};

class ChannelA : public Channel
{
  struct Header {...}
  void message(...) { ... }
}
class ChannelB : public Channel
{
  struct Header {...}
  void message(...) { ... }
}
template implementation:

template <typename TImpl>
class Channel : public BaseChannel
{
  void packet(...) { for (...) message(...); }
  void message(...);
};

class ChannelA : public Channel<ChannelA>
{
  struct Header {...}
  void message(...) { ... }
}
class ChannelB : public Channel<ChannelB>
{
  struct Header {...}
  void message(...) { ... }
}
template <typename TImpl>
inline void Channel<TImpl>::message(...) { static_cast<TImpl*>(this)->message(); }
branch implementation:

class Channel : public BaseChannel
{
  void packet(...) { for (...) message(...); }
  struct HeaderA {...}
  struct HeaderB {...}
  void message(...)
  {
      if (isHeaderA(...)) messageA(...);
      else if (isHeaderB(...)) messageB(...);
  }
  void messageA(...) { ... }
  void messageB(...) { ... }
};

Wednesday, December 28, 2011

A Comparison of C++ Hash Tables

This is a comparison I did before. It is hard to say which one is the best. They have different performance with different test data. We can only determine the best one in a specific scenario. Generally, std::__unordered_map with __cache_hash_code=true outperforms the others in most conditions, while google::dense_hash_map has worse performance with short or few keys. This test was done with 64-bit gcc 4.5.1 on a multi-core 3.6GHz CPU.

# with key based on sprintf(s, "%lx_x", i, rand()%1000)
INFO - test_Hash(std::__unordered_map with __cache_hash_code=true (w)): 0.246489(s)
INFO - test_Hash(std::__unordered_map with __cache_hash_code=true (r)): 13.9586(s)
INFO - test_Hash(std::unordered_map (w)): 0.351654(s)
INFO - test_Hash(std::unordered_map (r)): 16.2773(s)
INFO - test_hash(google::dense_hash_map (w)): 0.253774(s)
INFO - test_hash(google::dense_hash_map (r)): 14.4838(s)
INFO - test_Hash(boost hash table (w)): 0.473683(s)
INFO - test_Hash(boost hash table (r)): 15.6317(s)

# with key based on sprintf(s, "%lx", i)
INFO - test_Hash(std::__unordered_map with __cache_hash_code=true (w)): 0.137569(s)
INFO - test_Hash(std::__unordered_map with __cache_hash_code=true (r)): 10.5713(s)
INFO - test_Hash(std::unordered_map (w)): 0.179186(s)
INFO - test_Hash(std::unordered_map (r)): 12.1257(s)
INFO - test_hash(google::dense_hash_map (w)): 0.570974(s)
INFO - test_hash(google::dense_hash_map (r)): 18.6572(s)
INFO - test_Hash(boost hash table (w)): 0.216315(s)
INFO - test_Hash(boost hash table (r)): 11.17(s)

// Hash functor for NUL-terminated C strings, used as the Hash template
// argument of every table under test. Two implementations are kept for
// comparison; flip the #if to switch between them.
struct Hash
{
  size_t operator()(const char* s) const
  {
#if 0
    /* magic numbers bits from http://www.isthe.com/chongo/tech/comp/fnv/ */
    // FNV-1a, 64-bit offset basis and prime.
    size_t h = 14695981039346656037U;
    while (*s) {
      // FNV-1a is defined over octets: cast to unsigned char so bytes
      // above 0x7F do not sign-extend and corrupt the high bits of h.
      h ^= static_cast<unsigned char>(*s);
      h *= 1099511628211U;
      ++s;
    }
    return h;
#else
    // Active variant: fold each character into the seed with
    // boost::hash_combine.
    std::size_t seed = 0;
    for (; *s; ++s) {
      boost::hash_combine(seed, *s);
    }
    return seed;
#endif
  }
};

// Benchmark: insert `loops` heap-allocated C-string keys into four hash map
// implementations, then read every key back `rloops` times, timing each
// phase with MyTimer (RAII timer) — eqstr, MyTimer and BOOST_CHECK come
// from the project/boost.test, not shown here.
BOOST_AUTO_TEST_CASE(test_Hash)
{
  // libstdc++-internal map; the trailing `true` enables __cache_hash_code,
  // which stores each element's hash to avoid rehashing keys on lookup.
  typedef std::__unordered_map<const char*, size_t, Hash, eqstr, 
    std::allocator<std::pair<const char*, size_t> >, true> THash1;
  typedef std::unordered_map<const char*, size_t, Hash, eqstr> THash2;
  typedef google::dense_hash_map<const char*, size_t, Hash, eqstr> GHash;
  GHash gh;
  gh.set_empty_key(""); // dense_hash_map requires a reserved "empty" key
  typedef boost::unordered_map<const char*, size_t, Hash, eqstr> BHash;
  THash1 th1;
  THash2 th2;
  BHash bh;
  std::vector<const char*> strs;
  const size_t loops = 1e+6;
  const size_t rloops = 50;
  // Pre-generate the keys so key construction is not timed. The 16-byte
  // buffers comfortably hold "%lx_%x" for i < 1e6 and rand()%1000, and are
  // deliberately never freed — the process exits after the benchmark.
  for (size_t i = 0; i < loops; ++i) {
    char* s = new char[16];
    sprintf(s, "%lx_%x", i, rand()%1000); // or sprintf(s, "%lx", i)
    strs.push_back(s);
  }
  {
    {
      // Write phase: one insert per key via operator[].
      MyTimer t("test_Hash(std::__unordered_map with __cache_hash_code=true (w))");
      for (size_t i = 0; i < loops; ++i)
        th1[strs[i]] = i;
    }

    {
      // Read phase: rloops full passes over all keys.
      MyTimer t("test_Hash(std::__unordered_map with __cache_hash_code=true (r))");
      for (size_t j = 0; j < rloops; ++j)
        for (size_t i = 0; i < loops; ++i)
          BOOST_CHECK(th1[strs[i]] == i);
    }
  }
  {
    {
      MyTimer t("test_Hash(std::unordered_map (w))");
      for (size_t i = 0; i < loops; ++i)
        th2[strs[i]] = i;
    }

    {
      MyTimer t("test_Hash(std::unordered_map (r))");
      for (size_t j = 0; j < rloops; ++j)
        for (size_t i = 0; i < loops; ++i)
          BOOST_CHECK(th2[strs[i]] == i);
    }
  }
  {
    {
      MyTimer t("test_hash(google::dense_hash_map (w))");
      for (size_t i = 0; i < loops; ++i)
        gh[strs[i]] = i;
    }

    {
      MyTimer t("test_hash(google::dense_hash_map (r))");
      for (size_t j = 0; j < rloops; ++j)
        for (size_t i = 0; i < loops; ++i)
          BOOST_CHECK(gh[strs[i]] == i);
    }
  }
  {
    {
      MyTimer t("test_Hash(boost hash table (w))");
      for (size_t i = 0; i < loops; ++i)
        bh[strs[i]] = i;
    }

    {
      MyTimer t("test_Hash(boost hash table (r))");
      for (size_t j = 0; j < rloops; ++j)
        for (size_t i = 0; i < loops; ++i)
          BOOST_CHECK(bh[strs[i]] == i);
    }
  }
}

Saturday, December 17, 2011

Replay Multicast

I tried tcpdump/tcpreplay to do multicast replay.

# snaplen = maximum udp payload size + udp header size + ip header size + ethernet header size
# need to tune -B -s carefully for packet lossless
tcpdump -B 10240 -s 2112 -tt -vv -i eth4 -w eth4.pcap multicast
tcpreplay -t -i lo eth4.pcap

However, although tcpdump could capture the packets generated by tcpreplay, my program failed to receive them. I tried to tcpedit the MAC addresses per [Tcpreplay-users] Could not replay a multicast, but it still did not work. After searching for a while, I decided to write my own tcpreplay.

#include <err.h>
#include <pcap.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/udp.h>
#include <netinet/ip.h>
#include <arpa/inet.h>

// ethernet headers are always exactly 14 bytes [1]
#define SIZE_ETHERNET 14 // http://www.tcpdump.org/pcap.html
// Replays the multicast UDP packets from an open pcap capture out of
// socket `fd`, preserving inter-packet timing. `offset` is the number of
// seconds added to the current wall clock before comparing against each
// capture timestamp (typically capture_start_sec - replay_start_sec).
void replayOnePcap(pcap_t* pcap, long offset, int fd) 
{  
  struct sockaddr_in addr;
  bzero((char *)&addr, sizeof(addr));
  addr.sin_family = AF_INET;

  struct pcap_pkthdr header;
  const u_char* packet;
  long num = 0;            // packets replayed
  long vol = 0;            // bytes accounted (pcap header + wire length)
  time_t lastReportTm = 0; // FIX: was uninitialized, so the 10s progress
                           // report fired (or not) at random on startup
  while ((packet = pcap_next(pcap, &header)) != NULL) {
    const struct ip* ip = (struct ip*)(packet + SIZE_ETHERNET);
    // Byte 16 of the IP header is the first octet of ip_dst;
    // multicast groups are 224.0.0.0-239.255.255.255.
    if (*((const u_char*)ip + 16) < 224)
      continue;
    // FIX: was assert(ip->ip_p == IPPROTO_UDP), which aborts on non-UDP
    // multicast such as IGMP membership reports; skip those instead.
    if (ip->ip_p != IPPROTO_UDP)
      continue;
    timeval now;
    time_t t;
    // Busy-wait until the shifted capture timestamp is due. Spinning keeps
    // the timing tight; uncomment usleep to trade accuracy for CPU.
    for (;;) {
      gettimeofday(&now, NULL);
      t = now.tv_sec+offset;
      if (header.ts.tv_sec < t || 
          (header.ts.tv_sec == t && header.ts.tv_usec <= now.tv_usec))
        break;
      //usleep(1);
    }   
    int size_ip = (ip->ip_hl)*4; // IP header length field is in 32-bit words
    const struct udphdr* udp = (struct udphdr*)(packet + SIZE_ETHERNET + size_ip);
    int n = SIZE_ETHERNET + size_ip + sizeof(udphdr);
    const u_char* payload = packet + n;
    // Send to the packet's original destination group/port; udp->dest is
    // already in network byte order, so no htons here.
    addr.sin_addr.s_addr = ip->ip_dst.s_addr;
    addr.sin_port = udp->dest;
    ++num;
    vol += sizeof(header) + header.len;
    // NOTE(review): assumes header.caplen == header.len (snaplen captured
    // the full frame) — a truncated capture would read past the buffer.
    ::sendto(fd, payload, header.len - n, 0, (struct sockaddr*)&addr, sizeof(addr));
    if (now.tv_sec - lastReportTm >= 10) { // progress report every 10s
      lastReportTm = now.tv_sec;
      LOG_INFO('#' << num/1e+6 << "M " << vol/1e+6 << "M : " <<
          header.ts.tv_usec << "us " << asctime(localtime(&header.ts.tv_sec)));
      LOG_INFO("expect: " << asctime(localtime(&t)));
    }
  }
}

Sunday, November 13, 2011

Log Publisher with Scala

The idea is to develop a Scala program to monitor log files and publish updated lines to remote subscribers. We could use zmq or Redis to do pub/sub. Here, I choose Redis, for details about Redis as PubSub, please check PubSub with Redis and Akka Actors.

import java.io._
import com.redis.{RedisClient, PubSubMessage, S, U, M}
import akka.persistence.redis._
import akka.actor.Actor._

// Monitors a set of log files and publishes every newly appended line to a
// Redis pub/sub channel (one channel id per file).
object LogMonitor {
  var pub: MyPublisher = null
  // Reusable accumulator for the line currently being read.
  val line = new StringBuffer(1024)

  // files: map of channel-id -> file path; port: Redis server port.
  // Aborts if any file is missing, otherwise polls all files for new
  // lines once per second, forever.
  def publish(files: Map[String, String], port: Int) {
    var fileInfo = List[LogFile]()
    files.foreach(f => {
      var lf = new LogFile(f._1, new File(f._2))
      if (!lf.file.exists) {
        println("Error: " + lf.file)
        return
      }   
      fileInfo = lf :: fileInfo
    })  

    pub = new MyPublisher("localhost", port)

    while (true) {
      fileInfo.foreach(pubNewLines)
      Thread.sleep(1000) // sleep 1s
    }   
  }

  // Reads from file.offset onward; each complete ('\n'-terminated) line is
  // published and the offset advanced past it. A partial trailing line hits
  // EOFException before '\n', its offset is NOT advanced, and the buffer is
  // cleared — so it is re-read in full on the next poll.
  // NOTE(review): bytes are widened with toChar, so only single-byte
  // encodings (ASCII/Latin-1) survive intact — confirm the log encoding.
  // NOTE(review): rand is not closed if something other than EOFException
  // escapes, and offset is an Int, so files beyond 2GB would overflow.
  def pubNewLines(file: LogFile) {
    val rand = new RandomAccessFile(file.file, "r")
    rand.seek(file.offset)
    try {
      while (true) {
        val b = rand.readByte()
        line.append(b.toChar)
        if (b == '\n') {
          file.offset += line.length
          //println(file.id, line.toString)
          pub.publish(file.id, line.toString)
          line.delete(0, line.length)
        }
      }
    } catch {
      case _: EOFException =>
    }
    rand.close
    line.delete(0, line.length)
  }

  // Per-file state: channel id, file handle, and the byte offset of the
  // first unpublished line.
  class LogFile(
    var id: String,
    var file: File,
    var offset: Int=0)

  // Thin wrapper around a Redis-backed publisher actor.
  class MyPublisher(val host: String, val port: Int) {
    println("starting publishing service ...")
    println(host + ":" + port)
    val r = new RedisClient(host, port)
    val p = actorOf(new Publisher(r))
    p.start

    // Fire-and-forget publish of `message` on `channel`.
    def publish(channel: String, message: String) = {
      p ! Publish(channel, message)
    }
  }
}

Sunday, October 30, 2011

Add timer to task pool

Sometimes, we want to execute a task asynchronously at a specified time. For example, add random latency to incoming messages in the development of a simulator, or cancel an order unsolicitedly when its end time arrives in the development of an OMS. Check out the boost manual about Using a timer asynchronously.
using boost::asio::deadline_timer;
// TaskPool extension that runs a task on the pool after a delay.
struct TaskPoolTimer : public TaskPool
{
  // Schedule func to run on the pool's io_service after `us` microseconds.
  // The timer is heap-allocated and owned by the completion handler, which
  // deletes it once the wait fires.
  // NOTE(review): the error_code is ignored, so func also runs if the
  // timer is cancelled — confirm that is acceptable to callers.
  template <typename T>  // FIX: was a bare "template" with no parameter list
  void run(T func, uint64_t us)
  {
    auto t = new deadline_timer(service_, boost::posix_time::microseconds(us));
    t->async_wait([=](const boost::system::error_code&) {
        func(); delete t;  // FIX: was "tt", an undeclared name
        });
  }
};

// Demo: print the current time, then schedule the same printing task to
// run 2s and 4s later. The timers fire before the pool's destructor
// finishes joining the worker threads.
int main() {
  TaskPoolTimer p;
  std::cout << time(NULL) << std::endl;
  auto f = [](){std::cout<< time(NULL) << std::endl;};
  p.run(f, 2e+6);
  p.run(f, 4e+6);
  return 0;
}

A comparison of boost::bind and std::bind

It is time for us to catch up with the new C++ standard. Just see the difference between boost::bind and std::bind below. For C++ lambdas, check out C++11 or a quick guide to Lambdas in C++. Do not use std::ref if the functor will be called asynchronously.

#include <boost/bind.hpp>
 
//g++ test.cc -std=c++0x
 
// Instrumented type that logs every special-member-function call to
// std::cout; used below to count the copies made by boost::bind vs
// std::bind, with and without std::ref.
struct A
{
  // default construction
  A()
  {
    std::cout << "A" << std::endl;
  }

  // copy construction
  A(const A& other)
  {
    std::cout << "A(A)" << std::endl;
  }

  // copy assignment
  A& operator=(const A& other)
  {
    std::cout << "A=" << std::endl;
    return *this;
  }

  // destruction
  ~A()
  {
    std::cout << "~A" << std::endl;
  }
};
 
// Compare the number of copies of `a` made by boost::bind and std::bind,
// with and without std::ref. The instrumented struct A above prints one
// line per construction/copy/assignment/destruction; see the Output
// section below for the observed counts (boost::bind copies the argument
// several times, std::bind once, and std::ref avoids copies entirely).
int main() {
  auto f = [=](A& a) {}; 
  A a;
  std::cout << std::endl;
  // boost::bind, argument bound by value.
  std::cout << "boost" << std::endl;
  boost::bind<void>(f, a)();
  std::cout << std::endl;
  // boost::bind with std::ref: no copies.
  std::cout << "boost ref" << std::endl;
  boost::bind<void>(f, std::ref(a))();
  std::cout << std::endl;
  // std::bind with std::ref: no copies.
  std::cout << "std ref" << std::endl;
  std::bind(f, std::ref(a))();
  std::cout << std::endl;
  // std::bind, argument bound by value.
  std::cout << "std" << std::endl;
  std::bind(f, a)();
  std::cout << std::endl;
  std::cout << "end of main" << std::endl;
  return 0;
}

Output:

A

boost
A(A)
A(A)
A(A)
A(A)
~A
A(A)
~A
~A
~A
~A

boost ref

std ref

std
A(A)
~A

end of main
~A

Task Pool with boost::asio

We can design a simple task pool with the boost::asio recipe, A thread pool for executing arbitrary tasks.

However, if you want to implement parallel computation, tbb::task_group is faster, or you could choose OpenMP. tbb::task_group schedules the execution of tasks itself, so the execution order is not guaranteed.

class TaskPool
{
public:
  TaskPool(size_t nthreads=1)
  {
    work_ = new boost::asio::io_service::work(service_);
    for (std::size_t i = 0; i < nthreads; ++i)
      threads_.create_thread(boost::bind(&boost::asio::io_service::run, &service_));
  }

  ~TaskPool()
  {
    if (work_)
      stop();
  }

  void stop(bool wait=true)
  {
    if (wait) {
      delete work_; // if not delete this, io_service::run will never exit
      threads_.join_all();
      service_.stop();
    } else {
      service_.stop();
      threads_.join_all();
      delete work_;
    }   
    work_ = NULL;
  }

  template <typename T>
  void run(T func)
  {
    service_.post(func);
  }

protected:
  boost::thread_group threads_;
  boost::asio::io_service service_;
  boost::asio::io_service::work* work_;
};

Optimize QuickFIX

QuickFIX is a full-featured open source FIX engine. I think it is quick for design rather than quick for performance. After just a quick browse of its source code, you may find some ideas to optimize it. One of its bottlenecks comes from the log and store classes. Each time it sends or receives a message, the message is first written to a log file (or database) and a store file (or database) if you enable logging and storing. This is bad for a trading program needing quick response to markets. The first idea coming to my mind is to asynchronize this part of the operations. Fortunately, QuickFIX gives us a way to supply customized log and store factory classes. Unfortunately, in this way we can only optimize log and store separately.