Introduction to Flask

Flask is a "micro" Python web framework: it implements only the most essential features, which makes it simple, flexible, and highly extensible.

  • Simple: easy to use; a route can be registered with nothing more than the @app.route() decorator
  • Flexible: configuration is flexible and supports several different mechanisms, all of which can be customized (see the sketch after this list):
    1. a plain dict (updating app.config directly)
    2. environment variables via app.config.from_envvar()
    3. configuration files via app.config.from_file(), including JSON and TOML
  • Extensible: other parts of the stack can be provided by extensions of your choice, e.g. Flask-SQLAlchemy, Flask-Session
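
Below is a minimal sketch of the three configuration styles listed above; the file names, environment variable name, and config keys are made up for illustration (from_file requires Flask 2.0+).

# A minimal sketch of the three configuration styles (names are hypothetical)
from flask import Flask

app = Flask(__name__)

# 1. dict-style: update app.config directly
app.config.update(DEBUG=False, SECRET_KEY="change-me")

# 2. environment variable pointing at a Python config file, e.g.
#    export MYAPP_SETTINGS=/path/to/settings.cfg
# app.config.from_envvar("MYAPP_SETTINGS")

# 3. a config file parsed by a loader you pass in (JSON here, TOML works too)
# import json
# app.config.from_file("config.json", load=json.load)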

Dependencies

Flask has two core dependencies: werkzeug and jinja. jinja is a template engine responsible for rendering templates, and werkzeug is a library providing a WSGI toolkit.

Startup Flow

The official Flask documentation gives a 'Hello World' example; let's run it locally:

from flask import Flask

app = Flask(__name__)

@app.route("/")
def hello_world():
    return "<p>Hello, World!</p>"

if __name__ == '__main__':
    app.run("127.0.0.1", 5000)

We can see that startup begins with a call to app.run(), so let's take a look at this method:

class Flask(Scaffold):
    def run(
        self,
        host: t.Optional[str] = None,
        port: t.Optional[int] = None,
        debug: t.Optional[bool] = None,
        load_dotenv: bool = True,
        **options: t.Any,
    ) -> None:

        # Change this into a no-op if the server is invoked from the
        # command line. Have a look at cli.py for more information.
        if os.environ.get("FLASK_RUN_FROM_CLI") == "true":
            from .debughelpers import explain_ignored_app_run

            explain_ignored_app_run()
            return

        if get_load_dotenv(load_dotenv):
            cli.load_dotenv()

            # if set, let env vars override previous values
            if "FLASK_ENV" in os.environ:
                self.env = get_env()
                self.debug = get_debug_flag()
            elif "FLASK_DEBUG" in os.environ:
                self.debug = get_debug_flag()

        # debug passed to method overrides all other sources
        if debug is not None:
            self.debug = bool(debug)

        server_name = self.config.get("SERVER_NAME")
        sn_host = sn_port = None

        if server_name:
            sn_host, _, sn_port = server_name.partition(":")

        if not host:
            if sn_host:
                host = sn_host
            else:
                host = "127.0.0.1"

        if port or port == 0:
            port = int(port)
        elif sn_port:
            port = int(sn_port)
        else:
            port = 5000

        options.setdefault("use_reloader", self.debug)
        options.setdefault("use_debugger", self.debug)
        options.setdefault("threaded", True)

        cli.show_server_banner(self.env, self.debug, self.name, False)

        from werkzeug.serving import run_simple

        try:
            run_simple(t.cast(str, host), port, self, **options)
        finally:
            # reset the first request information if the development server
            # reset normally.  This makes it possible to restart the server
            # without reloader and that stuff from an interactive shell.
            self._got_first_request = False

The first part of run() mainly processes the startup parameters. Once they are resolved, it calls run_simple from werkzeug.serving, and notice that self, our Flask application, is passed in. Let's step into run_simple.
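
As a side note, run_simple only needs a host, a port, and any WSGI callable; the Flask application object is just one such callable. A tiny hand-written WSGI app (purely illustrative) works the same way:

from werkzeug.serving import run_simple

def tiny_wsgi_app(environ, start_response):
    # a bare-bones WSGI callable: the same protocol Flask itself implements
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"hello from a plain WSGI callable"]

if __name__ == "__main__":
    run_simple("127.0.0.1", 5000, tiny_wsgi_app, use_reloader=False)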

# werkzeug code
def run_simple(
    hostname: str,
    port: int,
    application: "WSGIApplication",
    use_reloader: bool = False,
    use_debugger: bool = False,
    use_evalex: bool = True,
    extra_files: t.Optional[t.Iterable[str]] = None,
    exclude_patterns: t.Optional[t.Iterable[str]] = None,
    reloader_interval: int = 1,
    reloader_type: str = "auto",
    threaded: bool = False,
    processes: int = 1,
    request_handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
    static_files: t.Optional[t.Dict[str, t.Union[str, t.Tuple[str, str]]]] = None,
    passthrough_errors: bool = False,
    ssl_context: t.Optional[_TSSLContextArg] = None,
) -> None:

    if not isinstance(port, int):
        raise TypeError("port must be an integer")

    if static_files:
        from .middleware.shared_data import SharedDataMiddleware
        # mount static files via middleware
        application = SharedDataMiddleware(application, static_files)

    if use_debugger:
        from .debug import DebuggedApplication
        # wrap the app with the interactive debugger
        application = DebuggedApplication(application, evalex=use_evalex)

    if not is_running_from_reloader():
        # prepare and check the socket before starting: IPv4 or IPv6, and whether the port is already in use
        s = prepare_socket(hostname, port)
        fd = s.fileno()
        os.environ["WERKZEUG_SERVER_FD"] = str(fd)
    else:
        fd = int(os.environ["WERKZEUG_SERVER_FD"])
    
    srv = make_server(
        hostname,
        port,
        application,
        threaded,
        processes,
        request_handler,
        passthrough_errors,
        ssl_context,
        fd=fd,
    )

    if not is_running_from_reloader():
        srv.log_startup()

    if use_reloader:
        from ._reloader import run_with_reloader

        run_with_reloader(
            srv.serve_forever,
            extra_files=extra_files,
            exclude_patterns=exclude_patterns,
            interval=reloader_interval,
            reloader_type=reloader_type,
        )
    else:
        srv.serve_forever()

Two key calls stand out in the code above: make_server and serve_forever. Let's first see what make_server does:

# werkzeug code
def make_server(
    host: str,
    port: int,
    app: "WSGIApplication",
    threaded: bool = False,
    processes: int = 1,
    request_handler: t.Optional[t.Type[WSGIRequestHandler]] = None,
    passthrough_errors: bool = False,
    ssl_context: t.Optional[_TSSLContextArg] = None,
    fd: t.Optional[int] = None,
) -> BaseWSGIServer:
    if threaded and processes > 1:
        raise ValueError("Cannot have a multi-thread and multi-process server.")

    if threaded:
        # ThreadedWSGIServer inherits from BaseWSGIServer
        return ThreadedWSGIServer(
            host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
        )

    if processes > 1:
        # ForkingWSGIServer inherits from BaseWSGIServer
        return ForkingWSGIServer(
            host,
            port,
            app,
            processes,
            request_handler,
            passthrough_errors,
            ssl_context,
            fd=fd,
        )

    return BaseWSGIServer(
        host, port, app, request_handler, passthrough_errors, ssl_context, fd=fd
    )

We can see that a BaseWSGIServer instance is returned here. During initialization, BaseWSGIServer mainly does two things: it binds and listens on the given IP and port, and it registers WSGIRequestHandler as the request handler.
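
For reference, make_server can also be called directly; this is roughly what run_simple does internally, minus the reloader, debugger, and static-file wiring. The example app below is made up, and by my reading the handler falls back to WSGIRequestHandler when request_handler is not given.

from werkzeug.serving import make_server

def ping(environ, start_response):
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"pong"]

srv = make_server("127.0.0.1", 5001, ping)  # single-threaded BaseWSGIServer
# srv.serve_forever()  # would block and start serving requests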

Now let's see what serve_forever does:

# werkzeug code
class BaseWSGIServer(HTTPServer):

    def serve_forever(self, poll_interval: float = 0.5) -> None:
        try:
            super().serve_forever(poll_interval=poll_interval)
        except KeyboardInterrupt:
            pass
        finally:
            self.server_close()

Let's go one step further and see what the parent class's serve_forever does:

# Python standard library (socketserver) code
class BaseServer:

    def serve_forever(self, poll_interval=0.5):
        """Handle one request at a time until shutdown.

        Polls for shutdown every poll_interval seconds. Ignores
        self.timeout. If you need to do periodic tasks, do them in
        another thread.
        """
        self.__is_shut_down.clear()
        try:
            # XXX: Consider using another file descriptor or connecting to the
            # socket to wake this up instead of polling. Polling reduces our
            # responsiveness to a shutdown request and wastes cpu at all other
            # times.
            # I/O multiplexing selector; poll is chosen when available
            with _ServerSelector() as selector:
                selector.register(self, selectors.EVENT_READ)

                while not self.__shutdown_request:
                    # poll for ready file descriptors
                    ready = selector.select(poll_interval)
                    # bpo-35017: shutdown() called during select(), exit immediately.
                    if self.__shutdown_request:
                        break
                    if ready:
                        self._handle_request_noblock()

                    self.service_actions()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

Note that we jump straight into the BaseServer class here, because of the inheritance chain BaseWSGIServer -> HTTPServer -> TCPServer -> BaseServer. A PollSelector is used as the I/O multiplexing selector (I won't go into I/O multiplexing in detail here). Whenever a file descriptor becomes ready, _handle_request_noblock() is executed. At this point our application has started and is listening on its port.
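
A quick way to confirm that inheritance chain yourself (the exact output may vary slightly between werkzeug and Python versions):

from werkzeug.serving import BaseWSGIServer

for cls in BaseWSGIServer.__mro__:
    print(cls.__module__, cls.__name__)
# roughly: werkzeug.serving BaseWSGIServer, http.server HTTPServer,
#          socketserver TCPServer, socketserver BaseServer, builtins object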

Request Handling Flow

Now that we have covered how the application starts, let's look at what happens when a request is received. As mentioned above, _handle_request_noblock() is executed when a file descriptor becomes ready; here is that method:

# Python standard library (socketserver) code
class BaseServer:

    def _handle_request_noblock(self):
        """Handle one request, without blocking.

        I assume that selector.select() has returned that the socket is
        readable before this function was called, so there should be no risk of
        blocking in get_request().
        """
        try:
            request, client_address = self.get_request()
        except OSError:
            return
        if self.verify_request(request, client_address):
            try:
                self.process_request(request, client_address)
            except Exception:
                self.handle_error(request, client_address)
                self.shutdown_request(request)
            except:
                self.shutdown_request(request)
                raise
        else:
            self.shutdown_request(request)

From this code we can see that process_request is called to handle the received request, which brings us to the WSGIRequestHandler request handler mentioned earlier. WSGIRequestHandler is a subclass of BaseRequestHandler; here is the initialization-related code:
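
The glue between the two lives in the standard library: process_request() eventually calls finish_request(), which instantiates RequestHandlerClass (WSGIRequestHandler in our case), so a handler object is constructed for every accepted connection. A simplified sketch of that stdlib method:

# simplified sketch of socketserver.BaseServer.finish_request
class BaseServer:
    def finish_request(self, request, client_address):
        """Finish one request by instantiating RequestHandlerClass."""
        self.RequestHandlerClass(request, client_address, self)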

# Python standard library (socketserver) code
class BaseRequestHandler:

    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):
        pass

    def handle(self):
        pass

    def finish(self):
        pass

From the code above we can see that handle() is called during initialization, and handle() in turn calls handle_one_request(). Let's look at that method:

# Python standard library (http.server) code
class BaseHTTPRequestHandler(socketserver.StreamRequestHandler):

    def handle_one_request(self):
        """Handle a single HTTP request.

        You normally don't need to override this method; see the class
        __doc__ string for information on how to handle specific HTTP
        commands such as GET and POST.

        """
        try:
            self.raw_requestline = self.rfile.readline(65537)
            if len(self.raw_requestline) > 65536:
                self.requestline = ''
                self.request_version = ''
                self.command = ''
                self.send_error(HTTPStatus.REQUEST_URI_TOO_LONG)
                return
            if not self.raw_requestline:
                self.close_connection = True
                return
            if not self.parse_request():
                # An error code has been sent, just exit
                return
            mname = 'do_' + self.command
            if not hasattr(self, mname):
                self.send_error(
                    HTTPStatus.NOT_IMPLEMENTED,
                    "Unsupported method (%r)" % self.command)
                return
            method = getattr(self, mname)
            method()
            self.wfile.flush() #actually send the response if not already done.
        except socket.timeout as e:
            #a read or a write timed out.  Discard this connection
            self.log_error("Request timed out: %r", e)
            self.close_connection = True
            return

The key part here is the call to method, which would normally be do_GET, do_POST, and so on. Stepping through with a debugger shows that this call ends up bound to WSGIRequestHandler.run_wsgi, i.e. WSGIRequestHandler.run_wsgi is what actually gets invoked, and it is a very important method:

# werkzeug code
class WSGIRequestHandler(BaseHTTPRequestHandler):
    def run_wsgi(self) -> None:
        if self.headers.get("Expect", "").lower().strip() == "100-continue":
            self.wfile.write(b"HTTP/1.1 100 Continue\r\n\r\n")

        self.environ = environ = self.make_environ()
        status_set: t.Optional[str] = None
        headers_set: t.Optional[t.List[t.Tuple[str, str]]] = None
        status_sent: t.Optional[str] = None
        headers_sent: t.Optional[t.List[t.Tuple[str, str]]] = None
        chunk_response: bool = False

        def write(data: bytes) -> None:
            nonlocal status_sent, headers_sent, chunk_response
            assert status_set is not None, "write() before start_response"
            assert headers_set is not None, "write() before start_response"
            if status_sent is None:
                status_sent = status_set
                headers_sent = headers_set
                try:
                    code_str, msg = status_sent.split(None, 1)
                except ValueError:
                    code_str, msg = status_sent, ""
                code = int(code_str)
                self.send_response(code, msg)
                header_keys = set()
                for key, value in headers_sent:
                    self.send_header(key, value)
                    header_keys.add(key.lower())

                # Use chunked transfer encoding if there is no content
                # length. Do not use for 1xx and 204 responses. 304
                # responses and HEAD requests are also excluded, which
                # is the more conservative behavior and matches other
                # parts of the code.
                # https://httpwg.org/specs/rfc7230.html#rfc.section.3.3.1
                if (
                    not (
                        "content-length" in header_keys
                        or environ["REQUEST_METHOD"] == "HEAD"
                        or (100 <= code < 200)
                        or code in {204, 304}
                    )
                    and self.protocol_version >= "HTTP/1.1"
                ):
                    chunk_response = True
                    self.send_header("Transfer-Encoding", "chunked")

                # Always close the connection. This disables HTTP/1.1
                # keep-alive connections. They aren't handled well by
                # Python's http.server because it doesn't know how to
                # drain the stream before the next request line.
                self.send_header("Connection", "close")
                self.end_headers()

            assert isinstance(data, bytes), "applications must write bytes"

            if data:
                if chunk_response:
                    self.wfile.write(hex(len(data))[2:].encode())
                    self.wfile.write(b"\r\n")

                self.wfile.write(data)

                if chunk_response:
                    self.wfile.write(b"\r\n")

            self.wfile.flush()

        def start_response(status, headers, exc_info=None):  # type: ignore
            nonlocal status_set, headers_set
            if exc_info:
                try:
                    if headers_sent:
                        raise exc_info[1].with_traceback(exc_info[2])
                finally:
                    exc_info = None
            elif headers_set:
                raise AssertionError("Headers already set")
            status_set = status
            headers_set = headers
            return write

        def execute(app: "WSGIApplication") -> None:
            application_iter = app(environ, start_response)
            try:
                for data in application_iter:
                    write(data)
                if not headers_sent:
                    write(b"")
                if chunk_response:
                    self.wfile.write(b"0\r\n\r\n")
            finally:
                if hasattr(application_iter, "close"):
                    application_iter.close()  # type: ignore

        try:
            execute(self.server.app)
        except (ConnectionError, socket.timeout) as e:
            self.connection_dropped(e, environ)
        except Exception as e:
            if self.server.passthrough_errors:
                raise

            if status_sent is not None and chunk_response:
                self.close_connection = True

            try:
                # if we haven't yet sent the headers but they are set
                # we roll back to be able to set them again.
                if status_sent is None:
                    status_set = None
                    headers_set = None
                execute(InternalServerError())
            except Exception:
                pass

            from .debug.tbtools import DebugTraceback

            msg = DebugTraceback(e).render_traceback_text()
            self.server.log("error", f"Error on request:\n{msg}")

The key line in this code is execute(self.server.app): server.app is the Flask application we passed in, and the line application_iter = app(environ, start_response) invokes Flask's __call__ method. Everything up to this point has been werkzeug code; here we come back to Flask. Let's look at that method:

# Flask code
class Flask(Scaffold):

    def __call__(self, environ: dict, start_response: t.Callable) -> t.Any:
        """The WSGI server calls the Flask application object as the
        WSGI application. This calls :meth:`wsgi_app`, which can be
        wrapped to apply middleware.
        """
        return self.wsgi_app(environ, start_response)

    def wsgi_app(self, environ: dict, start_response: t.Callable) -> t.Any:
        # create the request context
        ctx = self.request_context(environ)
        error: t.Optional[BaseException] = None
        try:
            try:
                ctx.push()
                response = self.full_dispatch_request()
            except Exception as e:
                error = e
                response = self.handle_exception(e)
            except:  # noqa: B001
                error = sys.exc_info()[1]
                raise
            return response(environ, start_response)
        finally:
            if self.should_ignore_error(error):
                error = None
            ctx.auto_pop(error)
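
The __call__ docstring above mentions that wsgi_app can be wrapped to apply middleware. A small sketch of that pattern (the RequestCounter class is invented for illustration):

class RequestCounter:
    def __init__(self, wsgi_app):
        self.wsgi_app = wsgi_app
        self.count = 0

    def __call__(self, environ, start_response):
        self.count += 1
        return self.wsgi_app(environ, start_response)

# wrap wsgi_app rather than replacing the Flask object itself
# app.wsgi_app = RequestCounter(app.wsgi_app)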

This code first creates the request context and then dispatches the request via self.full_dispatch_request(). Let's look at the full_dispatch_request code:

class Flask(Scaffold):

    def full_dispatch_request(self) -> Response:
        # Run before_first_request functions if this is the thread's first request.
        # Inlined to avoid a method call on subsequent requests.
        # This is deprecated, will be removed in Flask 2.3.
        if not self._got_first_request:
            with self._before_request_lock:
                if not self._got_first_request:
                    for func in self.before_first_request_funcs:
                        self.ensure_sync(func)()

                    self._got_first_request = True

        try:
            request_started.send(self)
            rv = self.preprocess_request()
            if rv is None:
                rv = self.dispatch_request()
        except Exception as e:
            rv = self.handle_user_exception(e)
        return self.finalize_request(rv)


    def dispatch_request(self) -> ft.ResponseReturnValue:
        req = request_ctx.request
        if req.routing_exception is not None:
            self.raise_routing_exception(req)
        rule: Rule = req.url_rule  # type: ignore[assignment]
        # if we provide automatic options for this URL and the
        # request came with the OPTIONS method, reply automatically
        if (
            getattr(rule, "provide_automatic_options", False)
            and req.method == "OPTIONS"
        ):
            return self.make_default_options_response()
        # otherwise dispatch to the handler for that endpoint
        view_args: t.Dict[str, t.Any] = req.view_args  # type: ignore[assignment]
        return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
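
Before moving on, here is a small sketch of the mapping dispatch_request relies on, using the hello_world app from the start of this post; the values in the comments are what I would expect, not verified against every Flask version.

from flask import request

with app.test_request_context("/"):
    print(request.url_rule.endpoint)                      # "hello_world"
    print(app.view_functions[request.url_rule.endpoint])  # <function hello_world ...>
    print(request.view_args)                              # {}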

We can see that dispatch_request dispatches the request, while handle_user_exception handles exceptions raised during dispatch. Inside dispatch_request, the view function registered for the URL is looked up and called: self.ensure_sync(self.view_functions[rule.endpoint])(**view_args). Now let's see what self.finalize_request(rv) does:

# Flask code
class Flask(Scaffold):

    def finalize_request(
        self,
        rv: t.Union[ResponseReturnValue, HTTPException],
        from_error_handler: bool = False,
    ) -> Response:
        response = self.make_response(rv)
        try:
            response = self.process_response(response)
            request_finished.send(self, response=response)
        except Exception:
            if not from_error_handler:
                raise
            self.logger.exception(
                "Request finalizing failed with an error while handling an error"
            )
        return response

finalize_request() wraps the value returned by the view function into a Response object and returns it, handling status, headers, session, and so on along the way. With that, the request handling is complete.
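
To close, a small sketch that inspects the finished Response from the outside using Flask's test client and the hello_world app above; the header value in the comment is what I would expect from Flask's defaults for a string return value.

with app.test_client() as client:
    resp = client.get("/")
    print(resp.status_code)                  # 200
    print(resp.headers.get("Content-Type"))  # "text/html; charset=utf-8"
    print(resp.data)                         # b"<p>Hello, World!</p>"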