
Database connection pool

2021-01-23 19:29:31 It's a beautiful laugh

Database connection schemes for high concurrency:

  1. Use an existing database connection pool library (DBUtils, described below).

  2. Have each execution unit open its own exclusive connection (inefficient and hard to maintain; not recommended).

  3. Maintain a connection pool by hand (the fastest option in my tests on a Mac).

Benchmark code:

import time
from abc import ABCMeta, abstractmethod
from multiprocessing.pool import ThreadPool
from queue import Queue

import pymysql
from dbutils import pooled_db  # module naming of DBUtils 2.0 and later

# print(help(pooled_db.PooledDB))
# pooled_db is thread-safe
# print(pymysql.threadsafety)

db_conf = {"host": "127.0.0.1",
           "port": 3306,
           "user": "root",
           "password": "123456",
           "database": "userinfo"
           }


class TaskConsumer(metaclass=ABCMeta):
    """Runs task_num tasks on a pool of task_pool_num threads and times them."""

    @abstractmethod
    def _task_worker(self, i):
        pass

    def consumer(self):
        task_pool = ThreadPool(self.task_pool_num)
        start_time = time.time()
        for i in range(self.task_num):
            task_pool.apply_async(func=self._task_worker, args=(i,))
        task_pool.close()
        task_pool.join()
        return time.time() - start_time

    def db_read(self, cur, sql, i):
        # time.sleep(2)
        cur.execute(sql)
        print(i, cur.fetchall())


class DbPoolTest(TaskConsumer):
    def __init__(self, task_num, task_pool_num, max_conn, blocking=True, **db_conf):
        self.task_num = task_num
        self.task_pool_num = task_pool_num
        self.max_conn = max_conn
        self.blocking = blocking
        self.kwargs = db_conf
        self._get_db_pool()

    def _get_db_pool(self):
        self.db_pool = pooled_db.PooledDB(
            # A function returning new connections, or a DB-API 2 compliant module.
            creator=pymysql,
            # Initial number of idle connections in the pool
            # (the default 0 means no connections are made at startup).
            mincached=self.max_conn,
            # Maximum number of idle connections in the pool
            # (the default 0 or None means unlimited pool size).
            maxcached=0,
            # Maximum number of shared connections allowed (the default 0 or None
            # means all connections are dedicated). Once this maximum is reached,
            # connections requested as shareable are shared. pymysql and MySQLdb
            # report threadsafety 1, so whatever value is passed here, _maxshared
            # is always 0 and every connection is dedicated, never shared.
            maxshared=0,
            # Maximum number of connections generally allowed
            # (the default 0 or None means any number of connections).
            maxconnections=self.max_conn,
            # Behavior when the maximum is exceeded: if true, block and wait until
            # the number of connections decreases; by default an error is raised.
            blocking=self.blocking,
            # Maximum number of reuses of a single connection (the default 0 or None
            # means unlimited reuse). When reached, the connection is reset
            # automatically (closed and reopened).
            maxusage=None,
            # Optional list of SQL commands that prepare the session,
            # e.g. ["set datestyle to german", ...].
            setsession=None,
            # How connections are reset when returned to the pool (False or None
            # rolls back transactions started with begin(); the default True
            # always issues a rollback, for safety).
            reset=True,
            # Optional exception class or tuple of classes for which the failover
            # mechanism applies, if the default (OperationalError, InternalError)
            # is not adequate.
            failures=None,
            # Optional flag controlling when connections are checked with ping(),
            # if that method is available (0 = None = never, 1 = default = whenever
            # it is requested, 2 = when a cursor is created, 4 = when a query is
            # executed, 7 = always).
            ping=1,
            **self.kwargs)

    def _task_worker(self, i):
        conn = self.db_pool.connection()
        try:
            cur = conn.cursor()
            self.db_read(cur, "select * from username;", i)
            cur.close()
        finally:
            conn.close()  # hands the connection back to the pool

    def close(self):
        self.db_pool.close()


class PymysqlTest(TaskConsumer):
    def __init__(self, task_num, task_pool_num, **db_conf):
        self.task_num = task_num
        self.task_pool_num = task_pool_num
        self.kwargs = db_conf

    def _task_worker(self, i):
        # A brand-new connection is opened and closed for every task.
        conn = pymysql.Connect(**self.kwargs)
        try:
            cur = conn.cursor()
            self.db_read(cur, "select * from username;", i)
            cur.close()
        finally:
            conn.close()


class PymysqlPoolTest(TaskConsumer):
    def __init__(self, task_num, task_pool_num, max_conn, blocking=True, **db_conf):
        self.task_num = task_num
        self.task_pool_num = task_pool_num
        self.max_conn = max_conn
        self.blocking = blocking
        self.kwargs = db_conf
        self._get_pymysql_pool()

    def _get_pymysql_pool(self):
        # The queue is the pool: it is filled with max_conn open connections.
        self.queue_pool = Queue(maxsize=self.max_conn)
        for i in range(self.max_conn):
            self.queue_pool.put(pymysql.Connect(**self.kwargs))

    def _task_worker(self, i):
        conn = self.queue_pool.get()  # blocks until a connection is free
        try:
            cur = conn.cursor()
            self.db_read(cur, "select * from username;", i)
            cur.close()
        finally:
            self.queue_pool.put(conn)  # always return the connection, even on error

    def close(self):
        while not self.queue_pool.empty():
            self.queue_pool.get().close()


if __name__ == '__main__':
    TASK_NUM = 10000
    TASK_POOL_NUM = 100

    # Scheme 1: connection pool library
    # (blocking=True and a maxconnections limit are recommended).
    start_time_dbpool = time.time()
    # With DB_POOL_NUM >= TASK_POOL_NUM the task execution time stays roughly constant.
    DB_POOL_NUM = 100
    db_pool_test = DbPoolTest(TASK_NUM, TASK_POOL_NUM, DB_POOL_NUM, **db_conf)
    time_dbpool_task = db_pool_test.consumer()
    db_pool_test.close()
    # Each result is (total time including setup and teardown, task time only).
    time_dbpool = (time.time() - start_time_dbpool, time_dbpool_task)

    # Scheme 2: every task opens its own connection inside the worker thread.
    start_time_pymysql = time.time()
    pymysql_test = PymysqlTest(TASK_NUM, TASK_POOL_NUM, **db_conf)
    time_pymysql_task = pymysql_test.consumer()
    time_pymysql = (time.time() - start_time_pymysql, time_pymysql_task)

    # Scheme 3: manually maintained connection pool.
    start_time_pymysqlpool = time.time()
    # With DB_POOL_NUM >= TASK_POOL_NUM the task execution time stays roughly constant.
    DB_POOL_NUM = 100
    pymysql_pool_test = PymysqlPoolTest(TASK_NUM, TASK_POOL_NUM, DB_POOL_NUM, **db_conf)
    time_pymysqlpool_task = pymysql_pool_test.consumer()
    pymysql_pool_test.close()
    time_pymysqlpool = (time.time() - start_time_pymysqlpool, time_pymysqlpool_task)

    print("Database connection pool scheme:", time_dbpool)
    print("Per-task manual connection scheme:", time_pymysql)
    print("Manually maintained connection pool scheme:", time_pymysqlpool)

Listing: three database connection schemes under concurrency
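One caveat about the benchmark harness: apply_async keeps any exception raised by a worker inside the returned AsyncResult, so a task that fails (for example on a refused connection) is silently counted as a fast success unless get() is called. The sketch below shows one way to surface such failures. The run_tasks helper and the flaky_worker stand-in are hypothetical names made up for this illustration, and no database is involved.

```python
import time
from multiprocessing.pool import ThreadPool


def run_tasks(worker, task_num, pool_size):
    """Run worker(i) for each i in range(task_num).

    Returns (elapsed_seconds, errors), where errors is a list of
    (task_index, exception) pairs for the tasks that raised.
    """
    start = time.time()
    errors = []
    with ThreadPool(pool_size) as pool:
        results = [pool.apply_async(worker, (i,)) for i in range(task_num)]
        for i, res in enumerate(results):
            try:
                res.get()  # re-raises the exception from the worker, if any
            except Exception as exc:
                errors.append((i, exc))
    return time.time() - start, errors


def flaky_worker(i):
    # Hypothetical stand-in for a database task: every fifth task fails.
    if i % 5 == 0:
        raise RuntimeError(f"task {i} failed")
    return i


if __name__ == "__main__":
    elapsed, errors = run_tasks(flaky_worker, 10, 4)
    print(f"{len(errors)} of 10 tasks failed in {elapsed:.3f}s")
```

Checking the error list before comparing timings guards against a scheme looking fast only because most of its tasks failed early.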

 

 

DBUtils is a Python package made up of two subsets of modules: one for use with any DB-API 2 module, the other for use with the classic PyGreSQL module.

Universal DB-API 2 variant
SteadyDB.py: hardened database connections
PooledDB.py: connection pool
PersistentDB.py: persistent database connections
SimplePooledDB.py: simple connection pool

Classic PyGreSQL variant
SteadyPg.py: hardened PyGreSQL connections
PooledPg.py: PyGreSQL connection pool
PersistentPg.py: persistent PyGreSQL connections
SimplePooledPg.py: simple PyGreSQL connection pool

In DBUtils 2.0 and later these modules have lowercase names (for example dbutils.steady_db and dbutils.pooled_db), which is the naming the benchmark code above imports.

 

SteadyDB

DBUtils.SteadyDB is a module that implements "hardened" database connections on top of ordinary DB-API 2 connections. A hardened connection is reopened transparently when the underlying connection has been closed or lost, or when it has been used more often than an optional usage limit allows.

A typical example is a database that is restarted while your program is still running and needs to access it, or a program that connects to a remote database behind a firewall when the firewall is restarted and loses its state.

You normally do not need to use SteadyDB directly; it merely provides the basic services for the next two modules, PersistentDB and PooledDB.
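The reconnect behavior described above can be sketched with the standard library alone. This is a minimal illustration of the idea, not the DBUtils implementation: the SteadyConnection class, its reconnects counter, and the use of sqlite3 in place of a real database server are all assumptions made for the example.

```python
import sqlite3


class SteadyConnection:
    """Hypothetical wrapper that reopens a connection when it is lost or overused."""

    def __init__(self, creator, maxusage=0):
        self._creator = creator    # zero-argument callable returning a connection
        self._maxusage = maxusage  # 0 means unlimited reuse
        self.reconnects = 0
        self._open()

    def _open(self):
        self._con = self._creator()
        self._usage = 0

    def _reopen(self):
        self._open()
        self.reconnects += 1

    def execute(self, sql, params=()):
        # Usage limit reached: close and reopen before running the query.
        if self._maxusage and self._usage >= self._maxusage:
            self._con.close()
            self._reopen()
        try:
            cur = self._con.cursor()
            cur.execute(sql, params)
        except (sqlite3.OperationalError, sqlite3.ProgrammingError):
            # sqlite3 raises ProgrammingError on a closed connection.
            # Reconnect and retry exactly once; a second failure propagates.
            self._reopen()
            cur = self._con.cursor()
            cur.execute(sql, params)
        self._usage += 1
        return cur.fetchall()


if __name__ == "__main__":
    db = SteadyConnection(lambda: sqlite3.connect(":memory:"))
    print(db.execute("select 1"))
    db._con.close()                # simulate a lost connection
    print(db.execute("select 2"))  # succeeds after a transparent reconnect
    print("reconnects:", db.reconnects)
```

Retrying only once keeps a genuinely broken query or an unreachable server from looping forever.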

SimplePooledDB

DBUtils.SimplePooledDB is a very basic database connection pool implementation. It lacks much of the functionality of the full-fledged PooledDB module.

PersistentDB

DBUtils.PersistentDB implements hardened, thread-affine, persistent database connections using any DB-API 2 module.

When a thread opens a database connection for the first time, a new connection is opened and used only by that thread. When the thread closes the connection, it actually stays open, so the same thread can reuse it the next time it requests a connection. The connection is closed automatically when the thread dies.

In short, PersistentDB tries to reuse database connections to improve the database access performance of a threaded program, while making sure that connections are never shared between threads.

As a result, PersistentDB works well even when the underlying DB-API module is not thread-safe at the connection level, and it avoids the problems that arise when other threads change the database session or run transactions spanning several SQL statements.
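The thread-affine behavior can be illustrated with threading.local from the standard library. This is a sketch of the idea under stated assumptions, not how DBUtils is written: ThreadLocalConnections is a hypothetical class, and sqlite3 stands in for a real DB-API 2 driver.

```python
import sqlite3
import threading


class ThreadLocalConnections:
    """Hypothetical helper: one persistent connection per thread, never shared."""

    def __init__(self, creator):
        self._creator = creator          # zero-argument callable returning a connection
        self._local = threading.local()  # attributes are private to each thread

    def connection(self):
        con = getattr(self._local, "con", None)
        if con is None:
            # First request from this thread: open and remember a connection.
            con = self._creator()
            self._local.con = con
        return con


if __name__ == "__main__":
    persist = ThreadLocalConnections(lambda: sqlite3.connect(":memory:"))
    first = persist.connection()
    second = persist.connection()
    print("same thread reuses its connection:", first is second)

    ids = []
    worker = threading.Thread(target=lambda: ids.append(id(persist.connection())))
    worker.start()
    worker.join()
    print("other thread got its own connection:", ids[0] != id(first))
```

Because no connection ever crosses a thread boundary, this approach needs no locking around the connection itself.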

PooledDB

DBUtils.PooledDB implements a pool of hardened, thread-safe, cached database connections that are reused transparently, using any DB-API 2 module.

PooledDB can share open database connections between different threads. This happens only when the pool is created with a positive maxshared value and the underlying DB-API 2 module is thread-safe at the connection level; even then you can still request dedicated connections that are not shared between threads. Besides the pool of shared connections, you can set up a pool of at least mincached and at most maxcached idle connections, which serves both dedicated and shared connections. When a thread closes a connection that is no longer shared, the connection is returned to the pool of idle connections so that it can be reused.
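Whether connections can be shared at all depends on the threadsafety attribute that every DB-API 2 module exposes. The sketch below expresses the rule from the paragraph above as a small function. The name effective_maxshared and the SimpleNamespace stand-ins for driver modules are assumptions for illustration; the function only approximates the check a pool would need to make.

```python
from types import SimpleNamespace


def effective_maxshared(dbapi_module, maxshared):
    """Return how many shared connections a pool could actually allow.

    DB-API 2 threadsafety levels:
      0 = threads may not share the module
      1 = threads may share the module, but not connections
      2 = threads may share the module and connections
      3 = threads may share the module, connections and cursors
    """
    threadsafety = getattr(dbapi_module, "threadsafety", 0)
    if threadsafety > 1 and maxshared:
        return maxshared
    return 0  # every connection stays dedicated to one thread at a time


# Stand-ins for real driver modules (illustrative values only).
level1_driver = SimpleNamespace(threadsafety=1)  # the level pymysql reports
level2_driver = SimpleNamespace(threadsafety=2)

if __name__ == "__main__":
    print(effective_maxshared(level1_driver, 10))
    print(effective_maxshared(level2_driver, 10))
```

With a level 1 driver such as pymysql, the requested maxshared value therefore has no effect and all connections are dedicated.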

If the underlying DB-API module is not thread-safe, thread locks are used to make sure that PooledDB connections are thread-safe, so you do not need to worry about that. You should, however, take care to use dedicated connections whenever you change the database session or perform transactions spanning more than one SQL command.
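The interplay of mincached, maxcached, maxconnections and blocking described in this section can be sketched as a small pool of dedicated connections. This is a deliberately simplified model, not the PooledDB source: MiniPool, TooManyConnections, acquire and release are hypothetical names, there is no sharing and no reset on return, and sqlite3 stands in for a real driver.

```python
import sqlite3
import threading


class TooManyConnections(Exception):
    """Raised when the limit is reached and the pool is not blocking."""


class MiniPool:
    def __init__(self, creator, mincached=0, maxcached=0,
                 maxconnections=0, blocking=False):
        self._creator = creator
        self._maxcached = maxcached            # 0 means unlimited idle connections
        self._maxconnections = maxconnections  # 0 means no upper limit
        self._blocking = blocking
        self._cond = threading.Condition()
        # mincached connections are opened up front and kept idle.
        self._idle = [creator() for _ in range(mincached)]
        self._in_use = 0

    def acquire(self):
        with self._cond:
            while self._maxconnections and self._in_use >= self._maxconnections:
                if not self._blocking:
                    raise TooManyConnections()
                self._cond.wait()  # wait until another thread releases
            con = self._idle.pop() if self._idle else self._creator()
            self._in_use += 1
            return con

    def release(self, con):
        with self._cond:
            self._in_use -= 1
            if not self._maxcached or len(self._idle) < self._maxcached:
                self._idle.append(con)  # keep it for the next caller
            else:
                con.close()             # idle cache is full
            self._cond.notify()


if __name__ == "__main__":
    pool = MiniPool(lambda: sqlite3.connect(":memory:", check_same_thread=False),
                    mincached=1, maxcached=1, maxconnections=2)
    a, b = pool.acquire(), pool.acquire()
    try:
        pool.acquire()
    except TooManyConnections:
        print("limit reached; blocking=True would wait instead")
    pool.release(a)
    pool.release(b)
```

With blocking=True the third acquire would wait for a release instead of raising, which is the setting the benchmark recommends for scheme 1.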

 

Copyright notice
This article was written by [It's a beautiful laugh]. Please include a link to the original when reposting. Thank you.
https://chowdera.com/2021/01/20210123192855713S.html
