文章目录

  • 1. Flask初识
    • 1. Flask概述
    • 2. wsgi
    • 3. 安装Flask
    • 4. werkzurg
    • 5. 第一个Flask应用
  • 2. 配置文件的使用
    • 1. 配置文件导入原理
    • 2. 配置文件的使用
  • 3. 路由系统
    • 1. 反向生成URL
    • 2. 动态路由的构造
  • 4. 请求与响应相关数据
    • 1. 请求相关数据
    • 2. 响应响应体
    • 3. 响应响应头
  • 5. 简单登录案例
    • 1. 案例目录结构
    • 2. 案例主要代码
    • 3. 案例页面效果
  • 6. 用户认证
    • 1. 第一种用户认证
    • 2. 装饰器知识点补充
    • 3. endpoint参数
    • 4. 装饰器的先后顺序
    • 5. 第二种用户认证方式
    • 6. 第三种用户认证方式
  • 7. 模板渲染
    • 1. 数据类型的渲染
    • 2. 传入函数
    • 3. 全局定义函数
    • 4. 模板的继承
    • 5. 模板的包含
    • 6. 宏定义的使用
    • 7. 模板安全
  • 8. Session
    • 1. Session原理
    • 2. 自定义Session配置
  • 9. flash
  • 10. 中间件
    • 1. call方法的执行
    • 2. 自定义中间件
  • 11. 特殊的装饰器(重要)
  • 12. 路由和视图
    • 1. 路由和视图函数对应关系
    • 2. 路由设置两种方式
    • 3. @app.route和app.add_url_url参数
    • 4. CBV
    • 5. 自定义正则
  • 13. 蓝图
    • 1. 蓝图(一)
    • 2. 蓝图(二)
  • 14. threading.local
    • 1. threading.local的作用
    • 2. 如何获取一个线程的唯一标记
    • 3. 根据字典自定义一个类似于threading.local功能
    • 4. 根据字典为每个协程开辟空间进行存取数据
    • 5. __ getattr__ 与__ setattr__
    • 6. 通过getattr与setattr构造出threading.local
  • 15. 上下文管理
  • 16. 偏函数与面向对象
    • 1. 偏函数
    • 2. super和执行类的区别
    • 3. 面向对象中特殊的方法
  • 17. Local与LocalStack
    • 1. Local对象与LocalStack作用
    • 2. 自定义一个简单的Local
    • 3. 对栈中的数据进行添加和维护
    • 4. 使用LocalStack向Local添加对象
  • 18. Flask请求上下文管理原理
    • 1. request上下文管理
    • 2. 请求相关数据流程图
    • 3. session上下文管理
  • 19. flask-session
  • 20. wtforms组件
    • 1. 用户登录
    • 2. 用户注册
    • 3. 数据库实时更新
    • 4. wtforms的作用与django示例
  • 21. SQLAlchemy
    • 1. 简介
    • 2. CURD
    • 3. 常用操作
    • 4. 外键操作
    • 5. 多对多操作
    • 6. 两种连接数据库的方式
    • 7. 原生SQL

我的网站:https://pythoneers.cn

1. Flask初识
1. Flask概述

Flask框架是一个短小精悍且可扩展强的Web框架,同Django框架一样依赖于第三方实现socket模块。

Flask特有上下文管理机制!

2. wsgi

WEB服务网关接口,是一个协议。Flask 中实现该协议的模块有 wsgirefwerkzeug,模块本质上就是 socket服务端用于接收客户端的请求并处理

面试中可能会被用到wsgi是什么?

3. 安装Flask

一般使用 pip 安装 Flask:pip install Flask,如果想加速下载,可以指定国内阿里云 pipy 的源,安装方法是:pip install flask -i https://mirrors.aliyun.com/pypi/simple

4. werkzurg

Flask 中实现 wsgi 的模块是 werkzurg,它是Flask第三方模块。

from werkzeug.serving import run_simpledef run(environ, start_response):return 'Hello Flask!'if __name__ == '__main__':run_simple('localhost', 5000, run)
from werkzeug.wrappers import Request, Response
from werkzeug.serving import run_simple@Request.application
def hello_flask(request):return Response('Hello Flask!')if __name__ == '__main__':run_simple('localhost', 5000, hello_flask)

Django中实现wsgi的模块是wsgiref

5. 第一个Flask应用
from flask import Flaskapp = Flask(__name__)  # 实例化Flask类@app.route('/index')
def index():return 'Hello Flask!'if __name__ == '__main__':app.run()  # run方法启动socket
2. 配置文件的使用
1. 配置文件导入原理

给一个类的字符串路径,找到类并且获取类中大写的静态字段。

main.py:

class FlaskClass:FLAG = True

test.py:

import importlibpath = 'setting.FlaskClass'
p, c = path.rsplit('.', maxsplit=1)
# print(p, c)  # setting FlaskClass
m = importlib.import_module(p)
# print(m)  # <module 'setting' from 'C:\\Users\\Thanlon\\PycharmProjects\\flask_pro\\setting.py'>
cls = getattr(m, c)
print(cls)  # 找到类:<class 'setting.FlaskClass'>
# print(dir(cls))  # 类成员
for key in dir(cls):if key.isupper():print('key:', key)print('value:', getattr(cls, key))"""
<class 'setting.FlaskClass'>
key: FLAG
value: True
"""
2. 配置文件的使用

Flask配置文件是一个 flask.config.Config 对象,它继承字典,默认配置文件如下:

print(type(app.config), app.config)
"""
<class 'flask.config.Config'>
"""
{'ENV': 'production',
'DEBUG': False,
'TESTING': False,
'PROPAGATE_EXCEPTIONS': None,
'PRESERVE_CONTEXT_ON_EXCEPTION': None,
'SECRET_KEY': None,
'PERMANENT_SESSION_LIFETIME': datetime.timedelta(days=31),
'USE_X_SENDFILE': False,
'SERVER_NAME': None, 'APPLICATION_ROOT': '/',
'SESSION_COOKIE_NAME': 'session',
'SESSION_COOKIE_DOMAIN': None,
'SESSION_COOKIE_PATH': None,
'SESSION_COOKIE_HTTPONLY': True,
'SESSION_COOKIE_SECURE': False,
'SESSION_COOKIE_SAMESITE': None,
'SESSION_REFRESH_EACH_REQUEST': True,
'MAX_CONTENT_LENGTH': None,
'SEND_FILE_MAX_AGE_DEFAULT': datetime.timedelta(seconds=43200), 'TRAP_BAD_REQUEST_ERRORS': None,
'TRAP_HTTP_EXCEPTIONS': False,
'EXPLAIN_TEMPLATE_LOADING': False,
'PREFERRED_URL_SCHEME': 'http',
'JSON_AS_ASCII': True,
'JSON_SORT_KEYS': True,
'JSONIFY_PRETTYPRINT_REGULAR': False,
'JSONIFY_MIMETYPE': 'application/json',
'TEMPLATES_AUTO_RELOAD': None,
'MAX_COOKIE_SIZE': 4093}

修改默认配置文件,创建配置文件 settings.py,用于存放修改的配置文件:

settings.py

# 存放开发环境、线上环境、测试环境共有的配置文件
class Config(object):pass# 线上环境配置文件
class ProductionConfig(Config):DEBUG = False# 开发环境配置文件
class DevelopmentConfig(Config):DEBUG = True# 测试环境配置文件
class TestingConfig(Config):DEBUG = True

app.py:

from flask import Flaskapp = Flask(__name__)
# print(type(app.config), app.config)
app.config.from_object('settings.DevelopmentConfig')  # 引入配置文件if __name__ == '__main__':app.run()
3. 路由系统
1. 反向生成URL

使用 url_for 方法生成 url

@app.route('/index', methods=['GET', 'POST'], endpoint='name')
def index():print(url_for('name'))return 'Flask!'

使用 url_for 方法生成url时设置参数:

@app.route('/index/<int:id>')
def index(id):print(url_for('index', id=1))return 'Flask!'
"""
/index/1
"""

如果不指定 endpoint 的值,默认为函数名:

def _endpoint_from_view_func(view_func):assert view_func is not None, 'expected view func if endpoint ' \'is not provided.'return view_func.__name__ # 返回函数名
def add_url_rule(self,rule,endpoint=None,view_func=None,provide_automatic_options=None, **options):if endpoint is None:endpoint = _endpoint_from_view_func(view_func)options['endpoint'] = endpointmethods = options.pop('methods', None)
@app.route('/index', methods=['GET', 'POST'])
def index():print(url_for('index'))return 'Flask!'

如果endpoint非要重名,必须保证函数相同。

2. 动态路由的构造
@app.route('/index/<username>') # 没有指定类型是字符串类型
@app.route('/index/<int:id>')
@app.route('/index/<float:id>')
@app.route('/index/<path:path>')

示例:

@app.route('/index/<float:id>')
def index(id):print(type(id),id)return 'Flask!'
浏览器输入:http://127.0.0.1:5000/index/1.0
控制台输出:<class 'float'> 1.0
4. 请求与响应相关数据
1. 请求相关数据

常用请求的相关信息:

request.method
request.args
request.form
request.cookies
request.headers

其它请求的相关信息:

request.values
request.path
request.full_path
request.script_root
request.url
request.base_url
request.url_root
request.host_url
request.host
request.files
obj = request.files['the_file_name']
obj.save('/uploads/'+secure_filename(f.filename))
2. 响应响应体

响应字符串:

@app.route('/index')
def index():return 'Flask'  # 字符串

响应模板:

@app.route('/index')
def index():return render_template() # 模板

响应重定向:

@app.route('/index')
def index():return redirect('') # 重定向

响应json格式数据 [from flask import json]:

@app.route('/index')
def index():return json.dumps({'name': 'thanlon'})  # {'name': 'thanlon'}

另一种响应json格式数据的方式 [from flask import jsonify]:

@app.route('/index')
def index():return jsonify({'name': 'thanlon'}) # {'name': 'thanlon'}

不能响应字典,即return dic是错误的!

3. 响应响应头

以上是响应响应体,还可以响应响应头和cookie,只需要从Flask模块中导入make_response进行封装即可。(from flask import make_response

示例:将字符串类型的响应体与响应头和cookie一同封装响应:

@app.route('/index')
def index():obj = make_response('Flask')obj.headers['response_flag'] = 'response_info'obj.set_cookie('key','value')return obj

响应模板、响应重定向、响应json格式数据和示例就一样的。

5. 简单登录案例
1. 案例目录结构

2. 案例主要代码

app.py:

from flask import Flask, render_template, request, redirect, session# app = Flask(__name__, static_folder='static', static_url_path='/thanlon')  # 可指定前缀
app = Flask(__name__)
app.secret_key = 'thanlon'@app.route('/login', methods=['GET', 'POST'])  # 默认允许GET请求
def login():if request.method == 'GET':return render_template('login.html')username = request.form.get('username')pwd = request.form.get('pwd')if username == 'Thanlon' and pwd == '123':session['username'] = username  # 默认session保存在签名或加密的cookie中return redirect('/index')# return render_template('login.html', error='用户名或者密码错误!')return render_template('login.html', **{'error': '用户名或者密码错误!'})@app.route('/index')  # 默认允许GET请求
def index():username = session.get('username')if not username:return redirect('/login')return render_template('index.html')if __name__ == '__main__':app.run()

login.html:

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1"><title>登录</title><link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/css/bootstrap.min.css"integrity="sha384-BVYiiSIFeK1dGmJRAkycuHAHRg32OmUcww7on3RYdg4Va+PmSTsz/K68vbdEjh4u" crossorigin="anonymous">
</head>
<body>
<div class="container"><div class="row" style="margin-top:30px"><div class="col-md-4 col-md-offset-4"><div class="panel panel-primary"><div class="panel-heading">登录</div><div class="panel-body"><form method="POST"><div class="form-group"><label>用户名</label><input type="text" name="username" class="form-control" id="" placeholder="Username"required="required"></div><div class="form-group"><label>密码</label><input type="password" name="pwd" class="form-control" placeholder="Password"required="required"><div style="color:red">{{error}}</div></div><button type="submit" class="btn btn-primary">登录</button></form></div></div></div></div>
</div>
</body>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/js/bootstrap.min.js"integrity="sha384-Tc5IQib027qvyjSMfHjOMaLkfuWVxZxUPnCJA7l2mCWNIpG9mGCD8wGNIcPD7Txa"crossorigin="anonymous"></script>
</html>

index.html:

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1"><title>首页</title><link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/css/bootstrap.min.css"integrity="sha384-BVYiiSIFeK1dGmJRAkycuHAHRg32OmUcww7on3RYdg4Va+PmSTsz/K68vbdEjh4u" crossorigin="anonymous">
</head>
<body>
<div class="container"><div class="row" style="margin-top:30px"><div class="panel panel-default"><div class="panel-body">欢迎{{session['username']}}进入首页!</div></div></div>
</div>
</body>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@3.3.7/dist/js/bootstrap.min.js"integrity="sha384-Tc5IQib027qvyjSMfHjOMaLkfuWVxZxUPnCJA7l2mCWNIpG9mGCD8wGNIcPD7Txa"crossorigin="anonymous"></script>
</html>
3. 案例页面效果

登录失败:

登录成功:

6. 用户认证
1. 第一种用户认证

settings.py

# 存放开发环境、线上环境、测试环境共有的配置文件
class Config(object):DEBUG = FalseSECRET_KEY = 'thanlon'

app.py

from flask import Flask, render_template, redirect, url_for, request, sessionapp = Flask(__name__)
app.config.from_object('settings.DevelopmentConfig')  # 引入配置文件USER_DICT = {1: {'username': 'thanlon', 'age': '23', 'gender': '男'},2: {'username': 'kiku', 'age': '25', 'gender': '女'}
}
@app.route('/login', methods=['GET', 'POST'])  # 默认允许GET请求
def login():if request.method == 'GET':return render_template('login.html')username = request.form.get('username')pwd = request.form.get('pwd')if username == 'Thanlon' and pwd == '123':session['username'] = username  # 默认session保存在签名或加密的cookie中return redirect('/index')# return render_template('login.html', error='用户名或者密码错误!')return render_template('login.html', **{'error': '用户名或者密码错误!'})@app.route('/index')
def hello_world():if not session.get('username'):return redirect(url_for('login'))return render_template('index.html', user_dict=USER_DICT)@app.route('/delete/<int:nid>')
def delete(nid):if not session.get('username'):return redirect(url_for('login'))del USER_DICT[nid]return redirect(url_for('hello_world'))  # url_for('')写函数名,和路由无关@app.route('/detail/<int:nid>')
def detail(nid):if not session.get('username'):return redirect(url_for('login'))info = USER_DICT[nid]return render_template('detail.html', info=info)if __name__ == '__main__':app.run()

index.html:

……
<div class="container"><div class="row"><div class="col-md-4"><table class="table table-hover"><tr><th style="text-align: center">ID</th><th style="text-align: center">用户名</th><th style="text-align: center">年龄</th><th style="text-align: center">性别</th><th style="text-align: center">操作</th></tr>{% for k,v in user_dict.items() %}<tr style="text-align: center"><td>{{ k }}</td><td>{{ v.username }}</td><td>{{ v.age }}</td><td>{{ v.gender }}</td><td><a href="/detail/{{ k }}">详情</a> | <a href="/delete/{{ k }}">删除</a></td></tr>{#                    <tr>#}{#                        <td>{{ v['username'] }}</td>#}{#                        <td>{{ v['age'] }}</td>#}{#                        <td>{{ v['gender'] }}</td>#}{#                    </tr> #}{#                                        <tr>#}{#                                            <td>{{ v.get('usename','默认') }}</td>#}{#                                            <td>{{ v.get('age') }}</td>#}{#                                            <td>{{ v.get('gender') }}</td>#}{#                                        </tr>#}{% endfor %}</table></div></div>
</div>
……

detail.html

……
<body>
{#{'username': 'thanlon', 'age': '23', 'gender': '男'}#}
<br>
{% for item in info.values() %}{#% for item in info.values() %#}{{ item }}
{% endfor %}
</body>
2. 装饰器知识点补充

函数没有加装饰器:

# coding:utf-8
def auth(func):def inner(*args, **kwargs):ret = func(*args, **kwargs)return retreturn inner
def index():print('index')
print(index.__name__)  # 打印函数名
'''index'''

函数加装饰器,让inner函数伪装成index函数:

# coding:utf-8
def auth(func):def inner(*args, **kwargs):ret = func(*args, **kwargs)return retreturn inner
@auth
def index():print('index')
@auth
def login():print('login')
print(index.__name__)  # 打印函数名
print(login.__name__)  # 打印函数名
'''inner'''
'''inner'''

上面的例子中加装饰器虽然把名字伪装成功,但是内部没有伪装成功,可以使用functools.wraps(func):把原来函数执行的信息放到inner函数中(把原来的函数名放进闭包):

# coding:utf-8
import functools
def auth(func):@functools.wraps(func)def inner(*args, **kwargs):ret = func(*args, **kwargs)return retreturn inner
@auth
def index():print('index')
@auth
def login():print('login')
print(index.__name__)  # 打印index函数名
print(login.__name__)  # 打印login函数名
'''
index
login
'''
3. endpoint参数

可以指定,默认是函数名。作用是:反向生成url,如果同名会出错。查看route函数源码:

def add_url_rule(self, rule, endpoint=None, view_func=None,
provide_automatic_options=None, **options):if view_func is not None:old_func = self.view_functions.get(endpoint)if old_func is not None and old_func != view_func:raise AssertionError('View function mapping is overwriting an ''existing endpoint function: %s' % endpoint)self.view_functions[endpoint] = view_func
……def route(self, rule, **options):def decorator(f):endpoint = options.pop('endpoint', None)self.add_url_rule(rule, endpoint, f, **options)return freturn decorator
4. 装饰器的先后顺序

装饰器和路由配合,把url和函数加入到路由关系中:

@app.route('/login', methods=['GET', 'POST'])  # 默认允许GET请求
def hello_world():if not session.get('username'):return redirect(url_for('login'))return render_template('index.html', user_dict=USER_DICT)

如果想要再加入一个登录验证的装饰器,如果把 @auth</kbd>加入到<kbd>@app.route('/login', methods=['GET', 'POST']) 上面,那就是url和函数是一个大整体,再用 @auth 装饰,这显然不是我们的目标。函数应该优先和 @auth 结合,再去做路由和函数的路由对应关系。装饰完成之后,多个路由都对应 inner,endpoint 默认是函数名,这样所有的函数都是 inner,这也是有问题。有两种解决方式。

方式一:自己定义endpoint,

@app.route('/index',endpoint='1')
……
@app.route('/delete/<int:nid>',endpoint='2')
……

方式二:使用 @functools.wraps(func)

import functools
def auth(func):@functools.wraps(func)def inner(*args, **kwargs):result = func(*args, **kwargs)return resultreturn inner@app.route('/index', endpoint='1')
@auth
def hello_world():if not session.get('username'):return redirect(url_for('login'))return render_template('index.html', user_dict=USER_DICT)
5. 第二种用户认证方式

app.py

def auth(func):@functools.wraps(func)def inner(*args, **kwargs):result = func(*args, **kwargs)return resultreturn inner@app.route('/index', endpoint='1')
@auth
def hello_world():if not session.get('username'):return redirect(url_for('login'))return render_template('index.html', user_dict=USER_DICT)

这有个不好的地方,就是每一个需要做认证的都需要一个一个加入装饰器@auth。

应用场景:比较少的函数中需要额外加入功能。

6. 第三种用户认证方式

可以使用 before_request,返回None表示可以通过执行,如果返回其它,不能通过执行:

@app.before_request
def auth():if request.path == '/login':return Noneif session.get('username'):return None# return redirect('/login')return redirect(url_for('login'))

应用场景:批量给函数加额外的功能!

7. 模板渲染
1. 数据类型的渲染

显示列表:

app.py

@app.route('/tpl')
def tpl():content = {'user': ['thanlon', 'kiku']}return render_template('tpl.html', **content)

tpl.html

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Title</title>
</head>
<body>
{{ user.0 }}{#支持django的写法#}
{{ user[1] }}{#支持python的写法#}
</body>
</html>


显示元组:

app.py

@app.route('/tpl')
def tpl():content = {'user': ('thanlon', 'kiku')}return render_template('tpl.html', **content)

tpl.html

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Title</title>
</head>
<body>
{{ user.0 }}
{{ user[1] }}
</body>
</html>


显示字符串:

app.py:

@app.route('/tpl')
def tpl():content = {'txt': '<input type="text"/>'}return render_template('tpl.html', **content)

tpl.html:

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Title</title>
</head>
<body>
{{ txt }}
{{ txt|safe }}
</body>
</html>


使用Markup函数:
app.py:

@app.route('/tpl')
def tpl():content = {'txt': Markup('<input type="text"/>')}return render_template('tpl.html', **content)

tpl.html:

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Title</title>
</head>
<body>
{{ txt }}
</body>
</html>
2. 传入函数

app.py:

def func(arg):return arg + 1
@app.route('/tpl')
def tpl():content = {'func': func}return render_template('tpl.html', **content)

tpl.html:

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Title</title>
</head>
<body>
{{ func(1) }}<!--2-->
</body>
</html>
3. 全局定义函数

传参的时候自己可以传,全局也可以传参,template_global() 装饰器就是用来传参的。

app.py:

@app.template_global()
def test(a, b):# {{ test(1,2) }}return a + b@app.route('/tpl')
def tpl():content = {}return render_template('tpl.html', **content)

tpl.html:

……
<body>
{{ test(1,2) }}
</body>
……

template_filter() 装饰器也就是用来传参的,

app.py:

@app.template_filter()
def test(a, b, c):# {{ 1|test(2,3) }}return a + b + c@app.route('/tpl')
def tpl():content = {}return render_template('tpl.html', **content)

tpl.html:

<body>
{{ 1|test(2,3) }}
{% if 1|test(2,3) %}
</body>

template_filter与template_global区别是:template_filter可以放在 if 后面做条件。

4. 模板的继承

tpl.html 继承 layout.html,

tpl.html:

{% extends 'layout.html' %}
{% block content %}{% if 1|test(2,3) %}<div>True</div>{% else %}<div>False</div>{% endif %}
{% endblock %}

layout.html:

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Title</title>
</head>
<body>
<h4>继承</h4>
{% block content %}{% endblock %}
</body>
</html>
5. 模板的包含

tpl.html:

{% include 'form.html' %}

form.html:

<form action="">form表单
</form>
6. 宏定义的使用

tpl.html:

{#默认是不显示的#}
{% macro input(name,type='text',value='') %}<input type="{{ type }}" name="{{ name }}" value="{{ value }}">
{% endmacro %}
{#如果想显示#}
{{ input('username') }}
7. 模板安全

前端:{{u|safe}}

后端:Markup("<input type=“text”/ >")

8. Session
1. Session原理


当请求刚进来时,flask 读取 cookie 中 session 对应的值,将该值解密并反序列化为字典,放入内存以便视图函数使用。

视图函数:

@app.route('/session')
def sess():# from flask.sessions import SecureCookieSessionprint(type(session))  # session继承了字典,字典有什么方法,它就有什么方法session['k1'] = 'v1'return 'session''''<class 'werkzeug.local.LocalProxy'>'''

当请求结束时,flask会读取内存中字典的值,进行序列化加密,写入到用户的cookie中。

2. 自定义Session配置

settings.py:

class Config(object):# PERMANENT_SESSION_LIFETIME = datetime.timedelta(days=31)  # 生命周期,最多的保存时间# PERMANENT_SESSION_LIFETIME = datetime.timedelta(hours=1)PERMANENT_SESSION_LIFETIME = datetime.timedelta(minutes=30)  # 30min后过期SESSION_COOKIE_NAME = 'session',  # cookie的名称SESSION_COOKIE_DOMAIN = None,  # 域名SESSION_COOKIE_PATH = None  # 路径SESSION_COOKIE_HTTPONLY = True  # http读取SESSION_COOKIE_SECURE = False  # 安全设置SESSION_REFRESH_EACH_REQUEST = True  # 在最后访问的基础上加上生命周期
9. flash
@app.route('/page1')
def page1():session['xxx'] = 123456return 'session'@app.route('/page2')
def page2():print(session.get('xxx'))  # print(session['xxx'])del session['xxx']session.pop('xxx')return 'session'
@app.route('/page1')
def page1():flash('临时数据存储') # 临时存在内存中一个数据return 'session'@app.route('/page2')
def page2():print(get_flashed_messages()) # 取一次就没有了,实际上用的时popreturn 'session'
'''
'''
['临时数据存储', '临时数据存储']
[]
'''
'''

对临时数据也可以做分类:

@app.route('/page1')
def page1():flash('临时数据存储','info')flash('error','error')return 'session'@app.route('/page2')
def page2():print(get_flashed_messages(category_filter=['error']))return 'session''''
['error', 'error']
[]
'''
10. 中间件
1. call方法的执行

call方法在用户发起请求时才执行。flask 依赖 wsgi ,程序启动时执行的是 run_simple(host, port, self, **options)。在 app.py 文件中,会把 app 对象传入 run_simple 方法中。请求进来,第三个参数加括号,对象加括号执行__call__方法

def wsgi_app(self, environ, start_response):ctx = self.request_context(environ)error = Nonetry:try:ctx.push()response = self.full_dispatch_request()except Exception as e:error = eresponse = self.handle_exception(e)except:error = sys.exc_info()[1]raisereturn response(environ, start_response)finally:if self.should_ignore_error(error):error = Nonectx.auto_pop(error)def __call__(self, environ, start_response):return self.wsgi_app(environ, start_response)

所有的请求入口就在 call 方法。

if __name__ == '__main__':app.run()

app.run() 方法执行后,运行 run_simple(host, port, self, **options),在内部就是死循环,等待客户端发送请求(socket在监听连接)。此时 __call__ 方法不会执行,有请求就会执行该方法。

修改源码:

def __call__(self, environ, start_response):"""The WSGI server calls the Flask application object as theWSGI application. This calls :meth:`wsgi_app` which can bewrapped to applying middleware."""print('请求之前做的操作')data = self.wsgi_app(environ, start_response)print('请求之后做的操作')return data
if __name__ == '__main__':app.run()
'''
请求之前做的操作
请求之后做的操作
'''

这样做是不合适的,修改源代码是不可选的。

2. 自定义中间件
class Middleware(object):def __init__(self, old):self.old = olddef __call__(self, *args, **kwargs):print('请求之前做的操作')ret = self.old(*args, **kwargs)print('请求之后做的操作')return retif __name__ == '__main__':app.wsgi_app = Middleware(app.wsgi_app)app.run()

通过 Middleware 中间件,在请求之前和请求之后做一些操作。

11. 特殊的装饰器(重要)

(1)before_request 与 after_request

……
app = Flask(__name__)
app.config.from_object('settings.DevelopmentConfig')  # 引入配置文件@app.before_request
def test1():print('before_request')@app.after_request
def test2(response):print('after_request')return response@app.route('/index')
def index():print('index')return 'index'@app.route('/login')
def login():print('login')return 'login'if __name__ == '__main__':app.run()
'''
before_request
index
after_request
'''
@app.before_request
def test1():print('before_request_01')@app.before_request
def test2():print('before_request_02')@app.after_request
def test3(response):print('after_request_01')return response@app.after_request
def test4(response):print('after_request_02')return response@app.route('/index')
def index():print('index')return 'index'@app.route('/login')
def login():print('login')return 'login'if __name__ == '__main__':app.run()
'''
before_request_01
before_request_02
index
after_request_02
after_request_01
'''

如果 test1 函数有返回值,不执行 test2。

……
@app.before_request
def test1():print('before_request_01')return ''@app.before_request
def test2():print('before_request_02')@app.after_request
def test3(response):print('after_request_01')return response@app.after_request
def test4(response):print('after_request_02')return response@app.route('/index')
def index():print('index')return 'index'@app.route('/login')
def login():print('login')return 'login'if __name__ == '__main__':app.run()
'''
before_request_01
after_request_02
after_request_01
'''

flask和<=django1.9会执行所有的response返回,但是django1.10及之后的版本会这样执行:

(2) before_first_request

第一次请求的时候才执行,

@app.before_first_request
def test():print('test')@app.route('/index')
def index():return 'index'

(3)template_global 与 template_filter

模板中有介绍,在这里不赘述。

(4) errorhandler(应用比较广)

@app.errorhandler(404)
def page_not_found(arg):return 'This page does not exit'@app.route('/index')
def index():return 'index'if __name__ == '__main__':app.run()
12. 路由和视图
1. 路由和视图函数对应关系
# /index和/login是路由;index和login是视图函数
[('/index', 'index'),('/login', 'login')
]

请求来了,一个个筛选,并且前面的匹配成功,后面的都不再进行匹配了。

2. 路由设置两种方式

① @app.route(’/xxx’)

@app.route('/')
def index():return 'index'

② app.add_url_rule(’/index’,None,index)

def index():return 'index'
app.add_url_rule('/index', None, index)

注意:不要让endpoint重名,如果重名函数一定要相同。

@app.route('/index', endpoint=1)
def index():return 'index'
@app.route('/index', endpoint=1)
def index():return 'index'

上面两个index函数是不同的。

3. @app.route和app.add_url_url参数
  • rule:url规则
  • view_func:视图函数名称
  • defaults = None:默认值,当url中无参数数时,使用defaults = {‘k’:‘v’}为函数提供参数
@app.route('/index', defaults={'k': 'v'})
def index(k):return 'index'
  • endpoint = None:名称,用于反向生成 url,即 url_for(‘名称’)
  • methods = None:允许请求的方式,如:[‘get’,‘post’]
  • strict_slashes = None:对url最后/符号是严格要求的。对于@app.route(’/login’,strict_slashes = False),访问https://www.blueflags.cn/login或https://www.blueflags.cn/login/都可以。但对于@app.route(’/login’,strict_slashes = True),仅能访问https://www.blueflags.cn/login。建议使strict_slashes = True,只有写了什么,才能访问什么,默认也是True。
  • redirect_to = None:重定向到指定的地址

① 一般的重定向:

@app.route('/index', redirect_to='/login')
def index():return 'index'@app.route('/login')
def login():return 'login'

② 重定向的时候传参数:

@app.route('/index/<int:nid>', redirect_to='/home/<nid>')
def index():return 'index'@app.route('/home/<int:nid>')
def home(nid):return str(nid)
def func(adapter, nid):return '/home/' + str(nid)@app.route('/index/<int:nid>', redirect_to=func)
def index():return 'index'
  • subdomain = None:子域名访问
    绑定域名:

hosts

# localhost name resolution is handled within DNS itself.
#   127.0.0.1       localhost
#   ::1             localhost127.0.0.1       thanlon.com127.0.0.1       home.thanlon.com127.0.0.1       admin.thanlon.com

app.py:

from flask import Flask
app = Flask(__name__)
app.config['SERVER_NAME'] = 'thanlon.com:5000'# http://admin.thanlon.com:5000
@app.route('/index', subdomain='admin')
def admin():return 'admin.thanlon.com'# http://home.thanlon.com:5000
@app.route('/index', subdomain='home')
def home():return 'home.thanlon.com'if __name__ == '__main__':app.run()

from flask import Flask
app = Flask(__name__)
app.config['SERVER_NAME'] = 'thanlon.com:5000'# http://xxxxx.thanlon.com:5000
@app.route('/', subdomain='<username>')
def index(username):return username + '.thanlon.com'if __name__ == '__main__':app.run()
4. CBV
from flask import Flask, views
import functools
app = Flask(__name__)def wrapper(func):@functools.wraps(func)def inner(*args, **kwargs):return func(*args, **kwargs)return innerclass UserView(views.MethodView):methods = ['GET']  # 表示只支持get请求decorators = [wrapper]  # 在执行get和post时,先执行这个装饰器def get(self, *args, **kwargs):return 'GET'def post(self, *args, **kwargs):return 'POST'app.add_url_rule('/index', None, UserView.as_view('uuu'))
5. 自定义正则
  • python默认支持的转换器
#: the default converter mapping for the map.
DEFAULT_CONVERTERS = {"default": UnicodeConverter,"string": UnicodeConverter,"any": AnyConverter,"path": PathConverter,"int": IntegerConverter,"float": FloatConverter,"uuid": UUIDConverter,
}
  • 自定义正则的三个步骤

① 定制类

class RegexConverter(BaseConveter):def __init__(self,map,regex):super(RegexConverter,self).__ini__(map)self.regex = regexdef to_python(self,value):# 路由匹配时,匹配成功后传递给视图函数中参数的值return int(value) # 可以做一些自定义操作def to_url(self,value):# 反向生成urlvar = super(RegexConverter, self).to_url(value)return var

② 添加转换器

app.url_map.converters['reg'] = RegexConverter  # reg是自己定义

③ 使用自定义正则(to_python方法是匹配的时候使用,to_url方法是反向生成url的时候使用)

from flask import Flask, url_forapp = Flask(__name__)
from werkzeug.routing import BaseConverter# 第一步:定制类
class RegexConverter(BaseConverter):def __init__(self, map, regex):super(RegexConverter, self).__init__(map)self.regex = regexdef to_python(self, value):return str(value)def to_url(self, value):var = super(RegexConverter, self).to_url(value)return var# 第二步:添加转换器
app.url_map.converters['reg'] = RegexConverter  # reg是自己定义# 第三步:使用自定义正则(to_python方法是匹配的时候使用,to_url方法是反向生成url的时候使用)
# 如果匹配成功,执行to_python方法,该方法返回什么,nid就是什么
@app.route("/index/<reg('\d+'):nid>", methods=['GET'])
def index(nid):'''1. 用户发送请求2. flask内部进行正则匹配3. 调用to_python(正则匹配的结果)方法4. to_python方法的返回值会交给视图函数的参数:param nid: :return: '''# print(nid, type(nid))  # 查看nid的类型print(url_for('index', nid=123))  # 如果反向生成url,执行流程,先将参数nid=123传入to_url方法,让to_url方法帮助我们去反向生成。'''控制台:/index/123'''return 'index'if __name__ == '__main__':app.run()
13. 蓝图
1. 蓝图(一)

实际项目中,需要进行项目目录结构的划分,蓝图就是用来帮助开发者进行目录结构的划分,下面是目录结构划分的几个步骤。
① 新建项目,在项目下新建一个和项目名同名的目录

② 在与项目名同名的目录中新建__ init __.py,用来实例化Flask对象

from flask import Flaskdef create_app():app = Flask(__name__)return app

③ 在项目目录下新建manage.py(也可以是其它的名字),

将create_app函数导入manage.py,在manage.py中:

from test_blueprint import create_appapp = create_app()
if __name__ == '__main__':app.run()

④ 在test_blueprint(非项目目录)下新建views目录,用来放视图函数,在视图函数中可以根据视图的不同进行分类。

⑤ 创建视图函数与app对象的关系,首先在视图函数中,创建Blueprint对象。

admin.py

from flask import Blueprint
admin_bp = Blueprint('admin_bp', __name__)  # 创建Blueprint(蓝图)对象,第一个参数name不能同名@admin_bp.route('/login')
def login():return 'admin.login'@admin_bp.route('/logout')
def logout():return 'logout'

home.py

from flask import Blueprint
home_bp = Blueprint('home_bp', __name__)  # 第一个参数是name,第一个参数name不能同名@home_bp.route('/login')
def login():return 'home.login'@home_bp.route('/list')
def list():return 'list'

需要修改__ init __.py,注册蓝图,

from flask import Flask
from .views.admin import admin_bp
from .views.home import home_bpdef create_app():app = Flask(__name__)# 注册蓝图app.register_blueprint(home_bp)app.register_blueprint(admin_bp)return app

注意:如果两个或多个视图函数中有同名的路由,如home.py和admin.py都有/login路由,谁先注册先执行谁的视图函数。

⑥ 创建模板。在__ init __.py所在的目录中新建templates目录(Flask实例化的文件所在的目录中),如果有静态文件,放静态文件的目录static与templates目录处于同一级。

在templates目录中建模板文件login.html,可以修改admin.py,

@admin_bp.route('/login')
def login():return render_template('login.html')

这时候就可以访问模板了。

2. 蓝图(二)

① 自定义静态文件和模板的路径

admin_bp = Blueprint('admin_bp', __name__,template_folder='xxx')
@admin_bp.route('/login')
def login():return render_template('login.html')

注意:蓝图优先去templates去找loging.html,找不到去xxx中找。当然静态文件也是可以指定的:

admin_bp = Blueprint('admin_bp', __name__,template_folder='xxx',static_folder='xxxx')@admin_bp.route('/login')
def login():return render_template('login.html')

② 为某一个蓝图地址加上前缀

def create_app():app = Flask(__name__)# 注册蓝图app.register_blueprint(home_bp, url_prefix='/home')app.register_blueprint(admin_bp)return app

按照原先访问方式:http://127.0.0.1:5000/login ,访问失败。

现在需要:http://127.0.0.1:5000/home/login

③ 为所有蓝图加上@before_request

__ init __.py:

from flask import Flask
from .views.admin import admin_bp
from .views.home import home_bpdef create_app():app = Flask(__name__)@app.before_requestdef xxx():return '123'# 注册蓝图app.register_blueprint(home_bp)app.register_blueprint(admin_bp)return app

④ 为指定的蓝图加上@before_request

admin.py:

from flask import Blueprint, render_template
admin_bp = Blueprint('admin_bp', __name__, template_folder='xxx', static_folder='xxxx')@admin_bp.before_request
def xxx():return '123'@admin_bp.route('/login')
def login():return render_template('login.html')@admin_bp.route('/logout')
def logout():return 'logout'

应用场景:可以在指定蓝图中加入登录验证。

14. threading.local

threading.local本身与Flask无关,只是说根据threading.local做的local对象与Flask是有关系的。

# coding:utf-8
import threading
import time
v = 0def task(i):global vv = itime.sleep(1)print(v)for i in range(10):t = threading.Thread(target=task, args=(i,))t.start()
1. threading.local的作用

为每个线程创建一个独立的空间,使得线程对自己空间中的数据进行操作(数据隔离)。

# coding:utf-8
import threading
import time
from threading import localobj = local()def task(i):obj.xxx = itime.sleep(2)print(obj.xxx,i)for i in range(10):t = threading.Thread(target=task, args=(i,))t.start()
2. 如何获取一个线程的唯一标记
# coding:utf-8
import threadingdef task(i):'''获取线程的唯一标识:param i::return:'''print(threading.get_ident(), i)for i in range(10):t = threading.Thread(target=task, args=(i,))t.start()
3. 根据字典自定义一个类似于threading.local功能
# coding:utf-8
import threading
import timeDIC = {}
def task(i):ident = threading.get_ident()if ident in DIC:DIC[ident]['xxx'] = ielse:DIC[ident] = {'xxx': i}time.sleep(2)print(DIC[ident]['xxx'], i)for i in range(10):t = threading.Thread(target=task, args=(i,))t.start()
4. 根据字典为每个协程开辟空间进行存取数据
# coding:utf-8
import threading
import time
import greenletDIC = {}
def task(i):# ident = threading.get_ident()ident = greenlet.getcurrent()if ident in DIC:DIC[ident]['xxx'] = ielse:DIC[ident] = {'xxx': i}time.sleep(2)print(DIC[ident]['xxx'], i)for i in range(10):t = threading.Thread(target=task, args=(i,))t.start()
5. __ getattr__ 与__ setattr__
class Local(object):def __getattr__(self, item):print('__getattr__', item)def __setattr__(self, key, value):print('__setattr__', key, value)obj = Local()
obj.xx
obj.xx = 123
'''
__getattr__ xx
__setattr__ xx 123
'''
6. 通过getattr与setattr构造出threading.local

不仅可以给线程做数据隔离还可以和协程做数据隔离。

import threading
import time# 有协程就使用协程,没有使用线程
try:import greenletget_ident = greenlet.getcurrent
except Exception as e:get_ident = threading.get_identclass Local(object):DIC = {}def __getattr__(self, item):# print('__getattr__', item)ident = get_identif ident in self.DIC:return self.DIC[ident].get(item)else:return Nonedef __setattr__(self, key, value):# print('__setattr__', key, value)ident = get_identif ident in self.DIC:self.DIC[ident][key] = valueelse:self.DIC[ident] = {key: value}
obj = Local()def task(i):obj.xxx = itime.sleep(2)print(obj.xxx, i)for i in range(10):t = threading.Thread(target=task, args=(i,))t.start()

重要总结:如果同时来了几个人请求,每个人请求的数据都是不一样的。每个人请求的数据放在不同的位置,进行数据隔离。取每个请求的数据时,可以根据线程的不同。真正请求来的时候,会通过local对象为请求创建一个空间,在它里面放值,每个请求来都会做数据隔离。视图函数取数据的时候就不会混乱了。如果不使用数据隔离,会导致第一个请求来了,正在处理时第二个请求又来了,把第一个覆盖了。django里面没有这种东西,django请求和session都是通过参数传递过来了,请求之间不会混乱。在django中,请求过来,拿着请求的数据处理请求就行了。

15. 上下文管理

请求到来后创建一个RequestContext对象,里面有request和session。把它放在某个特殊的地方,以后视图函数再去取,这就叫做上下文管理。

① 请求到来时,将session和request封装到ctx对象中,ctx中有request和session

ctx = self.request_context(environ) # self是app对象,environ是请求相关的原始数据
ctx.request = Request(environ)
ctx.session = None

② 对session作补充
③ 将包含了request和session的ctx对象放到一个容器中(可以把这个容器认为是一个字典)。每个请求都会根据线程/协程加一个唯一标识:(线程与线程之间的隔离)

{1234:{ctx:ctx对象},# 1234是线程/协程号1235:{ctx:ctx对象},1236:{ctx:ctx对象},……
}

④ 视图函数使用的时候,需要根据当前线程获取数据。隐含的操作是:根据当前线程或协程的唯一标识,获取ctx对象,再取ctx对象中取request和session。

from flask import request,session
# 如request.method,是从request中取得method

⑤ 请求结束时,根据当前线程/协程的唯一标记,将这个容器上的数据移除。

16. 偏函数与面向对象
1. 偏函数

偏函数可以帮助我们自动传参。

import functoolsdef index(a, b):return a + b# 原来的调用方式
# ret = index(1, 2)
# print(ret)
#偏函数:自动传递参数
new_func = functools.partial(index, 1)  # “1”会被当作第一个参数传递进来
ret = new_func(2)  # “2”会被当作第二个参数传递进来
print(ret)
2. super和执行类的区别

实例化对象时会执行构造方法,如果没有写,会执行父类的构造方法:

# coding:utf-8
class Foo(object):passobj = Foo()  # 会执行构造方法,如果没有写,会执行父类的构造方法

① 根据mro的顺序执行方法:super(Foo, self).func()

# coding:utf-8
class Base(object):def func(self):print('Base.func')class Foo(Base):def func(self):# 方式一:根据mro的顺序执行方法super(Foo, self).func()print('Foo.func')obj = Foo()
obj.func()
'''
Base.func
Foo.func
'''
# coding:utf-8
class Base(object):def func(self):super(Base, self).func()print('Base.func')class Bar(object):def func(self):print('Bar.func')class Foo(Base, Bar):passobj = Foo()
obj.func()
'''
Base.func
Foo.func
'''
# coding:utf-8
class Base(object):def func(self):super(Base, self).func()print('Base.func')class Bar(object):def func(self):print('Bar.func')class Foo(Base, Bar):pass# obj = Foo()
# obj.func()
print(Foo.__mro__)
'''
(<class '__main__.Foo'>, <class '__main__.Base'>, <class '__main__.Bar'>, <class 'object'>)
'''

② 主动执行Base类的方法:Base.func(self)

# coding:utf-8
class Base(object):def func(self):print('Base.func')class Foo(Base):def func(self):# 方式一:根据mro的顺序执行方法# super(Foo, self).func()# 方式二:主动执行Base类的方法Base.func(self)  # 如果是对象.func(),self会自动传进来print('Foo.func')obj = Foo()
obj.func()
'''
Base.func
Foo.func
'''
# coding:utf-8
class Base(object):def func(self):super(Base, self).func()print('Base.func')class Bar(object):def func(self):print('Bar.func')class Foo(Base, Bar):passobj = Base()
obj.func()
'''
AttributeError: 'super' object has no attribute 'func'
'''
3. 面向对象中特殊的方法

__ setattr__与__getattr__:当前类中有__setattr__或__getattr__方法会执行当前类的:

class Foo(object):def __getattr__(self, item):print(item)def __setattr__(self, key, value):print(key, value)obj = Foo()
obj.xxx = 'thanlon'
obj.xxx # 会执行父类的__getattr__方法
'''
xxx thanlon
xxx
None
'''

类中没有__setattr__或__getattr__方法,会调用父类的:

class Foo(object):passobj = Foo()
obj.xxx = 'thanlon'#Foo类中没有__setattr__方法,会调用的是父类的
obj.xxx

__ init__

# coding:utf-8
class Foo(object):def __init__(self):self.storage = {}def __getattr__(self, item):print(item)def __setattr__(self, key, value):print(key, value)obj = Foo()  # 实例化的时候执行__init__方法,对象.?执行__setattr__方法。
obj.xxx = 'thanlon'
'''
storage {}
xxx thanlon
'''
# coding:utf-8
class Foo(object):def __init__(self):self.storage = {}def __getattr__(self, item):print(item)def __setattr__(self, key, value):print(self.storage)obj = Foo()
obj.xxx = 'thanlon'
'''
storage
None
storage
None
'''

在构造方法中使用self.storage = {},可以通过object类的__setattr__设置其值。(重点理解)

# coding:utf-8
class Foo(object):def __init__(self):# self.storage = {}object.__setattr__(self, 'storage', {})def __setattr__(self, key, value):print(key, value, self.storage)obj = Foo()
obj.xxx = 'thanlon'
'''
xxx thanlon {}
'''
17. Local与LocalStack
1. Local对象与LocalStack作用

Local对象:帮助我们为每个线程/协程开辟空间。
LocalStack对象:帮助我们在Local中维护一个列表,把它维护成一个栈。然后对列表中的数据进行添加和维护。

2. 自定义一个简单的Local
try:from greenlet import getcurrent as get_ident
except:from threading import get_identclass local(object):def __init__(self):'''实例化local的时候,在self中存了一个空字典'''object.__setattr__(self, 'storage', {})def __setattr__(self, key, value):ident = get_ident()  # 获取这个标记if ident not in self.storage:self.storage[ident] = {key: value}else:self.storage[ident][key] = valuedef __getattr__(self, item):ident = get_ident()if ident in self.storage:return self.storage[ident].get(item)
3. 对栈中的数据进行添加和维护
try:from greenlet import getcurrent as get_ident
except:from threading import get_identclass Local(object):__slots__ = ("__storage__", "__ident_func__")  # 对象只能.__storage__和.__ident_func__,不能.其它def __init__(self):object.__setattr__(self, "__storage__", {})object.__setattr__(self, "__ident_func__", get_ident)def __getattr__(self, name):try:return self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)def __setattr__(self, name, value):ident = self.__ident_func__()storage = self.__storage__try:storage[ident][name] = valueexcept KeyError:storage[ident] = {name: value}def __delattr__(self, name):try:del self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)obj = Local()
obj.name = 'thanlon'
print(obj.name)
'''thanlon'''
try:from greenlet import getcurrent as get_ident
except:from threading import get_identclass Local(object):__slots__ = ("__storage__", "__ident_func__")  # 对象只能.__storage__和.__ident_func__,不能.其它def __init__(self):# __storage__ = {1234:{'session':[]}}object.__setattr__(self, "__storage__", {})object.__setattr__(self, "__ident_func__", get_ident)def __getattr__(self, name):try:return self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)def __setattr__(self, name, value):ident = self.__ident_func__()storage = self.__storage__try:storage[ident][name] = valueexcept KeyError:storage[ident] = {name: value}def __delattr__(self, name):try:del self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)obj = Local()
obj.stack = []
obj.stack.append('Thanlon')
obj.stack.append('Kiku')
print(obj.stack)
'''thanlon'''
print(obj.stack.pop())
print(obj.stack)
print(obj.stack.pop())
'''
['Thanlon', 'Kiku']
Kiku
['Thanlon']
Thanlon
'''
try:from greenlet import getcurrent as get_ident
except:from threading import get_identclass Local(object):'''为每个线程开辟空间'''__slots__ = ("__storage__", "__ident_func__")  # 对象只能.__storage__和.__ident_func__,不能.其它def __init__(self):# __storage__ = {1234:{'session':[]}}object.__setattr__(self, "__storage__", {})object.__setattr__(self, "__ident_func__", get_ident)def __getattr__(self, name):try:return self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)def __setattr__(self, name, value):ident = self.__ident_func__()storage = self.__storage__try:storage[ident][name] = valueexcept KeyError:storage[ident] = {name: value}def __delattr__(self, name):try:del self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)
# obj = Local()
# obj.stack = []
# obj.stack.append('Thanlon')
# obj.stack.append('Kiku')
# print(obj.stack)
# '''thanlon'''
# print(obj.stack.pop())
# print(obj.stack)
# print(obj.stack.pop())
'''
['Thanlon', 'Kiku']
Kiku
['Thanlon']
Thanlon
'''
class LocalStack(object):'''帮助我们方便对local中的一个列表维护成一个栈,没有它也可以在local中存取数据,有了LocalStack操作跟家便捷'''def __init__(self):'''__storage__ = {}'''self._local = Local()def push(self, value):rv = getattr(self._local, 'stack', None)  # self._local.stack=>local.getattrif rv is None:self._local.stack = rv = []  # self._local.stack=>local.setattrrv.append(value)return rvdef pop(self):stack = getattr(self._local, "stack", None)if stack is None:return Noneelif len(stack) == 1:return stack[-1]else:return stack.pop()xxx = LocalStack()
xxx.push('thanlon')
xxx.push('Kiku')
xxx.pop()
try:from greenlet import getcurrent as get_ident
except:from threading import get_identclass Local(object):'''为每个线程开辟空间'''__slots__ = ("__storage__", "__ident_func__")  # 对象只能.__storage__和.__ident_func__,不能.其它def __init__(self):# __storage__ = {1234:{'session':[]}}object.__setattr__(self, "__storage__", {})object.__setattr__(self, "__ident_func__", get_ident)def __getattr__(self, name):try:return self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)def __setattr__(self, name, value):ident = self.__ident_func__()storage = self.__storage__try:storage[ident][name] = valueexcept KeyError:storage[ident] = {name: value}def __delattr__(self, name):try:del self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)class LocalStack(object):'''帮助我们方便对local中的一个列表维护成一个栈,没有它也可以在local中存取数据,有了LocalStack操作跟家便捷'''def __init__(self):'''__storage__ = {}'''self._local = Local()def push(self, value):rv = getattr(self._local, 'stack', None)  # self._local.stack=>local.getattrif rv is None:self._local.stack = rv = []  # self._local.stack=>local.setattrrv.append(value)return rvdef pop(self):stack = getattr(self._local, "stack", None)if stack is None:return Noneelif len(stack) == 1:return stack[-1]else:return stack.pop()def top(self):try:return self._local.stack[-1]except (AttributeError, IndexError):return Nonels_obj = LocalStack()class RequestContent(object):def __init__(self):self.request = 'request'self.session = 'session'ls_obj.push(RequestContent())
obj = ls_obj.top()
print(obj)
print(obj.request)
print(obj.session)
'''
<__main__.RequestContent object at 0x0000021DC4A3CA90>
request
session
'''
4. 使用LocalStack向Local添加对象
try:from greenlet import getcurrent as get_ident
except:from threading import get_identclass Local(object):'''为每个线程开辟空间'''__slots__ = ("__storage__", "__ident_func__")  # 对象只能.__storage__和.__ident_func__,不能.其它def __init__(self):# __storage__ = {1234:{'session':[]}}object.__setattr__(self, "__storage__", {})object.__setattr__(self, "__ident_func__", get_ident)def __getattr__(self, name):try:return self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)def __setattr__(self, name, value):ident = self.__ident_func__()storage = self.__storage__try:storage[ident][name] = valueexcept KeyError:storage[ident] = {name: value}def __delattr__(self, name):try:del self.__storage__[self.__ident_func__()][name]except KeyError:raise AttributeError(name)class LocalStack(object):'''帮助我们方便对local中的一个列表维护成一个栈,没有它也可以在local中存取数据,有了LocalStack操作跟家便捷'''def __init__(self):'''__storage__ = {}'''self._local = Local()def push(self, value):rv = getattr(self._local, 'stack', None)  # self._local.stack=>local.getattrif rv is None:self._local.stack = rv = []  # self._local.stack=>local.setattrrv.append(value)return rvdef pop(self):stack = getattr(self._local, "stack", None)if stack is None:return Noneelif len(stack) == 1:return stack[-1]else:return stack.pop()def top(self):try:return self._local.stack[-1]except (AttributeError, IndexError):return Noneclass RequestContent(object):def __init__(self):self.request = 'request'self.session = 'session'_request_ctx_stack = LocalStack()
_request_ctx_stack.push(RequestContent())def get_request():ctx = _request_ctx_stack.top()return ctx.requestdef get_session():ctx = _request_ctx_stack.top()return ctx.sessiondef get_request_or_session(arg):ctx = _request_ctx_stack.top()return getattr(ctx, arg)  # ctx.arg,如果是arg是request就是ctx.request,如果arg是session就是ctx.session# print(get_request())
# print(get_session())
# print(get_request_or_session('request'))
# print(get_request_or_session('session'))
import functools# 使用偏函数
request = functools.partial(get_request_or_session, 'request')
session = functools.partial(get_request_or_session, 'session')
print(request())
print(session())
'''
request
session
'''
18. Flask请求上下文管理原理
1. request上下文管理

请求的流程:

① wsgi
② 创建ctx = RequestContext(session,request),ctx.push()
③ LocalStack,把ctx对象添加到Local中
④ Local中存取数据使用__storage__ = {唯一标识: {stack: [ctx, ]}}

获取请求相关的数据(视图函数取请求相关的数据):

# coding:utf-8
from flask import Flask,Request
from flask.globals import _request_ctx_stackapp = Flask(__name__)@app.route('/')
def index():ctx = _request_ctx_stack.top  # 获取ctx对象print(ctx)  # <RequestContext 'http://127.0.0.1:5000/' [GET] of request上下文管理>print(ctx.request)  # <Request 'http://127.0.0.1:5000/' [GET]>print(ctx.request.environ) #{'wsgi.version': (1, 0), 'wsgi.url_scheme': 'http', 'wsgi.input': <_io.BufferedReader name=816>, 'wsgi.errors': <_io.TextIOWrapper name='<stderr>' mode='w' encoding='UTF-8'>, 'wsgi.multithread': True, 'wsgi.multiprocess': False, 'wsgi.run_once': False, 'werkzeug.server.shutdown': <function WSGIRequestHandler.make_environ.<locals>.shutdown_server at 0x000001D68A80BBF8>, 'SERVER_SOFTWARE': 'Werkzeug/0.15.4', 'REQUEST_METHOD': 'GET', 'SCRIPT_NAME': '', 'PATH_INFO': '/', 'QUERY_STRING': '', 'REQUEST_URI': '/', 'RAW_URI': '/', 'REMOTE_ADDR': '127.0.0.1', 'REMOTE_PORT': 55003, 'SERVER_NAME': '127.0.0.1', 'SERVER_PORT': '5000', 'SERVER_PROTOCOL': 'HTTP/1.1', 'HTTP_HOST': '127.0.0.1:5000', 'HTTP_CONNECTION': 'keep-alive', 'HTTP_CACHE_CONTROL': 'max-age=0', 'HTTP_UPGRADE_INSECURE_REQUESTS': '1', 'HTTP_USER_AGENT': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36', 'HTTP_SEC_FETCH_MODE': 'navigate', 'HTTP_SEC_FETCH_USER': '?1', 'HTTP_ACCEPT': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3', 'HTTP_SEC_FETCH_SITE': 'none', 'HTTP_ACCEPT_ENCODING': 'gzip, deflate, br', 'HTTP_ACCEPT_LANGUAGE': 'zh-CN,zh;q=0.9', 'HTTP_COOKIE': 'session=.eJwVzDEOgCAQRNG7bO0muyOoy2UMAoWxw2BjvLvQTF4x-S-lVuve7tKnnZkCZVEVD-GcLLETr2yIkRFNt2OFMxSa6NL-fXQIQ-gaGQqK2fnl-wG6SBh8.XVNcEA.0NiiN7ldvhokI3sKsstrb0GhOQs', 'werkzeug.request': <Request 'http://127.0.0.1:5000/' [GET]>}print(ctx.request.method)  # GETreturn 'index'if __name__ == '__main__':app.run()# app.__call__  # 请求进来执行__call__,把数据交给__call__方法# app.wsgi_app  # __call__方法会调用wsgi_app
2. 请求相关数据流程图


如果是多线程(并发),如果同时来了多个请求,wsgi_app对请求数据进行分别封装(每个线程有自己的ctx),然后交给LocalStack,LocalStack负责把数据放在Local中。注意使用一个LocalStack,一个Local。为了标记不同的请求,使用线程的唯一标识。最后

__storage__{123:{stack:[ctx,]},345:{stack:[ctx,]}},

取数据时,根据线程的ID取各自的值。

3. session上下文管理

① wsgi
② 创建ctx

ctx = RequestContext(session=None,request)
ctx.push()

③ LocalStack,把ctx对象添加到Local中
④ Local中存取数据使用

__storage__ = {唯一标识: {stack: [ctx, ]}
}

⑤ 通过LocalStack获取ctx中的session,给session赋值(从cookie中读取数据,进行解密反序列化)。调用的是open_session,处理完执行save_session。

获取session:

# coding:utf-8
from flask import Flask,Request
from flask.globals import _request_ctx_stack
app = Flask(__name__)@app.route('/')
def index():ctx = _request_ctx_stack.top  # 获取ctx对象print(ctx.session)return 'index'if __name__ == '__main__':app.run()# app.__call__  # 请求进来执行__call__,把数据交给__call__方法# app.wsgi_app  # __call__方法会调用wsgi_app
19. flask-session

flask-sesion组件可以帮助我们把默认的 session 放入到加密的 cookie 中。

(1)安装 flask-session:pip install flask-session

(2)session 保存在 Redis 中

# coding:utf-8
from flask import Flask, request, session
from flask.sessions import SecureCookieSessionInterface
# from flask.ext.session import Session#以前导入session的方式,与下面是一样的
from flask_session import Session
import redisapp = Flask(__name__)
'''
session保存在redis中
'''
# app.session_interface = SecureCookieSessionInterface()  # 默认
# app.session_interface 换成flaks-session中的内容
# app.session_interface =RedisSessionInterface()
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = redis.Redis(host='主机IP', port=6739, password='密码!')
Session(app)@app.route('/login')
def login():session['user'] = 'thanlon'return '……'@app.route('/')
def index():print(session.get('user'))  # 没有不报错,session['user']没有会报错return 'index'if __name__ == '__main__':app.run()# app.__call__  # 请求进来执行__call__,把数据交给__call__方法# app.wsgi_app  # __call__方法会调用wsgi_app

(3)查看Redis中存入的session

look_cookie.py:

# coding:utf-8
import redis
conn = redis.Redis(host='106.12.115.136',port=6379)
print(conn.keys())
'''
[b'session:.eJwVzDEOgCAQRNG7bO0muyOoy2UMAoWxw2BjvLvQTF4x-S-lVuve7tKnnZkCZVEVD-GcLLETr2yIkRFNt2OFMxSa6NL-fXQIQ-gaGQqK2fnl-wG6SBh8.XVNcEA.0NiiN7ldvhokI3sKsstrb0GhOQs']
'''

(4) flask-session的原理

① session数据保存在Redis。Redis的key是session:随机字符串,每一个随机字符串对应一个自己的值。每一个用户都会有一个随机字符串。

② 将随机字符串返回给用户。

(5)flask-session源码

from flask_session import RedisSessionInterface
20. wtforms组件
1. 用户登录

form.py

# coding:utf-8
from wtforms import Form
from wtforms.fields import simple
from wtforms import widgets, validatorsclass LoginForm(Form):# name = simple.StringField()name = simple.StringField(validators=[validators.DataRequired(message='用户名不能为空'),validators.Length(min=6,max=20,message='用户名长度必须大于%(min)d且小于%(max)d')],widget=widgets.TextInput(),render_kw={'placeholder': '请输入用户名', 'class': 'form-control'})# pwd = simple.PasswordField()pwd = simple.PasswordField(validators=[validators.DataRequired(message='密码不能为空')],render_kw={'placeholder': '请输入密码','class': 'form-control'})

app.py:

from flask import Flask, render_template, request
from forms import LoginFormapp = Flask(__name__)
app.debug = True@app.route('/')
def index():return 'index'@app.route('/login', methods=['get', 'post'])
def login():if request.method == 'GET':form = LoginForm()# print(form.name, type(form.name))  # name是一个对象,print的时候执行这个对象的__str__方法,# print(form.pwd, type(form.pwd))  # name是一个对象,print的时候执行这个对象的__str__方法'''<input id="name" name="name" type="text" value=""> <class 'wtforms.fields.core.StringField'><input id="pwd" name="pwd" type="password" value=""> <class 'wtforms.fields.simple.PasswordField'>'''return render_template('login.html', form=form)form = LoginForm(formdata=request.form)if form.validate():print(form.data)  # {'name': '123456', 'pwd': '123456'}return '验证成功!'else:# print(form.errors)  # {'name': ['用户名不能为空'], 'pwd': ['密码不能为空']}return render_template('login.html', form=form)if __name__ == '__main__':app.run()

login.html:

  <form method="POST" novalidate><div class="form-group"><label>用户名</label>{{form.name}}<div style="color:red">{{form.name.errors[0]}}</div></div><div class="form-group"><label>密&nbsp;&nbsp;&nbsp;&nbsp;码</label>{{form.pwd}}<div style="color:red">{{form.pwd.errors[0]}}</div></div><button type="submit" class="btn btn-primary">登录</button></form>

2. 用户注册

form.py:

class RegisterForm(Form):name = simple.StringField(label='用户名',validators=[validators.DataRequired(message='密码不能为空!')],widget=widgets.TextInput(),render_kw={'class': 'form-control'},default='thanlon')pwd = simple.PasswordField(label='密码',validators=[validators.DataRequired(message='密码不能为空!')],widget=widgets.PasswordInput(),render_kw={'class': 'form-control'})pwd_confirm = simple.PasswordField(label='重复密码',validators=[validators.DataRequired(message='重复密码不能为空!'),validators.EqualTo('pwd', message='两次密码输入不一致!')],widget=widgets.PasswordInput(),render_kw={'class': 'form-control'})email = html5.EmailField(label='邮箱',validators=[validators.DataRequired(message='邮箱不能为空!'),validators.Email(message='邮箱格式错误!')],widget=widgets.TextInput(input_type='email'),render_kw={'class': 'form-control'})gender = core.RadioField(label='性别',choices=((1, '男'), (2, '女')),coerce=int,  # int('1'))city = core.SelectField(label='城市',choices=(('bj', '北京'),('sh', '上海')))hobby = core.SelectMultipleField(label='爱好',choices=((1, '羽毛球'),(2, '篮球')),coerce=int)favor = core.SelectMultipleField(label='喜好',choices=((1, '羽毛球'),(2, '篮球')),widget=widgets.ListWidget(prefix_label=False),option_widget=widgets.CheckboxInput(),coerce=int,default=[2])

register.html:

<form method="post" novalidate>{% for field in form %}{{field.label}} {{field}}{{field.errors[0]}}{% endfor %}<button type="submit" class="btn btn-primary">登录</button></form>

app.py:

@app.route('/register',methods=['get','post'])
def register():if request.method == 'GET':form = RegisterForm()return render_template('register.html', form=form)form = RegisterForm(formdata=request.form)if form.validate():print(form.data)return redirect('https://www.blueflags.cn')return render_template('register.html', form=form)
3. 数据库实时更新

① 数据相关

mysql> create table city(id int primary key auto_increment,city_name varchar(20) not null );
mysql> insert city values(null,'beijin');
mysql> insert city values(null,'shanghai');
mysql> insert city values(null,'shenzhen');

② 效果图

③ 主要代码

class UserForm(Form):city = core.SelectField(label='城市',choices=helper.fetch_all('select *from city', [], type=None),  # 只是在第一次从数据库获取一次coerce=int,  # 自动把用户提交的字符串转换为数字)

选项中的数据只是在第一次请求的时候从数据库获取一次,当数据更新时,用户页面中的数据并没有实时更新。所以,代码是存在问题的。

解决方案:对于UserForm类,每次实例化的时候都去从数据库中获取一次数据,可以这样写:

class UserForm(Form):city = core.SelectField(label='城市',choices=(),  # 只是在第一次从数据库获取一次coerce=int,  # 自动把用户提交的字符串转换为数字)# 如果没有写执行Form的构造方法def __init__(self, *args, **kwargs):super(UserForm, self).__init__(*args, **kwargs)  # 在UserForm中执行什么在父类中还要执行什么self.city.choices = helper.fetch_all('select *from city', [], type=None)

实例化UserForm的时候传值:

@app.route('/user', methods=['get', 'post'])
def user():if request.method == 'GET':form = UserForm(data = {'name':'thanlon','city':3})return render_template('user.html',form=form)
4. wtforms的作用与django示例

wtforms有两个作用:1. 自动生成html标签;2.对用户请求数据进行校验
在django中想实现实时更新,也需要重写构造方法。

views.py:

from django.shortcuts import render
from django.forms import Form
from django.forms import fields
from app01 import modelsclass IndexForm(Form):title = fields.CharField()# group = fields.ChoiceField(#     choices=(#         (1, 'A'),#         (2, 'B')#     )# )group = fields.ChoiceField(choices=())def __init__(self, *args, **kwargs):super(IndexForm, self).__init__(*args, **kwargs)self.fields['group'].choices = models.UserGroup.objects.all().values_list('id', 'title')# Create your views here.
def index(request):if request.method == 'GET':form = IndexForm()return render(request, 'index.html', {'form': form})
21. SQLAlchemy
1. 简介

SQLAlchemy 是 ORM(对象关系映射)框架,类名对应表名,类中的字段对应数据表的列,对象对应数据表的一行。SQLAlchemy 可以帮助我们使用类和对象快速实现数据库操作。我们在公司里要么是用原生sql,要么是用ORM框架。原生 SQL 中可以用 MySQLdb 和 pymysql 模块,MySQLdb 的用法和 pymysql 基本上是完全一样的。两者在使用上是有区别的,MySQLdb不支持python3,pymysql 支持 python2 和 python3。对于Python Web框架,如果有自己的使用自己的,如果没有ORM一般都用SQLAlchemy。

2. CURD

① 创建一个数据表

models.py:

# coding:utf-8
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, Date, DateTime
from sqlalchemy import create_engine
Base = declarative_base()class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True, nullable=False)engine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1,  # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
# Base.metadata.drop_all(engine)#删除这个数据表
Base.metadata.create_all(engine)  # 创建这个数据表

users表:

+-------+-------------+------+-----+---------+----------------+
| Field | Type        | Null | Key | Default | Extra          |
+-------+-------------+------+-----+---------+----------------+
| id    | int(11)     | NO   | PRI | NULL    | auto_increment |
| name  | varchar(32) | NO   | MUL | NULL    |                |
+-------+-------------+------+-----+---------+----------------+

注意:SQLAlchemy默认是不可以修改创建好的数据表的,需要用到第三方组件

② 封装操作:防止models.py被导入的时候执行,把创建表和删除表封装在函数中

# coding:utf-8
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, Date, DateTime
from sqlalchemy import create_engineBase = declarative_base()class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True, nullable=False)def create_all():engine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1,  # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置)Base.metadata.create_all(engine)  # 创建这个数据表def drop_all():engine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1,  # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置)Base.metadata.drop_all(engine)  # 删除这个数据表
if __name__ == '__main__':pass# create_all()# drop_all()

③ 添加数据

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Usersengine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1,  # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
SessionFactory = sessionmaker(bind=engine)
'''
根据Users类对users表进行增删改查
'''
session = SessionFactory()  # 创建一个连接
'''
增加单条数据
'''
# obj = Users(name='thanlon')
# session.add(obj)
'''
添加多条数据
'''
# obj = Users(name='thanlon')
# obj2 = Users(name='kiku')
# session.add(obj)
# session.add(obj2)
'''
添加多条数据的时候也可以把这些对对象放在列表中
'''
session.add_all([Users(name='thanlon'),Users(name='kiku')
])session.commit()
session.close()

④ 查询数据

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Usersengine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1,  # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
SessionFactory = sessionmaker(bind=engine)
'''
根据Users类对users表进行增删改查
'''
session = SessionFactory()  # 创建一个连接
'''
查询表中所有内容
'''
# ret = session.query(Users).all()
# # print(ret)#拿到的是对象
# # [<models.Users object at 0x7f9cc4ca1080>, <models.Users object at 0x7f9cc4ca10f0>, <models.Users object at 0x7f9cc4ca1160>, <models.Users object at 0x7f9cc4ceee80>, <models.Users object at 0x7f9cc4cf50f0>, <models.Users object at 0x7f9cc4cf5160>]
# for row in ret:
#     print(row.id)
#     print(row.name)
'''
根据条件查询
'''
# ret = session.query(Users).filter(Users.id > 3)  # filter中直接加表达式
# for row in ret:
#     print(row.id, row.name)
#     '''
#     4 kiku
#     5 thanlon
#     6 kiku
#     '''
'''
根据条件查询,拿到第一条数据
'''
ret = session.query(Users).filter(Users.id > 3).first()  # filter中直接加表达式
print(ret.id, ret.name)
'''
4 kiku
'''
session.commit()
session.close()

⑤ 删除数据

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Usersengine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1,  # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
SessionFactory = sessionmaker(bind=engine)
session = SessionFactory()  # 创建一个连接
'''
删除数据
'''
# session.query(Users).delete()
session.query(Users).filter(Users.id > 3).delete()
session.commit()

⑥ 修改数据

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Usersengine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1,  # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
SessionFactory = sessionmaker(bind=engine)
session = SessionFactory()  # 创建一个连接
'''
修改数据
'''
# session.query(Users).filter(Users.id == 1).update({Users.name: 'liuyuqin'})
# session.query(Users).filter(Users.id == 1).update({'name': Users.name + 'hello'}, synchronize_session=False)#等效于下面的
session.query(Users).filter(Users.id == 1).update({Users.name: Users.name + 'hello'}, synchronize_session=False)
session.commit()
3. 常用操作

① 指定列

ret = session.query(Users.id, Users.name.label('cname')).all()
for item in ret:print(item, type(item), item[0], item.id, item.cname)# item是<class 'sqlalchemy.util._collections.result'>,打印的是元组,实际不是元组类型,但可以通过元组去值。能够通过索引去取,说明该类中有__item__方法
query = session.query(Users.id, Users.name.label('cname'))  # query的值就是sql语句

② 1. 默认条件是and

session.query(Users).filter(Users.id == 1, Users.name == 'thanlon').all()

③ between

session.query(Users).filter(Users.id.between(1, 2), Users.name == 'thanlon').all()

④ in

 session.query(Users).filter(Users.id.in_([1, 2, 3])).all()
session.query(Users).filter(~Users.id.in_([1, 2, 3])).all()

⑤ 子查询

session.query(Users).filter(Users.id.in_(session.query(Users.id).filter(Users.name == 'thanlon'))).all()

⑥ and和or

from sqlalchemy import and_, or_session.query(Users).filter(and_(Users.id > 1, Users.name == 'thanlon')).all()  # 没有and_也是一样的表示and
session.query(Users).filter(or_(Users.id > 1, Users.name == 'thanlon')).all()  # 或者
session.query(Users).filter(or_(Users.id > 1, and_(Users.name == 'thanlon', Users.id > 3))
).all()

⑦ filter_by

session.query(Users).filter_by(name='thanlon').all()  # 条件,内部也是使用filter

⑧ 通配符

session.query(Users).filter(Users.name.like('t_')).all()  # t后面有一个字符
session.query(Users).filter(~Users.name.like('t%')).all()  # 以t开头

⑨ 切片/分页

session.query(Users)[1, 3]

排序:

session.query(Users).order_by(Users.name.desc()).all()  # 按照姓名从大到小
session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()  # 先按照name从大到小排列,如果同名按照id升序排列

分组:

from sqlalchemy.sql import func'''
第一种情况
'''
# ret = session.query(func.max(Users.id)).group_by(Users.depart_id).all()
# for item in ret:
#     print(item)
'''
第二种情况
'''
# ret = session.query(func.min(Users.id)).group_by(Users.depart_id).all()
# for item in ret:
#     print(item)
'''
第三种情况
'''
# ret = session.query(Users.depart_id,func.count(Users.id)).group_by(Users.depart_id).all()
# for item in ret:
#     print(item)
#     '''
#     (1, 2)
#     (2, 1)
#     '''
'''
第四种情况:根据聚合条件二次筛选用having(部门人数大于等于2)
'''
ret = session.query(Users.depart_id, func.count(Users.id)).group_by(Users.depart_id).having(func.count(Users.id) >= 2).all()
for item in ret:print(item)'''(1, 2)'''
# 注意:如果使用group,根据聚合的结果进行二次筛选时,只能用having

组合:将两个表上下拼接(union和union all),而join是左右拼接

'''
union:不会去重
'''
# q = session.query(Users.name).filter(Users.id > 0)
# q2 = session.query(Admin.admin_name).filter(Admin.id > 0)
# ret = q.union(q2).all()
# for item in ret:
#     print(item.name)
'''
union:去重
'''
q = session.query(Users.name).filter(Users.id > 0)
q2 = session.query(Admin.admin_name).filter(Admin.id > 0)
ret = q.union_all(q2).all()
for item in ret:print(item.name)
4. 外键操作

① 创建两个表,注意Users表中是加了外键的

class Depart(Base):__tablename__ = 'depart'id = Column(Integer, primary_key=True)title = Column(String(32), index=True, nullable=False)class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True, nullable=False)depart_id = Column(Integer, ForeignKey('depart.id'))  # depart小写代表表名,不是类名

插入数据:

mysql> insert depart values(null,'technology');
mysql> insert depart values(null,'develop');
mysql> insert users values(null,'thanlon',1);
mysql> insert users values(null,'yuqin',2);

② 外键操作:
我们在这里可以查询所有用户的信息和所属部门名称,具体操作如下:

# ret = session.query(Users, Depart).join(Depart).all()  #depart_id = Column(Integer, ForeignKey('depart.id')),默认根据depart.id等效于下面的
ret = session.query(Users, Depart).join(Depart, Users.depart_id == Depart.id).all()  # on的条件是:depart_id == Depart.id
for row in ret:print(row[0].id, row[0].name, row[1].title)

也可以指定要查询的具体内容,

# 也可以指定获取指定的内容
ret = session.query(Users.id, Users.name, Depart.title.label('as_title')).join(Depart,Users.depart_id == Depart.id).all()
for row in ret:print(row.id, row.name, row.as_title)

可以使用多个join,但只有左连接没有右连接的概念,右连接是没有意义的。此外,下面query变量打印出来是一个sql语句,query变量的类型是:<class ‘sqlalchemy.orm.query.Query’>。

query = session.query(Users.id, Users.name, Depart.title).join(Depart, Users.depart_id == Depart.id,isouter=True)  # 没有right join是没有意义的

我们把在对应users的Users类中添加一个dp字段,注意这个字段不会在表中存在,只有Column实例化的对象才会对应数据表中的列。加入relationship可以帮助我们(跨表)做关联查询和创建关联数据。

from sqlalchemy.orm import relationship
class Users(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)name = Column(String(32), index=True, nullable=False)depart_id = Column(Integer, ForeignKey('depart.id'))  # depart小写代表表名,不是类名dp = relationship('Depart',backref = 'pers')

比如上面的“查询所有用户的信息和所属部门名称”可以这样操作:

ret = session.query(Users).all()
print(ret)  # [<models.Users object at 0x7f5b935bb7f0>, <models.Users object at 0x7f5b935bb860>……]
for row in ret:print(row.id, row.name, row.depart_id, row.dp.title)

还可以反向跨表,比如查询销售部为“develop”的所有人员,我们可以这样操作:

obj = session.query(Depart).filter(Depart.title == 'develop').first()
print(obj.pers)  # [<models.Users object at 0x7fef08122160>]
for row in obj.pers:print(row.id, row.name, obj.title)

上面两个例子是使用表示relationship可以做关联查询,下面的例子则会介绍一次性创建关联数据。
比如有这样的需求“创建一个名称是‘finance’的部门并在该部门中添加员工:thanlon”,一般情况下我们是这样操作的:

dp = Depart(title='finance')
session.add(dp)
session.commit()
#再根据根据finance部门id创建员工
u = Users(name='thanlon', depart_id=dp.id)
session.add(u)
session.commit()

上面是一种方式,有了relationship,我们可以这样创建:

u = Users(name='thanlon', dp=Depart(title='finance'))
session.add(u)
session.commit()

可以看到简单多了吧!
问题来了,如果创建一个部门再在这个部门创建多个员工,如果按照上面的例子,会又创建若干个finance部门。当然,如果按照上上个例子一个个添加员工也是可以的。但是,我们还有简便的方式,你可以这样做:

# 创建一个部门,添加多个员工
dp = Depart(title='finance')
dp.pers = [Users(name='a'),Users(name='b'),Users(name='c')
]
session.add(dp)
session.commit()
5. 多对多操作

① 多对多表的创建

创建学生表、课程表和中间表(部分代码省略)

from sqlalchemy import Column, Integer, String, Text, Date, DateTime, ForeignKey, UniqueConstraint, Indexclass Student(Base):__tablename__ = 'student'id = Column(Integer, primary_key=True)name = Column(String(32), index=True, nullable=False)class Course(Base):__tablename__ = 'course'id = Column(Integer, primary_key=True)title = Column(String(32), index=True, nullable=False)class Student2Course(Base):__tablename__ = 'student2course'id = Column(Integer, primary_key=True, autoincrement=True)student_id = Column(Integer, ForeignKey('student.id'))course_id = Column(Integer, ForeignKey('course.id'))__table_args__ = (UniqueConstraint('student_id', 'course_id', name='uc_student_course'),  # 联合唯一索引# Index('i_student_course', 'student_id', 'course_id')#联合索引)

② 添加数据

添加Student表和Course表数据:

session.add_all([Student(name='thanlon'),Student(name='Aurora'),Student(name='kiku'),Course(title='English'),Course(title='Computer'),
])
session.commit()

添加中间表数据:

session.add_all([Student2Course(student_id=4,course_id=1),Student2Course(student_id=4,course_id=2),
])
session.commit()
session.add_all([Student2Course(student_id=5,course_id=1),
])
session.commit()
session.add_all([Student2Course(student_id=6,course_id=2),
])
session.commit()

③ 多对多操作

操作1:查询学生姓名和对应的选课课程名(三张表关联查询)

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Users, Depart, Student, Course, Student2Courseengine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1,  # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
SessionFactory = sessionmaker(bind=engine)
session = SessionFactory()
ret = session.query(Student2Course.id, Student.name, Course.title).join(Student,Student2Course.student_id == Student.id,isouter=True).join(Course, Student2Course.course_id == Course.id, isouter=True).order_by(Student2Course.id.asc())
for row in ret:print(row)'''(1, 'thanlon', 'English')(2, 'thanlon', 'Computer')(3, 'Aurora', 'English')(4, 'kiku', 'Computer')'''
session.close()

操作2:学生“thanlon”选的所有课

……
ret = session.query(Student2Course.id, Student.name, Course.title).join(Student,Student2Course.student_id == Student.id,isouter=True).join(Course, Student2Course.course_id == Course.id, isouter=True).filter(Student.name == 'Thanlon').order_by(Student2Course.id.asc()).all()
print(ret)
# [(1, 'thanlon', 'English'), (2, 'thanlon', 'Computer')]
……

上面两个例子是一般多对关联查询操作,下面使用relationship,首先需要修改建Student表语句:

model.py:

class Student(Base):__tablename__ = 'student'id = Column(Integer, primary_key=True)name = Column(String(32), index=True, nullable=False)course_list = relationship('Course', secondary='student2course', backref='student_list')  # 根据与Course做关联

使用relationship来查询来执行操作2:学生“thanlon”选的所有课,可以这样:

obj = session.query(Student).filter(Student.name == 'thanlon').first()
for item in obj.course_list:print(item.title)'''ComputerEnglish'''

除了使用relationship正向查找,还可以反向查找,比如:查找选择“Computer”的所有人,

obj = session.query(Course).filter(Course.title == 'computer').first()
# print(obj)#<models.Course object at 0x7fe264215668>
for item in obj.student_list:print(item.name)'''thanlonkiku'''

操作3:插入数据示例,需求“创建一个课程,创建两个学生,两个学生选新创建的课程”
这里使用relationship,

obj = Course(title='math')
obj.student_list = [Student(name='Maria'), Student(name='Michael')]
session.add(obj)#在course表增加数据,在student表增加两条数据,在中间关系表中增加两条
session.commit()
6. 两种连接数据库的方式

① 自己实例化session对象

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Users
from threading import Threadengine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1,  # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
SessionFactory = sessionmaker(bind=engine)def task():# 去连接池获取一个连接session = SessionFactory()ret = session.query(Users).all()# 将连接交给连接池session.close()if __name__ == '__main__':t = Thread(target=task)t.start()

如果用同一个连接,多线程请求数据时,这个连接很可能忙不过来,数据很有可能出现混乱。所以,实例化session对象(创建一个连接)的代码是不可以放在task函数外面(全局)。此外,session.close()会把仅有的一个连接释放,其它线程是不可以用到的,因为这种情况下连接只创建了一次。

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Users
from threading import Threadengine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1,  # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
SessionFactory = sessionmaker(bind=engine)# 去连接池获取一个连接
session = SessionFactory()def task():ret = session.query(Users).all()# 将连接交给连接池session.close()if __name__ == '__main__':t = Thread(target=task)t.start()

② 通过Threading.Local实现的(推荐使用)
基于Threading.Local也是为每个线程获取一个连接。

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Users
from threading import Thread
from sqlalchemy.orm import scoped_sessionengine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1,  # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
SessionFactory = sessionmaker(bind=engine)
# 去连接池获取一个连接
session = scoped_session(SessionFactory)def task():ret = session.query(Users).all()# 将连接交给连接池session.remove()if __name__ == '__main__':t = Thread(target=task)t.start()
7. 原生SQL

Django里面可以执行原生SQL,SQLAlchemy也可以执行原生SQL,如果SQL语句比较复杂,ORM解决不了,那么可以使用原生SQL。SQLAlchemy 执行原生 SQL的 方式有好多种,在这里介绍其中两种。

方式1

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Users
from threading import Thread
from sqlalchemy.orm import scoped_sessionengine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1,  # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
SessionFactory = sessionmaker(bind=engine)
# 去连接池获取一个连接
session = scoped_session(SessionFactory)def task():'''方式1'''# 查询# cursor = session.execute('select *from users')# ret = cursor.fetchall()# 插入cursor = session.execute('insert users(name) values(:value)', params={'value': 'thanlon'})session.commit()print(cursor.lastrowid)if __name__ == '__main__':t = Thread(target=task)t.start()

方式2

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from models import Users
from threading import Thread
from sqlalchemy.orm import scoped_sessionengine = create_engine('mysql+pymysql://root:123456@127.0.0.1:3306/sqlalchemy_test?charset=utf8',max_overflow=0,  # 超过连接池大小外最多创建的连接pool_size=5,  # 连接池大小pool_timeout=30,  # 连接池中没有连接最多等待的时间,否则会报错,30spool_recycle=-1,  # 多久之后对线程池中的线程中进行一次连接的回收(重置)-1表示不重置
)
SessionFactory = sessionmaker(bind=engine)
# 去连接池获取一个连接
session = scoped_session(SessionFactory)def task():'''方式2'''conn = engine.raw_connection()cursor = conn.cursor()cursor.execute('select *from users')ret = cursor.fetchall()print(ret)  # ((1, 'liuyuqinhellohello'), (2, 'thanlon'), (3, 'thanlon'), (7, 'thanlon'))cursor.close()conn.close()if __name__ == '__main__':t = Thread(target=task)t.start()

Flask入门到实战相关推荐

  1. 《Flask Web开发实战:入门、进阶与原理解析》读书笔记

    写在前面 学docker编排,有一个用Flask框架的Demo,感觉挺方便,所以学习下 基于<Flask Web开发实战:入门.进阶与原理解析>做的读书笔记 个人还是比较喜欢看书,看书的话 ...

  2. Flask入门之Jinjia模板的一些语法

    原文:https://www.cnblogs.com/wongbingming/p/6807771.html Flask入门之Jinjia模板的一些语法 1. 变量表示 {{ argv }} 2. 赋 ...

  3. 《Docker技术入门与实战》pdf

    下载地址:网盘下载 内容简介  · · · · · · [编辑推荐] <Docker技术入门与实战>是中国首部Docker著作,一线Docker先驱实战经验结晶,来自IBM和新浪等多位技术 ...

  4. 【Python全栈开发从入门到实战】持续更新中......

    本专栏为Python全栈开发系列文章,技术包括Python基础.函数.文件.面向对象.网络编程.并发编程.MySQL数据库.HTML.JavaScript.CSS.JQuery.bootstrap.W ...

  5. 《Go语言从入门到实战》学习笔记(1)——Go语言学习路线图、简介

    非常有幸在<极客时间>上看到<Go语言从入门到实战>这门课程,本课程的作者给出了较为详细的学习路线图,具体如下: 学习路线图  学习目的 个人学习的目的主要是了解Go语言的基本 ...

  6. PyTorch深度学习入门与实战(案例视频精讲)

    作者:孙玉林,余本国 著 出版社:中国水利水电出版社 品牌:智博尚书 出版时间:2020-07-01 PyTorch深度学习入门与实战(案例视频精讲)

  7. 7-Python3从入门到实战—基础之数据类型(字典-Dictionary)

    Python从入门到实战系列--目录 字典的定义 字典是另一种可变容器模型,且可存储任意类型对象:使用键-值(key-value)存储,具有极快的查找速度: 字典的每个键值(key=>value ...

  8. 《Android 开发入门与实战(第二版)》——6.6节配置改变

    本节书摘来自异步社区<Android 开发入门与实战(第二版)>一书中的第6章,第6.6节配置改变,作者eoe移动开发者社区 组编 , 姚尚朗 , 靳岩,更多章节内容可以访问云栖社区&qu ...

  9. 【前端开发】HTML入门与实战

    [什么是HTML]: HTML: 超文本标记语言,标准通用标记语言下的一个应用. "超文本"就是指页面内可以包含图片.链接,甚至音乐.程序等非文字元素. HTML 是用来描述网页的 ...

最新文章

  1. MyEclipse中运行环境jre、编译级别、tomcat运行环境区别
  2. Plitch for the final Feb 16
  3. zcmu-1783(01字典树)
  4. 红旗linux怎么更新,红旗linux7.0下自动更新firefox
  5. 妙用Python集合求解啤酒问题(携程2016笔试题)
  6. pandas小记:pandas索引和选择
  7. C#给图片加文字水印
  8. web of knowledge分析文献引用情况(引)
  9. 《Adobe Fireworks CS5中文版经典教程》——1.2 工具面板
  10. MyQQ:可以在终端里面上的QQ
  11. Nordic 52832工程报错undefined reference to `__start_pwr_mgmt_data'解决办法
  12. 华为p20支持手机云闪付吗_华为哪些手机支持云闪付
  13. [工业互联-2]:工业有线互联总线之CAN总线
  14. [附源码]JAVA+ssm计算机毕业设计房屋租赁管理系统设计(程序+Lw)
  15. oracle-DDL对表的操作
  16. 一次 WebResource.axd 异常处理经历
  17. JQuery修改对象的属性值
  18. 函数极限的24种定义
  19. Ubuntu硬盘挂载
  20. 读LEO《程序员羊皮卷》心得笔记

热门文章

  1. 什么是5G advanced
  2. 写代码赚钱的一些门路
  3. 阿里云国际版如何使用NGINX作为HTTPS转发代理服务器
  4. Caché SQL 高性能优化
  5. AcWing寒假每日一题2058. 笨拙的手指
  6. Fractions (水)
  7. STM32单片机接直流减速电机编码器注意点
  8. 修改登录页面Login
  9. 200行Go代码实现自己的区块链——区块生成与网络通信
  10. SDL_Image,d3d9与OpenGL Shader混用方法(一)