deepgram.clients.common.v1.abstract_sync_websocket
1# Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved. 2# Use of this source code is governed by a MIT license that can be found in the LICENSE file. 3# SPDX-License-Identifier: MIT 4import json 5import time 6import logging 7from typing import Dict, Union, Optional, cast, Any, Callable, Type 8from datetime import datetime 9import threading 10from abc import ABC, abstractmethod 11 12from websockets.sync.client import connect, ClientConnection 13import websockets 14 15from ....audio import Speaker 16from ....utils import verboselogs 17from ....options import DeepgramClientOptions 18from .helpers import convert_to_websocket_url, append_query_params 19from .errors import DeepgramError 20 21from .websocket_response import ( 22 OpenResponse, 23 CloseResponse, 24 ErrorResponse, 25) 26from .websocket_events import WebSocketEvents 27 28 29ONE_SECOND = 1 30HALF_SECOND = 0.5 31DEEPGRAM_INTERVAL = 5 32PING_INTERVAL = 20 33 34 35class AbstractSyncWebSocketClient(ABC): # pylint: disable=too-many-instance-attributes 36 """ 37 Abstract class for using WebSockets. 38 39 This class provides methods to establish a WebSocket connection generically for 40 use in all WebSocket clients. 41 42 Args: 43 config (DeepgramClientOptions): all the options for the client 44 endpoint (str): the endpoint to connect to 45 thread_cls (Type[threading.Thread]): optional thread class to use for creating threads, 46 defaults to threading.Thread. Useful for custom thread management like ContextVar support. 47 """ 48 49 _logger: verboselogs.VerboseLogger 50 _config: DeepgramClientOptions 51 _endpoint: str 52 _websocket_url: str 53 54 _socket: Optional[ClientConnection] = None 55 _exit_event: threading.Event 56 _lock_send: threading.Lock 57 58 _listen_thread: Union[threading.Thread, None] 59 _delegate: Optional[Speaker] = None 60 61 _thread_cls: Type[threading.Thread] 62 63 _kwargs: Optional[Dict] = None 64 _addons: Optional[Dict] = None 65 _options: Optional[Dict] = None 66 _headers: Optional[Dict] = None 67 68 def __init__( 69 self, 70 config: DeepgramClientOptions, 71 endpoint: str = "", 72 thread_cls: Type[threading.Thread] = threading.Thread, 73 ): 74 if config is None: 75 raise DeepgramError("Config is required") 76 if endpoint == "": 77 raise DeepgramError("endpoint is required") 78 79 self._logger = verboselogs.VerboseLogger(__name__) 80 self._logger.addHandler(logging.StreamHandler()) 81 self._logger.setLevel(config.verbose) 82 83 self._config = config 84 self._endpoint = endpoint 85 self._lock_send = threading.Lock() 86 87 self._listen_thread = None 88 89 self._thread_cls = thread_cls 90 91 # exit 92 self._exit_event = threading.Event() 93 94 # set websocket url 95 self._websocket_url = convert_to_websocket_url(self._config.url, self._endpoint) 96 97 def delegate_listening(self, delegate: Speaker) -> None: 98 """ 99 Delegate the listening thread to the main thread. 100 """ 101 self._delegate = delegate 102 103 # pylint: disable=too-many-statements,too-many-branches 104 def start( 105 self, 106 options: Optional[Any] = None, 107 addons: Optional[Dict] = None, 108 headers: Optional[Dict] = None, 109 **kwargs, 110 ) -> bool: 111 """ 112 Starts the WebSocket connection for live transcription. 113 """ 114 self._logger.debug("AbstractSyncWebSocketClient.start ENTER") 115 self._logger.info("addons: %s", addons) 116 self._logger.info("headers: %s", headers) 117 self._logger.info("kwargs: %s", kwargs) 118 119 self._addons = addons 120 self._headers = headers 121 122 # set kwargs 123 if kwargs is not None: 124 self._kwargs = kwargs 125 else: 126 self._kwargs = {} 127 128 if not isinstance(options, dict): 129 self._logger.error("options is not a dict") 130 self._logger.debug("AbstractSyncWebSocketClient.start LEAVE") 131 return False 132 133 # set options 134 if options is not None: 135 self._options = options 136 else: 137 self._options = {} 138 139 combined_options = self._options.copy() 140 if self._addons is not None: 141 self._logger.info("merging addons to options") 142 combined_options.update(self._addons) 143 self._logger.info("new options: %s", combined_options) 144 self._logger.debug("combined_options: %s", combined_options) 145 146 combined_headers = self._config.headers.copy() 147 if self._headers is not None: 148 self._logger.info("merging headers to options") 149 combined_headers.update(self._headers) 150 self._logger.info("new headers: %s", combined_headers) 151 self._logger.debug("combined_headers: %s", combined_headers) 152 153 url_with_params = append_query_params(self._websocket_url, combined_options) 154 try: 155 self._socket = connect(url_with_params, additional_headers=combined_headers) 156 self._exit_event.clear() 157 158 # debug the threads 159 for thread in threading.enumerate(): 160 self._logger.debug("after running thread: %s", thread.name) 161 self._logger.debug("number of active threads: %s", threading.active_count()) 162 163 # delegate the listening thread to external object 164 if self._delegate is not None: 165 self._logger.notice("_delegate is enabled. this is usually the speaker") 166 self._delegate.set_pull_callback(self._socket.recv) 167 self._delegate.set_push_callback(self._process_message) 168 else: 169 self._logger.notice("create _listening thread") 170 self._listen_thread = self._thread_cls(target=self._listening) 171 self._listen_thread.start() 172 173 # debug the threads 174 for thread in threading.enumerate(): 175 self._logger.debug("after running thread: %s", thread.name) 176 self._logger.debug("number of active threads: %s", threading.active_count()) 177 178 # push open event 179 self._emit( 180 WebSocketEvents(WebSocketEvents.Open), 181 OpenResponse(type=WebSocketEvents.Open), 182 ) 183 184 self._logger.notice("start succeeded") 185 self._logger.debug("AbstractSyncWebSocketClient.start LEAVE") 186 return True 187 except websockets.exceptions.ConnectionClosed as e: 188 self._logger.error( 189 "ConnectionClosed in AbstractSyncWebSocketClient.start: %s", e 190 ) 191 self._logger.debug("AbstractSyncWebSocketClient.start LEAVE") 192 if self._config.options.get("termination_exception_connect", False): 193 raise e 194 return False 195 except websockets.exceptions.WebSocketException as e: 196 self._logger.error( 197 "WebSocketException in AbstractSyncWebSocketClient.start: %s", e 198 ) 199 self._logger.debug("AbstractSyncWebSocketClient.start LEAVE") 200 if self._config.options.get("termination_exception_connect", False): 201 raise e 202 return False 203 except Exception as e: # pylint: disable=broad-except 204 self._logger.error( 205 "WebSocketException in AbstractSyncWebSocketClient.start: %s", e 206 ) 207 self._logger.debug("AbstractSyncWebSocketClient.start LEAVE") 208 if self._config.options.get("termination_exception_connect", False): 209 raise e 210 return False 211 212 def is_connected(self) -> bool: 213 """ 214 Returns the connection status of the WebSocket. 215 """ 216 return self._socket is not None 217 218 # pylint: enable=too-many-statements,too-many-branches 219 220 @abstractmethod 221 def on(self, event: WebSocketEvents, handler: Callable) -> None: 222 """ 223 Registers an event handler for the WebSocket connection. 224 """ 225 raise NotImplementedError("no on method") 226 227 @abstractmethod 228 def _emit(self, event: WebSocketEvents, *args, **kwargs) -> None: 229 """ 230 Emits an event to the WebSocket connection. 231 """ 232 raise NotImplementedError("no _emit method") 233 234 # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches 235 def _listening( 236 self, 237 ) -> None: 238 """ 239 Listens for messages from the WebSocket connection. 240 """ 241 self._logger.debug("AbstractSyncWebSocketClient._listening ENTER") 242 243 while True: 244 try: 245 if self._exit_event.is_set(): 246 self._logger.notice("_listening exiting gracefully") 247 self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE") 248 return 249 250 if self._socket is None: 251 self._logger.warning("socket is empty") 252 self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE") 253 return 254 255 message = self._socket.recv() 256 257 if message is None: 258 self._logger.info("message is None") 259 continue 260 261 self._logger.spam("data type: %s", type(message)) 262 263 if isinstance(message, bytes): 264 self._logger.debug("Binary data received") 265 self._process_binary(message) 266 else: 267 self._logger.debug("Text data received") 268 self._process_text(message) 269 270 self._logger.notice("_listening Succeeded") 271 self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE") 272 273 except websockets.exceptions.ConnectionClosedOK as e: 274 # signal exit and close 275 self._signal_exit() 276 277 self._logger.notice(f"_listening({e.code}) exiting gracefully") 278 self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE") 279 return 280 281 except websockets.exceptions.ConnectionClosed as e: 282 if e.code in [1000, 1001]: 283 # signal exit and close 284 self._signal_exit() 285 286 self._logger.notice(f"_listening({e.code}) exiting gracefully") 287 self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE") 288 return 289 290 # we need to explicitly call self._signal_exit() here because we are hanging on a recv() 291 # note: this is different than the speak websocket client 292 self._logger.error( 293 "ConnectionClosed in AbstractSyncWebSocketClient._listening with code %s: %s", 294 e.code, 295 e.reason, 296 ) 297 cc_error: ErrorResponse = ErrorResponse( 298 "ConnectionClosed in AbstractSyncWebSocketClient._listening", 299 f"{e}", 300 "ConnectionClosed", 301 ) 302 self._emit( 303 WebSocketEvents(WebSocketEvents.Error), 304 cc_error, 305 **dict(cast(Dict[Any, Any], self._kwargs)), 306 ) 307 308 # signal exit and close 309 self._signal_exit() 310 311 self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE") 312 313 if self._config.options.get("termination_exception_connect") is True: 314 raise 315 return 316 317 except websockets.exceptions.WebSocketException as e: 318 self._logger.error( 319 "WebSocketException in AbstractSyncWebSocketClient._listening with: %s", 320 e, 321 ) 322 ws_error: ErrorResponse = ErrorResponse( 323 "WebSocketException in AbstractSyncWebSocketClient._listening", 324 f"{e}", 325 "WebSocketException", 326 ) 327 self._emit( 328 WebSocketEvents(WebSocketEvents.Error), 329 ws_error, 330 **dict(cast(Dict[Any, Any], self._kwargs)), 331 ) 332 333 # signal exit and close 334 self._signal_exit() 335 336 self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE") 337 338 if self._config.options.get("termination_exception_connect") is True: 339 raise 340 return 341 342 except Exception as e: # pylint: disable=broad-except 343 self._logger.error( 344 "Exception in AbstractSyncWebSocketClient._listening: %s", e 345 ) 346 e_error: ErrorResponse = ErrorResponse( 347 "Exception in AbstractSyncWebSocketClient._listening", 348 f"{e}", 349 "Exception", 350 ) 351 self._emit( 352 WebSocketEvents(WebSocketEvents.Error), 353 e_error, 354 **dict(cast(Dict[Any, Any], self._kwargs)), 355 ) 356 357 # signal exit and close 358 self._signal_exit() 359 360 self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE") 361 362 if self._config.options.get("termination_exception_connect") is True: 363 raise 364 return 365 366 # pylint: enable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches 367 368 def _process_message(self, message: Union[str, bytes]) -> None: 369 if isinstance(message, bytes): 370 self._process_binary(message) 371 else: 372 self._process_text(message) 373 374 @abstractmethod 375 def _process_text(self, message: str) -> None: 376 raise NotImplementedError("no _process_text method") 377 378 @abstractmethod 379 def _process_binary(self, message: bytes) -> None: 380 raise NotImplementedError("no _process_binary method") 381 382 @abstractmethod 383 def _close_message(self) -> bool: 384 raise NotImplementedError("no _close_message method") 385 386 # pylint: disable=too-many-return-statements,too-many-branches 387 def send(self, data: Union[str, bytes]) -> bool: 388 """ 389 Sends data over the WebSocket connection. 390 """ 391 self._logger.spam("AbstractSyncWebSocketClient.send ENTER") 392 393 if self._exit_event.is_set(): 394 self._logger.notice("send exiting gracefully") 395 self._logger.debug("AbstractSyncWebSocketClient.send LEAVE") 396 return False 397 398 if not self.is_connected(): 399 self._logger.notice("is_connected is False") 400 self._logger.debug("AbstractSyncWebSocketClient.send LEAVE") 401 return False 402 403 if self._socket is not None: 404 with self._lock_send: 405 try: 406 self._socket.send(data) 407 except websockets.exceptions.ConnectionClosedOK as e: 408 self._logger.notice(f"send() exiting gracefully: {e.code}") 409 self._logger.debug("AbstractSyncWebSocketClient.send LEAVE") 410 if self._config.options.get("termination_exception_send") is True: 411 raise 412 return True 413 except websockets.exceptions.ConnectionClosed as e: 414 if e.code in [1000, 1001]: 415 self._logger.notice(f"send({e.code}) exiting gracefully") 416 self._logger.debug("AbstractSyncWebSocketClient.send LEAVE") 417 if ( 418 self._config.options.get("termination_exception_send") 419 == "true" 420 ): 421 raise 422 return True 423 self._logger.error("send() failed - ConnectionClosed: %s", str(e)) 424 self._logger.spam("AbstractSyncWebSocketClient.send LEAVE") 425 if self._config.options.get("termination_exception_send") is True: 426 raise 427 return False 428 except websockets.exceptions.WebSocketException as e: 429 self._logger.error("send() failed - WebSocketException: %s", str(e)) 430 self._logger.spam("AbstractSyncWebSocketClient.send LEAVE") 431 if self._config.options.get("termination_exception_send") is True: 432 raise 433 return False 434 except Exception as e: # pylint: disable=broad-except 435 self._logger.error("send() failed - Exception: %s", str(e)) 436 self._logger.spam("AbstractSyncWebSocketClient.send LEAVE") 437 if self._config.options.get("termination_exception_send") is True: 438 raise 439 return False 440 441 self._logger.spam("send() succeeded") 442 self._logger.spam("AbstractSyncWebSocketClient.send LEAVE") 443 return True 444 445 self._logger.spam("send() failed. socket is None") 446 self._logger.spam("AbstractSyncWebSocketClient.send LEAVE") 447 return False 448 449 # pylint: enable=too-many-return-statements,too-many-branches 450 451 def finish(self) -> bool: 452 """ 453 Closes the WebSocket connection gracefully. 454 """ 455 self._logger.spam("AbstractSyncWebSocketClient.finish ENTER") 456 457 # debug the threads 458 for thread in threading.enumerate(): 459 self._logger.debug("before running thread: %s", thread.name) 460 self._logger.debug("number of active threads: %s", threading.active_count()) 461 462 # signal exit 463 self._signal_exit() 464 465 # stop the threads 466 self._logger.verbose("cancelling tasks...") 467 if self._listen_thread is not None: 468 self._listen_thread.join() 469 self._listen_thread = None 470 self._logger.notice("listening thread joined") 471 472 # debug the threads 473 for thread in threading.enumerate(): 474 if thread is not None and thread.name is not None: 475 self._logger.debug("before running thread: %s", thread.name) 476 else: 477 self._logger.debug("after running thread: unknown_thread_name") 478 self._logger.debug("number of active threads: %s", threading.active_count()) 479 480 self._logger.notice("finish succeeded") 481 self._logger.spam("AbstractSyncWebSocketClient.finish LEAVE") 482 return True 483 484 # signals the WebSocket connection to exit 485 def _signal_exit(self) -> None: 486 # closes the WebSocket connection gracefully 487 self._logger.notice("closing socket...") 488 if self._socket is not None: 489 self._logger.notice("sending Close...") 490 try: 491 # if the socket connection is closed, the following line might throw an error 492 self._close_message() 493 except websockets.exceptions.ConnectionClosedOK as e: 494 self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code) 495 except websockets.exceptions.ConnectionClosed as e: 496 self._logger.error("_signal_exit - ConnectionClosed: %s", e.code) 497 except websockets.exceptions.WebSocketException as e: 498 self._logger.error("_signal_exit - WebSocketException: %s", str(e)) 499 except Exception as e: # pylint: disable=broad-except 500 self._logger.error("_signal_exit - Exception: %s", str(e)) 501 502 # push close event 503 try: 504 self._emit( 505 WebSocketEvents(WebSocketEvents.Close), 506 CloseResponse(type=WebSocketEvents.Close), 507 **dict(cast(Dict[Any, Any], self._kwargs)), 508 ) 509 except Exception as e: # pylint: disable=broad-except 510 self._logger.error("_signal_exit - Exception: %s", e) 511 512 # wait for task to send 513 time.sleep(0.5) 514 515 # signal exit 516 self._exit_event.set() 517 518 # closes the WebSocket connection gracefully 519 self._logger.verbose("clean up socket...") 520 if self._socket is not None: 521 self._logger.verbose("socket.wait_closed...") 522 try: 523 self._socket.close() 524 except websockets.exceptions.WebSocketException as e: 525 self._logger.error("socket.wait_closed failed: %s", e) 526 527 self._socket = None
ONE_SECOND =
1
HALF_SECOND =
0.5
DEEPGRAM_INTERVAL =
5
PING_INTERVAL =
20
class
AbstractSyncWebSocketClient(abc.ABC):
36class AbstractSyncWebSocketClient(ABC): # pylint: disable=too-many-instance-attributes 37 """ 38 Abstract class for using WebSockets. 39 40 This class provides methods to establish a WebSocket connection generically for 41 use in all WebSocket clients. 42 43 Args: 44 config (DeepgramClientOptions): all the options for the client 45 endpoint (str): the endpoint to connect to 46 thread_cls (Type[threading.Thread]): optional thread class to use for creating threads, 47 defaults to threading.Thread. Useful for custom thread management like ContextVar support. 48 """ 49 50 _logger: verboselogs.VerboseLogger 51 _config: DeepgramClientOptions 52 _endpoint: str 53 _websocket_url: str 54 55 _socket: Optional[ClientConnection] = None 56 _exit_event: threading.Event 57 _lock_send: threading.Lock 58 59 _listen_thread: Union[threading.Thread, None] 60 _delegate: Optional[Speaker] = None 61 62 _thread_cls: Type[threading.Thread] 63 64 _kwargs: Optional[Dict] = None 65 _addons: Optional[Dict] = None 66 _options: Optional[Dict] = None 67 _headers: Optional[Dict] = None 68 69 def __init__( 70 self, 71 config: DeepgramClientOptions, 72 endpoint: str = "", 73 thread_cls: Type[threading.Thread] = threading.Thread, 74 ): 75 if config is None: 76 raise DeepgramError("Config is required") 77 if endpoint == "": 78 raise DeepgramError("endpoint is required") 79 80 self._logger = verboselogs.VerboseLogger(__name__) 81 self._logger.addHandler(logging.StreamHandler()) 82 self._logger.setLevel(config.verbose) 83 84 self._config = config 85 self._endpoint = endpoint 86 self._lock_send = threading.Lock() 87 88 self._listen_thread = None 89 90 self._thread_cls = thread_cls 91 92 # exit 93 self._exit_event = threading.Event() 94 95 # set websocket url 96 self._websocket_url = convert_to_websocket_url(self._config.url, self._endpoint) 97 98 def delegate_listening(self, delegate: Speaker) -> None: 99 """ 100 Delegate the listening thread to the main thread. 101 """ 102 self._delegate = delegate 103 104 # pylint: disable=too-many-statements,too-many-branches 105 def start( 106 self, 107 options: Optional[Any] = None, 108 addons: Optional[Dict] = None, 109 headers: Optional[Dict] = None, 110 **kwargs, 111 ) -> bool: 112 """ 113 Starts the WebSocket connection for live transcription. 114 """ 115 self._logger.debug("AbstractSyncWebSocketClient.start ENTER") 116 self._logger.info("addons: %s", addons) 117 self._logger.info("headers: %s", headers) 118 self._logger.info("kwargs: %s", kwargs) 119 120 self._addons = addons 121 self._headers = headers 122 123 # set kwargs 124 if kwargs is not None: 125 self._kwargs = kwargs 126 else: 127 self._kwargs = {} 128 129 if not isinstance(options, dict): 130 self._logger.error("options is not a dict") 131 self._logger.debug("AbstractSyncWebSocketClient.start LEAVE") 132 return False 133 134 # set options 135 if options is not None: 136 self._options = options 137 else: 138 self._options = {} 139 140 combined_options = self._options.copy() 141 if self._addons is not None: 142 self._logger.info("merging addons to options") 143 combined_options.update(self._addons) 144 self._logger.info("new options: %s", combined_options) 145 self._logger.debug("combined_options: %s", combined_options) 146 147 combined_headers = self._config.headers.copy() 148 if self._headers is not None: 149 self._logger.info("merging headers to options") 150 combined_headers.update(self._headers) 151 self._logger.info("new headers: %s", combined_headers) 152 self._logger.debug("combined_headers: %s", combined_headers) 153 154 url_with_params = append_query_params(self._websocket_url, combined_options) 155 try: 156 self._socket = connect(url_with_params, additional_headers=combined_headers) 157 self._exit_event.clear() 158 159 # debug the threads 160 for thread in threading.enumerate(): 161 self._logger.debug("after running thread: %s", thread.name) 162 self._logger.debug("number of active threads: %s", threading.active_count()) 163 164 # delegate the listening thread to external object 165 if self._delegate is not None: 166 self._logger.notice("_delegate is enabled. this is usually the speaker") 167 self._delegate.set_pull_callback(self._socket.recv) 168 self._delegate.set_push_callback(self._process_message) 169 else: 170 self._logger.notice("create _listening thread") 171 self._listen_thread = self._thread_cls(target=self._listening) 172 self._listen_thread.start() 173 174 # debug the threads 175 for thread in threading.enumerate(): 176 self._logger.debug("after running thread: %s", thread.name) 177 self._logger.debug("number of active threads: %s", threading.active_count()) 178 179 # push open event 180 self._emit( 181 WebSocketEvents(WebSocketEvents.Open), 182 OpenResponse(type=WebSocketEvents.Open), 183 ) 184 185 self._logger.notice("start succeeded") 186 self._logger.debug("AbstractSyncWebSocketClient.start LEAVE") 187 return True 188 except websockets.exceptions.ConnectionClosed as e: 189 self._logger.error( 190 "ConnectionClosed in AbstractSyncWebSocketClient.start: %s", e 191 ) 192 self._logger.debug("AbstractSyncWebSocketClient.start LEAVE") 193 if self._config.options.get("termination_exception_connect", False): 194 raise e 195 return False 196 except websockets.exceptions.WebSocketException as e: 197 self._logger.error( 198 "WebSocketException in AbstractSyncWebSocketClient.start: %s", e 199 ) 200 self._logger.debug("AbstractSyncWebSocketClient.start LEAVE") 201 if self._config.options.get("termination_exception_connect", False): 202 raise e 203 return False 204 except Exception as e: # pylint: disable=broad-except 205 self._logger.error( 206 "WebSocketException in AbstractSyncWebSocketClient.start: %s", e 207 ) 208 self._logger.debug("AbstractSyncWebSocketClient.start LEAVE") 209 if self._config.options.get("termination_exception_connect", False): 210 raise e 211 return False 212 213 def is_connected(self) -> bool: 214 """ 215 Returns the connection status of the WebSocket. 216 """ 217 return self._socket is not None 218 219 # pylint: enable=too-many-statements,too-many-branches 220 221 @abstractmethod 222 def on(self, event: WebSocketEvents, handler: Callable) -> None: 223 """ 224 Registers an event handler for the WebSocket connection. 225 """ 226 raise NotImplementedError("no on method") 227 228 @abstractmethod 229 def _emit(self, event: WebSocketEvents, *args, **kwargs) -> None: 230 """ 231 Emits an event to the WebSocket connection. 232 """ 233 raise NotImplementedError("no _emit method") 234 235 # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches 236 def _listening( 237 self, 238 ) -> None: 239 """ 240 Listens for messages from the WebSocket connection. 241 """ 242 self._logger.debug("AbstractSyncWebSocketClient._listening ENTER") 243 244 while True: 245 try: 246 if self._exit_event.is_set(): 247 self._logger.notice("_listening exiting gracefully") 248 self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE") 249 return 250 251 if self._socket is None: 252 self._logger.warning("socket is empty") 253 self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE") 254 return 255 256 message = self._socket.recv() 257 258 if message is None: 259 self._logger.info("message is None") 260 continue 261 262 self._logger.spam("data type: %s", type(message)) 263 264 if isinstance(message, bytes): 265 self._logger.debug("Binary data received") 266 self._process_binary(message) 267 else: 268 self._logger.debug("Text data received") 269 self._process_text(message) 270 271 self._logger.notice("_listening Succeeded") 272 self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE") 273 274 except websockets.exceptions.ConnectionClosedOK as e: 275 # signal exit and close 276 self._signal_exit() 277 278 self._logger.notice(f"_listening({e.code}) exiting gracefully") 279 self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE") 280 return 281 282 except websockets.exceptions.ConnectionClosed as e: 283 if e.code in [1000, 1001]: 284 # signal exit and close 285 self._signal_exit() 286 287 self._logger.notice(f"_listening({e.code}) exiting gracefully") 288 self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE") 289 return 290 291 # we need to explicitly call self._signal_exit() here because we are hanging on a recv() 292 # note: this is different than the speak websocket client 293 self._logger.error( 294 "ConnectionClosed in AbstractSyncWebSocketClient._listening with code %s: %s", 295 e.code, 296 e.reason, 297 ) 298 cc_error: ErrorResponse = ErrorResponse( 299 "ConnectionClosed in AbstractSyncWebSocketClient._listening", 300 f"{e}", 301 "ConnectionClosed", 302 ) 303 self._emit( 304 WebSocketEvents(WebSocketEvents.Error), 305 cc_error, 306 **dict(cast(Dict[Any, Any], self._kwargs)), 307 ) 308 309 # signal exit and close 310 self._signal_exit() 311 312 self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE") 313 314 if self._config.options.get("termination_exception_connect") is True: 315 raise 316 return 317 318 except websockets.exceptions.WebSocketException as e: 319 self._logger.error( 320 "WebSocketException in AbstractSyncWebSocketClient._listening with: %s", 321 e, 322 ) 323 ws_error: ErrorResponse = ErrorResponse( 324 "WebSocketException in AbstractSyncWebSocketClient._listening", 325 f"{e}", 326 "WebSocketException", 327 ) 328 self._emit( 329 WebSocketEvents(WebSocketEvents.Error), 330 ws_error, 331 **dict(cast(Dict[Any, Any], self._kwargs)), 332 ) 333 334 # signal exit and close 335 self._signal_exit() 336 337 self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE") 338 339 if self._config.options.get("termination_exception_connect") is True: 340 raise 341 return 342 343 except Exception as e: # pylint: disable=broad-except 344 self._logger.error( 345 "Exception in AbstractSyncWebSocketClient._listening: %s", e 346 ) 347 e_error: ErrorResponse = ErrorResponse( 348 "Exception in AbstractSyncWebSocketClient._listening", 349 f"{e}", 350 "Exception", 351 ) 352 self._emit( 353 WebSocketEvents(WebSocketEvents.Error), 354 e_error, 355 **dict(cast(Dict[Any, Any], self._kwargs)), 356 ) 357 358 # signal exit and close 359 self._signal_exit() 360 361 self._logger.debug("AbstractSyncWebSocketClient._listening LEAVE") 362 363 if self._config.options.get("termination_exception_connect") is True: 364 raise 365 return 366 367 # pylint: enable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches 368 369 def _process_message(self, message: Union[str, bytes]) -> None: 370 if isinstance(message, bytes): 371 self._process_binary(message) 372 else: 373 self._process_text(message) 374 375 @abstractmethod 376 def _process_text(self, message: str) -> None: 377 raise NotImplementedError("no _process_text method") 378 379 @abstractmethod 380 def _process_binary(self, message: bytes) -> None: 381 raise NotImplementedError("no _process_binary method") 382 383 @abstractmethod 384 def _close_message(self) -> bool: 385 raise NotImplementedError("no _close_message method") 386 387 # pylint: disable=too-many-return-statements,too-many-branches 388 def send(self, data: Union[str, bytes]) -> bool: 389 """ 390 Sends data over the WebSocket connection. 391 """ 392 self._logger.spam("AbstractSyncWebSocketClient.send ENTER") 393 394 if self._exit_event.is_set(): 395 self._logger.notice("send exiting gracefully") 396 self._logger.debug("AbstractSyncWebSocketClient.send LEAVE") 397 return False 398 399 if not self.is_connected(): 400 self._logger.notice("is_connected is False") 401 self._logger.debug("AbstractSyncWebSocketClient.send LEAVE") 402 return False 403 404 if self._socket is not None: 405 with self._lock_send: 406 try: 407 self._socket.send(data) 408 except websockets.exceptions.ConnectionClosedOK as e: 409 self._logger.notice(f"send() exiting gracefully: {e.code}") 410 self._logger.debug("AbstractSyncWebSocketClient.send LEAVE") 411 if self._config.options.get("termination_exception_send") is True: 412 raise 413 return True 414 except websockets.exceptions.ConnectionClosed as e: 415 if e.code in [1000, 1001]: 416 self._logger.notice(f"send({e.code}) exiting gracefully") 417 self._logger.debug("AbstractSyncWebSocketClient.send LEAVE") 418 if ( 419 self._config.options.get("termination_exception_send") 420 == "true" 421 ): 422 raise 423 return True 424 self._logger.error("send() failed - ConnectionClosed: %s", str(e)) 425 self._logger.spam("AbstractSyncWebSocketClient.send LEAVE") 426 if self._config.options.get("termination_exception_send") is True: 427 raise 428 return False 429 except websockets.exceptions.WebSocketException as e: 430 self._logger.error("send() failed - WebSocketException: %s", str(e)) 431 self._logger.spam("AbstractSyncWebSocketClient.send LEAVE") 432 if self._config.options.get("termination_exception_send") is True: 433 raise 434 return False 435 except Exception as e: # pylint: disable=broad-except 436 self._logger.error("send() failed - Exception: %s", str(e)) 437 self._logger.spam("AbstractSyncWebSocketClient.send LEAVE") 438 if self._config.options.get("termination_exception_send") is True: 439 raise 440 return False 441 442 self._logger.spam("send() succeeded") 443 self._logger.spam("AbstractSyncWebSocketClient.send LEAVE") 444 return True 445 446 self._logger.spam("send() failed. socket is None") 447 self._logger.spam("AbstractSyncWebSocketClient.send LEAVE") 448 return False 449 450 # pylint: enable=too-many-return-statements,too-many-branches 451 452 def finish(self) -> bool: 453 """ 454 Closes the WebSocket connection gracefully. 455 """ 456 self._logger.spam("AbstractSyncWebSocketClient.finish ENTER") 457 458 # debug the threads 459 for thread in threading.enumerate(): 460 self._logger.debug("before running thread: %s", thread.name) 461 self._logger.debug("number of active threads: %s", threading.active_count()) 462 463 # signal exit 464 self._signal_exit() 465 466 # stop the threads 467 self._logger.verbose("cancelling tasks...") 468 if self._listen_thread is not None: 469 self._listen_thread.join() 470 self._listen_thread = None 471 self._logger.notice("listening thread joined") 472 473 # debug the threads 474 for thread in threading.enumerate(): 475 if thread is not None and thread.name is not None: 476 self._logger.debug("before running thread: %s", thread.name) 477 else: 478 self._logger.debug("after running thread: unknown_thread_name") 479 self._logger.debug("number of active threads: %s", threading.active_count()) 480 481 self._logger.notice("finish succeeded") 482 self._logger.spam("AbstractSyncWebSocketClient.finish LEAVE") 483 return True 484 485 # signals the WebSocket connection to exit 486 def _signal_exit(self) -> None: 487 # closes the WebSocket connection gracefully 488 self._logger.notice("closing socket...") 489 if self._socket is not None: 490 self._logger.notice("sending Close...") 491 try: 492 # if the socket connection is closed, the following line might throw an error 493 self._close_message() 494 except websockets.exceptions.ConnectionClosedOK as e: 495 self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code) 496 except websockets.exceptions.ConnectionClosed as e: 497 self._logger.error("_signal_exit - ConnectionClosed: %s", e.code) 498 except websockets.exceptions.WebSocketException as e: 499 self._logger.error("_signal_exit - WebSocketException: %s", str(e)) 500 except Exception as e: # pylint: disable=broad-except 501 self._logger.error("_signal_exit - Exception: %s", str(e)) 502 503 # push close event 504 try: 505 self._emit( 506 WebSocketEvents(WebSocketEvents.Close), 507 CloseResponse(type=WebSocketEvents.Close), 508 **dict(cast(Dict[Any, Any], self._kwargs)), 509 ) 510 except Exception as e: # pylint: disable=broad-except 511 self._logger.error("_signal_exit - Exception: %s", e) 512 513 # wait for task to send 514 time.sleep(0.5) 515 516 # signal exit 517 self._exit_event.set() 518 519 # closes the WebSocket connection gracefully 520 self._logger.verbose("clean up socket...") 521 if self._socket is not None: 522 self._logger.verbose("socket.wait_closed...") 523 try: 524 self._socket.close() 525 except websockets.exceptions.WebSocketException as e: 526 self._logger.error("socket.wait_closed failed: %s", e) 527 528 self._socket = None
Abstract class for using WebSockets.
This class provides methods to establish a WebSocket connection generically for use in all WebSocket clients.
Args: config (DeepgramClientOptions): all the options for the client endpoint (str): the endpoint to connect to thread_cls (Type[threading.Thread]): optional thread class to use for creating threads, defaults to threading.Thread. Useful for custom thread management like ContextVar support.
98 def delegate_listening(self, delegate: Speaker) -> None: 99 """ 100 Delegate the listening thread to the main thread. 101 """ 102 self._delegate = delegate
Delegate the listening thread to the main thread.
def
start( self, options: Optional[Any] = None, addons: Optional[Dict] = None, headers: Optional[Dict] = None, **kwargs) -> bool:
105 def start( 106 self, 107 options: Optional[Any] = None, 108 addons: Optional[Dict] = None, 109 headers: Optional[Dict] = None, 110 **kwargs, 111 ) -> bool: 112 """ 113 Starts the WebSocket connection for live transcription. 114 """ 115 self._logger.debug("AbstractSyncWebSocketClient.start ENTER") 116 self._logger.info("addons: %s", addons) 117 self._logger.info("headers: %s", headers) 118 self._logger.info("kwargs: %s", kwargs) 119 120 self._addons = addons 121 self._headers = headers 122 123 # set kwargs 124 if kwargs is not None: 125 self._kwargs = kwargs 126 else: 127 self._kwargs = {} 128 129 if not isinstance(options, dict): 130 self._logger.error("options is not a dict") 131 self._logger.debug("AbstractSyncWebSocketClient.start LEAVE") 132 return False 133 134 # set options 135 if options is not None: 136 self._options = options 137 else: 138 self._options = {} 139 140 combined_options = self._options.copy() 141 if self._addons is not None: 142 self._logger.info("merging addons to options") 143 combined_options.update(self._addons) 144 self._logger.info("new options: %s", combined_options) 145 self._logger.debug("combined_options: %s", combined_options) 146 147 combined_headers = self._config.headers.copy() 148 if self._headers is not None: 149 self._logger.info("merging headers to options") 150 combined_headers.update(self._headers) 151 self._logger.info("new headers: %s", combined_headers) 152 self._logger.debug("combined_headers: %s", combined_headers) 153 154 url_with_params = append_query_params(self._websocket_url, combined_options) 155 try: 156 self._socket = connect(url_with_params, additional_headers=combined_headers) 157 self._exit_event.clear() 158 159 # debug the threads 160 for thread in threading.enumerate(): 161 self._logger.debug("after running thread: %s", thread.name) 162 self._logger.debug("number of active threads: %s", threading.active_count()) 163 164 # delegate the listening thread to external object 165 if self._delegate is not None: 166 self._logger.notice("_delegate is enabled. this is usually the speaker") 167 self._delegate.set_pull_callback(self._socket.recv) 168 self._delegate.set_push_callback(self._process_message) 169 else: 170 self._logger.notice("create _listening thread") 171 self._listen_thread = self._thread_cls(target=self._listening) 172 self._listen_thread.start() 173 174 # debug the threads 175 for thread in threading.enumerate(): 176 self._logger.debug("after running thread: %s", thread.name) 177 self._logger.debug("number of active threads: %s", threading.active_count()) 178 179 # push open event 180 self._emit( 181 WebSocketEvents(WebSocketEvents.Open), 182 OpenResponse(type=WebSocketEvents.Open), 183 ) 184 185 self._logger.notice("start succeeded") 186 self._logger.debug("AbstractSyncWebSocketClient.start LEAVE") 187 return True 188 except websockets.exceptions.ConnectionClosed as e: 189 self._logger.error( 190 "ConnectionClosed in AbstractSyncWebSocketClient.start: %s", e 191 ) 192 self._logger.debug("AbstractSyncWebSocketClient.start LEAVE") 193 if self._config.options.get("termination_exception_connect", False): 194 raise e 195 return False 196 except websockets.exceptions.WebSocketException as e: 197 self._logger.error( 198 "WebSocketException in AbstractSyncWebSocketClient.start: %s", e 199 ) 200 self._logger.debug("AbstractSyncWebSocketClient.start LEAVE") 201 if self._config.options.get("termination_exception_connect", False): 202 raise e 203 return False 204 except Exception as e: # pylint: disable=broad-except 205 self._logger.error( 206 "WebSocketException in AbstractSyncWebSocketClient.start: %s", e 207 ) 208 self._logger.debug("AbstractSyncWebSocketClient.start LEAVE") 209 if self._config.options.get("termination_exception_connect", False): 210 raise e 211 return False
Starts the WebSocket connection for live transcription.
def
is_connected(self) -> bool:
213 def is_connected(self) -> bool: 214 """ 215 Returns the connection status of the WebSocket. 216 """ 217 return self._socket is not None
Returns the connection status of the WebSocket.
@abstractmethod
def
on( self, event: deepgram.clients.common.v1.websocket_events.WebSocketEvents, handler: Callable) -> None:
221 @abstractmethod 222 def on(self, event: WebSocketEvents, handler: Callable) -> None: 223 """ 224 Registers an event handler for the WebSocket connection. 225 """ 226 raise NotImplementedError("no on method")
Registers an event handler for the WebSocket connection.
def
send(self, data: Union[str, bytes]) -> bool:
388 def send(self, data: Union[str, bytes]) -> bool: 389 """ 390 Sends data over the WebSocket connection. 391 """ 392 self._logger.spam("AbstractSyncWebSocketClient.send ENTER") 393 394 if self._exit_event.is_set(): 395 self._logger.notice("send exiting gracefully") 396 self._logger.debug("AbstractSyncWebSocketClient.send LEAVE") 397 return False 398 399 if not self.is_connected(): 400 self._logger.notice("is_connected is False") 401 self._logger.debug("AbstractSyncWebSocketClient.send LEAVE") 402 return False 403 404 if self._socket is not None: 405 with self._lock_send: 406 try: 407 self._socket.send(data) 408 except websockets.exceptions.ConnectionClosedOK as e: 409 self._logger.notice(f"send() exiting gracefully: {e.code}") 410 self._logger.debug("AbstractSyncWebSocketClient.send LEAVE") 411 if self._config.options.get("termination_exception_send") is True: 412 raise 413 return True 414 except websockets.exceptions.ConnectionClosed as e: 415 if e.code in [1000, 1001]: 416 self._logger.notice(f"send({e.code}) exiting gracefully") 417 self._logger.debug("AbstractSyncWebSocketClient.send LEAVE") 418 if ( 419 self._config.options.get("termination_exception_send") 420 == "true" 421 ): 422 raise 423 return True 424 self._logger.error("send() failed - ConnectionClosed: %s", str(e)) 425 self._logger.spam("AbstractSyncWebSocketClient.send LEAVE") 426 if self._config.options.get("termination_exception_send") is True: 427 raise 428 return False 429 except websockets.exceptions.WebSocketException as e: 430 self._logger.error("send() failed - WebSocketException: %s", str(e)) 431 self._logger.spam("AbstractSyncWebSocketClient.send LEAVE") 432 if self._config.options.get("termination_exception_send") is True: 433 raise 434 return False 435 except Exception as e: # pylint: disable=broad-except 436 self._logger.error("send() failed - Exception: %s", str(e)) 437 self._logger.spam("AbstractSyncWebSocketClient.send LEAVE") 438 if self._config.options.get("termination_exception_send") is True: 439 raise 440 return False 441 442 self._logger.spam("send() succeeded") 443 self._logger.spam("AbstractSyncWebSocketClient.send LEAVE") 444 return True 445 446 self._logger.spam("send() failed. socket is None") 447 self._logger.spam("AbstractSyncWebSocketClient.send LEAVE") 448 return False
Sends data over the WebSocket connection.
def
finish(self) -> bool:
452 def finish(self) -> bool: 453 """ 454 Closes the WebSocket connection gracefully. 455 """ 456 self._logger.spam("AbstractSyncWebSocketClient.finish ENTER") 457 458 # debug the threads 459 for thread in threading.enumerate(): 460 self._logger.debug("before running thread: %s", thread.name) 461 self._logger.debug("number of active threads: %s", threading.active_count()) 462 463 # signal exit 464 self._signal_exit() 465 466 # stop the threads 467 self._logger.verbose("cancelling tasks...") 468 if self._listen_thread is not None: 469 self._listen_thread.join() 470 self._listen_thread = None 471 self._logger.notice("listening thread joined") 472 473 # debug the threads 474 for thread in threading.enumerate(): 475 if thread is not None and thread.name is not None: 476 self._logger.debug("before running thread: %s", thread.name) 477 else: 478 self._logger.debug("after running thread: unknown_thread_name") 479 self._logger.debug("number of active threads: %s", threading.active_count()) 480 481 self._logger.notice("finish succeeded") 482 self._logger.spam("AbstractSyncWebSocketClient.finish LEAVE") 483 return True
Closes the WebSocket connection gracefully.