deepgram.clients.listen.v1.websocket.client
1# Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved. 2# Use of this source code is governed by a MIT license that can be found in the LICENSE file. 3# SPDX-License-Identifier: MIT 4import json 5import time 6import logging 7from typing import Dict, Union, Optional, cast, Any, Callable, Type 8from datetime import datetime 9import threading 10 11from .....utils import verboselogs 12from .....options import DeepgramClientOptions 13from ...enums import LiveTranscriptionEvents 14from ....common import AbstractSyncWebSocketClient 15from ....common import DeepgramError 16 17from .response import ( 18 OpenResponse, 19 LiveResultResponse, 20 MetadataResponse, 21 SpeechStartedResponse, 22 UtteranceEndResponse, 23 CloseResponse, 24 ErrorResponse, 25 UnhandledResponse, 26) 27from .options import ListenWebSocketOptions 28 29ONE_SECOND = 1 30HALF_SECOND = 0.5 31DEEPGRAM_INTERVAL = 5 32PING_INTERVAL = 20 33 34 35class ListenWebSocketClient( 36 AbstractSyncWebSocketClient 37): # pylint: disable=too-many-instance-attributes 38 """ 39 Client for interacting with Deepgram's live transcription services over WebSockets. 40 41 This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events. 42 43 Args: 44 config (DeepgramClientOptions): all the options for the client. 45 thread_cls (Type[threading.Thread]): optional thread class to use for creating threads, 46 defaults to threading.Thread. Useful for custom thread management like ContextVar support. 
47 """ 48 49 _logger: verboselogs.VerboseLogger 50 _config: DeepgramClientOptions 51 _endpoint: str 52 53 _lock_flush: threading.Lock 54 _event_handlers: Dict[LiveTranscriptionEvents, list] 55 56 _keep_alive_thread: Union[threading.Thread, None] 57 _flush_thread: Union[threading.Thread, None] 58 _last_datagram: Optional[datetime] = None 59 60 _thread_cls: Type[threading.Thread] 61 62 _kwargs: Optional[Dict] = None 63 _addons: Optional[Dict] = None 64 _options: Optional[Dict] = None 65 _headers: Optional[Dict] = None 66 67 def __init__( 68 self, 69 config: DeepgramClientOptions, 70 thread_cls: Type[threading.Thread] = threading.Thread, 71 ): 72 if config is None: 73 raise DeepgramError("Config is required") 74 75 self._logger = verboselogs.VerboseLogger(__name__) 76 self._logger.addHandler(logging.StreamHandler()) 77 self._logger.setLevel(config.verbose) 78 79 self._config = config 80 self._endpoint = "v1/listen" 81 82 self._flush_thread = None 83 self._keep_alive_thread = None 84 85 # auto flush 86 self._last_datagram = None 87 self._lock_flush = threading.Lock() 88 89 self._thread_cls = thread_cls 90 91 # init handlers 92 self._event_handlers = { 93 event: [] for event in LiveTranscriptionEvents.__members__.values() 94 } 95 96 # call the parent constructor 97 super().__init__( 98 config=self._config, 99 endpoint=self._endpoint, 100 thread_cls=self._thread_cls, 101 ) 102 103 # pylint: disable=too-many-statements,too-many-branches 104 def start( 105 self, 106 options: Optional[Union[ListenWebSocketOptions, Dict]] = None, 107 addons: Optional[Dict] = None, 108 headers: Optional[Dict] = None, 109 members: Optional[Dict] = None, 110 **kwargs, 111 ) -> bool: 112 """ 113 Starts the WebSocket connection for live transcription. 
114 """ 115 self._logger.debug("ListenWebSocketClient.start ENTER") 116 self._logger.info("options: %s", options) 117 self._logger.info("addons: %s", addons) 118 self._logger.info("headers: %s", headers) 119 self._logger.info("members: %s", members) 120 self._logger.info("kwargs: %s", kwargs) 121 122 if isinstance(options, ListenWebSocketOptions) and not options.check(): 123 self._logger.error("options.check failed") 124 self._logger.debug("ListenWebSocketClient.start LEAVE") 125 raise DeepgramError("Fatal transcription options error") 126 127 self._addons = addons 128 self._headers = headers 129 130 # add "members" as members of the class 131 if members is not None: 132 self.__dict__.update(members) 133 134 # set kwargs as members of the class 135 if kwargs is not None: 136 self._kwargs = kwargs 137 else: 138 self._kwargs = {} 139 140 if isinstance(options, ListenWebSocketOptions): 141 self._logger.info("ListenWebSocketOptions switching class -> dict") 142 self._options = options.to_dict() 143 elif options is not None: 144 self._options = options 145 else: 146 self._options = {} 147 148 try: 149 # call parent start 150 if ( 151 super().start( 152 self._options, 153 self._addons, 154 self._headers, 155 **dict(cast(Dict[Any, Any], self._kwargs)), 156 ) 157 is False 158 ): 159 self._logger.error("ListenWebSocketClient.start failed") 160 self._logger.debug("ListenWebSocketClient.start LEAVE") 161 return False 162 163 # debug the threads 164 for thread in threading.enumerate(): 165 self._logger.debug("after running thread: %s", thread.name) 166 self._logger.debug("number of active threads: %s", threading.active_count()) 167 168 # keepalive thread 169 if self._config.is_keep_alive_enabled(): 170 self._logger.notice("keepalive is enabled") 171 self._keep_alive_thread = self._thread_cls(target=self._keep_alive) 172 self._keep_alive_thread.start() 173 else: 174 self._logger.notice("keepalive is disabled") 175 176 # flush thread 177 if 
self._config.is_auto_flush_reply_enabled(): 178 self._logger.notice("autoflush is enabled") 179 self._flush_thread = self._thread_cls(target=self._flush) 180 self._flush_thread.start() 181 else: 182 self._logger.notice("autoflush is disabled") 183 184 # debug the threads 185 for thread in threading.enumerate(): 186 self._logger.debug("after running thread: %s", thread.name) 187 self._logger.debug("number of active threads: %s", threading.active_count()) 188 189 self._logger.notice("start succeeded") 190 self._logger.debug("ListenWebSocketClient.start LEAVE") 191 return True 192 193 except Exception as e: # pylint: disable=broad-except 194 self._logger.error( 195 "WebSocketException in ListenWebSocketClient.start: %s", e 196 ) 197 self._logger.debug("ListenWebSocketClient.start LEAVE") 198 if self._config.options.get("termination_exception_connect") is True: 199 raise e 200 return False 201 202 # pylint: enable=too-many-statements,too-many-branches 203 204 def on( 205 self, event: LiveTranscriptionEvents, handler: Callable 206 ) -> None: # registers event handlers for specific events 207 """ 208 Registers event handlers for specific events. 209 """ 210 self._logger.info("event subscribed: %s", event) 211 if event in LiveTranscriptionEvents.__members__.values() and callable(handler): 212 self._event_handlers[event].append(handler) 213 214 def _emit(self, event: LiveTranscriptionEvents, *args, **kwargs) -> None: 215 """ 216 Emits events to the registered event handlers. 
217 """ 218 self._logger.debug("ListenWebSocketClient._emit ENTER") 219 self._logger.debug("callback handlers for: %s", event) 220 221 # debug the threads 222 for thread in threading.enumerate(): 223 self._logger.debug("after running thread: %s", thread.name) 224 self._logger.debug("number of active threads: %s", threading.active_count()) 225 226 self._logger.debug("callback handlers for: %s", event) 227 for handler in self._event_handlers[event]: 228 handler(self, *args, **kwargs) 229 230 # debug the threads 231 for thread in threading.enumerate(): 232 self._logger.debug("after running thread: %s", thread.name) 233 self._logger.debug("number of active threads: %s", threading.active_count()) 234 235 self._logger.debug("ListenWebSocketClient._emit LEAVE") 236 237 # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches 238 def _process_text(self, message: str) -> None: 239 """ 240 Processes messages received over the WebSocket connection. 241 """ 242 self._logger.debug("ListenWebSocketClient._process_text ENTER") 243 244 try: 245 if len(message) == 0: 246 self._logger.debug("message is empty") 247 self._logger.debug("ListenWebSocketClient._process_text LEAVE") 248 return 249 250 data = json.loads(message) 251 response_type = data.get("type") 252 self._logger.debug("response_type: %s, data: %s", response_type, data) 253 254 match response_type: 255 case LiveTranscriptionEvents.Open: 256 open_result: OpenResponse = OpenResponse.from_json(message) 257 self._logger.verbose("OpenResponse: %s", open_result) 258 self._emit( 259 LiveTranscriptionEvents(LiveTranscriptionEvents.Open), 260 open=open_result, 261 **dict(cast(Dict[Any, Any], self._kwargs)), 262 ) 263 case LiveTranscriptionEvents.Transcript: 264 msg_result: LiveResultResponse = LiveResultResponse.from_json( 265 message 266 ) 267 self._logger.verbose("LiveResultResponse: %s", msg_result) 268 269 # auto flush 270 if self._config.is_inspecting_listen(): 271 inspect_res = 
self._inspect(msg_result) 272 if not inspect_res: 273 self._logger.error("inspect_res failed") 274 275 self._emit( 276 LiveTranscriptionEvents(LiveTranscriptionEvents.Transcript), 277 result=msg_result, 278 **dict(cast(Dict[Any, Any], self._kwargs)), 279 ) 280 case LiveTranscriptionEvents.Metadata: 281 meta_result: MetadataResponse = MetadataResponse.from_json(message) 282 self._logger.verbose("MetadataResponse: %s", meta_result) 283 self._emit( 284 LiveTranscriptionEvents(LiveTranscriptionEvents.Metadata), 285 metadata=meta_result, 286 **dict(cast(Dict[Any, Any], self._kwargs)), 287 ) 288 case LiveTranscriptionEvents.SpeechStarted: 289 ss_result: SpeechStartedResponse = SpeechStartedResponse.from_json( 290 message 291 ) 292 self._logger.verbose("SpeechStartedResponse: %s", ss_result) 293 self._emit( 294 LiveTranscriptionEvents(LiveTranscriptionEvents.SpeechStarted), 295 speech_started=ss_result, 296 **dict(cast(Dict[Any, Any], self._kwargs)), 297 ) 298 case LiveTranscriptionEvents.UtteranceEnd: 299 ue_result: UtteranceEndResponse = UtteranceEndResponse.from_json( 300 message 301 ) 302 self._logger.verbose("UtteranceEndResponse: %s", ue_result) 303 self._emit( 304 LiveTranscriptionEvents(LiveTranscriptionEvents.UtteranceEnd), 305 utterance_end=ue_result, 306 **dict(cast(Dict[Any, Any], self._kwargs)), 307 ) 308 case LiveTranscriptionEvents.Close: 309 close_result: CloseResponse = CloseResponse.from_json(message) 310 self._logger.verbose("CloseResponse: %s", close_result) 311 self._emit( 312 LiveTranscriptionEvents(LiveTranscriptionEvents.Close), 313 close=close_result, 314 **dict(cast(Dict[Any, Any], self._kwargs)), 315 ) 316 case LiveTranscriptionEvents.Error: 317 err_error: ErrorResponse = ErrorResponse.from_json(message) 318 self._logger.verbose("ErrorResponse: %s", err_error) 319 self._emit( 320 LiveTranscriptionEvents(LiveTranscriptionEvents.Error), 321 error=err_error, 322 **dict(cast(Dict[Any, Any], self._kwargs)), 323 ) 324 case _: 325 self._logger.warning( 
326 "Unknown Message: response_type: %s, data: %s", 327 response_type, 328 data, 329 ) 330 unhandled_error: UnhandledResponse = UnhandledResponse( 331 type=LiveTranscriptionEvents(LiveTranscriptionEvents.Unhandled), 332 raw=message, 333 ) 334 self._emit( 335 LiveTranscriptionEvents(LiveTranscriptionEvents.Unhandled), 336 unhandled=unhandled_error, 337 **dict(cast(Dict[Any, Any], self._kwargs)), 338 ) 339 340 self._logger.notice("_process_text Succeeded") 341 self._logger.debug("SpeakStreamClient._process_text LEAVE") 342 343 except Exception as e: # pylint: disable=broad-except 344 self._logger.error( 345 "Exception in ListenWebSocketClient._process_text: %s", e 346 ) 347 e_error: ErrorResponse = ErrorResponse( 348 "Exception in ListenWebSocketClient._process_text", 349 f"{e}", 350 "Exception", 351 ) 352 self._logger.error( 353 "Exception in ListenWebSocketClient._process_text: %s", str(e) 354 ) 355 self._emit( 356 LiveTranscriptionEvents(LiveTranscriptionEvents.Error), 357 e_error, 358 **dict(cast(Dict[Any, Any], self._kwargs)), 359 ) 360 361 # signal exit and close 362 super()._signal_exit() 363 364 self._logger.debug("ListenWebSocketClient._process_text LEAVE") 365 366 if self._config.options.get("termination_exception") is True: 367 raise 368 return 369 370 # pylint: enable=too-many-return-statements,too-many-statements 371 372 def _process_binary(self, message: bytes) -> None: 373 raise NotImplementedError("no _process_binary method should be called") 374 375 # pylint: disable=too-many-return-statements 376 def _keep_alive(self) -> None: 377 self._logger.debug("ListenWebSocketClient._keep_alive ENTER") 378 379 counter = 0 380 while True: 381 try: 382 counter += 1 383 self._exit_event.wait(timeout=ONE_SECOND) 384 385 if self._exit_event.is_set(): 386 self._logger.notice("_keep_alive exiting gracefully") 387 self._logger.debug("ListenWebSocketClient._keep_alive LEAVE") 388 return 389 390 # deepgram keepalive 391 if counter % DEEPGRAM_INTERVAL == 0: 392 
self.keep_alive() 393 394 except Exception as e: # pylint: disable=broad-except 395 self._logger.error( 396 "Exception in ListenWebSocketClient._keep_alive: %s", e 397 ) 398 e_error: ErrorResponse = ErrorResponse( 399 "Exception in ListenWebSocketClient._keep_alive", 400 f"{e}", 401 "Exception", 402 ) 403 self._logger.error( 404 "Exception in ListenWebSocketClient._keep_alive: %s", str(e) 405 ) 406 self._emit( 407 LiveTranscriptionEvents(LiveTranscriptionEvents.Error), 408 e_error, 409 **dict(cast(Dict[Any, Any], self._kwargs)), 410 ) 411 412 # signal exit and close 413 super()._signal_exit() 414 415 self._logger.debug("ListenWebSocketClient._keep_alive LEAVE") 416 417 if self._config.options.get("termination_exception") is True: 418 raise 419 return 420 421 ## pylint: disable=too-many-return-statements,too-many-statements 422 def _flush(self) -> None: 423 self._logger.debug("ListenWebSocketClient._flush ENTER") 424 425 delta_in_ms_str = self._config.options.get("auto_flush_reply_delta") 426 if delta_in_ms_str is None: 427 self._logger.error("auto_flush_reply_delta is None") 428 self._logger.debug("ListenWebSocketClient._flush LEAVE") 429 return 430 delta_in_ms = float(delta_in_ms_str) 431 432 _flush_event = threading.Event() 433 while True: 434 try: 435 _flush_event.wait(timeout=HALF_SECOND) 436 437 if self._exit_event.is_set(): 438 self._logger.notice("_flush exiting gracefully") 439 self._logger.debug("ListenWebSocketClient._flush LEAVE") 440 return 441 442 with self._lock_flush: 443 if self._last_datagram is None: 444 self._logger.debug("AutoFlush last_datagram is None") 445 continue 446 447 delta = datetime.now() - self._last_datagram 448 diff_in_ms = delta.total_seconds() * 1000 449 self._logger.debug("AutoFlush delta: %f", diff_in_ms) 450 if diff_in_ms < delta_in_ms: 451 self._logger.debug("AutoFlush delta is less than threshold") 452 continue 453 454 with self._lock_flush: 455 self._last_datagram = None 456 self.finalize() 457 458 except Exception as e: # 
pylint: disable=broad-except 459 self._logger.error("Exception in ListenWebSocketClient._flush: %s", e) 460 e_error: ErrorResponse = ErrorResponse( 461 "Exception in ListenWebSocketClient._flush", 462 f"{e}", 463 "Exception", 464 ) 465 self._logger.error( 466 "Exception in ListenWebSocketClient._flush: %s", str(e) 467 ) 468 self._emit( 469 LiveTranscriptionEvents(LiveTranscriptionEvents.Error), 470 e_error, 471 **dict(cast(Dict[Any, Any], self._kwargs)), 472 ) 473 474 # signal exit and close 475 super()._signal_exit() 476 477 self._logger.debug("ListenWebSocketClient._flush LEAVE") 478 479 if self._config.options.get("termination_exception") is True: 480 raise 481 return 482 483 # pylint: enable=too-many-return-statements 484 485 def keep_alive(self) -> bool: 486 """ 487 Sends a KeepAlive message 488 """ 489 self._logger.spam("ListenWebSocketClient.keep_alive ENTER") 490 491 self._logger.notice("Sending KeepAlive...") 492 ret = self.send(json.dumps({"type": "KeepAlive"})) 493 494 if not ret: 495 self._logger.error("keep_alive failed") 496 self._logger.spam("ListenWebSocketClient.keep_alive LEAVE") 497 return False 498 499 self._logger.notice("keep_alive succeeded") 500 self._logger.spam("ListenWebSocketClient.keep_alive LEAVE") 501 502 return True 503 504 def finalize(self) -> bool: 505 """ 506 Finalizes the Transcript connection by flushing it 507 """ 508 self._logger.spam("ListenWebSocketClient.finalize ENTER") 509 510 self._logger.notice("Sending Finalize...") 511 ret = self.send(json.dumps({"type": "Finalize"})) 512 513 if not ret: 514 self._logger.error("finalize failed") 515 self._logger.spam("ListenWebSocketClient.finalize LEAVE") 516 return False 517 518 self._logger.notice("finalize succeeded") 519 self._logger.spam("ListenWebSocketClient.finalize LEAVE") 520 521 return True 522 523 def _close_message(self) -> bool: 524 return self.send(json.dumps({"type": "CloseStream"})) 525 526 # closes the WebSocket connection gracefully 527 def finish(self) -> bool: 
528 """ 529 Closes the WebSocket connection gracefully. 530 """ 531 self._logger.spam("ListenWebSocketClient.finish ENTER") 532 533 # call parent finish 534 if super().finish() is False: 535 self._logger.error("ListenWebSocketClient.finish failed") 536 537 # debug the threads 538 for thread in threading.enumerate(): 539 self._logger.debug("before running thread: %s", thread.name) 540 self._logger.debug("number of active threads: %s", threading.active_count()) 541 542 # stop the threads 543 self._logger.verbose("cancelling tasks...") 544 if self._flush_thread is not None: 545 self._flush_thread.join() 546 self._flush_thread = None 547 self._logger.notice("processing _flush_thread thread joined") 548 549 if self._keep_alive_thread is not None: 550 self._keep_alive_thread.join() 551 self._keep_alive_thread = None 552 self._logger.notice("processing _keep_alive_thread thread joined") 553 554 if self._listen_thread is not None: 555 self._listen_thread.join() 556 self._listen_thread = None 557 self._logger.notice("listening thread joined") 558 559 # debug the threads 560 for thread in threading.enumerate(): 561 self._logger.debug("before running thread: %s", thread.name) 562 self._logger.debug("number of active threads: %s", threading.active_count()) 563 564 self._logger.notice("finish succeeded") 565 self._logger.spam("ListenWebSocketClient.finish LEAVE") 566 return True 567 568 def _inspect(self, msg_result: LiveResultResponse) -> bool: 569 # auto flush_inspect is generically used to track any messages you might want to snoop on 570 # place additional logic here to inspect messages of interest 571 572 # for auto flush functionality 573 # set the last datagram 574 sentence = msg_result.channel.alternatives[0].transcript 575 if len(sentence) == 0: 576 return True 577 578 if msg_result.is_final: 579 with self._lock_flush: 580 self._logger.debug("AutoFlush is_final received") 581 self._last_datagram = None 582 else: 583 with self._lock_flush: 584 self._last_datagram = 
datetime.now() 585 self._logger.debug( 586 "AutoFlush interim received: %s", 587 str(self._last_datagram), 588 ) 589 590 return True
ONE_SECOND =
1
HALF_SECOND =
0.5
DEEPGRAM_INTERVAL =
5
PING_INTERVAL =
20
class
ListenWebSocketClient(deepgram.clients.common.v1.abstract_sync_websocket.AbstractSyncWebSocketClient):
36class ListenWebSocketClient( 37 AbstractSyncWebSocketClient 38): # pylint: disable=too-many-instance-attributes 39 """ 40 Client for interacting with Deepgram's live transcription services over WebSockets. 41 42 This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events. 43 44 Args: 45 config (DeepgramClientOptions): all the options for the client. 46 thread_cls (Type[threading.Thread]): optional thread class to use for creating threads, 47 defaults to threading.Thread. Useful for custom thread management like ContextVar support. 48 """ 49 50 _logger: verboselogs.VerboseLogger 51 _config: DeepgramClientOptions 52 _endpoint: str 53 54 _lock_flush: threading.Lock 55 _event_handlers: Dict[LiveTranscriptionEvents, list] 56 57 _keep_alive_thread: Union[threading.Thread, None] 58 _flush_thread: Union[threading.Thread, None] 59 _last_datagram: Optional[datetime] = None 60 61 _thread_cls: Type[threading.Thread] 62 63 _kwargs: Optional[Dict] = None 64 _addons: Optional[Dict] = None 65 _options: Optional[Dict] = None 66 _headers: Optional[Dict] = None 67 68 def __init__( 69 self, 70 config: DeepgramClientOptions, 71 thread_cls: Type[threading.Thread] = threading.Thread, 72 ): 73 if config is None: 74 raise DeepgramError("Config is required") 75 76 self._logger = verboselogs.VerboseLogger(__name__) 77 self._logger.addHandler(logging.StreamHandler()) 78 self._logger.setLevel(config.verbose) 79 80 self._config = config 81 self._endpoint = "v1/listen" 82 83 self._flush_thread = None 84 self._keep_alive_thread = None 85 86 # auto flush 87 self._last_datagram = None 88 self._lock_flush = threading.Lock() 89 90 self._thread_cls = thread_cls 91 92 # init handlers 93 self._event_handlers = { 94 event: [] for event in LiveTranscriptionEvents.__members__.values() 95 } 96 97 # call the parent constructor 98 super().__init__( 99 config=self._config, 100 endpoint=self._endpoint, 101 thread_cls=self._thread_cls, 102 ) 
103 104 # pylint: disable=too-many-statements,too-many-branches 105 def start( 106 self, 107 options: Optional[Union[ListenWebSocketOptions, Dict]] = None, 108 addons: Optional[Dict] = None, 109 headers: Optional[Dict] = None, 110 members: Optional[Dict] = None, 111 **kwargs, 112 ) -> bool: 113 """ 114 Starts the WebSocket connection for live transcription. 115 """ 116 self._logger.debug("ListenWebSocketClient.start ENTER") 117 self._logger.info("options: %s", options) 118 self._logger.info("addons: %s", addons) 119 self._logger.info("headers: %s", headers) 120 self._logger.info("members: %s", members) 121 self._logger.info("kwargs: %s", kwargs) 122 123 if isinstance(options, ListenWebSocketOptions) and not options.check(): 124 self._logger.error("options.check failed") 125 self._logger.debug("ListenWebSocketClient.start LEAVE") 126 raise DeepgramError("Fatal transcription options error") 127 128 self._addons = addons 129 self._headers = headers 130 131 # add "members" as members of the class 132 if members is not None: 133 self.__dict__.update(members) 134 135 # set kwargs as members of the class 136 if kwargs is not None: 137 self._kwargs = kwargs 138 else: 139 self._kwargs = {} 140 141 if isinstance(options, ListenWebSocketOptions): 142 self._logger.info("ListenWebSocketOptions switching class -> dict") 143 self._options = options.to_dict() 144 elif options is not None: 145 self._options = options 146 else: 147 self._options = {} 148 149 try: 150 # call parent start 151 if ( 152 super().start( 153 self._options, 154 self._addons, 155 self._headers, 156 **dict(cast(Dict[Any, Any], self._kwargs)), 157 ) 158 is False 159 ): 160 self._logger.error("ListenWebSocketClient.start failed") 161 self._logger.debug("ListenWebSocketClient.start LEAVE") 162 return False 163 164 # debug the threads 165 for thread in threading.enumerate(): 166 self._logger.debug("after running thread: %s", thread.name) 167 self._logger.debug("number of active threads: %s", 
threading.active_count()) 168 169 # keepalive thread 170 if self._config.is_keep_alive_enabled(): 171 self._logger.notice("keepalive is enabled") 172 self._keep_alive_thread = self._thread_cls(target=self._keep_alive) 173 self._keep_alive_thread.start() 174 else: 175 self._logger.notice("keepalive is disabled") 176 177 # flush thread 178 if self._config.is_auto_flush_reply_enabled(): 179 self._logger.notice("autoflush is enabled") 180 self._flush_thread = self._thread_cls(target=self._flush) 181 self._flush_thread.start() 182 else: 183 self._logger.notice("autoflush is disabled") 184 185 # debug the threads 186 for thread in threading.enumerate(): 187 self._logger.debug("after running thread: %s", thread.name) 188 self._logger.debug("number of active threads: %s", threading.active_count()) 189 190 self._logger.notice("start succeeded") 191 self._logger.debug("ListenWebSocketClient.start LEAVE") 192 return True 193 194 except Exception as e: # pylint: disable=broad-except 195 self._logger.error( 196 "WebSocketException in ListenWebSocketClient.start: %s", e 197 ) 198 self._logger.debug("ListenWebSocketClient.start LEAVE") 199 if self._config.options.get("termination_exception_connect") is True: 200 raise e 201 return False 202 203 # pylint: enable=too-many-statements,too-many-branches 204 205 def on( 206 self, event: LiveTranscriptionEvents, handler: Callable 207 ) -> None: # registers event handlers for specific events 208 """ 209 Registers event handlers for specific events. 210 """ 211 self._logger.info("event subscribed: %s", event) 212 if event in LiveTranscriptionEvents.__members__.values() and callable(handler): 213 self._event_handlers[event].append(handler) 214 215 def _emit(self, event: LiveTranscriptionEvents, *args, **kwargs) -> None: 216 """ 217 Emits events to the registered event handlers. 
218 """ 219 self._logger.debug("ListenWebSocketClient._emit ENTER") 220 self._logger.debug("callback handlers for: %s", event) 221 222 # debug the threads 223 for thread in threading.enumerate(): 224 self._logger.debug("after running thread: %s", thread.name) 225 self._logger.debug("number of active threads: %s", threading.active_count()) 226 227 self._logger.debug("callback handlers for: %s", event) 228 for handler in self._event_handlers[event]: 229 handler(self, *args, **kwargs) 230 231 # debug the threads 232 for thread in threading.enumerate(): 233 self._logger.debug("after running thread: %s", thread.name) 234 self._logger.debug("number of active threads: %s", threading.active_count()) 235 236 self._logger.debug("ListenWebSocketClient._emit LEAVE") 237 238 # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches 239 def _process_text(self, message: str) -> None: 240 """ 241 Processes messages received over the WebSocket connection. 242 """ 243 self._logger.debug("ListenWebSocketClient._process_text ENTER") 244 245 try: 246 if len(message) == 0: 247 self._logger.debug("message is empty") 248 self._logger.debug("ListenWebSocketClient._process_text LEAVE") 249 return 250 251 data = json.loads(message) 252 response_type = data.get("type") 253 self._logger.debug("response_type: %s, data: %s", response_type, data) 254 255 match response_type: 256 case LiveTranscriptionEvents.Open: 257 open_result: OpenResponse = OpenResponse.from_json(message) 258 self._logger.verbose("OpenResponse: %s", open_result) 259 self._emit( 260 LiveTranscriptionEvents(LiveTranscriptionEvents.Open), 261 open=open_result, 262 **dict(cast(Dict[Any, Any], self._kwargs)), 263 ) 264 case LiveTranscriptionEvents.Transcript: 265 msg_result: LiveResultResponse = LiveResultResponse.from_json( 266 message 267 ) 268 self._logger.verbose("LiveResultResponse: %s", msg_result) 269 270 # auto flush 271 if self._config.is_inspecting_listen(): 272 inspect_res = 
self._inspect(msg_result) 273 if not inspect_res: 274 self._logger.error("inspect_res failed") 275 276 self._emit( 277 LiveTranscriptionEvents(LiveTranscriptionEvents.Transcript), 278 result=msg_result, 279 **dict(cast(Dict[Any, Any], self._kwargs)), 280 ) 281 case LiveTranscriptionEvents.Metadata: 282 meta_result: MetadataResponse = MetadataResponse.from_json(message) 283 self._logger.verbose("MetadataResponse: %s", meta_result) 284 self._emit( 285 LiveTranscriptionEvents(LiveTranscriptionEvents.Metadata), 286 metadata=meta_result, 287 **dict(cast(Dict[Any, Any], self._kwargs)), 288 ) 289 case LiveTranscriptionEvents.SpeechStarted: 290 ss_result: SpeechStartedResponse = SpeechStartedResponse.from_json( 291 message 292 ) 293 self._logger.verbose("SpeechStartedResponse: %s", ss_result) 294 self._emit( 295 LiveTranscriptionEvents(LiveTranscriptionEvents.SpeechStarted), 296 speech_started=ss_result, 297 **dict(cast(Dict[Any, Any], self._kwargs)), 298 ) 299 case LiveTranscriptionEvents.UtteranceEnd: 300 ue_result: UtteranceEndResponse = UtteranceEndResponse.from_json( 301 message 302 ) 303 self._logger.verbose("UtteranceEndResponse: %s", ue_result) 304 self._emit( 305 LiveTranscriptionEvents(LiveTranscriptionEvents.UtteranceEnd), 306 utterance_end=ue_result, 307 **dict(cast(Dict[Any, Any], self._kwargs)), 308 ) 309 case LiveTranscriptionEvents.Close: 310 close_result: CloseResponse = CloseResponse.from_json(message) 311 self._logger.verbose("CloseResponse: %s", close_result) 312 self._emit( 313 LiveTranscriptionEvents(LiveTranscriptionEvents.Close), 314 close=close_result, 315 **dict(cast(Dict[Any, Any], self._kwargs)), 316 ) 317 case LiveTranscriptionEvents.Error: 318 err_error: ErrorResponse = ErrorResponse.from_json(message) 319 self._logger.verbose("ErrorResponse: %s", err_error) 320 self._emit( 321 LiveTranscriptionEvents(LiveTranscriptionEvents.Error), 322 error=err_error, 323 **dict(cast(Dict[Any, Any], self._kwargs)), 324 ) 325 case _: 326 self._logger.warning( 
327 "Unknown Message: response_type: %s, data: %s", 328 response_type, 329 data, 330 ) 331 unhandled_error: UnhandledResponse = UnhandledResponse( 332 type=LiveTranscriptionEvents(LiveTranscriptionEvents.Unhandled), 333 raw=message, 334 ) 335 self._emit( 336 LiveTranscriptionEvents(LiveTranscriptionEvents.Unhandled), 337 unhandled=unhandled_error, 338 **dict(cast(Dict[Any, Any], self._kwargs)), 339 ) 340 341 self._logger.notice("_process_text Succeeded") 342 self._logger.debug("SpeakStreamClient._process_text LEAVE") 343 344 except Exception as e: # pylint: disable=broad-except 345 self._logger.error( 346 "Exception in ListenWebSocketClient._process_text: %s", e 347 ) 348 e_error: ErrorResponse = ErrorResponse( 349 "Exception in ListenWebSocketClient._process_text", 350 f"{e}", 351 "Exception", 352 ) 353 self._logger.error( 354 "Exception in ListenWebSocketClient._process_text: %s", str(e) 355 ) 356 self._emit( 357 LiveTranscriptionEvents(LiveTranscriptionEvents.Error), 358 e_error, 359 **dict(cast(Dict[Any, Any], self._kwargs)), 360 ) 361 362 # signal exit and close 363 super()._signal_exit() 364 365 self._logger.debug("ListenWebSocketClient._process_text LEAVE") 366 367 if self._config.options.get("termination_exception") is True: 368 raise 369 return 370 371 # pylint: enable=too-many-return-statements,too-many-statements 372 373 def _process_binary(self, message: bytes) -> None: 374 raise NotImplementedError("no _process_binary method should be called") 375 376 # pylint: disable=too-many-return-statements 377 def _keep_alive(self) -> None: 378 self._logger.debug("ListenWebSocketClient._keep_alive ENTER") 379 380 counter = 0 381 while True: 382 try: 383 counter += 1 384 self._exit_event.wait(timeout=ONE_SECOND) 385 386 if self._exit_event.is_set(): 387 self._logger.notice("_keep_alive exiting gracefully") 388 self._logger.debug("ListenWebSocketClient._keep_alive LEAVE") 389 return 390 391 # deepgram keepalive 392 if counter % DEEPGRAM_INTERVAL == 0: 393 
self.keep_alive() 394 395 except Exception as e: # pylint: disable=broad-except 396 self._logger.error( 397 "Exception in ListenWebSocketClient._keep_alive: %s", e 398 ) 399 e_error: ErrorResponse = ErrorResponse( 400 "Exception in ListenWebSocketClient._keep_alive", 401 f"{e}", 402 "Exception", 403 ) 404 self._logger.error( 405 "Exception in ListenWebSocketClient._keep_alive: %s", str(e) 406 ) 407 self._emit( 408 LiveTranscriptionEvents(LiveTranscriptionEvents.Error), 409 e_error, 410 **dict(cast(Dict[Any, Any], self._kwargs)), 411 ) 412 413 # signal exit and close 414 super()._signal_exit() 415 416 self._logger.debug("ListenWebSocketClient._keep_alive LEAVE") 417 418 if self._config.options.get("termination_exception") is True: 419 raise 420 return 421 422 ## pylint: disable=too-many-return-statements,too-many-statements 423 def _flush(self) -> None: 424 self._logger.debug("ListenWebSocketClient._flush ENTER") 425 426 delta_in_ms_str = self._config.options.get("auto_flush_reply_delta") 427 if delta_in_ms_str is None: 428 self._logger.error("auto_flush_reply_delta is None") 429 self._logger.debug("ListenWebSocketClient._flush LEAVE") 430 return 431 delta_in_ms = float(delta_in_ms_str) 432 433 _flush_event = threading.Event() 434 while True: 435 try: 436 _flush_event.wait(timeout=HALF_SECOND) 437 438 if self._exit_event.is_set(): 439 self._logger.notice("_flush exiting gracefully") 440 self._logger.debug("ListenWebSocketClient._flush LEAVE") 441 return 442 443 with self._lock_flush: 444 if self._last_datagram is None: 445 self._logger.debug("AutoFlush last_datagram is None") 446 continue 447 448 delta = datetime.now() - self._last_datagram 449 diff_in_ms = delta.total_seconds() * 1000 450 self._logger.debug("AutoFlush delta: %f", diff_in_ms) 451 if diff_in_ms < delta_in_ms: 452 self._logger.debug("AutoFlush delta is less than threshold") 453 continue 454 455 with self._lock_flush: 456 self._last_datagram = None 457 self.finalize() 458 459 except Exception as e: # 
pylint: disable=broad-except 460 self._logger.error("Exception in ListenWebSocketClient._flush: %s", e) 461 e_error: ErrorResponse = ErrorResponse( 462 "Exception in ListenWebSocketClient._flush", 463 f"{e}", 464 "Exception", 465 ) 466 self._logger.error( 467 "Exception in ListenWebSocketClient._flush: %s", str(e) 468 ) 469 self._emit( 470 LiveTranscriptionEvents(LiveTranscriptionEvents.Error), 471 e_error, 472 **dict(cast(Dict[Any, Any], self._kwargs)), 473 ) 474 475 # signal exit and close 476 super()._signal_exit() 477 478 self._logger.debug("ListenWebSocketClient._flush LEAVE") 479 480 if self._config.options.get("termination_exception") is True: 481 raise 482 return 483 484 # pylint: enable=too-many-return-statements 485 486 def keep_alive(self) -> bool: 487 """ 488 Sends a KeepAlive message 489 """ 490 self._logger.spam("ListenWebSocketClient.keep_alive ENTER") 491 492 self._logger.notice("Sending KeepAlive...") 493 ret = self.send(json.dumps({"type": "KeepAlive"})) 494 495 if not ret: 496 self._logger.error("keep_alive failed") 497 self._logger.spam("ListenWebSocketClient.keep_alive LEAVE") 498 return False 499 500 self._logger.notice("keep_alive succeeded") 501 self._logger.spam("ListenWebSocketClient.keep_alive LEAVE") 502 503 return True 504 505 def finalize(self) -> bool: 506 """ 507 Finalizes the Transcript connection by flushing it 508 """ 509 self._logger.spam("ListenWebSocketClient.finalize ENTER") 510 511 self._logger.notice("Sending Finalize...") 512 ret = self.send(json.dumps({"type": "Finalize"})) 513 514 if not ret: 515 self._logger.error("finalize failed") 516 self._logger.spam("ListenWebSocketClient.finalize LEAVE") 517 return False 518 519 self._logger.notice("finalize succeeded") 520 self._logger.spam("ListenWebSocketClient.finalize LEAVE") 521 522 return True 523 524 def _close_message(self) -> bool: 525 return self.send(json.dumps({"type": "CloseStream"})) 526 527 # closes the WebSocket connection gracefully 528 def finish(self) -> bool: 
529 """ 530 Closes the WebSocket connection gracefully. 531 """ 532 self._logger.spam("ListenWebSocketClient.finish ENTER") 533 534 # call parent finish 535 if super().finish() is False: 536 self._logger.error("ListenWebSocketClient.finish failed") 537 538 # debug the threads 539 for thread in threading.enumerate(): 540 self._logger.debug("before running thread: %s", thread.name) 541 self._logger.debug("number of active threads: %s", threading.active_count()) 542 543 # stop the threads 544 self._logger.verbose("cancelling tasks...") 545 if self._flush_thread is not None: 546 self._flush_thread.join() 547 self._flush_thread = None 548 self._logger.notice("processing _flush_thread thread joined") 549 550 if self._keep_alive_thread is not None: 551 self._keep_alive_thread.join() 552 self._keep_alive_thread = None 553 self._logger.notice("processing _keep_alive_thread thread joined") 554 555 if self._listen_thread is not None: 556 self._listen_thread.join() 557 self._listen_thread = None 558 self._logger.notice("listening thread joined") 559 560 # debug the threads 561 for thread in threading.enumerate(): 562 self._logger.debug("before running thread: %s", thread.name) 563 self._logger.debug("number of active threads: %s", threading.active_count()) 564 565 self._logger.notice("finish succeeded") 566 self._logger.spam("ListenWebSocketClient.finish LEAVE") 567 return True 568 569 def _inspect(self, msg_result: LiveResultResponse) -> bool: 570 # auto flush_inspect is generically used to track any messages you might want to snoop on 571 # place additional logic here to inspect messages of interest 572 573 # for auto flush functionality 574 # set the last datagram 575 sentence = msg_result.channel.alternatives[0].transcript 576 if len(sentence) == 0: 577 return True 578 579 if msg_result.is_final: 580 with self._lock_flush: 581 self._logger.debug("AutoFlush is_final received") 582 self._last_datagram = None 583 else: 584 with self._lock_flush: 585 self._last_datagram = 
datetime.now() 586 self._logger.debug( 587 "AutoFlush interim received: %s", 588 str(self._last_datagram), 589 ) 590 591 return True
Client for interacting with Deepgram's live transcription services over WebSockets.
This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events.
Args: config (DeepgramClientOptions): all the options for the client. thread_cls (Type[threading.Thread]): optional thread class to use for creating threads, defaults to threading.Thread. Useful for custom thread management like ContextVar support.
ListenWebSocketClient( config: deepgram.options.DeepgramClientOptions, thread_cls: Type[threading.Thread] = <class 'threading.Thread'>)
def __init__(
    self,
    config: DeepgramClientOptions,
    thread_cls: Type[threading.Thread] = threading.Thread,
):
    """
    Set up client state and hand off to the synchronous WebSocket base class.

    Args:
        config (DeepgramClientOptions): all the options for the client; must not be None.
        thread_cls (Type[threading.Thread]): class used to create background threads,
            defaults to threading.Thread.

    Raises:
        DeepgramError: if ``config`` is None.
    """
    if config is None:
        raise DeepgramError("Config is required")

    # per-instance logger; logging.StreamHandler() writes to stderr by default
    logger = verboselogs.VerboseLogger(__name__)
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(config.verbose)
    self._logger = logger

    self._config = config
    self._endpoint = "v1/listen"
    self._thread_cls = thread_cls

    # background workers are only created later, by start()
    self._keep_alive_thread = None
    self._flush_thread = None

    # auto-flush bookkeeping: time of the last interim result, plus its guard lock
    self._lock_flush = threading.Lock()
    self._last_datagram = None

    # one initially-empty handler list per event type
    self._event_handlers = {}
    for member in LiveTranscriptionEvents.__members__.values():
        self._event_handlers[member] = []

    # the parent constructor runs last, after every attribute above is in place
    super().__init__(
        config=self._config,
        endpoint=self._endpoint,
        thread_cls=self._thread_cls,
    )
def
start( self, options: Union[deepgram.clients.listen.v1.websocket.options.LiveOptions, Dict, NoneType] = None, addons: Optional[Dict] = None, headers: Optional[Dict] = None, members: Optional[Dict] = None, **kwargs) -> bool:
def start(
    self,
    options: Optional[Union[ListenWebSocketOptions, Dict]] = None,
    addons: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    members: Optional[Dict] = None,
    **kwargs,
) -> bool:
    """
    Starts the WebSocket connection for live transcription.

    Opens the connection through the parent class. Then, when enabled in the
    client config, it starts the keep-alive and auto-flush background threads.

    Args:
        options: transcription options, either a ListenWebSocketOptions instance
            (validated with ``check()`` and converted to a dict) or a plain dict.
        addons: forwarded unchanged to the parent ``start()``.
        headers: forwarded unchanged to the parent ``start()``.
        members: key/value pairs copied onto this instance as attributes.
        **kwargs: stored on the instance and forwarded to the parent ``start()``.

    Returns:
        bool: True on success. False if the parent ``start()`` fails or an
        exception occurs; with the ``termination_exception_connect`` option
        set, the exception is re-raised instead.

    Raises:
        DeepgramError: if a ListenWebSocketOptions instance fails ``check()``.
    """
    self._logger.debug("ListenWebSocketClient.start ENTER")
    self._logger.info("options: %s", options)
    self._logger.info("addons: %s", addons)
    self._logger.info("headers: %s", headers)
    self._logger.info("members: %s", members)
    self._logger.info("kwargs: %s", kwargs)

    if isinstance(options, ListenWebSocketOptions) and not options.check():
        self._logger.error("options.check failed")
        self._logger.debug("ListenWebSocketClient.start LEAVE")
        raise DeepgramError("Fatal transcription options error")

    self._addons = addons
    self._headers = headers

    # add "members" as members of the class
    if members is not None:
        self.__dict__.update(members)

    # **kwargs always binds a dict (possibly empty), so no None check is needed
    self._kwargs = kwargs

    if isinstance(options, ListenWebSocketOptions):
        self._logger.info("ListenWebSocketOptions switching class -> dict")
        self._options = options.to_dict()
    elif options is not None:
        self._options = options
    else:
        self._options = {}

    try:
        # call parent start
        if (
            super().start(
                self._options,
                self._addons,
                self._headers,
                **dict(cast(Dict[Any, Any], self._kwargs)),
            )
            is False
        ):
            self._logger.error("ListenWebSocketClient.start failed")
            self._logger.debug("ListenWebSocketClient.start LEAVE")
            return False

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        # keepalive thread
        if self._config.is_keep_alive_enabled():
            self._logger.notice("keepalive is enabled")
            self._keep_alive_thread = self._thread_cls(target=self._keep_alive)
            self._keep_alive_thread.start()
        else:
            self._logger.notice("keepalive is disabled")

        # flush thread
        if self._config.is_auto_flush_reply_enabled():
            self._logger.notice("autoflush is enabled")
            self._flush_thread = self._thread_cls(target=self._flush)
            self._flush_thread.start()
        else:
            self._logger.notice("autoflush is disabled")

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        self._logger.notice("start succeeded")
        self._logger.debug("ListenWebSocketClient.start LEAVE")
        return True

    except Exception as e:  # pylint: disable=broad-except
        self._logger.error(
            "WebSocketException in ListenWebSocketClient.start: %s", e
        )
        self._logger.debug("ListenWebSocketClient.start LEAVE")
        if self._config.options.get("termination_exception_connect") is True:
            # bare raise re-raises the active exception with its original traceback
            raise
        return False
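Starts the WebSocket connection for live transcription and, when enabled in the client config, the keep-alive and auto-flush threads. Returns True on success and False on failure; raises DeepgramError if a ListenWebSocketOptions instance fails validation.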
def
on( self, event: deepgram.clients.listen.enums.LiveTranscriptionEvents, handler: Callable) -> None:
def on(
    self, event: LiveTranscriptionEvents, handler: Callable
) -> None:  # registers event handlers for specific events
    """
    Registers event handlers for specific events.

    Appends ``handler`` to the handler list kept for ``event``. Nothing is
    registered, and a warning is logged, when ``event`` is not a
    LiveTranscriptionEvents member or ``handler`` is not callable.

    Args:
        event (LiveTranscriptionEvents): the event to subscribe to.
        handler (Callable): the callback to register for that event.
    """
    self._logger.info("event subscribed: %s", event)

    # previously these rejections were silent, which hid typos in event names
    if event not in LiveTranscriptionEvents.__members__.values():
        self._logger.warning("handler not registered, unknown event: %s", event)
        return
    if not callable(handler):
        self._logger.warning(
            "handler not registered for event %s: handler is not callable", event
        )
        return

    self._event_handlers[event].append(handler)
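Registers `handler` for the given `event`. Registration is skipped if `event` is not a LiveTranscriptionEvents member or `handler` is not callable.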
def
keep_alive(self) -> bool:
def keep_alive(self) -> bool:
    """
    Send a KeepAlive control message over the connection.

    Returns:
        bool: True when ``send`` reports success, False otherwise.
    """
    self._logger.spam("ListenWebSocketClient.keep_alive ENTER")
    self._logger.notice("Sending KeepAlive...")

    message = json.dumps({"type": "KeepAlive"})
    sent = bool(self.send(message))

    if sent:
        self._logger.notice("keep_alive succeeded")
    else:
        self._logger.error("keep_alive failed")

    self._logger.spam("ListenWebSocketClient.keep_alive LEAVE")
    return sent
Sends a KeepAlive message over the connection. Returns True if the message was sent, False otherwise.
def
finalize(self) -> bool:
def finalize(self) -> bool:
    """
    Send a Finalize message to flush the transcription stream.

    This call only sends the message; it does not close the connection.

    Returns:
        bool: True when the message was sent, False if ``send`` reported failure.
    """
    log = self._logger
    log.spam("ListenWebSocketClient.finalize ENTER")
    log.notice("Sending Finalize...")

    if self.send(json.dumps({"type": "Finalize"})):
        log.notice("finalize succeeded")
        log.spam("ListenWebSocketClient.finalize LEAVE")
        return True

    log.error("finalize failed")
    log.spam("ListenWebSocketClient.finalize LEAVE")
    return False
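Sends a Finalize message to flush the transcription stream; the connection is not closed by this call. Returns True if the message was sent, False otherwise.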
def
finish(self) -> bool:
def finish(self) -> bool:
    """
    Closes the WebSocket connection gracefully.

    Calls the parent ``finish()``. It then joins the auto-flush, keep-alive
    and listening threads (those that are not None) and clears each
    reference afterwards. Teardown continues even if the parent reports
    failure.

    Returns:
        bool: False if the parent ``finish()`` returned False, True otherwise.
    """
    self._logger.spam("ListenWebSocketClient.finish ENTER")

    def _log_threads(stage: str) -> None:
        # debug aid: list every live thread in the process
        for thread in threading.enumerate():
            self._logger.debug("%s running thread: %s", stage, thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

    # call parent finish; previously a failure here was logged but still
    # reported as success to the caller
    succeeded = super().finish() is not False
    if not succeeded:
        self._logger.error("ListenWebSocketClient.finish failed")

    _log_threads("before")

    # stop the threads
    self._logger.verbose("cancelling tasks...")

    # Thread.join() raises RuntimeError when called on the current thread, so
    # skip the join if finish() is running on one of these threads.
    # NOTE(review): this can happen if an event handler emitted from a worker
    # thread calls finish() — confirm how _emit dispatches handlers.
    current = threading.current_thread()

    if self._flush_thread is not None:
        if self._flush_thread is not current:
            self._flush_thread.join()
        self._flush_thread = None
        self._logger.notice("processing _flush_thread thread joined")

    if self._keep_alive_thread is not None:
        if self._keep_alive_thread is not current:
            self._keep_alive_thread.join()
        self._keep_alive_thread = None
        self._logger.notice("processing _keep_alive_thread thread joined")

    if self._listen_thread is not None:
        if self._listen_thread is not current:
            self._listen_thread.join()
        self._listen_thread = None
        self._logger.notice("listening thread joined")

    # the threads have been joined at this point, hence "after"
    _log_threads("after")

    if succeeded:
        self._logger.notice("finish succeeded")
    self._logger.spam("ListenWebSocketClient.finish LEAVE")
    return succeeded
Closes the WebSocket connection gracefully, then joins the auto-flush, keep-alive and listening threads if they were started.