本文最后更新于185 天前,其中的信息可能已经过时,如有错误请发送邮件到big_fw@foxmail.com
用户信息模块
项目已经完成初始化。
因为其他模块都需要有用户信息关联,因此需要先完成用户信息模块功能。
实现流程的开发思路
已完成:登录/注册 安全方案:JWT+REDIS
已完成:用户管理->用户的增删改查
已完成:更新用户信息的前端页面
已完成:判断查询参数是否为空
使用JWT+REDIS 作为单点登录的实现方案
可参考:【Python_Flask系列之_JWT+Redis的token管理(包含失效)】
实现方案:
- 注册:将密码通过MD5加密后存储进数据库
- 登陆时用密码加密后的值进行验证
- 单点登录:将脱敏后的用户信息使用JWT生成token
- 将token放进REDIS,将token返回前端
- 前端每次访问时在请求头判断有无token,无token则跳转到登录页面
- 后端获取到token,将token解析为哦用户信息,确认用户登录状态
优点:token能够让用户登录信息更安全 / 管理员可以通过redis控制用户登录状态
代码实现:
redisUtil.py
# 封装REDIS工具:REDIS存储当前的登录用户信息 import redis class MyRedis(): def __init__(self, host="127.0.0.1", port=6379, password='admin123'): pools = redis.ConnectionPool(host=host, port=port, password=password, decode_responses=True) self.__redis = redis.StrictRedis(connection_pool=pools) # 在 Redis 中设置值,默认,不存在则创建,存在则修改。 def set(self, key, value): return self.__redis.set(key, value) # 取出键对应的值 def get(self, key): if self.__redis.exists(key): return self.__redis.get(key) else: return None # 获取有序集合中所有元素 def zrange(self, key): if self.__redis.exists(key): return self.__redis.zrange(key, 0, -1, desc=False, withscores=True) else: return None # 获取值的索引号 def zrank(self, key): if self.__redis.exists(key): return self.__redis.zrank(key) else: return None # 按照分数范围获取name对应的有序集合的元素 def zrangebyscore(self, key): if self.__redis.exists(key): return self.__redis.zrangebyscore(key, 0, -1, withscores=True) else: return None # 按照分数范围获取有序集合的元素并排序(默认从大到小排序) def zrevrange(self, key): if self.__redis.exists(key): return self.__redis.zrevrange(key, 0, -1) else: return None # 删除键值对 def delete(self, key): if self.__redis.exists(key): return self.__redis.delete(key) else: return None # 获取key对应hash的所有键值 def hgetall(self, key): if self.__redis.exists(key): return self.__redis.hgetall(key) else: return None # 统计返回的总个数 def rcount(self, re, key): if self.__redis.exists(key): sum = 0 for i in re: print(i) sum += 1 print("总共有%s个" % (sum))
app.py
# app.py from flask import Flask, request, jsonify from flask_cors import CORS from api.AccountApi import account_api from api.ProductApi import product_api from dao.userDAO import UserDAO from utils.authUtil import jwt_and_redis_required from utils.redisUtil import MyRedis from utils.responseUtil import responseOK, responseError, ErrorCode, BAD_REQUEST from flask_jwt_extended import ( JWTManager, jwt_required, create_access_token, create_refresh_token, get_jwt_identity, get_jwt ) app = Flask(__name__) # 跨域 CORS(app, origins='http://localhost:5173') # redis redist = MyRedis() # JWT app.config['JWT_SECRET_KEY'] = 'preOwnedMarketplace' app.config["JWT_ACCESS_TOKEN_EXPIRES"] = 6660 jwt = JWTManager(app) # 接口 app.register_blueprint(account_api, url_prefix='/api/ac') app.register_blueprint(product_api, url_prefix='/api/pro') # 注册功能实现 @app.route('/api/regis', methods=['POST']) def toregis(): print("注册用户!") user_data = request.json # 获取 JSON 数据 try: # @TODO 可以提供email/phone 提供注册 # @TODO 校验用户名是否存在 UserDAO.create_user(user_data) user = UserDAO.read_user_by_username(user_data['username']) except Exception as e: print(f'注册失败!用户: {user_data}, 错误: {str(e)}') return responseError(BAD_REQUEST) return responseOK('用户注册成功', user) # 返回 JSON 响应 # 登录功能实现 @app.route('/api/login', methods=['POST']) def tologin(): print("登录用户!") user_data = request.json # 获取 JSON 数据 userid = UserDAO.login_user(user_data) print('登录用户:' + user_data['username']) print('登录id:' + str(userid)) if userid == 0: return responseError(ErrorCode('400', '用户信息不存在')) user = UserDAO.read_user_by_id(userid) access_token = create_access_token(identity=user) redist.set(user.get('username'),access_token) return responseOK(access_token) # 返回 JSON 响应 @app.route('/api/cur', methods=['GET']) @jwt_and_redis_required() def getCuruser(): current_user = get_jwt_identity() print(f'username:' + current_user['username']) return responseOK(current_user) if __name__ == "__main__": app.run(debug=True) # debug==True是为了方便修改代码之后,能够不重启项目就能够更新,否则,每次更改代码都需要重新启动项目 # 其他参数的设置可以查阅文档,这里越简单越好
UserDAO.py
import hashlib from pageModel.User import UserDisplay from utils.dbUtil import db class UserDAO: @staticmethod def create_user(user_data): query = "INSERT INTO Users (username, password, email, phone) VALUES (%s, %s, %s, %s)" hashed_password = hashlib.md5(user_data['password'].encode()).hexdigest() params = ( user_data['username'], hashed_password, user_data.get('email', None), user_data.get('phone', None), ) db.execute_update(query, params) @staticmethod def read_user_by_id(user_id): query = "SELECT user_id, username, email, phone, role, status FROM Users WHERE user_id = %s" params = (user_id,) result = db.execute_query(query, params) if result: user_data = result[0] return UserDisplay( user_data['user_id'], user_data['username'], user_data.get('email'), user_data.get('phone'), user_data.get('role'), user_data.get('status') ).to_dict() return None @staticmethod def read_user_by_username(username): query = "SELECT user_id, username, email, phone, role, status FROM Users WHERE status!=0 and username = %s" params = (username,) result = db.execute_query(query, params) if result: user_data = result[0] return UserDisplay( user_data['user_id'], user_data['username'], user_data.get('email'), user_data.get('phone'), user_data.get('role'), user_data.get('status') ).to_dict() return None @staticmethod def update_user(user_id, update_data): set_clause = ", ".join(f"{key} = %s" for key in update_data.keys()) query = f"UPDATE Users SET {set_clause}, updated_at = NOW() WHERE user_id = %s" params = tuple(update_data.values()) + (user_id,) db.execute_update(query, params) @staticmethod def delete_user(user_id): # 逻辑删除 query = "UPDATE Users SET status=0 WHERE user_id = %s" params = (user_id,) db.execute_update(query, params) # 根据用户名和密码匹配 @staticmethod def login_user(user_data): hashed_password = hashlib.md5(user_data['password'].encode()).hexdigest() query = "SELECT user_id FROM Users WHERE username = %s AND password = %s" params = ( user_data['username'], hashed_password ) result = db.execute_query(query, params) if result: return result[0]['user_id'] return 0 @staticmethod def select_all_user(): query = "SELECT user_id, username, email, phone, role, status FROM Users WHERE status != 0" result = db.execute_query(query) if result: user_data = result user_res = [] for user in user_data: user_res.append(UserDisplay( user['user_id'], user['username'], user.get('email'), user.get('phone'), user.get('role'), user.get('status') ).to_dict()) return user_res return None # 修改后的 query_user 方法 @staticmethod def query_user(userid=None, username=None, role=None): query = "SELECT * FROM Users WHERE 1=1" params = [] if userid: query += " AND user_id = %s" params.append(userid) if username: query += " AND username = %s" params.append(username) if role: query += " AND role = %s" params.append(role) result = db.execute_query(query, params) if result: user_data = result user_res = [] for user in user_data: user_res.append(UserDisplay( user['user_id'], user['username'], user.get('email'), user.get('phone'), user.get('role'), user.get('status') ).to_dict()) return user_res return None
类似于responseOK…都是作为封装的工具类,返回给前端数据的一种规范化处理,可以参考源代码中关于util文件夹部分。
前端
- 封装axios,配置请求/响应拦截器
request.ts
// 引入axios import axios from 'axios' import { ElMessage } from 'element-plus' //1. 创建新的axios实例, const instance = axios.create() // 请求拦截器(请求拦截器作用就是用于发送接口请求前需要做的操作) instance.interceptors.request.use((config) => { //发请求前做的一些处理,数据转化,配置请求头,设置token,设置loading等,根据需求去添加 const token = window.sessionStorage.getItem('token') if (token) { // 判断是否存在token,如果存在的话,则每个http header都加上token config.headers.Authorization = "Bearer "+`${token}` //请求头加上token } return config }) // 响应拦截器(响应拦截器作用就是接收到响应数据后需要做的一些操作,比如判断接口状态) instance.interceptors.response.use((res) => { // 接收到响应数据并成功后的一些共有的处理,关闭loading等 // res为回调函数,也就是接口请求成功后返回的数据 // res.data.meta.status为当前接口的状态 let status = res.data.code // switch函数判断 // 响应拦截器执行完后需要将接口返回的数据也就是res回调函数,return给到之后的请求方法 return res.data }) // 将新创建的一个axios导出 export default instance
userLogin.vue > 登录按钮绑定的登录功能
<script> import { useRouter } from 'vue-router'; import { reactive, ref } from 'vue'; import instance from '@/request/request'; import { ElMessage } from 'element-plus' export default { name: "UserLogin", setup() { const userLoginForm = reactive({ username: "", password: "" }); const router = useRouter(); const error = ref(''); async function userLogin() { try { console.log(111) const response = await instance.post('/api/login', userLoginForm); if (response.code === 200) { window.sessionStorage.setItem('token', response.data) } else { error.value = response.message; ElMessage({ message: '登录失败.请检查登录信息', type: 'error', }) } } catch (e) { error.value = '登录失败,请重试'; } try { const response = await instance.get('/api/cur'); if (response.code === 200) { await router.push('/admin'); ElMessage({ message: '登录成功.欢迎' + response.data.username, type: 'success', }) } else { error.value = response.message; ElMessage({ message: '登录失败.请检查登录信息', type: 'error', }) } } catch (e) { error.value = '登录失败,请重试'; } } return { userLoginForm, error, userLogin }; } } </script>
至此,你已经可以在页面上添加一个简单的登录按钮,绑定登录的功能,进行登录的测试。 如有疑惑,在下方留下您的邮箱和相关问题,本站将尽快为您提供解决方案。
源码地址
请返回第一章底部:二手产品管理网站(1)-LANJ – 个人博客