deepgram.clients.agent.v1.websocket.async_client
# Copyright 2024 Deepgram SDK contributors. All Rights Reserved.
# Use of this source code is governed by a MIT license that can be found in the LICENSE file.
# SPDX-License-Identifier: MIT

import asyncio
import json
import logging
from typing import Dict, Union, Optional, cast, Any, Callable
import threading

from .....utils import verboselogs
from .....options import DeepgramClientOptions
from ...enums import AgentWebSocketEvents
from ....common import AbstractAsyncWebSocketClient
from ....common import DeepgramError

from .response import (
    OpenResponse,
    WelcomeResponse,
    SettingsAppliedResponse,
    ConversationTextResponse,
    UserStartedSpeakingResponse,
    AgentThinkingResponse,
    FunctionCalling,
    FunctionCallRequest,
    AgentStartedSpeakingResponse,
    AgentAudioDoneResponse,
    InjectionRefusedResponse,
    CloseResponse,
    ErrorResponse,
    UnhandledResponse,
)
from .options import (
    SettingsConfigurationOptions,
    UpdateInstructionsOptions,
    UpdateSpeakOptions,
    InjectAgentMessageOptions,
    FunctionCallResponse,
    AgentKeepAlive,
)

from .....audio.speaker import (
    Speaker,
    RATE as SPEAKER_RATE,
    CHANNELS as SPEAKER_CHANNELS,
    PLAYBACK_DELTA as SPEAKER_PLAYBACK_DELTA,
)
from .....audio.microphone import (
    Microphone,
    RATE as MICROPHONE_RATE,
    CHANNELS as MICROPHONE_CHANNELS,
)

ONE_SECOND = 1  # length, in seconds, of one tick of the _keep_alive loop
HALF_SECOND = 0.5  # not referenced in this module
DEEPGRAM_INTERVAL = 5  # a KeepAlive message is sent every DEEPGRAM_INTERVAL ticks

# Upper bound, in seconds, that finish() waits for background tasks to wind down.
_FINISH_TIMEOUT_SECONDS = 10

# Server text-message types this client understands, in the order they are
# checked. Each entry is:
#   (event type, response class, handler keyword argument, verbose-log label)
# Matching uses ``==`` against the message's "type" field, exactly like a
# ``match``/``case`` value pattern would.
_TEXT_EVENT_RESPONSES = (
    (AgentWebSocketEvents.Open, OpenResponse, "open", "OpenResponse"),
    (AgentWebSocketEvents.Welcome, WelcomeResponse, "welcome", "WelcomeResponse"),
    (
        AgentWebSocketEvents.SettingsApplied,
        SettingsAppliedResponse,
        "settings_applied",
        "SettingsAppliedResponse",
    ),
    (
        AgentWebSocketEvents.ConversationText,
        ConversationTextResponse,
        "conversation_text",
        "ConversationTextResponse",
    ),
    (
        AgentWebSocketEvents.UserStartedSpeaking,
        UserStartedSpeakingResponse,
        "user_started_speaking",
        "UserStartedSpeakingResponse",
    ),
    (
        AgentWebSocketEvents.AgentThinking,
        AgentThinkingResponse,
        "agent_thinking",
        "AgentThinkingResponse",
    ),
    (
        AgentWebSocketEvents.FunctionCalling,
        FunctionCalling,
        "function_calling",
        "FunctionCalling",
    ),
    (
        AgentWebSocketEvents.FunctionCallRequest,
        FunctionCallRequest,
        "function_call_request",
        "FunctionCallRequest",
    ),
    (
        AgentWebSocketEvents.AgentStartedSpeaking,
        AgentStartedSpeakingResponse,
        "agent_started_speaking",
        "AgentStartedSpeakingResponse",
    ),
    (
        AgentWebSocketEvents.AgentAudioDone,
        AgentAudioDoneResponse,
        "agent_audio_done",
        "AgentAudioDoneResponse",
    ),
    (
        AgentWebSocketEvents.InjectionRefused,
        InjectionRefusedResponse,
        "injection_refused",
        "InjectionRefused",
    ),
    (AgentWebSocketEvents.Close, CloseResponse, "close", "CloseResponse"),
    (AgentWebSocketEvents.Error, ErrorResponse, "error", "ErrorResponse"),
)


class AsyncAgentWebSocketClient(
    AbstractAsyncWebSocketClient
):  # pylint: disable=too-many-instance-attributes
    """
    Client for interacting with Deepgram's Voice Agent API over WebSockets.

    This class establishes a WebSocket connection to the agent endpoint, sends
    the agent settings once connected, and dispatches server messages (text
    events and binary audio) to registered async event handlers. When enabled
    through ``config.options`` ("microphone_record" / "speaker_playback"), it
    also drives a local microphone and speaker.

    Args:
        config (DeepgramClientOptions): all the options for the client.
    """

    _logger: verboselogs.VerboseLogger
    _config: DeepgramClientOptions
    _endpoint: str

    _event_handlers: Dict[AgentWebSocketEvents, list]

    _keep_alive_thread: Union[asyncio.Task, None]

    _kwargs: Optional[Dict] = None
    _addons: Optional[Dict] = None
    # note the distinction here. We can't use _config because it's already used in the parent
    _settings: Optional[SettingsConfigurationOptions] = None
    _headers: Optional[Dict] = None

    _speaker_created: bool = False
    _speaker: Optional[Speaker] = None
    _microphone_created: bool = False
    _microphone: Optional[Microphone] = None

    def __init__(self, config: DeepgramClientOptions):
        """
        Create the client and, if requested in ``config.options``, the local
        microphone and/or speaker.

        Note: this mutates the given config, setting ``config.url`` to
        ``"agent.deepgram.com"``.

        Args:
            config (DeepgramClientOptions): all the options for the client.

        Raises:
            DeepgramError: if ``config`` is None.
        """
        if config is None:
            raise DeepgramError("Config is required")

        self._logger = verboselogs.VerboseLogger(__name__)
        self._logger.addHandler(logging.StreamHandler())
        self._logger.setLevel(config.verbose)

        self._config = config

        # needs to be "wss://agent.deepgram.com/agent"
        self._endpoint = "agent"

        # override the endpoint since it needs to be "wss://agent.deepgram.com/agent"
        self._config.url = "agent.deepgram.com"
        self._keep_alive_thread = None

        # one (initially empty) handler list per known event
        self._event_handlers = {
            event: [] for event in AgentWebSocketEvents.__members__.values()
        }

        if self._config.options.get("microphone_record") == "true":
            self._logger.info("microphone_record is enabled")
            rate = self._config.options.get("microphone_record_rate", MICROPHONE_RATE)
            channels = self._config.options.get(
                "microphone_record_channels", MICROPHONE_CHANNELS
            )
            device_index = self._config.options.get("microphone_record_device_index")

            self._logger.debug("rate: %s", rate)
            self._logger.debug("channels: %s", channels)
            if device_index is not None:
                self._logger.debug("device_index: %s", device_index)

            self._microphone_created = True

            if device_index is not None:
                self._microphone = Microphone(
                    rate=rate,
                    channels=channels,
                    verbose=self._config.verbose,
                    input_device_index=device_index,
                )
            else:
                self._microphone = Microphone(
                    rate=rate,
                    channels=channels,
                    verbose=self._config.verbose,
                )

        if self._config.options.get("speaker_playback") == "true":
            self._logger.info("speaker_playback is enabled")
            rate = self._config.options.get("speaker_playback_rate", SPEAKER_RATE)
            channels = self._config.options.get(
                "speaker_playback_channels", SPEAKER_CHANNELS
            )
            playback_delta_in_ms = self._config.options.get(
                "speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA
            )
            device_index = self._config.options.get("speaker_playback_device_index")

            self._logger.debug("rate: %s", rate)
            self._logger.debug("channels: %s", channels)

            self._speaker_created = True

            # the speaker is handed the microphone (which may be None)
            if device_index is not None:
                self._logger.debug("device_index: %s", device_index)

                self._speaker = Speaker(
                    rate=rate,
                    channels=channels,
                    last_play_delta_in_ms=playback_delta_in_ms,
                    verbose=self._config.verbose,
                    output_device_index=device_index,
                    microphone=self._microphone,
                )
            else:
                self._speaker = Speaker(
                    rate=rate,
                    channels=channels,
                    last_play_delta_in_ms=playback_delta_in_ms,
                    verbose=self._config.verbose,
                    microphone=self._microphone,
                )
        # call the parent constructor
        super().__init__(self._config, self._endpoint)

    # pylint: disable=too-many-branches,too-many-statements
    async def start(
        self,
        options: Optional[Union[SettingsConfigurationOptions, Dict, str]] = None,
        addons: Optional[Dict] = None,
        headers: Optional[Dict] = None,
        members: Optional[Dict] = None,
        **kwargs,
    ) -> bool:
        """
        Starts the WebSocket connection for agent API.

        Connects, starts the speaker/microphone if configured, starts the
        keep-alive task if enabled, and sends the agent settings message.

        Args:
            options: agent settings, as a SettingsConfigurationOptions
                instance, a dict, or a JSON string.
            addons: extra options forwarded to the parent ``start``.
            headers: extra HTTP headers forwarded to the parent ``start``.
            members: attributes to set directly on this instance.
            **kwargs: forwarded to the parent ``start`` and to every
                emitted event handler.

        Returns:
            bool: True if connected and the settings were sent, else False.

        Raises:
            DeepgramError: if the settings fail ``check()``, have an
                unsupported type, or use keyterms with a non nova-3 model.
            Exception: any connection error, re-raised only when the config
                option "termination_exception_connect" is True.
        """
        self._logger.debug("AsyncAgentWebSocketClient.start ENTER")
        self._logger.info("settings: %s", options)
        self._logger.info("addons: %s", addons)
        self._logger.info("headers: %s", headers)
        self._logger.info("members: %s", members)
        self._logger.info("kwargs: %s", kwargs)

        if isinstance(options, SettingsConfigurationOptions) and not options.check():
            self._logger.error("settings.check failed")
            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
            raise DeepgramError("Fatal agent settings error")

        self._addons = addons
        self._headers = headers

        # add "members" as members of the class
        if members is not None:
            self.__dict__.update(members)

        # **kwargs is always a dict (possibly empty); it is replayed to every
        # event handler via _emit
        self._kwargs = kwargs

        if isinstance(options, SettingsConfigurationOptions):
            self._logger.info("options is class")
            self._settings = options
        elif isinstance(options, dict):
            self._logger.info("options is dict")
            self._settings = SettingsConfigurationOptions.from_dict(options)
        elif isinstance(options, str):
            self._logger.info("options is json")
            self._settings = SettingsConfigurationOptions.from_json(options)
        else:
            raise DeepgramError("Invalid options type")

        listen = self._settings.agent.listen
        if (
            listen.keyterms is not None
            and listen.model is not None
            and not listen.model.startswith("nova-3")
        ):
            raise DeepgramError("Keyterms are only supported for nova-3 models")

        try:
            # speaker substitutes the listening thread
            if self._speaker is not None:
                self._logger.notice("passing speaker to delegate_listening")
                super().delegate_listening(self._speaker)

            # call parent start
            if (
                await super().start(
                    {},
                    self._addons,
                    self._headers,
                    **dict(cast(Dict[Any, Any], self._kwargs)),
                )
                is False
            ):
                self._logger.error("AsyncAgentWebSocketClient.start failed")
                self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
                return False

            if self._speaker is not None:
                self._logger.notice("speaker is delegate_listening. Starting speaker")
                self._speaker.start()

            # the microphone is only wired up when a speaker is also present;
            # captured audio is forwarded through self.send
            if self._speaker is not None and self._microphone is not None:
                self._logger.notice(
                    "speaker is delegate_listening. Starting microphone"
                )
                self._microphone.set_callback(self.send)
                self._microphone.start()

            # debug the threads
            for thread in threading.enumerate():
                self._logger.debug("after running thread: %s", thread.name)
            self._logger.debug("number of active threads: %s", threading.active_count())

            # keepalive thread
            if self._config.is_keep_alive_enabled():
                self._logger.notice("keepalive is enabled")
                self._keep_alive_thread = asyncio.create_task(self._keep_alive())
            else:
                self._logger.notice("keepalive is disabled")

            # debug the threads
            for thread in threading.enumerate():
                self._logger.debug("after running thread: %s", thread.name)
            self._logger.debug("number of active threads: %s", threading.active_count())

            # send the configurationsetting message
            self._logger.notice("Sending ConfigurationSettings...")
            ret_send_cs = await self.send(str(self._settings))
            if not ret_send_cs:
                self._logger.error("ConfigurationSettings failed")

                err_error: ErrorResponse = ErrorResponse(
                    "Exception in AsyncAgentWebSocketClient.start",
                    "ConfigurationSettings failed to send",
                    "Exception",
                )
                await self._emit(
                    AgentWebSocketEvents(AgentWebSocketEvents.Error),
                    error=err_error,
                    **dict(cast(Dict[Any, Any], self._kwargs)),
                )

                self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
                return False

            self._logger.notice("start succeeded")
            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
            return True

        except Exception as e:  # pylint: disable=broad-except
            self._logger.error(
                "WebSocketException in AsyncAgentWebSocketClient.start: %s", e
            )
            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
            if self._config.options.get("termination_exception_connect") is True:
                raise e
            return False

    # pylint: enable=too-many-branches,too-many-statements

    def on(self, event: AgentWebSocketEvents, handler: Callable) -> None:
        """
        Registers event handlers for specific events.

        Unknown events and non-callable handlers are ignored silently.
        Handlers are called as ``handler(client, **payload)`` and must be
        coroutine functions (see ``_emit``).
        """
        self._logger.info("event subscribed: %s", event)
        if event in AgentWebSocketEvents.__members__.values() and callable(handler):
            self._event_handlers[event].append(handler)

    async def _emit(self, event: AgentWebSocketEvents, *args, **kwargs) -> None:
        """
        Emits events to the registered event handlers.

        Each handler is invoked as ``handler(self, *args, **kwargs)`` and its
        result is wrapped with ``asyncio.create_task``, so handlers must be
        coroutine functions. All handlers run concurrently; exceptions they
        raise are collected by ``gather(return_exceptions=True)`` and not
        propagated.
        """
        self._logger.debug("AsyncAgentWebSocketClient._emit ENTER")
        self._logger.debug("callback handlers for: %s", event)

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        tasks = [
            asyncio.create_task(handler(self, *args, **kwargs))
            for handler in self._event_handlers[event]
        ]

        if tasks:
            self._logger.debug("waiting for tasks to finish...")
            await asyncio.gather(*tasks, return_exceptions=True)
            tasks.clear()

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        self._logger.debug("AsyncAgentWebSocketClient._emit LEAVE")

    async def _process_text(self, message: str) -> None:
        """
        Processes messages received over the WebSocket connection.

        The message is parsed as JSON, and its "type" field selects the
        response class and event from ``_TEXT_EVENT_RESPONSES``. Unknown types
        are emitted as ``Unhandled``. Empty messages are ignored.

        On any exception, an Error event is emitted and the parent's exit
        signal is raised. The exception is re-raised only when the config
        option "termination_exception" is True.
        """
        self._logger.debug("AsyncAgentWebSocketClient._process_text ENTER")

        try:
            self._logger.debug("Text data received")
            if len(message) == 0:
                self._logger.debug("message is empty")
                self._logger.debug("AsyncAgentWebSocketClient._process_text LEAVE")
                return

            data = json.loads(message)
            response_type = data.get("type")
            self._logger.debug("response_type: %s, data: %s", response_type, data)

            for event, response_cls, kwarg_name, label in _TEXT_EVENT_RESPONSES:
                if response_type == event:
                    result = response_cls.from_json(message)
                    self._logger.verbose("%s: %s", label, result)
                    await self._emit(
                        AgentWebSocketEvents(event),
                        **{kwarg_name: result},
                        **dict(cast(Dict[Any, Any], self._kwargs)),
                    )
                    break
            else:
                self._logger.warning(
                    "Unknown Message: response_type: %s, data: %s",
                    response_type,
                    data,
                )
                unhandled_error: UnhandledResponse = UnhandledResponse(
                    type=AgentWebSocketEvents(AgentWebSocketEvents.Unhandled),
                    raw=message,
                )
                await self._emit(
                    AgentWebSocketEvents(AgentWebSocketEvents.Unhandled),
                    unhandled=unhandled_error,
                    **dict(cast(Dict[Any, Any], self._kwargs)),
                )

            self._logger.notice("_process_text Succeeded")
            self._logger.debug("AsyncAgentWebSocketClient._process_text LEAVE")

        except Exception as e:  # pylint: disable=broad-except
            self._logger.error(
                "Exception in AsyncAgentWebSocketClient._process_text: %s", e
            )
            e_error: ErrorResponse = ErrorResponse(
                "Exception in AsyncAgentWebSocketClient._process_text",
                f"{e}",
                "Exception",
            )
            await self._emit(
                AgentWebSocketEvents(AgentWebSocketEvents.Error),
                error=e_error,
                **dict(cast(Dict[Any, Any], self._kwargs)),
            )

            # signal exit and close
            await super()._signal_exit()

            self._logger.debug("AsyncAgentWebSocketClient._process_text LEAVE")

            if self._config.options.get("termination_exception") is True:
                raise
            return

    async def _process_binary(self, message: bytes) -> None:
        """Forward a binary WebSocket frame to AudioData handlers as ``data=``."""
        self._logger.debug("AsyncAgentWebSocketClient._process_binary ENTER")
        self._logger.debug("Binary data received")

        await self._emit(
            AgentWebSocketEvents(AgentWebSocketEvents.AudioData),
            data=message,
            **dict(cast(Dict[Any, Any], self._kwargs)),
        )

        self._logger.notice("_process_binary Succeeded")
        self._logger.debug("AsyncAgentWebSocketClient._process_binary LEAVE")

    # pylint: disable=too-many-return-statements
    async def _keep_alive(self) -> None:
        """
        Sends keepalive messages to the WebSocket connection.

        The loop ticks every ONE_SECOND. It sends a KeepAlive every
        DEEPGRAM_INTERVAL ticks and returns once ``self._exit_event`` is set.
        ``_exit_event`` is not assigned in this class; it is presumably
        provided by the parent class.

        On any exception, an Error event is emitted, the exit signal is
        raised, and the loop ends. The exception is re-raised only when
        "termination_exception" is True.
        """
        self._logger.debug("AsyncAgentWebSocketClient._keep_alive ENTER")

        counter = 0
        while True:
            try:
                counter += 1
                await asyncio.sleep(ONE_SECOND)

                if self._exit_event.is_set():
                    self._logger.notice("_keep_alive exiting gracefully")
                    self._logger.debug("AsyncAgentWebSocketClient._keep_alive LEAVE")
                    return

                # deepgram keepalive
                if counter % DEEPGRAM_INTERVAL == 0:
                    await self.keep_alive()

            except Exception as e:  # pylint: disable=broad-except
                self._logger.error(
                    "Exception in AsyncAgentWebSocketClient._keep_alive: %s", e
                )
                e_error: ErrorResponse = ErrorResponse(
                    "Exception in AsyncAgentWebSocketClient._keep_alive",
                    f"{e}",
                    "Exception",
                )
                await self._emit(
                    AgentWebSocketEvents(AgentWebSocketEvents.Error),
                    error=e_error,
                    **dict(cast(Dict[Any, Any], self._kwargs)),
                )

                # signal exit and close
                await super()._signal_exit()

                self._logger.debug("AsyncAgentWebSocketClient._keep_alive LEAVE")

                if self._config.options.get("termination_exception") is True:
                    raise
                return

    async def keep_alive(self) -> bool:
        """
        Sends a KeepAlive message.

        Returns:
            bool: the truthiness of ``self.send``'s result (True on success).
        """
        self._logger.spam("AsyncAgentWebSocketClient.keep_alive ENTER")

        self._logger.notice("Sending KeepAlive...")
        ret = await self.send(json.dumps({"type": "KeepAlive"}))

        if not ret:
            self._logger.error("keep_alive failed")
            self._logger.spam("AsyncAgentWebSocketClient.keep_alive LEAVE")
            return False

        self._logger.notice("keep_alive succeeded")
        self._logger.spam("AsyncAgentWebSocketClient.keep_alive LEAVE")

        return True

    async def _close_message(self) -> bool:
        """Close-message hook; the agent API has no known close message, so nothing is sent."""
        # TODO: No known API close message # pylint: disable=fixme
        # return await self.send(json.dumps({"type": "Close"}))
        return True

    async def finish(self) -> bool:
        """
        Closes the WebSocket connection gracefully.

        Calls the parent ``finish``, stops the microphone and speaker created
        by this client, then cancels the keep-alive task and waits (bounded
        by _FINISH_TIMEOUT_SECONDS) for it to end.

        Returns:
            bool: True on success. False if this call itself is cancelled or
            the wait times out. A failed parent ``finish`` is only logged.
        """
        self._logger.debug("AsyncAgentWebSocketClient.finish ENTER")

        # stop the threads
        self._logger.verbose("cancelling tasks...")
        try:
            # call parent finish
            if await super().finish() is False:
                self._logger.error("AsyncAgentWebSocketClient.finish failed")

            if self._microphone is not None and self._microphone_created:
                self._microphone.finish()
                self._microphone_created = False

            if self._speaker is not None and self._speaker_created:
                self._speaker.finish()
                self._speaker_created = False

            # Before cancelling, check if the tasks were created
            # debug the threads
            for thread in threading.enumerate():
                self._logger.debug("before running thread: %s", thread.name)
            self._logger.debug("number of active threads: %s", threading.active_count())

            tasks = []
            if self._keep_alive_thread is not None:
                self._keep_alive_thread.cancel()
                tasks.append(self._keep_alive_thread)
                self._logger.notice("processing _keep_alive_thread cancel...")

            # The task cancelled above normally ends with CancelledError. With
            # return_exceptions=True, gather() returns that error as a result
            # instead of re-raising it. Otherwise it would land in the
            # CancelledError handler below, and finish() would report failure
            # and skip the remaining cleanup. wait_for bounds the wait.
            await asyncio.wait_for(
                asyncio.gather(*tasks, return_exceptions=True),
                timeout=_FINISH_TIMEOUT_SECONDS,
            )
            self._logger.notice("threads joined")
            self._keep_alive_thread = None

            self._speaker = None
            self._microphone = None

            # debug the threads
            for thread in threading.enumerate():
                self._logger.debug("after running thread: %s", thread.name)
            self._logger.debug("number of active threads: %s", threading.active_count())

            self._logger.notice("finish succeeded")
            self._logger.spam("AsyncAgentWebSocketClient.finish LEAVE")
            return True

        except asyncio.CancelledError as e:
            # reached only if finish() itself is cancelled while waiting
            self._logger.error("tasks cancelled error: %s", e)
            self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE")
            return False

        except asyncio.TimeoutError as e:
            self._logger.error("tasks cancellation timed out: %s", e)
            self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE")
            return False
ONE_SECOND = 1  # length, in seconds, of one tick of the _keep_alive loop
HALF_SECOND = 0.5  # not referenced by the client code in this module
DEEPGRAM_INTERVAL = 5  # a KeepAlive message is sent every DEEPGRAM_INTERVAL ticks
class
AsyncAgentWebSocketClient(deepgram.clients.common.v1.abstract_async_websocket.AbstractAsyncWebSocketClient):
60class AsyncAgentWebSocketClient( 61 AbstractAsyncWebSocketClient 62): # pylint: disable=too-many-instance-attributes 63 """ 64 Client for interacting with Deepgram's live transcription services over WebSockets. 65 66 This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events. 67 68 Args: 69 config (DeepgramClientOptions): all the options for the client. 70 """ 71 72 _logger: verboselogs.VerboseLogger 73 _config: DeepgramClientOptions 74 _endpoint: str 75 76 _event_handlers: Dict[AgentWebSocketEvents, list] 77 78 _keep_alive_thread: Union[asyncio.Task, None] 79 80 _kwargs: Optional[Dict] = None 81 _addons: Optional[Dict] = None 82 # note the distinction here. We can't use _config because it's already used in the parent 83 _settings: Optional[SettingsConfigurationOptions] = None 84 _headers: Optional[Dict] = None 85 86 _speaker_created: bool = False 87 _speaker: Optional[Speaker] = None 88 _microphone_created: bool = False 89 _microphone: Optional[Microphone] = None 90 91 def __init__(self, config: DeepgramClientOptions): 92 if config is None: 93 raise DeepgramError("Config is required") 94 95 self._logger = verboselogs.VerboseLogger(__name__) 96 self._logger.addHandler(logging.StreamHandler()) 97 self._logger.setLevel(config.verbose) 98 99 self._config = config 100 101 # needs to be "wss://agent.deepgram.com/agent" 102 self._endpoint = "agent" 103 104 # override the endpoint since it needs to be "wss://agent.deepgram.com/agent" 105 self._config.url = "agent.deepgram.com" 106 self._keep_alive_thread = None 107 108 # init handlers 109 self._event_handlers = { 110 event: [] for event in AgentWebSocketEvents.__members__.values() 111 } 112 113 if self._config.options.get("microphone_record") == "true": 114 self._logger.info("microphone_record is enabled") 115 rate = self._config.options.get("microphone_record_rate", MICROPHONE_RATE) 116 channels = self._config.options.get( 117 
"microphone_record_channels", MICROPHONE_CHANNELS 118 ) 119 device_index = self._config.options.get("microphone_record_device_index") 120 121 self._logger.debug("rate: %s", rate) 122 self._logger.debug("channels: %s", channels) 123 if device_index is not None: 124 self._logger.debug("device_index: %s", device_index) 125 126 self._microphone_created = True 127 128 if device_index is not None: 129 self._microphone = Microphone( 130 rate=rate, 131 channels=channels, 132 verbose=self._config.verbose, 133 input_device_index=device_index, 134 ) 135 else: 136 self._microphone = Microphone( 137 rate=rate, 138 channels=channels, 139 verbose=self._config.verbose, 140 ) 141 142 if self._config.options.get("speaker_playback") == "true": 143 self._logger.info("speaker_playback is enabled") 144 rate = self._config.options.get("speaker_playback_rate", SPEAKER_RATE) 145 channels = self._config.options.get( 146 "speaker_playback_channels", SPEAKER_CHANNELS 147 ) 148 playback_delta_in_ms = self._config.options.get( 149 "speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA 150 ) 151 device_index = self._config.options.get("speaker_playback_device_index") 152 153 self._logger.debug("rate: %s", rate) 154 self._logger.debug("channels: %s", channels) 155 156 self._speaker_created = True 157 158 if device_index is not None: 159 self._logger.debug("device_index: %s", device_index) 160 161 self._speaker = Speaker( 162 rate=rate, 163 channels=channels, 164 last_play_delta_in_ms=playback_delta_in_ms, 165 verbose=self._config.verbose, 166 output_device_index=device_index, 167 microphone=self._microphone, 168 ) 169 else: 170 self._speaker = Speaker( 171 rate=rate, 172 channels=channels, 173 last_play_delta_in_ms=playback_delta_in_ms, 174 verbose=self._config.verbose, 175 microphone=self._microphone, 176 ) 177 # call the parent constructor 178 super().__init__(self._config, self._endpoint) 179 180 # pylint: disable=too-many-branches,too-many-statements 181 async def start( 182 self, 183 
options: Optional[SettingsConfigurationOptions] = None, 184 addons: Optional[Dict] = None, 185 headers: Optional[Dict] = None, 186 members: Optional[Dict] = None, 187 **kwargs, 188 ) -> bool: 189 """ 190 Starts the WebSocket connection for agent API. 191 """ 192 self._logger.debug("AsyncAgentWebSocketClient.start ENTER") 193 self._logger.info("settings: %s", options) 194 self._logger.info("addons: %s", addons) 195 self._logger.info("headers: %s", headers) 196 self._logger.info("members: %s", members) 197 self._logger.info("kwargs: %s", kwargs) 198 199 if isinstance(options, SettingsConfigurationOptions) and not options.check(): 200 self._logger.error("settings.check failed") 201 self._logger.debug("AsyncAgentWebSocketClient.start LEAVE") 202 raise DeepgramError("Fatal agent settings error") 203 204 self._addons = addons 205 self._headers = headers 206 207 # add "members" as members of the class 208 if members is not None: 209 self.__dict__.update(members) 210 211 # set kwargs as members of the class 212 if kwargs is not None: 213 self._kwargs = kwargs 214 else: 215 self._kwargs = {} 216 217 if isinstance(options, SettingsConfigurationOptions): 218 self._logger.info("options is class") 219 self._settings = options 220 elif isinstance(options, dict): 221 self._logger.info("options is dict") 222 self._settings = SettingsConfigurationOptions.from_dict(options) 223 elif isinstance(options, str): 224 self._logger.info("options is json") 225 self._settings = SettingsConfigurationOptions.from_json(options) 226 else: 227 raise DeepgramError("Invalid options type") 228 229 if self._settings.agent.listen.keyterms is not None and self._settings.agent.listen.model is not None and not self._settings.agent.listen.model.startswith("nova-3"): 230 raise DeepgramError("Keyterms are only supported for nova-3 models") 231 232 try: 233 # speaker substitutes the listening thread 234 if self._speaker is not None: 235 self._logger.notice("passing speaker to delegate_listening") 236 
super().delegate_listening(self._speaker) 237 238 # call parent start 239 if ( 240 await super().start( 241 {}, 242 self._addons, 243 self._headers, 244 **dict(cast(Dict[Any, Any], self._kwargs)), 245 ) 246 is False 247 ): 248 self._logger.error("AsyncAgentWebSocketClient.start failed") 249 self._logger.debug("AsyncAgentWebSocketClient.start LEAVE") 250 return False 251 252 if self._speaker is not None: 253 self._logger.notice("speaker is delegate_listening. Starting speaker") 254 self._speaker.start() 255 256 if self._speaker is not None and self._microphone is not None: 257 self._logger.notice( 258 "speaker is delegate_listening. Starting microphone" 259 ) 260 self._microphone.set_callback(self.send) 261 self._microphone.start() 262 263 # debug the threads 264 for thread in threading.enumerate(): 265 self._logger.debug("after running thread: %s", thread.name) 266 self._logger.debug("number of active threads: %s", threading.active_count()) 267 268 # keepalive thread 269 if self._config.is_keep_alive_enabled(): 270 self._logger.notice("keepalive is enabled") 271 self._keep_alive_thread = asyncio.create_task(self._keep_alive()) 272 else: 273 self._logger.notice("keepalive is disabled") 274 275 # debug the threads 276 for thread in threading.enumerate(): 277 self._logger.debug("after running thread: %s", thread.name) 278 self._logger.debug("number of active threads: %s", threading.active_count()) 279 280 # send the configurationsetting message 281 self._logger.notice("Sending ConfigurationSettings...") 282 ret_send_cs = await self.send(str(self._settings)) 283 if not ret_send_cs: 284 self._logger.error("ConfigurationSettings failed") 285 286 err_error: ErrorResponse = ErrorResponse( 287 "Exception in AsyncAgentWebSocketClient.start", 288 "ConfigurationSettings failed to send", 289 "Exception", 290 ) 291 await self._emit( 292 AgentWebSocketEvents(AgentWebSocketEvents.Error), 293 error=err_error, 294 **dict(cast(Dict[Any, Any], self._kwargs)), 295 ) 296 297 
self._logger.debug("AgentWebSocketClient.start LEAVE") 298 return False 299 300 self._logger.notice("start succeeded") 301 self._logger.debug("AsyncAgentWebSocketClient.start LEAVE") 302 return True 303 304 except Exception as e: # pylint: disable=broad-except 305 self._logger.error( 306 "WebSocketException in AsyncAgentWebSocketClient.start: %s", e 307 ) 308 self._logger.debug("AsyncAgentWebSocketClient.start LEAVE") 309 if self._config.options.get("termination_exception_connect") is True: 310 raise e 311 return False 312 313 # pylint: enable=too-many-branches,too-many-statements 314 315 def on(self, event: AgentWebSocketEvents, handler: Callable) -> None: 316 """ 317 Registers event handlers for specific events. 318 """ 319 self._logger.info("event subscribed: %s", event) 320 if event in AgentWebSocketEvents.__members__.values() and callable(handler): 321 self._event_handlers[event].append(handler) 322 323 async def _emit(self, event: AgentWebSocketEvents, *args, **kwargs) -> None: 324 """ 325 Emits events to the registered event handlers. 
326 """ 327 self._logger.debug("AsyncAgentWebSocketClient._emit ENTER") 328 self._logger.debug("callback handlers for: %s", event) 329 330 # debug the threads 331 for thread in threading.enumerate(): 332 self._logger.debug("after running thread: %s", thread.name) 333 self._logger.debug("number of active threads: %s", threading.active_count()) 334 335 self._logger.debug("callback handlers for: %s", event) 336 tasks = [] 337 for handler in self._event_handlers[event]: 338 task = asyncio.create_task(handler(self, *args, **kwargs)) 339 tasks.append(task) 340 341 if tasks: 342 self._logger.debug("waiting for tasks to finish...") 343 await asyncio.gather(*tasks, return_exceptions=True) 344 tasks.clear() 345 346 # debug the threads 347 for thread in threading.enumerate(): 348 self._logger.debug("after running thread: %s", thread.name) 349 self._logger.debug("number of active threads: %s", threading.active_count()) 350 351 self._logger.debug("AsyncAgentWebSocketClient._emit LEAVE") 352 353 # pylint: disable=too-many-locals,too-many-statements 354 async def _process_text(self, message: str) -> None: 355 """ 356 Processes messages received over the WebSocket connection. 
357 """ 358 self._logger.debug("AsyncAgentWebSocketClient._process_text ENTER") 359 360 try: 361 self._logger.debug("Text data received") 362 if len(message) == 0: 363 self._logger.debug("message is empty") 364 self._logger.debug("AsyncAgentWebSocketClient._process_text LEAVE") 365 return 366 367 data = json.loads(message) 368 response_type = data.get("type") 369 self._logger.debug("response_type: %s, data: %s", response_type, data) 370 371 match response_type: 372 case AgentWebSocketEvents.Open: 373 open_result: OpenResponse = OpenResponse.from_json(message) 374 self._logger.verbose("OpenResponse: %s", open_result) 375 await self._emit( 376 AgentWebSocketEvents(AgentWebSocketEvents.Open), 377 open=open_result, 378 **dict(cast(Dict[Any, Any], self._kwargs)), 379 ) 380 case AgentWebSocketEvents.Welcome: 381 welcome_result: WelcomeResponse = WelcomeResponse.from_json(message) 382 self._logger.verbose("WelcomeResponse: %s", welcome_result) 383 await self._emit( 384 AgentWebSocketEvents(AgentWebSocketEvents.Welcome), 385 welcome=welcome_result, 386 **dict(cast(Dict[Any, Any], self._kwargs)), 387 ) 388 case AgentWebSocketEvents.SettingsApplied: 389 settings_applied_result: SettingsAppliedResponse = ( 390 SettingsAppliedResponse.from_json(message) 391 ) 392 self._logger.verbose( 393 "SettingsAppliedResponse: %s", settings_applied_result 394 ) 395 await self._emit( 396 AgentWebSocketEvents(AgentWebSocketEvents.SettingsApplied), 397 settings_applied=settings_applied_result, 398 **dict(cast(Dict[Any, Any], self._kwargs)), 399 ) 400 case AgentWebSocketEvents.ConversationText: 401 conversation_text_result: ConversationTextResponse = ( 402 ConversationTextResponse.from_json(message) 403 ) 404 self._logger.verbose( 405 "ConversationTextResponse: %s", conversation_text_result 406 ) 407 await self._emit( 408 AgentWebSocketEvents(AgentWebSocketEvents.ConversationText), 409 conversation_text=conversation_text_result, 410 **dict(cast(Dict[Any, Any], self._kwargs)), 411 ) 412 case 
AgentWebSocketEvents.UserStartedSpeaking: 413 user_started_speaking_result: UserStartedSpeakingResponse = ( 414 UserStartedSpeakingResponse.from_json(message) 415 ) 416 self._logger.verbose( 417 "UserStartedSpeakingResponse: %s", user_started_speaking_result 418 ) 419 await self._emit( 420 AgentWebSocketEvents(AgentWebSocketEvents.UserStartedSpeaking), 421 user_started_speaking=user_started_speaking_result, 422 **dict(cast(Dict[Any, Any], self._kwargs)), 423 ) 424 case AgentWebSocketEvents.AgentThinking: 425 agent_thinking_result: AgentThinkingResponse = ( 426 AgentThinkingResponse.from_json(message) 427 ) 428 self._logger.verbose( 429 "AgentThinkingResponse: %s", agent_thinking_result 430 ) 431 await self._emit( 432 AgentWebSocketEvents(AgentWebSocketEvents.AgentThinking), 433 agent_thinking=agent_thinking_result, 434 **dict(cast(Dict[Any, Any], self._kwargs)), 435 ) 436 case AgentWebSocketEvents.FunctionCalling: 437 function_calling_result: FunctionCalling = ( 438 FunctionCalling.from_json(message) 439 ) 440 self._logger.verbose("FunctionCalling: %s", function_calling_result) 441 await self._emit( 442 AgentWebSocketEvents(AgentWebSocketEvents.FunctionCalling), 443 function_calling=function_calling_result, 444 **dict(cast(Dict[Any, Any], self._kwargs)), 445 ) 446 case AgentWebSocketEvents.FunctionCallRequest: 447 function_call_request_result: FunctionCallRequest = ( 448 FunctionCallRequest.from_json(message) 449 ) 450 self._logger.verbose( 451 "FunctionCallRequest: %s", function_call_request_result 452 ) 453 await self._emit( 454 AgentWebSocketEvents(AgentWebSocketEvents.FunctionCallRequest), 455 function_call_request=function_call_request_result, 456 **dict(cast(Dict[Any, Any], self._kwargs)), 457 ) 458 case AgentWebSocketEvents.AgentStartedSpeaking: 459 agent_started_speaking_result: AgentStartedSpeakingResponse = ( 460 AgentStartedSpeakingResponse.from_json(message) 461 ) 462 self._logger.verbose( 463 "AgentStartedSpeakingResponse: %s", 464 
agent_started_speaking_result, 465 ) 466 await self._emit( 467 AgentWebSocketEvents(AgentWebSocketEvents.AgentStartedSpeaking), 468 agent_started_speaking=agent_started_speaking_result, 469 **dict(cast(Dict[Any, Any], self._kwargs)), 470 ) 471 case AgentWebSocketEvents.AgentAudioDone: 472 agent_audio_done_result: AgentAudioDoneResponse = ( 473 AgentAudioDoneResponse.from_json(message) 474 ) 475 self._logger.verbose( 476 "AgentAudioDoneResponse: %s", agent_audio_done_result 477 ) 478 await self._emit( 479 AgentWebSocketEvents(AgentWebSocketEvents.AgentAudioDone), 480 agent_audio_done=agent_audio_done_result, 481 **dict(cast(Dict[Any, Any], self._kwargs)), 482 ) 483 case AgentWebSocketEvents.InjectionRefused: 484 injection_refused_result: InjectionRefusedResponse = ( 485 InjectionRefusedResponse.from_json(message) 486 ) 487 self._logger.verbose( 488 "InjectionRefused: %s", injection_refused_result 489 ) 490 await self._emit( 491 AgentWebSocketEvents(AgentWebSocketEvents.InjectionRefused), 492 injection_refused=injection_refused_result, 493 **dict(cast(Dict[Any, Any], self._kwargs)), 494 ) 495 case AgentWebSocketEvents.Close: 496 close_result: CloseResponse = CloseResponse.from_json(message) 497 self._logger.verbose("CloseResponse: %s", close_result) 498 await self._emit( 499 AgentWebSocketEvents(AgentWebSocketEvents.Close), 500 close=close_result, 501 **dict(cast(Dict[Any, Any], self._kwargs)), 502 ) 503 case AgentWebSocketEvents.Error: 504 err_error: ErrorResponse = ErrorResponse.from_json(message) 505 self._logger.verbose("ErrorResponse: %s", err_error) 506 await self._emit( 507 AgentWebSocketEvents(AgentWebSocketEvents.Error), 508 error=err_error, 509 **dict(cast(Dict[Any, Any], self._kwargs)), 510 ) 511 case _: 512 self._logger.warning( 513 "Unknown Message: response_type: %s, data: %s", 514 response_type, 515 data, 516 ) 517 unhandled_error: UnhandledResponse = UnhandledResponse( 518 type=AgentWebSocketEvents(AgentWebSocketEvents.Unhandled), 519 raw=message, 520 
) 521 await self._emit( 522 AgentWebSocketEvents(AgentWebSocketEvents.Unhandled), 523 unhandled=unhandled_error, 524 **dict(cast(Dict[Any, Any], self._kwargs)), 525 ) 526 527 self._logger.notice("_process_text Succeeded") 528 self._logger.debug("AsyncAgentWebSocketClient._process_text LEAVE") 529 530 except Exception as e: # pylint: disable=broad-except 531 self._logger.error( 532 "Exception in AsyncAgentWebSocketClient._process_text: %s", e 533 ) 534 e_error: ErrorResponse = ErrorResponse( 535 "Exception in AsyncAgentWebSocketClient._process_text", 536 f"{e}", 537 "Exception", 538 ) 539 await self._emit( 540 AgentWebSocketEvents(AgentWebSocketEvents.Error), 541 error=e_error, 542 **dict(cast(Dict[Any, Any], self._kwargs)), 543 ) 544 545 # signal exit and close 546 await super()._signal_exit() 547 548 self._logger.debug("AsyncAgentWebSocketClient._process_text LEAVE") 549 550 if self._config.options.get("termination_exception") is True: 551 raise 552 return 553 554 # pylint: enable=too-many-locals,too-many-statements 555 556 async def _process_binary(self, message: bytes) -> None: 557 self._logger.debug("AsyncAgentWebSocketClient._process_binary ENTER") 558 self._logger.debug("Binary data received") 559 560 await self._emit( 561 AgentWebSocketEvents(AgentWebSocketEvents.AudioData), 562 data=message, 563 **dict(cast(Dict[Any, Any], self._kwargs)), 564 ) 565 566 self._logger.notice("_process_binary Succeeded") 567 self._logger.debug("AsyncAgentWebSocketClient._process_binary LEAVE") 568 569 # pylint: disable=too-many-return-statements 570 async def _keep_alive(self) -> None: 571 """ 572 Sends keepalive messages to the WebSocket connection. 
573 """ 574 self._logger.debug("AsyncAgentWebSocketClient._keep_alive ENTER") 575 576 counter = 0 577 while True: 578 try: 579 counter += 1 580 await asyncio.sleep(ONE_SECOND) 581 582 if self._exit_event.is_set(): 583 self._logger.notice("_keep_alive exiting gracefully") 584 self._logger.debug("AsyncAgentWebSocketClient._keep_alive LEAVE") 585 return 586 587 # deepgram keepalive 588 if counter % DEEPGRAM_INTERVAL == 0: 589 await self.keep_alive() 590 591 except Exception as e: # pylint: disable=broad-except 592 self._logger.error( 593 "Exception in AsyncAgentWebSocketClient._keep_alive: %s", e 594 ) 595 e_error: ErrorResponse = ErrorResponse( 596 "Exception in AsyncAgentWebSocketClient._keep_alive", 597 f"{e}", 598 "Exception", 599 ) 600 self._logger.error( 601 "Exception in AsyncAgentWebSocketClient._keep_alive: %s", str(e) 602 ) 603 await self._emit( 604 AgentWebSocketEvents(AgentWebSocketEvents.Error), 605 error=e_error, 606 **dict(cast(Dict[Any, Any], self._kwargs)), 607 ) 608 609 # signal exit and close 610 await super()._signal_exit() 611 612 self._logger.debug("AsyncAgentWebSocketClient._keep_alive LEAVE") 613 614 if self._config.options.get("termination_exception") is True: 615 raise 616 return 617 618 async def keep_alive(self) -> bool: 619 """ 620 Sends a KeepAlive message 621 """ 622 self._logger.spam("AsyncAgentWebSocketClient.keep_alive ENTER") 623 624 self._logger.notice("Sending KeepAlive...") 625 ret = await self.send(json.dumps({"type": "KeepAlive"})) 626 627 if not ret: 628 self._logger.error("keep_alive failed") 629 self._logger.spam("AsyncAgentWebSocketClient.keep_alive LEAVE") 630 return False 631 632 self._logger.notice("keep_alive succeeded") 633 self._logger.spam("AsyncAgentWebSocketClient.keep_alive LEAVE") 634 635 return True 636 637 async def _close_message(self) -> bool: 638 # TODO: No known API close message # pylint: disable=fixme 639 # return await self.send(json.dumps({"type": "Close"})) 640 return True 641 642 async def finish(self) 
-> bool: 643 """ 644 Closes the WebSocket connection gracefully. 645 """ 646 self._logger.debug("AsyncAgentWebSocketClient.finish ENTER") 647 648 # stop the threads 649 self._logger.verbose("cancelling tasks...") 650 try: 651 # call parent finish 652 if await super().finish() is False: 653 self._logger.error("AsyncAgentWebSocketClient.finish failed") 654 655 if self._microphone is not None and self._microphone_created: 656 self._microphone.finish() 657 self._microphone_created = False 658 659 if self._speaker is not None and self._speaker_created: 660 self._speaker.finish() 661 self._speaker_created = False 662 663 # Before cancelling, check if the tasks were created 664 # debug the threads 665 for thread in threading.enumerate(): 666 self._logger.debug("before running thread: %s", thread.name) 667 self._logger.debug("number of active threads: %s", threading.active_count()) 668 669 tasks = [] 670 if self._keep_alive_thread is not None: 671 self._keep_alive_thread.cancel() 672 tasks.append(self._keep_alive_thread) 673 self._logger.notice("processing _keep_alive_thread cancel...") 674 675 # Use asyncio.gather to wait for tasks to be cancelled 676 # Prevent indefinite waiting by setting a timeout 677 await asyncio.wait_for(asyncio.gather(*tasks), timeout=10) 678 self._logger.notice("threads joined") 679 680 self._speaker = None 681 self._microphone = None 682 683 # debug the threads 684 for thread in threading.enumerate(): 685 self._logger.debug("after running thread: %s", thread.name) 686 self._logger.debug("number of active threads: %s", threading.active_count()) 687 688 self._logger.notice("finish succeeded") 689 self._logger.spam("AsyncAgentWebSocketClient.finish LEAVE") 690 return True 691 692 except asyncio.CancelledError as e: 693 self._logger.error("tasks cancelled error: %s", e) 694 self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE") 695 return False 696 697 except asyncio.TimeoutError as e: 698 self._logger.error("tasks cancellation timed out: %s", 
e) 699 self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE") 700 return False
Client for interacting with Deepgram's agent API over WebSockets.
This class provides methods to establish a WebSocket connection to the agent API (wss://agent.deepgram.com/agent) and handle real-time agent events such as conversation text, function call requests, agent audio, and errors.
Args: config (DeepgramClientOptions): all the options for the client.
AsyncAgentWebSocketClient(config: deepgram.options.DeepgramClientOptions)
def __init__(self, config: DeepgramClientOptions):
    """
    Build an agent WebSocket client from ``config``.

    The host on ``config`` is overwritten with ``agent.deepgram.com`` and the
    endpoint path is fixed to ``agent`` before the parent constructor runs.
    A Microphone is created when ``config.options["microphone_record"]`` is the
    string ``"true"``. A Speaker is created when
    ``config.options["speaker_playback"]`` is ``"true"``; the Speaker is handed
    the Microphone, which may be None.

    Args:
        config (DeepgramClientOptions): all the options for the client.

    Raises:
        DeepgramError: if ``config`` is None.
    """
    if config is None:
        raise DeepgramError("Config is required")

    logger = verboselogs.VerboseLogger(__name__)
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(config.verbose)
    self._logger = logger

    self._config = config

    # The agent service lives at "wss://agent.deepgram.com/agent", so both the
    # path and the host are pinned here instead of being taken from config.
    self._endpoint = "agent"
    self._config.url = "agent.deepgram.com"
    self._keep_alive_thread = None

    # one initially-empty handler list per known agent event
    self._event_handlers = {
        evt: [] for evt in AgentWebSocketEvents.__members__.values()
    }

    opts = self._config.options

    if opts.get("microphone_record") == "true":
        self._logger.info("microphone_record is enabled")
        mic_rate = opts.get("microphone_record_rate", MICROPHONE_RATE)
        mic_channels = opts.get("microphone_record_channels", MICROPHONE_CHANNELS)
        mic_device = opts.get("microphone_record_device_index")

        self._logger.debug("rate: %s", mic_rate)
        self._logger.debug("channels: %s", mic_channels)

        mic_kwargs = {
            "rate": mic_rate,
            "channels": mic_channels,
            "verbose": self._config.verbose,
        }
        if mic_device is not None:
            self._logger.debug("device_index: %s", mic_device)
            # only pass a device index when one was configured
            mic_kwargs["input_device_index"] = mic_device

        self._microphone_created = True
        self._microphone = Microphone(**mic_kwargs)

    if opts.get("speaker_playback") == "true":
        self._logger.info("speaker_playback is enabled")
        spk_rate = opts.get("speaker_playback_rate", SPEAKER_RATE)
        spk_channels = opts.get("speaker_playback_channels", SPEAKER_CHANNELS)
        spk_delta_ms = opts.get(
            "speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA
        )
        spk_device = opts.get("speaker_playback_device_index")

        self._logger.debug("rate: %s", spk_rate)
        self._logger.debug("channels: %s", spk_channels)

        self._speaker_created = True

        spk_kwargs = {
            "rate": spk_rate,
            "channels": spk_channels,
            "last_play_delta_in_ms": spk_delta_ms,
            "verbose": self._config.verbose,
            # the speaker is given the microphone (None if not recording)
            "microphone": self._microphone,
        }
        if spk_device is not None:
            self._logger.debug("device_index: %s", spk_device)
            spk_kwargs["output_device_index"] = spk_device

        self._speaker = Speaker(**spk_kwargs)

    # call the parent constructor
    super().__init__(self._config, self._endpoint)
async def
start( self, options: Optional[deepgram.clients.agent.v1.websocket.options.SettingsConfigurationOptions] = None, addons: Optional[Dict] = None, headers: Optional[Dict] = None, members: Optional[Dict] = None, **kwargs) -> bool:
async def start(
    self,
    options: Optional[SettingsConfigurationOptions] = None,
    addons: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    members: Optional[Dict] = None,
    **kwargs,
) -> bool:
    """
    Starts the WebSocket connection for the agent API.

    ``options`` may be a SettingsConfigurationOptions instance, a dict, or a
    JSON string. It is normalized to SettingsConfigurationOptions, validated
    with ``check()`` regardless of the input form, and sent as the first
    message once the parent connection has started.

    Args:
        options: agent settings (class instance, dict, or JSON string).
        addons: forwarded unchanged to the parent ``start``.
        headers: forwarded unchanged to the parent ``start``.
        members: key/value pairs copied onto this instance's ``__dict__``.
        **kwargs: stored on the instance and forwarded to the parent
            ``start`` and to every emitted event.

    Returns:
        bool: True when the parent started and the settings were sent,
        False otherwise. If ``termination_exception_connect`` is True in the
        client options, an exception raised during startup is re-raised
        instead of returning False.

    Raises:
        DeepgramError: if ``options`` has an unsupported type, fails
            ``check()``, or requests keyterms with a non nova-3 listen model.
    """
    self._logger.debug("AsyncAgentWebSocketClient.start ENTER")
    self._logger.info("settings: %s", options)
    self._logger.info("addons: %s", addons)
    self._logger.info("headers: %s", headers)
    self._logger.info("members: %s", members)
    self._logger.info("kwargs: %s", kwargs)

    # normalize every accepted input form into SettingsConfigurationOptions
    settings: SettingsConfigurationOptions
    if isinstance(options, SettingsConfigurationOptions):
        self._logger.info("options is class")
        settings = options
    elif isinstance(options, dict):
        self._logger.info("options is dict")
        settings = SettingsConfigurationOptions.from_dict(options)
    elif isinstance(options, str):
        self._logger.info("options is json")
        settings = SettingsConfigurationOptions.from_json(options)
    else:
        raise DeepgramError("Invalid options type")

    # Validate after normalization so dict / JSON settings are checked too,
    # not only SettingsConfigurationOptions instances.
    if not settings.check():
        self._logger.error("settings.check failed")
        self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
        raise DeepgramError("Fatal agent settings error")

    self._addons = addons
    self._headers = headers

    # add "members" as members of the class
    if members is not None:
        self.__dict__.update(members)

    # **kwargs is always a dict (possibly empty)
    self._kwargs = kwargs
    self._settings = settings

    listen = settings.agent.listen
    if (
        listen.keyterms is not None
        and listen.model is not None
        and not listen.model.startswith("nova-3")
    ):
        raise DeepgramError("Keyterms are only supported for nova-3 models")

    try:
        # speaker substitutes the listening thread
        if self._speaker is not None:
            self._logger.notice("passing speaker to delegate_listening")
            super().delegate_listening(self._speaker)

        # call parent start
        if (
            await super().start(
                {},
                self._addons,
                self._headers,
                **dict(cast(Dict[Any, Any], self._kwargs)),
            )
            is False
        ):
            self._logger.error("AsyncAgentWebSocketClient.start failed")
            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
            return False

        if self._speaker is not None:
            self._logger.notice("speaker is delegate_listening. Starting speaker")
            self._speaker.start()

        if self._speaker is not None and self._microphone is not None:
            self._logger.notice(
                "speaker is delegate_listening. Starting microphone"
            )
            # captured audio is pushed straight out through self.send
            self._microphone.set_callback(self.send)
            self._microphone.start()

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        # keepalive task
        if self._config.is_keep_alive_enabled():
            self._logger.notice("keepalive is enabled")
            self._keep_alive_thread = asyncio.create_task(self._keep_alive())
        else:
            self._logger.notice("keepalive is disabled")

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        # send the configuration settings message
        self._logger.notice("Sending ConfigurationSettings...")
        ret_send_cs = await self.send(str(self._settings))
        if not ret_send_cs:
            self._logger.error("ConfigurationSettings failed")

            err_error: ErrorResponse = ErrorResponse(
                "Exception in AsyncAgentWebSocketClient.start",
                "ConfigurationSettings failed to send",
                "Exception",
            )
            await self._emit(
                AgentWebSocketEvents(AgentWebSocketEvents.Error),
                error=err_error,
                **dict(cast(Dict[Any, Any], self._kwargs)),
            )

            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
            return False

        self._logger.notice("start succeeded")
        self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
        return True

    except Exception as e:  # pylint: disable=broad-except
        self._logger.error(
            "WebSocketException in AsyncAgentWebSocketClient.start: %s", e
        )
        self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
        if self._config.options.get("termination_exception_connect") is True:
            raise e
        return False
Starts the WebSocket connection for the agent API.
def on(self, event: AgentWebSocketEvents, handler: Callable) -> None:
    """
    Registers event handlers for specific events.

    The handler is appended to the list kept for ``event``. When the event is
    emitted, ``_emit`` schedules ``handler(self, *args, **kwargs)`` with
    ``asyncio.create_task``, so the handler must return a coroutine (i.e. be
    an ``async def`` function).

    An unknown event or a non-callable handler is not registered. A warning
    is logged so the subscription is not dropped silently.

    Args:
        event (AgentWebSocketEvents): the event to subscribe to.
        handler (Callable): the coroutine function to invoke for ``event``.
    """
    if event not in AgentWebSocketEvents.__members__.values():
        # a typo'd event would otherwise simply never fire
        self._logger.warning("event not subscribed, unknown event: %s", event)
        return
    if not callable(handler):
        self._logger.warning(
            "event not subscribed, handler is not callable: %s", event
        )
        return

    self._event_handlers[event].append(handler)
    self._logger.info("event subscribed: %s", event)
Registers event handlers for specific events.
async def
keep_alive(self) -> bool:
async def keep_alive(self) -> bool:
    """
    Send one ``{"type": "KeepAlive"}`` message over the connection.

    Returns:
        bool: True if ``self.send`` reported success, False otherwise.
    """
    self._logger.spam("AsyncAgentWebSocketClient.keep_alive ENTER")

    self._logger.notice("Sending KeepAlive...")
    payload = json.dumps({"type": "KeepAlive"})
    succeeded = bool(await self.send(payload))

    if succeeded:
        self._logger.notice("keep_alive succeeded")
    else:
        self._logger.error("keep_alive failed")

    self._logger.spam("AsyncAgentWebSocketClient.keep_alive LEAVE")
    return succeeded
Sends a KeepAlive message
async def
finish(self) -> bool:
async def finish(self) -> bool:
    """
    Closes the WebSocket connection gracefully.

    Calls the parent ``finish`` (a failure there is logged, but teardown
    continues). It then stops any Microphone/Speaker this client created,
    cancels the keep-alive task, and waits up to 10 seconds for it to end.

    Returns:
        bool: True once teardown completed; False if waiting for the
        keep-alive task timed out or this coroutine was itself cancelled.
    """
    self._logger.debug("AsyncAgentWebSocketClient.finish ENTER")

    # stop the threads
    self._logger.verbose("cancelling tasks...")
    try:
        # call parent finish
        if await super().finish() is False:
            self._logger.error("AsyncAgentWebSocketClient.finish failed")

        if self._microphone is not None and self._microphone_created:
            self._microphone.finish()
            self._microphone_created = False

        if self._speaker is not None and self._speaker_created:
            self._speaker.finish()
            self._speaker_created = False

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("before running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        tasks = []
        if self._keep_alive_thread is not None:
            self._keep_alive_thread.cancel()
            tasks.append(self._keep_alive_thread)
            self._logger.notice("processing _keep_alive_thread cancel...")

        # return_exceptions=True: a task cancelled just above completes with
        # CancelledError. Without it, gather() re-raises that error here and
        # finish() would report failure on every normal shutdown.
        # The timeout prevents waiting indefinitely.
        results = await asyncio.wait_for(
            asyncio.gather(*tasks, return_exceptions=True), timeout=10
        )
        for result in results:
            # CancelledError is the expected outcome and is not an Exception
            # subclass; anything else is a real failure worth surfacing.
            if isinstance(result, Exception):
                self._logger.error("keep-alive task ended with error: %s", result)
        self._keep_alive_thread = None
        self._logger.notice("threads joined")

        self._speaker = None
        self._microphone = None

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        self._logger.notice("finish succeeded")
        self._logger.spam("AsyncAgentWebSocketClient.finish LEAVE")
        return True

    except asyncio.CancelledError as e:
        self._logger.error("tasks cancelled error: %s", e)
        self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE")
        return False

    except asyncio.TimeoutError as e:
        self._logger.error("tasks cancellation timed out: %s", e)
        self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE")
        return False
Closes the WebSocket connection gracefully.