#include #include #include #include #include #include #include #include "list.h" #include "pthread_pool.h" #include "task_queue.h" #define mass( _cond, _msg ) \ if( _cond ) { \ perror( _msg ); \ } \ #define ALLOCATE( type, size ) (type*)malloc(sizeof(type)*size) #define STACK_UNIT 1000 #define MAX_STACK_UNIT 1000000 #define MAX_POOL_SIZE 100 /* struct __pthread_list_t -- linked list of the pthreads. * @next: next node. * @thread: id of pthread. */ typedef struct __pthread_list_t { struct __pthread_list_t *next; pthread_t thread; int free; } pthread_list_t; /* struct __pthread_pool_attr_t * @attr: attributes of pthreads. * @pool_size: size of the pool. * @min_free: minimal value of the free threads. * @max_free: maximum value of the free threads. * @step: step for inc/dec count of threads. */ struct __pthread_pool_attr_t { pthread_attr_t *attr; int pool_size, min_free, max_free, step; }; /* struct sync_t -- syncronization objects. * @mutex: pthread mutex. * @cond: condition varibale. */ struct sync_t { pthread_cond_t cond; pthread_cond_t master_cond; pthread_mutex_t work_mutex; pthread_mutex_t master_mutex; pthread_mutex_t lock_queue; }; /* * struct __pthread_pool_t -- pool stuff. * @head: head of the pthread_list_t. * @tail: tail of the pthread_list_t. * @attr: attributes of the pool. * @queue: working queue for the pool. * @sync: synchronization objects. */ struct __pthread_pool_t { pthread_list_t *head, *tail; pthread_pool_attr_t *attr; task_queue_t *queue; struct sync_t *sync; pthread_t master_id; int cur_len; int free; }; static void* work_func( void* ); static void* pthread_master( void* ); static int pthread_init_slave( pthread_pool_t*); static int init_pthread_node( pthread_list_t**,pthread_attr_t*, func_t, void*); static void pthread_pool_init_sync( struct sync_t** ); static void pthread_pool_destroy_sync( struct sync_t* ); static void clean_on_cancel( void* ); static pthread_list_t *get_pthread_node( pthread_pool_t* pool, pthread_t id ); /* pthread_pool_init_sync -- create synchronization objects. * @sync: struct of objects. */ static void pthread_pool_init_sync( struct sync_t ** sync ) { #define sync (*sync) sync = ALLOCATE( struct sync_t, (1)); pthread_mutex_init( &sync->work_mutex, NULL ); pthread_mutex_init( &sync->master_mutex, NULL ); pthread_mutex_init( &sync->lock_queue, NULL ); pthread_cond_init( &sync->cond, NULL ); pthread_cond_init( &sync->master_cond, NULL ); #undef sync } /* pthread_pool_destroy_sync -- destroy synchronization objects. * @sync: ptr to object's storage. */ static void pthread_pool_destroy_sync( struct sync_t* sync ) { mass( pthread_mutex_destroy( &sync->work_mutex ) < 0, "mutex_destroy" ); mass( pthread_mutex_destroy( &sync->master_mutex) < 0, "mutex_destroy" ); mass( pthread_mutex_destroy( &sync->lock_queue ) < 0, "mutex_destroy" ); mass( pthread_cond_destroy( &sync->cond ) < 0, "cond_destroy" ); mass( pthread_cond_destroy( &sync->master_cond ) < 0, "cond_destroy" ); free( sync ); } /* * pthread_pool_attr_init -- initialize pool's attributes. * @attr: ptr to pthread_attr_t, attrubutes of the pthread. * @size: count of threads. * @min: minimal value of the free threads. * @max: maximum value of the free threads. * @step: step for inrement/decrement count of threads. * NOTE: OBVIOUSLY * @max must be greater than @min. * @size must be greater than @min. * @size must be less than @max. */ pthread_pool_attr_t * pthread_pool_attr_init( pthread_attr_t *attr, int size, int min, int max, int step){ pthread_pool_attr_t *pool_attr; size_t stack; if( (pool_attr = ALLOCATE( pthread_pool_attr_t, (1))) == NULL ) return NULL; if( attr == NULL ) { attr = ALLOCATE( pthread_attr_t, (1)); pthread_attr_init( attr ); } stack = sizeof(double)*size*STACK_UNIT+ 2*MAX_STACK_UNIT; pthread_attr_setstacksize( attr, stack ); pool_attr->attr = attr; pool_attr->min_free = min; pool_attr->max_free = max; pool_attr->pool_size = size; pool_attr->step = step; return pool_attr; } /* * pthread_pool_attr -- destory pool's attributes. * @attr: attributes of the pool. */ void pthread_pool_attr_destroy( pthread_pool_attr_t *attr ) { if( attr ) { if( attr->attr) pthread_attr_destroy( attr->attr ); free( attr ); } } static void clean_on_cancel( void* args ) { pthread_mutex_t *mutex = (pthread_mutex_t*)args; pthread_mutex_unlock( mutex ); } /* work_func -- main slave subroutine. * @args: ptr to the main pool. * return: state of execution casted to void*. */ void *work_func( void* args ) { pthread_pool_t *pool = (pthread_pool_t*)args; pthread_list_t *self; task_t *task; void* ret; pthread_mutex_lock( &pool->sync->master_mutex ); self = get_pthread_node( pool, pthread_self() ); pthread_mutex_unlock( &pool->sync->master_mutex ); if( self == NULL ) { #ifdef debug fprintf( stderr, "Self in NULL.\n" ); #endif pthread_exit( NULL ); } #define mutex pool->sync->work_mutex #define master_mutex pool->sync->master_mutex #define lock_queue pool->sync->lock_queue #define cond pool->sync->cond #define queue pool->queue /* * NOTE: Threads are blocked on CV and wait until an element is * added to the queue. */ pthread_cleanup_push( clean_on_cancel, &mutex ); pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, NULL ); while( 1 ) { pthread_mutex_lock( &mutex ); while( queue->size == 0 ) pthread_cond_wait( &cond, &mutex ); pthread_mutex_unlock( &mutex ); pthread_setcancelstate( PTHREAD_CANCEL_DISABLE, NULL ); self->free = 0; pthread_mutex_lock( &master_mutex ); pool->free--; pthread_mutex_unlock( &master_mutex ); pthread_mutex_lock( &lock_queue ); task = dequeue( queue ); pthread_mutex_unlock( &lock_queue ); if( task ) { ret = task->func( task->args ); free( task ); self->free = 1; pthread_mutex_lock( &master_mutex ); pool->free++; pthread_mutex_unlock( &master_mutex ); } else { #ifdef debug fputs( "dequeue: no tasks.\n", stderr ); #endif } if( pool->free >= pool->attr->max_free || pool->free <= pool->attr->min_free ) { pthread_cond_signal( &pool->sync->master_cond ); } pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, NULL ); pthread_testcancel(); } pthread_cleanup_pop( 0 ); #undef lock_queue #undef queue #undef cond #undef mutex #undef master_mutex } /* * init_pthread_node -- initialize a pthread. * @node: node in the pthread_list_t. * @attr: attribute of the pool. * @func: funtion for execution in a thread. * @args: arguments for the @func. * return: zero value on success, other values otherwise. */ static int init_pthread_node( pthread_list_t **node, pthread_attr_t *attr, func_t func, void* args) { if( (*node = ALLOCATE( pthread_list_t, (1) )) == NULL ) { perror("allocate" ); return -1; } (*node)->next = NULL; (*node)->free = 1; if( pthread_create( &(*node)->thread, attr, func, args ) < 0 ) { perror("pthread_create" ); return -1; } if( pthread_detach( (*node)->thread ) < 0 ) { perror("pthread_detach"); return -1; } return 0; } int pthread_pool_add_task( pthread_pool_t *pool, task_t *task ) { pthread_mutex_lock( &pool->sync->lock_queue ); enqueue( pool->queue, task ); pthread_cond_signal( &pool->sync->cond ); pthread_mutex_unlock( &pool->sync->lock_queue ); return 0; } /* * pthread_init_slave -- create slave threads. * @pool: ptr to the pool ( for saving pthread_list_t ). * return: zero value on success, other values otherwise. */ static int pthread_init_slave( pthread_pool_t *pool ) { pthread_list_t *tmp_node = NULL; int i; #define attrib pool->attr->attr #define sync pool->sync if( init_pthread_node( &pool->head, attrib, work_func, (void*)pool ) < 0 ) { perror("init_pthread_node"); return -1; } pool->tail = pool->head; for( i = 1; i < pool->attr->pool_size; i++ ) { if( init_pthread_node( &tmp_node, attrib, work_func, (void*)pool) < 0 ) { perror( "init_pthread_node" ); return -1; } pool->tail->next = tmp_node; pool->tail = tmp_node; } pool->cur_len = i; #ifdef debug fprintf( stderr, "Created %d threads\n", i ); #endif #undef attrib #undef sync return 0; } /* * pthread_master -- master pthread's routine. * @args: void* ptr to pthread_pool_t. */ static void* pthread_master( void* args ) { pthread_pool_t *pool = (pthread_pool_t*)args; #define master_mutex pool->sync->master_mutex #define cond pool->sync->cond #define master_cond pool->sync->master_cond #define lock_queue pool->sync->lock_queue #define attr pool->attr pthread_mutex_lock( &master_mutex ); if( pthread_init_slave( pool ) < 0 ) { perror("Master thread: pthread_init_slave"); return (void*)(-1); } pthread_mutex_unlock( &master_mutex ); pthread_cleanup_push( clean_on_cancel, &master_mutex ); pthread_setcancelstate( PTHREAD_CANCEL_ENABLE, NULL ); pthread_mutex_lock( &master_mutex ); pthread_cond_wait( &cond, &master_mutex ); pthread_mutex_unlock( &master_mutex ); while( 1 ) { /* Watch counts. Manage threads. */ pthread_mutex_lock( &master_mutex ); pthread_cond_wait( &master_cond, &master_mutex ); if( pool->free >= attr->max_free ) { pthread_pool_resize( pool, -attr->step ); } else if( pool->free <= attr->min_free || pool->free == 0 ) { pthread_pool_resize( pool, attr->step ); } pthread_mutex_unlock( &master_mutex ); pthread_testcancel(); } pthread_cleanup_pop( 0 ); return NULL; #undef attr #undef master_mutex #undef master_cond #undef lock_queue #undef cond } /* pthread_pool_resize -- change pool size. * @pool: target pool. * @step: count of step to create/destroy. * NOTE: if @step > 0 than increase, decrease otherwise. */ int pthread_pool_resize( pthread_pool_t *pool, int step ) { pthread_list_t *tmp, *tmp_prev; int i = step < 0 ? -step : step; if( step < 0 ) { /* Decrease. */ tmp = tmp_prev = pool->head; while( tmp && i ) { if( tmp->free ) { /* If free, delete node. */ #ifdef debug fprintf( stderr, "Delete thread: %d.\n",tmp->thread ); fprintf( stderr, "Free: %d. Size: %d.\n", pool->free, pool->cur_len ); #endif if( tmp == pool->head ) { pool->head = pool->head->next; } else { tmp_prev->next = tmp->next; } pthread_cancel( tmp->thread ); free( tmp ); tmp = tmp_prev->next; pool->cur_len--; pool->free--; i--; } else { tmp_prev = tmp; tmp = tmp->next; } } } else { /* Increase. */ while( i-- ) { init_pthread_node( &tmp, pool->attr->attr, work_func, (void*)pool ); pool->tail->next = tmp; pool->tail = tmp; pool->cur_len++; pool->free++; #ifdef debug fprintf( stderr, "Created thread: %d.\n",tmp->thread ); fprintf( stderr, "Free: %d. Size: %d.\n", pool->free, pool->cur_len ); #endif } } return 0; } /* * pthread_pool_init -- initialize a pool. * @pool: the pool. * @attr: attributes of the pool. * @func: fucntion for execution in the threads. * @args: arguments of the @func * return: zero value on success, other values otherwise. */ int pthread_pool_init( pthread_pool_t** pool, pthread_pool_attr_t *attr) { #define pool (*pool) pool = ALLOCATE( pthread_pool_t, (1)); pool->attr = attr; pool->queue = queue_init( ); pthread_pool_init_sync( &pool->sync ); pthread_create( &(pool->master_id), NULL, pthread_master, (void*)pool ); pthread_detach( (pool->master_id) ); while( pool->cur_len < attr->pool_size ) { usleep( 10 );} pool->free = pool->cur_len; #ifdef debug fprintf( stderr, "Pool inited: %d. Master ID: %d\n", pool->cur_len, pool->master_id ); #endif return 0; #undef pool } /* * pthread_pool_destroy -- destroy pool. * @pool: ptr to pthread_pool_t -- inited pool. * return: zero value on success, other values otherwise. */ int pthread_pool_destroy( pthread_pool_t *pool ) { pthread_list_t *tmp; while( pool->queue->size ) usleep( 100 ); pthread_cancel( pool->master_id ); while( pool->head ) { tmp = pool->head->next; #ifdef debug fprintf( "Destroy pthread.\n", stderr ); #endif /* * Waiting for cancellation. */ pthread_cond_broadcast( &pool->sync->cond ); pthread_cancel( pool->head->thread ); free( pool->head ); pool->head = tmp; } queue_destroy( pool->queue ); pthread_pool_attr_destroy( pool->attr ); pthread_pool_destroy_sync( pool->sync ); free( pool ); return 0; } int get_queue_size( pthread_pool_t *pool ) { return pool->queue->size; } static pthread_list_t *get_pthread_node( pthread_pool_t* pool, pthread_t id ) { pthread_list_t *tmp = pool->head; while( tmp ) { if( tmp->thread == id ) { return tmp; } tmp = tmp->next; } return NULL; }