分页查询
分页查询是处理大量数据时常用的技术,通过分页可以将数据分成多个小部分,方便用户逐页查看。SQLAlchemy 提供了简单易用的方法来实现分页查询。
本篇我们也会在最终实现这样的分页效果:
1. 什么是分页查询
分页查询是将查询结果按照一定数量分成多页展示,每页显示固定数量的记录。分页查询通常使用两个参数:
limit
:每页显示的记录数量。offset
:跳过的记录数量。
例如,要查询第二页,每页显示 10 条记录:
limit
:10offset
:10
2. 使用 SQLAlchemy 实现分页查询
基本查询
首先,我们需要一个基本的查询来获取数据:
import db
from model import Student
def basic_query():
students = db.session.query(Student).all()
for stu in students:
print(stu.to_dict())
使用 limit
和 offset
前文中,我们已经了解到 SQLAlchemy 提供了 limit
和 offset
方法来实现分页查询。limit
限制返回的记录数量,offset
跳过指定数量的记录。
import db
from model import Student
def paginated_query(page, per_page):
q = db.select(Student).limit(per_page).offset((page - 1) * per_page)
students = db.session.execute(q).scalars()
for stu in students:
print(stu.to_dict())
例如,要获取第 2 页,每页显示 10 条记录:
paginated_query(2, 10)
对应的 SQL 语句:
SELECT * FROM tb_student LIMIT 10 OFFSET 10;
3. 前后端实现分页功能
后端分页
在后端实现分页功能时,可以创建一个函数来处理分页逻辑。这个函数接受 page
和 per_page
参数,并返回当前页的数据和总页数。
import db
from model import Student
def get_paginated_students(page, per_page):
total = db.session.query(Student).count()
q = db.select(Student).limit(per_page).offset((page - 1) * per_page)
students = db.session.execute(q).scalars()
return {
'total': total,
'page': page,
'per_page': per_page,
'pages': (total + per_page - 1) // per_page,
'data': [stu.to_dict() for stu in students]
}
前端分页
在前端实现分页时,可以使用后端提供的分页数据来渲染页面:
{
"total": 100,
"page": 2,
"per_page": 10,
"pages": 10,
"data": [
{"id": 11, "name": "Student 11", ...},
{"id": 12, "name": "Student 12", ...},
...
]
}
前端可以根据这些数据渲染分页控件和当前页的数据。
[拓展] Flask 分页演示
下面是一个前后端不分离的 Flask 项目,代码文件比较多,你需要自行理一下。同时也要保证 Flask
、 Flask-SQLAlchemy
与 Flask-MysqlDB
的安装。
pip install flask
pip install flask-sqlalchemy # 兼容 Flask 的 SQLAlchemy 框架,提供 ORM 功能
pip install flask-mysqldb # 为 Flask-SQLAlchemy 提供 MySQL 驱动
Flask 项目目录如下:
flask_app/ # 项目目录
├── templates/ # 模板目录
│ └── list.html # 模板文件
├── config.py # Flask 配置文件
├── db.py # 数据库核心文件,包含重要操作
├── manage.py # Flask 路由和业务视图文件
└── models.py # 数据库模型文件
首先看一下配置文件 config.py
:
class Config:
SQLALCHEMY_DATABASE_URI = 'mysql://root:0908@localhost:3306/db_flask_demo_school?charset=utf8mb4' # 数据库连接。自行替换数据库用户名称和密码以及实际数据库名
SQLALCHEMY_ECHO = False # 是否打印执行的 SQL 语句及其耗时
DEBUG = True # 是否启用调试模式
db.py
"""
Create database:
> create database db_flask_demo_school charset=utf8mb4
"""
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import *
db = SQLAlchemy()
models.py
from db import *
class Student(db.Model):
__tablename__ = 'tb_student2'
id = db.Column(db.Integer, primary_key=True, comment="主键")
name = db.Column(db.String(15), index=True, comment="姓名")
age = db.Column(db.SmallInteger, comment="年龄")
sex = db.Column(db.Boolean, comment="性别")
email = db.Column(db.String(128), unique=True, comment="邮箱地址")
money = db.Column(db.Numeric(10, 2), default=0.0, comment="钱包")
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'age': self.age,
'sex': self.sex,
'email': self.email,
'money': float(self.money)
}
def __repr__(self):
return f'<{self.__class__.__name__}: {self.name}>'
然后就是 manage.py
,编写了路由与业务代码:
from pathlib import Path
from flask import Flask, jsonify, request, render_template
from config import Config
from models import db, Student
app = Flask(__name__, template_folder='./templates')
app.config.from_object(Config)
db.init_app(app)
@app.route('/', methods=['GET'])
def index():
"""没啥用,勿看"""
title = Path(__file__).name
return title
@app.route('/students', methods=['POST'])
def create_student():
"""采集访问的信息,创建学生"""
sex = request.form.get('sex')
sex = int(sex) if sex.isdigit() else 0
student = Student(
name=request.form.get('name', '未知'),
age=request.form.get('age', 0),
sex=bool(sex),
email=request.form.get('email', ''),
money=request.form.get('money', 0),
)
if request.form.get('id', None) is not None:
student.id = request.form['id']
db.session.add(student)
db.session.commit()
return jsonify({
'success': True,
'data': student.to_dict(),
'msg': 'success'
}), 201
@app.route('/students', methods=['DELETE'])
def delete_students():
"""删除学生表的所有记录"""
db.session.execute(db.delete(Student))
db.session.commit()
return jsonify({
'success': True,
'data': None,
'msg': 'success'
})
@app.route('/students', methods=['GET'])
def get_students():
# 旧版本 2.x 获取全部数据
# students = Student.query.all()
# 新版本 3.1.x 获取全部数据
students = db.session.execute(db.select(Student).where()).scalars()
return jsonify({
'success': True,
'data': {
'count': Student.query.count(),
'students': [student.to_dict() for student in students]
},
'msg': 'success'
})
@app.route('/students/<int:student_id>', methods=['GET'])
def get_student(student_id):
# 根据主键查询数据,不存在则为 None
student = db.session.get(Student, student_id)
if not student:
return jsonify({
'success': False,
'data': None,
'msg': 'student not found'
})
return jsonify({
'success': True,
'data': student.to_dict(),
'msg': 'success'
})
@app.route('/students/data', methods=['GET'])
def students_data():
"""这里是分页器的使用,不同于我们所使用的 limit 和 offset 需要自己编写"""
# 不采取数据分页时,大量数据时会导致服务器运存膨胀,这是非常不妥的
page = request.args.get('page', 1, type=int)
per_page = request.args.get('size', 3, type=int)
# 创建分页器对象
pagination = Student.query.paginate(page=page, per_page=per_page, max_per_page=20)
print('当前页对象', pagination)
print('总数据量', pagination.total)
print('当前页数据列表', pagination.items)
print('总页码', pagination.pages)
print()
print('是否有上一页', pagination.has_prev)
print('上一页页码', pagination.prev_num)
print('上一页对象', pagination.prev())
print('上一页对象的数据列表', pagination.prev().items)
print()
print('是否有下一页', pagination.has_next)
print('下一页页码', pagination.next_num)
print('下一页对象', pagination.next())
print('下一页对象的数据列表', pagination.next().items)
# """前后端分离推荐使用的 json 结果,这里没用到"""
data = {
"page": pagination.page, # 当前页码
"pages": pagination.pages, # 总页码
"has_prev": pagination.has_prev, # 是否有上一页
"prev_num": pagination.prev_num, # 上一页页码
"has_next": pagination.has_next, # 是否有下一页
"next_num": pagination.next_num, # 下一页页码
"items": [{
"id": item.id,
"name": item.name,
"age": item.age,
"sex": item.sex,
"money": item.money,
} for item in pagination.items]
}
return render_template('list.html', **locals())
if __name__ == '__main__':
with app.app_context():
db.drop_all() # 启动时先删除相关表,后创建相关表
db.create_all()
app.run('0.0.0.0', 9527)
最后就是 list.html
这个模板文件,呈现一个分页的演示:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
<style>
body {
font-family: Arial, sans-serif;
background-color: #f4f7fa;
color: #333;
}
table {
border-collapse: collapse;
margin: 50px auto;
width: 80%;
box-shadow: 0 2px 10px rgba(0, 0, 0, 0.1);
background-color: #fff;
}
th, td {
padding: 12px 15px;
text-align: center;
}
th {
background-color: #007bff;
color: #fff;
text-transform: uppercase;
}
tr:nth-child(even) {
background-color: #f2f2f2;
}
tr:hover {
background-color: #e9f5ff;
}
.page {
margin: 20px auto;
text-align: center;
}
.page a, .page span {
padding: 8px 16px;
margin: 0 4px;
color: #007bff;
background: #fff;
border: 1px solid #007bff;
border-radius: 4px;
text-decoration: none;
transition: background-color 0.3s, color 0.3s;
}
.page a:hover {
background-color: #007bff;
color: #fff;
}
.page span {
background-color: #007bff;
color: #fff;
}
</style>
</head>
<body>
<table border="1" align="center" width="600">
<tr>
<th>ID</th>
<th>Age</th>
<th>Name</th>
<th>Sex</th>
<th>Money</th>
</tr>
{% for student in pagination.items %}
<tr>
<td>{{ student.id }}</td>
<td>{{ student.age }}</td>
<td>{{ student.name }}</td>
<td>{{ "男" if student.sex else "女" }}</td>
<td>{{ student.money }}</td>
</tr>
{% endfor %}
<tr align="center">
<td colspan="5" class="page">
{% if pagination.has_prev %}
<a href="?page=1">首 页</a>
<a href="?page={{ pagination.page - 1 }}">上一页</a>
<a href="?page={{ pagination.page - 1 }}">{{ pagination.page - 1 }}</a>
{% endif %}
<span>{{ pagination.page }}</span>
{% if pagination.has_next %}
<a href="?page={{ pagination.page + 1 }}">{{ pagination.page + 1 }}</a>
<a href="?page={{ pagination.page + 1 }}">下一页</a>
<a href="?page={{ pagination.pages }}">尾 页</a>
{% endif %}
</td>
</tr>
</table>
</body>
</html>
为了确保能够有一定数量的数据,请你另外新建一个 request.py
,用于创建大量数据(如果你知道 faker 的使用,也可以自己弄一些数据),先启动 manage.py
,保证后端服务的开启和路由可用,然后直接运行该文件后可添加测试数据:
# request.py
import requests # pip install requests
students = [ # 虚拟数据,务必当真
{
'name': '王毅',
'age': 21,
'sex': 1,
'email': '[email protected]',
'money': 4488.5
},
{
'name': '张晓',
'age': 19,
'sex': 0,
'email': '[email protected]',
'money': 2389.75
},
{
'name': '李春阳',
'age': 23,
'sex': 1,
'email': '[email protected]',
'money': 6715.32
},
{
'name': '刘瑞',
'age': 20,
'sex': 0,
'email': '[email protected]',
'money': 3456.89
},
{
'name': '陈欢',
'age': 22,
'sex': 1,
'email': '[email protected]',
'money': 5678.12
},
{
'name': '吴娜',
'age': 18,
'sex': 0,
'email': '[email protected]',
'money': 1234.56
},
{
'name': '赵丹',
'age': 24,
'sex': 0,
'email': '[email protected]',
'money': 7890.43
},
{
'name': '孙宇',
'age': 21,
'sex': 1,
'email': '[email protected]',
'money': 4567.89
},
{
'name': '黄宇',
'age': 19,
'sex': 1,
'email': '[email protected]',
'money': 2345.67
},
{
'name': '杨静',
'age': 22,
'sex': 0,
'email': '[email protected]',
'money': 6789.01
}
]
for student in students:
response = requests.request('POST', 'http://127.0.0.1:9527/students', data=student)
print('添加一条记录', response.json())
确定 Flask 项目正常启动,并且上面的数据也完成了注入,如果你发现启动失败了,请检查路由、数据库连接是否有问题,你可能需要一定的 Flask 基础知识。接下来如何访问我们渲染的模板呢?
根据路由视图和设置的访问端口(9527):
@app.route('/students/data', methods=['GET'])
def students_data():
...
return render_template('list.html', **locals())
我们直接在浏览器访问:http://127.0.0.1:9527/students/data 这个地址即可。
上述案例是演示所用,随意写的,小部分代码参考了某机构的教程代码示例,平台原因无法标注,路由设计也是很随便的,这种代码如果存在版权纠纷,emmm.....,请联系我删除,谢谢。无私开源,只为搞懂后端开发的学习,请勿钻牛角……
文章评论