Flask入门到实战
文章目录
- 1. Flask初识
- 1. Flask概述
- 2. wsgi
- 3. 安装Flask
- 4. werkzurg
- 5. 第一个Flask应用
- 2. 配置文件的使用
- 1. 配置文件导入原理
- 2. 配置文件的使用
- 3. 路由系统
- 1. 反向生成URL
- 2. 动态路由的构造
- 4. 请求与响应相关数据
- 1. 请求相关数据
- 2. 响应响应体
- 3. 响应响应头
- 5. 简单登录案例
- 1. 案例目录结构
- 2. 案例主要代码
- 3. 案例页面效果
- 6. 用户认证
- 1. 第一种用户认证
- 2. 装饰器知识点补充
- 3. endpoint参数
- 4. 装饰器的先后顺序
- 5. 第二种用户认证方式
- 6. 第三种用户认证方式
- 7. 模板渲染
- 1. 数据类型的渲染
- 2. 传入函数
- 3. 全局定义函数
- 4. 模板的继承
- 5. 模板的包含
- 6. 宏定义的使用
- 7. 模板安全
- 8. Session
- 1. Session原理
- 2. 自定义Session配置
- 9. flash
- 10. 中间件
- 1. call方法的执行
- 2. 自定义中间件
- 11. 特殊的装饰器(重要)
- 12. 路由和视图
- 1. 路由和视图函数对应关系
- 2. 路由设置两种方式
- 3. @app.route和app.add_url_url参数
- 4. CBV
- 5. 自定义正则
- 13. 蓝图
- 1. 蓝图(一)
- 2. 蓝图(二)
- 14. threading.local
- 1. threading.local的作用
- 2. 如何获取一个线程的唯一标记
- 3. 根据字典自定义一个类似于threading.local功能
- 4. 根据字典为每个协程开辟空间进行存取数据
- 5. __ getattr__ 与__ setattr__
- 6. 通过getattr与setattr构造出threading.local
- 15. 上下文管理
- 16. 偏函数与面向对象
- 1. 偏函数
- 2. super和执行类的区别
- 3. 面向对象中特殊的方法
- 17. Local与LocalStack
- 1. Local对象与LocalStack作用
- 2. 自定义一个简单的Local
- 3. 对栈中的数据进行添加和维护
- 4. 使用LocalStack向Local添加对象
- 18. Flask请求上下文管理原理
- 1. request上下文管理
- 2. 请求相关数据流程图
- 3. session上下文管理
- 19. flask-session
- 20. wtforms组件
- 1. 用户登录
- 2. 用户注册
- 3. 数据库实时更新
- 4. wtforms的作用与django示例
- 21. SQLAlchemy
- 1. 简介
- 2. CURD
- 3. 常用操作
- 4. 外键操作
- 5. 多对多操作
- 6. 两种连接数据库的方式
- 7. 原生SQL
我的网站:https://pythoneers.cn
1. Flask初识
1. Flask概述
Flask框架是一个短小精悍且可扩展强的Web框架,同Django框架一样依赖于第三方实现socket模块。
Flask特有上下文管理机制!
2. wsgi
WEB服务网关接口,是一个协议。Flask 中实现该协议的模块有 wsgiref
和 werkzeug
,模块本质上就是 socket服务端用于接收客户端的请求并处理
。
面试中可能会被用到wsgi是什么?
3. 安装Flask
一般使用 pip 安装 Flask:pip install Flask
,如果想加速下载,可以指定国内阿里云 pipy 的源,安装方法是:pip install flask -i https://mirrors.aliyun.com/pypi/simple
4. werkzurg
Flask 中实现 wsgi 的模块是 werkzurg,它是Flask第三方模块。
from werkzeug.serving import run_simpledef run(environ, start_response):return 'Hello Flask!'if __name__ == '__main__':run_simple('localhost', 5000, run)
from werkzeug.wrappers import Request, Response
from werkzeug.serving import run_simple@Request.application
def hello_flask(request):return Response('Hello Flask!')if __name__ == '__main__':run_simple('localhost', 5000, hello_flask)
Django中实现wsgi的模块是wsgiref
5. 第一个Flask应用
from flask import Flaskapp = Flask(__name__) # 实例化Flask类@app.route('/index')
def index():return 'Hello Flask!'if __name__ == '__main__':app.run() # run方法启动socket
2. 配置文件的使用
1. 配置文件导入原理
给一个类的字符串路径,找到类并且获取类中大写的静态字段。
main.py:
class FlaskClass:FLAG = True
test.py:
import importlibpath = 'setting.FlaskClass'
p, c = path.rsplit('.', maxsplit=1)
# print(p, c) # setting FlaskClass
m = importlib.import_module(p)
# print(m) # <module 'setting' from 'C:\\Users\\Thanlon\\PycharmProjects\\flask_pro\\setting.py'>
cls = getattr(m, c)
print(cls) # 找到类:<class 'setting.FlaskClass'>
# print(dir(cls)) # 类成员
for key in dir(cls):if key.isupper():print('key:', key)print('value:', getattr(cls, key))"""
<class 'setting.FlaskClass'>
key: FLAG
value: True
"""
2. 配置文件的使用
Flask配置文件是一个 flask.config.Config
对象,它继承字典,默认配置文件如下:
print(type(app.config), app.config)
"""
<class 'flask.config.Config'>
"""
{'ENV': 'production',
'DEBUG': False,
'TESTING': False,
'PROPAGATE_EXCEPTIONS': None,
'PRESERVE_CONTEXT_ON_EXCEPTION': None,
'SECRET_KEY': None,
'PERMANENT_SESSION_LIFETIME': datetime.timedelta(days=31),
'USE_X_SENDFILE': False,
'SERVER_NAME': None, 'APPLICATION_ROOT': '/',
'SESSION_COOKIE_NAME': 'session',
'SESSION_COOKIE_DOMAIN': None,
'SESSION_COOKIE_PATH': None,
'SESSION_COOKIE_HTTPONLY': True,
'SESSION_COOKIE_SECURE': False,
'SESSION_COOKIE_SAMESITE': None,
'SESSION_REFRESH_EACH_REQUEST': True,
'MAX_CONTENT_LENGTH': None,
'SEND_FILE_MAX_AGE_DEFAULT': datetime.timedelta(seconds=43200), 'TRAP_BAD_REQUEST_ERRORS': None,
'TRAP_HTTP_EXCEPTIONS': False,
'EXPLAIN_TEMPLATE_LOADING': False,
'PREFERRED_URL_SCHEME': 'http',
'JSON_AS_ASCII': True,
'JSON_SORT_KEYS': True,
'JSONIFY_PRETTYPRINT_REGULAR': False,
'JSONIFY_MIMETYPE': 'application/json',
'TEMPLATES_AUTO_RELOAD': None,
'MAX_COOKIE_SIZE': 4093}
修改默认配置文件,创建配置文件 settings.py
,用于存放修改的配置文件:
settings.py
:
# 存放开发环境、线上环境、测试环境共有的配置文件
class Config(object):pass# 线上环境配置文件
class ProductionConfig(Config):DEBUG = False# 开发环境配置文件
class DevelopmentConfig(Config):DEBUG = True# 测试环境配置文件
class TestingConfig(Config):DEBUG = True
app.py:
from flask import Flaskapp = Flask(__name__)
# print(type(app.config), app.config)
app.config.from_object('settings.DevelopmentConfig') # 引入配置文件if __name__ == '__main__':app.run()
3. 路由系统
1. 反向生成URL
使用 url_for
方法生成 url
:
@app.route('/index', methods=['GET', 'POST'], endpoint='name')
def index():print(url_for('name'))return 'Flask!'
使用 url_for
方法生成url时设置参数:
@app.route('/index/<int:id>')
def index(id):print(url_for('index', id=1))return 'Flask!'
"""
/index/1
"""
如果不指定 endpoint
的值,默认为函数名:
def _endpoint_from_view_func(view_func):assert view_func is not None, 'expected view func if endpoint ' \'is not provided.'return view_func.__name__ # 返回函数名
def add_url_rule(self,rule,endpoint=None,view_func=None,provide_automatic_options=None, **options):if endpoint is None:endpoint = _endpoint_from_view_func(view_func)options['endpoint'] = endpointmethods = options.pop('methods', None)
@app.route('/index', methods=['GET', 'POST'])
def index():print(url_for('index'))return 'Flask!'
如果endpoint非要重名,必须保证函数相同。
2. 动态路由的构造
@app.route('/index/<username>') # 没有指定类型是字符串类型
@app.route('/index/<int:id>')
@app.route('/index/<float:id>')
@app.route('/index/<path:path>')
示例:
@app.route('/index/<float:id>')
def index(id):print(type(id),id)return 'Flask!'
浏览器输入:http://127.0.0.1:5000/index/1.0
控制台输出:<class 'float'> 1.0
4. 请求与响应相关数据
1. 请求相关数据
常用请求的相关信息:
request.method
request.args
request.form
request.cookies
request.headers
其它请求的相关信息:
request.values
request.path
request.full_path
request.script_root
request.url
request.base_url
request.url_root
request.host_url
request.host
request.files
obj = request.files['the_file_name']
obj.save('/uploads/'+secure_filename(f.filename))
2. 响应响应体
响应字符串:
@app.route('/index')
def index():return 'Flask' # 字符串
响应模板:
@app.route('/index')
def index():return render_template() # 模板
响应重定向:
@app.route('/index')
def index():return redirect('') # 重定向
响应json格式数据 [from flask import json
]:
@app.route('/index')
def index():return json.dumps({'name': 'thanlon'}) # {'name': 'thanlon'}
另一种响应json格式数据的方式 [from flask import jsonify
]:
@app.route('/index')
def index():return jsonify({'name': 'thanlon'}) # {'name': 'thanlon'}
不能响应字典,即return dic是错误的!
3. 响应响应头
以上是响应响应体,还可以响应响应头和cookie,只需要从Flask模块中导入make_response进行封装即可。(from flask import make_response
)
示例:将字符串类型的响应体与响应头和cookie一同封装响应:
@app.route('/index')
def index():obj = make_response('Flask')obj.headers['response_flag'] = 'response_info'obj.set_cookie('key','value')return obj
响应模板、响应重定向、响应json格式数据和示例就一样的。
5. 简单登录案例
1. 案例目录结构
2. 案例主要代码
app.py:
from flask import Flask, render_template, request, redirect, session# app = Flask(__name__, static_folder='static', static_url_path='/thanlon') # 可指定前缀
app = Flask(__name__)
app.secret_key = 'thanlon'@app.route('/login', methods=['GET', 'POST']) # 默认允许GET请求
def login():if request.method == 'GET':return render_template('login.html')username = request.form.get('username')pwd = request.form.get('pwd')if username == 'Thanlon' and pwd == '123':session['username'] = username # 默认session保存在签名或加密的cookie中return redirect('/index')# return render_template('login.html', error='用户名或者密码错误!')return render_template('login.html', **{'error': '用户名或者密码错误!'})@app.route('/index') # 默认允许GET请求
def index():username = session.get('username')if not username:return redirect('/login')return render_template('index.html')if __name__ == '__main__':app.run()
login.html:
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1"><title>登录</title><link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/css/bootstrap.min.css"integrity="sha384-BVYiiSIFeK1dGmJRAkycuHAHRg32OmUcww7on3RYdg4Va+PmSTsz/K68vbdEjh4u" crossorigin="anonymous">
</head>
<body>
<div class="container"><div class="row" style="margin-top:30px"><div class="col-md-4 col-md-offset-4"><div class="panel panel-primary"><div class="panel-heading">登录</div><div class="panel-body"><form method="POST"><div class="form-group"><label>用户名</label><input type="text" name="username" class="form-control" id="" placeholder="Username"required="required"></div><div class="form-group"><label>密码</label><input type="password" name="pwd" class="form-control" placeholder="Password"required="required"><div style="color:red">{{error}}</div></div><button type="submit" class="btn btn-primary">登录</button></form></div></div></div></div>
</div>
</body>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/js/bootstrap.min.js"integrity="sha384-Tc5IQib027qvyjSMfHjOMaLkfuWVxZxUPnCJA7l2mCWNIpG9mGCD8wGNIcPD7Txa"crossorigin="anonymous"></script>
</html>
index.html:
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1"><title>首页</title><link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/css/bootstrap.min.css"integrity="sha384-BVYiiSIFeK1dGmJRAkycuHAHRg32OmUcww7on3RYdg4Va+PmSTsz/K68vbdEjh4u" crossorigin="anonymous">
</head>
<body>
<div class="container"><div class="row" style="margin-top:30px"><div class="panel panel-default"><div class="panel-body">欢迎{{session['username']}}进入首页!</div></div></div>
</div>
</body>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/js/bootstrap.min.js"integrity="sha384-Tc5IQib027qvyjSMfHjOMaLkfuWVxZxUPnCJA7l2mCWNIpG9mGCD8wGNIcPD7Txa"crossorigin="anonymous"></script>
</html>
3. 案例页面效果
登录失败:
登录成功:
6. 用户认证
1. 第一种用户认证
settings.py
:
# 存放开发环境、线上环境、测试环境共有的配置文件
class Config(object):DEBUG = FalseSECRET_KEY = 'thanlon'
app.py
:
from flask import Flask, render_template, redirect, url_for, request, sessionapp = Flask(__name__)
app.config.from_object('settings.DevelopmentConfig') # 引入配置文件USER_DICT = {1: {'username': 'thanlon', 'age': '23', 'gender': '男'},2: {'username': 'kiku', 'age': '25', 'gender': '女'}
}
@app.route('/login', methods=['GET', 'POST']) # 默认允许GET请求
def login():if request.method == 'GET':return render_template('login.html')username = request.form.get('username')pwd = request.form.get('pwd')if username == 'Thanlon' and pwd == '123':session['username'] = username # 默认session保存在签名或加密的cookie中return redirect('/index')# return render_template('login.html', error='用户名或者密码错误!')return render_template('login.html', **{'error': '用户名或者密码错误!'})@app.route('/index')
def hello_world():if not session.get('username'):return redirect(url_for('login'))return render_template('index.html', user_dict=USER_DICT)@app.route('/delete/<int:nid>')
def delete(nid):if not session.get('username'):return redirect(url_for('login'))del USER_DICT[nid]return redirect(url_for('hello_world')) # url_for('')写函数名,和路由无关@app.route('/detail/<int:nid>')
def detail(nid):if not session.get('username'):return redirect(url_for('login'))info = USER_DICT[nid]return render_template('detail.html', info=info)if __name__ == '__main__':app.run()
index.html
:
……
<div class="container"><div class="row"><div class="col-md-4"><table class="table table-hover"><tr><th style="text-align: center">ID</th><th style="text-align: center">用户名</th><th style="text-align: center">年龄</th><th style="text-align: center">性别</th><th style="text-align: center">操作</th></tr>{% for k,v in user_dict.items() %}<tr style="text-align: center"><td>{{ k }}</td><td>{{ v.username }}</td><td>{{ v.age }}</td><td>{{ v.gender }}</td><td><a href="/detail/{{ k }}">详情</a> | <a href="/delete/{{ k }}">删除</a></td></tr>{# <tr>#}{# <td>{{ v['username'] }}</td>#}{# <td>{{ v['age'] }}</td>#}{# <td>{{ v['gender'] }}</td>#}{# </tr> #}{# <tr>#}{# <td>{{ v.get('usename','默认') }}</td>#}{# <td>{{ v.get('age') }}</td>#}{# <td>{{ v.get('gender') }}</td>#}{# </tr>#}{% endfor %}</table></div></div>
</div>
……
detail.html
:
……
<body>
{#{'username': 'thanlon', 'age': '23', 'gender': '男'}#}
<br>
{% for item in info.values() %}{#% for item in info.values() %#}{{ item }}
{% endfor %}
</body>
2. 装饰器知识点补充
函数没有加装饰器:
# coding:utf-8
def auth(func):def inner(*args, **kwargs):ret = func(*args, **kwargs)return retreturn inner
def index():print('index')
print(index.__name__) # 打印函数名
'''index'''
函数加装饰器,让inner函数伪装成index函数:
# coding:utf-8
def auth(func):def inner(*args, **kwargs):ret = func(*args, **kwargs)return retreturn inner
@auth
def index():print('index')
@auth
def login():print('login')
print(index.__name__) # 打印函数名
print(login.__name__) # 打印函数名
'''inner'''
'''inner'''
上面的例子中加装饰器虽然把名字伪装成功,但是内部没有伪装成功,可以使用functools.wraps(func):把原来函数执行的信息放到inner函数中(把原来的函数名放进闭包):
# coding:utf-8
import functools
def auth(func):@functools.wraps(func)def inner(*args, **kwargs):ret = func(*args, **kwargs)return retreturn inner
@auth
def index():print('index')
@auth
def login():print('login')
print(index.__name__) # 打印index函数名
print(login.__name__) # 打印login函数名
'''
index
login
'''
3. endpoint参数
可以指定,默认是函数名。作用是:反向生成url,如果同名会出错。查看route函数源码:
def add_url_rule(self, rule, endpoint=None, view_func=None,
provide_automatic_options=None, **options):if view_func is not None:old_func = self.view_functions.get(endpoint)if old_func is not None and old_func != view_func:raise AssertionError('View function mapping is overwriting an ''existing endpoint function: %s' % endpoint)self.view_functions[endpoint] = view_func
……def route(self, rule, **options):def decorator(f):endpoint = options.pop('endpoint', None)self.add_url_rule(rule, endpoint, f, **options)return freturn decorator
4. 装饰器的先后顺序
装饰器和路由配合,把url和函数加入到路由关系中:
@app.route('/login', methods=['GET', 'POST']) # 默认允许GET请求
def hello_world():if not session.get('username'):return redirect(url_for('login'))return render_template('index.html', user_dict=USER_DICT)
如果想要再加入一个登录验证的装饰器,如果把 @auth</kbd>加入到<kbd>@app.route('/login', methods=['GET', 'POST'])
上面,那就是url和函数是一个大整体,再用 @auth
装饰,这显然不是我们的目标。函数应该优先和 @auth
结合,再去做路由和函数的路由对应关系。装饰完成之后,多个路由都对应 inner,endpoint 默认是函数名,这样所有的函数都是 inner,这也是有问题。有两种解决方式。
方式一:自己定义endpoint,
@app.route('/index',endpoint='1')
……
@app.route('/delete/<int:nid>',endpoint='2')
……
方式二:使用 @functools.wraps(func)
import functools
def auth(func):@functools.wraps(func)def inner(*args, **kwargs):result = func(*args, **kwargs)return resultreturn inner@app.route('/index', endpoint='1')
@auth
def hello_world():if not session.get('username'):return redirect(url_for('login'))return render_template('index.html', user_dict=USER_DICT)
5. 第二种用户认证方式
app.py
:
def auth(func):@functools.wraps(func)def inner(*args, **kwargs):result = func(*args, **kwargs)return resultreturn inner@app.route('/index', endpoint='1')
@auth
def hello_world():if not session.get('username'):return redirect(url_for('login'))return render_template('index.html', user_dict=USER_DICT)
这有个不好的地方,就是每一个需要做认证的都需要一个一个加入装饰器@auth。
应用场景:比较少的函数中需要额外加入功能。
6. 第三种用户认证方式
可以使用 before_request
,返回None表示可以通过执行,如果返回其它,不能通过执行:
@app.before_request
def auth():if request.path == '/login':return Noneif session.get('username'):return None# return redirect('/login')return redirect(url_for('login'))
应用场景:批量给函数加额外的功能!
7. 模板渲染
1. 数据类型的渲染
显示列表:
app.py
@app.route('/tpl')
def tpl():content = {'user': ['thanlon', 'kiku']}return render_template('tpl.html', **content)
tpl.html
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Title</title>
</head>
<body>
{{ user.0 }}{#支持django的写法#}
{{ user[1] }}{#支持python的写法#}
</body>
</html>
显示元组:
app.py
@app.route('/tpl')
def tpl():content = {'user': ('thanlon', 'kiku')}return render_template('tpl.html', **content)
tpl.html
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Title</title>
</head>
<body>
{{ user.0 }}
{{ user[1] }}
</body>
</html>
显示字符串:
app.py:
@app.route('/tpl')
def tpl():content = {'txt': '<input type="text"/>'}return render_template('tpl.html', **content)
tpl.html:
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Title</title>
</head>
<body>
{{ txt }}
{{ txt|safe }}
</body>
</html>
使用Markup函数:
app.py:
@app.route('/tpl')
def tpl():content = {'txt': Markup('<input type="text"/>')}return render_template('tpl.html', **content)
tpl.html:
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Title</title>
</head>
<body>
{{ txt }}
</body>
</html>
2. 传入函数
app.py:
def func(arg):return arg + 1
@app.route('/tpl')
def tpl():content = {'func': func}return render_template('tpl.html', **content)
tpl.html:
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Title</title>
</head>
<body>
{{ func(1) }}<!--2-->
</body>
</html>
3. 全局定义函数
传参的时候自己可以传,全局也可以传参,template_global() 装饰器就是用来传参的。
app.py:
@app.template_global()
def test(a, b):# {{ test(1,2) }}return a + b@app.route('/tpl')
def tpl():content = {}return render_template('tpl.html', **content)
tpl.html:
……
<body>
{{ test(1,2) }}
</body>
……
template_filter() 装饰器也就是用来传参的,
app.py:
@app.template_filter()
def test(a, b, c):# {{ 1|test(2,3) }}return a + b + c@app.route('/tpl')
def tpl():content = {}return render_template('tpl.html', **content)
tpl.html:
<body>
{{ 1|test(2,3) }}
{% if 1|test(2,3) %}
</body>
template_filter与template_global区别是:template_filter可以放在 if 后面做条件。
4. 模板的继承
tpl.html 继承 layout.html,
tpl.html:
{% extends 'layout.html' %}
{% block content %}{% if 1|test(2,3) %}<div>True</div>{% else %}<div>False</div>{% endif %}
{% endblock %}
layout.html:
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Title</title>
</head>
<body>
<h4>继承</h4>
{% block content %}{% endblock %}
</body>
</html>
5. 模板的包含
tpl.html:
{% include 'form.html' %}
form.html:
<form action="">form表单
</form>
6. 宏定义的使用
tpl.html:
{#默认是不显示的#}
{% macro input(name,type='text',value='') %}<input type="{{ type }}" name="{{ name }}" value="{{ value }}">
{% endmacro %}
{#如果想显示#}
{{ input('username') }}
7. 模板安全
前端:{{u|safe}}
后端:Markup("<input type=“text”/ >")
8. Session
1. Session原理
当请求刚进来时,flask 读取 cookie 中 session 对应的值,将该值解密并反序列化为字典,放入内存以便视图函数使用。
视图函数:
@app.route('/session')
def sess():# from flask.sessions import SecureCookieSessionprint(type(session)) # session继承了字典,字典有什么方法,它就有什么方法session['k1'] = 'v1'return 'session''''<class 'werkzeug.local.LocalProxy'>'''
当请求结束时,flask会读取内存中字典的值,进行序列化加密,写入到用户的cookie中。
2. 自定义Session配置
settings.py:
class Config(object):# PERMANENT_SESSION_LIFETIME = datetime.timedelta(days=31) # 生命周期,最多的保存时间# PERMANENT_SESSION_LIFETIME = datetime.timedelta(hours=1)PERMANENT_SESSION_LIFETIME = datetime.timedelta(minutes=30) # 30min后过期SESSION_COOKIE_NAME = 'session', # cookie的名称SESSION_COOKIE_DOMAIN = None, # 域名SESSION_COOKIE_PATH = None # 路径SESSION_COOKIE_HTTPONLY = True # http读取SESSION_COOKIE_SECURE = False # 安全设置SESSION_REFRESH_EACH_REQUEST = True # 在最后访问的基础上加上生命周期
9. flash
@app.route('/page1')
def page1():session['xxx'] = 123456return 'session'@app.route('/page2')
def page2():print(session.get('xxx')) # print(session['xxx'])del session['xxx']session.pop('xxx')return 'session'
@app.route('/page1')
def page1():flash('临时数据存储') # 临时存在内存中一个数据return 'session'@app.route('/page2')
def page2():print(get_flashed_messages()) # 取一次就没有了,实际上用的时popreturn 'session'
'''
'''
['临时数据存储', '临时数据存储']
[]
'''
'''
对临时数据也可以做分类:
@app.route('/page1')
def page1():flash('临时数据存储','info')flash('error','error')return 'session'@app.route('/page2')
def page2():print(get_flashed_messages(category_filter=['error']))return 'session''''
['error', 'error']
[]
'''
10. 中间件
1. call方法的执行
call方法在用户发起请求时才执行。flask 依赖 wsgi ,程序启动时执行的是 run_simple(host, port, self, **options)
。在 app.py
文件中,会把 app 对象传入 run_simple 方法中。请求进来,第三个参数加括号,对象加括号执行__call__方法
。
def wsgi_app(self, environ, start_response):ctx = self.request_context(environ)error = Nonetry:try:ctx.push()response = self.full_dispatch_request()except Exception as e:error = eresponse = self.handle_exception(e)except:error = sys.exc_info()[1]raisereturn response(environ, start_response)finally:if self.should_ignore_error(error):error = Nonectx.auto_pop(error)def __call__(self, environ, start_response):return self.wsgi_app(environ, start_response)
所有的请求入口就在 call 方法。
if __name__ == '__main__':app.run()
app.run()
方法执行后,运行 run_simple(host, port, self, **options)
,在内部就是死循环,等待客户端发送请求(socket在监听连接)。此时 __call__
方法不会执行,有请求就会执行该方法。
修改源码:
def __call__(self, environ, start_response):"""The WSGI server calls the Flask application object as theWSGI application. This calls :meth:`wsgi_app` which can bewrapped to applying middleware."""print('请求之前做的操作')data = self.wsgi_app(environ, start_response)print('请求之后做的操作')return data
if __name__ == '__main__':app.run()
'''
请求之前做的操作
请求之后做的操作
'''
这样做是不合适的,修改源代码是不可选的。
2. 自定义中间件
class Middleware(object):def __init__(self, old):self.old = olddef __call__(self, *args, **kwargs):print('请求之前做的操作')ret = self.old(*args, **kwargs)print('请求之后做的操作')return retif __name__ == '__main__':app.wsgi_app = Middleware(app.wsgi_app)app.run()
通过 Middleware 中间件,在请求之前和请求之后做一些操作。
11. 特殊的装饰器(重要)
(1)before_request 与 after_request
……
app = Flask(__name__)
app.config.from_object('settings.DevelopmentConfig') # 引入配置文件@app.before_request
def test1():print('before_request')@app.after_request
def test2(response):print('after_request')return response@app.route('/index')
def index():print('index')return 'index'@app.route('/login')
def login():print('login')return 'login'if __name__ == '__main__':app.run()
'''
before_request
index
after_request
'''
@app.before_request
def test1():print('before_request_01')@app.before_request
def test2():print('before_request_02')@app.after_request
def test3(response):print('after_request_01')return response@app.after_request
def test4(response):print('after_request_02')return response@app.route('/index')
def index():print('index')return 'index'@app.route('/login')
def login():print('login')return 'login'if __name__ == '__main__':app.run()
'''
before_request_01
before_request_02
index
after_request_02
after_request_01
'''
如果 test1 函数有返回值,不执行 test2。
……
@app.before_request
def test1():print('before_request_01')return ''@app.before_request
def test2():print('before_request_02')@app.after_request
def test3(response):print('after_request_01')return response@app.after_request
def test4(response):print('after_request_02')return response@app.route('/index')
def index():print('index')return 'index'@app.route('/login')
def login():print('login')return 'login'if __name__ == '__main__':app.run()
'''
before_request_01
after_request_02
after_request_01
'''
flask和<=django1.9会执行所有的response返回,但是django1.10及之后的版本会这样执行:
(2) before_first_request
第一次请求的时候才执行,
@app.before_first_request
def test():print('test')@app.route('/index')
def index():return 'index'
(3)template_global 与 template_filter
模板中有介绍,在这里不赘述。
(4) errorhandler(应用比较广)
@app.errorhandler(404)
def page_not_found(arg):return 'This page does not exit'@app.route('/index')
def index():return 'index'if __name__ == '__main__':app.run()
12. 路由和视图
1. 路由和视图函数对应关系
# /index和/login是路由;index和login是视图函数
[('/index', 'index'),('/login', 'login')
]
请求来了,一个个筛选,并且前面的匹配成功,后面的都不再进行匹配了。
2. 路由设置两种方式
① @app.route(’/xxx’)
@app.route('/')
def index():return 'index'
② app.add_url_rule(’/index’,None,index)
def index():return 'index'
app.add_url_rule('/index', None, index)
注意:不要让endpoint重名,如果重名函数一定要相同。
@app.route('/index', endpoint=1)
def index():return 'index'
@app.route('/index', endpoint=1)
def index():return 'index'
上面两个index函数是不同的。
3. @app.route和app.add_url_url参数
- rule:url规则
- view_func:视图函数名称
- defaults = None:默认值,当url中无参数数时,使用defaults = {‘k’:‘v’}为函数提供参数
@app.route('/index', defaults={'k': 'v'})
def index(k):return 'index'
- endpoint = None:名称,用于反向生成 url,即 url_for(‘名称’)
- methods = None:允许请求的方式,如:[‘get’,‘post’]
- strict_slashes = None:对url最后/符号是严格要求的。对于@app.route(’/login’,strict_slashes = False),访问https://www.blueflags.cn/login或https://www.blueflags.cn/login/都可以。但对于@app.route(’/login’,strict_slashes = True),仅能访问https://www.blueflags.cn/login。建议使strict_slashes = True,只有写了什么,才能访问什么,默认也是True。
- redirect_to = None:重定向到指定的地址
① 一般的重定向:
@app.route('/index', redirect_to='/login')
def index():return 'index'@app.route('/login')
def login():return 'login'
② 重定向的时候传参数:
@app.route('/index/<int:nid>', redirect_to='/home/<nid>')
def index():return 'index'@app.route('/home/<int:nid>')
def home(nid):return str(nid)
def func(adapter, nid):return '/home/' + str(nid)@app.route('/index/<int:nid>', redirect_to=func)
def index():return 'index'
- subdomain = None:子域名访问
绑定域名:
hosts
# localhost name resolution is handled within DNS itself.
# 127.0.0.1 localhost
# ::1 localhost127.0.0.1 thanlon.com127.0.0.1 home.thanlon.com127.0.0.1 admin.thanlon.com
app.py:
from flask import Flask
app = Flask(__name__)
app.config['SERVER_NAME'] = 'thanlon.com:5000'# http://admin.thanlon.com:5000
@app.route('/index', subdomain='admin')
def admin():return 'admin.thanlon.com'# http://home.thanlon.com:5000
@app.route('/index', subdomain='home')
def home():return 'home.thanlon.com'if __name__ == '__main__':app.run()
from flask import Flask
app = Flask(__name__)
app.config['SERVER_NAME'] = 'thanlon.com:5000'# http://xxxxx.thanlon.com:5000
@app.route('/', subdomain='<username>')
def index(username):return username + '.thanlon.com'if __name__ == '__main__':app.run()
4. CBV
from flask import Flask, views
import functools
app = Flask(__name__)def wrapper(func):@functools.wraps(func)def inner(*args, **kwargs):return func(*args, **kwargs)return innerclass UserView(views.MethodView):methods = ['GET'] # 表示只支持get请求decorators = [wrapper] # 在执行get和post时,先执行这个装饰器def get(self, *args, **kwargs):return 'GET'def post(self, *args, **kwargs):return 'POST'app.add_url_rule('/index', None, UserView.as_view('uuu'))
5. 自定义正则
- python默认支持的转换器
#: the default converter mapping for the map.
DEFAULT_CONVERTERS = {"default": UnicodeConverter,"string": UnicodeConverter,"any": AnyConverter,"path": PathConverter,"int": IntegerConverter,"float": FloatConverter,"uuid": UUIDConverter,
}
- 自定义正则的三个步骤
① 定制类
class RegexConverter(BaseConveter):def __init__(self,map,regex):super(RegexConverter,self).__ini__(map)self.regex = regexdef to_python(self,value):# 路由匹配时,匹配成功后传递给视图函数中参数的值return int(value) # 可以做一些自定义操作def to_url(self,value):# 反向生成urlvar = super(RegexConverter, self).to_url(value)return var
② 添加转换器
app.url_map.converters['reg'] = RegexConverter # reg是自己定义
③ 使用自定义正则(to_python方法是匹配的时候使用,to_url方法是反向生成url的时候使用)
from flask import Flask, url_forapp = Flask(__name__)
from werkzeug.routing import BaseConverter# 第一步:定制类
class RegexConverter(BaseConverter):def __init__(self, map, regex):super(RegexConverter, self).__init__(map)self.regex = regexdef to_python(self, value):return str(value)def to_url(self, value):var = super(RegexConverter, self).to_url(value)return var# 第二步:添加转换器
app.url_map.converters['reg'] = RegexConverter # reg是自己定义# 第三步:使用自定义正则(to_python方法是匹配的时候使用,to_url方法是反向生成url的时候使用)
# 如果匹配成功,执行to_python方法,该方法返回什么,nid就是什么
@app.route("/index/<reg('\d+'):nid>", methods=['GET'])
def index(nid):'''1. 用户发送请求2. flask内部进行正则匹配3. 调用to_python(正则匹配的结果)方法4. to_python方法的返回值会交给视图函数的参数:param nid: :return: '''# print(nid, type(nid)) # 查看nid的类型print(url_for('index', nid=123)) # 如果反向生成url,执行流程,先将参数nid=123传入to_url方法,让to_url方法帮助我们去反向生成。'''控制台:/index/123'''return 'index'if __name__ == '__main__':app.run()
13. 蓝图
1. 蓝图(一)
实际项目中,需要进行项目目录结构的划分,蓝图就是用来帮助开发者进行目录结构的划分,下面是目录结构划分的几个步骤。
① 新建项目,在项目下新建一个和项目名同名的目录
② 在与项目名同名的目录中新建__ init __.py,用来实例化Flask对象
from flask import Flaskdef create_app():app = Flask(__name__)return app
③ 在项目目录下新建manage.py(也可以是其它的名字),
将create_app函数导入manage.py,在manage.py中:
from test_blueprint import create_appapp = create_app()
if __name__ == '__main__':app.run()
④ 在test_blueprint(非项目目录)下新建views目录,用来放视图函数,在视图函数中可以根据视图的不同进行分类。
⑤ 创建视图函数与app对象的关系,首先在视图函数中,创建Blueprint对象。
admin.py
from flask import Blueprint
admin_bp = Blueprint('admin_bp', __name__) # 创建Blueprint(蓝图)对象,第一个参数name不能同名@admin_bp.route('/login')
def login():return 'admin.login'@admin_bp.route('/logout')
def logout():return 'logout'
home.py
from flask import Blueprint
home_bp = Blueprint('home_bp', __name__) # 第一个参数是name,第一个参数name不能同名@home_bp.route('/login')
def login():return 'home.login'@home_bp.route('/list')
def list():return 'list'
需要修改__ init __.py,注册蓝图,
from flask import Flask
from .views.admin import admin_bp
from .views.home import home_bpdef create_app():app = Flask(__name__)# 注册蓝图app.register_blueprint(home_bp)app.register_blueprint(admin_bp)return app
注意:如果两个或多个视图函数中有同名的路由,如home.py和admin.py都有/login路由,谁先注册先执行谁的视图函数。
⑥ 创建模板。在__ init __.py所在的目录中新建templates目录(Flask实例化的文件所在的目录中),如果有静态文件,放静态文件的目录static与templates目录处于同一级。
在templates目录中建模板文件login.html,可以修改admin.py,
@admin_bp.route('/login')
def login():return render_template('login.html')
这时候就可以访问模板了。
2. 蓝图(二)
① 自定义静态文件和模板的路径
admin_bp = Blueprint('admin_bp', __name__,template_folder='xxx')
@admin_bp.route('/login')
def login():return render_template('login.html')
注意:蓝图优先去templates去找loging.html,找不到去xxx中找。当然静态文件也是可以指定的:
admin_bp = Blueprint('admin_bp', __name__,template_folder='xxx',static_folder='xxxx')@admin_bp.route('/login')
def login():return render_template('login.html')
② 为某一个蓝图地址加上前缀
def create_app():app = Flask(__name__)# 注册蓝图app.register_blueprint(home_bp, url_prefix='/home')app.register_blueprint(admin_bp)return app
按照原先访问方式:http://127.0.0.1:5000/login ,访问失败。
现在需要:http://127.0.0.1:5000/home/login
③ 为所有蓝图加上@before_request
__ init __.py:
from flask import Flask
from .views.admin import admin_bp
from .views.home import home_bpdef create_app():app = Flask(__name__)@app.before_requestdef xxx():return '123'# 注册蓝图app.register_blueprint(home_bp)app.register_blueprint(admin_bp)return app
④ 为指定的蓝图加上@before_request
admin.py:
from flask import Blueprint, render_template
admin_bp = Blueprint('admin_bp', __name__, template_folder='xxx', static_folder='xxxx')@admin_bp.before_request
def xxx():return '123'@admin_bp.route('/login')
def login():return render_template('login.html')@admin_bp.route('/logout')
def logout():return 'logout'
应用场景:可以在指定蓝图中加入登录验证。
14. threading.local
threading.local本身与Flask无关,只是说根据threading.local做的local对象与Flask是有关系的。
# coding:utf-8
import threading
import time
v = 0def task(i):global vv = itime.sleep(1)print(v)for i in range(10):t = threading.Thread(target=task, args=(i,))t.start()
1. threading.local的作用
为每个线程创建一个独立的空间,使得线程对自己空间中的数据进行操作(数据隔离)。
# coding:utf-8
import threading
import time
from threading import localobj = local()def task(i):obj.xxx = itime.sleep(2)print(obj.xxx,i)for i in range(10):t = threading.Thread(target=task, args=(i,))t.start()
2. 如何获取一个线程的唯一标记
# coding:utf-8
import threadingdef task(i):'''获取线程的唯一标识:param i::return:'''print(threading.get_ident(), i)for i in range(10):t = threading.Thread(target=task, args=(i,))t.start()
3. 根据字典自定义一个类似于threading.local功能
# coding:utf-8
import threading
import timeDIC = {}
def task(i):ident = threading.get_ident()if ident in DIC:DIC[ident]['xxx'] = ielse:DIC[ident] = {'xxx': i}time.sleep(2)print(DIC[ident]['xxx'], i)for i in range(10):t = threading.Thread(target=task, args=(i,))t.start()
4. 根据字典为每个协程开辟空间进行存取数据
# coding:utf-8
import threading
import time
import greenletDIC = {}
def task(i):# ident = threading.get_ident()ident = greenlet.getcurrent()if ident in DIC:DIC[ident]['xxx'] = ielse:DIC[ident] = {'xxx': i}time.sleep(2)print(DIC[ident]['xxx'], i)for i in range(10):t = threading.Thread(target=task, args=(i,))t.start()
5. __ getattr__ 与__ setattr__
class Local(object):def __getattr__(self, item):print('__getattr__', item)def __setattr__(self, key, value):print('__setattr__', key, value)obj = Local()
obj.xx
obj.xx = 123
'''
__getattr__ xx
__setattr__ xx 123
'''
6. 通过getattr与setattr构造出threading.local
不仅可以给线程做数据隔离还可以和协程做数据隔离。
import threading
import time# 有协程就使用协程,没有使用线程
try:import greenletget_ident = greenlet.getcurrent
except Exception as e:get_ident = threading.get_identclass Local(object):DIC = {}def __getattr__(self, item):# print('__getattr__', item)ident = get_identif ident in self.DIC:return self.DIC[ident].get(item)else:return Nonedef __setattr__(self, key, value):# print('__setattr__', key, value)ident = get_identif ident in self.DIC:self.DIC[ident][key] = valueelse:self.DIC[ident] = {key: value}
obj = Local()def task(i):obj.xxx = itime.sleep(2)print(obj.xxx, i)for i in range(10):t = threading.Thread(target=task, args=(i,))t.start()
重要总结:如果同时来了几个人请求,每个人请求的数据都是不一样的。每个人请求的数据放在不同的位置,进行数据隔离。取每个请求的数据时,可以根据线程的不同。真正请求来的时候,会通过local对象为请求创建一个空间,在它里面放值,每个请求来都会做数据隔离。视图函数取数据的时候就不会混乱了。如果不使用数据隔离,会导致第一个请求来了,正在处理时第二个请求又来了,把第一个覆盖了。django里面没有这种东西,django请求和session都是通过参数传递过来了,请求之间不会混乱。在django中,请求过来,拿着请求的数据处理请求就行了。
15. 上下文管理
请求到来后创建一个RequestContext对象,里面有request和session。把它放在某个特殊的地方,以后视图函数再去取,这就叫做上下文管理。
① 请求到来时,将session和request封装到ctx对象中,ctx中有request和session
ctx = self.request_context(environ) # self是app对象,environ是请求相关的原始数据
ctx.request = Request(environ)
ctx.session = None
② 对session作补充
③ 将包含了request和session的ctx对象放到一个容器中(可以把这个容器认为是一个字典)。每个请求都会根据线程/协程加一个唯一标识:(线程与线程之间的隔离)
{1234:{ctx:ctx对象},# 1234是线程/协程号1235:{ctx:ctx对象},1236:{ctx:ctx对象},……
}
④ 视图函数使用的时候,需要根据当前线程获取数据。隐含的操作是:根据当前线程或协程的唯一标识,获取ctx对象,再取ctx对象中取request和session。
from flask import request,session
# 如request.method,是从request中取得method
⑤ 请求结束时,根据当前线程/协程的唯一标记,将这个容器上的数据移除。
16. 偏函数与面向对象
1. 偏函数
偏函数可以帮助我们自动传参。
import functoolsdef index(a, b):return a + b# 原来的调用方式
# ret = index(1, 2)
# print(ret)
#偏函数:自动传递参数
new_func = functools.partial(index, 1) # “1”会被当作第一个参数传递进来
ret = new_func(2) # “2”会被当作第二个参数传递进来
print(ret)
2. super和执行类的区别
实例化对象时会执行构造方法,如果没有写,会执行父类的构造方法:
# coding:utf-8
class Foo(object):passobj = Foo() # 会执行构造方法,如果没有写,会执行父类的构造方法
① 根据mro的顺序执行方法:super(Foo, self).func()
# coding:utf-8
class Base(object):def func(self):print('Base.func')class Foo(Base):def func(self):# 方式一:根据mro的顺序执行方法super(Foo, self).func()print('Foo.func')obj = Foo()
obj.func()
'''
Base.func
Foo.func
'''
# coding:utf-8
class Base(object):def func(self):super(Base, self).func()print('Base.func')class Bar(object):def func(self):print('Bar.func')class Foo(Base, Bar):passobj = Foo()
obj.func()
'''
Base.func
Foo.func
'''
# coding:utf-8
class Base(object):def func(self):super(Base, self).func()print('Base.func')class Bar(object):def func(self):print('Bar.func')class Foo(Base, Bar):pass# obj = Foo()
# obj.func()
print(Foo.__mro__)
'''
(<class '__main__.Foo'>, <class '__main__.Base'>, <class '__main__.Bar'>, <class 'object'>)
'''
② 主动执行Base类的方法:Base.func(self)
# coding:utf-8
class Base(object):def func(self):print('Base.func')class Foo(Base):def func(self):# 方式一:根据mro的顺序执行方法# super(Foo, self).func()# 方式二:主动执行Base类的方法Base.func(self) # 如果是对象.func(),self会自动传进来print('Foo.func')obj = Foo()
obj.func()
'''
Base.func
Foo.func
'''
# coding:utf-8
class Base(object):def func(self):super(Base, self).func()print('Base.func')class Bar(object):def func(self):print('Bar.func')class Foo(Base, Bar):passobj = Base()
obj.func()
'''
AttributeError: 'super' object has no attribute 'func'
'''
3. 面向对象中特殊的方法
__ setattr__与__getattr__:当前类中有__setattr__或__getattr__方法会执行当前类的:
class Foo(object):def __getattr__(self, item):print(item)def __setattr__(self, key, value):print(key, value)obj = Foo()
obj.xxx = 'thanlon'
obj.xxx # 会执行父类的__getattr__方法
'''
xxx thanlon
xxx
None
'''
类中没有__setattr__或__getattr__方法,会调用父类的:
class Foo(object):passobj = Foo()
obj.xxx = 'thanlon'#Foo类中没有__setattr__方法,会调用的是父类的
obj.xxx
__ init__
# coding:utf-8
class Foo(object):def __init__(self):self.storage = {}def __getattr__(self, item):print(item)def __setattr__(self, key, value):print(key, value)obj = Foo() # 实例化的时候执行__init__方法,对象.?执行__setattr__方法。
obj.xxx = 'thanlon'
'''
storage {}
xxx thanlon
'''
# coding:utf-8
class Foo(object):def __init__(self):self.storage = {}def __getattr__(self, item):print(item)def __setattr__(self, key, value):print(self.storage)obj = Foo()
obj.xxx = 'thanlon'
'''
storage
None
storage
None
'''
在构造方法中使用self.storage = {},可以通过object类的__setattr__设置其值。(重点理解)
# coding:utf-8
class Foo(object):def __init__(self):# self.storage = {}object.__setattr__(self, 'storage', {})def __setattr__(self, key, value):print(key, value, self.storage)obj = Foo()
obj.xxx = 'thanlon'
'''
xxx thanlon {}
'''
17. Local与LocalStack
1. Local对象与LocalStack作用
Local对象:帮助我们为每个线程/协程开辟空间。
LocalStack对象:帮助我们在Local中维护一个列表,把它维护成一个栈。然后对列表中的数据进行添加和维护。
2. 自定义一个简单的Local
try:from greenlet import getcurrent as get_ident
except:from threading import get_identclass local(object):def __init__(self):'''实例化local的时候,在self中存了一个空字典'''object.__setattr__(self, 'storage', {})def __setattr__(self, key, value):ident = get_ident() # 获取这个标记if ident not in self.storage:self.storage[ident] = {key: value}else:self.storage[ident][key] = valuedef __getattr__(self, item):ident = get_ident()if ident in self.storage:return self.storage[ident].get(item)
3. 对栈中的数据进行添加和维护
try:from greenlet import getcurrent as get_ident
except:from threading import get_identclass Local(object):__slots__ = ("__storage__", "__ident_func__") # 对象只能.__storage__和.__ident_func__,不能.其它def __init__(self):object.__setattr__(self, "__storage__", {})object.__setattr__(self, "__ident_func__", get_ident)def __getattr__(self, name):try:return self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)def __setattr__(self, name, value):ident = self.__ident_func__()storage = self.__storage__try:storage[ident][name] = valueexcept KeyError:storage[ident] = {name: value}def __delattr__(self, name):try:del self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)obj = Local()
obj.name = 'thanlon'
print(obj.name)
'''thanlon'''
try:from greenlet import getcurrent as get_ident
except:from threading import get_identclass Local(object):__slots__ = ("__storage__", "__ident_func__") # 对象只能.__storage__和.__ident_func__,不能.其它def __init__(self):# __storage__ = {1234:{'session':[]}}object.__setattr__(self, "__storage__", {})object.__setattr__(self, "__ident_func__", get_ident)def __getattr__(self, name):try:return self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)def __setattr__(self, name, value):ident = self.__ident_func__()storage = self.__storage__try:storage[ident][name] = valueexcept KeyError:storage[ident] = {name: value}def __delattr__(self, name):try:del self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)obj = Local()
obj.stack = []
obj.stack.append('Thanlon')
obj.stack.append('Kiku')
print(obj.stack)
'''thanlon'''
print(obj.stack.pop())
print(obj.stack)
print(obj.stack.pop())
'''
['Thanlon', 'Kiku']
Kiku
['Thanlon']
Thanlon
'''
try:from greenlet import getcurrent as get_ident
except:from threading import get_identclass Local(object):'''为每个线程开辟空间'''__slots__ = ("__storage__", "__ident_func__") # 对象只能.__storage__和.__ident_func__,不能.其它def __init__(self):# __storage__ = {1234:{'session':[]}}object.__setattr__(self, "__storage__", {})object.__setattr__(self, "__ident_func__", get_ident)def __getattr__(self, name):try:return self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)def __setattr__(self, name, value):ident = self.__ident_func__()storage = self.__storage__try:storage[ident][name] = valueexcept KeyError:storage[ident] = {name: value}def __delattr__(self, name):try:del self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)
# obj = Local()
# obj.stack = []
# obj.stack.append('Thanlon')
# obj.stack.append('Kiku')
# print(obj.stack)
# '''thanlon'''
# print(obj.stack.pop())
# print(obj.stack)
# print(obj.stack.pop())
'''
['Thanlon', 'Kiku']
Kiku
['Thanlon']
Thanlon
'''
class LocalStack(object):'''帮助我们方便对local中的一个列表维护成一个栈,没有它也可以在local中存取数据,有了LocalStack操作跟家便捷'''def __init__(self):'''__storage__ = {}'''self._local = Local()def push(self, value):rv = getattr(self._local, 'stack', None) # self._local.stack=>local.getattrif rv is None:self._local.stack = rv = [] # self._local.stack=>local.setattrrv.append(value)return rvdef pop(self):stack = getattr(self._local, "stack", None)if stack is None:return Noneelif len(stack) == 1:return stack[-1]else:return stack.pop()xxx = LocalStack()
xxx.push('thanlon')
xxx.push('Kiku')
xxx.pop()
try:from greenlet import getcurrent as get_ident
except:from threading import get_identclass Local(object):'''为每个线程开辟空间'''__slots__ = ("__storage__", "__ident_func__") # 对象只能.__storage__和.__ident_func__,不能.其它def __init__(self):# __storage__ = {1234:{'session':[]}}object.__setattr__(self, "__storage__", {})object.__setattr__(self, "__ident_func__", get_ident)def __getattr__(self, name):try:return self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)def __setattr__(self, name, value):ident = self.__ident_func__()storage = self.__storage__try:storage[ident][name] = valueexcept KeyError:storage[ident] = {name: value}def __delattr__(self, name):try:del self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)class LocalStack(object):'''帮助我们方便对local中的一个列表维护成一个栈,没有它也可以在local中存取数据,有了LocalStack操作跟家便捷'''def __init__(self):'''__storage__ = {}'''self._local = Local()def push(self, value):rv = getattr(self._local, 'stack', None) # self._local.stack=>local.getattrif rv is None:self._local.stack = rv = [] # self._local.stack=>local.setattrrv.append(value)return rvdef pop(self):stack = getattr(self._local, "stack", None)if stack is None:return Noneelif len(stack) == 1:return stack[-1]else:return stack.pop()def top(self):try:return self._local.stack[-1]except (AttributeError, IndexError):return Nonels_obj = LocalStack()class RequestContent(object):def __init__(self):self.request = 'request'self.session = 'session'ls_obj.push(RequestContent())
obj = ls_obj.top()
print(obj)
print(obj.request)
print(obj.session)
'''
<__main__.RequestContent object at 0x0000021DC4A3CA90>
request
session
'''
4. 使用LocalStack向Local添加对象
try:from greenlet import getcurrent as get_ident
except:from threading import get_identclass Local(object):'''为每个线程开辟空间'''__slots__ = ("__storage__", "__ident_func__") # 对象只能.__storage__和.__ident_func__,不能.其它def __init__(self):# __storage__ = {1234:{'session':[]}}object.__setattr__(self, "__storage__", {})object.__setattr__(self, "__ident_func__", get_ident)def __getattr__(self, name):try:return self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)def __setattr__(self, name, value):ident = self.__ident_func__()storage = self.__storage__try:storage[ident][name] = valueexcept KeyError:storage[ident] = {name: value}def __delattr__(self, name):try:del self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)class LocalStack(object):'''帮助我们方便对local中的一个列表维护成一个栈,没有它也可以在local中存取数据,有了LocalStack操作跟家便捷'''def __init__(self):'''__storage__ = {}'''self._local = Local()def push(self, value):rv = getattr(self._local, 'stack', None) # self._local.stack=>local.getattrif rv is None:self._local.stack = rv = [] # self._local.stack=>local.setattrrv.append(value)return rvdef pop(self):stack = getattr(self._local, "stack", None)if stack is None:return Noneelif len(stack) == 1:return stack[-1]else:return stack.pop()def top(self):try:return self._local.stack[-1]except (AttributeError, IndexError):return Noneclass RequestContent(object):def __init__(self):self.request = 'request'self.session = 'session'_request_ctx_stack = LocalStack()
_request_ctx_stack.push(RequestContent())def get_request():ctx = _request_ctx_stack.top()return ctx.requestdef get_session():ctx = _request_ctx_stack.top()return ctx.sessiondef get_request_or_session(arg):ctx = _request_ctx_stack.top()return getattr(ctx, arg) # ctx.arg,如果是arg是request就是ctx.request,如果arg是session就是ctx.session# print(get_request())
# print(get_session())
# print(get_request_or_session('request'))
# print(get_request_or_session('session'))
import functools# 使用偏函数
request = functools.partial(get_request_or_session, 'request')
session = functools.partial(get_request_or_session, 'session')
print(request())
print(session())
'''
request
session
'''
18. Flask请求上下文管理原理
1. request上下文管理
请求的流程:
① wsgi
② 创建ctx = RequestContext(session,request),ctx.push()
③ LocalStack,把ctx对象添加到Local中
④ Local中存取数据使用__storage__ = {唯一标识: {stack: [ctx, ]}}
获取请求相关的数据(视图函数取请求相关的数据):
# coding:utf-8
from flask import Flask,Request
from flask.globals import _request_ctx_stackapp = Flask(__name__)@app.route('/')
def index():ctx = _request_ctx_stack.top # 获取ctx对象print(ctx) # <RequestContext 'http://127.0.0.1:5000/' [GET] of request上下文管理>print(ctx.request) # <Request 'http://127.0.0.1:5000/' [GET]>print(ctx.request.environ) #{'wsgi.version': (1, 0), 'wsgi.url_scheme': 'http', 'wsgi.input': <_io.BufferedReader name=816>, 'wsgi.errors': <_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'>, 'wsgi.multithread': True, 'wsgi.multiprocess': False, 'wsgi.run_once': False, 'werkzeug.server.shutdown': <function WSGIRequestHandler.make_environ.<locals>.shutdown_server at 0x000001D68A80BBF8>, 'SERVER_SOFTWARE': 'Werkzeug/0.15.4', 'REQUEST_METHOD': 'GET', 'SCRIPT_NAME': '', 'PATH_INFO': '/', 'QUERY_STRING': '', 'REQUEST_URI': '/', 'RAW_URI': '/', 'REMOTE_ADDR': '127.0.0.1', 'REMOTE_PORT': 55003, 'SERVER_NAME': '127.0.0.1', 'SERVER_PORT': '5000', 'SERVER_PROTOCOL': 'HTTP/1.1', 'HTTP_HOST': '127.0.0.1:5000', 'HTTP_CONNECTION': 'keep-alive', 'HTTP_CACHE_CONTROL': 'max-age=0', 'HTTP_UPGRADE_INSECURE_REQUESTS': '1', 'HTTP_USER_AGENT': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36', 'HTTP_SEC_FETCH_MODE': 'navigate', 'HTTP_SEC_FETCH_USER': '?1', 'HTTP_ACCEPT': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3', 'HTTP_SEC_FETCH_SITE': 'none', 'HTTP_ACCEPT_ENCODING': 'gzip, deflate, br', 'HTTP_ACCEPT_LANGUAGE': 'zh-CN,zh;q=0.9', 'HTTP_COOKIE': 'session=.eJwVzDEOgCAQRNG7bO0muyOoy2UMAoWxw2BjvLvQTF4x-S-lVuve7tKnnZkCZVEVD-GcLLETr2yIkRFNt2OFMxSa6NL-fXQIQ-gaGQqK2fnl-wG6SBh8.XVNcEA.0NiiN7ldvhokI3sKsstrb0GhOQs', 'werkzeug.request': <Request 'http://127.0.0.1:5000/' [GET]>}print(ctx.request.method) # GETreturn 'index'if __name__ == '__main__':app.run()# app.__call__ # 请求进来执行__call__,把数据交给__call__方法# app.wsgi_app # __call__方法会调用wsgi_app
2. 请求相关数据流程图
如果是多线程(并发),如果同时来了多个请求,wsgi_app对请求数据进行分别封装(每个线程有自己的ctx),然后交给LocalStack,LocalStack负责把数据放在Local中。注意使用一个LocalStack,一个Local。为了标记不同的请求,使用线程的唯一标识。最后
__storage__{123:{stack:[ctx,]},345:{stack:[ctx,]}},
取数据时,根据线程的ID取各自的值。
3. session上下文管理
① wsgi
② 创建ctx
ctx = RequestContext(session=None,request)
ctx.push()
③ LocalStack,把ctx对象添加到Local中
④ Local中存取数据使用
__storage__ = {唯一标识: {stack: [ctx, ]}
}
⑤ 通过LocalStack获取ctx中的session,给session赋值(从cookie中读取数据,进行解密反序列化)。调用的是open_session,处理完执行save_session。
获取session:
# coding:utf-8
from flask import Flask,Request
from flask.globals import _request_ctx_stack
app = Flask(__name__)@app.route('/')
def index():ctx = _request_ctx_stack.top # 获取ctx对象print(ctx.session)return 'index'if __name__ == '__main__':app.run()# app.__call__ # 请求进来执行__call__,把数据交给__call__方法# app.wsgi_app # __call__方法会调用wsgi_app
19. flask-session
flask-sesion组件可以帮助我们把默认的 session 放入到加密的 cookie 中。
(1)安装 flask-session:pip install flask-session
(2)session 保存在 Redis 中
# coding:utf-8
from flask import Flask, request, session
from flask.sessions import SecureCookieSessionInterface
# from flask.ext.session import Session#以前导入session的方式,与下面是一样的
from flask_session import Session
import redisapp = Flask(__name__)
'''
session保存在redis中
'''
# app.session_interface = SecureCookieSessionInterface() # 默认
# app.session_interface 换成flaks-session中的内容
# app.session_interface =RedisSessionInterface()
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = redis.Redis(host='主机IP', port=6739, password='密码!')
Session(app)@app.route('/login')
def login():session['user'] = 'thanlon'return '……'@app.route('/')
def index():print(session.get('user')) # 没有不报错,session['user']没有会报错return 'index'if __name__ == '__main__':app.run()# app.__call__ # 请求进来执行__call__,把数据交给__call__方法# app.wsgi_app # __call__方法会调用wsgi_app
(3)查看Redis中存入的session
look_cookie.py:
# coding:utf-8
import redis
conn = redis.Redis(host='106.12.115.136',port=6379)
print(conn.keys())
'''
[b'session:.eJwVzDEOgCAQRNG7bO0muyOoy2UMAoWxw2BjvLvQTF4x-S-lVuve7tKnnZkCZVEVD-GcLLETr2yIkRFNt2OFMxSa6NL-fXQIQ-gaGQqK2fnl-wG6SBh8.XVNcEA.0NiiN7ldvhokI3sKsstrb0GhOQs']
'''
(4) flask-session的原理
① session数据保存在Redis。Redis的key是session:随机字符串,每一个随机字符串对应一个自己的值。每一个用户都会有一个随机字符串。
② 将随机字符串返回给用户。
(5)flask-session源码
from flask_session import RedisSessionInterface
20. wtforms组件
1. 用户登录
form.py
:
# coding:utf-8
from wtforms import Form
from wtforms.fields import simple
from wtforms import widgets, validatorsclass LoginForm(Form):# name = simple.StringField()name = simple.StringField(validators=[validators.DataRequired(message='用户名不能为空'),validators.Length(min=6,max=20,message='用户名长度必须大于%(min)d且小于%(max)d')],widget=widgets.TextInput(),render_kw={'placeholder': '请输入用户名', 'class': 'form-control'})# pwd = simple.PasswordField()pwd = simple.PasswordField(validators=[validators.DataRequired(message='密码不能为空')],render_kw={'placeholder': '请输入密码','class': 'form-control'})
app.py:
from flask import Flask, render_template, request
from forms import LoginFormapp = Flask(__name__)
app.debug = True@app.route('/')
def index():return 'index'@app.route('/login', methods=['get', 'post'])
def login():if request.method == 'GET':form = LoginForm()# print(form.name, type(form.name)) # name是一个对象,print的时候执行这个对象的__str__方法,# print(form.pwd, type(form.pwd)) # name是一个对象,print的时候执行这个对象的__str__方法'''<input id="name" name="name" type="text" value=""> <class 'wtforms.fields.core.StringField'><input id="pwd" name="pwd" type="password" value=""> <class 'wtforms.fields.simple.PasswordField'>'''return render_template('login.html', form=form)form = LoginForm(formdata=request.form)if form.validate():print(form.data) # {'name': '123456', 'pwd': '123456'}return '验证成功!'else:# print(form.errors) # {'name': ['用户名不能为空'], 'pwd': ['密码不能为空']}return render_template('login.html', form=form)if __name__ == '__main__':app.run()
login.html:
<form method="POST" novalidate><div class="form-group"><label>用户名</label>{{form.name}}<div style="color:red">{{form.name.errors[0]}}</div></div><div class="form-group"><label>密 码</label>{{form.pwd}}<div style="color:red">{{form.pwd.errors[0]}}</div></div><button type="submit" class="btn btn-primary">登录</button></form>
2. 用户注册
form.py:
class RegisterForm(Form):name = simple.StringField(label='用户名',validators=[validators.DataRequired(message='密码不能为空!')],widget=widgets.TextInput(),render_kw={'class': 'form-control'},default='thanlon')pwd = simple.PasswordField(label='密码',validators=[validators.DataRequired(message='密码不能为空!')],widget=widgets.PasswordInput(),render_kw={'class': 'form-control'})pwd_confirm = simple.PasswordField(label='重复密码',validators=[validators.DataRequired(message='重复密码不能为空!'),validators.EqualTo('pwd', message='两次密码输入不一致!')],widget=widgets.PasswordInput(),render_kw={'class': 'form-control'})email = html5.EmailField(label='邮箱',validators=[validators.DataRequired(message='邮箱不能为空!'),validators.Email(message='邮箱格式错误!')],widget=widgets.TextInput(input_type='email'),render_kw={'class': 'form-control'})gender = core.RadioField(label='性别',choices=((1, '男'), (2, '女')),coerce=int, # int('1'))city = core.SelectField(label='城市',choices=(('bj', '北京'),('sh', '上海')))hobby = core.SelectMultipleField(label='爱好',choices=((1, '羽毛球'),(2, '篮球')),coerce=int)favor = core.SelectMultipleField(label='喜好',choices=((1, '羽毛球'),(2, '篮球')),widget=widgets.ListWidget(prefix_label=False),option_widget=widgets.CheckboxInput(),coerce=int,default=[2])
register.html:
<form method="post" novalidate>{% for field in form %}{{field.label}} {{field}}{{field.errors[0]}}{% endfor %}<button type="submit" class="btn btn-primary">登录</button></form>
app.py:
@app.route('/register',methods=['get','post'])
def register():if request.method == 'GET':form = RegisterForm()return render_template('register.html', form=form)form = RegisterForm(formdata=request.form)if form.validate():print(form.data)return redirect('https://www.blueflags.cn')return render_template('register.html', form=form)
3. 数据库实时更新
① 数据相关
mysql> create table city(id int primary key auto_increment,city_name varchar(20) not null );
mysql> insert city values(null,'beijin');
mysql> insert city values(null,'shanghai');
mysql> insert city values(null,'shenzhen');
② 效果图
③ 主要代码
class UserForm(Form):city = core.SelectField(label='城市',choices=helper.fetch_all('select *from city', [], type=None), # 只是在第一次从数据库获取一次coerce=int, # 自动把用户提交的字符串转换为数字)
选项中的数据只是在第一次请求的时候从数据库获取一次,当数据更新时,用户页面中的数据并没有实时更新。所以,代码是存在问题的。
解决方案:对于UserForm类,每次实例化的时候都去从数据库中获取一次数据,可以这样写:
class UserForm(Form):city = core.SelectField(label='城市',choices=(), # 只是在第一次从数据库获取一次coerce=int, # 自动把用户提交的字符串转换为数字)# 如果没有写执行Form的构造方法def __init__(self, *args, **kwargs):super(UserForm, self).__init__(*args, **kwargs) # 在UserForm中执行什么在父类中还要执行什么self.city.choices = helper.fetch_all('select *from city', [], type=None)
实例化UserForm的时候传值:
@app.route('/user', methods=['get', 'post'])
def user():if request.method == 'GET':form = UserForm(data = {'name':'thanlon','city':3})return render_template('user.html',form=form)
4. wtforms的作用与django示例
wtforms有两个作用:1. 自动生成html标签;2.对用户请求数据进行校验
在django中想实现实时更新,也需要重写构造方法。
views.py:
from django.shortcuts import render
from django.forms import Form
from django.forms import fields
from app01 import modelsclass IndexForm(Form):title = fields.CharField()# group = fields.ChoiceField(# choices=(# (1, 'A'),# (2, 'B')# )# )group = fields.ChoiceField(choices=())def __init__(self, *args, **kwargs):super(IndexForm, self).__init__(*args, **kwargs)self.fields['group'].choices = models.UserGroup.objects.all().values_list('id', 'title')# Create your views here.
def index(request):if request.method == 'GET':form = IndexForm()return render(request, 'index.html', {'form': form})
21. SQLAlchemy
1. 简介
SQLAlchemy 是 ORM(对象关系映射)框架,类名对应表名,类中的字段对应数据表的列,对象对应数据表的一行。SQLAlchemy 可以帮助我们使用类和对象快速实现数据库操作
。我们在公司里要么是用原生sql,要么是用ORM框架。原生 SQL 中可以用 MySQLdb 和 pymysql 模块,MySQLdb 的用法和 pymysql 基本上是完全一样的。两者在使用上是有区别的,MySQLdb不支持python3,pymysql 支持 python2 和 python3
。对于Python Web框架,如果有自己的使用自己的,如果没有ORM一般都用SQLAlchemy。
2. CURD
① 创建一个数据表
models.py:
# coding:utf-8
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, Date, DateTime
from sqlalchemy import create_engine
Base = declarative_base()class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True, nullable=False)engine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0, # 超过连接池大小外最多创建的连接pool_size=5, # 连接池大小pool_timeout=30, # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1, # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
# Base.metadata.drop_all(engine)#删除这个数据表
Base.metadata.create_all(engine) # 创建这个数据表
users表:
+-------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------+-------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| name | varchar(32) | NO | MUL | NULL | |
+-------+-------------+------+-----+---------+----------------+
注意:SQLAlchemy默认是不可以修改创建好的数据表的,需要用到第三方组件
② 封装操作:防止models.py被导入的时候执行,把创建表和删除表封装在函数中
# coding:utf-8
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, Date, DateTime
from sqlalchemy import create_engineBase = declarative_base()class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True, nullable=False)def create_all():engine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0, # 超过连接池大小外最多创建的连接pool_size=5, # 连接池大小pool_timeout=30, # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1, # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置)Base.metadata.create_all(engine) # 创建这个数据表def drop_all():engine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0, # 超过连接池大小外最多创建的连接pool_size=5, # 连接池大小pool_timeout=30, # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1, # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置)Base.metadata.drop_all(engine) # 删除这个数据表
if __name__ == '__main__':pass# create_all()# drop_all()
③ 添加数据
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Usersengine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0, # 超过连接池大小外最多创建的连接pool_size=5, # 连接池大小pool_timeout=30, # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1, # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
SessionFactory = sessionmaker(bind=engine)
'''
根据Users类对users表进行增删改查
'''
session = SessionFactory() # 创建一个连接
'''
增加单条数据
'''
# obj = Users(name='thanlon')
# session.add(obj)
'''
添加多条数据
'''
# obj = Users(name='thanlon')
# obj2 = Users(name='kiku')
# session.add(obj)
# session.add(obj2)
'''
添加多条数据的时候也可以把这些对对象放在列表中
'''
session.add_all([Users(name='thanlon'),Users(name='kiku')
])session.commit()
session.close()
④ 查询数据
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Usersengine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0, # 超过连接池大小外最多创建的连接pool_size=5, # 连接池大小pool_timeout=30, # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1, # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
SessionFactory = sessionmaker(bind=engine)
'''
根据Users类对users表进行增删改查
'''
session = SessionFactory() # 创建一个连接
'''
查询表中所有内容
'''
# ret = session.query(Users).all()
# # print(ret)#拿到的是对象
# # [<models.Users object at 0x7f9cc4ca1080>, <models.Users object at 0x7f9cc4ca10f0>, <models.Users object at 0x7f9cc4ca1160>, <models.Users object at 0x7f9cc4ceee80>, <models.Users object at 0x7f9cc4cf50f0>, <models.Users object at 0x7f9cc4cf5160>]
# for row in ret:
# print(row.id)
# print(row.name)
'''
根据条件查询
'''
# ret = session.query(Users).filter(Users.id > 3) # filter中直接加表达式
# for row in ret:
# print(row.id, row.name)
# '''
# 4 kiku
# 5 thanlon
# 6 kiku
# '''
'''
根据条件查询,拿到第一条数据
'''
ret = session.query(Users).filter(Users.id > 3).first() # filter中直接加表达式
print(ret.id, ret.name)
'''
4 kiku
'''
session.commit()
session.close()
⑤ 删除数据
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Usersengine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0, # 超过连接池大小外最多创建的连接pool_size=5, # 连接池大小pool_timeout=30, # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1, # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
SessionFactory = sessionmaker(bind=engine)
session = SessionFactory() # 创建一个连接
'''
删除数据
'''
# session.query(Users).delete()
session.query(Users).filter(Users.id > 3).delete()
session.commit()
⑥ 修改数据
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Usersengine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0, # 超过连接池大小外最多创建的连接pool_size=5, # 连接池大小pool_timeout=30, # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1, # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
SessionFactory = sessionmaker(bind=engine)
session = SessionFactory() # 创建一个连接
'''
修改数据
'''
# session.query(Users).filter(Users.id == 1).update({Users.name: 'liuyuqin'})
# session.query(Users).filter(Users.id == 1).update({'name': Users.name + 'hello'}, synchronize_session=False)#等效于下面的
session.query(Users).filter(Users.id == 1).update({Users.name: Users.name + 'hello'}, synchronize_session=False)
session.commit()
3. 常用操作
① 指定列
ret = session.query(Users.id, Users.name.label('cname')).all()
for item in ret:print(item, type(item), item[0], item.id, item.cname)# item是<class 'sqlalchemy.util._collections.result'>,打印的是元组,实际不是元组类型,但可以通过元组去值。能够通过索引去取,说明该类中有__item__方法
query = session.query(Users.id, Users.name.label('cname')) # query的值就是sql语句
② 1. 默认条件是and
session.query(Users).filter(Users.id == 1, Users.name == 'thanlon').all()
③ between
session.query(Users).filter(Users.id.between(1, 2), Users.name == 'thanlon').all()
④ in
session.query(Users).filter(Users.id.in_([1, 2, 3])).all()
session.query(Users).filter(~Users.id.in_([1, 2, 3])).all()
⑤ 子查询
session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name == 'thanlon'))).all()
⑥ and和or
from sqlalchemy import and_, or_session.query(Users).filter(and_(Users.id > 1, Users.name == 'thanlon')).all() # 没有and_也是一样的表示and
session.query(Users).filter(or_(Users.id > 1, Users.name == 'thanlon')).all() # 或者
session.query(Users).filter(or_(Users.id > 1, and_(Users.name == 'thanlon', Users.id > 3))
).all()
⑦ filter_by
session.query(Users).filter_by(name='thanlon').all() # 条件,内部也是使用filter
⑧ 通配符
session.query(Users).filter(Users.name.like('t_')).all() # t后面有一个字符
session.query(Users).filter(~Users.name.like('t%')).all() # 以t开头
⑨ 切片/分页
session.query(Users)[1, 3]
排序:
session.query(Users).order_by(Users.name.desc()).all() # 按照姓名从大到小
session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 先按照name从大到小排列,如果同名按照id升序排列
分组:
from sqlalchemy.sql import func'''
第一种情况
'''
# ret = session.query(func.max(Users.id)).group_by(Users.depart_id).all()
# for item in ret:
# print(item)
'''
第二种情况
'''
# ret = session.query(func.min(Users.id)).group_by(Users.depart_id).all()
# for item in ret:
# print(item)
'''
第三种情况
'''
# ret = session.query(Users.depart_id,func.count(Users.id)).group_by(Users.depart_id).all()
# for item in ret:
# print(item)
# '''
# (1, 2)
# (2, 1)
# '''
'''
第四种情况:根据聚合条件二次筛选用having(部门人数大于等于2)
'''
ret = session.query(Users.depart_id, func.count(Users.id)).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all()
for item in ret:print(item)'''(1, 2)'''
# 注意:如果使用group,根据聚合的结果进行二次筛选时,只能用having
组合:将两个表上下拼接(union和union all),而join是左右拼接
'''
union:不会去重
'''
# q = session.query(Users.name).filter(Users.id > 0)
# q2 = session.query(Admin.admin_name).filter(Admin.id > 0)
# ret = q.union(q2).all()
# for item in ret:
# print(item.name)
'''
union:去重
'''
q = session.query(Users.name).filter(Users.id > 0)
q2 = session.query(Admin.admin_name).filter(Admin.id > 0)
ret = q.union_all(q2).all()
for item in ret:print(item.name)
4. 外键操作
① 创建两个表,注意Users表中是加了外键的
class Depart(Base):__tablename__ = 'depart'id = Column(Integer, primary_key=True)title = Column(String(32), index=True, nullable=False)class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True, nullable=False)depart_id = Column(Integer, ForeignKey('depart.id')) # depart小写代表表名,不是类名
插入数据:
mysql> insert depart values(null,'technology');
mysql> insert depart values(null,'develop');
mysql> insert users values(null,'thanlon',1);
mysql> insert users values(null,'yuqin',2);
② 外键操作:
我们在这里可以查询所有用户的信息和所属部门名称,具体操作如下:
# ret = session.query(Users, Depart).join(Depart).all() #depart_id = Column(Integer, ForeignKey('depart.id')),默认根据depart.id等效于下面的
ret = session.query(Users, Depart).join(Depart, Users.depart_id == Depart.id).all() # on的条件是:depart_id == Depart.id
for row in ret:print(row[0].id, row[0].name, row[1].title)
也可以指定要查询的具体内容,
# 也可以指定获取指定的内容
ret = session.query(Users.id, Users.name, Depart.title.label('as_title')).join(Depart,Users.depart_id == Depart.id).all()
for row in ret:print(row.id, row.name, row.as_title)
可以使用多个join,但只有左连接没有右连接的概念,右连接是没有意义的。此外,下面query变量打印出来是一个sql语句,query变量的类型是:<class ‘sqlalchemy.orm.query.Query’>。
query = session.query(Users.id, Users.name, Depart.title).join(Depart, Users.depart_id == Depart.id,isouter=True) # 没有right join是没有意义的
我们把在对应users的Users类中添加一个dp字段,注意这个字段不会在表中存在,只有Column实例化的对象才会对应数据表中的列。加入relationship可以帮助我们(跨表)做关联查询和创建关联数据。
from sqlalchemy.orm import relationship
class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True, nullable=False)depart_id = Column(Integer, ForeignKey('depart.id')) # depart小写代表表名,不是类名dp = relationship('Depart',backref = 'pers')
比如上面的“查询所有用户的信息和所属部门名称”可以这样操作:
ret = session.query(Users).all()
print(ret) # [<models.Users object at 0x7f5b935bb7f0>, <models.Users object at 0x7f5b935bb860>……]
for row in ret:print(row.id, row.name, row.depart_id, row.dp.title)
还可以反向跨表,比如查询销售部为“develop”的所有人员,我们可以这样操作:
obj = session.query(Depart).filter(Depart.title == 'develop').first()
print(obj.pers) # [<models.Users object at 0x7fef08122160>]
for row in obj.pers:print(row.id, row.name, obj.title)
上面两个例子是使用表示relationship可以做关联查询,下面的例子则会介绍一次性创建关联数据。
比如有这样的需求“创建一个名称是‘finance’的部门并在该部门中添加员工:thanlon”,一般情况下我们是这样操作的:
dp = Depart(title='finance')
session.add(dp)
session.commit()
#再根据根据finance部门id创建员工
u = Users(name='thanlon', depart_id=dp.id)
session.add(u)
session.commit()
上面是一种方式,有了relationship,我们可以这样创建:
u = Users(name='thanlon', dp=Depart(title='finance'))
session.add(u)
session.commit()
可以看到简单多了吧!
问题来了,如果创建一个部门再在这个部门创建多个员工,如果按照上面的例子,会又创建若干个finance部门。当然,如果按照上上个例子一个个添加员工也是可以的。但是,我们还有简便的方式,你可以这样做:
# 创建一个部门,添加多个员工
dp = Depart(title='finance')
dp.pers = [Users(name='a'),Users(name='b'),Users(name='c')
]
session.add(dp)
session.commit()
5. 多对多操作
① 多对多表的创建
创建学生表、课程表和中间表(部分代码省略)
from sqlalchemy import Column, Integer, String, Text, Date, DateTime, ForeignKey, UniqueConstraint, Indexclass Student(Base):__tablename__ = 'student'id = Column(Integer, primary_key=True)name = Column(String(32), index=True, nullable=False)class Course(Base):__tablename__ = 'course'id = Column(Integer, primary_key=True)title = Column(String(32), index=True, nullable=False)class Student2Course(Base):__tablename__ = 'student2course'id = Column(Integer, primary_key=True, autoincrement=True)student_id = Column(Integer, ForeignKey('student.id'))course_id = Column(Integer, ForeignKey('course.id'))__table_args__ = (UniqueConstraint('student_id', 'course_id', name='uc_student_course'), # 联合唯一索引# Index('i_student_course', 'student_id', 'course_id')#联合索引)
② 添加数据
添加Student表和Course表数据:
session.add_all([Student(name='thanlon'),Student(name='Aurora'),Student(name='kiku'),Course(title='English'),Course(title='Computer'),
])
session.commit()
添加中间表数据:
session.add_all([Student2Course(student_id=4,course_id=1),Student2Course(student_id=4,course_id=2),
])
session.commit()
session.add_all([Student2Course(student_id=5,course_id=1),
])
session.commit()
session.add_all([Student2Course(student_id=6,course_id=2),
])
session.commit()
③ 多对多操作
操作1:查询学生姓名和对应的选课课程名(三张表关联查询)
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Users, Depart, Student, Course, Student2Courseengine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0, # 超过连接池大小外最多创建的连接pool_size=5, # 连接池大小pool_timeout=30, # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1, # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
SessionFactory = sessionmaker(bind=engine)
session = SessionFactory()
ret = session.query(Student2Course.id, Student.name, Course.title).join(Student,Student2Course.student_id == Student.id,isouter=True).join(Course, Student2Course.course_id == Course.id, isouter=True).order_by(Student2Course.id.asc())
for row in ret:print(row)'''(1, 'thanlon', 'English')(2, 'thanlon', 'Computer')(3, 'Aurora', 'English')(4, 'kiku', 'Computer')'''
session.close()
操作2:学生“thanlon”选的所有课
……
ret = session.query(Student2Course.id, Student.name, Course.title).join(Student,Student2Course.student_id == Student.id,isouter=True).join(Course, Student2Course.course_id == Course.id, isouter=True).filter(Student.name == 'Thanlon').order_by(Student2Course.id.asc()).all()
print(ret)
# [(1, 'thanlon', 'English'), (2, 'thanlon', 'Computer')]
……
上面两个例子是一般多对关联查询操作,下面使用relationship,首先需要修改建Student表语句:
model.py:
class Student(Base):__tablename__ = 'student'id = Column(Integer, primary_key=True)name = Column(String(32), index=True, nullable=False)course_list = relationship('Course', secondary='student2course', backref='student_list') # 根据与Course做关联
使用relationship来查询来执行操作2:学生“thanlon”选的所有课,可以这样:
obj = session.query(Student).filter(Student.name == 'thanlon').first()
for item in obj.course_list:print(item.title)'''ComputerEnglish'''
除了使用relationship正向查找,还可以反向查找,比如:查找选择“Computer”的所有人,
obj = session.query(Course).filter(Course.title == 'computer').first()
# print(obj)#<models.Course object at 0x7fe264215668>
for item in obj.student_list:print(item.name)'''thanlonkiku'''
操作3:插入数据示例,需求“创建一个课程,创建两个学生,两个学生选新创建的课程”
这里使用relationship,
obj = Course(title='math')
obj.student_list = [Student(name='Maria'), Student(name='Michael')]
session.add(obj)#在course表增加数据,在student表增加两条数据,在中间关系表中增加两条
session.commit()
6. 两种连接数据库的方式
① 自己实例化session对象
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Users
from threading import Threadengine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0, # 超过连接池大小外最多创建的连接pool_size=5, # 连接池大小pool_timeout=30, # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1, # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
SessionFactory = sessionmaker(bind=engine)def task():# 去连接池获取一个连接session = SessionFactory()ret = session.query(Users).all()# 将连接交给连接池session.close()if __name__ == '__main__':t = Thread(target=task)t.start()
如果用同一个连接,多线程请求数据时,这个连接很可能忙不过来,数据很有可能出现混乱。所以,实例化session对象(创建一个连接)的代码是不可以放在task函数外面(全局)。此外,session.close()会把仅有的一个连接释放,其它线程是不可以用到的,因为这种情况下连接只创建了一次。
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Users
from threading import Threadengine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0, # 超过连接池大小外最多创建的连接pool_size=5, # 连接池大小pool_timeout=30, # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1, # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
SessionFactory = sessionmaker(bind=engine)# 去连接池获取一个连接
session = SessionFactory()def task():ret = session.query(Users).all()# 将连接交给连接池session.close()if __name__ == '__main__':t = Thread(target=task)t.start()
② 通过Threading.Local实现的(推荐使用)
基于Threading.Local也是为每个线程获取一个连接。
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Users
from threading import Thread
from sqlalchemy.orm import scoped_sessionengine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0, # 超过连接池大小外最多创建的连接pool_size=5, # 连接池大小pool_timeout=30, # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1, # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
SessionFactory = sessionmaker(bind=engine)
# 去连接池获取一个连接
session = scoped_session(SessionFactory)def task():ret = session.query(Users).all()# 将连接交给连接池session.remove()if __name__ == '__main__':t = Thread(target=task)t.start()
7. 原生SQL
Django里面可以执行原生SQL,SQLAlchemy也可以执行原生SQL,如果SQL语句比较复杂,ORM解决不了,那么可以使用原生SQL。SQLAlchemy 执行原生 SQL的 方式有好多种,在这里介绍其中两种。
方式1
:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Users
from threading import Thread
from sqlalchemy.orm import scoped_sessionengine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0, # 超过连接池大小外最多创建的连接pool_size=5, # 连接池大小pool_timeout=30, # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1, # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
SessionFactory = sessionmaker(bind=engine)
# 去连接池获取一个连接
session = scoped_session(SessionFactory)def task():'''方式1'''# 查询# cursor = session.execute('select *from users')# ret = cursor.fetchall()# 插入cursor = session.execute('insert users(name) values(:value)', params={'value': 'thanlon'})session.commit()print(cursor.lastrowid)if __name__ == '__main__':t = Thread(target=task)t.start()
方式2
:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Users
from threading import Thread
from sqlalchemy.orm import scoped_sessionengine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0, # 超过连接池大小外最多创建的连接pool_size=5, # 连接池大小pool_timeout=30, # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1, # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
SessionFactory = sessionmaker(bind=engine)
# 去连接池获取一个连接
session = scoped_session(SessionFactory)def task():'''方式2'''conn = engine.raw_connection()cursor = conn.cursor()cursor.execute('select *from users')ret = cursor.fetchall()print(ret) # ((1, 'liuyuqinhellohello'), (2, 'thanlon'), (3, 'thanlon'), (7, 'thanlon'))cursor.close()conn.close()if __name__ == '__main__':t = Thread(target=task)t.start()
Flask入门到实战相关推荐
- 《Flask Web开发实战:入门、进阶与原理解析》读书笔记
写在前面 学docker编排,有一个用Flask框架的Demo,感觉挺方便,所以学习下 基于<Flask Web开发实战:入门.进阶与原理解析>做的读书笔记 个人还是比较喜欢看书,看书的话 ...
- Flask入门之Jinjia模板的一些语法
原文:https://www.cnblogs.com/wongbingming/p/6807771.html Flask入门之Jinjia模板的一些语法 1. 变量表示 {{ argv }} 2. 赋 ...
- 《Docker技术入门与实战》pdf
下载地址:网盘下载 内容简介 · · · · · · [编辑推荐] <Docker技术入门与实战>是中国首部Docker著作,一线Docker先驱实战经验结晶,来自IBM和新浪等多位技术 ...
- 【Python全栈开发从入门到实战】持续更新中......
本专栏为Python全栈开发系列文章,技术包括Python基础.函数.文件.面向对象.网络编程.并发编程.MySQL数据库.HTML.JavaScript.CSS.JQuery.bootstrap.W ...
- 《Go语言从入门到实战》学习笔记(1)——Go语言学习路线图、简介
非常有幸在<极客时间>上看到<Go语言从入门到实战>这门课程,本课程的作者给出了较为详细的学习路线图,具体如下: 学习路线图 学习目的 个人学习的目的主要是了解Go语言的基本 ...
- PyTorch深度学习入门与实战(案例视频精讲)
作者:孙玉林,余本国 著 出版社:中国水利水电出版社 品牌:智博尚书 出版时间:2020-07-01 PyTorch深度学习入门与实战(案例视频精讲)
- 7-Python3从入门到实战—基础之数据类型(字典-Dictionary)
Python从入门到实战系列--目录 字典的定义 字典是另一种可变容器模型,且可存储任意类型对象:使用键-值(key-value)存储,具有极快的查找速度: 字典的每个键值(key=>value ...
- 《Android 开发入门与实战(第二版)》——6.6节配置改变
本节书摘来自异步社区<Android 开发入门与实战(第二版)>一书中的第6章,第6.6节配置改变,作者eoe移动开发者社区 组编 , 姚尚朗 , 靳岩,更多章节内容可以访问云栖社区&qu ...
- 【前端开发】HTML入门与实战
[什么是HTML]: HTML: 超文本标记语言,标准通用标记语言下的一个应用. "超文本"就是指页面内可以包含图片.链接,甚至音乐.程序等非文字元素. HTML 是用来描述网页的 ...
最新文章
- MyEclipse中运行环境jre、编译级别、tomcat运行环境区别
- Plitch for the final Feb 16
- zcmu-1783(01字典树)
- 红旗linux怎么更新,红旗linux7.0下自动更新firefox
- 妙用Python集合求解啤酒问题(携程2016笔试题)
- pandas小记:pandas索引和选择
- C#给图片加文字水印
- web of knowledge分析文献引用情况(引)
- 《Adobe Fireworks CS5中文版经典教程》——1.2 工具面板
- MyQQ:可以在终端里面上的QQ
- Nordic 52832工程报错undefined reference to `__start_pwr_mgmt_data'解决办法
- 华为p20支持手机云闪付吗_华为哪些手机支持云闪付
- [工业互联-2]:工业有线互联总线之CAN总线
- [附源码]JAVA+ssm计算机毕业设计房屋租赁管理系统设计(程序+Lw)
- oracle-DDL对表的操作
- 一次 WebResource.axd 异常处理经历
- JQuery修改对象的属性值
- 函数极限的24种定义
- Ubuntu硬盘挂载
- 读LEO《程序员羊皮卷》心得笔记