背景

最近应客户端的需求,为客户端开发了一套c++版本的日志系统(后续会专门讲述客户端日志系统的文章),其中有个功能点是需要每条日志发送到网络,于是按照后台的开发思维方式,我就好不犹豫的设计了一下日志生产和消费者模型,同时也将自己带到了坑中-_-。

使用队列的消费

按照后台的开发方式,如果要实现一个生产者-消费者模型,设计如下(伪代码):

// 静态队列
static std::list<Node> queue_;
// 生产者类
class Producer {
public:
	int PushQueue(Node &node) {
		// 将数据node写入到queue_中
	}
}
// 消费者
class Comsumer {
public:
	int PopQueue(Node &node) {
		// 从queue_取出数据写入到node中
	}
	void ComsumeThreadFunc() {
		while (1) {
			Node tmp_node;
			PopQueue(tmp_node);
			// 取完数据做其他的逻辑
		}
	}
	void CreateComsumeThread() {
		pthread_t thread_id;
		pthread_create(&thread_id, NULL, ComsumeThreadFunc, (void*)this);
		// ...
	}
}

很快形如流水的把第一个版本的这个功能写完了,不知道读者有没有发现了哪些问题?
首先,队列没有加锁,这样在使用的过程中必然会导致日志错乱;
其次,while循环处理逻辑PopQueue没有判断,如果队列为空,我们需要等待或者直接退出,这里使用等待40ms;
最后,设计模型不合理,我们应该将PushQueue和PopQueue封装到一起,这样才能解耦相关逻辑;
review一遍代码以后发现一些问题,马上修改一个版本:

// 静态队列
static std::list<Node> queue_;
static Comsumer comsumer_;
// 生产者类
class Producer {
	void Dispose() {
		// 添加数据
		comsumer_.PushQueue(node);
		// ...
	}
}
// 消费者
class Comsumer {
	int PopQueue(Node &node) {
		queue_mutex_.lock();
		// 从queue_取出数据写入到node中
		queue_mutex_.unlock();
	}
	// 为了逻辑解耦,所以将入队列逻辑直接封装到消费者队列中(其实最好单独提出一个类)
	int PushQueue(Node &node) {
		queue_mutex_.lock();
		// 将数据node写入到queue_中
		queue_mutex_.unlock();
	}
	void ComsumeThreadFunc() {
		int thread_run_count = 0;
		while (1) {
			// 防止线程出问题,当运行一段时间以后直接退出
			if (thread_run_count > 某个阈值) {
				thread_run_flag = false;
				break; // 退出线程
			}
			Node tmp_node;
			int ret = PopQueue(tmp_node);
			if (ret == 0) {
				usleep(40000); // 休眠
				continue; // 退出以后
			}
			// 取完数据做其他的逻辑

			++thread_run_count;
		}

		pthread_exit(NULL);
	}
	void CreateComsumeThread() {
		pthread_t thread_id;
		if (!thread_run_flag) {
			thread_run_flag = true;
			if (pthread_create(&thread_id, NULL, 
				ComsumeThreadFunc, (void*)this) != 0) {
				thread_run_flag = false;
			}
		}
		
		// ...
	}
private:
	// 线程锁
	Mutex queue_mutex_;
	// 线程运行时标志,默认为false
	int thread_run_flag;
}

这次似乎比较完美了,于是马上提交给同事review,正在庆幸又完成了一个小功能的时候,发现同事弱弱的说:如果用户没有往队列消费数据或者不使用这个功能,是不是有一个线程一直在跑,我只能:-_-,的确是一个轮询的消费线程一直在执行,与服务端开发不同,在用户的手机上开一个线程是会影响用户耗电,流畅等功能,看来一上午写的功能都白费了,于是重新思考这个消费者模型…

添加信号量通知消费线程

线程的信号量是与进程间通讯中使用的信号量的概念是一样的,能被增加和减少,保障被增加和减少对应的变量是原子操作,同时让多个线程访问的同一个信号量的操作是顺序的,线程的semaphore.h中提供四个api:sem_init-初始化;sem_wait-以原子操作的方式将信号量的值减1,当信号的计数为零的时候,将休眠挂起当前调用线程,直到信号量计数不为零;sem_post-以原子操作的方式将信号量的值加1;sem_destroy-对用完的信号量的清理;看一个使用的例子:

// 主线程
...
res = sem_init(&sem, 0, 0); 
res = pthread_create(&thread, NULL, thread_func, msg);  
if(res != 0)  
{  
    perror("pthread_create failed\n");  
    exit(EXIT_FAILURE);  
}  
while (true)
{
	...
	sem_post(&sem); // 数据+1
	...
}
// 产生的线程执行代码
void* thread_func(void *msg) 
{
	while(true)
	{
		...
		sem_wait(&sem); // 阻塞 
		...
	}
}

那么用到日志的功能模块中,然后马上进行如下改造:

...
int PushQueue(Node &node) {
	queue_mutex_.lock();
	// 将数据node写入到queue_中
	queue_mutex_.unlock();
	sem_post(&sem); // +1
}
...
int ret = PopQueue(tmp_node);
if (ret == 0) {
	// usleep(40000); // 休眠
	sem_wait(&sem); // 等待sem>0
	continue; // 退出以后
}
...

最终的模型

修改完了以后,正准备提交代码,发现一个小的问题,如果sem信号量不成功,那么sem_wait要么一直组塞,要么直接跳过执行,想想这对客户端是巨大隐患,于是又思考如何修改,最终定义如下的解决方案:在sem_init初始化以后,设置一个init_status = true,否则则为false,同时将判断init_status = true则使用sem_wait,否则使用之前的usleep;

int ret = PopQueue(tmp_node);
if (ret == 0) {
	if (init_status) {
		sem_wait(&sem); // 等待sem>0
	} else {
		usleep(40000); // 休眠
	}
	
	continue; // 退出以后
}

从日志模块中的一个小功能的开发看来,后台开发与客户端开发理念有点不一样,客户端发出去的版本不能实时修复,不能远程直接调试,不能被着用户起很多耗时的操作…,只能说要写好客户端代码可能比后台要难很多^_^。