deepgram.clients.common.v1.abstract_async_websocket

  1# Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved.
  2# Use of this source code is governed by a MIT license that can be found in the LICENSE file.
  3# SPDX-License-Identifier: MIT
  4import asyncio
  5import json
  6import logging
  7from typing import Dict, Union, Optional, cast, Any, Callable
  8from datetime import datetime
  9import threading
 10from abc import ABC, abstractmethod
 11
 12import websockets
 13
 14try:
 15    # Websockets versions >= 13
 16    from websockets.asyncio.client import connect, ClientConnection
 17
 18    WS_ADDITIONAL_HEADERS_KEY = "additional_headers"
 19except ImportError:
 20    # Backward compatibility with websockets versions 12
 21    from websockets.legacy.client import (  # type: ignore
 22        connect,
 23        WebSocketClientProtocol as ClientConnection,
 24    )
 25
 26    WS_ADDITIONAL_HEADERS_KEY = "extra_headers"
 27
 28from ....audio import Speaker
 29from ....utils import verboselogs
 30from ....options import DeepgramClientOptions
 31from .helpers import convert_to_websocket_url, append_query_params
 32from .errors import DeepgramError
 33
 34from .websocket_response import (
 35    OpenResponse,
 36    CloseResponse,
 37    ErrorResponse,
 38)
 39from .websocket_events import WebSocketEvents
 40
 41
 42ONE_SECOND = 1
 43HALF_SECOND = 0.5
 44DEEPGRAM_INTERVAL = 5
 45PING_INTERVAL = 20
 46
 47
 48class AbstractAsyncWebSocketClient(ABC):  # pylint: disable=too-many-instance-attributes
 49    """
 50    Abstract class for using WebSockets.
 51
 52    This class provides methods to establish a WebSocket connection generically for
 53    use in all WebSocket clients.
 54    """
 55
 56    _logger: verboselogs.VerboseLogger
 57    _config: DeepgramClientOptions
 58    _endpoint: str
 59    _websocket_url: str
 60
 61    _socket: Optional[ClientConnection] = None
 62
 63    _listen_thread: Union[asyncio.Task, None]
 64    _delegate: Optional[Speaker] = None
 65
 66    _kwargs: Optional[Dict] = None
 67    _addons: Optional[Dict] = None
 68    _options: Optional[Dict] = None
 69    _headers: Optional[Dict] = None
 70
 71    def __init__(self, config: DeepgramClientOptions, endpoint: str = ""):
 72        if config is None:
 73            raise DeepgramError("Config is required")
 74        if endpoint == "":
 75            raise DeepgramError("endpoint is required")
 76
 77        self._logger = verboselogs.VerboseLogger(__name__)
 78        self._logger.addHandler(logging.StreamHandler())
 79        self._logger.setLevel(config.verbose)
 80
 81        self._config = config
 82        self._endpoint = endpoint
 83
 84        self._listen_thread = None
 85
 86        # events
 87        self._exit_event = asyncio.Event()
 88
 89        # set websocket url
 90        self._websocket_url = convert_to_websocket_url(self._config.url, self._endpoint)
 91
 92    def delegate_listening(self, delegate: Speaker) -> None:
 93        """
 94        Delegate the listening thread to the Speaker object.
 95        """
 96        self._delegate = delegate
 97
 98    # pylint: disable=too-many-branches,too-many-statements
 99    async def start(
100        self,
101        options: Optional[Any] = None,
102        addons: Optional[Dict] = None,
103        headers: Optional[Dict] = None,
104        **kwargs,
105    ) -> bool:
106        """
107        Starts the WebSocket connection for live transcription.
108        """
109        self._logger.debug("AbstractAsyncWebSocketClient.start ENTER")
110        self._logger.info("addons: %s", addons)
111        self._logger.info("headers: %s", headers)
112        self._logger.info("kwargs: %s", kwargs)
113
114        self._addons = addons
115        self._headers = headers
116
117        # set kwargs
118        if kwargs is not None:
119            self._kwargs = kwargs
120        else:
121            self._kwargs = {}
122
123        if not isinstance(options, dict):
124            self._logger.error("options is not a dict")
125            self._logger.debug("AbstractSyncWebSocketClient.start LEAVE")
126            return False
127
128        # set options
129        if options is not None:
130            self._options = options
131        else:
132            self._options = {}
133
134        combined_options = self._options.copy()
135        if self._addons is not None:
136            self._logger.info("merging addons to options")
137            combined_options.update(self._addons)
138            self._logger.info("new options: %s", combined_options)
139        self._logger.debug("combined_options: %s", combined_options)
140
141        combined_headers = self._config.headers.copy()
142        if self._headers is not None:
143            self._logger.info("merging headers to options")
144            combined_headers.update(self._headers)
145            self._logger.info("new headers: %s", combined_headers)
146        self._logger.debug("combined_headers: %s", combined_headers)
147
148        url_with_params = append_query_params(self._websocket_url, combined_options)
149
150        try:
151            ws_connect_kwargs: Dict = {
152                "ping_interval": PING_INTERVAL,
153                WS_ADDITIONAL_HEADERS_KEY: combined_headers,
154            }
155
156            self._socket = await connect(
157                url_with_params,
158                **ws_connect_kwargs,
159            )
160            self._exit_event.clear()
161
162            # debug the threads
163            for thread in threading.enumerate():
164                self._logger.debug("after running thread: %s", thread.name)
165            self._logger.debug("number of active threads: %s", threading.active_count())
166
167            # delegate the listening thread to external object
168            if self._delegate is not None:
169                self._logger.notice("_delegate is enabled. this is usually the speaker")
170                self._delegate.set_pull_callback(self._socket.recv)
171                self._delegate.set_push_callback(self._process_message)
172            else:
173                self._logger.notice("create _listening thread")
174                self._listen_thread = asyncio.create_task(self._listening())
175
176            # debug the threads
177            for thread in threading.enumerate():
178                self._logger.debug("after running thread: %s", thread.name)
179            self._logger.debug("number of active threads: %s", threading.active_count())
180
181            # push open event
182            await self._emit(
183                WebSocketEvents(WebSocketEvents.Open),
184                OpenResponse(type=WebSocketEvents.Open),
185            )
186
187            self._logger.notice("start succeeded")
188            self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE")
189            return True
190        except websockets.exceptions.ConnectionClosed as e:
191            self._logger.error(
192                "ConnectionClosed in AbstractAsyncWebSocketClient.start: %s", e
193            )
194            self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE")
195            if self._config.options.get("termination_exception_connect", False):
196                raise
197            return False
198        except websockets.exceptions.WebSocketException as e:
199            self._logger.error(
200                "WebSocketException in AbstractAsyncWebSocketClient.start: %s", e
201            )
202            self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE")
203            if self._config.options.get("termination_exception_connect", False):
204                raise
205            return False
206        except Exception as e:  # pylint: disable=broad-except
207            self._logger.error(
208                "WebSocketException in AbstractAsyncWebSocketClient.start: %s", e
209            )
210            self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE")
211            if self._config.options.get("termination_exception_connect", False):
212                raise
213            return False
214
215    async def is_connected(self) -> bool:
216        """
217        Returns the connection status of the WebSocket.
218        """
219        return self._socket is not None
220
221    # pylint: enable=too-many-branches,too-many-statements
222
223    @abstractmethod
224    def on(self, event: WebSocketEvents, handler: Callable) -> None:
225        """
226        Registers an event handler for the WebSocket connection.
227        """
228        raise NotImplementedError("no on method")
229
230    @abstractmethod
231    async def _emit(self, event: WebSocketEvents, *args, **kwargs) -> None:
232        """
233        Emits an event to the WebSocket connection.
234        """
235        raise NotImplementedError("no _emit method")
236
237    # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches
238    async def _listening(self) -> None:
239        """
240        Listens for messages from the WebSocket connection.
241        """
242        self._logger.debug("AbstractAsyncWebSocketClient._listening ENTER")
243
244        while True:
245            try:
246                if self._exit_event.is_set():
247                    self._logger.notice("_listening exiting gracefully")
248                    self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE")
249                    return
250
251                if self._socket is None:
252                    self._logger.warning("socket is empty")
253                    self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE")
254                    return
255
256                message = await self._socket.recv()
257
258                if message is None:
259                    self._logger.info("message is None")
260                    continue
261
262                self._logger.spam("data type: %s", type(message))
263
264                if isinstance(message, bytes):
265                    self._logger.debug("Binary data received")
266                    await self._process_binary(message)
267                else:
268                    self._logger.debug("Text data received")
269                    await self._process_text(message)
270
271                self._logger.notice("_listening Succeeded")
272                self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE")
273
274            except websockets.exceptions.ConnectionClosedOK as e:
275                # signal exit and close
276                await self._signal_exit()
277
278                self._logger.notice(f"_listening({e.code}) exiting gracefully")
279                self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE")
280                return
281
282            except websockets.exceptions.ConnectionClosed as e:
283                if e.code in [1000, 1001]:
284                    # signal exit and close
285                    await self._signal_exit()
286
287                    self._logger.notice(f"_listening({e.code}) exiting gracefully")
288                    self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE")
289                    return
290
291                # we need to explicitly call self._signal_exit() here because we are hanging on a recv()
292                # note: this is different than the speak websocket client
293                self._logger.error(
294                    "ConnectionClosed in AbstractAsyncWebSocketClient._listening with code %s: %s",
295                    e.code,
296                    e.reason,
297                )
298                cc_error: ErrorResponse = ErrorResponse(
299                    "ConnectionClosed in AbstractAsyncWebSocketClient._listening",
300                    f"{e}",
301                    "ConnectionClosed",
302                )
303                await self._emit(
304                    WebSocketEvents(WebSocketEvents.Error),
305                    error=cc_error,
306                    **dict(cast(Dict[Any, Any], self._kwargs)),
307                )
308
309                # signal exit and close
310                await self._signal_exit()
311
312                self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE")
313
314                if self._config.options.get("termination_exception_connect") is True:
315                    raise
316                return
317
318            except websockets.exceptions.WebSocketException as e:
319                self._logger.error(
320                    "WebSocketException in AbstractAsyncWebSocketClient._listening: %s",
321                    e,
322                )
323                ws_error: ErrorResponse = ErrorResponse(
324                    "WebSocketException in AbstractAsyncWebSocketClient._listening",
325                    f"{e}",
326                    "WebSocketException",
327                )
328                await self._emit(
329                    WebSocketEvents(WebSocketEvents.Error),
330                    error=ws_error,
331                    **dict(cast(Dict[Any, Any], self._kwargs)),
332                )
333
334                # signal exit and close
335                await self._signal_exit()
336
337                self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE")
338
339                if self._config.options.get("termination_exception_connect") is True:
340                    raise
341                return
342
343            except Exception as e:  # pylint: disable=broad-except
344                self._logger.error(
345                    "Exception in AbstractAsyncWebSocketClient._listening: %s", e
346                )
347                e_error: ErrorResponse = ErrorResponse(
348                    "Exception in AbstractAsyncWebSocketClient._listening",
349                    f"{e}",
350                    "Exception",
351                )
352                await self._emit(
353                    WebSocketEvents(WebSocketEvents.Error),
354                    error=e_error,
355                    **dict(cast(Dict[Any, Any], self._kwargs)),
356                )
357
358                # signal exit and close
359                await self._signal_exit()
360
361                self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE")
362
363                if self._config.options.get("termination_exception_connect") is True:
364                    raise
365                return
366
367    # pylint: enable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches
368
369    async def _process_message(self, message: Union[str, bytes]) -> None:
370        if isinstance(message, bytes):
371            await self._process_binary(message)
372        else:
373            await self._process_text(message)
374
375    @abstractmethod
376    async def _process_text(self, message: str) -> None:
377        raise NotImplementedError("no _process_text method")
378
379    @abstractmethod
380    async def _process_binary(self, message: bytes) -> None:
381        raise NotImplementedError("no _process_binary method")
382
383    @abstractmethod
384    async def _close_message(self) -> bool:
385        raise NotImplementedError("no _close_message method")
386
387    # pylint: disable=too-many-return-statements,too-many-branches
388
389    async def send(self, data: Union[str, bytes]) -> bool:
390        """
391        Sends data over the WebSocket connection.
392        """
393        self._logger.spam("AbstractAsyncWebSocketClient.send ENTER")
394
395        if self._exit_event.is_set():
396            self._logger.notice("send exiting gracefully")
397            self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE")
398            return False
399
400        if not await self.is_connected():
401            self._logger.notice("is_connected is False")
402            self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE")
403            return False
404
405        if self._socket is not None:
406            try:
407                await self._socket.send(data)
408            except websockets.exceptions.ConnectionClosedOK as e:
409                self._logger.notice(f"send() exiting gracefully: {e.code}")
410                self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE")
411                if self._config.options.get("termination_exception_send") is True:
412                    raise
413                return True
414            except websockets.exceptions.ConnectionClosed as e:
415                if e.code in [1000, 1001]:
416                    self._logger.notice(f"send({e.code}) exiting gracefully")
417                    self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE")
418                    if self._config.options.get("termination_exception_send") is True:
419                        raise
420                    return True
421
422                self._logger.error("send() failed - ConnectionClosed: %s", str(e))
423                self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE")
424                if self._config.options.get("termination_exception_send") is True:
425                    raise
426                return False
427            except websockets.exceptions.WebSocketException as e:
428                self._logger.error("send() failed - WebSocketException: %s", str(e))
429                self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE")
430                if self._config.options.get("termination_exception_send") is True:
431                    raise
432                return False
433            except Exception as e:  # pylint: disable=broad-except
434                self._logger.error("send() failed - Exception: %s", str(e))
435                self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE")
436                if self._config.options.get("termination_exception_send") is True:
437                    raise
438                return False
439
440            self._logger.spam("send() succeeded")
441            self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE")
442            return True
443
444        self._logger.spam("send() failed. socket is None")
445        self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE")
446        return False
447
448    # pylint: enable=too-many-return-statements,too-many-branches
449
450    async def finish(self) -> bool:
451        """
452        Closes the WebSocket connection gracefully.
453        """
454        self._logger.debug("AbstractAsyncWebSocketClient.finish ENTER")
455
456        # signal exit
457        await self._signal_exit()
458
459        # stop the threads
460        self._logger.verbose("cancelling tasks...")
461        try:
462            # Before cancelling, check if the tasks were created
463            # debug the threads
464            for thread in threading.enumerate():
465                self._logger.debug("before running thread: %s", thread.name)
466            self._logger.debug("number of active threads: %s", threading.active_count())
467
468            tasks = []
469            if self._listen_thread is not None:
470                self._listen_thread.cancel()
471                tasks.append(self._listen_thread)
472                self._logger.notice("processing _listen_thread cancel...")
473
474            # Use asyncio.gather to wait for tasks to be cancelled
475            await asyncio.gather(*filter(None, tasks))
476            self._logger.notice("threads joined")
477
478            # debug the threads
479            for thread in threading.enumerate():
480                if thread is not None and thread.name is not None:
481                    self._logger.debug("after running thread: %s", thread.name)
482                else:
483                    self._logger.debug("after running thread: unknown_thread_name")
484            self._logger.debug("number of active threads: %s", threading.active_count())
485
486            self._logger.notice("finish succeeded")
487            self._logger.spam("AbstractAsyncWebSocketClient.finish LEAVE")
488            return True
489
490        except asyncio.CancelledError as e:
491            self._logger.error("tasks cancelled error: %s", e)
492            self._logger.debug("AbstractAsyncWebSocketClient.finish LEAVE")
493            return True
494
495    async def _signal_exit(self) -> None:
496        # send close event
497        self._logger.verbose("closing socket...")
498        if self._socket is not None:
499            self._logger.verbose("send Close...")
500            try:
501                # if the socket connection is closed, the following line might throw an error
502                await self._close_message()
503            except websockets.exceptions.ConnectionClosedOK as e:
504                self._logger.notice("_signal_exit  - ConnectionClosedOK: %s", e.code)
505            except websockets.exceptions.ConnectionClosed as e:
506                self._logger.error("_signal_exit  - ConnectionClosed: %s", e.code)
507            except websockets.exceptions.WebSocketException as e:
508                self._logger.error("_signal_exit - WebSocketException: %s", str(e))
509            except Exception as e:  # pylint: disable=broad-except
510                self._logger.error("_signal_exit - Exception: %s", str(e))
511
512            # push close event
513            try:
514                await self._emit(
515                    WebSocketEvents(WebSocketEvents.Close),
516                    close=CloseResponse(type=WebSocketEvents.Close),
517                    **dict(cast(Dict[Any, Any], self._kwargs)),
518                )
519            except Exception as e:  # pylint: disable=broad-except
520                self._logger.error("_emit - Exception: %s", e)
521
522            # wait for task to send
523            await asyncio.sleep(0.5)
524
525        # signal exit
526        self._exit_event.set()
527
528        # closes the WebSocket connection gracefully
529        self._logger.verbose("clean up socket...")
530        if self._socket is not None:
531            self._logger.verbose("socket.wait_closed...")
532            try:
533                await self._socket.close()
534            except websockets.exceptions.WebSocketException as e:
535                self._logger.error("socket.wait_closed failed: %s", e)
536
537        self._socket = None
ONE_SECOND = 1
HALF_SECOND = 0.5
DEEPGRAM_INTERVAL = 5
PING_INTERVAL = 20
class AbstractAsyncWebSocketClient(abc.ABC):
 49class AbstractAsyncWebSocketClient(ABC):  # pylint: disable=too-many-instance-attributes
 50    """
 51    Abstract class for using WebSockets.
 52
 53    This class provides methods to establish a WebSocket connection generically for
 54    use in all WebSocket clients.
 55    """
 56
 57    _logger: verboselogs.VerboseLogger
 58    _config: DeepgramClientOptions
 59    _endpoint: str
 60    _websocket_url: str
 61
 62    _socket: Optional[ClientConnection] = None
 63
 64    _listen_thread: Union[asyncio.Task, None]
 65    _delegate: Optional[Speaker] = None
 66
 67    _kwargs: Optional[Dict] = None
 68    _addons: Optional[Dict] = None
 69    _options: Optional[Dict] = None
 70    _headers: Optional[Dict] = None
 71
 72    def __init__(self, config: DeepgramClientOptions, endpoint: str = ""):
 73        if config is None:
 74            raise DeepgramError("Config is required")
 75        if endpoint == "":
 76            raise DeepgramError("endpoint is required")
 77
 78        self._logger = verboselogs.VerboseLogger(__name__)
 79        self._logger.addHandler(logging.StreamHandler())
 80        self._logger.setLevel(config.verbose)
 81
 82        self._config = config
 83        self._endpoint = endpoint
 84
 85        self._listen_thread = None
 86
 87        # events
 88        self._exit_event = asyncio.Event()
 89
 90        # set websocket url
 91        self._websocket_url = convert_to_websocket_url(self._config.url, self._endpoint)
 92
 93    def delegate_listening(self, delegate: Speaker) -> None:
 94        """
 95        Delegate the listening thread to the Speaker object.
 96        """
 97        self._delegate = delegate
 98
 99    # pylint: disable=too-many-branches,too-many-statements
100    async def start(
101        self,
102        options: Optional[Any] = None,
103        addons: Optional[Dict] = None,
104        headers: Optional[Dict] = None,
105        **kwargs,
106    ) -> bool:
107        """
108        Starts the WebSocket connection for live transcription.
109        """
110        self._logger.debug("AbstractAsyncWebSocketClient.start ENTER")
111        self._logger.info("addons: %s", addons)
112        self._logger.info("headers: %s", headers)
113        self._logger.info("kwargs: %s", kwargs)
114
115        self._addons = addons
116        self._headers = headers
117
118        # set kwargs
119        if kwargs is not None:
120            self._kwargs = kwargs
121        else:
122            self._kwargs = {}
123
124        if not isinstance(options, dict):
125            self._logger.error("options is not a dict")
126            self._logger.debug("AbstractSyncWebSocketClient.start LEAVE")
127            return False
128
129        # set options
130        if options is not None:
131            self._options = options
132        else:
133            self._options = {}
134
135        combined_options = self._options.copy()
136        if self._addons is not None:
137            self._logger.info("merging addons to options")
138            combined_options.update(self._addons)
139            self._logger.info("new options: %s", combined_options)
140        self._logger.debug("combined_options: %s", combined_options)
141
142        combined_headers = self._config.headers.copy()
143        if self._headers is not None:
144            self._logger.info("merging headers to options")
145            combined_headers.update(self._headers)
146            self._logger.info("new headers: %s", combined_headers)
147        self._logger.debug("combined_headers: %s", combined_headers)
148
149        url_with_params = append_query_params(self._websocket_url, combined_options)
150
151        try:
152            ws_connect_kwargs: Dict = {
153                "ping_interval": PING_INTERVAL,
154                WS_ADDITIONAL_HEADERS_KEY: combined_headers,
155            }
156
157            self._socket = await connect(
158                url_with_params,
159                **ws_connect_kwargs,
160            )
161            self._exit_event.clear()
162
163            # debug the threads
164            for thread in threading.enumerate():
165                self._logger.debug("after running thread: %s", thread.name)
166            self._logger.debug("number of active threads: %s", threading.active_count())
167
168            # delegate the listening thread to external object
169            if self._delegate is not None:
170                self._logger.notice("_delegate is enabled. this is usually the speaker")
171                self._delegate.set_pull_callback(self._socket.recv)
172                self._delegate.set_push_callback(self._process_message)
173            else:
174                self._logger.notice("create _listening thread")
175                self._listen_thread = asyncio.create_task(self._listening())
176
177            # debug the threads
178            for thread in threading.enumerate():
179                self._logger.debug("after running thread: %s", thread.name)
180            self._logger.debug("number of active threads: %s", threading.active_count())
181
182            # push open event
183            await self._emit(
184                WebSocketEvents(WebSocketEvents.Open),
185                OpenResponse(type=WebSocketEvents.Open),
186            )
187
188            self._logger.notice("start succeeded")
189            self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE")
190            return True
191        except websockets.exceptions.ConnectionClosed as e:
192            self._logger.error(
193                "ConnectionClosed in AbstractAsyncWebSocketClient.start: %s", e
194            )
195            self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE")
196            if self._config.options.get("termination_exception_connect", False):
197                raise
198            return False
199        except websockets.exceptions.WebSocketException as e:
200            self._logger.error(
201                "WebSocketException in AbstractAsyncWebSocketClient.start: %s", e
202            )
203            self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE")
204            if self._config.options.get("termination_exception_connect", False):
205                raise
206            return False
207        except Exception as e:  # pylint: disable=broad-except
208            self._logger.error(
209                "WebSocketException in AbstractAsyncWebSocketClient.start: %s", e
210            )
211            self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE")
212            if self._config.options.get("termination_exception_connect", False):
213                raise
214            return False
215
216    async def is_connected(self) -> bool:
217        """
218        Returns the connection status of the WebSocket.
219        """
220        return self._socket is not None
221
222    # pylint: enable=too-many-branches,too-many-statements
223
224    @abstractmethod
225    def on(self, event: WebSocketEvents, handler: Callable) -> None:
226        """
227        Registers an event handler for the WebSocket connection.
228        """
229        raise NotImplementedError("no on method")
230
231    @abstractmethod
232    async def _emit(self, event: WebSocketEvents, *args, **kwargs) -> None:
233        """
234        Emits an event to the WebSocket connection.
235        """
236        raise NotImplementedError("no _emit method")
237
238    # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches
239    async def _listening(self) -> None:
240        """
241        Listens for messages from the WebSocket connection.
242        """
243        self._logger.debug("AbstractAsyncWebSocketClient._listening ENTER")
244
245        while True:
246            try:
247                if self._exit_event.is_set():
248                    self._logger.notice("_listening exiting gracefully")
249                    self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE")
250                    return
251
252                if self._socket is None:
253                    self._logger.warning("socket is empty")
254                    self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE")
255                    return
256
257                message = await self._socket.recv()
258
259                if message is None:
260                    self._logger.info("message is None")
261                    continue
262
263                self._logger.spam("data type: %s", type(message))
264
265                if isinstance(message, bytes):
266                    self._logger.debug("Binary data received")
267                    await self._process_binary(message)
268                else:
269                    self._logger.debug("Text data received")
270                    await self._process_text(message)
271
272                self._logger.notice("_listening Succeeded")
273                self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE")
274
275            except websockets.exceptions.ConnectionClosedOK as e:
276                # signal exit and close
277                await self._signal_exit()
278
279                self._logger.notice(f"_listening({e.code}) exiting gracefully")
280                self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE")
281                return
282
283            except websockets.exceptions.ConnectionClosed as e:
284                if e.code in [1000, 1001]:
285                    # signal exit and close
286                    await self._signal_exit()
287
288                    self._logger.notice(f"_listening({e.code}) exiting gracefully")
289                    self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE")
290                    return
291
292                # we need to explicitly call self._signal_exit() here because we are hanging on a recv()
293                # note: this is different than the speak websocket client
294                self._logger.error(
295                    "ConnectionClosed in AbstractAsyncWebSocketClient._listening with code %s: %s",
296                    e.code,
297                    e.reason,
298                )
299                cc_error: ErrorResponse = ErrorResponse(
300                    "ConnectionClosed in AbstractAsyncWebSocketClient._listening",
301                    f"{e}",
302                    "ConnectionClosed",
303                )
304                await self._emit(
305                    WebSocketEvents(WebSocketEvents.Error),
306                    error=cc_error,
307                    **dict(cast(Dict[Any, Any], self._kwargs)),
308                )
309
310                # signal exit and close
311                await self._signal_exit()
312
313                self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE")
314
315                if self._config.options.get("termination_exception_connect") is True:
316                    raise
317                return
318
319            except websockets.exceptions.WebSocketException as e:
320                self._logger.error(
321                    "WebSocketException in AbstractAsyncWebSocketClient._listening: %s",
322                    e,
323                )
324                ws_error: ErrorResponse = ErrorResponse(
325                    "WebSocketException in AbstractAsyncWebSocketClient._listening",
326                    f"{e}",
327                    "WebSocketException",
328                )
329                await self._emit(
330                    WebSocketEvents(WebSocketEvents.Error),
331                    error=ws_error,
332                    **dict(cast(Dict[Any, Any], self._kwargs)),
333                )
334
335                # signal exit and close
336                await self._signal_exit()
337
338                self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE")
339
340                if self._config.options.get("termination_exception_connect") is True:
341                    raise
342                return
343
344            except Exception as e:  # pylint: disable=broad-except
345                self._logger.error(
346                    "Exception in AbstractAsyncWebSocketClient._listening: %s", e
347                )
348                e_error: ErrorResponse = ErrorResponse(
349                    "Exception in AbstractAsyncWebSocketClient._listening",
350                    f"{e}",
351                    "Exception",
352                )
353                await self._emit(
354                    WebSocketEvents(WebSocketEvents.Error),
355                    error=e_error,
356                    **dict(cast(Dict[Any, Any], self._kwargs)),
357                )
358
359                # signal exit and close
360                await self._signal_exit()
361
362                self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE")
363
364                if self._config.options.get("termination_exception_connect") is True:
365                    raise
366                return
367
368    # pylint: enable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches
369
370    async def _process_message(self, message: Union[str, bytes]) -> None:
371        if isinstance(message, bytes):
372            await self._process_binary(message)
373        else:
374            await self._process_text(message)
375
376    @abstractmethod
377    async def _process_text(self, message: str) -> None:
378        raise NotImplementedError("no _process_text method")
379
380    @abstractmethod
381    async def _process_binary(self, message: bytes) -> None:
382        raise NotImplementedError("no _process_binary method")
383
384    @abstractmethod
385    async def _close_message(self) -> bool:
386        raise NotImplementedError("no _close_message method")
387
388    # pylint: disable=too-many-return-statements,too-many-branches
389
390    async def send(self, data: Union[str, bytes]) -> bool:
391        """
392        Sends data over the WebSocket connection.
393        """
394        self._logger.spam("AbstractAsyncWebSocketClient.send ENTER")
395
396        if self._exit_event.is_set():
397            self._logger.notice("send exiting gracefully")
398            self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE")
399            return False
400
401        if not await self.is_connected():
402            self._logger.notice("is_connected is False")
403            self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE")
404            return False
405
406        if self._socket is not None:
407            try:
408                await self._socket.send(data)
409            except websockets.exceptions.ConnectionClosedOK as e:
410                self._logger.notice(f"send() exiting gracefully: {e.code}")
411                self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE")
412                if self._config.options.get("termination_exception_send") is True:
413                    raise
414                return True
415            except websockets.exceptions.ConnectionClosed as e:
416                if e.code in [1000, 1001]:
417                    self._logger.notice(f"send({e.code}) exiting gracefully")
418                    self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE")
419                    if self._config.options.get("termination_exception_send") is True:
420                        raise
421                    return True
422
423                self._logger.error("send() failed - ConnectionClosed: %s", str(e))
424                self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE")
425                if self._config.options.get("termination_exception_send") is True:
426                    raise
427                return False
428            except websockets.exceptions.WebSocketException as e:
429                self._logger.error("send() failed - WebSocketException: %s", str(e))
430                self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE")
431                if self._config.options.get("termination_exception_send") is True:
432                    raise
433                return False
434            except Exception as e:  # pylint: disable=broad-except
435                self._logger.error("send() failed - Exception: %s", str(e))
436                self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE")
437                if self._config.options.get("termination_exception_send") is True:
438                    raise
439                return False
440
441            self._logger.spam("send() succeeded")
442            self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE")
443            return True
444
445        self._logger.spam("send() failed. socket is None")
446        self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE")
447        return False
448
449    # pylint: enable=too-many-return-statements,too-many-branches
450
451    async def finish(self) -> bool:
452        """
453        Closes the WebSocket connection gracefully.
454        """
455        self._logger.debug("AbstractAsyncWebSocketClient.finish ENTER")
456
457        # signal exit
458        await self._signal_exit()
459
460        # stop the threads
461        self._logger.verbose("cancelling tasks...")
462        try:
463            # Before cancelling, check if the tasks were created
464            # debug the threads
465            for thread in threading.enumerate():
466                self._logger.debug("before running thread: %s", thread.name)
467            self._logger.debug("number of active threads: %s", threading.active_count())
468
469            tasks = []
470            if self._listen_thread is not None:
471                self._listen_thread.cancel()
472                tasks.append(self._listen_thread)
473                self._logger.notice("processing _listen_thread cancel...")
474
475            # Use asyncio.gather to wait for tasks to be cancelled
476            await asyncio.gather(*filter(None, tasks))
477            self._logger.notice("threads joined")
478
479            # debug the threads
480            for thread in threading.enumerate():
481                if thread is not None and thread.name is not None:
482                    self._logger.debug("after running thread: %s", thread.name)
483                else:
484                    self._logger.debug("after running thread: unknown_thread_name")
485            self._logger.debug("number of active threads: %s", threading.active_count())
486
487            self._logger.notice("finish succeeded")
488            self._logger.spam("AbstractAsyncWebSocketClient.finish LEAVE")
489            return True
490
491        except asyncio.CancelledError as e:
492            self._logger.error("tasks cancelled error: %s", e)
493            self._logger.debug("AbstractAsyncWebSocketClient.finish LEAVE")
494            return True
495
496    async def _signal_exit(self) -> None:
497        # send close event
498        self._logger.verbose("closing socket...")
499        if self._socket is not None:
500            self._logger.verbose("send Close...")
501            try:
502                # if the socket connection is closed, the following line might throw an error
503                await self._close_message()
504            except websockets.exceptions.ConnectionClosedOK as e:
505                self._logger.notice("_signal_exit  - ConnectionClosedOK: %s", e.code)
506            except websockets.exceptions.ConnectionClosed as e:
507                self._logger.error("_signal_exit  - ConnectionClosed: %s", e.code)
508            except websockets.exceptions.WebSocketException as e:
509                self._logger.error("_signal_exit - WebSocketException: %s", str(e))
510            except Exception as e:  # pylint: disable=broad-except
511                self._logger.error("_signal_exit - Exception: %s", str(e))
512
513            # push close event
514            try:
515                await self._emit(
516                    WebSocketEvents(WebSocketEvents.Close),
517                    close=CloseResponse(type=WebSocketEvents.Close),
518                    **dict(cast(Dict[Any, Any], self._kwargs)),
519                )
520            except Exception as e:  # pylint: disable=broad-except
521                self._logger.error("_emit - Exception: %s", e)
522
523            # wait for task to send
524            await asyncio.sleep(0.5)
525
526        # signal exit
527        self._exit_event.set()
528
529        # closes the WebSocket connection gracefully
530        self._logger.verbose("clean up socket...")
531        if self._socket is not None:
532            self._logger.verbose("socket.wait_closed...")
533            try:
534                await self._socket.close()
535            except websockets.exceptions.WebSocketException as e:
536                self._logger.error("socket.wait_closed failed: %s", e)
537
538        self._socket = None

Abstract class for using WebSockets.

This class provides methods to establish a WebSocket connection generically for use in all WebSocket clients.

def delegate_listening(self, delegate: deepgram.audio.speaker.speaker.Speaker) -> None:
93    def delegate_listening(self, delegate: Speaker) -> None:
94        """
95        Delegate the listening thread to the Speaker object.
96        """
97        self._delegate = delegate

Delegate the listening thread to the Speaker object.

async def start( self, options: Optional[Any] = None, addons: Optional[Dict] = None, headers: Optional[Dict] = None, **kwargs) -> bool:
100    async def start(
101        self,
102        options: Optional[Any] = None,
103        addons: Optional[Dict] = None,
104        headers: Optional[Dict] = None,
105        **kwargs,
106    ) -> bool:
107        """
108        Starts the WebSocket connection for live transcription.
109        """
110        self._logger.debug("AbstractAsyncWebSocketClient.start ENTER")
111        self._logger.info("addons: %s", addons)
112        self._logger.info("headers: %s", headers)
113        self._logger.info("kwargs: %s", kwargs)
114
115        self._addons = addons
116        self._headers = headers
117
118        # set kwargs
119        if kwargs is not None:
120            self._kwargs = kwargs
121        else:
122            self._kwargs = {}
123
124        if not isinstance(options, dict):
125            self._logger.error("options is not a dict")
126            self._logger.debug("AbstractSyncWebSocketClient.start LEAVE")
127            return False
128
129        # set options
130        if options is not None:
131            self._options = options
132        else:
133            self._options = {}
134
135        combined_options = self._options.copy()
136        if self._addons is not None:
137            self._logger.info("merging addons to options")
138            combined_options.update(self._addons)
139            self._logger.info("new options: %s", combined_options)
140        self._logger.debug("combined_options: %s", combined_options)
141
142        combined_headers = self._config.headers.copy()
143        if self._headers is not None:
144            self._logger.info("merging headers to options")
145            combined_headers.update(self._headers)
146            self._logger.info("new headers: %s", combined_headers)
147        self._logger.debug("combined_headers: %s", combined_headers)
148
149        url_with_params = append_query_params(self._websocket_url, combined_options)
150
151        try:
152            ws_connect_kwargs: Dict = {
153                "ping_interval": PING_INTERVAL,
154                WS_ADDITIONAL_HEADERS_KEY: combined_headers,
155            }
156
157            self._socket = await connect(
158                url_with_params,
159                **ws_connect_kwargs,
160            )
161            self._exit_event.clear()
162
163            # debug the threads
164            for thread in threading.enumerate():
165                self._logger.debug("after running thread: %s", thread.name)
166            self._logger.debug("number of active threads: %s", threading.active_count())
167
168            # delegate the listening thread to external object
169            if self._delegate is not None:
170                self._logger.notice("_delegate is enabled. this is usually the speaker")
171                self._delegate.set_pull_callback(self._socket.recv)
172                self._delegate.set_push_callback(self._process_message)
173            else:
174                self._logger.notice("create _listening thread")
175                self._listen_thread = asyncio.create_task(self._listening())
176
177            # debug the threads
178            for thread in threading.enumerate():
179                self._logger.debug("after running thread: %s", thread.name)
180            self._logger.debug("number of active threads: %s", threading.active_count())
181
182            # push open event
183            await self._emit(
184                WebSocketEvents(WebSocketEvents.Open),
185                OpenResponse(type=WebSocketEvents.Open),
186            )
187
188            self._logger.notice("start succeeded")
189            self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE")
190            return True
191        except websockets.exceptions.ConnectionClosed as e:
192            self._logger.error(
193                "ConnectionClosed in AbstractAsyncWebSocketClient.start: %s", e
194            )
195            self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE")
196            if self._config.options.get("termination_exception_connect", False):
197                raise
198            return False
199        except websockets.exceptions.WebSocketException as e:
200            self._logger.error(
201                "WebSocketException in AbstractAsyncWebSocketClient.start: %s", e
202            )
203            self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE")
204            if self._config.options.get("termination_exception_connect", False):
205                raise
206            return False
207        except Exception as e:  # pylint: disable=broad-except
208            self._logger.error(
209                "WebSocketException in AbstractAsyncWebSocketClient.start: %s", e
210            )
211            self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE")
212            if self._config.options.get("termination_exception_connect", False):
213                raise
214            return False

Starts the WebSocket connection for live transcription.

async def is_connected(self) -> bool:
216    async def is_connected(self) -> bool:
217        """
218        Returns the connection status of the WebSocket.
219        """
220        return self._socket is not None

Returns the connection status of the WebSocket.

@abstractmethod
def on( self, event: deepgram.clients.common.v1.websocket_events.WebSocketEvents, handler: Callable) -> None:
224    @abstractmethod
225    def on(self, event: WebSocketEvents, handler: Callable) -> None:
226        """
227        Registers an event handler for the WebSocket connection.
228        """
229        raise NotImplementedError("no on method")

Registers an event handler for the WebSocket connection.

async def send(self, data: Union[str, bytes]) -> bool:
390    async def send(self, data: Union[str, bytes]) -> bool:
391        """
392        Sends data over the WebSocket connection.
393        """
394        self._logger.spam("AbstractAsyncWebSocketClient.send ENTER")
395
396        if self._exit_event.is_set():
397            self._logger.notice("send exiting gracefully")
398            self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE")
399            return False
400
401        if not await self.is_connected():
402            self._logger.notice("is_connected is False")
403            self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE")
404            return False
405
406        if self._socket is not None:
407            try:
408                await self._socket.send(data)
409            except websockets.exceptions.ConnectionClosedOK as e:
410                self._logger.notice(f"send() exiting gracefully: {e.code}")
411                self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE")
412                if self._config.options.get("termination_exception_send") is True:
413                    raise
414                return True
415            except websockets.exceptions.ConnectionClosed as e:
416                if e.code in [1000, 1001]:
417                    self._logger.notice(f"send({e.code}) exiting gracefully")
418                    self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE")
419                    if self._config.options.get("termination_exception_send") is True:
420                        raise
421                    return True
422
423                self._logger.error("send() failed - ConnectionClosed: %s", str(e))
424                self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE")
425                if self._config.options.get("termination_exception_send") is True:
426                    raise
427                return False
428            except websockets.exceptions.WebSocketException as e:
429                self._logger.error("send() failed - WebSocketException: %s", str(e))
430                self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE")
431                if self._config.options.get("termination_exception_send") is True:
432                    raise
433                return False
434            except Exception as e:  # pylint: disable=broad-except
435                self._logger.error("send() failed - Exception: %s", str(e))
436                self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE")
437                if self._config.options.get("termination_exception_send") is True:
438                    raise
439                return False
440
441            self._logger.spam("send() succeeded")
442            self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE")
443            return True
444
445        self._logger.spam("send() failed. socket is None")
446        self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE")
447        return False

Sends data over the WebSocket connection.

async def finish(self) -> bool:
451    async def finish(self) -> bool:
452        """
453        Closes the WebSocket connection gracefully.
454        """
455        self._logger.debug("AbstractAsyncWebSocketClient.finish ENTER")
456
457        # signal exit
458        await self._signal_exit()
459
460        # stop the threads
461        self._logger.verbose("cancelling tasks...")
462        try:
463            # Before cancelling, check if the tasks were created
464            # debug the threads
465            for thread in threading.enumerate():
466                self._logger.debug("before running thread: %s", thread.name)
467            self._logger.debug("number of active threads: %s", threading.active_count())
468
469            tasks = []
470            if self._listen_thread is not None:
471                self._listen_thread.cancel()
472                tasks.append(self._listen_thread)
473                self._logger.notice("processing _listen_thread cancel...")
474
475            # Use asyncio.gather to wait for tasks to be cancelled
476            await asyncio.gather(*filter(None, tasks))
477            self._logger.notice("threads joined")
478
479            # debug the threads
480            for thread in threading.enumerate():
481                if thread is not None and thread.name is not None:
482                    self._logger.debug("after running thread: %s", thread.name)
483                else:
484                    self._logger.debug("after running thread: unknown_thread_name")
485            self._logger.debug("number of active threads: %s", threading.active_count())
486
487            self._logger.notice("finish succeeded")
488            self._logger.spam("AbstractAsyncWebSocketClient.finish LEAVE")
489            return True
490
491        except asyncio.CancelledError as e:
492            self._logger.error("tasks cancelled error: %s", e)
493            self._logger.debug("AbstractAsyncWebSocketClient.finish LEAVE")
494            return True

Closes the WebSocket connection gracefully.