大多数网络服务器都具有一个特点,就是单位时间内必须处理数目巨大的连接请求,但是处理时间却是比较短的。在传统的多线程服务器模型中是这样的:一旦有个请求到达,就创建一个新的线程,由该线程执行任务,任务执行完毕之后,线程就退出。这就是"即时创建,即时销毁"的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数非常频繁,那么服务器就将处于一个不停的创建线程和销毁线程的状态。这笔开销是不可忽略的,尤其是线程执行的时间非常非常短的情况。线程池就能解决上述问题的。
那么什么是线程池呢?就是首先创建一些线程,它们的集合称为线程池。线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后线程池中的线程从队列中提取任务并自动启动这些任务。线程执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务。使用线程池可以很好地提高性能,线程池在系统启动时即创建大量空闲的线程,线程池中的线程都是后台线程,每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。
线程池的工作机制是怎么样的?在线程池的编程模式下,任务是提交给整个线程池,而不是直接提交给某个线程,线程池在拿到任务后,就在内部寻找是否有空闲的线程,如果有,则将任务交给某个空闲的线程。一个线程同时只能执行一个任务,但可以同时向一个线程池提交多个任务。
下面来看看基于rtthread实现的线程池:
先定义线程堆栈、任务优先级等信息
#define THREAD_POOL_JOB_DEFAULT_PRIORITY 8 /**< 线程优先级 */
#define THREAD_POOL_JOB_TICK 5 /**< 线程持续运行时间 */
#define THREAD_POOL_NAME_MAX RT_NAME_MAX /**< 线程名最大长度 */
定义线程池状态码
/* 线程池错误码定义 */
typedef enum
{
THREAD_POOL_NO_ERR, /**< 无异常 */
THREAD_POOL_ALREADY_SHUTDOWN_ERR, /**< 线程池已关闭 */
THREAD_POOL_MEM_FULL_ERR, /**< 线程池内存申请错误 */
} thread_pool_err;
定义线程池对象,包括线程池操作函数、任务队列锁、任务同步信号等,队列锁跟同步信号组合成了linux中的条件等待功能。
/* 线程池对象 */
typedef struct _thread_pool
{
char name[THREAD_POOL_NAME_MAX + 1];/**< 线程池名,包括'\0' */
rt_mutex_t user_lock; /**< 提供给用户的互斥锁 */
rt_mutex_t queue_lock; /**< 线程池任务队列锁 */
rt_sem_t queue_ready; /**< 任务添加同步信号 */
uint8_t is_shutdown; /**< 线程池状态字 */
rt_thread_t *thread_id; /**< 线程对象指针,二级指针 */
uint8_t max_thread_num; /**< 最大线程数 */
uint8_t cur_wait_thread_num; /**< 当前线程池中任务数 */
rt_list_t list; /**< 挂载任务的链表 */
/** 任务添加函数 */
thread_pool_err(*add_task)(struct _thread_pool *const pool,
void (*process)(void *arg), void *arg);
/** 任务删除函数 */
thread_pool_err(*del_all)(struct _thread_pool *const pool);
/** 线程池销毁函数 */
thread_pool_err(*destroy)(struct _thread_pool *pool);
/** 线程池互斥上锁操作函数 */
void (*lock)(struct _thread_pool *pool);
/** 线程池互斥解锁操作函数*/
void (*unlock)(struct _thread_pool *pool);
} thread_pool, *thread_pool_t;
定义任务对象,主要成员是任务函数及参数
/* 线程任务对象 */
typedef struct _thread_pool_task
{
void (*process)(void *arg); /**< 任务函数 */
void *arg; /**< 任务函数的参数 */
rt_list_t tlist; /** < 链表,指向下一个任务 */
} thread_pool_task, *thread_pool_task_t;
线程池的初始化主要是对线程池中的成员进行初始化赋值并创建线程池中的线程,然后启动线程池中所有线程。参数:pool:线程池对象句柄,name:线程池名,thread_stack_size:线程池中线程堆栈大小
thread_pool_err init_thread_pool(thread_pool_t const pool, const char *name, uint8_t max_thread_num,
uint32_t thread_stack_size)
{
thread_pool_err errorCode = THREAD_POOL_NO_ERR;
char job_name[THREAD_POOL_NAME_MAX] = { 0 };
uint8_t i;
/* 初始化线程池名 */
strncpy(pool->name, name, THREAD_POOL_NAME_MAX);
strncpy(job_name, name, THREAD_POOL_NAME_MAX);
/* 初始化任务链表 */
rt_list_init(&(pool->list));
/* 创建任务队列互斥锁,用户互斥锁,任务同步信号*/
pool->queue_lock = rt_mutex_create("tp_qlock", RT_IPC_FLAG_FIFO);
pool->user_lock = rt_mutex_create("tp_ulock", RT_IPC_FLAG_FIFO);
pool->queue_ready = rt_sem_create("tp_qready", 0, RT_IPC_FLAG_FIFO);
/* 记录线程个数 */
pool->max_thread_num = max_thread_num;
pool->cur_wait_thread_num = 0;
pool->is_shutdown = RT_FALSE;
/* 线程操作函数赋值 */
pool->add_task = add_task;
pool->del_all = del_all;
pool->destroy = destroy;
pool->lock = sync_lock;
pool->unlock = sync_unlock;
/* 创建线程池线程 */
pool->thread_id = (rt_thread_t *) rt_malloc(max_thread_num * sizeof(rt_thread_t));
for (i = 0; i < max_thread_num; i++)
{
rt_snprintf(job_name, THREAD_POOL_NAME_MAX, "%s_%d", name, i);
pool->thread_id[i] = rt_thread_create(job_name, thread_job, pool, thread_stack_size,
THREAD_POOL_JOB_DEFAULT_PRIORITY, THREAD_POOL_JOB_TICK);
/* 启动线程池中创建的线程 */
rt_thread_startup(pool->thread_id[i]);
LOG_D("create thread success,number is %d", i + 1);
rt_thread_delay(10);
}
LOG_D("initialize thread pool success!");
return errorCode;
}
线程池中的线程已经被创建,等待线程从队列中提取任务并自动执行任务。
static void thread_job(void *arg)
{
thread_pool_t pool = NULL;
thread_pool_task_t task = NULL;
while (1)
{
pool = (thread_pool_t) arg;
/* 获取任务队列锁 */
rt_mutex_take(pool->queue_lock, RT_WAITING_FOREVER);
/* 判断线程池中是否有任务,无任务且线程池没有关闭的情况,则等待添加任务的同步信号 */
while (pool->cur_wait_thread_num == 0 && !pool->is_shutdown)
{
/* 同步信号值为0,释放互斥锁等待任务添加同步信号 */
if (pool->queue_ready->value == 0)
{
rt_mutex_release(pool->queue_lock);
rt_sem_take(pool->queue_ready, RT_WAITING_FOREVER);
rt_mutex_take(pool->queue_lock, RT_WAITING_FOREVER);
}
else
{
/* 无任务但信号量值不为0,则获取信号,清空信号值 */
rt_sem_take(pool->queue_ready, RT_WAITING_FOREVER);
}
}
/* 线程池已关闭,则释放互斥锁 */
if (pool->is_shutdown)
{
rt_mutex_release(pool->queue_lock);
return;
}
/* 判断任务队列中是否有任务*/
if (!rt_list_isempty(&(pool->list)))
{
/* 获取任务 */
task = rt_list_entry((pool->list.next), struct _thread_pool_task, tlist);
/* 将任务从任务队列中移除*/
rt_list_remove(&(task->tlist));
/* 当前等待任务数减1*/
pool->cur_wait_thread_num--;
}
/* 释放任务队列互斥锁 */
rt_mutex_release(pool->queue_lock);
/* 任务不为空执行任务 */
if (task != NULL)
{
/* 执行任务*/
(*(task->process))(task->arg);
/* 释放任务空间 */
rt_free(task);
}
task = NULL;
}
}
线程池中的线程在一直处于等待任务的状态,需要用户向线程池中添加任务,我们看看任务添加的过程。参数:pool:线程池对象,process:任务函数,arg:任务参数
static thread_pool_err add_task(thread_pool_t const pool, void (*process)(void *arg), void *arg)
{
thread_pool_err error_code = THREAD_POOL_NO_ERR;
/* 申请任务对象内存 */
thread_pool_task_t newtask = (thread_pool_task_t) rt_malloc(sizeof(thread_pool_task));
if (!newtask)
{
/* 任务对象内存申请失败 */
LOG_W("Memory full!");
return THREAD_POOL_MEM_FULL_ERR;
}
/* 任务赋值处理 */
newtask->process = process;
newtask->arg = arg;
rt_list_init(&(newtask->tlist));
/* 获取任务队列互斥锁,向线程池中添加任务 */
if (rt_mutex_take(pool->queue_lock, RT_WAITING_FOREVER) == RT_EOK)
{
//将任务加入到任务队列中
rt_list_insert_before(&(pool->list), &(newtask->tlist));
/* 当前等待处理任务数量加1 */
pool->cur_wait_thread_num++;
/* 释放任务队列互斥锁 */
rt_mutex_release(pool->queue_lock);
/* 向线程池中等待任务的线程的线程发送同步信号 */
rt_sem_release(pool->queue_ready);
}
LOG_D("add a task success.");
return error_code;
}
当我们不需要执行任务时,我们可以删除任务并销毁线程池
/** 删除所有等待的任务,pool:线程池对象指针 */
static thread_pool_err del_all(thread_pool_t const pool)
{
thread_pool_err error = THREAD_POOL_NO_ERR;
thread_pool_task_t thread_task = NULL;
rt_list_t *task_node = NULL;
/* 获取任务队列信号 */
rt_mutex_take(pool->queue_lock, RT_WAITING_FOREVER);
/* 判断队列中是否有任务,有就执行删除操作 */
if (!rt_list_isempty(&pool->list))
{
/* 获取任务节点信息 */
task_node = pool->list.next;
while (task_node != &(pool->list))
{
thread_task = rt_list_entry(task_node, struct _thread_pool_task, tlist);
task_node = task_node->next;
/* 释放任务空间内存 */
if (thread_task != NULL)
rt_free(thread_task);
pool->cur_wait_thread_num--;
thread_task = NULL;
}
}
/* 复位任务队列链表 */
rt_list_init(&(pool->list));
/* 清除同步信号 */
rt_sem_control(pool->queue_ready, RT_IPC_CMD_RESET, NULL);
LOG_D("delete all wait task success");
rt_mutex_release(pool->queue_lock);
return error;
}
/**
* 销毁线程池,pool:线程池对象
*/
static thread_pool_err destroy(thread_pool_t pool)
{
thread_pool_err error = THREAD_POOL_NO_ERR;
thread_pool_task_t thread_task = NULL;
rt_list_t *task_node = NULL;
uint8_t i;
if (pool->is_shutdown)
{
error = THREAD_POOL_ALREADY_SHUTDOWN_ERR;
}
if (error == THREAD_POOL_NO_ERR)
{
pool->is_shutdown = RT_TRUE;
/* 删除线程池中的线程 */
for (i = 0; i < pool->max_thread_num; i++)
{
rt_thread_delete(pool->thread_id[i]);
}
/* 删除同步信号及队列互斥信号 */
rt_mutex_delete(pool->queue_lock);
rt_sem_delete(pool->queue_ready);
/* 释放线程池中线程内存 */
rt_free(pool->thread_id);
pool->thread_id = NULL;
/* 删除任务队列中的任务 */
if (!rt_list_isempty(&pool->list))
{
task_node = pool->list.next;
while (task_node != &(pool->list))
{
thread_task = rt_list_entry(task_node, struct _thread_pool_task, tlist);
task_node = task_node->next;
if (thread_task != NULL)
rt_free(thread_task);
pool->cur_wait_thread_num--;
thread_task = NULL;
}
}
/* 线程池任务队列链表复位*/
rt_list_init(&(pool->list));
/* 删除用户互斥锁 */
rt_mutex_delete(pool->user_lock);
pool = NULL;
LOG_D("Thread pool destroy success");
}
return error;
}
下面来就来测试一下上面的代码是否可行
#include <stdlib.h>
#define DBG_SECTION_NAME "thread_pool"
#define DBG_LEVEL DBG_LOG
#include <rtdbg.h>
#include <finsh.h>
#include "thread_pool.h"
static void task(void *arg)
{
LOG_I("thread %s is running.%d\r\n", rt_thread_self()->name,*((uint32_t *)arg));
}
thread_pool pool;
void thread_pool_sample(void)
{
static uint8_t status = 0;
if(status == 0)
{
status = 1;
init_thread_pool(&pool, "sam", 10, 1024);
}
/* add 5 task to thread pool */
pool.add_task(&pool, task, (void *)(rand() % 100));
pool.add_task(&pool, task, (void *)(rand() % 100));
pool.add_task(&pool, task, (void *)(rand() % 100));
pool.add_task(&pool, task, (void *)(rand() % 10));
pool.add_task(&pool, task, (void *)(rand() % 10));
// /* wait 10S */
// rt_thread_delay(rt_tick_from_millisecond(500));
// /* delete all task */
// pool.del_all(&pool);
// /* destroy the thread pool */
// pool.destroy(&pool);
}
我们来回顾一下线程池工作方式:程序启动之前,初始化线程池,启动线程池中的线程,由于还没有任务到来,线程池中的所有线程都处在阻塞状态,当一有任务到达就从线程池中取出一个空闲线程处理,如果所有的线程都处于工作状态,就添加到队列,进行排队。如果队列中的任务个数大于队列的所能容纳的最大数量,那就不能添加任务到队列中,只能等待队列不满才能添加任务到队列中。
文章评论