`
hyshucom
  • 浏览: 802854 次
文章分类
社区版块
存档分类
最新评论

Linux 下C语言简单实现线程池

 
阅读更多

C语言简单实现线程池

0前言

网上关于线程池的例子还是不少,简单明了的倒是比较少,看了网上的资料,打算借鉴网上的一些例子,自己实现以下。

线程的概念就不多说,首先说一下多线程的好处:多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。

那么为什么又需要线程池呢?

我们知道应用程序创建一个对象,然后销毁对象是很耗费资源的。创建线程,销毁线程,也是如此。因此,我们就预先生成一些线程,等到我们使用的时候在进行调度,于是,一些"池化资源"技术就这样的产生了。

1线程池优点

下面使用网上资源验证线程池如何提高服务器性能的。

我所提到服务器程序是指能够接受客户请求并能处理请求的程序,而不只是指那些接受网络客户请求的网络服务器程序。

多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。但如果对多线程应用不当,会增加对单个任务的处理时间。可以举一个简单的例子:

假设在一台服务器完成一项任务的时间为T

T1 创建线程的时间

T2 在线程中执行任务的时间,包括线程间同步所需时间

T3 线程销毁的时间

显然T = T1+T2+T3。注意这是一个极度简化的假设。

可以看出T1,T3是多线程本身的带来的开销,我们渴望减少T1,T3所用的时间,从而减少T的时间。但一些线程的使用者并没有注意到这一点,所以在程序中频繁的创建或销毁线程,这导致T1和T3在T中占有相当比例。显然这是突出了线程的弱点(T1,T3),而不是优点(并发性)。

线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。

线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目。在看一个例子:

假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。我们比较利用线程池技术和不利于线程池技术的服务器处理这些请求时所产生的线程总数。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目或者上限(以下简称线程池尺寸),而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池尺寸是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。

这些都是假设,不能充分说明问题,下面我将讨论线程池的简单实现并对该程序进行对比测试,以说明线程技术优点及应用领域。

2线程池的简单实现

一般一个简单线程池至少包含下列组成部分。

  1. 线程池管理器(ThreadPoolManager):用于创建并管理线程池
  2. 工作线程(WorkThread): 线程池中线程
  3. 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行。
  4. 任务队列:用于存放没有处理的任务。提供一种缓冲机制。

下面是代码:

全局文件:

  1. /**********************************
  2. *@author<ahref="mailto:wallwind@yeah.net">wallwind@<spanstyle="color:#000000;">yeah.net</span></a>
  3. *@date2012/06/13
  4. *Lastupdate:2012/06/13
  5. *License:LGPL
  6. *
  7. **********************************/
  8. #ifndef_GLOBAL_H_
  9. #define_GLOBAL_H_
  10. #include<sys/types.h>
  11. #include<sys/time.h>
  12. #include<unistd.h>/**/
  13. #include<stdarg.h>
  14. #include<stddef.h>/*offsetof()*/
  15. #include<stdio.h>
  16. #include<stdlib.h>
  17. #include<errno.h>
  18. #include<string.h>
  19. #include<signal.h>
  20. #include<pwd.h>
  21. #include<grp.h>
  22. #include<dirent.h>
  23. #include<glob.h>
  24. #include<sys/vfs.h>/*statfs()*/
  25. #include<sys/uio.h>
  26. #include<sys/stat.h>
  27. #include<fcntl.h>
  28. #include<sys/wait.h>
  29. #include<sys/mman.h>
  30. #include<sys/resource.h>
  31. #include<sched.h>
  32. #include<sys/socket.h>
  33. #include<netinet/in.h>
  34. #include<netinet/tcp.h>/*TCP_NODELAY,TCP_CORK*/
  35. #include<arpa/inet.h>
  36. #include<netdb.h>
  37. #include<sys/un.h>
  38. #include<time.h>/*tzset()*/
  39. #include<malloc.h>/*memalign()*/
  40. #include<limits.h>/*IOV_MAX*/
  41. #include<sys/ioctl.h>
  42. #include<sys/sysctl.h>
  43. #include<crypt.h>
  44. #include<sys/utsname.h>/*uname()*/
  45. #include<semaphore.h>
  46. #include<sys/epoll.h>
  47. #include<poll.h>
  48. #include<sys/syscall.h>
  49. #include<pthread.h>
  50. #endif


thread.c文件

  1. /**********************************
  2. *@authorwallwind@yeah.net
  3. *@date2012/06/13
  4. *Lastupdate:2012/06/13
  5. *License:LGPL
  6. *
  7. **********************************/
  8. #ifndef_THPOOL_
  9. #define_THPOOL_
  10. #include"global.h"
  11. /**
  12. 定义一个任务节点
  13. **/
  14. typedefvoid*(*FUNC)(void*arg);
  15. typedefstruct_thpool_job_t{
  16. //void*(*function)(void*arg);//函数指针
  17. FUNCfunction;
  18. void*arg;//函数参数。
  19. struct_thpool_job_t*prev;//指向上一个节点
  20. struct_thpool_job_t*next;//指向下一个节点
  21. }thpool_job_t;
  22. /**
  23. 定义一个工作队列
  24. **/
  25. typedefstruct_thpool_job_queue{
  26. thpool_job_t*head;//队列头指针
  27. thpool_job_t*tail;//队列末尾指针
  28. intjobN;//任务数
  29. sem_t*queueSem;//x信号量
  30. }thpool_jobqueue;
  31. /**
  32. 线程池
  33. **/
  34. typedefstruct_thpool_t{
  35. pthread_t*threads;////线程指针数
  36. intthreadsN;////线程数
  37. thpool_jobqueue*jobqueue;//指向队列指针
  38. }thpool_t;
  39. typedefstructthread_data{
  40. pthread_mutex_t*mutex_p;
  41. thpool_t*tp_p;
  42. }thread_data;
  43. //初始化线程池内部的线程数
  44. thpool_t*thpool_init(intthreadN);
  45. voidthpool_thread_do(thpool_t*tp_p);
  46. intthpool_add_work(thpool_t*tp_p,void*(*function_p)(void*),void*arg_p);
  47. voidthpool_destroy(thpool_t*tp_p);
  48. intthpool_jobqueue_init(thpool_t*tp_p);
  49. voidthpool_jobqueue_add(thpool_t*tp_p,thpool_job_t*newjob_p);
  50. intthpool_jobqueue_removelast(thpool_t*tp_p);
  51. thpool_job_t*thpool_jobqueue_peek(thpool_t*tp_p);
  52. voidthpool_jobqueue_empty(thpool_t*tp_p);
  53. #endif

thread.c

  1. /**********************************
  2. *@authorwallwind@yeah.net
  3. *@date2012/06/13
  4. *Lastupdate:2012/06/13
  5. *License:LGPL
  6. *
  7. **********************************/
  8. #include"global.h"
  9. #include"Thread.h"
  10. #include<errno.h>
  11. staticintthpool_keepalive=1;
  12. /*创建互斥量,并初始化*/
  13. pthread_mutex_tmutex=PTHREAD_MUTEX_INITIALIZER;/*usedtoserializequeueaccess*/
  14. thpool_t*thpool_init(intthreadN)
  15. {
  16. thpool_t*thpool;
  17. if(!threadN||threadN<1)
  18. threadN=1;
  19. ///分配线程池内存
  20. thpool=(thpool_t*)malloc(sizeof(thpool_t));
  21. if(thpool==NULL)
  22. {
  23. printf("mallocthpool_terror");
  24. returnNULL;
  25. }
  26. //分配线程数
  27. thpool->threadsN=threadN;
  28. thpool->threads=(pthread_t*)malloc(threadN*sizeof(pthread_t));
  29. if(thpool->threads==NULL)
  30. {
  31. printf("mallocthpool->threadserror");
  32. returnNULL;
  33. }
  34. if(thpool_jobqueue_init(thpool))
  35. return-1;
  36. thpool->jobqueue->queueSem=(sem_t*)malloc(sizeof(sem_t));
  37. sem_init(thpool->jobqueue->queueSem,0,1);
  38. intt;
  39. for(t=0;t<threadN;t++)
  40. {
  41. pthread_create(&(thpool->threads[t]),NULL,(void*)thpool_thread_do,(void*)thpool);
  42. }
  43. returnthpool;
  44. }
  45. voidthpool_destroy(thpool_t*tp_p)
  46. {
  47. inti;
  48. thpool_keepalive=0;
  49. for(i=0;i<(tp_p->threadsN);i++)
  50. {
  51. if(sem_post(tp_p->jobqueue->queueSem))
  52. {
  53. fprintf(stderr,"thpool_destroy():Couldnotbypasssem_wait()\n");
  54. }
  55. }
  56. if(sem_post(tp_p->jobqueue->queueSem)!=0)
  57. {
  58. fprintf(stderr,"thpool_destroy():Couldnotdestroysemaphore\n");
  59. }
  60. for(i=0;i<(tp_p->threadsN);i++)
  61. {
  62. pthread_join(tp_p->threads[i],NULL);
  63. }
  64. thpool_jobqueue_empty(tp_p);
  65. free(tp_p->threads);
  66. free(tp_p->jobqueue->queueSem);
  67. free(tp_p->jobqueue);
  68. free(tp_p);
  69. }
  70. ////对双向队列初始化
  71. /*Initialisequeue*/
  72. intthpool_jobqueue_init(thpool_t*tp_p){
  73. tp_p->jobqueue=(thpool_jobqueue*)malloc(sizeof(thpool_jobqueue));/*MALLOCjobqueue*/
  74. if(tp_p->jobqueue==NULL)return-1;
  75. tp_p->jobqueue->tail=NULL;
  76. tp_p->jobqueue->head=NULL;
  77. tp_p->jobqueue->jobN=0;
  78. return0;
  79. }
  80. ////
  81. voidthpool_thread_do(thpool_t*tp_p)
  82. {
  83. while(thpool_keepalive==1)
  84. {
  85. if(sem_wait(tp_p->jobqueue->queueSem))///线程阻塞,等待通知直到消息队列有数据
  86. {
  87. perror("thpool_thread_do():Waitingforsemaphore");
  88. exit(1);
  89. }
  90. if(thpool_keepalive)
  91. {
  92. //(void*)(*function)(void*arg);
  93. FUNCfunction;
  94. void*arg_buff;
  95. thpool_job_t*job_p;
  96. pthread_mutex_lock(&mutex);
  97. job_p=thpool_jobqueue_peek(tp_p);
  98. function=job_p->function;
  99. arg_buff=job_p->arg;
  100. if(thpool_jobqueue_removelast(tp_p))
  101. return;
  102. pthread_mutex_unlock(&mutex);
  103. function(arg_buff);//运行你的方法。
  104. free(job_p);////释放掉。
  105. }
  106. else
  107. {
  108. return;
  109. }
  110. }
  111. return;
  112. }
  113. //得到第一个队列的一个节点
  114. thpool_job_t*thpool_jobqueue_peek(thpool_t*tp_p)
  115. {
  116. returntp_p->jobqueue->tail;
  117. }
  118. /////删除队列的最后一个节点
  119. intthpool_jobqueue_removelast(thpool_t*tp_p)
  120. {
  121. if(tp_p==NULL)
  122. return-1;
  123. thpool_job_t*theLastJob;
  124. theLastJob=tp_p->jobqueue->tail;
  125. switch(tp_p->jobqueue->jobN)
  126. {
  127. case0:
  128. return-1;
  129. case1:
  130. tp_p->jobqueue->head=NULL;
  131. tp_p->jobqueue->tail=NULL;
  132. break;
  133. default:
  134. theLastJob->prev->next=NULL;
  135. tp_p->jobqueue->tail=theLastJob->prev;
  136. }
  137. (tp_p->jobqueue->jobN)--;
  138. intreval;
  139. sem_getvalue(tp_p->jobqueue->queueSem,&reval);
  140. return0;
  141. }
  142. voidthpool_jobqueue_add(thpool_t*tp_p,thpool_job_t*newjob_p)
  143. {
  144. newjob_p->next=NULL;
  145. newjob_p->prev=NULL;
  146. thpool_job_t*oldFirstJob;
  147. oldFirstJob=tp_p->jobqueue->head;
  148. switch(tp_p->jobqueue->jobN)
  149. {
  150. case0:
  151. tp_p->jobqueue->head=newjob_p;
  152. tp_p->jobqueue->tail=newjob_p;
  153. break;
  154. default:
  155. oldFirstJob->prev=newjob_p;
  156. newjob_p->next=oldFirstJob;
  157. tp_p->jobqueue->head=newjob_p;
  158. }
  159. (tp_p->jobqueue->jobN)++;
  160. sem_post(tp_p->jobqueue->queueSem);
  161. intreval;
  162. sem_getvalue(tp_p->jobqueue->queueSem,&reval);
  163. return;
  164. }
  165. /////将消息加入线程池
  166. intthpool_add_work(thpool_t*tp_p,void*(*function_p)(void*),void*arg_p)
  167. {
  168. thpool_job_t*newjob;
  169. newjob=(thpool_job_t*)malloc(sizeof(thpool_job_t));
  170. if(newjob==NULL)
  171. {
  172. fprintf(stderr,"thpool_add_work():Couldnotallocatememoryfornewjob\n");
  173. exit(1);
  174. }
  175. newjob->function=function_p;
  176. newjob->arg=arg_p;
  177. pthread_mutex_lock(&mutex);
  178. thpool_jobqueue_add(tp_p,newjob);
  179. pthread_mutex_unlock(&mutex);
  180. return0;
  181. }
  182. ///清空队列
  183. voidthpool_jobqueue_empty(thpool_t*tp_p)
  184. {
  185. thpool_job_t*curjob;
  186. curjob=tp_p->jobqueue->tail;
  187. while(tp_p->jobqueue->jobN)
  188. {
  189. tp_p->jobqueue->tail=curjob->prev;
  190. free(curjob);
  191. curjob=tp_p->jobqueue->tail;
  192. (tp_p->jobqueue->jobN)--;
  193. }
  194. tp_p->jobqueue->head=NULL;
  195. tp_p->jobqueue->tail=NULL;
  196. }


下面是mian函数文件

  1. /**********************************
  2. *@authorwallwind@yeah.net
  3. *@date2012/06/13
  4. *Lastupdate:2012/06/13
  5. *License:LGPL
  6. *
  7. **********************************/
  8. #include"global.h"
  9. #include"Thread.h"
  10. void*task1()
  11. {
  12. printf("#Threadworking:%u\n",(int)pthread_self());
  13. printf("Task1running..\n");
  14. }
  15. /*Somearbitrarytask2*/
  16. void*task2(inta)
  17. {
  18. printf("#Threadworking:%u\n",(int)pthread_self());
  19. printf("Task2running..\n");
  20. printf("%d\n",a);
  21. }
  22. intmain()
  23. {
  24. printf("~~~~~~~~~~~");
  25. thpool_t*thpool;
  26. inti;
  27. thpool=thpool_init(5);
  28. puts("Adding20taskstothreadpool");
  29. inta=54;
  30. for(i=0;i<20;i++){
  31. thpool_add_work(thpool,(void*)task1,NULL);
  32. thpool_add_work(thpool,(void*)task2,(void*)a);
  33. };
  34. puts("Willkillthreadpool");
  35. thpool_destroy(thpool);
  36. }


在linux下写程序少不了makefile文件。于是我自己写了一个比较通用的makefile文件。仅供大家参考

makefile 代码

  1. SRCS=$(wildcard*.c)
  2. OBJS=$(SRCS:.c=.o)
  3. CC=gcc
  4. INCLUDES=-I/
  5. LIBS=-L/-lpthread
  6. CCFLAGS=-g-Wall-O0
  7. cThreadPool:$(OBJS)
  8. $(CC)$^-o$@$(INCLUDES)$(LIBS)
  9. %.o:%.cpp
  10. $(CC)-c$<$(CCFLAGS)
  11. clean:
  12. rm*.o
  13. .PHONY:clean

运行效果如下图

  1. ./test
  2. Createdthread0inpool
  3. Createdthread1inpool
  4. Createdthread2inpool
  5. Createdthread3inpool
  6. Adding20taskstothreadpool
  7. #Threadworking:3086773136
  8. Task1running..
  9. #Threadworking:3076283280
  10. Task2running..
  11. 54
  12. #Threadworking:3086773136
  13. Task1running..
  14. #Threadworking:3086773136
  15. Task2running..
  16. 54
  17. #Threadworking:3076283280
  18. Task1running..
  19. #Threadworking:3086773136
  20. Task2running..
  21. 54
  22. #Threadworking:3076283280
  23. Task1running..
  24. #Threadworking:3086773136
  25. Task2running..
  26. 54
  27. #Threadworking:3076283280
  28. Task1running..
  29. #Threadworking:3086773136
  30. Task2running..
  31. 54
  32. #Threadworking:3076283280
  33. Task1running..
  34. #Threadworking:3086773136
  35. Task2running..
  36. 54
  37. #Threadworking:3076283280
  38. Task1running..
  39. #Threadworking:3086773136
  40. Task2running..
  41. 54
  42. #Threadworking:3076283280
  43. Task1running..
  44. #Threadworking:3086773136
  45. Task2running..
  46. 54
  47. #Threadworking:3076283280
  48. Task1running..
  49. #Threadworking:3086773136
  50. Task2running..
  51. 54
  52. #Threadworking:3076283280
  53. Task1running..
  54. #Threadworking:3086773136
  55. Task2running..
  56. 54
  57. Willkillthreadpool


线程池也是参考了别人的。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics