deepgram.clients.common.v1.abstract_sync_websocket

# Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved.
# Use of this source code is governed by a MIT license that can be found in the LICENSE file.
# SPDX-License-Identifier: MIT
  4import json
  5import time
  6import logging
  7from typing import Dict, Union, Optional, cast, Any, Callable, Type
  8from datetime import datetime
  9import threading
 10from abc import ABC, abstractmethod
 11
 12from websockets.sync.client import connect, ClientConnection
 13import websockets
 14
 15from ....audio import Speaker
 16from ....utils import verboselogs
 17from ....options import DeepgramClientOptions
 18from .helpers import convert_to_websocket_url, append_query_params
 19from .errors import DeepgramError
 20
 21from .websocket_response import (
 22    OpenResponse,
 23    CloseResponse,
 24    ErrorResponse,
 25)
 26from .websocket_events import WebSocketEvents
 27
 28
 29ONE_SECOND = 1
 30HALF_SECOND = 0.5
 31DEEPGRAM_INTERVAL = 5
 32PING_INTERVAL = 20
 33
 34
 35class AbstractSyncWebSocketClient(ABC):  # pylint: disable=too-many-instance-attributes
 36    """
 37    Abstract class for using WebSockets.
 38
 39    This class provides methods to establish a WebSocket connection generically for
 40    use in all WebSocket clients.
 41
 42    Args:
 43        config (DeepgramClientOptions): all the options for the client
 44        endpoint (str): the endpoint to connect to
 45        thread_cls (Type[threading.Thread]): optional thread class to use for creating threads,
 46            defaults to threading.Thread. Useful for custom thread management like ContextVar support.
 47    """
 48
 49    _logger: verboselogs.VerboseLogger
 50    _config: DeepgramClientOptions
 51    _endpoint: str
 52    _websocket_url: str
 53
 54    _socket: Optional[ClientConnection] = None
 55    _exit_event: threading.Event
 56    _lock_send: threading.Lock
 57
 58    _listen_thread: Union[threading.Thread, None]
 59    _delegate: Optional[Speaker] = None
 60
 61    _thread_cls: Type[threading.Thread]
 62
 63    _kwargs: Optional[Dict] = None
 64    _addons: Optional[Dict] = None
 65    _options: Optional[Dict] = None
 66    _headers: Optional[Dict] = None
 67
 68    def __init__(
 69        self,
 70        config: DeepgramClientOptions,
 71        endpoint: str = "",
 72        thread_cls: Type[threading.Thread] = threading.Thread,
 73    ):
 74        if config is None:
 75            raise DeepgramError("Config is required")
 76        if endpoint == "":
 77            raise DeepgramError("endpoint is required")
 78
 79        self._logger = verboselogs.VerboseLogger(__name__)
 80        self._logger.addHandler(logging.StreamHandler())
 81        self._logger.setLevel(config.verbose)
 82
 83        self._config = config
 84        self._endpoint = endpoint
 85        self._lock_send = threading.Lock()
 86
 87        self._listen_thread = None
 88
 89        self._thread_cls = thread_cls
 90
 91        # exit
 92        self._exit_event = threading.Event()
 93
 94        # set websocket url
 95        self._websocket_url = convert_to_websocket_url(self._config.url, self._endpoint)
 96
 97    def delegate_listening(self, delegate: Speaker) -> None:
 98        """
 99        Delegate the listening thread to the main thread.
100        """
101        self._delegate = delegate
102
103    # pylint: disable=too-many-statements,too-many-branches
104    def start(
105        self,
106        options: Optional[Any] = None,
107        addons: Optional[Dict] = None,
108        headers: Optional[Dict] = None,
109        **kwargs,
110    ) -> bool:
111        """
112        Starts the WebSocket connection for live transcription.
113        """
114        self._logger.debug("AbstractSyncWebSocketClient.start ENTER")
115        self._logger.info("addons: %s", addons)
116        self._logger.info("headers: %s", headers)
117        self._logger.info("kwargs: %s", kwargs)
118
119        self._addons = addons
120        self._headers = headers
121
122        # set kwargs
123        if kwargs is not None:
124            self._kwargs = kwargs
125        else:
126            self._kwargs = {}
127
128        if not isinstance(options, dict):
129            self._logger.error("options is not a dict")
130            self._logger.debug("AbstractSyncWebSocketClient.start LEAVE")
131            return False
132
133        # set options
134        if options is not None:
135            self._options = options
136        else:
137            self._options = {}
138
139        combined_options = self._options.copy()
140        if self._addons is not None:
141            self._logger.info("merging addons to options")
142            combined_options.update(self._addons)
143            self._logger.info("new options: %s", combined_options)
144        self._logger.debug("combined_options: %s", combined_options)
145
146        combined_headers = self._config.headers.copy()
147        if self._headers is not None:
148            self._logger.info("merging headers to options")
149            combined_headers.update(self._headers)
150            self._logger.info("new headers: %s", combined_headers)
151        self._logger.debug("combined_headers: %s", combined_headers)
152
153        url_with_params = append_query_params(self._websocket_url, combined_options)
154        try:
155            self._socket = connect(url_with_params, additional_headers=combined_headers)
156            self._exit_event.clear()
157
158            # debug the threads
159            for thread in threading.enumerate():
160                self._logger.debug("after running thread: %s", thread.name)
161            self._logger.debug("number of active threads: %s", threading.active_count())
162
163            # delegate the listening thread to external object
164            if self._delegate is not None:
165                self._logger.notice("_delegate is enabled. this is usually the speaker")
166                self._delegate.set_pull_callback(self._socket.recv)
167                self._delegate.set_push_callback(self._process_message)
168            else:
169                self._logger.notice("create _listening thread")
170                self._listen_thread = self._thread_cls(target=self._listening)
171                self._listen_thread.start()
172
173            # debug the threads
174            for thread in threading.enumerate():
175                self._logger.debug("after running thread: %s", thread.name)
176            self._logger.debug("number of active threads: %s", threading.active_count())
177
178            # push open event
179            self._emit(
180                WebSocketEvents(WebSocketEvents.Open),
181                OpenResponse(type=WebSocketEvents.Open),
182            )
183
184            self._logger.notice("start succeeded")
185            self._logger.debug("AbstractSyncWebSocketClient.start LEAVE")
186            return True
187        except websockets.exceptions.ConnectionClosed as e:
188            self._logger.error(
189                "ConnectionClosed in AbstractSyncWebSocketClient.start: %s", e
190            )
191            self._logger.debug("AbstractSyncWebSocketClient.start LEAVE")
192            if self._config.options.get("termination_exception_connect", False):
193                raise e
194            return False
195        except websockets.exceptions.WebSocketException as e:
196            self._logger.error(
197                "WebSocketException in AbstractSyncWebSocketClient.start: %s", e
198            )
199            self._logger.debug("AbstractSyncWebSocketClient.start LEAVE")
200            if self._config.options.get("termination_exception_connect", False):
201                raise e
202            return False
203        except Exception as e:  # pylint: disable=broad-except
204            self._logger.error(
205                "WebSocketException in AbstractSyncWebSocketClient.start: %s", e
206            )
207            self._logger.debug("AbstractSyncWebSocketClient.start LEAVE")
208            if self._config.options.get("termination_exception_connect", False):
209                raise e
210            return False
211
212    def is_connected(self) -> bool:
213        """
214        Returns the connection status of the WebSocket.
215        """
216        return self._socket is not None
217
218    # pylint: enable=too-many-statements,too-many-branches
219
220    @abstractmethod
221    def on(self, event: WebSocketEvents, handler: Callable) -> None:
222        """
223        Registers an event handler for the WebSocket connection.
224        """
225        raise NotImplementedError("no on method")
226
227    @abstractmethod
228    def _emit(self, event: WebSocketEvents, *args, **kwargs) -> None:
229        """
230        Emits an event to the WebSocket connection.
231        """
232        raise NotImplementedError("no _emit method")
233
234    # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches
235    def _listening(
236        self,
237    ) -> None:
238        """
239        Listens for messages from the WebSocket connection.
240        """
241        self._logger.debug("AbstractSyncWebSocketClient._listening ENTER")
242
243        while True:
244            try:
245                if self._exit_event.is_set():
246                    self._logger.notice("_listening exiting gracefully")
247                    self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE")
248                    return
249
250                if self._socket is None:
251                    self._logger.warning("socket is empty")
252                    self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE")
253                    return
254
255                message = self._socket.recv()
256
257                if message is None:
258                    self._logger.info("message is None")
259                    continue
260
261                self._logger.spam("data type: %s", type(message))
262
263                if isinstance(message, bytes):
264                    self._logger.debug("Binary data received")
265                    self._process_binary(message)
266                else:
267                    self._logger.debug("Text data received")
268                    self._process_text(message)
269
270                self._logger.notice("_listening Succeeded")
271                self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE")
272
273            except websockets.exceptions.ConnectionClosedOK as e:
274                # signal exit and close
275                self._signal_exit()
276
277                self._logger.notice(f"_listening({e.code}) exiting gracefully")
278                self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE")
279                return
280
281            except websockets.exceptions.ConnectionClosed as e:
282                if e.code in [1000, 1001]:
283                    # signal exit and close
284                    self._signal_exit()
285
286                    self._logger.notice(f"_listening({e.code}) exiting gracefully")
287                    self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE")
288                    return
289
290                # we need to explicitly call self._signal_exit() here because we are hanging on a recv()
291                # note: this is different than the speak websocket client
292                self._logger.error(
293                    "ConnectionClosed in AbstractSyncWebSocketClient._listening with code %s: %s",
294                    e.code,
295                    e.reason,
296                )
297                cc_error: ErrorResponse = ErrorResponse(
298                    "ConnectionClosed in AbstractSyncWebSocketClient._listening",
299                    f"{e}",
300                    "ConnectionClosed",
301                )
302                self._emit(
303                    WebSocketEvents(WebSocketEvents.Error),
304                    cc_error,
305                    **dict(cast(Dict[Any, Any], self._kwargs)),
306                )
307
308                # signal exit and close
309                self._signal_exit()
310
311                self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE")
312
313                if self._config.options.get("termination_exception_connect") is True:
314                    raise
315                return
316
317            except websockets.exceptions.WebSocketException as e:
318                self._logger.error(
319                    "WebSocketException in AbstractSyncWebSocketClient._listening with: %s",
320                    e,
321                )
322                ws_error: ErrorResponse = ErrorResponse(
323                    "WebSocketException in AbstractSyncWebSocketClient._listening",
324                    f"{e}",
325                    "WebSocketException",
326                )
327                self._emit(
328                    WebSocketEvents(WebSocketEvents.Error),
329                    ws_error,
330                    **dict(cast(Dict[Any, Any], self._kwargs)),
331                )
332
333                # signal exit and close
334                self._signal_exit()
335
336                self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE")
337
338                if self._config.options.get("termination_exception_connect") is True:
339                    raise
340                return
341
342            except Exception as e:  # pylint: disable=broad-except
343                self._logger.error(
344                    "Exception in AbstractSyncWebSocketClient._listening: %s", e
345                )
346                e_error: ErrorResponse = ErrorResponse(
347                    "Exception in AbstractSyncWebSocketClient._listening",
348                    f"{e}",
349                    "Exception",
350                )
351                self._emit(
352                    WebSocketEvents(WebSocketEvents.Error),
353                    e_error,
354                    **dict(cast(Dict[Any, Any], self._kwargs)),
355                )
356
357                # signal exit and close
358                self._signal_exit()
359
360                self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE")
361
362                if self._config.options.get("termination_exception_connect") is True:
363                    raise
364                return
365
366    # pylint: enable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches
367
368    def _process_message(self, message: Union[str, bytes]) -> None:
369        if isinstance(message, bytes):
370            self._process_binary(message)
371        else:
372            self._process_text(message)
373
374    @abstractmethod
375    def _process_text(self, message: str) -> None:
376        raise NotImplementedError("no _process_text method")
377
378    @abstractmethod
379    def _process_binary(self, message: bytes) -> None:
380        raise NotImplementedError("no _process_binary method")
381
382    @abstractmethod
383    def _close_message(self) -> bool:
384        raise NotImplementedError("no _close_message method")
385
386    # pylint: disable=too-many-return-statements,too-many-branches
387    def send(self, data: Union[str, bytes]) -> bool:
388        """
389        Sends data over the WebSocket connection.
390        """
391        self._logger.spam("AbstractSyncWebSocketClient.send ENTER")
392
393        if self._exit_event.is_set():
394            self._logger.notice("send exiting gracefully")
395            self._logger.debug("AbstractSyncWebSocketClient.send LEAVE")
396            return False
397
398        if not self.is_connected():
399            self._logger.notice("is_connected is False")
400            self._logger.debug("AbstractSyncWebSocketClient.send LEAVE")
401            return False
402
403        if self._socket is not None:
404            with self._lock_send:
405                try:
406                    self._socket.send(data)
407                except websockets.exceptions.ConnectionClosedOK as e:
408                    self._logger.notice(f"send() exiting gracefully: {e.code}")
409                    self._logger.debug("AbstractSyncWebSocketClient.send LEAVE")
410                    if self._config.options.get("termination_exception_send") is True:
411                        raise
412                    return True
413                except websockets.exceptions.ConnectionClosed as e:
414                    if e.code in [1000, 1001]:
415                        self._logger.notice(f"send({e.code}) exiting gracefully")
416                        self._logger.debug("AbstractSyncWebSocketClient.send LEAVE")
417                        if (
418                            self._config.options.get("termination_exception_send")
419                            == "true"
420                        ):
421                            raise
422                        return True
423                    self._logger.error("send() failed - ConnectionClosed: %s", str(e))
424                    self._logger.spam("AbstractSyncWebSocketClient.send LEAVE")
425                    if self._config.options.get("termination_exception_send") is True:
426                        raise
427                    return False
428                except websockets.exceptions.WebSocketException as e:
429                    self._logger.error("send() failed - WebSocketException: %s", str(e))
430                    self._logger.spam("AbstractSyncWebSocketClient.send LEAVE")
431                    if self._config.options.get("termination_exception_send") is True:
432                        raise
433                    return False
434                except Exception as e:  # pylint: disable=broad-except
435                    self._logger.error("send() failed - Exception: %s", str(e))
436                    self._logger.spam("AbstractSyncWebSocketClient.send LEAVE")
437                    if self._config.options.get("termination_exception_send") is True:
438                        raise
439                    return False
440
441            self._logger.spam("send() succeeded")
442            self._logger.spam("AbstractSyncWebSocketClient.send LEAVE")
443            return True
444
445        self._logger.spam("send() failed. socket is None")
446        self._logger.spam("AbstractSyncWebSocketClient.send LEAVE")
447        return False
448
449    # pylint: enable=too-many-return-statements,too-many-branches
450
451    def finish(self) -> bool:
452        """
453        Closes the WebSocket connection gracefully.
454        """
455        self._logger.spam("AbstractSyncWebSocketClient.finish ENTER")
456
457        # debug the threads
458        for thread in threading.enumerate():
459            self._logger.debug("before running thread: %s", thread.name)
460        self._logger.debug("number of active threads: %s", threading.active_count())
461
462        # signal exit
463        self._signal_exit()
464
465        # stop the threads
466        self._logger.verbose("cancelling tasks...")
467        if self._listen_thread is not None:
468            self._listen_thread.join()
469            self._listen_thread = None
470        self._logger.notice("listening thread joined")
471
472        # debug the threads
473        for thread in threading.enumerate():
474            if thread is not None and thread.name is not None:
475                self._logger.debug("before running thread: %s", thread.name)
476            else:
477                self._logger.debug("after running thread: unknown_thread_name")
478        self._logger.debug("number of active threads: %s", threading.active_count())
479
480        self._logger.notice("finish succeeded")
481        self._logger.spam("AbstractSyncWebSocketClient.finish LEAVE")
482        return True
483
484    # signals the WebSocket connection to exit
485    def _signal_exit(self) -> None:
486        # closes the WebSocket connection gracefully
487        self._logger.notice("closing socket...")
488        if self._socket is not None:
489            self._logger.notice("sending Close...")
490            try:
491                # if the socket connection is closed, the following line might throw an error
492                self._close_message()
493            except websockets.exceptions.ConnectionClosedOK as e:
494                self._logger.notice("_signal_exit  - ConnectionClosedOK: %s", e.code)
495            except websockets.exceptions.ConnectionClosed as e:
496                self._logger.error("_signal_exit  - ConnectionClosed: %s", e.code)
497            except websockets.exceptions.WebSocketException as e:
498                self._logger.error("_signal_exit - WebSocketException: %s", str(e))
499            except Exception as e:  # pylint: disable=broad-except
500                self._logger.error("_signal_exit - Exception: %s", str(e))
501
502            # push close event
503            try:
504                self._emit(
505                    WebSocketEvents(WebSocketEvents.Close),
506                    CloseResponse(type=WebSocketEvents.Close),
507                    **dict(cast(Dict[Any, Any], self._kwargs)),
508                )
509            except Exception as e:  # pylint: disable=broad-except
510                self._logger.error("_signal_exit - Exception: %s", e)
511
512            # wait for task to send
513            time.sleep(0.5)
514
515        # signal exit
516        self._exit_event.set()
517
518        # closes the WebSocket connection gracefully
519        self._logger.verbose("clean up socket...")
520        if self._socket is not None:
521            self._logger.verbose("socket.wait_closed...")
522            try:
523                self._socket.close()
524            except websockets.exceptions.WebSocketException as e:
525                self._logger.error("socket.wait_closed failed: %s", e)
526
527        self._socket = None
# Module-level numeric constants. None of them is referenced by the code
# visible in this file.
# NOTE(review): the names suggest durations in seconds - confirm against the
# modules that import them.
ONE_SECOND = 1
HALF_SECOND = 0.5
DEEPGRAM_INTERVAL = 5
PING_INTERVAL = 20
class AbstractSyncWebSocketClient(abc.ABC):
 36class AbstractSyncWebSocketClient(ABC):  # pylint: disable=too-many-instance-attributes
 37    """
 38    Abstract class for using WebSockets.
 39
 40    This class provides methods to establish a WebSocket connection generically for
 41    use in all WebSocket clients.
 42
 43    Args:
 44        config (DeepgramClientOptions): all the options for the client
 45        endpoint (str): the endpoint to connect to
 46        thread_cls (Type[threading.Thread]): optional thread class to use for creating threads,
 47            defaults to threading.Thread. Useful for custom thread management like ContextVar support.
 48    """
 49
 50    _logger: verboselogs.VerboseLogger
 51    _config: DeepgramClientOptions
 52    _endpoint: str
 53    _websocket_url: str
 54
 55    _socket: Optional[ClientConnection] = None
 56    _exit_event: threading.Event
 57    _lock_send: threading.Lock
 58
 59    _listen_thread: Union[threading.Thread, None]
 60    _delegate: Optional[Speaker] = None
 61
 62    _thread_cls: Type[threading.Thread]
 63
 64    _kwargs: Optional[Dict] = None
 65    _addons: Optional[Dict] = None
 66    _options: Optional[Dict] = None
 67    _headers: Optional[Dict] = None
 68
 69    def __init__(
 70        self,
 71        config: DeepgramClientOptions,
 72        endpoint: str = "",
 73        thread_cls: Type[threading.Thread] = threading.Thread,
 74    ):
 75        if config is None:
 76            raise DeepgramError("Config is required")
 77        if endpoint == "":
 78            raise DeepgramError("endpoint is required")
 79
 80        self._logger = verboselogs.VerboseLogger(__name__)
 81        self._logger.addHandler(logging.StreamHandler())
 82        self._logger.setLevel(config.verbose)
 83
 84        self._config = config
 85        self._endpoint = endpoint
 86        self._lock_send = threading.Lock()
 87
 88        self._listen_thread = None
 89
 90        self._thread_cls = thread_cls
 91
 92        # exit
 93        self._exit_event = threading.Event()
 94
 95        # set websocket url
 96        self._websocket_url = convert_to_websocket_url(self._config.url, self._endpoint)
 97
 98    def delegate_listening(self, delegate: Speaker) -> None:
 99        """
100        Delegate the listening thread to the main thread.
101        """
102        self._delegate = delegate
103
104    # pylint: disable=too-many-statements,too-many-branches
105    def start(
106        self,
107        options: Optional[Any] = None,
108        addons: Optional[Dict] = None,
109        headers: Optional[Dict] = None,
110        **kwargs,
111    ) -> bool:
112        """
113        Starts the WebSocket connection for live transcription.
114        """
115        self._logger.debug("AbstractSyncWebSocketClient.start ENTER")
116        self._logger.info("addons: %s", addons)
117        self._logger.info("headers: %s", headers)
118        self._logger.info("kwargs: %s", kwargs)
119
120        self._addons = addons
121        self._headers = headers
122
123        # set kwargs
124        if kwargs is not None:
125            self._kwargs = kwargs
126        else:
127            self._kwargs = {}
128
129        if not isinstance(options, dict):
130            self._logger.error("options is not a dict")
131            self._logger.debug("AbstractSyncWebSocketClient.start LEAVE")
132            return False
133
134        # set options
135        if options is not None:
136            self._options = options
137        else:
138            self._options = {}
139
140        combined_options = self._options.copy()
141        if self._addons is not None:
142            self._logger.info("merging addons to options")
143            combined_options.update(self._addons)
144            self._logger.info("new options: %s", combined_options)
145        self._logger.debug("combined_options: %s", combined_options)
146
147        combined_headers = self._config.headers.copy()
148        if self._headers is not None:
149            self._logger.info("merging headers to options")
150            combined_headers.update(self._headers)
151            self._logger.info("new headers: %s", combined_headers)
152        self._logger.debug("combined_headers: %s", combined_headers)
153
154        url_with_params = append_query_params(self._websocket_url, combined_options)
155        try:
156            self._socket = connect(url_with_params, additional_headers=combined_headers)
157            self._exit_event.clear()
158
159            # debug the threads
160            for thread in threading.enumerate():
161                self._logger.debug("after running thread: %s", thread.name)
162            self._logger.debug("number of active threads: %s", threading.active_count())
163
164            # delegate the listening thread to external object
165            if self._delegate is not None:
166                self._logger.notice("_delegate is enabled. this is usually the speaker")
167                self._delegate.set_pull_callback(self._socket.recv)
168                self._delegate.set_push_callback(self._process_message)
169            else:
170                self._logger.notice("create _listening thread")
171                self._listen_thread = self._thread_cls(target=self._listening)
172                self._listen_thread.start()
173
174            # debug the threads
175            for thread in threading.enumerate():
176                self._logger.debug("after running thread: %s", thread.name)
177            self._logger.debug("number of active threads: %s", threading.active_count())
178
179            # push open event
180            self._emit(
181                WebSocketEvents(WebSocketEvents.Open),
182                OpenResponse(type=WebSocketEvents.Open),
183            )
184
185            self._logger.notice("start succeeded")
186            self._logger.debug("AbstractSyncWebSocketClient.start LEAVE")
187            return True
188        except websockets.exceptions.ConnectionClosed as e:
189            self._logger.error(
190                "ConnectionClosed in AbstractSyncWebSocketClient.start: %s", e
191            )
192            self._logger.debug("AbstractSyncWebSocketClient.start LEAVE")
193            if self._config.options.get("termination_exception_connect", False):
194                raise e
195            return False
196        except websockets.exceptions.WebSocketException as e:
197            self._logger.error(
198                "WebSocketException in AbstractSyncWebSocketClient.start: %s", e
199            )
200            self._logger.debug("AbstractSyncWebSocketClient.start LEAVE")
201            if self._config.options.get("termination_exception_connect", False):
202                raise e
203            return False
204        except Exception as e:  # pylint: disable=broad-except
205            self._logger.error(
206                "WebSocketException in AbstractSyncWebSocketClient.start: %s", e
207            )
208            self._logger.debug("AbstractSyncWebSocketClient.start LEAVE")
209            if self._config.options.get("termination_exception_connect", False):
210                raise e
211            return False
212
213    def is_connected(self) -> bool:
214        """
215        Returns the connection status of the WebSocket.
216        """
217        return self._socket is not None
218
219    # pylint: enable=too-many-statements,too-many-branches
220
221    @abstractmethod
222    def on(self, event: WebSocketEvents, handler: Callable) -> None:
223        """
224        Registers an event handler for the WebSocket connection.
225        """
226        raise NotImplementedError("no on method")
227
228    @abstractmethod
229    def _emit(self, event: WebSocketEvents, *args, **kwargs) -> None:
230        """
231        Emits an event to the WebSocket connection.
232        """
233        raise NotImplementedError("no _emit method")
234
235    # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches
236    def _listening(
237        self,
238    ) -> None:
239        """
240        Listens for messages from the WebSocket connection.
241        """
242        self._logger.debug("AbstractSyncWebSocketClient._listening ENTER")
243
244        while True:
245            try:
246                if self._exit_event.is_set():
247                    self._logger.notice("_listening exiting gracefully")
248                    self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE")
249                    return
250
251                if self._socket is None:
252                    self._logger.warning("socket is empty")
253                    self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE")
254                    return
255
256                message = self._socket.recv()
257
258                if message is None:
259                    self._logger.info("message is None")
260                    continue
261
262                self._logger.spam("data type: %s", type(message))
263
264                if isinstance(message, bytes):
265                    self._logger.debug("Binary data received")
266                    self._process_binary(message)
267                else:
268                    self._logger.debug("Text data received")
269                    self._process_text(message)
270
271                self._logger.notice("_listening Succeeded")
272                self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE")
273
274            except websockets.exceptions.ConnectionClosedOK as e:
275                # signal exit and close
276                self._signal_exit()
277
278                self._logger.notice(f"_listening({e.code}) exiting gracefully")
279                self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE")
280                return
281
282            except websockets.exceptions.ConnectionClosed as e:
283                if e.code in [1000, 1001]:
284                    # signal exit and close
285                    self._signal_exit()
286
287                    self._logger.notice(f"_listening({e.code}) exiting gracefully")
288                    self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE")
289                    return
290
291                # we need to explicitly call self._signal_exit() here because we are hanging on a recv()
292                # note: this is different than the speak websocket client
293                self._logger.error(
294                    "ConnectionClosed in AbstractSyncWebSocketClient._listening with code %s: %s",
295                    e.code,
296                    e.reason,
297                )
298                cc_error: ErrorResponse = ErrorResponse(
299                    "ConnectionClosed in AbstractSyncWebSocketClient._listening",
300                    f"{e}",
301                    "ConnectionClosed",
302                )
303                self._emit(
304                    WebSocketEvents(WebSocketEvents.Error),
305                    cc_error,
306                    **dict(cast(Dict[Any, Any], self._kwargs)),
307                )
308
309                # signal exit and close
310                self._signal_exit()
311
312                self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE")
313
314                if self._config.options.get("termination_exception_connect") is True:
315                    raise
316                return
317
318            except websockets.exceptions.WebSocketException as e:
319                self._logger.error(
320                    "WebSocketException in AbstractSyncWebSocketClient._listening with: %s",
321                    e,
322                )
323                ws_error: ErrorResponse = ErrorResponse(
324                    "WebSocketException in AbstractSyncWebSocketClient._listening",
325                    f"{e}",
326                    "WebSocketException",
327                )
328                self._emit(
329                    WebSocketEvents(WebSocketEvents.Error),
330                    ws_error,
331                    **dict(cast(Dict[Any, Any], self._kwargs)),
332                )
333
334                # signal exit and close
335                self._signal_exit()
336
337                self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE")
338
339                if self._config.options.get("termination_exception_connect") is True:
340                    raise
341                return
342
343            except Exception as e:  # pylint: disable=broad-except
344                self._logger.error(
345                    "Exception in AbstractSyncWebSocketClient._listening: %s", e
346                )
347                e_error: ErrorResponse = ErrorResponse(
348                    "Exception in AbstractSyncWebSocketClient._listening",
349                    f"{e}",
350                    "Exception",
351                )
352                self._emit(
353                    WebSocketEvents(WebSocketEvents.Error),
354                    e_error,
355                    **dict(cast(Dict[Any, Any], self._kwargs)),
356                )
357
358                # signal exit and close
359                self._signal_exit()
360
361                self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE")
362
363                if self._config.options.get("termination_exception_connect") is True:
364                    raise
365                return
366
367    # pylint: enable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches
368
369    def _process_message(self, message: Union[str, bytes]) -> None:
370        if isinstance(message, bytes):
371            self._process_binary(message)
372        else:
373            self._process_text(message)
374
375    @abstractmethod
376    def _process_text(self, message: str) -> None:
377        raise NotImplementedError("no _process_text method")
378
379    @abstractmethod
380    def _process_binary(self, message: bytes) -> None:
381        raise NotImplementedError("no _process_binary method")
382
383    @abstractmethod
384    def _close_message(self) -> bool:
385        raise NotImplementedError("no _close_message method")
386
387    # pylint: disable=too-many-return-statements,too-many-branches
388    def send(self, data: Union[str, bytes]) -> bool:
389        """
390        Sends data over the WebSocket connection.
391        """
392        self._logger.spam("AbstractSyncWebSocketClient.send ENTER")
393
394        if self._exit_event.is_set():
395            self._logger.notice("send exiting gracefully")
396            self._logger.debug("AbstractSyncWebSocketClient.send LEAVE")
397            return False
398
399        if not self.is_connected():
400            self._logger.notice("is_connected is False")
401            self._logger.debug("AbstractSyncWebSocketClient.send LEAVE")
402            return False
403
404        if self._socket is not None:
405            with self._lock_send:
406                try:
407                    self._socket.send(data)
408                except websockets.exceptions.ConnectionClosedOK as e:
409                    self._logger.notice(f"send() exiting gracefully: {e.code}")
410                    self._logger.debug("AbstractSyncWebSocketClient.send LEAVE")
411                    if self._config.options.get("termination_exception_send") is True:
412                        raise
413                    return True
414                except websockets.exceptions.ConnectionClosed as e:
415                    if e.code in [1000, 1001]:
416                        self._logger.notice(f"send({e.code}) exiting gracefully")
417                        self._logger.debug("AbstractSyncWebSocketClient.send LEAVE")
418                        if (
419                            self._config.options.get("termination_exception_send")
420                            == "true"
421                        ):
422                            raise
423                        return True
424                    self._logger.error("send() failed - ConnectionClosed: %s", str(e))
425                    self._logger.spam("AbstractSyncWebSocketClient.send LEAVE")
426                    if self._config.options.get("termination_exception_send") is True:
427                        raise
428                    return False
429                except websockets.exceptions.WebSocketException as e:
430                    self._logger.error("send() failed - WebSocketException: %s", str(e))
431                    self._logger.spam("AbstractSyncWebSocketClient.send LEAVE")
432                    if self._config.options.get("termination_exception_send") is True:
433                        raise
434                    return False
435                except Exception as e:  # pylint: disable=broad-except
436                    self._logger.error("send() failed - Exception: %s", str(e))
437                    self._logger.spam("AbstractSyncWebSocketClient.send LEAVE")
438                    if self._config.options.get("termination_exception_send") is True:
439                        raise
440                    return False
441
442            self._logger.spam("send() succeeded")
443            self._logger.spam("AbstractSyncWebSocketClient.send LEAVE")
444            return True
445
446        self._logger.spam("send() failed. socket is None")
447        self._logger.spam("AbstractSyncWebSocketClient.send LEAVE")
448        return False
449
450    # pylint: enable=too-many-return-statements,too-many-branches
451
452    def finish(self) -> bool:
453        """
454        Closes the WebSocket connection gracefully.
455        """
456        self._logger.spam("AbstractSyncWebSocketClient.finish ENTER")
457
458        # debug the threads
459        for thread in threading.enumerate():
460            self._logger.debug("before running thread: %s", thread.name)
461        self._logger.debug("number of active threads: %s", threading.active_count())
462
463        # signal exit
464        self._signal_exit()
465
466        # stop the threads
467        self._logger.verbose("cancelling tasks...")
468        if self._listen_thread is not None:
469            self._listen_thread.join()
470            self._listen_thread = None
471        self._logger.notice("listening thread joined")
472
473        # debug the threads
474        for thread in threading.enumerate():
475            if thread is not None and thread.name is not None:
476                self._logger.debug("before running thread: %s", thread.name)
477            else:
478                self._logger.debug("after running thread: unknown_thread_name")
479        self._logger.debug("number of active threads: %s", threading.active_count())
480
481        self._logger.notice("finish succeeded")
482        self._logger.spam("AbstractSyncWebSocketClient.finish LEAVE")
483        return True
484
485    # signals the WebSocket connection to exit
486    def _signal_exit(self) -> None:
487        # closes the WebSocket connection gracefully
488        self._logger.notice("closing socket...")
489        if self._socket is not None:
490            self._logger.notice("sending Close...")
491            try:
492                # if the socket connection is closed, the following line might throw an error
493                self._close_message()
494            except websockets.exceptions.ConnectionClosedOK as e:
495                self._logger.notice("_signal_exit  - ConnectionClosedOK: %s", e.code)
496            except websockets.exceptions.ConnectionClosed as e:
497                self._logger.error("_signal_exit  - ConnectionClosed: %s", e.code)
498            except websockets.exceptions.WebSocketException as e:
499                self._logger.error("_signal_exit - WebSocketException: %s", str(e))
500            except Exception as e:  # pylint: disable=broad-except
501                self._logger.error("_signal_exit - Exception: %s", str(e))
502
503            # push close event
504            try:
505                self._emit(
506                    WebSocketEvents(WebSocketEvents.Close),
507                    CloseResponse(type=WebSocketEvents.Close),
508                    **dict(cast(Dict[Any, Any], self._kwargs)),
509                )
510            except Exception as e:  # pylint: disable=broad-except
511                self._logger.error("_signal_exit - Exception: %s", e)
512
513            # wait for task to send
514            time.sleep(0.5)
515
516        # signal exit
517        self._exit_event.set()
518
519        # closes the WebSocket connection gracefully
520        self._logger.verbose("clean up socket...")
521        if self._socket is not None:
522            self._logger.verbose("socket.wait_closed...")
523            try:
524                self._socket.close()
525            except websockets.exceptions.WebSocketException as e:
526                self._logger.error("socket.wait_closed failed: %s", e)
527
528        self._socket = None

Abstract class for using WebSockets.

This class provides methods to establish a WebSocket connection generically for use in all WebSocket clients.

Args:
- config (DeepgramClientOptions): all the options for the client.
- endpoint (str): the endpoint to connect to.
- thread_cls (Type[threading.Thread]): optional thread class to use for creating threads; defaults to threading.Thread. Useful for custom thread management such as ContextVar support.

def delegate_listening(self, delegate: deepgram.audio.speaker.speaker.Speaker) -> None:
 98    def delegate_listening(self, delegate: Speaker) -> None:
 99        """
100        Delegate the listening thread to the main thread.
101        """
102        self._delegate = delegate

Hand message receiving over to a delegate (usually the Speaker) instead of the internal listening thread.

def start( self, options: Optional[Any] = None, addons: Optional[Dict] = None, headers: Optional[Dict] = None, **kwargs) -> bool:
105    def start(
106        self,
107        options: Optional[Any] = None,
108        addons: Optional[Dict] = None,
109        headers: Optional[Dict] = None,
110        **kwargs,
111    ) -> bool:
112        """
113        Starts the WebSocket connection for live transcription.
114        """
115        self._logger.debug("AbstractSyncWebSocketClient.start ENTER")
116        self._logger.info("addons: %s", addons)
117        self._logger.info("headers: %s", headers)
118        self._logger.info("kwargs: %s", kwargs)
119
120        self._addons = addons
121        self._headers = headers
122
123        # set kwargs
124        if kwargs is not None:
125            self._kwargs = kwargs
126        else:
127            self._kwargs = {}
128
129        if not isinstance(options, dict):
130            self._logger.error("options is not a dict")
131            self._logger.debug("AbstractSyncWebSocketClient.start LEAVE")
132            return False
133
134        # set options
135        if options is not None:
136            self._options = options
137        else:
138            self._options = {}
139
140        combined_options = self._options.copy()
141        if self._addons is not None:
142            self._logger.info("merging addons to options")
143            combined_options.update(self._addons)
144            self._logger.info("new options: %s", combined_options)
145        self._logger.debug("combined_options: %s", combined_options)
146
147        combined_headers = self._config.headers.copy()
148        if self._headers is not None:
149            self._logger.info("merging headers to options")
150            combined_headers.update(self._headers)
151            self._logger.info("new headers: %s", combined_headers)
152        self._logger.debug("combined_headers: %s", combined_headers)
153
154        url_with_params = append_query_params(self._websocket_url, combined_options)
155        try:
156            self._socket = connect(url_with_params, additional_headers=combined_headers)
157            self._exit_event.clear()
158
159            # debug the threads
160            for thread in threading.enumerate():
161                self._logger.debug("after running thread: %s", thread.name)
162            self._logger.debug("number of active threads: %s", threading.active_count())
163
164            # delegate the listening thread to external object
165            if self._delegate is not None:
166                self._logger.notice("_delegate is enabled. this is usually the speaker")
167                self._delegate.set_pull_callback(self._socket.recv)
168                self._delegate.set_push_callback(self._process_message)
169            else:
170                self._logger.notice("create _listening thread")
171                self._listen_thread = self._thread_cls(target=self._listening)
172                self._listen_thread.start()
173
174            # debug the threads
175            for thread in threading.enumerate():
176                self._logger.debug("after running thread: %s", thread.name)
177            self._logger.debug("number of active threads: %s", threading.active_count())
178
179            # push open event
180            self._emit(
181                WebSocketEvents(WebSocketEvents.Open),
182                OpenResponse(type=WebSocketEvents.Open),
183            )
184
185            self._logger.notice("start succeeded")
186            self._logger.debug("AbstractSyncWebSocketClient.start LEAVE")
187            return True
188        except websockets.exceptions.ConnectionClosed as e:
189            self._logger.error(
190                "ConnectionClosed in AbstractSyncWebSocketClient.start: %s", e
191            )
192            self._logger.debug("AbstractSyncWebSocketClient.start LEAVE")
193            if self._config.options.get("termination_exception_connect", False):
194                raise e
195            return False
196        except websockets.exceptions.WebSocketException as e:
197            self._logger.error(
198                "WebSocketException in AbstractSyncWebSocketClient.start: %s", e
199            )
200            self._logger.debug("AbstractSyncWebSocketClient.start LEAVE")
201            if self._config.options.get("termination_exception_connect", False):
202                raise e
203            return False
204        except Exception as e:  # pylint: disable=broad-except
205            self._logger.error(
206                "WebSocketException in AbstractSyncWebSocketClient.start: %s", e
207            )
208            self._logger.debug("AbstractSyncWebSocketClient.start LEAVE")
209            if self._config.options.get("termination_exception_connect", False):
210                raise e
211            return False

Starts the WebSocket connection for live transcription.

def is_connected(self) -> bool:
213    def is_connected(self) -> bool:
214        """
215        Returns the connection status of the WebSocket.
216        """
217        return self._socket is not None

Returns the connection status of the WebSocket.

@abstractmethod
def on( self, event: deepgram.clients.common.v1.websocket_events.WebSocketEvents, handler: Callable) -> None:
221    @abstractmethod
222    def on(self, event: WebSocketEvents, handler: Callable) -> None:
223        """
224        Registers an event handler for the WebSocket connection.
225        """
226        raise NotImplementedError("no on method")

Registers an event handler for the WebSocket connection.

def send(self, data: Union[str, bytes]) -> bool:
388    def send(self, data: Union[str, bytes]) -> bool:
389        """
390        Sends data over the WebSocket connection.
391        """
392        self._logger.spam("AbstractSyncWebSocketClient.send ENTER")
393
394        if self._exit_event.is_set():
395            self._logger.notice("send exiting gracefully")
396            self._logger.debug("AbstractSyncWebSocketClient.send LEAVE")
397            return False
398
399        if not self.is_connected():
400            self._logger.notice("is_connected is False")
401            self._logger.debug("AbstractSyncWebSocketClient.send LEAVE")
402            return False
403
404        if self._socket is not None:
405            with self._lock_send:
406                try:
407                    self._socket.send(data)
408                except websockets.exceptions.ConnectionClosedOK as e:
409                    self._logger.notice(f"send() exiting gracefully: {e.code}")
410                    self._logger.debug("AbstractSyncWebSocketClient.send LEAVE")
411                    if self._config.options.get("termination_exception_send") is True:
412                        raise
413                    return True
414                except websockets.exceptions.ConnectionClosed as e:
415                    if e.code in [1000, 1001]:
416                        self._logger.notice(f"send({e.code}) exiting gracefully")
417                        self._logger.debug("AbstractSyncWebSocketClient.send LEAVE")
418                        if (
419                            self._config.options.get("termination_exception_send")
420                            == "true"
421                        ):
422                            raise
423                        return True
424                    self._logger.error("send() failed - ConnectionClosed: %s", str(e))
425                    self._logger.spam("AbstractSyncWebSocketClient.send LEAVE")
426                    if self._config.options.get("termination_exception_send") is True:
427                        raise
428                    return False
429                except websockets.exceptions.WebSocketException as e:
430                    self._logger.error("send() failed - WebSocketException: %s", str(e))
431                    self._logger.spam("AbstractSyncWebSocketClient.send LEAVE")
432                    if self._config.options.get("termination_exception_send") is True:
433                        raise
434                    return False
435                except Exception as e:  # pylint: disable=broad-except
436                    self._logger.error("send() failed - Exception: %s", str(e))
437                    self._logger.spam("AbstractSyncWebSocketClient.send LEAVE")
438                    if self._config.options.get("termination_exception_send") is True:
439                        raise
440                    return False
441
442            self._logger.spam("send() succeeded")
443            self._logger.spam("AbstractSyncWebSocketClient.send LEAVE")
444            return True
445
446        self._logger.spam("send() failed. socket is None")
447        self._logger.spam("AbstractSyncWebSocketClient.send LEAVE")
448        return False

Sends data over the WebSocket connection.

def finish(self) -> bool:
452    def finish(self) -> bool:
453        """
454        Closes the WebSocket connection gracefully.
455        """
456        self._logger.spam("AbstractSyncWebSocketClient.finish ENTER")
457
458        # debug the threads
459        for thread in threading.enumerate():
460            self._logger.debug("before running thread: %s", thread.name)
461        self._logger.debug("number of active threads: %s", threading.active_count())
462
463        # signal exit
464        self._signal_exit()
465
466        # stop the threads
467        self._logger.verbose("cancelling tasks...")
468        if self._listen_thread is not None:
469            self._listen_thread.join()
470            self._listen_thread = None
471        self._logger.notice("listening thread joined")
472
473        # debug the threads
474        for thread in threading.enumerate():
475            if thread is not None and thread.name is not None:
476                self._logger.debug("before running thread: %s", thread.name)
477            else:
478                self._logger.debug("after running thread: unknown_thread_name")
479        self._logger.debug("number of active threads: %s", threading.active_count())
480
481        self._logger.notice("finish succeeded")
482        self._logger.spam("AbstractSyncWebSocketClient.finish LEAVE")
483        return True

Closes the WebSocket connection gracefully.