Skip to content

wg_utilities.api

API related entities.

TempAuthServer

Temporary Flask server for auth flows.

This allows the auth code to be retrieved without manual intervention

Parameters:

Name Type Description Default
name str

the name of the Flask application package (i.e. `__name__`)

required
host str

the hostname to listen on

'localhost'
port int

the port of the webserver

0
debug bool

if given, enable or disable debug mode

False
auto_run bool

automatically run the server on instantiation

False
Source code in wg_utilities/api/temp_auth_server.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
class TempAuthServer:
    """Temporary Flask server for auth flows.

    This allows the auth code to be retrieved without manual intervention: the
    query parameters of requests to `/get_auth_code` are recorded and can be
    collected synchronously with `wait_for_request`.

    Args:
        name (str): the name of the Flask application package (i.e. __name__)
        host (str): the hostname to listen on
        port (int): the port of the webserver; `0` lets the server thread pick
            the first usable port in the range 5001-5020
        debug (bool): if given, enable or disable debug mode
        auto_run (bool): automatically run the server on instantiation
    """

    class ServerThread(Thread):
        """Run a Flask app in a separate thread with shutdown control."""

        def __init__(self, app: Flask, host: str = "localhost", port: int = 0):
            """Create the underlying server for `app`.

            Args:
                app (Flask): the application to serve
                host (str): the hostname to listen on
                port (int): the port to use; `0` means "try each port from 5001
                    to 5020 in turn and keep the first one that works"

            Raises:
                OSError: if `port` is 0 and no port in 5001-5020 could be used
            """
            super().__init__()

            if port == 0:
                for candidate in range(5001, 5021):
                    try:
                        self.server = make_server(host, candidate, app)
                        break
                    except (SystemExit, OSError):
                        # Presumably the port is already taken; try the next one
                        continue
                else:
                    # The message must match the range actually probed above
                    raise OSError("No available ports in range 5001-5020")
            else:
                self.server = make_server(host, port, app)

            # NOTE(review): the context is pushed on the thread that constructs
            # this object (not the serving thread) and is never popped here
            self.ctx = app.app_context()
            self.ctx.push()

            self.host = self.server.host
            self.port = self.server.port

        def run(self) -> None:
            """Start the server."""
            self.server.serve_forever()

        def shutdown(self) -> None:
            """Shutdown the server."""
            self.server.shutdown()

    def __init__(
        self,
        name: str,
        host: str = "localhost",
        port: int = 0,
        *,
        debug: bool = False,
        auto_run: bool = False,
    ):
        self.host = host
        # Port requested by the caller (0 = pick one automatically)
        self._user_port = port
        # Port the server thread actually ended up on; assigned in `start_server`
        self._actual_port: int
        # Stored only; nothing in this class reads it
        self.debug = debug

        self.app = Flask(name)

        # Created lazily by the `server_thread` property, removed by `stop_server`
        self._server_thread: TempAuthServer.ServerThread
        # Maps a request path to the query args of the latest request to it
        self._request_args: dict[str, dict[str, object]] = {}

        self.create_endpoints()

        if auto_run:
            self.start_server()

    def create_endpoints(self) -> None:
        """Create all necessary endpoints.

        This is an "endpoint factory" (rather than one decorated method per endpoint)
        because of the need to use `self.app` as the decorator
        """

        @self.app.route("/get_auth_code", methods=["GET"])
        def get_auth_code() -> Response:
            """Endpoint for getting auth code from third party callback.

            Returns:
                Response: a 200 response containing a small confirmation page
            """
            # TODO add 400 response for mismatch in state token
            self._request_args[request.path] = request.args

            return cast(
                Response,
                self.app.response_class(
                    response=dedent(
                        """
                    <html lang="en">
                    <head>
                        <style>
                            body {
                                font-family: Verdana, sans-serif;
                                height: 100vh;
                            }
                        </style>
                        <title>Authentication Complete</title>
                    </head>
                    <body onclick="self.close()">
                        <h1>Authentication complete!</h1>
                        <span>Click anywhere to close this window.</span>
                    </body>
                    </html>
                """,
                    ).strip(),
                    status=200,
                ),
            )

    def wait_for_request(
        self,
        endpoint: str,
        max_wait: int = 300,
        *,
        kill_on_request: bool = False,
    ) -> dict[str, Any]:
        """Wait for a request.

        Wait for a request to come into the server for when it is needed in a
        synchronous flow. The server is started first if it is not running.

        Args:
            endpoint (str): the endpoint path to wait for a request to
            max_wait (int): how many seconds to wait before timing out
            kill_on_request (bool): kill/stop the server when a request comes through

        Returns:
            dict: the args which were sent with the request

        Raises:
            TimeoutError: if no request is received within the timeout limit
        """

        if not self.is_running:
            self.start_server()

        deadline = time() + max_wait
        # A request without any query args is stored as an empty (falsy) mapping
        # and is therefore not treated as "received"
        while not self._request_args.get(endpoint) and time() <= deadline:
            sleep(0.5)

        # Decide on what was actually received rather than on the clock, so a
        # request that lands during the final poll interval is not discarded
        received = self._request_args.get(endpoint)
        if not received:
            raise TimeoutError(
                f"No request received to {endpoint} within {max_wait} seconds",
            )

        if kill_on_request:
            self.stop_server()

        return dict(received)

    def start_server(self) -> None:
        """Run the local server (no-op if it is already running)."""
        if not self.is_running:
            self.server_thread.start()

            self._actual_port = self.server_thread.port

    def stop_server(self) -> None:
        """Stop the local server and wait for its thread to finish.

        Does nothing unless a server thread exists and is alive.
        """
        # No point instantiating a new server if we're just going to kill it
        if not hasattr(self, "_server_thread"):
            return

        thread = self._server_thread
        if not thread.is_alive():
            return

        thread.shutdown()
        # Block until the thread has exited instead of spinning on `is_alive()`
        thread.join()

        # Forget the finished thread so that a later start creates a new one
        del self._server_thread

    @property
    def get_auth_code_url(self) -> str:
        """Get the URL for the auth code endpoint.

        Returns:
            str: the URL for the auth code endpoint
        """
        return f"http://{self.host}:{self.port}/get_auth_code"

    @property
    def is_running(self) -> bool:
        """Return whether the server is running."""
        if not hasattr(self, "_server_thread"):
            return False

        return self.server_thread.is_alive()

    @property
    def port(self) -> int:
        """Port.

        Returns:
            int: the port of the server

        Raises:
            ValueError: if the server has not been started yet, so no port
                has been assigned
        """
        if not hasattr(self, "_actual_port"):
            raise ValueError("Server is not running")

        return self._actual_port

    @port.setter
    def port(self, port: int) -> None:
        """Port setter.

        Args:
            port (int): the port to set

        Raises:
            ValueError: if the server is already running
        """
        if self.is_running:
            raise ValueError("Cannot set port while server is running")

        self._user_port = port

    @property
    def server_thread(self) -> ServerThread:
        """Server thread, created on first access.

        Returns:
            ServerThread: the server thread
        """
        if not hasattr(self, "_server_thread"):
            self._server_thread = self.ServerThread(
                self.app,
                host=self.host,
                port=self._user_port,
            )

        return self._server_thread

get_auth_code_url: str property

Get the URL for the auth code endpoint.

Returns:

Name Type Description
str str

the URL for the auth code endpoint

is_running: bool property

Return whether the server is running.

port: int property writable

Port.

Returns:

Name Type Description
int int

the port of the server

Raises:

Type Description
ValueError

if the server is not running

server_thread: ServerThread property

Server thread.

Returns:

Name Type Description
ServerThread ServerThread

the server thread

ServerThread

Bases: Thread

Run a Flask app in a separate thread with shutdown control.

Source code in wg_utilities/api/temp_auth_server.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class ServerThread(Thread):
    """Run a Flask app in a separate thread with shutdown control."""

    def __init__(self, app: Flask, host: str = "localhost", port: int = 0):
        """Create the underlying server for `app`.

        Args:
            app (Flask): the application to serve
            host (str): the hostname to listen on
            port (int): the port to use; `0` means "try each port from 5001
                to 5020 in turn and keep the first one that works"

        Raises:
            OSError: if `port` is 0 and no port in 5001-5020 could be used
        """
        super().__init__()

        if port == 0:
            for candidate in range(5001, 5021):
                try:
                    self.server = make_server(host, candidate, app)
                    break
                except (SystemExit, OSError):
                    # Presumably the port is already taken; try the next one
                    continue
            else:
                # The message must match the range actually probed above
                raise OSError("No available ports in range 5001-5020")
        else:
            self.server = make_server(host, port, app)

        # NOTE(review): the context is pushed on the thread that constructs
        # this object (not the serving thread) and is never popped here
        self.ctx = app.app_context()
        self.ctx.push()

        self.host = self.server.host
        self.port = self.server.port

    def run(self) -> None:
        """Start the server."""
        self.server.serve_forever()

    def shutdown(self) -> None:
        """Shutdown the server."""
        self.server.shutdown()

run()

Start the server.

Source code in wg_utilities/api/temp_auth_server.py
51
52
53
def run(self) -> None:
    """Start the server by delegating to `self.server.serve_forever()`."""
    wsgi_server = self.server
    wsgi_server.serve_forever()

shutdown()

Shutdown the server.

Source code in wg_utilities/api/temp_auth_server.py
55
56
57
def shutdown(self) -> None:
    """Shutdown the server by delegating to `self.server.shutdown()`."""
    wsgi_server = self.server
    wsgi_server.shutdown()

create_endpoints()

Create all necessary endpoints.

This is an "endpoint factory" (rather than one decorated method per endpoint) because of the need to use self.app as the decorator

Source code in wg_utilities/api/temp_auth_server.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def create_endpoints(self) -> None:
    """Register the server's routes on `self.app`.

    The view functions are defined inside this method (instead of being
    decorated methods on the class) because the route decorator is only
    reachable through the `self.app` instance.
    """
    # Static confirmation page returned once the callback has been recorded
    confirmation_page = dedent(
        """
                <html lang="en">
                <head>
                    <style>
                        body {
                            font-family: Verdana, sans-serif;
                            height: 100vh;
                        }
                    </style>
                    <title>Authentication Complete</title>
                </head>
                <body onclick="self.close()">
                    <h1>Authentication complete!</h1>
                    <span>Click anywhere to close this window.</span>
                </body>
                </html>
            """,
    ).strip()

    def get_auth_code() -> Response:
        """Record the callback's query args and return a confirmation page.

        Returns:
            Response: a 200 response containing the confirmation page
        """
        # TODO add 400 response for mismatch in state token
        self._request_args[request.path] = request.args

        page_response = self.app.response_class(
            response=confirmation_page,
            status=200,
        )
        return cast(Response, page_response)

    # Same registration the decorator form would perform
    self.app.route("/get_auth_code", methods=["GET"])(get_auth_code)

start_server()

Run the local server.

Source code in wg_utilities/api/temp_auth_server.py
169
170
171
172
173
174
def start_server(self) -> None:
    """Start the server thread and record the port it is using.

    Does nothing when the server is already running.
    """
    if self.is_running:
        return

    thread = self.server_thread
    thread.start()

    # Remember the port the thread reports for the `port` property
    self._actual_port = thread.port

stop_server()

Stop the local server by shutting down its server thread (the code calls `server_thread.shutdown()`; no `/kill` endpoint is registered).

See Also

self.create_endpoints: /kill endpoint

Source code in wg_utilities/api/temp_auth_server.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def stop_server(self) -> None:
    """Stop the local server and wait for its thread to finish.

    Does nothing unless a server thread exists and is alive.
    """
    # No point instantiating a new server if we're just going to kill it
    if not hasattr(self, "_server_thread"):
        return

    thread = self.server_thread
    if not thread.is_alive():
        return

    thread.shutdown()
    # Block until the thread has exited instead of spinning on `is_alive()`
    thread.join()

    # Forget the finished thread so that a later start creates a new one
    del self._server_thread

wait_for_request(endpoint, max_wait=300, *, kill_on_request=False)

Wait for a request.

Wait for a request to come into the server for when it is needed in a synchronous flow

Parameters:

Name Type Description Default
endpoint str

the endpoint path to wait for a request to

required
max_wait int

how many seconds to wait before timing out

300
kill_on_request bool

kill/stop the server when a request comes through

False

Returns:

Name Type Description
dict dict[str, Any]

the args which were sent with the request

Raises:

Type Description
TimeoutError

if no request is received within the timeout limit

Source code in wg_utilities/api/temp_auth_server.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def wait_for_request(
    self,
    endpoint: str,
    max_wait: int = 300,
    *,
    kill_on_request: bool = False,
) -> dict[str, Any]:
    """Wait for a request.

    Wait for a request to come into the server for when it is needed in a
    synchronous flow. The server is started first if it is not running.

    Args:
        endpoint (str): the endpoint path to wait for a request to
        max_wait (int): how many seconds to wait before timing out
        kill_on_request (bool): kill/stop the server when a request comes through

    Returns:
        dict: the args which were sent with the request

    Raises:
        TimeoutError: if no request is received within the timeout limit
    """

    if not self.is_running:
        self.start_server()

    deadline = time() + max_wait
    # A request without any query args is stored as an empty (falsy) mapping
    # and is therefore not treated as "received"
    while not self._request_args.get(endpoint) and time() <= deadline:
        sleep(0.5)

    # Decide on what was actually received rather than on the clock, so a
    # request that lands during the final poll interval is not discarded
    received = self._request_args.get(endpoint)
    if not received:
        raise TimeoutError(
            f"No request received to {endpoint} within {max_wait} seconds",
        )

    if kill_on_request:
        self.stop_server()

    return dict(received)

temp_auth_server

Provide a class for creating a temporary server during an authentication flow.

TempAuthServer

Temporary Flask server for auth flows.

This allows the auth code to be retrieved without manual intervention

Parameters:

Name Type Description Default
name str

the name of the Flask application package (i.e. `__name__`)

required
host str

the hostname to listen on

'localhost'
port int

the port of the webserver

0
debug bool

if given, enable or disable debug mode

False
auto_run bool

automatically run the server on instantiation

False
Source code in wg_utilities/api/temp_auth_server.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
class TempAuthServer:
    """Temporary Flask server for auth flows.

    This allows the auth code to be retrieved without manual intervention: the
    query parameters of requests to `/get_auth_code` are recorded and can be
    collected synchronously with `wait_for_request`.

    Args:
        name (str): the name of the Flask application package (i.e. __name__)
        host (str): the hostname to listen on
        port (int): the port of the webserver; `0` lets the server thread pick
            the first usable port in the range 5001-5020
        debug (bool): if given, enable or disable debug mode
        auto_run (bool): automatically run the server on instantiation
    """

    class ServerThread(Thread):
        """Run a Flask app in a separate thread with shutdown control."""

        def __init__(self, app: Flask, host: str = "localhost", port: int = 0):
            """Create the underlying server for `app`.

            Args:
                app (Flask): the application to serve
                host (str): the hostname to listen on
                port (int): the port to use; `0` means "try each port from 5001
                    to 5020 in turn and keep the first one that works"

            Raises:
                OSError: if `port` is 0 and no port in 5001-5020 could be used
            """
            super().__init__()

            if port == 0:
                for candidate in range(5001, 5021):
                    try:
                        self.server = make_server(host, candidate, app)
                        break
                    except (SystemExit, OSError):
                        # Presumably the port is already taken; try the next one
                        continue
                else:
                    # The message must match the range actually probed above
                    raise OSError("No available ports in range 5001-5020")
            else:
                self.server = make_server(host, port, app)

            # NOTE(review): the context is pushed on the thread that constructs
            # this object (not the serving thread) and is never popped here
            self.ctx = app.app_context()
            self.ctx.push()

            self.host = self.server.host
            self.port = self.server.port

        def run(self) -> None:
            """Start the server."""
            self.server.serve_forever()

        def shutdown(self) -> None:
            """Shutdown the server."""
            self.server.shutdown()

    def __init__(
        self,
        name: str,
        host: str = "localhost",
        port: int = 0,
        *,
        debug: bool = False,
        auto_run: bool = False,
    ):
        self.host = host
        # Port requested by the caller (0 = pick one automatically)
        self._user_port = port
        # Port the server thread actually ended up on; assigned in `start_server`
        self._actual_port: int
        # Stored only; nothing in this class reads it
        self.debug = debug

        self.app = Flask(name)

        # Created lazily by the `server_thread` property, removed by `stop_server`
        self._server_thread: TempAuthServer.ServerThread
        # Maps a request path to the query args of the latest request to it
        self._request_args: dict[str, dict[str, object]] = {}

        self.create_endpoints()

        if auto_run:
            self.start_server()

    def create_endpoints(self) -> None:
        """Create all necessary endpoints.

        This is an "endpoint factory" (rather than one decorated method per endpoint)
        because of the need to use `self.app` as the decorator
        """

        @self.app.route("/get_auth_code", methods=["GET"])
        def get_auth_code() -> Response:
            """Endpoint for getting auth code from third party callback.

            Returns:
                Response: a 200 response containing a small confirmation page
            """
            # TODO add 400 response for mismatch in state token
            self._request_args[request.path] = request.args

            return cast(
                Response,
                self.app.response_class(
                    response=dedent(
                        """
                    <html lang="en">
                    <head>
                        <style>
                            body {
                                font-family: Verdana, sans-serif;
                                height: 100vh;
                            }
                        </style>
                        <title>Authentication Complete</title>
                    </head>
                    <body onclick="self.close()">
                        <h1>Authentication complete!</h1>
                        <span>Click anywhere to close this window.</span>
                    </body>
                    </html>
                """,
                    ).strip(),
                    status=200,
                ),
            )

    def wait_for_request(
        self,
        endpoint: str,
        max_wait: int = 300,
        *,
        kill_on_request: bool = False,
    ) -> dict[str, Any]:
        """Wait for a request.

        Wait for a request to come into the server for when it is needed in a
        synchronous flow. The server is started first if it is not running.

        Args:
            endpoint (str): the endpoint path to wait for a request to
            max_wait (int): how many seconds to wait before timing out
            kill_on_request (bool): kill/stop the server when a request comes through

        Returns:
            dict: the args which were sent with the request

        Raises:
            TimeoutError: if no request is received within the timeout limit
        """

        if not self.is_running:
            self.start_server()

        deadline = time() + max_wait
        # A request without any query args is stored as an empty (falsy) mapping
        # and is therefore not treated as "received"
        while not self._request_args.get(endpoint) and time() <= deadline:
            sleep(0.5)

        # Decide on what was actually received rather than on the clock, so a
        # request that lands during the final poll interval is not discarded
        received = self._request_args.get(endpoint)
        if not received:
            raise TimeoutError(
                f"No request received to {endpoint} within {max_wait} seconds",
            )

        if kill_on_request:
            self.stop_server()

        return dict(received)

    def start_server(self) -> None:
        """Run the local server (no-op if it is already running)."""
        if not self.is_running:
            self.server_thread.start()

            self._actual_port = self.server_thread.port

    def stop_server(self) -> None:
        """Stop the local server and wait for its thread to finish.

        Does nothing unless a server thread exists and is alive.
        """
        # No point instantiating a new server if we're just going to kill it
        if not hasattr(self, "_server_thread"):
            return

        thread = self._server_thread
        if not thread.is_alive():
            return

        thread.shutdown()
        # Block until the thread has exited instead of spinning on `is_alive()`
        thread.join()

        # Forget the finished thread so that a later start creates a new one
        del self._server_thread

    @property
    def get_auth_code_url(self) -> str:
        """Get the URL for the auth code endpoint.

        Returns:
            str: the URL for the auth code endpoint
        """
        return f"http://{self.host}:{self.port}/get_auth_code"

    @property
    def is_running(self) -> bool:
        """Return whether the server is running."""
        if not hasattr(self, "_server_thread"):
            return False

        return self.server_thread.is_alive()

    @property
    def port(self) -> int:
        """Port.

        Returns:
            int: the port of the server

        Raises:
            ValueError: if the server has not been started yet, so no port
                has been assigned
        """
        if not hasattr(self, "_actual_port"):
            raise ValueError("Server is not running")

        return self._actual_port

    @port.setter
    def port(self, port: int) -> None:
        """Port setter.

        Args:
            port (int): the port to set

        Raises:
            ValueError: if the server is already running
        """
        if self.is_running:
            raise ValueError("Cannot set port while server is running")

        self._user_port = port

    @property
    def server_thread(self) -> ServerThread:
        """Server thread, created on first access.

        Returns:
            ServerThread: the server thread
        """
        if not hasattr(self, "_server_thread"):
            self._server_thread = self.ServerThread(
                self.app,
                host=self.host,
                port=self._user_port,
            )

        return self._server_thread

get_auth_code_url: str property

Get the URL for the auth code endpoint.

Returns:

Name Type Description
str str

the URL for the auth code endpoint

is_running: bool property

Return whether the server is running.

port: int property writable

Port.

Returns:

Name Type Description
int int

the port of the server

Raises:

Type Description
ValueError

if the server is not running

server_thread: ServerThread property

Server thread.

Returns:

Name Type Description
ServerThread ServerThread

the server thread

ServerThread

Bases: Thread

Run a Flask app in a separate thread with shutdown control.

Source code in wg_utilities/api/temp_auth_server.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class ServerThread(Thread):
    """Run a Flask app in a separate thread with shutdown control."""

    def __init__(self, app: Flask, host: str = "localhost", port: int = 0):
        """Create the underlying server for `app`.

        Args:
            app (Flask): the application to serve
            host (str): the hostname to listen on
            port (int): the port to use; `0` means "try each port from 5001
                to 5020 in turn and keep the first one that works"

        Raises:
            OSError: if `port` is 0 and no port in 5001-5020 could be used
        """
        super().__init__()

        if port == 0:
            for candidate in range(5001, 5021):
                try:
                    self.server = make_server(host, candidate, app)
                    break
                except (SystemExit, OSError):
                    # Presumably the port is already taken; try the next one
                    continue
            else:
                # The message must match the range actually probed above
                raise OSError("No available ports in range 5001-5020")
        else:
            self.server = make_server(host, port, app)

        # NOTE(review): the context is pushed on the thread that constructs
        # this object (not the serving thread) and is never popped here
        self.ctx = app.app_context()
        self.ctx.push()

        self.host = self.server.host
        self.port = self.server.port

    def run(self) -> None:
        """Start the server."""
        self.server.serve_forever()

    def shutdown(self) -> None:
        """Shutdown the server."""
        self.server.shutdown()
run()

Start the server.

Source code in wg_utilities/api/temp_auth_server.py
51
52
53
def run(self) -> None:
    """Start the server by delegating to `self.server.serve_forever()`."""
    wsgi_server = self.server
    wsgi_server.serve_forever()
shutdown()

Shutdown the server.

Source code in wg_utilities/api/temp_auth_server.py
55
56
57
def shutdown(self) -> None:
    """Shutdown the server by delegating to `self.server.shutdown()`."""
    wsgi_server = self.server
    wsgi_server.shutdown()

create_endpoints()

Create all necessary endpoints.

This is an "endpoint factory" (rather than one decorated method per endpoint) because of the need to use self.app as the decorator

Source code in wg_utilities/api/temp_auth_server.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def create_endpoints(self) -> None:
    """Register the server's routes on `self.app`.

    The view functions are defined inside this method (instead of being
    decorated methods on the class) because the route decorator is only
    reachable through the `self.app` instance.
    """
    # Static confirmation page returned once the callback has been recorded
    confirmation_page = dedent(
        """
                <html lang="en">
                <head>
                    <style>
                        body {
                            font-family: Verdana, sans-serif;
                            height: 100vh;
                        }
                    </style>
                    <title>Authentication Complete</title>
                </head>
                <body onclick="self.close()">
                    <h1>Authentication complete!</h1>
                    <span>Click anywhere to close this window.</span>
                </body>
                </html>
            """,
    ).strip()

    def get_auth_code() -> Response:
        """Record the callback's query args and return a confirmation page.

        Returns:
            Response: a 200 response containing the confirmation page
        """
        # TODO add 400 response for mismatch in state token
        self._request_args[request.path] = request.args

        page_response = self.app.response_class(
            response=confirmation_page,
            status=200,
        )
        return cast(Response, page_response)

    # Same registration the decorator form would perform
    self.app.route("/get_auth_code", methods=["GET"])(get_auth_code)

start_server()

Run the local server.

Source code in wg_utilities/api/temp_auth_server.py
169
170
171
172
173
174
def start_server(self) -> None:
    """Start the server thread and record the port it is using.

    Does nothing when the server is already running.
    """
    if self.is_running:
        return

    thread = self.server_thread
    thread.start()

    # Remember the port the thread reports for the `port` property
    self._actual_port = thread.port

stop_server()

Stop the local server by shutting down its server thread (the code calls `server_thread.shutdown()`; no `/kill` endpoint is registered).

See Also

self.create_endpoints: /kill endpoint

Source code in wg_utilities/api/temp_auth_server.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def stop_server(self) -> None:
    """Stop the local server and wait for its thread to finish.

    Does nothing unless a server thread exists and is alive.
    """
    # No point instantiating a new server if we're just going to kill it
    if not hasattr(self, "_server_thread"):
        return

    thread = self.server_thread
    if not thread.is_alive():
        return

    thread.shutdown()
    # Block until the thread has exited instead of spinning on `is_alive()`
    thread.join()

    # Forget the finished thread so that a later start creates a new one
    del self._server_thread

wait_for_request(endpoint, max_wait=300, *, kill_on_request=False)

Wait for a request.

Wait for a request to come into the server for when it is needed in a synchronous flow

Parameters:

Name Type Description Default
endpoint str

the endpoint path to wait for a request to

required
max_wait int

how many seconds to wait before timing out

300
kill_on_request bool

kill/stop the server when a request comes through

False

Returns:

Name Type Description
dict dict[str, Any]

the args which were sent with the request

Raises:

Type Description
TimeoutError

if no request is received within the timeout limit

Source code in wg_utilities/api/temp_auth_server.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def wait_for_request(
    self,
    endpoint: str,
    max_wait: int = 300,
    *,
    kill_on_request: bool = False,
) -> dict[str, Any]:
    """Wait for a request.

    Wait for a request to come into the server for when it is needed in a
    synchronous flow. The server is started first if it is not running.

    Args:
        endpoint (str): the endpoint path to wait for a request to
        max_wait (int): how many seconds to wait before timing out
        kill_on_request (bool): kill/stop the server when a request comes through

    Returns:
        dict: the args which were sent with the request

    Raises:
        TimeoutError: if no request is received within the timeout limit
    """

    if not self.is_running:
        self.start_server()

    deadline = time() + max_wait
    # A request without any query args is stored as an empty (falsy) mapping
    # and is therefore not treated as "received"
    while not self._request_args.get(endpoint) and time() <= deadline:
        sleep(0.5)

    # Decide on what was actually received rather than on the clock, so a
    # request that lands during the final poll interval is not discarded
    received = self._request_args.get(endpoint)
    if not received:
        raise TimeoutError(
            f"No request received to {endpoint} within {max_wait} seconds",
        )

    if kill_on_request:
        self.stop_server()

    return dict(received)