deepgram.clients.common.v1.abstract_async_websocket
1# Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved. 2# Use of this source code is governed by a MIT license that can be found in the LICENSE file. 3# SPDX-License-Identifier: MIT 4import asyncio 5import json 6import logging 7from typing import Dict, Union, Optional, cast, Any, Callable 8from datetime import datetime 9import threading 10from abc import ABC, abstractmethod 11 12import websockets 13 14try: 15 # Websockets versions >= 13 16 from websockets.asyncio.client import connect, ClientConnection 17 18 WS_ADDITIONAL_HEADERS_KEY = "additional_headers" 19except ImportError: 20 # Backward compatibility with websockets versions 12 21 from websockets.legacy.client import ( # type: ignore 22 connect, 23 WebSocketClientProtocol as ClientConnection, 24 ) 25 26 WS_ADDITIONAL_HEADERS_KEY = "extra_headers" 27 28from ....audio import Speaker 29from ....utils import verboselogs 30from ....options import DeepgramClientOptions 31from .helpers import convert_to_websocket_url, append_query_params 32from .errors import DeepgramError 33 34from .websocket_response import ( 35 OpenResponse, 36 CloseResponse, 37 ErrorResponse, 38) 39from .websocket_events import WebSocketEvents 40 41 42ONE_SECOND = 1 43HALF_SECOND = 0.5 44DEEPGRAM_INTERVAL = 5 45PING_INTERVAL = 20 46 47 48class AbstractAsyncWebSocketClient(ABC): # pylint: disable=too-many-instance-attributes 49 """ 50 Abstract class for using WebSockets. 51 52 This class provides methods to establish a WebSocket connection generically for 53 use in all WebSocket clients. 54 """ 55 56 _logger: verboselogs.VerboseLogger 57 _config: DeepgramClientOptions 58 _endpoint: str 59 _websocket_url: str 60 61 _socket: Optional[ClientConnection] = None 62 63 _listen_thread: Union[asyncio.Task, None] 64 _delegate: Optional[Speaker] = None 65 66 _kwargs: Optional[Dict] = None 67 _addons: Optional[Dict] = None 68 _options: Optional[Dict] = None 69 _headers: Optional[Dict] = None 70 71 def __init__(self, config: DeepgramClientOptions, endpoint: str = ""): 72 if config is None: 73 raise DeepgramError("Config is required") 74 if endpoint == "": 75 raise DeepgramError("endpoint is required") 76 77 self._logger = verboselogs.VerboseLogger(__name__) 78 self._logger.addHandler(logging.StreamHandler()) 79 self._logger.setLevel(config.verbose) 80 81 self._config = config 82 self._endpoint = endpoint 83 84 self._listen_thread = None 85 86 # events 87 self._exit_event = asyncio.Event() 88 89 # set websocket url 90 self._websocket_url = convert_to_websocket_url(self._config.url, self._endpoint) 91 92 def delegate_listening(self, delegate: Speaker) -> None: 93 """ 94 Delegate the listening thread to the Speaker object. 95 """ 96 self._delegate = delegate 97 98 # pylint: disable=too-many-branches,too-many-statements 99 async def start( 100 self, 101 options: Optional[Any] = None, 102 addons: Optional[Dict] = None, 103 headers: Optional[Dict] = None, 104 **kwargs, 105 ) -> bool: 106 """ 107 Starts the WebSocket connection for live transcription. 108 """ 109 self._logger.debug("AbstractAsyncWebSocketClient.start ENTER") 110 self._logger.info("addons: %s", addons) 111 self._logger.info("headers: %s", headers) 112 self._logger.info("kwargs: %s", kwargs) 113 114 self._addons = addons 115 self._headers = headers 116 117 # set kwargs 118 if kwargs is not None: 119 self._kwargs = kwargs 120 else: 121 self._kwargs = {} 122 123 if not isinstance(options, dict): 124 self._logger.error("options is not a dict") 125 self._logger.debug("AbstractSyncWebSocketClient.start LEAVE") 126 return False 127 128 # set options 129 if options is not None: 130 self._options = options 131 else: 132 self._options = {} 133 134 combined_options = self._options.copy() 135 if self._addons is not None: 136 self._logger.info("merging addons to options") 137 combined_options.update(self._addons) 138 self._logger.info("new options: %s", combined_options) 139 self._logger.debug("combined_options: %s", combined_options) 140 141 combined_headers = self._config.headers.copy() 142 if self._headers is not None: 143 self._logger.info("merging headers to options") 144 combined_headers.update(self._headers) 145 self._logger.info("new headers: %s", combined_headers) 146 self._logger.debug("combined_headers: %s", combined_headers) 147 148 url_with_params = append_query_params(self._websocket_url, combined_options) 149 150 try: 151 ws_connect_kwargs: Dict = { 152 "ping_interval": PING_INTERVAL, 153 WS_ADDITIONAL_HEADERS_KEY: combined_headers, 154 } 155 156 self._socket = await connect( 157 url_with_params, 158 **ws_connect_kwargs, 159 ) 160 self._exit_event.clear() 161 162 # debug the threads 163 for thread in threading.enumerate(): 164 self._logger.debug("after running thread: %s", thread.name) 165 self._logger.debug("number of active threads: %s", threading.active_count()) 166 167 # delegate the listening thread to external object 168 if self._delegate is not None: 169 self._logger.notice("_delegate is enabled. this is usually the speaker") 170 self._delegate.set_pull_callback(self._socket.recv) 171 self._delegate.set_push_callback(self._process_message) 172 else: 173 self._logger.notice("create _listening thread") 174 self._listen_thread = asyncio.create_task(self._listening()) 175 176 # debug the threads 177 for thread in threading.enumerate(): 178 self._logger.debug("after running thread: %s", thread.name) 179 self._logger.debug("number of active threads: %s", threading.active_count()) 180 181 # push open event 182 await self._emit( 183 WebSocketEvents(WebSocketEvents.Open), 184 OpenResponse(type=WebSocketEvents.Open), 185 ) 186 187 self._logger.notice("start succeeded") 188 self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE") 189 return True 190 except websockets.exceptions.ConnectionClosed as e: 191 self._logger.error( 192 "ConnectionClosed in AbstractAsyncWebSocketClient.start: %s", e 193 ) 194 self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE") 195 if self._config.options.get("termination_exception_connect", False): 196 raise 197 return False 198 except websockets.exceptions.WebSocketException as e: 199 self._logger.error( 200 "WebSocketException in AbstractAsyncWebSocketClient.start: %s", e 201 ) 202 self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE") 203 if self._config.options.get("termination_exception_connect", False): 204 raise 205 return False 206 except Exception as e: # pylint: disable=broad-except 207 self._logger.error( 208 "WebSocketException in AbstractAsyncWebSocketClient.start: %s", e 209 ) 210 self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE") 211 if self._config.options.get("termination_exception_connect", False): 212 raise 213 return False 214 215 async def is_connected(self) -> bool: 216 """ 217 Returns the connection status of the WebSocket. 218 """ 219 return self._socket is not None 220 221 # pylint: enable=too-many-branches,too-many-statements 222 223 @abstractmethod 224 def on(self, event: WebSocketEvents, handler: Callable) -> None: 225 """ 226 Registers an event handler for the WebSocket connection. 227 """ 228 raise NotImplementedError("no on method") 229 230 @abstractmethod 231 async def _emit(self, event: WebSocketEvents, *args, **kwargs) -> None: 232 """ 233 Emits an event to the WebSocket connection. 234 """ 235 raise NotImplementedError("no _emit method") 236 237 # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches 238 async def _listening(self) -> None: 239 """ 240 Listens for messages from the WebSocket connection. 241 """ 242 self._logger.debug("AbstractAsyncWebSocketClient._listening ENTER") 243 244 while True: 245 try: 246 if self._exit_event.is_set(): 247 self._logger.notice("_listening exiting gracefully") 248 self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE") 249 return 250 251 if self._socket is None: 252 self._logger.warning("socket is empty") 253 self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE") 254 return 255 256 message = await self._socket.recv() 257 258 if message is None: 259 self._logger.info("message is None") 260 continue 261 262 self._logger.spam("data type: %s", type(message)) 263 264 if isinstance(message, bytes): 265 self._logger.debug("Binary data received") 266 await self._process_binary(message) 267 else: 268 self._logger.debug("Text data received") 269 await self._process_text(message) 270 271 self._logger.notice("_listening Succeeded") 272 self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE") 273 274 except websockets.exceptions.ConnectionClosedOK as e: 275 # signal exit and close 276 await self._signal_exit() 277 278 self._logger.notice(f"_listening({e.code}) exiting gracefully") 279 self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE") 280 return 281 282 except websockets.exceptions.ConnectionClosed as e: 283 if e.code in [1000, 1001]: 284 # signal exit and close 285 await self._signal_exit() 286 287 self._logger.notice(f"_listening({e.code}) exiting gracefully") 288 self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE") 289 return 290 291 # we need to explicitly call self._signal_exit() here because we are hanging on a recv() 292 # note: this is different than the speak websocket client 293 self._logger.error( 294 "ConnectionClosed in AbstractAsyncWebSocketClient._listening with code %s: %s", 295 e.code, 296 e.reason, 297 ) 298 cc_error: ErrorResponse = ErrorResponse( 299 "ConnectionClosed in AbstractAsyncWebSocketClient._listening", 300 f"{e}", 301 "ConnectionClosed", 302 ) 303 await self._emit( 304 WebSocketEvents(WebSocketEvents.Error), 305 error=cc_error, 306 **dict(cast(Dict[Any, Any], self._kwargs)), 307 ) 308 309 # signal exit and close 310 await self._signal_exit() 311 312 self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE") 313 314 if self._config.options.get("termination_exception_connect") is True: 315 raise 316 return 317 318 except websockets.exceptions.WebSocketException as e: 319 self._logger.error( 320 "WebSocketException in AbstractAsyncWebSocketClient._listening: %s", 321 e, 322 ) 323 ws_error: ErrorResponse = ErrorResponse( 324 "WebSocketException in AbstractAsyncWebSocketClient._listening", 325 f"{e}", 326 "WebSocketException", 327 ) 328 await self._emit( 329 WebSocketEvents(WebSocketEvents.Error), 330 error=ws_error, 331 **dict(cast(Dict[Any, Any], self._kwargs)), 332 ) 333 334 # signal exit and close 335 await self._signal_exit() 336 337 self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE") 338 339 if self._config.options.get("termination_exception_connect") is True: 340 raise 341 return 342 343 except Exception as e: # pylint: disable=broad-except 344 self._logger.error( 345 "Exception in AbstractAsyncWebSocketClient._listening: %s", e 346 ) 347 e_error: ErrorResponse = ErrorResponse( 348 "Exception in AbstractAsyncWebSocketClient._listening", 349 f"{e}", 350 "Exception", 351 ) 352 await self._emit( 353 WebSocketEvents(WebSocketEvents.Error), 354 error=e_error, 355 **dict(cast(Dict[Any, Any], self._kwargs)), 356 ) 357 358 # signal exit and close 359 await self._signal_exit() 360 361 self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE") 362 363 if self._config.options.get("termination_exception_connect") is True: 364 raise 365 return 366 367 # pylint: enable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches 368 369 async def _process_message(self, message: Union[str, bytes]) -> None: 370 if isinstance(message, bytes): 371 await self._process_binary(message) 372 else: 373 await self._process_text(message) 374 375 @abstractmethod 376 async def _process_text(self, message: str) -> None: 377 raise NotImplementedError("no _process_text method") 378 379 @abstractmethod 380 async def _process_binary(self, message: bytes) -> None: 381 raise NotImplementedError("no _process_binary method") 382 383 @abstractmethod 384 async def _close_message(self) -> bool: 385 raise NotImplementedError("no _close_message method") 386 387 # pylint: disable=too-many-return-statements,too-many-branches 388 389 async def send(self, data: Union[str, bytes]) -> bool: 390 """ 391 Sends data over the WebSocket connection. 392 """ 393 self._logger.spam("AbstractAsyncWebSocketClient.send ENTER") 394 395 if self._exit_event.is_set(): 396 self._logger.notice("send exiting gracefully") 397 self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE") 398 return False 399 400 if not await self.is_connected(): 401 self._logger.notice("is_connected is False") 402 self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE") 403 return False 404 405 if self._socket is not None: 406 try: 407 await self._socket.send(data) 408 except websockets.exceptions.ConnectionClosedOK as e: 409 self._logger.notice(f"send() exiting gracefully: {e.code}") 410 self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE") 411 if self._config.options.get("termination_exception_send") is True: 412 raise 413 return True 414 except websockets.exceptions.ConnectionClosed as e: 415 if e.code in [1000, 1001]: 416 self._logger.notice(f"send({e.code}) exiting gracefully") 417 self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE") 418 if self._config.options.get("termination_exception_send") is True: 419 raise 420 return True 421 422 self._logger.error("send() failed - ConnectionClosed: %s", str(e)) 423 self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE") 424 if self._config.options.get("termination_exception_send") is True: 425 raise 426 return False 427 except websockets.exceptions.WebSocketException as e: 428 self._logger.error("send() failed - WebSocketException: %s", str(e)) 429 self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE") 430 if self._config.options.get("termination_exception_send") is True: 431 raise 432 return False 433 except Exception as e: # pylint: disable=broad-except 434 self._logger.error("send() failed - Exception: %s", str(e)) 435 self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE") 436 if self._config.options.get("termination_exception_send") is True: 437 raise 438 return False 439 440 self._logger.spam("send() succeeded") 441 self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE") 442 return True 443 444 self._logger.spam("send() failed. socket is None") 445 self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE") 446 return False 447 448 # pylint: enable=too-many-return-statements,too-many-branches 449 450 async def finish(self) -> bool: 451 """ 452 Closes the WebSocket connection gracefully. 453 """ 454 self._logger.debug("AbstractAsyncWebSocketClient.finish ENTER") 455 456 # signal exit 457 await self._signal_exit() 458 459 # stop the threads 460 self._logger.verbose("cancelling tasks...") 461 try: 462 # Before cancelling, check if the tasks were created 463 # debug the threads 464 for thread in threading.enumerate(): 465 self._logger.debug("before running thread: %s", thread.name) 466 self._logger.debug("number of active threads: %s", threading.active_count()) 467 468 tasks = [] 469 if self._listen_thread is not None: 470 self._listen_thread.cancel() 471 tasks.append(self._listen_thread) 472 self._logger.notice("processing _listen_thread cancel...") 473 474 # Use asyncio.gather to wait for tasks to be cancelled 475 await asyncio.gather(*filter(None, tasks)) 476 self._logger.notice("threads joined") 477 478 # debug the threads 479 for thread in threading.enumerate(): 480 if thread is not None and thread.name is not None: 481 self._logger.debug("after running thread: %s", thread.name) 482 else: 483 self._logger.debug("after running thread: unknown_thread_name") 484 self._logger.debug("number of active threads: %s", threading.active_count()) 485 486 self._logger.notice("finish succeeded") 487 self._logger.spam("AbstractAsyncWebSocketClient.finish LEAVE") 488 return True 489 490 except asyncio.CancelledError as e: 491 self._logger.error("tasks cancelled error: %s", e) 492 self._logger.debug("AbstractAsyncWebSocketClient.finish LEAVE") 493 return True 494 495 async def _signal_exit(self) -> None: 496 # send close event 497 self._logger.verbose("closing socket...") 498 if self._socket is not None: 499 self._logger.verbose("send Close...") 500 try: 501 # if the socket connection is closed, the following line might throw an error 502 await self._close_message() 503 except websockets.exceptions.ConnectionClosedOK as e: 504 self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code) 505 except websockets.exceptions.ConnectionClosed as e: 506 self._logger.error("_signal_exit - ConnectionClosed: %s", e.code) 507 except websockets.exceptions.WebSocketException as e: 508 self._logger.error("_signal_exit - WebSocketException: %s", str(e)) 509 except Exception as e: # pylint: disable=broad-except 510 self._logger.error("_signal_exit - Exception: %s", str(e)) 511 512 # push close event 513 try: 514 await self._emit( 515 WebSocketEvents(WebSocketEvents.Close), 516 close=CloseResponse(type=WebSocketEvents.Close), 517 **dict(cast(Dict[Any, Any], self._kwargs)), 518 ) 519 except Exception as e: # pylint: disable=broad-except 520 self._logger.error("_emit - Exception: %s", e) 521 522 # wait for task to send 523 await asyncio.sleep(0.5) 524 525 # signal exit 526 self._exit_event.set() 527 528 # closes the WebSocket connection gracefully 529 self._logger.verbose("clean up socket...") 530 if self._socket is not None: 531 self._logger.verbose("socket.wait_closed...") 532 try: 533 await self._socket.close() 534 except websockets.exceptions.WebSocketException as e: 535 self._logger.error("socket.wait_closed failed: %s", e) 536 537 self._socket = None
ONE_SECOND =
1
HALF_SECOND =
0.5
DEEPGRAM_INTERVAL =
5
PING_INTERVAL =
20
class
AbstractAsyncWebSocketClient(abc.ABC):
49class AbstractAsyncWebSocketClient(ABC): # pylint: disable=too-many-instance-attributes 50 """ 51 Abstract class for using WebSockets. 52 53 This class provides methods to establish a WebSocket connection generically for 54 use in all WebSocket clients. 55 """ 56 57 _logger: verboselogs.VerboseLogger 58 _config: DeepgramClientOptions 59 _endpoint: str 60 _websocket_url: str 61 62 _socket: Optional[ClientConnection] = None 63 64 _listen_thread: Union[asyncio.Task, None] 65 _delegate: Optional[Speaker] = None 66 67 _kwargs: Optional[Dict] = None 68 _addons: Optional[Dict] = None 69 _options: Optional[Dict] = None 70 _headers: Optional[Dict] = None 71 72 def __init__(self, config: DeepgramClientOptions, endpoint: str = ""): 73 if config is None: 74 raise DeepgramError("Config is required") 75 if endpoint == "": 76 raise DeepgramError("endpoint is required") 77 78 self._logger = verboselogs.VerboseLogger(__name__) 79 self._logger.addHandler(logging.StreamHandler()) 80 self._logger.setLevel(config.verbose) 81 82 self._config = config 83 self._endpoint = endpoint 84 85 self._listen_thread = None 86 87 # events 88 self._exit_event = asyncio.Event() 89 90 # set websocket url 91 self._websocket_url = convert_to_websocket_url(self._config.url, self._endpoint) 92 93 def delegate_listening(self, delegate: Speaker) -> None: 94 """ 95 Delegate the listening thread to the Speaker object. 96 """ 97 self._delegate = delegate 98 99 # pylint: disable=too-many-branches,too-many-statements 100 async def start( 101 self, 102 options: Optional[Any] = None, 103 addons: Optional[Dict] = None, 104 headers: Optional[Dict] = None, 105 **kwargs, 106 ) -> bool: 107 """ 108 Starts the WebSocket connection for live transcription. 109 """ 110 self._logger.debug("AbstractAsyncWebSocketClient.start ENTER") 111 self._logger.info("addons: %s", addons) 112 self._logger.info("headers: %s", headers) 113 self._logger.info("kwargs: %s", kwargs) 114 115 self._addons = addons 116 self._headers = headers 117 118 # set kwargs 119 if kwargs is not None: 120 self._kwargs = kwargs 121 else: 122 self._kwargs = {} 123 124 if not isinstance(options, dict): 125 self._logger.error("options is not a dict") 126 self._logger.debug("AbstractSyncWebSocketClient.start LEAVE") 127 return False 128 129 # set options 130 if options is not None: 131 self._options = options 132 else: 133 self._options = {} 134 135 combined_options = self._options.copy() 136 if self._addons is not None: 137 self._logger.info("merging addons to options") 138 combined_options.update(self._addons) 139 self._logger.info("new options: %s", combined_options) 140 self._logger.debug("combined_options: %s", combined_options) 141 142 combined_headers = self._config.headers.copy() 143 if self._headers is not None: 144 self._logger.info("merging headers to options") 145 combined_headers.update(self._headers) 146 self._logger.info("new headers: %s", combined_headers) 147 self._logger.debug("combined_headers: %s", combined_headers) 148 149 url_with_params = append_query_params(self._websocket_url, combined_options) 150 151 try: 152 ws_connect_kwargs: Dict = { 153 "ping_interval": PING_INTERVAL, 154 WS_ADDITIONAL_HEADERS_KEY: combined_headers, 155 } 156 157 self._socket = await connect( 158 url_with_params, 159 **ws_connect_kwargs, 160 ) 161 self._exit_event.clear() 162 163 # debug the threads 164 for thread in threading.enumerate(): 165 self._logger.debug("after running thread: %s", thread.name) 166 self._logger.debug("number of active threads: %s", threading.active_count()) 167 168 # delegate the listening thread to external object 169 if self._delegate is not None: 170 self._logger.notice("_delegate is enabled. this is usually the speaker") 171 self._delegate.set_pull_callback(self._socket.recv) 172 self._delegate.set_push_callback(self._process_message) 173 else: 174 self._logger.notice("create _listening thread") 175 self._listen_thread = asyncio.create_task(self._listening()) 176 177 # debug the threads 178 for thread in threading.enumerate(): 179 self._logger.debug("after running thread: %s", thread.name) 180 self._logger.debug("number of active threads: %s", threading.active_count()) 181 182 # push open event 183 await self._emit( 184 WebSocketEvents(WebSocketEvents.Open), 185 OpenResponse(type=WebSocketEvents.Open), 186 ) 187 188 self._logger.notice("start succeeded") 189 self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE") 190 return True 191 except websockets.exceptions.ConnectionClosed as e: 192 self._logger.error( 193 "ConnectionClosed in AbstractAsyncWebSocketClient.start: %s", e 194 ) 195 self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE") 196 if self._config.options.get("termination_exception_connect", False): 197 raise 198 return False 199 except websockets.exceptions.WebSocketException as e: 200 self._logger.error( 201 "WebSocketException in AbstractAsyncWebSocketClient.start: %s", e 202 ) 203 self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE") 204 if self._config.options.get("termination_exception_connect", False): 205 raise 206 return False 207 except Exception as e: # pylint: disable=broad-except 208 self._logger.error( 209 "WebSocketException in AbstractAsyncWebSocketClient.start: %s", e 210 ) 211 self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE") 212 if self._config.options.get("termination_exception_connect", False): 213 raise 214 return False 215 216 async def is_connected(self) -> bool: 217 """ 218 Returns the connection status of the WebSocket. 219 """ 220 return self._socket is not None 221 222 # pylint: enable=too-many-branches,too-many-statements 223 224 @abstractmethod 225 def on(self, event: WebSocketEvents, handler: Callable) -> None: 226 """ 227 Registers an event handler for the WebSocket connection. 228 """ 229 raise NotImplementedError("no on method") 230 231 @abstractmethod 232 async def _emit(self, event: WebSocketEvents, *args, **kwargs) -> None: 233 """ 234 Emits an event to the WebSocket connection. 235 """ 236 raise NotImplementedError("no _emit method") 237 238 # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches 239 async def _listening(self) -> None: 240 """ 241 Listens for messages from the WebSocket connection. 242 """ 243 self._logger.debug("AbstractAsyncWebSocketClient._listening ENTER") 244 245 while True: 246 try: 247 if self._exit_event.is_set(): 248 self._logger.notice("_listening exiting gracefully") 249 self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE") 250 return 251 252 if self._socket is None: 253 self._logger.warning("socket is empty") 254 self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE") 255 return 256 257 message = await self._socket.recv() 258 259 if message is None: 260 self._logger.info("message is None") 261 continue 262 263 self._logger.spam("data type: %s", type(message)) 264 265 if isinstance(message, bytes): 266 self._logger.debug("Binary data received") 267 await self._process_binary(message) 268 else: 269 self._logger.debug("Text data received") 270 await self._process_text(message) 271 272 self._logger.notice("_listening Succeeded") 273 self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE") 274 275 except websockets.exceptions.ConnectionClosedOK as e: 276 # signal exit and close 277 await self._signal_exit() 278 279 self._logger.notice(f"_listening({e.code}) exiting gracefully") 280 self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE") 281 return 282 283 except websockets.exceptions.ConnectionClosed as e: 284 if e.code in [1000, 1001]: 285 # signal exit and close 286 await self._signal_exit() 287 288 self._logger.notice(f"_listening({e.code}) exiting gracefully") 289 self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE") 290 return 291 292 # we need to explicitly call self._signal_exit() here because we are hanging on a recv() 293 # note: this is different than the speak websocket client 294 self._logger.error( 295 "ConnectionClosed in AbstractAsyncWebSocketClient._listening with code %s: %s", 296 e.code, 297 e.reason, 298 ) 299 cc_error: ErrorResponse = ErrorResponse( 300 "ConnectionClosed in AbstractAsyncWebSocketClient._listening", 301 f"{e}", 302 "ConnectionClosed", 303 ) 304 await self._emit( 305 WebSocketEvents(WebSocketEvents.Error), 306 error=cc_error, 307 **dict(cast(Dict[Any, Any], self._kwargs)), 308 ) 309 310 # signal exit and close 311 await self._signal_exit() 312 313 self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE") 314 315 if self._config.options.get("termination_exception_connect") is True: 316 raise 317 return 318 319 except websockets.exceptions.WebSocketException as e: 320 self._logger.error( 321 "WebSocketException in AbstractAsyncWebSocketClient._listening: %s", 322 e, 323 ) 324 ws_error: ErrorResponse = ErrorResponse( 325 "WebSocketException in AbstractAsyncWebSocketClient._listening", 326 f"{e}", 327 "WebSocketException", 328 ) 329 await self._emit( 330 WebSocketEvents(WebSocketEvents.Error), 331 error=ws_error, 332 **dict(cast(Dict[Any, Any], self._kwargs)), 333 ) 334 335 # signal exit and close 336 await self._signal_exit() 337 338 self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE") 339 340 if self._config.options.get("termination_exception_connect") is True: 341 raise 342 return 343 344 except Exception as e: # pylint: disable=broad-except 345 self._logger.error( 346 "Exception in AbstractAsyncWebSocketClient._listening: %s", e 347 ) 348 e_error: ErrorResponse = ErrorResponse( 349 "Exception in AbstractAsyncWebSocketClient._listening", 350 f"{e}", 351 "Exception", 352 ) 353 await self._emit( 354 WebSocketEvents(WebSocketEvents.Error), 355 error=e_error, 356 **dict(cast(Dict[Any, Any], self._kwargs)), 357 ) 358 359 # signal exit and close 360 await self._signal_exit() 361 362 self._logger.debug("AbstractAsyncWebSocketClient._listening LEAVE") 363 364 if self._config.options.get("termination_exception_connect") is True: 365 raise 366 return 367 368 # pylint: enable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches 369 370 async def _process_message(self, message: Union[str, bytes]) -> None: 371 if isinstance(message, bytes): 372 await self._process_binary(message) 373 else: 374 await self._process_text(message) 375 376 @abstractmethod 377 async def _process_text(self, message: str) -> None: 378 raise NotImplementedError("no _process_text method") 379 380 @abstractmethod 381 async def _process_binary(self, message: bytes) -> None: 382 raise NotImplementedError("no _process_binary method") 383 384 @abstractmethod 385 async def _close_message(self) -> bool: 386 raise NotImplementedError("no _close_message method") 387 388 # pylint: disable=too-many-return-statements,too-many-branches 389 390 async def send(self, data: Union[str, bytes]) -> bool: 391 """ 392 Sends data over the WebSocket connection. 393 """ 394 self._logger.spam("AbstractAsyncWebSocketClient.send ENTER") 395 396 if self._exit_event.is_set(): 397 self._logger.notice("send exiting gracefully") 398 self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE") 399 return False 400 401 if not await self.is_connected(): 402 self._logger.notice("is_connected is False") 403 self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE") 404 return False 405 406 if self._socket is not None: 407 try: 408 await self._socket.send(data) 409 except websockets.exceptions.ConnectionClosedOK as e: 410 self._logger.notice(f"send() exiting gracefully: {e.code}") 411 self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE") 412 if self._config.options.get("termination_exception_send") is True: 413 raise 414 return True 415 except websockets.exceptions.ConnectionClosed as e: 416 if e.code in [1000, 1001]: 417 self._logger.notice(f"send({e.code}) exiting gracefully") 418 self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE") 419 if self._config.options.get("termination_exception_send") is True: 420 raise 421 return True 422 423 self._logger.error("send() failed - ConnectionClosed: %s", str(e)) 424 self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE") 425 if self._config.options.get("termination_exception_send") is True: 426 raise 427 return False 428 except websockets.exceptions.WebSocketException as e: 429 self._logger.error("send() failed - WebSocketException: %s", str(e)) 430 self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE") 431 if self._config.options.get("termination_exception_send") is True: 432 raise 433 return False 434 except Exception as e: # pylint: disable=broad-except 435 self._logger.error("send() failed - Exception: %s", str(e)) 436 self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE") 437 if self._config.options.get("termination_exception_send") is True: 438 raise 439 return False 440 441 self._logger.spam("send() succeeded") 442 self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE") 443 return True 444 445 self._logger.spam("send() failed. socket is None") 446 self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE") 447 return False 448 449 # pylint: enable=too-many-return-statements,too-many-branches 450 451 async def finish(self) -> bool: 452 """ 453 Closes the WebSocket connection gracefully. 454 """ 455 self._logger.debug("AbstractAsyncWebSocketClient.finish ENTER") 456 457 # signal exit 458 await self._signal_exit() 459 460 # stop the threads 461 self._logger.verbose("cancelling tasks...") 462 try: 463 # Before cancelling, check if the tasks were created 464 # debug the threads 465 for thread in threading.enumerate(): 466 self._logger.debug("before running thread: %s", thread.name) 467 self._logger.debug("number of active threads: %s", threading.active_count()) 468 469 tasks = [] 470 if self._listen_thread is not None: 471 self._listen_thread.cancel() 472 tasks.append(self._listen_thread) 473 self._logger.notice("processing _listen_thread cancel...") 474 475 # Use asyncio.gather to wait for tasks to be cancelled 476 await asyncio.gather(*filter(None, tasks)) 477 self._logger.notice("threads joined") 478 479 # debug the threads 480 for thread in threading.enumerate(): 481 if thread is not None and thread.name is not None: 482 self._logger.debug("after running thread: %s", thread.name) 483 else: 484 self._logger.debug("after running thread: unknown_thread_name") 485 self._logger.debug("number of active threads: %s", threading.active_count()) 486 487 self._logger.notice("finish succeeded") 488 self._logger.spam("AbstractAsyncWebSocketClient.finish LEAVE") 489 return True 490 491 except asyncio.CancelledError as e: 492 self._logger.error("tasks cancelled error: %s", e) 493 self._logger.debug("AbstractAsyncWebSocketClient.finish LEAVE") 494 return True 495 496 async def _signal_exit(self) -> None: 497 # send close event 498 self._logger.verbose("closing socket...") 499 if self._socket is not None: 500 self._logger.verbose("send Close...") 501 try: 502 # if the socket connection is closed, the following line might throw an error 503 await self._close_message() 504 except websockets.exceptions.ConnectionClosedOK as e: 505 self._logger.notice("_signal_exit - ConnectionClosedOK: %s", e.code) 506 except websockets.exceptions.ConnectionClosed as e: 507 self._logger.error("_signal_exit - ConnectionClosed: %s", e.code) 508 except websockets.exceptions.WebSocketException as e: 509 self._logger.error("_signal_exit - WebSocketException: %s", str(e)) 510 except Exception as e: # pylint: disable=broad-except 511 self._logger.error("_signal_exit - Exception: %s", str(e)) 512 513 # push close event 514 try: 515 await self._emit( 516 WebSocketEvents(WebSocketEvents.Close), 517 close=CloseResponse(type=WebSocketEvents.Close), 518 **dict(cast(Dict[Any, Any], self._kwargs)), 519 ) 520 except Exception as e: # pylint: disable=broad-except 521 self._logger.error("_emit - Exception: %s", e) 522 523 # wait for task to send 524 await asyncio.sleep(0.5) 525 526 # signal exit 527 self._exit_event.set() 528 529 # closes the WebSocket connection gracefully 530 self._logger.verbose("clean up socket...") 531 if self._socket is not None: 532 self._logger.verbose("socket.wait_closed...") 533 try: 534 await self._socket.close() 535 except websockets.exceptions.WebSocketException as e: 536 self._logger.error("socket.wait_closed failed: %s", e) 537 538 self._socket = None
Abstract class for using WebSockets.
This class provides methods to establish a WebSocket connection generically for use in all WebSocket clients.
93 def delegate_listening(self, delegate: Speaker) -> None: 94 """ 95 Delegate the listening thread to the Speaker object. 96 """ 97 self._delegate = delegate
Delegate the listening thread to the Speaker object.
async def
start( self, options: Optional[Any] = None, addons: Optional[Dict] = None, headers: Optional[Dict] = None, **kwargs) -> bool:
100 async def start( 101 self, 102 options: Optional[Any] = None, 103 addons: Optional[Dict] = None, 104 headers: Optional[Dict] = None, 105 **kwargs, 106 ) -> bool: 107 """ 108 Starts the WebSocket connection for live transcription. 109 """ 110 self._logger.debug("AbstractAsyncWebSocketClient.start ENTER") 111 self._logger.info("addons: %s", addons) 112 self._logger.info("headers: %s", headers) 113 self._logger.info("kwargs: %s", kwargs) 114 115 self._addons = addons 116 self._headers = headers 117 118 # set kwargs 119 if kwargs is not None: 120 self._kwargs = kwargs 121 else: 122 self._kwargs = {} 123 124 if not isinstance(options, dict): 125 self._logger.error("options is not a dict") 126 self._logger.debug("AbstractSyncWebSocketClient.start LEAVE") 127 return False 128 129 # set options 130 if options is not None: 131 self._options = options 132 else: 133 self._options = {} 134 135 combined_options = self._options.copy() 136 if self._addons is not None: 137 self._logger.info("merging addons to options") 138 combined_options.update(self._addons) 139 self._logger.info("new options: %s", combined_options) 140 self._logger.debug("combined_options: %s", combined_options) 141 142 combined_headers = self._config.headers.copy() 143 if self._headers is not None: 144 self._logger.info("merging headers to options") 145 combined_headers.update(self._headers) 146 self._logger.info("new headers: %s", combined_headers) 147 self._logger.debug("combined_headers: %s", combined_headers) 148 149 url_with_params = append_query_params(self._websocket_url, combined_options) 150 151 try: 152 ws_connect_kwargs: Dict = { 153 "ping_interval": PING_INTERVAL, 154 WS_ADDITIONAL_HEADERS_KEY: combined_headers, 155 } 156 157 self._socket = await connect( 158 url_with_params, 159 **ws_connect_kwargs, 160 ) 161 self._exit_event.clear() 162 163 # debug the threads 164 for thread in threading.enumerate(): 165 self._logger.debug("after running thread: %s", thread.name) 166 self._logger.debug("number of active threads: %s", threading.active_count()) 167 168 # delegate the listening thread to external object 169 if self._delegate is not None: 170 self._logger.notice("_delegate is enabled. this is usually the speaker") 171 self._delegate.set_pull_callback(self._socket.recv) 172 self._delegate.set_push_callback(self._process_message) 173 else: 174 self._logger.notice("create _listening thread") 175 self._listen_thread = asyncio.create_task(self._listening()) 176 177 # debug the threads 178 for thread in threading.enumerate(): 179 self._logger.debug("after running thread: %s", thread.name) 180 self._logger.debug("number of active threads: %s", threading.active_count()) 181 182 # push open event 183 await self._emit( 184 WebSocketEvents(WebSocketEvents.Open), 185 OpenResponse(type=WebSocketEvents.Open), 186 ) 187 188 self._logger.notice("start succeeded") 189 self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE") 190 return True 191 except websockets.exceptions.ConnectionClosed as e: 192 self._logger.error( 193 "ConnectionClosed in AbstractAsyncWebSocketClient.start: %s", e 194 ) 195 self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE") 196 if self._config.options.get("termination_exception_connect", False): 197 raise 198 return False 199 except websockets.exceptions.WebSocketException as e: 200 self._logger.error( 201 "WebSocketException in AbstractAsyncWebSocketClient.start: %s", e 202 ) 203 self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE") 204 if self._config.options.get("termination_exception_connect", False): 205 raise 206 return False 207 except Exception as e: # pylint: disable=broad-except 208 self._logger.error( 209 "WebSocketException in AbstractAsyncWebSocketClient.start: %s", e 210 ) 211 self._logger.debug("AbstractAsyncWebSocketClient.start LEAVE") 212 if self._config.options.get("termination_exception_connect", False): 213 raise 214 return False
Starts the WebSocket connection for live transcription.
async def
is_connected(self) -> bool:
216 async def is_connected(self) -> bool: 217 """ 218 Returns the connection status of the WebSocket. 219 """ 220 return self._socket is not None
Returns the connection status of the WebSocket.
@abstractmethod
def
on( self, event: deepgram.clients.common.v1.websocket_events.WebSocketEvents, handler: Callable) -> None:
224 @abstractmethod 225 def on(self, event: WebSocketEvents, handler: Callable) -> None: 226 """ 227 Registers an event handler for the WebSocket connection. 228 """ 229 raise NotImplementedError("no on method")
Registers an event handler for the WebSocket connection.
async def
send(self, data: Union[str, bytes]) -> bool:
390 async def send(self, data: Union[str, bytes]) -> bool: 391 """ 392 Sends data over the WebSocket connection. 393 """ 394 self._logger.spam("AbstractAsyncWebSocketClient.send ENTER") 395 396 if self._exit_event.is_set(): 397 self._logger.notice("send exiting gracefully") 398 self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE") 399 return False 400 401 if not await self.is_connected(): 402 self._logger.notice("is_connected is False") 403 self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE") 404 return False 405 406 if self._socket is not None: 407 try: 408 await self._socket.send(data) 409 except websockets.exceptions.ConnectionClosedOK as e: 410 self._logger.notice(f"send() exiting gracefully: {e.code}") 411 self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE") 412 if self._config.options.get("termination_exception_send") is True: 413 raise 414 return True 415 except websockets.exceptions.ConnectionClosed as e: 416 if e.code in [1000, 1001]: 417 self._logger.notice(f"send({e.code}) exiting gracefully") 418 self._logger.debug("AbstractAsyncWebSocketClient.send LEAVE") 419 if self._config.options.get("termination_exception_send") is True: 420 raise 421 return True 422 423 self._logger.error("send() failed - ConnectionClosed: %s", str(e)) 424 self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE") 425 if self._config.options.get("termination_exception_send") is True: 426 raise 427 return False 428 except websockets.exceptions.WebSocketException as e: 429 self._logger.error("send() failed - WebSocketException: %s", str(e)) 430 self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE") 431 if self._config.options.get("termination_exception_send") is True: 432 raise 433 return False 434 except Exception as e: # pylint: disable=broad-except 435 self._logger.error("send() failed - Exception: %s", str(e)) 436 self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE") 437 if self._config.options.get("termination_exception_send") is True: 438 raise 439 return False 440 441 self._logger.spam("send() succeeded") 442 self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE") 443 return True 444 445 self._logger.spam("send() failed. socket is None") 446 self._logger.spam("AbstractAsyncWebSocketClient.send LEAVE") 447 return False
Sends data over the WebSocket connection.
async def
finish(self) -> bool:
451 async def finish(self) -> bool: 452 """ 453 Closes the WebSocket connection gracefully. 454 """ 455 self._logger.debug("AbstractAsyncWebSocketClient.finish ENTER") 456 457 # signal exit 458 await self._signal_exit() 459 460 # stop the threads 461 self._logger.verbose("cancelling tasks...") 462 try: 463 # Before cancelling, check if the tasks were created 464 # debug the threads 465 for thread in threading.enumerate(): 466 self._logger.debug("before running thread: %s", thread.name) 467 self._logger.debug("number of active threads: %s", threading.active_count()) 468 469 tasks = [] 470 if self._listen_thread is not None: 471 self._listen_thread.cancel() 472 tasks.append(self._listen_thread) 473 self._logger.notice("processing _listen_thread cancel...") 474 475 # Use asyncio.gather to wait for tasks to be cancelled 476 await asyncio.gather(*filter(None, tasks)) 477 self._logger.notice("threads joined") 478 479 # debug the threads 480 for thread in threading.enumerate(): 481 if thread is not None and thread.name is not None: 482 self._logger.debug("after running thread: %s", thread.name) 483 else: 484 self._logger.debug("after running thread: unknown_thread_name") 485 self._logger.debug("number of active threads: %s", threading.active_count()) 486 487 self._logger.notice("finish succeeded") 488 self._logger.spam("AbstractAsyncWebSocketClient.finish LEAVE") 489 return True 490 491 except asyncio.CancelledError as e: 492 self._logger.error("tasks cancelled error: %s", e) 493 self._logger.debug("AbstractAsyncWebSocketClient.finish LEAVE") 494 return True
Closes the WebSocket connection gracefully.