Sunday, October 30, 2011

Add timer to task pool

Sometimes we want to execute a task asynchronously at a specified time: for example, to add random latency to incoming messages when developing a simulator, or to send an unsolicited cancel for an order when its end time arrives in an OMS. See the Boost manual's tutorial "Using a timer asynchronously".
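#include <boost/asio.hpp>
#include <ctime>
#include <iostream>

using boost::asio::deadline_timer;

// TaskPool is the class from the post "Task Pool with boost::asio".
struct TaskPoolTimer : public TaskPool
{
  template <typename T>
  void run(T func, uint64_t us)
  {
    // The timer is heap-allocated so that it outlives this call;
    // the completion handler deletes it after running the task.
    auto t = new deadline_timer(service_, boost::posix_time::microseconds(us));
    t->async_wait([=](const boost::system::error_code&) {
        func(); delete t;
        });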
  }
};

int main() {
  TaskPoolTimer p;
  std::cout << time(NULL) << std::endl;
  auto f = [](){std::cout<< time(NULL) << std::endl;};
  p.run(f, 2e+6);
  p.run(f, 4e+6);
  return 0;
}
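
For readers without Boost at hand, the same idea can be sketched with the standard library only. This is a minimal sketch, not the TaskPoolTimer above: the class name SimpleTimerQueue and its members are hypothetical, and it uses one worker thread that sleeps until the earliest deadline instead of an io_service.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <map>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical standard-library stand-in for TaskPoolTimer: a single worker
// thread runs each task once its deadline has passed.
class SimpleTimerQueue {
public:
  using Clock = std::chrono::steady_clock;

  SimpleTimerQueue() : done_(false), worker_([this] { loop(); }) {}

  // Like stop(wait=true) in TaskPool: pending tasks still run before the join.
  ~SimpleTimerQueue() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      done_ = true;
    }
    cv_.notify_one();
    worker_.join();
  }

  // Schedule func to run roughly `us` microseconds from now.
  void run(std::function<void()> func, std::uint64_t us) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      tasks_.emplace(Clock::now() + std::chrono::microseconds(us), std::move(func));
    }
    cv_.notify_one();
  }

private:
  void loop() {
    std::unique_lock<std::mutex> lock(mutex_);
    for (;;) {
      if (tasks_.empty()) {
        if (done_) return;
        cv_.wait(lock);
        continue;
      }
      auto next = tasks_.begin();  // multimap keeps the earliest deadline first
      if (Clock::now() < next->first) {
        // Sleep until that deadline, or until a new task is scheduled.
        cv_.wait_until(lock, next->first);
        continue;
      }
      auto task = std::move(next->second);
      tasks_.erase(next);
      lock.unlock();
      task();  // run outside the lock so a task may schedule more work
      lock.lock();
    }
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::multimap<Clock::time_point, std::function<void()>> tasks_;
  bool done_;
  std::thread worker_;  // declared last so the other members exist before it starts
};
```

Usage mirrors the main above: construct a SimpleTimerQueue, call run(f, 2000000) and run(f, 4000000), and let the destructor wait for both tasks. With a single worker, a slow task delays the ones behind it, which the multi-threaded TaskPool avoids.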

A comparison of boost::bind and std::bind

It is time to catch up with the new C++ standard. Compare the number of copies made by boost::bind and std::bind below. For C++ lambdas, check out C++11 or a quick guide to Lambdas in C++. Do not use std::ref if the functor will be called asynchronously: only a reference is stored, so the referenced object may already be destroyed when the call runs.

#include <boost/bind.hpp>
#include <functional>
#include <iostream>
 
//g++ test.cc -std=c++0x
 
struct A
{
  A() { std::cout << "A" << std::endl; }
  A(const A& a) { std::cout << "A(A)" << std::endl; }
  A& operator=(const A& a) { std::cout << "A=" << std::endl; return *this; }
  ~A() { std::cout << "~A" << std::endl; }
};
 
int main() {
  auto f = [=](A& a) {}; 
  A a;
  std::cout << std::endl;
  std::cout << "boost" << std::endl;
  boost::bind<void>(f, a)();
  std::cout << std::endl;
  std::cout << "boost ref" << std::endl;
  boost::bind<void>(f, std::ref(a))();
  std::cout << std::endl;
  std::cout << "std ref" << std::endl;
  std::bind(f, std::ref(a))();
  std::cout << std::endl;
  std::cout << "std" << std::endl;
  std::bind(f, a)();
  std::cout << std::endl;
  std::cout << "end of main" << std::endl;
  return 0;
}

Output:

A

boost
A(A)
A(A)
A(A)
A(A)
~A
A(A)
~A
~A
~A
~A

boost ref

std ref

std
A(A)
~A

end of main
~A
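
The output above can also be checked programmatically. The following is a minimal sketch using only std::bind; the Probe type and the two helper functions are hypothetical names introduced for illustration. It counts copy constructions and shows that binding with std::ref lets the call modify the caller's object, while binding by value works on a private copy.

```cpp
#include <functional>

// Hypothetical probe type: counts copy constructions and carries a value.
struct Probe {
  static int copies;
  int value;
  Probe() : value(0) {}
  Probe(const Probe& other) : value(other.value) { ++copies; }
};
int Probe::copies = 0;

struct BindResult {
  int copies;  // copy constructions caused by bind + call
  int value;   // value of the caller's object after the call
};

// Bind by value: std::bind stores its own copy of p, so the call
// increments the copy and leaves the caller's object untouched.
BindResult bind_by_value() {
  Probe p;
  auto bump = [](Probe& x) { ++x.value; };
  Probe::copies = 0;
  std::bind(bump, p)();
  return BindResult{Probe::copies, p.value};
}

// Bind with std::ref: only a reference wrapper is stored, so no copy is
// made and the call increments the caller's object directly.
BindResult bind_by_ref() {
  Probe p;
  auto bump = [](Probe& x) { ++x.value; };
  Probe::copies = 0;
  std::bind(bump, std::ref(p))();
  return BindResult{Probe::copies, p.value};
}
```

bind_by_ref() reports zero copies and a modified object; bind_by_value() reports at least one copy and an unmodified object. The zero-copy case is exactly why std::ref is risky for asynchronous calls: nothing keeps the referenced object alive.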

Task Pool with boost::asio

We can design a simple task pool following the boost::asio recipe "A thread pool for executing arbitrary tasks".

However, if you want to implement parallel computation, tbb::task_group is faster, and OpenMP is another option. Note that tbb::task_group schedules the execution of tasks itself, so the execution order is not guaranteed.

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

class TaskPool
{
public:
  TaskPool(size_t nthreads=1)
  {
    work_ = new boost::asio::io_service::work(service_);
    for (std::size_t i = 0; i < nthreads; ++i)
      threads_.create_thread(boost::bind(&boost::asio::io_service::run, &service_));
  }

  ~TaskPool()
  {
    if (work_)
      stop();
  }

  void stop(bool wait=true)
  {
    if (wait) {
      delete work_; // without this, io_service::run never returns
      threads_.join_all(); // queued tasks finish before the threads exit
      service_.stop();
    } else {
      service_.stop(); // tasks that have not started yet are abandoned
      threads_.join_all();
      delete work_;
    }
    work_ = NULL;
  }

  template <typename T>
  void run(T func)
  {
    service_.post(func);
  }

protected:
  boost::thread_group threads_;
  boost::asio::io_service service_;
  boost::asio::io_service::work* work_;
};

Optimize QuickFIX

QuickFIX is a full-featured open source FIX engine. I think the "quick" refers to quick development rather than quick performance. Even a quick browse of its source code suggests some ideas for optimizing it. One of its bottlenecks is the log and store classes: if logging and storing are enabled, every message sent or received is first written to the log file (or database) and to the store file (or database). This is bad for a trading program that needs to respond quickly to the market. The first idea that comes to mind is to make these operations asynchronous. Fortunately, QuickFIX lets us plug in our own log and store factory classes. Unfortunately, this way we can only optimize the log and the store separately.
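
The asynchronous idea can be sketched independently of QuickFIX. The class below is a hypothetical helper, not part of the QuickFIX API: callers hand over a message and return immediately, while one background thread performs the slow write. The sink callback stands in for the real file or database write; wiring it into custom log and store classes is left out.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical helper, not part of QuickFIX: write() only enqueues the
// message; a single background thread calls the slow sink.
class AsyncWriter {
public:
  // `sink` stands in for the real file or database write.
  explicit AsyncWriter(std::function<void(const std::string&)> sink)
      : sink_(std::move(sink)), done_(false), worker_([this] { loop(); }) {}

  // Flushes everything still queued, then joins the worker.
  ~AsyncWriter() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      done_ = true;
    }
    cv_.notify_one();
    worker_.join();
  }

  void write(std::string msg) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      pending_.push_back(std::move(msg));
    }
    cv_.notify_one();
  }

private:
  void loop() {
    std::unique_lock<std::mutex> lock(mutex_);
    for (;;) {
      cv_.wait(lock, [this] { return done_ || !pending_.empty(); });
      if (pending_.empty()) return;  // done_ is set and nothing is left
      // Take the whole batch so the lock is not held during slow I/O.
      std::deque<std::string> batch;
      batch.swap(pending_);
      lock.unlock();
      for (const auto& m : batch) sink_(m);
      lock.lock();
    }
  }

  std::function<void(const std::string&)> sink_;
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<std::string> pending_;
  bool done_;
  std::thread worker_;  // declared last so the other members exist before it starts
};
```

Because there is only one consumer thread, messages reach the sink in the order they were queued. The trade-off is durability: a message that is queued but not yet written is lost if the process crashes, which matters for a store used for resend requests.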