C语言简单实现线程池
0前言
网上关于线程池的例子还是不少,简单明了的倒是比较少,看了网上的资料,打算借鉴网上的一些例子,自己实现以下。
线程的概念就不多说,首先说一下多线程的好处:多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。
那么为什么又需要线程池呢?
我们知道应用程序创建一个对象,然后销毁对象是很耗费资源的。创建线程,销毁线程,也是如此。因此,我们就预先生成一些线程,等到我们使用的时候在进行调度,于是,一些"池化资源"技术就这样的产生了。
1线程池优点
下面使用网上资源验证线程池如何提高服务器性能的。
我所提到服务器程序是指能够接受客户请求并能处理请求的程序,而不只是指那些接受网络客户请求的网络服务器程序。
多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。但如果对多线程应用不当,会增加对单个任务的处理时间。可以举一个简单的例子:
假设在一台服务器完成一项任务的时间为T
T1 创建线程的时间
T2 在线程中执行任务的时间,包括线程间同步所需时间
T3 线程销毁的时间
显然T = T1+T2+T3。注意这是一个极度简化的假设。
可以看出T1,T3是多线程本身的带来的开销,我们渴望减少T1,T3所用的时间,从而减少T的时间。但一些线程的使用者并没有注意到这一点,所以在程序中频繁的创建或销毁线程,这导致T1和T3在T中占有相当比例。显然这是突出了线程的弱点(T1,T3),而不是优点(并发性)。
线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。
线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目。在看一个例子:
假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。我们比较利用线程池技术和不利于线程池技术的服务器处理这些请求时所产生的线程总数。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目或者上限(以下简称线程池尺寸),而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池尺寸是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。
这些都是假设,不能充分说明问题,下面我将讨论线程池的简单实现并对该程序进行对比测试,以说明线程技术优点及应用领域。
2线程池的简单实现
一般一个简单线程池至少包含下列组成部分。
- 线程池管理器(ThreadPoolManager):用于创建并管理线程池
- 工作线程(WorkThread): 线程池中线程
- 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。
- 任务队列:用于存放没有处理的任务。提供一种缓冲机制。
下面是代码:
全局文件:
-
-
-
-
-
-
-
-
-
#ifndef_GLOBAL_H_
-
#define_GLOBAL_H_
-
-
#include<sys/types.h>
-
#include<sys/time.h>
-
#include<unistd.h>/**/
-
#include<stdarg.h>
-
#include<stddef.h>/*offsetof()*/
-
#include<stdio.h>
-
#include<stdlib.h>
-
#include<errno.h>
-
#include<string.h>
-
#include<signal.h>
-
#include<pwd.h>
-
#include<grp.h>
-
#include<dirent.h>
-
#include<glob.h>
-
#include<sys/vfs.h>/*statfs()*/
-
-
#include<sys/uio.h>
-
#include<sys/stat.h>
-
#include<fcntl.h>
-
-
#include<sys/wait.h>
-
#include<sys/mman.h>
-
#include<sys/resource.h>
-
#include<sched.h>
-
-
#include<sys/socket.h>
-
#include<netinet/in.h>
-
#include<netinet/tcp.h>/*TCP_NODELAY,TCP_CORK*/
-
#include<arpa/inet.h>
-
#include<netdb.h>
-
#include<sys/un.h>
-
-
#include<time.h>/*tzset()*/
-
#include<malloc.h>/*memalign()*/
-
#include<limits.h>/*IOV_MAX*/
-
#include<sys/ioctl.h>
-
#include<sys/sysctl.h>
-
#include<crypt.h>
-
#include<sys/utsname.h>/*uname()*/
-
#include<semaphore.h>
-
-
#include<sys/epoll.h>
-
#include<poll.h>
-
#include<sys/syscall.h>
-
#include<pthread.h>
-
#endif
thread.c文件
-
-
-
-
-
-
-
-
-
-
-
#ifndef_THPOOL_
-
#define_THPOOL_
-
-
#include"global.h"
-
-
-
-
typedefvoid*(*FUNC)(void*arg);
-
-
-
typedefstruct_thpool_job_t{
-
-
FUNCfunction;
-
void*arg;
-
struct_thpool_job_t*prev;
-
struct_thpool_job_t*next;
-
}thpool_job_t;
-
-
-
-
-
-
typedefstruct_thpool_job_queue{
-
thpool_job_t*head;
-
thpool_job_t*tail;
-
intjobN;
-
sem_t*queueSem;
-
}thpool_jobqueue;
-
-
-
-
-
-
typedefstruct_thpool_t{
-
pthread_t*threads;
-
intthreadsN;
-
thpool_jobqueue*jobqueue;
-
}thpool_t;
-
-
typedefstructthread_data{
-
pthread_mutex_t*mutex_p;
-
thpool_t*tp_p;
-
}thread_data;
-
-
-
thpool_t*thpool_init(intthreadN);
-
-
voidthpool_thread_do(thpool_t*tp_p);
-
-
intthpool_add_work(thpool_t*tp_p,void*(*function_p)(void*),void*arg_p);
-
-
voidthpool_destroy(thpool_t*tp_p);
-
-
-
-
intthpool_jobqueue_init(thpool_t*tp_p);
-
-
-
-
voidthpool_jobqueue_add(thpool_t*tp_p,thpool_job_t*newjob_p);
-
-
intthpool_jobqueue_removelast(thpool_t*tp_p);
-
-
thpool_job_t*thpool_jobqueue_peek(thpool_t*tp_p);
-
-
voidthpool_jobqueue_empty(thpool_t*tp_p);
-
-
#endif
thread.c
-
-
-
-
-
-
-
-
#include"global.h"
-
#include"Thread.h"
-
#include<errno.h>
-
-
staticintthpool_keepalive=1;
-
-
-
pthread_mutex_tmutex=PTHREAD_MUTEX_INITIALIZER;
-
-
thpool_t*thpool_init(intthreadN)
-
{
-
thpool_t*thpool;
-
if(!threadN||threadN<1)
-
threadN=1;
-
-
thpool=(thpool_t*)malloc(sizeof(thpool_t));
-
if(thpool==NULL)
-
{
-
printf("mallocthpool_terror");
-
returnNULL;
-
}
-
-
thpool->threadsN=threadN;
-
thpool->threads=(pthread_t*)malloc(threadN*sizeof(pthread_t));
-
if(thpool->threads==NULL)
-
{
-
printf("mallocthpool->threadserror");
-
returnNULL;
-
}
-
if(thpool_jobqueue_init(thpool))
-
return-1;
-
-
thpool->jobqueue->queueSem=(sem_t*)malloc(sizeof(sem_t));
-
sem_init(thpool->jobqueue->queueSem,0,1);
-
intt;
-
for(t=0;t<threadN;t++)
-
{
-
pthread_create(&(thpool->threads[t]),NULL,(void*)thpool_thread_do,(void*)thpool);
-
}
-
-
returnthpool;
-
}
-
-
voidthpool_destroy(thpool_t*tp_p)
-
{
-
inti;
-
thpool_keepalive=0;
-
-
for(i=0;i<(tp_p->threadsN);i++)
-
{
-
if(sem_post(tp_p->jobqueue->queueSem))
-
{
-
fprintf(stderr,"thpool_destroy():Couldnotbypasssem_wait()\n");
-
}
-
-
}
-
if(sem_post(tp_p->jobqueue->queueSem)!=0)
-
{
-
fprintf(stderr,"thpool_destroy():Couldnotdestroysemaphore\n");
-
}
-
for(i=0;i<(tp_p->threadsN);i++)
-
{
-
pthread_join(tp_p->threads[i],NULL);
-
}
-
thpool_jobqueue_empty(tp_p);
-
-
free(tp_p->threads);
-
free(tp_p->jobqueue->queueSem);
-
free(tp_p->jobqueue);
-
free(tp_p);
-
-
}
-
-
-
intthpool_jobqueue_init(thpool_t*tp_p){
-
tp_p->jobqueue=(thpool_jobqueue*)malloc(sizeof(thpool_jobqueue));
-
if(tp_p->jobqueue==NULL)return-1;
-
tp_p->jobqueue->tail=NULL;
-
tp_p->jobqueue->head=NULL;
-
tp_p->jobqueue->jobN=0;
-
return0;
-
}
-
-
-
voidthpool_thread_do(thpool_t*tp_p)
-
{
-
while(thpool_keepalive==1)
-
{
-
if(sem_wait(tp_p->jobqueue->queueSem))
-
{
-
perror("thpool_thread_do():Waitingforsemaphore");
-
exit(1);
-
}
-
if(thpool_keepalive)
-
{
-
-
FUNCfunction;
-
void*arg_buff;
-
thpool_job_t*job_p;
-
-
pthread_mutex_lock(&mutex);
-
job_p=thpool_jobqueue_peek(tp_p);
-
function=job_p->function;
-
arg_buff=job_p->arg;
-
if(thpool_jobqueue_removelast(tp_p))
-
return;
-
pthread_mutex_unlock(&mutex);
-
function(arg_buff);
-
free(job_p);
-
}
-
else
-
{
-
return;
-
}
-
-
}
-
return;
-
}
-
-
-
thpool_job_t*thpool_jobqueue_peek(thpool_t*tp_p)
-
{
-
returntp_p->jobqueue->tail;
-
}
-
-
intthpool_jobqueue_removelast(thpool_t*tp_p)
-
{
-
if(tp_p==NULL)
-
return-1;
-
thpool_job_t*theLastJob;
-
theLastJob=tp_p->jobqueue->tail;
-
switch(tp_p->jobqueue->jobN)
-
{
-
case0:
-
return-1;
-
case1:
-
tp_p->jobqueue->head=NULL;
-
tp_p->jobqueue->tail=NULL;
-
break;
-
default:
-
theLastJob->prev->next=NULL;
-
tp_p->jobqueue->tail=theLastJob->prev;
-
-
}
-
(tp_p->jobqueue->jobN)--;
-
intreval;
-
sem_getvalue(tp_p->jobqueue->queueSem,&reval);
-
return0;
-
}
-
-
voidthpool_jobqueue_add(thpool_t*tp_p,thpool_job_t*newjob_p)
-
{
-
newjob_p->next=NULL;
-
newjob_p->prev=NULL;
-
thpool_job_t*oldFirstJob;
-
oldFirstJob=tp_p->jobqueue->head;
-
-
switch(tp_p->jobqueue->jobN)
-
{
-
case0:
-
tp_p->jobqueue->head=newjob_p;
-
tp_p->jobqueue->tail=newjob_p;
-
break;
-
default:
-
oldFirstJob->prev=newjob_p;
-
newjob_p->next=oldFirstJob;
-
tp_p->jobqueue->head=newjob_p;
-
-
}
-
(tp_p->jobqueue->jobN)++;
-
sem_post(tp_p->jobqueue->queueSem);
-
-
intreval;
-
sem_getvalue(tp_p->jobqueue->queueSem,&reval);
-
return;
-
}
-
-
-
intthpool_add_work(thpool_t*tp_p,void*(*function_p)(void*),void*arg_p)
-
{
-
thpool_job_t*newjob;
-
newjob=(thpool_job_t*)malloc(sizeof(thpool_job_t));
-
-
if(newjob==NULL)
-
{
-
fprintf(stderr,"thpool_add_work():Couldnotallocatememoryfornewjob\n");
-
exit(1);
-
}
-
newjob->function=function_p;
-
newjob->arg=arg_p;
-
pthread_mutex_lock(&mutex);
-
thpool_jobqueue_add(tp_p,newjob);
-
pthread_mutex_unlock(&mutex);
-
return0;
-
}
-
-
-
voidthpool_jobqueue_empty(thpool_t*tp_p)
-
{
-
thpool_job_t*curjob;
-
curjob=tp_p->jobqueue->tail;
-
-
while(tp_p->jobqueue->jobN)
-
{
-
tp_p->jobqueue->tail=curjob->prev;
-
free(curjob);
-
curjob=tp_p->jobqueue->tail;
-
(tp_p->jobqueue->jobN)--;
-
}
-
tp_p->jobqueue->head=NULL;
-
tp_p->jobqueue->tail=NULL;
-
}
下面是mian函数文件
-
-
-
-
-
-
-
-
-
#include"global.h"
-
#include"Thread.h"
-
-
void*task1()
-
{
-
printf("#Threadworking:%u\n",(int)pthread_self());
-
printf("Task1running..\n");
-
}
-
-
-
-
void*task2(inta)
-
{
-
printf("#Threadworking:%u\n",(int)pthread_self());
-
printf("Task2running..\n");
-
printf("%d\n",a);
-
}
-
intmain()
-
{
-
printf("~~~~~~~~~~~");
-
thpool_t*thpool;
-
inti;
-
thpool=thpool_init(5);
-
puts("Adding20taskstothreadpool");
-
inta=54;
-
for(i=0;i<20;i++){
-
thpool_add_work(thpool,(void*)task1,NULL);
-
thpool_add_work(thpool,(void*)task2,(void*)a);
-
};
-
-
-
puts("Willkillthreadpool");
-
thpool_destroy(thpool);
-
-
}
在linux下写程序少不了makefile文件。于是我自己写了一个比较通用的makefile文件。仅供大家参考
makefile 代码
-
SRCS=$(wildcard*.c)
-
-
OBJS=$(SRCS:.c=.o)
-
-
CC=gcc
-
-
INCLUDES=-I/
-
-
LIBS=-L/-lpthread
-
-
CCFLAGS=-g-Wall-O0
-
-
cThreadPool:$(OBJS)
-
-
$(CC)$^-o$@$(INCLUDES)$(LIBS)
-
-
%.o:%.cpp
-
-
$(CC)-c$<$(CCFLAGS)
-
-
-
clean:
-
-
rm*.o
-
-
.PHONY:clean
运行效果如下图
-
./test
-
Createdthread0inpool
-
Createdthread1inpool
-
Createdthread2inpool
-
Createdthread3inpool
-
Adding20taskstothreadpool
-
#Threadworking:3086773136
-
Task1running..
-
#Threadworking:3076283280
-
Task2running..
-
54
-
#Threadworking:3086773136
-
Task1running..
-
#Threadworking:3086773136
-
Task2running..
-
54
-
#Threadworking:3076283280
-
Task1running..
-
#Threadworking:3086773136
-
Task2running..
-
54
-
#Threadworking:3076283280
-
Task1running..
-
#Threadworking:3086773136
-
Task2running..
-
54
-
#Threadworking:3076283280
-
Task1running..
-
#Threadworking:3086773136
-
Task2running..
-
54
-
#Threadworking:3076283280
-
Task1running..
-
#Threadworking:3086773136
-
Task2running..
-
54
-
#Threadworking:3076283280
-
Task1running..
-
#Threadworking:3086773136
-
Task2running..
-
54
-
#Threadworking:3076283280
-
Task1running..
-
#Threadworking:3086773136
-
Task2running..
-
54
-
#Threadworking:3076283280
-
Task1running..
-
#Threadworking:3086773136
-
Task2running..
-
54
-
#Threadworking:3076283280
-
Task1running..
-
#Threadworking:3086773136
-
Task2running..
-
54
-
Willkillthreadpool
线程池也是参考了别人的。
分享到:
相关推荐
linux下C语言实现线程池
由C语言实现简单的线程池,任务调配,合理创建销毁线程处理任务
使用C语言实现一个基本线程池,包含Linux下条件变量的使用、线程池创建与销毁等等
C语言编写,Nginx线程池;Linux环境使用!
linux线程池创建c实现 linux线程池创建c实现 linux线程池创建c实现 linux线程池创建c实现 linux线程池创建c实现 linux线程池创建c实现
linux线程池,c语言实现,只是文件后缀名用的是cpp方便在qt里面测试,两种版本,都是参考网上的资料做了一些处理之后的
libstpool是一个轻便高效的跨平台 支持win32 linux unix arm 的c语言线程池 任务池库 支持优先级任务
C语言编写的可编译的线程池源码,实现线程池的管理,可移植LINUX下
linux 下用线程pthread写线程池,从而实现多生产者和消费者
包含: 线程池 互斥锁 状态机 任务投递 线程切换 高并发处理。 Makefile编译,使用方便。 可自定义 线程使用数 任务投递数。 适合LINUX C多线程技术,初学者学习和演示。
linux系统下C语言 利用线程池技术实现CP命令 压缩包包含:源代码+开发说明PPT 线程池头文件: //任务 struct task { void *(*task)(void *arg); void *arg; struct task *next; }; //线程池 typedef struct ...
操作系统课程项目,在linux下用c语言实现了多线程web服务器。可以选择不同的调度算法,来执行web请求,有FCFS, SJF。采用线程池设计思想实现。
Linux下线程池的C语言实现,可以稍微参考一下。
通常我们使用多线程的方式是,需要时创建一个新的线程,在这个新的线程里执行特定的...这在一般的应用里已经能够满足我们应用的需要,毕竟我们并不是什么时候都需要创建大量的线程,并在它们执行一个简单的任务后销毁。
本线程池采用C语言实现。包括以下内容 > - thread_pool_create:创建线程池所需要的资源,包含不限于任务队列,子线程的创建。 > - thread_pool_post:用于任务的发布,将执行任务存在任务队列中。 > - thread_pool_...
linux下通过200行C代码实现简单线程池,附源代码和文档说明: 200行C代码实现简单线程池.doc threadpool.c
C语言线程池实现实例
线程池,别人开源的,稳定性测试还不错,c语言的,linux下面运行没问题
保存的一份Linux下的线程池代码,使用C语言编写,另外添加了较详细的中文注释,附带测试代码,可以加深一些对线程池的认识,居家必备
简单的linux多线程实现,含互斥锁的使用