import ctypes
import threading
import time
import queue
import types
# Status strings a Future can carry.
PENDING = "pending"
RUNNING = "running"
CLOSED = "closed"
TIMEOUT = "timeout"
KILLED_AT_THE_END = "killed at the end"
POOL_EXCEPTION = "pool_exception"

# Every status accepted by Future; this list is also formatted verbatim into
# Future's "status abnormal" error messages, so its order is kept stable.
STATUS = [
    PENDING,
    CLOSED,
    RUNNING,
    TIMEOUT,
    KILLED_AT_THE_END,
    POOL_EXCEPTION,
]

# Sentinel string that ThreadPool.shutdown() puts on the reserve queue; a
# worker that reads it re-queues it and leaves its loop.
EXIT_THREAD_POOL = "EXIT_THREAD_POOL"
class Thread(threading.Thread):
    """A thread that keeps its target's result/exception and can be killed.

    On top of ``threading.Thread`` this class:

    * stores the return value of ``func(*args)`` in ``result`` and any
      ``Exception`` it raises in ``exc`` (re-raised by :meth:`join`);
    * registers itself in the ``child_threads`` list of the thread that
      created it (when that thread has such a list), so killing a parent also
      kills the threads it spawned;
    * offers :meth:`kill`, which asynchronously raises ``SystemExit`` inside
      the running thread.
    """

    def __init__(self, func, args=(), daemon=True, name=None):
        """
        :param func: Callable executed by the thread as ``func(*args)``.
        :param args: Positional arguments for ``func``.
        :param daemon: Daemon flag passed to ``threading.Thread``.
        :param name: Optional thread name; the default name is kept when
        this is empty or None.
        """
        super(Thread, self).__init__(daemon=daemon)
        self.__set_thread_name(name)
        parent_thread = threading.current_thread()
        self.func = func
        self.args = args
        self.result = None
        self.exc = None
        self.is_killed = False
        self.child_threads = []
        # Only threads of this class (or anything else exposing a
        # ``child_threads`` list) keep track of the threads they create.
        if hasattr(parent_thread, 'child_threads'):
            parent_thread.child_threads.append(self)

    def run(self):
        """Call ``func(*args)``, recording the result or the exception."""
        try:
            self.result = self.func(*self.args)
        except SystemExit:
            # Raised by kill(); end the thread quietly.
            return
        except Exception as e:
            # Kept so that join() can re-raise it in the joining thread.
            self.exc = e

    def join(self, timeout=None):
        """
        Wait for the thread to end, then re-raise the exception stored by
        run(), if any.
        :param timeout: Seconds to wait. Any int/float (including 0, which
        polls without blocking) is used as the timeout; None or a
        non-numeric value waits without a limit.
        :return:
        """
        if isinstance(timeout, (int, float)):
            super(Thread, self).join(timeout=timeout)
        else:
            super(Thread, self).join()
        if self.exc:
            raise self.exc

    def get_result(self):
        """Return the value stored by run(), or None if there is none."""
        return self.result

    def _raise_exc(self, exc_obj):
        """
        Asynchronously raise ``exc_obj`` inside this thread.
        :param exc_obj: Exception class to raise in the thread.
        :raises RuntimeError: The interpreter did not find the thread id.
        :raises SystemError: More than one thread state was modified; the
        request is cancelled before raising.
        """
        # is_alive() is the supported spelling; isAlive() no longer exists
        # on Python 3.9+.
        if not self.is_alive():
            return
        # Wrap the id explicitly: a bare Python int would be passed by
        # ctypes as a C int and could be truncated.
        thread_id = ctypes.c_ulong(self.ident)
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            thread_id, ctypes.py_object(exc_obj))
        if res == 0:
            raise RuntimeError("Not existent thread id.")
        elif res > 1:
            # Passing NULL clears the pending asynchronous exception.
            ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, None)
            raise SystemError("PyThreadState_SetAsyncExc failed.")

    def kill(self):
        """Kill the live child threads first, then this thread."""
        # Iterate over a snapshot: the running thread may still be
        # registering new children.
        for child_thread in list(getattr(self, 'child_threads', ())):
            if child_thread.is_alive():
                child_thread.kill()
        self._raise_exc(SystemExit)
        self.is_killed = True

    def __set_thread_name(self, name):
        """Rename the thread when a non-empty name is given."""
        if name:
            self._name = name
class ThreadPool(object):
    """Run submitted callables on a bounded number of worker threads.

    Every submitted callable is wrapped in a ``Thread`` and put on a reserve
    queue. Up to ``max_workers`` long-lived worker threads take tasks from
    that queue, start them, wait for them (optionally with a per-task
    timeout) and record the outcome on the task's ``Future``.
    """

    # Seconds an idle worker, or __exit__ while waiting, sleeps between two
    # polls so that waiting does not spin a CPU core.
    _IDLE_POLL_INTERVAL = 0.01
    # Seconds between two checks of the running list while shutdown() waits
    # for running tasks to finish.
    _SHUTDOWN_POLL_INTERVAL = 5

    def __init__(self, max_workers=1, timeout_total=None, timeout_each=None,
                 logger=None):
        """
        :param max_workers: Max numbers of threads that program allows to run
        at the same time
        :param timeout_total: The longest time of this thread pool exists.None
        means this thread pool will wait until running threads finished.
        :param timeout_each: The longest time each task may run before it is
        killed and marked as timed out.None (or 0) means no per-task limit.
        :param logger: Record log.
        """
        self._logger = logger
        # Max numbers of threads that program allows to run at the same time
        if not isinstance(max_workers, int):
            raise self._error("ThreadPool max_workers parameter type should "
                              "be int,not {}".format(type(max_workers)))
        self._max_workers = max_workers
        # A switch when shutdown thread pool.False means remove threads not
        # running then kill running threads.True means remove threads not
        # running and wait running threads finished.Default is True
        self._shutdown_wait = True
        # Workers are working threads,each thread will start a task.When any
        # workers are idle,the worker will get task from reserves_threads.
        self._workers = list()
        # The longest time of this thread pool exists.None means this thread
        # pool will wait until running threads finished.
        self._timeout_total = self._checked_timeout(timeout_total,
                                                    "timeout_total")
        # Record added threads
        self._reserves_threads = queue.SimpleQueue()
        # Record running threads
        self._running_threads = list()
        # Held while the reserve queue is read or written, so that a task is
        # always visible either in the queue or in the running list.
        self._change_reserves_threads_lock = threading.Lock()
        # Every time change running_threads will get this lock
        self._change_running_threads_lock = threading.Lock()
        # Time of creating this thread pool
        self._thread_pool_start = time.time()
        # Maps every submitted Thread to its Future.
        self._find_threads_future = dict()
        # Validate the *argument*; the attribute does not exist yet.
        self._timeout_each = self._checked_timeout(timeout_each,
                                                   "timeout_each")

    def _error(self, message):
        """Log message as an error (if a logger is set) and return an
        Exception carrying it, ready for the caller to raise."""
        if self._logger:
            self._logger.error(message)
        return Exception(message)

    def _checked_timeout(self, value, name):
        """
        Validate a timeout constructor argument.
        :param value: The value given by the caller.
        :param name: Parameter name used in the error message.
        :return: None for a falsy value (None, 0), otherwise the value.
        :raises Exception: value is truthy but neither int nor float.
        """
        if not value:
            return None
        if isinstance(value, (int, float)):
            return value
        raise self._error("ThreadPool {} parameter type should be int or "
                          "float,not {}".format(name, type(value)))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Leave the ``with`` block: sleep one second, then, when timeout_total
        is set, wait until the pool is idle or timeout_total seconds have
        passed since the pool was created, and finally call shutdown().
        Exceptions raised inside the ``with`` block are not suppressed.
        """
        time.sleep(1)
        if self._timeout_total:
            while (time.time() - self._thread_pool_start
                   < self._timeout_total):
                if self._reserves_threads.qsize() == 0 and \
                        len(self._running_threads) == 0:
                    break
                # Yield the CPU instead of spinning until the deadline.
                time.sleep(self._IDLE_POLL_INTERVAL)
        self.shutdown()
        return False

    def set_shutdown_wait(self, switch):
        """
        Set shutdown switch.
        :param switch: The switch when thread pool shutdown.False means remove
        threads not running then kill running threads.True means remove threads
        not running and wait running threads finished.Default is True.
        :return:
        """
        if isinstance(switch, bool):
            self._shutdown_wait = switch
        else:
            raise Exception("set_shutdown_switch get parameter type wrong."
                            "parameter switch type should be bool,not {} "
                            "".format(type(switch)))

    def get_reserves_threads(self):
        """Return the queue holding the tasks that have not started yet."""
        return self._reserves_threads

    def submit(self, func=None, args=(), record_str=""):
        """
        Pack a task into a thread and queue it. A new worker is started when
        fewer than max_workers exist; otherwise an existing worker picks the
        task up once it is free.
        :param func:The task; any callable is accepted.
        :param args:The parameters task required (a tuple).
        :param record_str:The thing or name this thread collect.
        :return: The Future tracking the task.
        :raises Exception: func is None or not callable, args is not a tuple
        or record_str is not a str.
        """
        if func is None:
            raise self._error("submit function func parameter is None")
        if not callable(func):
            raise self._error("submit function func parameter type should be"
                              " function,not {}".format(type(func)))
        if not isinstance(args, tuple):
            raise self._error("submit function args parameter type should be "
                              "tuple,not {}".format(type(args)))
        if not isinstance(record_str, str):
            raise self._error("submit function record_str parameter type "
                              "should be str,not "
                              "{}".format(type(record_str)))
        future = self._record_added_threads(Thread(func, args))
        self._add_worker(self._timeout_each, record_str)
        return future

    def _record_added_threads(self, thread=None):
        """
        Add a thread to reserves queue.
        :param thread:The thread to add.
        :return: The Future created for the thread.
        """
        if not thread:
            raise Exception("record_added_threads function requires "
                            "thread parameter")
        with self._change_reserves_threads_lock:
            # Register the future before the task becomes visible to the
            # workers, which look it up as soon as they take the task.
            future = Future(thread, PENDING, self._logger)
            self._find_threads_future[thread] = future
            self._reserves_threads.put(thread)
        return future

    def _add_worker(self, timeout_each, record_str):
        """
        Add worker.Workers are working threads,each thread will start a task.
        When any workers are idle,the worker will get task from reserves
        threads.
        NOTE: timeout_each and record_str are bound to the worker when it is
        created, so the worker reuses them for every task it runs later.
        :param timeout_each: The time each thread within this thread pool
        lasts. If the time is exceeded, an error will be reported to terminate.
        None means this thread pool will wait until running threads finished.
        :param record_str:The thing or name this thread collect.
        :return:
        """
        if len(self._workers) < self._max_workers:
            worker = Thread(self._run_added_thread,
                            (timeout_each, record_str))
            # Attribute form of the deprecated setDaemon().
            worker.daemon = True
            self._workers.append(worker)
            worker.start()

    def _run_added_thread(self, timeout_each, record_str):
        """
        Worker loop: continuously take tasks from the reserve queue and run
        them. The loop ends only when the EXIT_THREAD_POOL sentinel is read.
        :param timeout_each:The time each thread within this thread pool lasts.
        If the time is exceeded, an error will be reported to terminate.None
        means this thread pool will wait until running threads finished.
        :param record_str:The thing or name this thread collect.
        :return:
        """
        while True:
            with self._change_reserves_threads_lock:
                if self._reserves_threads.qsize() == 0:
                    task = None
                else:
                    task = self._reserves_threads.get()
                    if isinstance(task, str):
                        if task == EXIT_THREAD_POOL:
                            # Put the sentinel back so that every other
                            # worker sees it as well.
                            self._reserves_threads.put(EXIT_THREAD_POOL)
                            return
                        if self._logger:
                            self._logger.error(
                                "reserves_threads str abnormal")
                        # A stray string is not a task; never start it.
                        continue
                    if task in self._running_threads:
                        if self._logger:
                            self._logger.error("The thread ready to run has "
                                               "been running!")
                        continue
                    task.start()
                    # Still under the reserve lock: the task must show up in
                    # _running_threads before shutdown() can look for it,
                    # or it would be in neither collection.
                    with self._change_running_threads_lock:
                        self._running_threads.append(task)
                    future = self._find_threads_future.get(task)
                    future.set_status(RUNNING)
            if task is None:
                # Nothing queued: back off briefly outside the lock.
                time.sleep(self._IDLE_POLL_INTERVAL)
                continue
            self._run_added_thread_end(timeout_each, task, future,
                                       record_str)

    def _run_added_thread_end(self, timeout_each, running_thread,
                              future, record_str):
        """
        Wait for a started task and record how it ended.
        :param timeout_each: Seconds to wait for the task; falsy waits
        without a limit.
        :param running_thread: The started task thread.
        :param future: The Future of the task.
        :param record_str: Name used in the timeout warning.
        :return:
        """
        try:
            if timeout_each:
                running_thread.join(timeout=timeout_each)
            else:
                running_thread.join()
        except Exception as ex:
            # Thread.join() re-raises the exception the task raised.
            if self._logger:
                self._logger.error(str(ex))
            self._end_of_thread(running_thread, future, POOL_EXCEPTION)
            return
        if not running_thread.is_alive():
            self._end_of_thread(running_thread, future, CLOSED)
        elif timeout_each:
            # Still alive after the timed join: the task ran out of time.
            running_thread.kill()
            self._end_of_thread(running_thread, future, TIMEOUT)
            if self._logger:
                self._logger.warning("{} collect break because "
                                     "timeout".format(record_str))
        else:
            if self._logger:
                self._logger.error("Thread abnormal.Thread is alive "
                                   "after join()")
            self._end_of_thread(running_thread, future, POOL_EXCEPTION)
            future.set_exception(Exception("Thread abnormal.Thread "
                                           "is alive after join()"))

    def _end_of_thread(self, thread, future, status):
        """
        Remove a task from the running list and store its final status.
        :param thread: The task thread that ended.
        :param future: The Future of the task.
        :param status: Final status to record.
        :return:
        """
        with self._change_running_threads_lock:
            was_running = thread in self._running_threads
            if was_running:
                self._running_threads.remove(thread)
        if was_running or status == KILLED_AT_THE_END:
            future.set_status(status)
        elif future.status == KILLED_AT_THE_END:
            # shutdown() already killed and deregistered this task; the
            # worker that wakes up afterwards must not overwrite that.
            return
        else:
            if self._logger:
                self._logger.error("thread status abnormal,thread not "
                                   "in _running_threads")
            future.set_status(POOL_EXCEPTION)
            future.set_exception(Exception("thread status abnormal,"
                                           "thread not in _running_"
                                           "threads"))

    def shutdown(self):
        """
        Clear threads reserves threads queue and running threads list when
        closing thread pool. Tasks that have not started are marked
        KILLED_AT_THE_END; running tasks are waited for or killed depending
        on the shutdown switch. Afterwards no new workers are created.
        :return:
        """
        with self._change_reserves_threads_lock:
            while self._reserves_threads.qsize() != 0:
                pending = self._reserves_threads.get()
                pending_future = self._find_threads_future.get(pending)
                # Anything without a future (e.g. the sentinel left by an
                # earlier shutdown) is simply dropped.
                if pending_future is not None:
                    pending_future.set_status(KILLED_AT_THE_END)
            self._reserves_threads.put(EXIT_THREAD_POOL)
        if self._shutdown_wait:
            while len(self._running_threads) != 0:
                time.sleep(self._SHUTDOWN_POLL_INTERVAL)
        else:
            # Iterate over a snapshot: _end_of_thread() removes entries from
            # the list, which would make a live iteration skip threads.
            for running_thread in list(self._running_threads):
                running_thread.kill()
                self._end_of_thread(running_thread,
                                    self._find_threads_future.get(
                                        running_thread), KILLED_AT_THE_END)
        self._max_workers = 0
class Future(object):
    """Handle for one task thread: its status, result and exception."""

    def __init__(self, thread, stauts, logger=None):
        """
        :param thread: The Thread instance this future tracks.
        :param stauts: Initial status; must be a str listed in STATUS.
        :param logger: Record log.
        :raises Exception: thread is not a Thread, or the status is not a
        str listed in STATUS.
        """
        self._logger = logger
        if not isinstance(thread, Thread):
            message = ("Future thread parameter type is not "
                       "Thread,not {}".format(type(thread)))
            # Log the same text that is raised, so the log entry describes
            # this failure.
            if self._logger:
                self._logger.error(message)
            raise Exception(message)
        self._thread = thread
        self._stauts = None
        # Single validation path shared with later status changes.
        self.set_status(stauts)

    def exception(self):
        """Return the exception stored on the thread, or None."""
        return self._thread.exc or None

    def set_exception(self, ex):
        """
        Store an exception on the tracked thread.
        :param ex: The exception to store.
        :return: True when ex is an Exception instance and was stored,
        otherwise False.
        """
        if not isinstance(ex, Exception):
            return False
        self._thread.exc = ex
        return True

    @property
    def status(self):
        """The current status string."""
        return self._stauts

    def set_status(self, stauts):
        """
        Change the status.
        :param stauts: New status; must be a str listed in STATUS.
        :raises Exception: stauts is not a str or is not listed in STATUS.
        """
        if not isinstance(stauts, str):
            message = "Future status is not str"
        elif stauts not in STATUS:
            message = ("Future status abnormal,Future status should "
                       "be within {0}, not {1}".format(STATUS, stauts))
        else:
            self._stauts = stauts
            return
        if self._logger:
            self._logger.error(message)
        raise Exception(message)

    def get_result(self):
        """
        Return the task's result.
        :return: The value stored on the thread when the status is CLOSED;
        None for every other status or when reading the result fails.
        """
        if self.status != CLOSED:
            return None
        try:
            return self._thread.get_result()
        except Exception as ex:
            if self._logger:
                self._logger.error(ex)
            return None
# Usage example (用法案例):
#
#     with ThreadPool(timeout_total=3600, timeout_each=300,
#                     max_workers=int(THREAD_NEED * 5),
#                     logger=self.logger) as my_pool:
#         for ele in exec_list:
#             result_record_list.append(
#                 my_pool.submit(func=self._do_something,
#                                args=ele.parameters))
#     for future in result_record_list:
#         print(future.status)
#
# 文章评论 (article comments -- footer left over from the source web page)