// Join a multicast group with Boost.Asio: bind a reusable UDP socket to
// `port`, then subscribe to `group` on the NIC addressed by `interface`.
udp::endpoint listen_endpoint(udp::v4(), port);
socket_.open(listen_endpoint.protocol());
// reuse_address lets several processes listen on the same multicast port
socket_.set_option(socket_base::reuse_address(true));
socket_.bind(listen_endpoint);
// join `group`; the second argument selects the local interface address
socket_.set_option(multicast::join_group(address::from_string(group).to_v4(),
address::from_string(interface).to_v4()));
Friday, December 30, 2011
Boost Asio Multicast
Thursday, December 29, 2011
Virtual function, template(CRTP) or branch?
Just posted this question on http://stackoverflow.com/.
I have been confronted with a C++ design problem: choosing between a virtual function, a template, and a branch. The three implementations are listed as follows. I eventually chose the second one, which looks tricky but gives the best performance for a low-latency design.
virtual function implementation:
class Channel
{
void packet(...) { for (...) message(...); }
virtual void message(...)=0;
};
class ChannelA : public Channel
{
struct Header {...}
void message(...) { ... }
}
class ChannelB : public Channel
{
struct Header {...}
void message(...) { ... }
}
template implementation:
template <typename TImpl>
class Channel : public BaseChannel
{
void packet(...) { for (...) message(...); }
void message(...);
};
class ChannelA : public Channel<ChannelA>
{
struct Header {...}
void message(...) { ... }
}
class ChannelB : public Channel<ChannelB>
{
struct Header {...}
void message(...) { ... }
}
template <typename TImpl>
inline void Channel<TImpl>::message(...) { static_cast<TImpl*>(this)->message(); }
branch implementation:
class Channel : public BaseChannel
{
void packet(...) { for (...) message(...); }
struct HeaderA {...}
struct HeaderB {...}
void message(...)
{
if (isHeaderA(...)) messageA(...);
else if (isHeaderB(...)) messageB(...);
}
void messageA(...) { ... }
void messageB(...) { ... }
};
Wednesday, December 28, 2011
A Comparison of C++ Hash Tables
# with key based on sprintf(s, "%lx_x", i, rand()%1000) INFO - test_Hash(std::__unordered_map with __cache_hash_code=true (w)): 0.246489(s) INFO - test_Hash(std::__unordered_map with __cache_hash_code=true (r)): 13.9586(s) INFO - test_Hash(std::unordered_map (w)): 0.351654(s) INFO - test_Hash(std::unordered_map (r)): 16.2773(s) INFO - test_hash(google::dense_hash_map (w)): 0.253774(s) INFO - test_hash(google::dense_hash_map (r)): 14.4838(s) INFO - test_Hash(boost hash table (w)): 0.473683(s) INFO - test_Hash(boost hash table (r)): 15.6317(s)
# with key based on sprintf(s, "%lx", i) INFO - test_Hash(std::__unordered_map with __cache_hash_code=true (w)): 0.137569(s) INFO - test_Hash(std::__unordered_map with __cache_hash_code=true (r)): 10.5713(s) INFO - test_Hash(std::unordered_map (w)): 0.179186(s) INFO - test_Hash(std::unordered_map (r)): 12.1257(s) INFO - test_hash(google::dense_hash_map (w)): 0.570974(s) INFO - test_hash(google::dense_hash_map (r)): 18.6572(s) INFO - test_Hash(boost hash table (w)): 0.216315(s) INFO - test_Hash(boost hash table (r)): 11.17(s)
// Hash functor for NUL-terminated C strings: folds each character into
// the seed with boost::hash_combine. A hand-rolled FNV-1a variant is
// kept below (disabled) for comparison.
struct Hash
{
    size_t operator()(const char* s) const
    {
#if 0
        /* magic numbers bits from http://www.isthe.com/chongo/tech/comp/fnv/ */
        // FNV-1a
        size_t h = 14695981039346656037U;
        for (; *s; ++s) {
            h ^= *s;
            h *= 1099511628211U;
        }
        return h;
#else
        std::size_t seed = 0;
        while (*s) {
            boost::hash_combine(seed, *s);
            ++s;
        }
        return seed;
#endif
    }
};
// Benchmark four hash-map implementations with const char* keys hashed by
// the Hash functor above: write `loops` entries, then read each back
// `rloops` times under a MyTimer scope. Keys are compared with eqstr
// (defined elsewhere in this file).
BOOST_AUTO_TEST_CASE(test_Hash)
{
// libstdc++-internal map with cached hash codes (last template argument = true)
typedef std::__unordered_map<const char*, size_t, Hash, eqstr,
std::allocator<std::pair<const char*, size_t> >, true> THash1;
typedef std::unordered_map<const char*, size_t, Hash, eqstr> THash2;
typedef google::dense_hash_map<const char*, size_t, Hash, eqstr> GHash;
GHash gh;
// dense_hash_map requires a designated "empty" key before any insert
gh.set_empty_key("");
typedef boost::unordered_map<const char*, size_t, Hash, eqstr> BHash;
THash1 th1;
THash2 th2;
BHash bh;
std::vector<const char*> strs;
const size_t loops = 1e+6;
const size_t rloops = 50;
// generate 1M distinct keys; the new[]'d strings are deliberately
// never freed — acceptable for a one-shot benchmark
for (size_t i = 0; i < loops; ++i) {
char* s = new char[16];
sprintf(s, "%lx_%x", i, rand()%1000); // or sprintf(s, "%lx", i)
strs.push_back(s);
}
// std::__unordered_map with __cache_hash_code=true: write then read
{
{
MyTimer t("test_Hash(std::__unordered_map with __cache_hash_code=true (w))");
for (size_t i = 0; i < loops; ++i)
th1[strs[i]] = i;
}
{
MyTimer t("test_Hash(std::__unordered_map with __cache_hash_code=true (r))");
for (size_t j = 0; j < rloops; ++j)
for (size_t i = 0; i < loops; ++i)
BOOST_CHECK(th1[strs[i]] == i);
}
}
// std::unordered_map: write then read
{
{
MyTimer t("test_Hash(std::unordered_map (w))");
for (size_t i = 0; i < loops; ++i)
th2[strs[i]] = i;
}
{
MyTimer t("test_Hash(std::unordered_map (r))");
for (size_t j = 0; j < rloops; ++j)
for (size_t i = 0; i < loops; ++i)
BOOST_CHECK(th2[strs[i]] == i);
}
}
// google::dense_hash_map: write then read
{
{
MyTimer t("test_hash(google::dense_hash_map (w))");
for (size_t i = 0; i < loops; ++i)
gh[strs[i]] = i;
}
{
MyTimer t("test_hash(google::dense_hash_map (r))");
for (size_t j = 0; j < rloops; ++j)
for (size_t i = 0; i < loops; ++i)
BOOST_CHECK(gh[strs[i]] == i);
}
}
// boost::unordered_map: write then read
{
{
MyTimer t("test_Hash(boost hash table (w))");
for (size_t i = 0; i < loops; ++i)
bh[strs[i]] = i;
}
{
MyTimer t("test_Hash(boost hash table (r))");
for (size_t j = 0; j < rloops; ++j)
for (size_t i = 0; i < loops; ++i)
BOOST_CHECK(bh[strs[i]] == i);
}
}
}
Saturday, December 17, 2011
Replay Multicast
I tried tcpdump/tcpreplay to do multicast replay.
# snaplen = maximum udp payload size + udp header size + ip header size + ethernet header size # need to tune -B -s carefully for packet lossless tcpdump -B 10240 -s 2112 -tt -vv -i eth4 -w eth4.pcap multicast tcpreplay -t -i lo eth4.pcap
However, although tcpdump could capture the packets sent by tcpreplay, my program failed to subscribe to them. I tried to edit the MAC addresses with tcpedit per [Tcpreplay-users] "Could not replay a multicast", but it still did not work. After searching for a while, I decided to write my own tcpreplay.
#include <assert.h>
#include <err.h>
#include <pcap.h>
#include <strings.h>
#include <time.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/udp.h>
#include <arpa/inet.h>
// ethernet headers are always exactly 14 bytes [1]
#define SIZE_ETHERNET 14 // http://www.tcpdump.org/pcap.html
// Replay multicast UDP packets from an open pcap handle onto socket `fd`,
// pacing each packet so it is sent when the capture timestamp, shifted by
// `offset` seconds, catches up with the wall clock. Non-multicast and
// non-UDP packets are skipped. Logs a progress line at most every 10s.
void replayOnePcap(pcap_t* pcap, long offset, int fd)
{
    struct sockaddr_in addr;
    bzero((char *)&addr, sizeof(addr));
    addr.sin_family = AF_INET;
    struct pcap_pkthdr header;
    const u_char* packet;
    long num = 0;             // packets sent so far
    long vol = 0;             // bytes accounted (capture header + wire length)
    time_t lastReportTm = 0;  // BUG FIX: was uninitialized, so the first
                              // 10-second report check read an indeterminate value
    while ((packet = pcap_next(pcap, &header)) != NULL) {
        const struct ip* ip = (struct ip*)(packet + SIZE_ETHERNET);
        // first octet of the destination address (byte 16 of the IP header)
        // is >= 224 for multicast; skip everything else
        if (*((const u_char*)ip + 16) < 224)
            continue;
        assert(ip->ip_p == IPPROTO_UDP);
        timeval now;
        time_t t;
        // busy-wait until the shifted wall clock reaches the capture timestamp
        for (;;) {
            gettimeofday(&now, NULL);
            t = now.tv_sec+offset;
            if (header.ts.tv_sec < t ||
                (header.ts.tv_sec == t && header.ts.tv_usec <= now.tv_usec))
                break;
            //usleep(1);
        }
        int size_ip = (ip->ip_hl)*4;  // IHL is in 32-bit words
        const struct udphdr* udp = (struct udphdr*)(packet + SIZE_ETHERNET + size_ip);
        int n = SIZE_ETHERNET + size_ip + sizeof(udphdr);
        const u_char* payload = packet + n;  // UDP payload; length = header.len - n
        addr.sin_addr.s_addr = ip->ip_dst.s_addr;
        addr.sin_port = udp->dest;  // udphdr fields are network byte order, as sin_port expects
        ++num;
        vol += sizeof(header) + header.len;
        // send errors are deliberately ignored: best-effort replay
        ::sendto(fd, payload, header.len - n, 0, (struct sockaddr*)&addr, sizeof(addr));
        if (now.tv_sec - lastReportTm >= 10) {
            lastReportTm = now.tv_sec;
            LOG_INFO('#' << num/1e+6 << "M " << vol/1e+6 << "M : " <<
                header.ts.tv_usec << "us " << asctime(localtime(&header.ts.tv_sec)));
            LOG_INFO("expect: " << asctime(localtime(&t)));
        }
    }
}
Sunday, November 13, 2011
Log Publisher with Scala
The idea is to develop a Scala program to monitor log files and publish updated lines to remote subscribers. We could use zmq or Redis to do pub/sub. Here, I choose Redis, for details about Redis as PubSub, please check PubSub with Redis and Akka Actors.
import java.io._
import com.redis.{RedisClient, PubSubMessage, S, U, M}
import akka.persistence.redis._
import akka.actor.Actor._
// Tail a set of log files and publish each newly appended line to a
// Redis pub/sub channel named after the file's id.
object LogMonitor {
  var pub: MyPublisher = null
  // shared line buffer, reused across files; cleared after each publish
  val line = new StringBuffer(1024)

  // Poll the given (id -> path) map once a second forever, publishing
  // any complete lines appended since the last poll. Returns early if
  // any of the files does not exist.
  def publish(files: Map[String, String], port: Int) {
    var fileInfo = List[LogFile]()
    files.foreach(f => {
      var lf = new LogFile(f._1, new File(f._2))
      if (!lf.file.exists) {
        println("Error: " + lf.file)
        return
      }
      fileInfo = lf :: fileInfo
    })
    pub = new MyPublisher("localhost", port)
    while (true) {
      fileInfo.foreach(pubNewLines)
      Thread.sleep(1000) // sleep 1s
    }
  }

  // Read bytes appended since file.offset and publish each complete
  // ('\n'-terminated) line; a trailing partial line is re-read next poll.
  def pubNewLines(file: LogFile) {
    val rand = new RandomAccessFile(file.file, "r")
    try {
      rand.seek(file.offset)
      while (true) {
        val b = rand.readByte()
        line.append(b.toChar)
        if (b == '\n') {
          file.offset += line.length
          //println(file.id, line.toString)
          pub.publish(file.id, line.toString)
          line.delete(0, line.length)
        }
      }
    } catch {
      case _: EOFException => // reached current end of file — expected
    } finally {
      // BUG FIX: close was previously outside any finally block, so the
      // file handle leaked whenever a non-EOF exception was thrown
      rand.close
      line.delete(0, line.length)
    }
  }

  // NOTE(review): offset is an Int, so files over 2GB would overflow — confirm
  class LogFile(
    var id: String,
    var file: File,
    var offset: Int=0)

  // Wraps a RedisClient behind an actor so publishes are asynchronous.
  class MyPublisher(val host: String, val port: Int) {
    println("starting publishing service ...")
    println(host + ":" + port)
    val r = new RedisClient(host, port)
    val p = actorOf(new Publisher(r))
    p.start
    def publish(channel: String, message: String) = {
      p ! Publish(channel, message)
    }
  }
}
Sunday, October 30, 2011
Add timer to task pool
using boost::asio::deadline_timer;
// TaskPool extension that runs a task after a delay instead of immediately.
struct TaskPoolTimer : public TaskPool
{
    // Schedule `func` to run on the pool after `us` microseconds.
    // The timer is heap-allocated so it outlives this call and deletes
    // itself inside the completion handler.
    // BUG FIX: the original said bare `template` (no parameter list) and
    // used an undeclared name `tt` where the timer was declared as `t`.
    template <typename T>
    void run(T func, uint64_t us)
    {
        auto t = new deadline_timer(service_, boost::posix_time::microseconds(us));
        t->async_wait([=](const boost::system::error_code&) {
            func(); delete t;
        });
    }
};
int main() {
TaskPoolTimer p;
std::cout << time(NULL) << std::endl;
auto f = [](){std::cout<< time(NULL) << std::endl;};
p.run(f, 2e+6);
p.run(f, 4e+6);
return 0;
}
A comparison of boost::bind and std::bind
#include <boost/bind.hpp>
//g++ test.cc -std=c++0x
struct A
{
A() { std::cout << "A" << std::endl; }
A(const A& a) { std::cout << "A(A)" << std::endl; }
A& operator=(const A& a) { std::cout << "A=" << std::endl; return *this; }
~A() { std::cout << "~A" << std::endl; }
};
// Trace how boost::bind and std::bind copy their bound arguments.
// The lambda takes A by reference, but binding `a` by value still
// copies it into the binder; std::ref avoids the copies entirely.
int main() {
auto f = [=](A& a) {};
A a;
std::cout << std::endl;
std::cout << "boost" << std::endl;
// boost::bind by value: several "A(A)" copies appear in the trace
boost::bind<void>(f, a)();
std::cout << std::endl;
std::cout << "boost ref" << std::endl;
// std::ref wraps a reference — no copies
boost::bind<void>(f, std::ref(a))();
std::cout << std::endl;
std::cout << "std ref" << std::endl;
std::bind(f, std::ref(a))();
std::cout << std::endl;
std::cout << "std" << std::endl;
// std::bind by value: a single copy (see the output below)
std::bind(f, a)();
std::cout << std::endl;
std::cout << "end of main" << std::endl;
return 0;
}
Output:
A boost A(A) A(A) A(A) A(A) ~A A(A) ~A ~A ~A ~A boost ref std ref std A(A) ~A end of main ~A
Task Pool with boost::asio
We can design a simple task pool with the boost::asio recipe, A thread pool for executing arbitrary tasks.
However, if you want to implement parallel computation, tbb::task_group is faster, or you should choose OpenMP. tbb::task_group schedules the execution of tasks, so the execution order is not guaranteed.
// Simple task pool built on boost::asio::io_service: callables posted
// with run() are executed by a fixed set of worker threads.
class TaskPool
{
public:
    // Start `nthreads` workers; the io_service::work object keeps
    // io_service::run() from returning while the pool is idle.
    TaskPool(size_t nthreads=1)
    {
        work_ = new boost::asio::io_service::work(service_);
        for (std::size_t i = 0; i < nthreads; ++i)
            threads_.create_thread(boost::bind(&boost::asio::io_service::run, &service_));
    }
    ~TaskPool()
    {
        if (work_)
            stop();
    }
    // BUG FIX: the pool owns worker threads and a raw work_ pointer, so
    // copying would double-delete work_ — make it non-copyable.
    TaskPool(const TaskPool&) = delete;
    TaskPool& operator=(const TaskPool&) = delete;
    // wait=true: let queued tasks drain before the workers exit.
    // wait=false: stop the service first, dropping queued tasks.
    void stop(bool wait=true)
    {
        if (wait) {
            delete work_; // until work_ is deleted, io_service::run never exits
            threads_.join_all();
            service_.stop();
        } else {
            service_.stop();
            threads_.join_all();
            delete work_;
        }
        work_ = NULL;
    }
    // Queue `func` for execution on one of the worker threads.
    template <typename T>
    void run(T func)
    {
        service_.post(func);
    }
protected:
    boost::thread_group threads_;
    boost::asio::io_service service_;
    boost::asio::io_service::work* work_; // raw owning pointer; released in stop()
};