Flask简介#
Flask是一个“微” python web 框架,只实现了最基本的功能,简单、灵活、可拓展性强。
- 简单:使用简单,通过
@app.route()
装饰器即可配置路由
- 灵活:配置灵活,支持多种不同类型的配置方式。可自定义
- 使用dict对象
- 使用环境变量
app.config.from_envvar()
- 使用配置文件
app.config.from_file()
包括json、toml
- 可拓展性强:可自定义其他部分的组件,如
flask-SQLAlchemy、flask-session
等
flask依赖的核心组件有2个: werkzeug
和 jinja
。
jinja
是一个模版引擎,负责模版的渲染,werkzeug
是一个 WSGI 工具集的库
启动流程#
在Flask的官方文档中给了一个 ‘Hello World’ 启动案例,我们本地启动一下
1
2
3
4
5
6
7
8
9
10
|
from flask import Flask

# The application object; the import name tells Flask where this module lives.
app = Flask(__name__)


@app.route("/")
def hello_world():
    """Return the HTML greeting served for the site root ("/")."""
    greeting = "<p>Hello, World!</p>"
    return greeting


if __name__ == "__main__":
    # Start the development server only when executed directly as a script.
    app.run(host="127.0.0.1", port=5000)
|
我们可以看出来,启动的时候首先调用了 app.run()
方法,我们来看一下这个方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
class Flask(Scaffold):
    def run(
        self,
        host: t.Optional[str] = None,
        port: t.Optional[int] = None,
        debug: t.Optional[bool] = None,
        load_dotenv: bool = True,
        **options: t.Any,
    ) -> None:
        """Resolve host, port and debug settings, then serve this app via
        werkzeug's ``run_simple``.

        Does nothing (besides calling ``explain_ignored_app_run``) when the
        ``FLASK_RUN_FROM_CLI`` environment variable equals ``"true"``.

        :param host: interface to bind. When falsy, the host part of the
            ``SERVER_NAME`` config value is used, then ``"127.0.0.1"``.
        :param port: port to bind. When not given, the port part of
            ``SERVER_NAME`` is used, then ``5000``.
        :param debug: when not ``None``, overrides every other source of
            the debug flag.
        :param load_dotenv: passed to ``get_load_dotenv`` to decide whether
            ``cli.load_dotenv()`` is called.
        :param options: extra keyword arguments forwarded to ``run_simple``.
            ``use_reloader`` and ``use_debugger`` default to ``self.debug``,
            ``threaded`` defaults to ``True``.
        """
        # Change this into a no-op if the server is invoked from the
        # command line. Have a look at cli.py for more information.
        if os.environ.get("FLASK_RUN_FROM_CLI") == "true":
            from .debughelpers import explain_ignored_app_run

            explain_ignored_app_run()
            return

        # NOTE(review): indentation was lost in this excerpt; the env-var
        # checks below are nested under the dotenv branch as in upstream
        # Flask -- confirm against the Flask version being discussed.
        if get_load_dotenv(load_dotenv):
            cli.load_dotenv()

            # if set, let env vars override previous values
            if "FLASK_ENV" in os.environ:
                self.env = get_env()
                self.debug = get_debug_flag()
            elif "FLASK_DEBUG" in os.environ:
                self.debug = get_debug_flag()

        # debug passed to method overrides all other sources
        if debug is not None:
            self.debug = bool(debug)

        # Split SERVER_NAME ("host:port") so each part can act as a fallback.
        server_name = self.config.get("SERVER_NAME")
        sn_host = sn_port = None

        if server_name:
            sn_host, _, sn_port = server_name.partition(":")

        if not host:
            if sn_host:
                host = sn_host
            else:
                host = "127.0.0.1"

        # "port == 0" keeps an explicit 0 from being treated as "not given".
        if port or port == 0:
            port = int(port)
        elif sn_port:
            port = int(sn_port)
        else:
            port = 5000

        # Only fill in options the caller did not pass explicitly.
        options.setdefault("use_reloader", self.debug)
        options.setdefault("use_debugger", self.debug)
        options.setdefault("threaded", True)

        cli.show_server_banner(self.env, self.debug, self.name, False)

        from werkzeug.serving import run_simple

        try:
            # The Flask object itself is handed over as the WSGI application.
            run_simple(t.cast(str, host), port, self, **options)
        finally:
            # reset the first request information if the development server
            # reset normally. This makes it possible to restart the server
            # without reloader and that stuff from an interactive shell.
            self._got_first_request = False
|
我们可以看到,前面一部分主要是处理一些启动参数。在参数处理完成之后,调用了 werkzeug.serving
中的 run_simple
函数,能看到这个函数传入了一个 self
参数,即我们的 flask application,我们再进入这个 run_simple
当中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
# werkzeug code
def run_simple(
    hostname: str,
    port: int,
    application: "WSGIApplication",
    use_reloader: bool = False,
    use_debugger: bool = False,
    use_evalex: bool = True,
    extra_files: t.Optional[t.Iterable[str]] = None,
    exclude_patterns: t.Optional[t.Iterable[str]] = None,
    reloader_interval: int = 1,
    reloader_type: str = "auto",
    threaded: bool = False,
    processes: int = 1,
    request_handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
    static_files: t.Optional[t.Dict[str, t.Union[str, t.Tuple[str, str]]]] = None,
    passthrough_errors: bool = False,
    ssl_context: t.Optional[_TSSLContextArg] = None,
) -> None:
    """Optionally wrap ``application`` in middleware, build a server with
    ``make_server`` and run it, with or without the reloader.

    :raises TypeError: if ``port`` is not an ``int``.
    """
    if not isinstance(port, int):
        raise TypeError("port must be an integer")

    if static_files:
        from .middleware.shared_data import SharedDataMiddleware

        # Static file handling: wrap the app in SharedDataMiddleware.
        application = SharedDataMiddleware(application, static_files)

    if use_debugger:
        from .debug import DebuggedApplication

        # Debugging: wrap the app in DebuggedApplication.
        application = DebuggedApplication(application, evalex=use_evalex)

    if not is_running_from_reloader():
        # Prepare and check the socket before startup (IPv4 vs. IPv6,
        # whether the port is already in use).
        s = prepare_socket(hostname, port)
        fd = s.fileno()
        # Publish the descriptor through the environment; the else branch
        # below reads it back when running from the reloader.
        os.environ["WERKZEUG_SERVER_FD"] = str(fd)
    else:
        fd = int(os.environ["WERKZEUG_SERVER_FD"])

    srv = make_server(
        hostname,
        port,
        application,
        threaded,
        processes,
        request_handler,
        passthrough_errors,
        ssl_context,
        fd=fd,
    )

    if not is_running_from_reloader():
        srv.log_startup()

    if use_reloader:
        from ._reloader import run_with_reloader

        # The reloader is given serve_forever as the function to run.
        run_with_reloader(
            srv.serve_forever,
            extra_files=extra_files,
            exclude_patterns=exclude_patterns,
            interval=reloader_interval,
            reloader_type=reloader_type,
        )
    else:
        srv.serve_forever()
|
从上面的代码我们可以看到有两处关键的函数调用,一处是 make_server
,另外一处是 serve_forever
。我们先看 make_server
做了什么:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
# werkzeug code
def make_server(
    host: str,
    port: int,
    app: "WSGIApplication",
    threaded: bool = False,
    processes: int = 1,
    request_handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
    passthrough_errors: bool = False,
    ssl_context: t.Optional[_TSSLContextArg] = None,
    fd: t.Optional[int] = None,
) -> BaseWSGIServer:
    """Create a server instance whose class depends on ``threaded`` and
    ``processes``.

    ``threaded`` selects ``ThreadedWSGIServer``, ``processes > 1`` selects
    ``ForkingWSGIServer``, otherwise a plain ``BaseWSGIServer`` is returned.

    :raises ValueError: if both ``threaded`` and ``processes > 1`` are
        requested.
    """
    if threaded and processes > 1:
        raise ValueError("Cannot have a multi-thread and multi-process server.")

    if threaded:
        # ThreadedWSGIServer inherits from BaseWSGIServer
        return ThreadedWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )

    if processes > 1:
        # ForkingWSGIServer inherits from BaseWSGIServer
        return ForkingWSGIServer(
            host,
            port,
            app,
            processes,
            request_handler,
            passthrough_errors,
            ssl_context,
            fd=fd,
        )

    return BaseWSGIServer(
        host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
    )
|
能看到这里返回的是一个 BaseWSGIServer
(或其子类)的实例:由于 Flask 的 run 方法默认设置了 threaded=True,这里实际返回的是 ThreadedWSGIServer。BaseWSGIServer 在初始化的时候主要做了
两件事情:一是绑定IP和端口并开始监听
;二是添加 WSGIRequestHandler
这个请求处理器。
我们再来看看 serve_forever
做了什么:
1
2
3
4
5
6
7
8
9
10
|
# werkzeug code
class BaseWSGIServer(HTTPServer):
    def serve_forever(self, poll_interval: float = 0.5) -> None:
        """Run the parent class's serving loop; always close the server
        when the loop ends.

        :param poll_interval: forwarded unchanged to the parent
            ``serve_forever``.
        """
        try:
            super().serve_forever(poll_interval=poll_interval)
        except KeyboardInterrupt:
            # KeyboardInterrupt is swallowed here instead of propagating.
            pass
        finally:
            self.server_close()
|
再来进一步看看父类的 serve_forever
做了什么:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
# Python standard library code (socketserver), not werkzeug
class BaseServer:
    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.
        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            # XXX: Consider using another file descriptor or connecting to the
            # socket to wake this up instead of polling. Polling reduces our
            # responsiveness to a shutdown request and wastes cpu at all other
            # times.
            # I/O multiplexing selector; poll is chosen when it is available
            with _ServerSelector() as selector:
                selector.register(self, selectors.EVENT_READ)

                while not self.__shutdown_request:
                    # Poll, waiting at most poll_interval seconds
                    ready = selector.select(poll_interval)
                    # bpo-35017: shutdown() called during select(), exit immediately.
                    if self.__shutdown_request:
                        break
                    if ready:
                        self._handle_request_noblock()

                    self.service_actions()
        finally:
            # Reset the request flag and signal that the loop has finished.
            self.__shutdown_request = False
            self.__is_shut_down.set()
|
我们可以看到这里直接跳到了 BaseServer
这个类,这是因为 BaseWSGIServer
–> HTTPServer
–> TCPServer
–> BaseServer
之间存在继承关系(HTTPServer 来自 Python 标准库的 http.server,TCPServer 和 BaseServer 来自 socketserver)。此处选用了 PollSelector
作为 I/O多路复用选择器,关于I/O多路复用就不详说了。当有活跃的文件描述符时,执行 _handle_request_noblock()
方法。到了这里,我们的应用就启动并开始监听相关的端口了。
请求处理过程#
在上面我们已经说到了应用的启动过程,接下来看一看当接收到请求之后是怎么处理的。
上面说到,当有活跃的文件描述符会执行 _handle_request_noblock()
方法,来看一下这个方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
# Python standard library code (socketserver), not werkzeug
class BaseServer:
    def _handle_request_noblock(self):
        """Handle one request, without blocking.
        I assume that selector.select() has returned that the socket is
        readable before this function was called, so there should be no risk of
        blocking in get_request().
        """
        try:
            request, client_address = self.get_request()
        except OSError:
            # Fetching the request failed; give up on it silently.
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except Exception:
                # Ordinary errors are reported through handle_error and the
                # request is shut down; the exception does not propagate.
                self.handle_error(request, client_address)
                self.shutdown_request(request)
            except:
                # Anything else still shuts the request down, then propagates.
                self.shutdown_request(request)
                raise
        else:
            # Verification failed: shut the request down without processing.
            self.shutdown_request(request)
|
在这段代码里我们可以看出来,这里调用了 process_request
方法去处理接收到的 request。process_request 会经由 finish_request 实例化服务器上注册的请求处理器类,也就是我们前面提到的 WSGIRequestHandler
。WSGIRequestHandler
是 BaseRequestHandler
的(间接)子类,请求的处理正是在它的初始化过程中完成的,看看初始化相关的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
# Python standard library code (socketserver), not werkzeug
class BaseRequestHandler:
    def __init__(self, request, client_address, server):
        """Store the request data, then immediately run the
        ``setup`` -> ``handle`` -> ``finish`` sequence.

        ``finish`` runs even when ``handle`` raises; it is not reached if
        ``setup`` raises.
        """
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    # The three hooks below do nothing in this base class.
    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass
|
从上面的代码中可以看到,初始化时调用了 handle
方法(BaseRequestHandler 中是空实现,实际执行的是子类重写后的版本),在子类的 handle
方法里调用了 handle_one_request
方法,我们来看看这个方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
# Python standard library code (http.server), not werkzeug
class BaseHTTPRequestHandler(socketserver.StreamRequestHandler):
    def handle_one_request(self):
        """Handle a single HTTP request.
        You normally don't need to override this method; see the class
        __doc__ string for information on how to handle specific HTTP
        commands such as GET and POST.
        """
        try:
            # Read at most 65537 bytes so that a request line longer than
            # the 65536-byte limit can be detected by the check below.
            self.raw_requestline = self.rfile.readline(65537)
            if len(self.raw_requestline) > 65536:
                self.requestline = ''
                self.request_version = ''
                self.command = ''
                self.send_error(HTTPStatus.REQUEST_URI_TOO_LONG)
                return
            if not self.raw_requestline:
                # Nothing was read: mark the connection for closing.
                self.close_connection = True
                return
            if not self.parse_request():
                # An error code has been sent, just exit
                return
            # Dispatch to the method named after the HTTP command,
            # e.g. "do_GET" for a GET request.
            mname = 'do_' + self.command
            if not hasattr(self, mname):
                self.send_error(
                    HTTPStatus.NOT_IMPLEMENTED,
                    "Unsupported method (%r)" % self.command)
                return
            method = getattr(self, mname)
            method()
            self.wfile.flush() #actually send the response if not already done.
        except socket.timeout as e:
            #a read or a write timed out.  Discard this connection
            self.log_error("Request timed out: %r", e)
            self.close_connection = True
            return
|
可以看到关键的地方是这个 method
方法的调用,而这个 method
其实就是 do_GET、do_POST
等方法。通过 debug 可以看到,这个方法和 WSGIRequestHandler.run_wsgi
绑定了(WSGIRequestHandler 重写了 __getattr__,所有以 do_ 开头的属性都会返回 run_wsgi),即调用了 WSGIRequestHandler.run_wsgi
方法,这是很关键的一个方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
# werkzeug code
class WSGIRequestHandler(BaseHTTPRequestHandler):
    def run_wsgi(self) -> None:
        """Call the server's WSGI application for the current request and
        write its response to ``self.wfile``.

        Three closures share state through ``nonlocal`` variables:
        ``start_response`` records the status and headers, ``write`` sends
        them lazily before the first body bytes, and ``execute`` drives
        the application iterable.
        """
        if self.headers.get("Expect", "").lower().strip() == "100-continue":
            self.wfile.write(b"HTTP/1.1 100 Continue\r\n\r\n")

        self.environ = environ = self.make_environ()
        # *_set: what the application asked for via start_response.
        # *_sent: what has actually been written to the client.
        status_set: t.Optional[str] = None
        headers_set: t.Optional[t.List[t.Tuple[str, str]]] = None
        status_sent: t.Optional[str] = None
        headers_sent: t.Optional[t.List[t.Tuple[str, str]]] = None
        chunk_response: bool = False

        def write(data: bytes) -> None:
            nonlocal status_sent, headers_sent, chunk_response
            assert status_set is not None, "write() before start_response"
            assert headers_set is not None, "write() before start_response"
            # First call: send the status line and headers before any body.
            if status_sent is None:
                status_sent = status_set
                headers_sent = headers_set
                try:
                    code_str, msg = status_sent.split(None, 1)
                except ValueError:
                    # Status consists of a code only, without reason phrase.
                    code_str, msg = status_sent, ""
                code = int(code_str)
                self.send_response(code, msg)
                header_keys = set()
                for key, value in headers_sent:
                    self.send_header(key, value)
                    header_keys.add(key.lower())

                # Use chunked transfer encoding if there is no content
                # length. Do not use for 1xx and 204 responses. 304
                # responses and HEAD requests are also excluded, which
                # is the more conservative behavior and matches other
                # parts of the code.
                # https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.1
                if (
                    not (
                        "content-length" in header_keys
                        or environ["REQUEST_METHOD"] == "HEAD"
                        or (100 <= code < 200)
                        or code in {204, 304}
                    )
                    and self.protocol_version >= "HTTP/1.1"
                ):
                    chunk_response = True
                    self.send_header("Transfer-Encoding", "chunked")

                # Always close the connection. This disables HTTP/1.1
                # keep-alive connections. They aren't handled well by
                # Python's http.server because it doesn't know how to
                # drain the stream before the next request line.
                self.send_header("Connection", "close")
                self.end_headers()

            assert isinstance(data, bytes), "applications must write bytes"

            if data:
                # In chunked mode each piece is framed as
                # "<hex length>\r\n<data>\r\n".
                if chunk_response:
                    self.wfile.write(hex(len(data))[2:].encode())
                    self.wfile.write(b"\r\n")

                self.wfile.write(data)

                if chunk_response:
                    self.wfile.write(b"\r\n")

            self.wfile.flush()

        def start_response(status, headers, exc_info=None):  # type: ignore
            nonlocal status_set, headers_set
            if exc_info:
                try:
                    # Headers already went out: re-raise the application's
                    # exception instead of accepting a new status.
                    if headers_sent:
                        raise exc_info[1].with_traceback(exc_info[2])
                finally:
                    exc_info = None
            elif headers_set:
                raise AssertionError("Headers already set")
            status_set = status
            headers_set = headers
            return write

        def execute(app: "WSGIApplication") -> None:
            # This is where the WSGI application (e.g. the Flask app) is called.
            application_iter = app(environ, start_response)
            try:
                for data in application_iter:
                    write(data)
                # Empty body: still force the status line and headers out.
                if not headers_sent:
                    write(b"")
                # Zero-length chunk terminates a chunked response.
                if chunk_response:
                    self.wfile.write(b"0\r\n\r\n")
            finally:
                if hasattr(application_iter, "close"):
                    application_iter.close()  # type: ignore

        try:
            execute(self.server.app)
        except (ConnectionError, socket.timeout) as e:
            self.connection_dropped(e, environ)
        except Exception as e:
            if self.server.passthrough_errors:
                raise

            if status_sent is not None and chunk_response:
                self.close_connection = True

            try:
                # if we haven't yet sent the headers but they are set
                # we roll back to be able to set them again.
                if status_sent is None:
                    status_set = None
                    headers_set = None
                execute(InternalServerError())
            except Exception:
                # Best effort: a failure while sending the error response
                # is ignored; the original error is still logged below.
                pass

            from .debug.tbtools import DebugTraceback

            msg = DebugTraceback(e).render_traceback_text()
            self.server.log("error", f"Error on request:\n{msg}")
|
这段代码里最关键的是 execute(self.server.app)
这一段,而这个 server.app
其实就是我们传进来的 flask application了,通过 application_iter = app(environ, start_response)
这一行调用了 Flask 的 __call__
方法。上面涉及到了很多 werkzeug
相关的代码,然后在这里又回到了 flask
。 来看一看这个方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
# Flask code
class Flask(Scaffold):
    def __call__(self, environ: dict, start_response: t.Callable) -> t.Any:
        """The WSGI server calls the Flask application object as the
        WSGI application. This calls :meth:`wsgi_app`, which can be
        wrapped to apply middleware.
        """
        return self.wsgi_app(environ, start_response)

    def wsgi_app(self, environ: dict, start_response: t.Callable) -> t.Any:
        """Handle one WSGI request: push a request context, dispatch the
        request, and pop the context again when done.

        :param environ: the WSGI environment, used to build the request
            context and passed on to the response.
        :param start_response: the WSGI ``start_response`` callable,
            passed on to the response.
        :return: the result of calling the response object as a WSGI
            application.
        """
        # Create the request context
        ctx = self.request_context(environ)
        error: t.Optional[BaseException] = None
        try:
            try:
                ctx.push()
                response = self.full_dispatch_request()
            except Exception as e:
                # Ordinary exceptions are turned into a response.
                error = e
                response = self.handle_exception(e)
            except:  # noqa: B001
                # Anything else is recorded for the finally block and
                # re-raised.
                error = sys.exc_info()[1]
                raise
            return response(environ, start_response)
        finally:
            if self.should_ignore_error(error):
                error = None
            # The context is popped whether or not an error occurred.
            ctx.auto_pop(error)
|
这段代码首先是创建请求的上下文,然后通过 self.full_dispatch_request()
方法分发请求,我们再来看看 full_dispatch_request
的具体代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
# Flask code
class Flask(Scaffold):
    def full_dispatch_request(self) -> Response:
        """Run the first-request functions if needed, preprocess and
        dispatch the request, and finalize the result into a response.

        Exceptions raised during preprocessing or dispatching are passed
        to ``handle_user_exception``; its return value is finalized
        instead.
        """
        # Run before_first_request functions if this is the thread's first request.
        # Inlined to avoid a method call on subsequent requests.
        # This is deprecated, will be removed in Flask 2.3.
        if not self._got_first_request:
            with self._before_request_lock:
                # Checked again while holding the lock.
                if not self._got_first_request:
                    for func in self.before_first_request_funcs:
                        self.ensure_sync(func)()

                    self._got_first_request = True

        try:
            request_started.send(self)
            rv = self.preprocess_request()
            # Dispatch only if preprocessing did not already return a value.
            if rv is None:
                rv = self.dispatch_request()
        except Exception as e:
            rv = self.handle_user_exception(e)
        return self.finalize_request(rv)

    def dispatch_request(self) -> ft.ResponseReturnValue:
        """Call the view function registered for the matched URL rule and
        return its return value.

        A routing exception stored on the request is handed to
        ``raise_routing_exception``; an ``OPTIONS`` request on a rule with
        ``provide_automatic_options`` is answered with the default
        options response.
        """
        req = request_ctx.request
        if req.routing_exception is not None:
            self.raise_routing_exception(req)
        rule: Rule = req.url_rule  # type: ignore[assignment]
        # if we provide automatic options for this URL and the
        # request came with the OPTIONS method, reply automatically
        if (
            getattr(rule, "provide_automatic_options", False)
            and req.method == "OPTIONS"
        ):
            return self.make_default_options_response()
        # otherwise dispatch to the handler for that endpoint
        view_args: t.Dict[str, t.Any] = req.view_args  # type: ignore[assignment]
        return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
|
能看得到这里通过 dispatch_request
分发请求,并使用 handle_user_exception
处理分发过程中抛出的异常。在 dispatch_request
当中,根据url路径,找到相应的方法,并调用,代码为 self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
。我们再来看一看 self.finalize_request(rv)
做了什么:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
# Flask code
class Flask(Scaffold):
    def finalize_request(
        self,
        rv: t.Union[ResponseReturnValue, HTTPException],
        from_error_handler: bool = False,
    ) -> Response:
        """Convert ``rv`` into a response object, run response processing
        and send the ``request_finished`` signal.

        :param rv: the value to convert with ``make_response``.
        :param from_error_handler: when true, an exception raised during
            processing is logged instead of re-raised, and the response
            as it stood before the failure is returned.
        :return: the response object.
        """
        response = self.make_response(rv)
        try:
            response = self.process_response(response)
            request_finished.send(self, response=response)
        except Exception:
            if not from_error_handler:
                raise
            # Already handling an error: log and fall through to return.
            self.logger.exception(
                "Request finalizing failed with an error while handling an error"
            )
        return response
|
这里的 finalize_request()
就是把 view_functions
返回的结果组装成 Response 并返回,在其中处理了 status、headers、session
等,到此我们的请求处理就结束了。