记一次线程之间的数据消费的优化
背景
最近应客户端的需求,为客户端开发了一套c++版本的日志系统(后续会专门讲述客户端日志系统的文章),其中有个功能点是需要每条日志发送到网络,于是按照后台的开发思维方式,我就好不犹豫的设计了一下日志生产和消费者模型,同时也将自己带到了坑中-_-。
使用队列的消费
按照后台的开发方式,如果要实现一个生产者-消费者模型,设计如下(伪代码):
// 静态队列
static std::list<Node> queue_;
// 生产者类
class Producer {
public:
int PushQueue(Node &node) {
// 将数据node写入到queue_中
}
}
// 消费者
class Comsumer {
public:
int PopQueue(Node &node) {
// 从queue_取出数据写入到node中
}
void ComsumeThreadFunc() {
while (1) {
Node tmp_node;
PopQueue(tmp_node);
// 取完数据做其他的逻辑
}
}
void CreateComsumeThread() {
pthread_t thread_id;
pthread_create(&thread_id, NULL, ComsumeThreadFunc, (void*)this);
// ...
}
}
很快形如流水的把第一个版本的这个功能写完了,不知道读者有没有发现了哪些问题?
首先,队列没有加锁,这样在使用的过程中必然会导致日志错乱;
其次,while循环处理逻辑PopQueue没有判断,如果队列为空,我们需要等待或者直接退出,这里使用等待40ms;
最后,设计模型不合理,我们应该将PushQueue和PopQueue封装到一起,这样才能解耦相关逻辑;
review一遍代码以后发现一些问题,马上修改一个版本:
// 静态队列
static std::list<Node> queue_;
static Comsumer comsumer_;
// 生产者类
class Producer {
void Dispose() {
// 添加数据
comsumer_.PushQueue(node);
// ...
}
}
// 消费者
class Comsumer {
int PopQueue(Node &node) {
queue_mutex_.lock();
// 从queue_取出数据写入到node中
queue_mutex_.unlock();
}
// 为了逻辑解耦,所以将入队列逻辑直接封装到消费者队列中(其实最好单独提出一个类)
int PushQueue(Node &node) {
queue_mutex_.lock();
// 将数据node写入到queue_中
queue_mutex_.unlock();
}
void ComsumeThreadFunc() {
int thread_run_count = 0;
while (1) {
// 防止线程出问题,当运行一段时间以后直接退出
if (thread_run_count > 某个阈值) {
thread_run_flag = false;
break; // 退出线程
}
Node tmp_node;
int ret = PopQueue(tmp_node);
if (ret == 0) {
usleep(40000); // 休眠
continue; // 退出以后
}
// 取完数据做其他的逻辑
++thread_run_count;
}
pthread_exit(NULL);
}
void CreateComsumeThread() {
pthread_t thread_id;
if (!thread_run_flag) {
thread_run_flag = true;
if (pthread_create(&thread_id, NULL,
ComsumeThreadFunc, (void*)this) != 0) {
thread_run_flag = false;
}
}
// ...
}
private:
// 线程锁
Mutex queue_mutex_;
// 线程运行时标志,默认为false
int thread_run_flag;
}
这次似乎比较完美了,于是马上提交给同事review,正在庆幸又完成了一个小功能的时候,发现同事弱弱的说:如果用户没有往队列消费数据或者不使用这个功能,是不是有一个线程一直在跑,我只能:-_-,的确是一个轮询的消费线程一直在执行,与服务端开发不同,在用户的手机上开一个线程是会影响用户耗电,流畅等功能,看来一上午写的功能都白费了,于是重新思考这个消费者模型…
添加信号量通知消费线程
线程的信号量是与进程间通讯中使用的信号量的概念是一样的,能被增加和减少,保障被增加和减少对应的变量是原子操作,同时让多个线程访问的同一个信号量的操作是顺序的,线程的semaphore.h中提供四个api:sem_init-初始化;sem_wait-以原子操作的方式将信号量的值减1,当信号的计数为零的时候,将休眠挂起当前调用线程,直到信号量计数不为零;sem_post-以原子操作的方式将信号量的值加1;sem_destroy-对用完的信号量的清理;看一个使用的例子:
// 主线程
...
res = sem_init(&sem, 0, 0);
res = pthread_create(&thread, NULL, thread_func, msg);
if(res != 0)
{
perror("pthread_create failed\n");
exit(EXIT_FAILURE);
}
while (true)
{
...
sem_post(&sem); // 数据+1
...
}
// 产生的线程执行代码
void* thread_func(void *msg)
{
while(true)
{
...
sem_wait(&sem); // 阻塞
...
}
}
那么用到日志的功能模块中,然后马上进行如下改造:
...
int PushQueue(Node &node) {
queue_mutex_.lock();
// 将数据node写入到queue_中
queue_mutex_.unlock();
sem_post(&sem); // +1
}
...
int ret = PopQueue(tmp_node);
if (ret == 0) {
// usleep(40000); // 休眠
sem_wait(&sem); // 等待sem>0
continue; // 退出以后
}
...
最终的模型
修改完了以后,正准备提交代码,发现一个小的问题,如果sem信号量不成功,那么sem_wait要么一直组塞,要么直接跳过执行,想想这对客户端是巨大隐患,于是又思考如何修改,最终定义如下的解决方案:在sem_init初始化以后,设置一个init_status = true,否则则为false,同时将判断init_status = true则使用sem_wait,否则使用之前的usleep;
int ret = PopQueue(tmp_node);
if (ret == 0) {
if (init_status) {
sem_wait(&sem); // 等待sem>0
} else {
usleep(40000); // 休眠
}
continue; // 退出以后
}
从日志模块中的一个小功能的开发看来,后台开发与客户端开发理念有点不一样,客户端发出去的版本不能实时修复,不能远程直接调试,不能被着用户起很多耗时的操作…,只能说要写好客户端代码可能比后台要难很多^_^。