上一节介绍了Ceph的一些关键数据结构,本节将介绍线程池Threadpool的数据结构。
线程池可以极大的提高分布式系统的消息并发处理能力,因此十分重要。它的结构如下:
在线程中,join方法的作用是调用线程等待该线程完成后,才能继续往下顺序运行。_old_threads就是等待Join操作的旧线程集合。
工作队列集合,保存所有要处理的任务。一般情况下,一个工作队列对应一个类型的处理任务,一个线程池对应一个工作队列,专门用于处理该类型的任务。如果是后台任务,不紧急,就可以将多个工作队列放置到一个线程池里,该线程池可以处理不同类型的任务。
线程池的实现一般包括线程启动,工作队列管理,和任务执行。ceph的线程池还实现了线程的超时检查和ShardedThreadPool功能。接下来对它们进行一一介绍。
线程启动
线程池是许多线程的集合,当要处理一个工作队列里的任务时,就要启动一个空闲的线程。函数start 用来启动线程池,其在加锁的情况下,调用函数start_threads,该函数检查当前线程数,如果小于配置的线程池,就创建新的工作线程。
工作队列管理:
工作队列是线程池需要处理的任务的集合。在ceph中,有一个工作队列的基类WorkQueue_,只定义了一个队列应该有的一些特定的虚函数,目的是为了调用到自己的子类WorkQueue等自己定义的函数。而在子类中对应函数_void_process,_void_process_finish中又分别调用了使用者自己继承它们而自己实现的具体操作函数如_process,_process_finish等。
工作队列WorkQueue是在继承基类WorkQueue_的基础上实现的,实现了一部分功能:进队列和出队列,加锁,并通过条件变量通知相应的处理线程。
任务执行:
在ThreadPool中的工作线程是WorkThread集合,线程池通过调用workthread来启动任务。任务执行的入口函数是ThreadPool::worker函数,此函数内定义了WorkThread类线程的操作逻辑。基本流程就是轮询所有WorkQueue_,当发现某种类型WorkQueue_中有数据时拿出,然后依次调用该WorkQueue_自己定义的函数_void_process和_void_process_finish等函数来顺序执行操作。
void ThreadPool::worker(WorkThread *wt)
{
……
while (!_stop) {//确保线程池未关闭
……
if (!_pause && !work_queues.empty()) {//确保线程池未中止,且工作队列不为空
WorkQueue_* wq;
int tries = workqueues.size();
bool did = false;
while(tries--) {//从last_work_queue遍历每一个工作任务
last_work_queue++;
last_work_queue %= work_queues.size();
wq = work_queues[last_work_queue];
void *item = wq->_void_dequeue();
if (item) {//调用工作队列的处理函数进行处理
wq->_void_process(item,tp_handle);//这一块就是需要自定义的
wq->_void_process_finish(item);
}
……
}
……
}
……
}
超时检查:每次线程函数执行时,TPHandle都会设置一个grace超时时间,当线程执行超过该时间,就认为是unhealthy的状态。当执行时间超过suicide_grace时,OSD就会产生断言而导致线程自杀:
其中,结构heartbeat_handle_d记录了相关信息,并把该结构添加到HeartbeatMap的系统链表中保存。OSD会有一个定时器,定时检查该链表是否超时。
ShardedThreadPool
ThreadPool实现的线程池,其每个线程都有机会处理工作队列的任意一个任务。这就会导致一个问题,如果任务之间有互斥性,那么正在处理该任务的两个线程有一个必须等待另一个处理完成后才能处理,从而导致线程的阻塞,性能下降。
如上图所示,线程Thread1和Thread2分别正在处理Job1和Job2。由于Job1和Job2的关联性,二者不能并发执行,只能顺序执行,二者之间用一个互斥锁来控制。如果Thread1先获得互斥锁就先执行,Thread2必须等待,直到Thread1执行完Job1后释放了该互斥锁,Thread2获得该互斥锁后才能执行Job2。显然,这种任务的调度方式应对这种不能完全并行的任务是有缺陷的。实际上Thread2可以去执行其他任务,比如Job5。Job1和Job2既然是顺序的,就都可以交给Thread1执行。
ShardedThreadPool即对上述的任务调度方式做了改进,其在线程的执行函数里,添加了表示线程的thread_index:void shardedthreadpool_worker(uint32_t thread_index);Shard方式,还需要使用者自己去实现。其基本的思想就是:每个线程对应一个任务队列,所有需要顺序执行的任务都放在同一个线程的任务队列里,全部由该线程执行。
(以上内容均为对参考资料的学习总结)
参考资料:
《Ceph源码分析》 常涛 编著 机械工业出版社
ceph源码分析之线程介绍_ywy463726588的专栏-CSDN博客_ceph源码分析
《Ceph源码分析》——第2章,第3节线程池-阿里云开发者社区