deepgram.clients.agent.v1.websocket.client
# Copyright 2024 Deepgram SDK contributors. All Rights Reserved.
# Use of this source code is governed by a MIT license that can be found in the LICENSE file.
# SPDX-License-Identifier: MIT

import json
import logging
import threading
import time
from typing import Dict, Union, Optional, cast, Any, Callable

from .....utils import verboselogs
from .....options import DeepgramClientOptions
from ...enums import AgentWebSocketEvents
from ....common import AbstractSyncWebSocketClient
from ....common import DeepgramError

from .response import (
    OpenResponse,
    WelcomeResponse,
    SettingsAppliedResponse,
    ConversationTextResponse,
    UserStartedSpeakingResponse,
    AgentThinkingResponse,
    FunctionCalling,
    FunctionCallRequest,
    AgentStartedSpeakingResponse,
    AgentAudioDoneResponse,
    InjectionRefusedResponse,
    CloseResponse,
    ErrorResponse,
    UnhandledResponse,
)
from .options import (
    SettingsConfigurationOptions,
    UpdateInstructionsOptions,
    UpdateSpeakOptions,
    InjectAgentMessageOptions,
    FunctionCallResponse,
    AgentKeepAlive,
)

from .....audio.speaker import (
    Speaker,
    RATE as SPEAKER_RATE,
    CHANNELS as SPEAKER_CHANNELS,
    PLAYBACK_DELTA as SPEAKER_PLAYBACK_DELTA,
)
from .....audio.microphone import (
    Microphone,
    RATE as MICROPHONE_RATE,
    CHANNELS as MICROPHONE_CHANNELS,
)

# timeout (seconds) of each wait in the keep-alive loop
ONE_SECOND = 1
# not referenced in this module
HALF_SECOND = 0.5
# a KeepAlive is sent every DEEPGRAM_INTERVAL iterations of the keep-alive loop
DEEPGRAM_INTERVAL = 5


class AgentWebSocketClient(
    AbstractSyncWebSocketClient
):  # pylint: disable=too-many-instance-attributes
    """
    Client for interacting with Deepgram's agent API over WebSockets.

    This class opens a WebSocket connection to "wss://agent.deepgram.com/agent",
    sends the agent settings, dispatches server messages to registered event
    handlers and, when enabled through the client options, records from a
    microphone and plays received audio through a speaker.

    Args:
        config (DeepgramClientOptions): all the options for the client.
    """

    _logger: verboselogs.VerboseLogger
    _config: DeepgramClientOptions
    _endpoint: str

    # event -> list of handlers registered through on()
    _event_handlers: Dict[AgentWebSocketEvents, list]

    _keep_alive_thread: Union[threading.Thread, None]

    # extra keyword arguments from start(), forwarded to every event handler
    _kwargs: Optional[Dict] = None
    _addons: Optional[Dict] = None
    # note the distinction here. We can't use _config because it's already used in the parent
    _settings: Optional[SettingsConfigurationOptions] = None
    _headers: Optional[Dict] = None

    # *_created flags record that this client built the device and must finish it
    _speaker_created: bool = False
    _speaker: Optional[Speaker] = None
    _microphone_created: bool = False
    _microphone: Optional[Microphone] = None

    # Text message dispatch table. Each entry is
    # (event, response class, handler keyword name, log label): the message is
    # parsed with ``response class.from_json`` and passed to handlers of
    # ``event`` under ``handler keyword name``.
    _TEXT_RESPONSE_TYPES = (
        (AgentWebSocketEvents.Open, OpenResponse, "open", "OpenResponse"),
        (AgentWebSocketEvents.Welcome, WelcomeResponse, "welcome", "WelcomeResponse"),
        (
            AgentWebSocketEvents.SettingsApplied,
            SettingsAppliedResponse,
            "settings_applied",
            "SettingsAppliedResponse",
        ),
        (
            AgentWebSocketEvents.ConversationText,
            ConversationTextResponse,
            "conversation_text",
            "ConversationTextResponse",
        ),
        (
            AgentWebSocketEvents.UserStartedSpeaking,
            UserStartedSpeakingResponse,
            "user_started_speaking",
            "UserStartedSpeakingResponse",
        ),
        (
            AgentWebSocketEvents.AgentThinking,
            AgentThinkingResponse,
            "agent_thinking",
            "AgentThinkingResponse",
        ),
        (
            AgentWebSocketEvents.FunctionCalling,
            FunctionCalling,
            "function_calling",
            "FunctionCalling",
        ),
        (
            AgentWebSocketEvents.FunctionCallRequest,
            FunctionCallRequest,
            "function_call_request",
            "FunctionCallRequest",
        ),
        (
            AgentWebSocketEvents.AgentStartedSpeaking,
            AgentStartedSpeakingResponse,
            "agent_started_speaking",
            "AgentStartedSpeakingResponse",
        ),
        (
            AgentWebSocketEvents.AgentAudioDone,
            AgentAudioDoneResponse,
            "agent_audio_done",
            "AgentAudioDoneResponse",
        ),
        (
            AgentWebSocketEvents.InjectionRefused,
            InjectionRefusedResponse,
            "injection_refused",
            "InjectionRefused",
        ),
        (AgentWebSocketEvents.Close, CloseResponse, "close", "CloseResponse"),
        (AgentWebSocketEvents.Error, ErrorResponse, "error", "ErrorResponse"),
    )

    def __init__(self, config: DeepgramClientOptions):
        """
        Creates the client and, if requested in ``config.options``, the
        microphone and speaker devices.

        Recognised options: ``microphone_record`` ("true" to enable) with
        ``microphone_record_rate``/``_channels``/``_device_index``, and
        ``speaker_playback`` ("true" to enable) with
        ``speaker_playback_rate``/``_channels``/``_delta_in_ms``/``_device_index``.

        Args:
            config (DeepgramClientOptions): all the options for the client.
                Its ``url`` is overwritten with "agent.deepgram.com".

        Raises:
            DeepgramError: if ``config`` is None.
        """
        if config is None:
            raise DeepgramError("Config is required")

        self._logger = verboselogs.VerboseLogger(__name__)
        self._logger.addHandler(logging.StreamHandler())
        self._logger.setLevel(config.verbose)

        self._config = config

        # needs to be "wss://agent.deepgram.com/agent"
        self._endpoint = "agent"

        # override the endpoint since it needs to be "wss://agent.deepgram.com/agent"
        self._config.url = "agent.deepgram.com"

        self._keep_alive_thread = None

        # init handlers
        self._event_handlers = {
            event: [] for event in AgentWebSocketEvents.__members__.values()
        }

        options = self._config.options

        # The microphone is created first because the speaker is given a
        # reference to it.
        if options.get("microphone_record") == "true":
            self._logger.info("microphone_record is enabled")
            rate = options.get("microphone_record_rate", MICROPHONE_RATE)
            channels = options.get("microphone_record_channels", MICROPHONE_CHANNELS)
            device_index = options.get("microphone_record_device_index")

            self._logger.debug("rate: %s", rate)
            self._logger.debug("channels: %s", channels)

            self._microphone_created = True

            microphone_kwargs: Dict[str, Any] = {
                "rate": rate,
                "channels": channels,
                "verbose": self._config.verbose,
            }
            # only pass a device index when one was configured, so the
            # Microphone's own default applies otherwise
            if device_index is not None:
                self._logger.debug("device_index: %s", device_index)
                microphone_kwargs["input_device_index"] = device_index
            self._microphone = Microphone(**microphone_kwargs)

        if options.get("speaker_playback") == "true":
            self._logger.info("speaker_playback is enabled")
            rate = options.get("speaker_playback_rate", SPEAKER_RATE)
            channels = options.get("speaker_playback_channels", SPEAKER_CHANNELS)
            playback_delta_in_ms = options.get(
                "speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA
            )
            device_index = options.get("speaker_playback_device_index")

            self._logger.debug("rate: %s", rate)
            self._logger.debug("channels: %s", channels)

            self._speaker_created = True

            speaker_kwargs: Dict[str, Any] = {
                "rate": rate,
                "channels": channels,
                "last_play_delta_in_ms": playback_delta_in_ms,
                "verbose": self._config.verbose,
                "microphone": self._microphone,
            }
            # only pass a device index when one was configured, so the
            # Speaker's own default applies otherwise
            if device_index is not None:
                self._logger.debug("device_index: %s", device_index)
                speaker_kwargs["output_device_index"] = device_index
            self._speaker = Speaker(**speaker_kwargs)

        # call the parent constructor
        super().__init__(self._config, self._endpoint)

    def _log_thread_state(self, label: str) -> None:
        """
        Logs, at debug level, every live thread's name (prefixed with
        ``label``) followed by the active thread count.
        """
        for thread in threading.enumerate():
            self._logger.debug("%s running thread: %s", label, thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

    # pylint: disable=too-many-statements,too-many-branches
    def start(
        self,
        options: Optional[SettingsConfigurationOptions] = None,
        addons: Optional[Dict] = None,
        headers: Optional[Dict] = None,
        members: Optional[Dict] = None,
        **kwargs,
    ) -> bool:
        """
        Starts the WebSocket connection for agent API.

        Opens the connection through the parent client, starts the speaker
        (and the microphone when both devices exist), starts the keep-alive
        thread when keep-alive is enabled in the client config, and finally
        sends the agent settings.

        Args:
            options: the agent settings, as a SettingsConfigurationOptions
                instance, a dict, or a JSON string.
            addons: forwarded unchanged to the parent client's ``start``.
            headers: forwarded unchanged to the parent client's ``start``.
            members: attributes merged directly into this instance's ``__dict__``.
            **kwargs: forwarded to the parent client's ``start`` and passed as
                extra keyword arguments to every event handler.

        Returns:
            bool: True on success; False if the parent start fails, the
            settings cannot be sent, or an exception occurs (unless the
            ``termination_exception_connect`` option is True, in which case
            the exception is re-raised).

        Raises:
            DeepgramError: if a SettingsConfigurationOptions fails ``check()``,
                ``options`` has an unsupported type, or keyterms are set for a
                model whose name does not start with "nova-3".
        """
        self._logger.debug("AgentWebSocketClient.start ENTER")
        self._logger.info("settings: %s", options)
        self._logger.info("addons: %s", addons)
        self._logger.info("headers: %s", headers)
        self._logger.info("members: %s", members)
        self._logger.info("kwargs: %s", kwargs)

        if isinstance(options, SettingsConfigurationOptions) and not options.check():
            self._logger.error("settings.check failed")
            self._logger.debug("AgentWebSocketClient.start LEAVE")
            raise DeepgramError("Fatal agent settings error")

        self._addons = addons
        self._headers = headers

        # add "members" as members of the class
        if members is not None:
            self.__dict__.update(members)

        # ``**kwargs`` is always a dict (possibly empty), so no None check is needed
        self._kwargs = kwargs

        if isinstance(options, SettingsConfigurationOptions):
            self._logger.info("options is class")
            self._settings = options
        elif isinstance(options, dict):
            self._logger.info("options is dict")
            self._settings = SettingsConfigurationOptions.from_dict(options)
        elif isinstance(options, str):
            self._logger.info("options is json")
            self._settings = SettingsConfigurationOptions.from_json(options)
        else:
            raise DeepgramError("Invalid options type")

        listen = self._settings.agent.listen
        if (
            listen.keyterms is not None
            and listen.model is not None
            and not listen.model.startswith("nova-3")
        ):
            raise DeepgramError("Keyterms are only supported for nova-3 models")

        try:
            # speaker substitutes the listening thread
            if self._speaker is not None:
                self._logger.notice("passing speaker to delegate_listening")
                super().delegate_listening(self._speaker)

            # call parent start; settings are sent separately below
            if (
                super().start(
                    {},
                    self._addons,
                    self._headers,
                    **dict(cast(Dict[Any, Any], self._kwargs)),
                )
                is False
            ):
                self._logger.error("AgentWebSocketClient.start failed")
                self._logger.debug("AgentWebSocketClient.start LEAVE")
                return False

            if self._speaker is not None:
                self._logger.notice("speaker is delegate_listening. Starting speaker")
                self._speaker.start()

            # the microphone is only started when a speaker is also present
            if self._speaker is not None and self._microphone is not None:
                self._logger.notice(
                    "speaker is delegate_listening. Starting microphone"
                )
                self._microphone.set_callback(self.send)
                self._microphone.start()

            self._log_thread_state("after")

            # keepalive thread
            if self._config.is_keep_alive_enabled():
                self._logger.notice("keepalive is enabled")
                self._keep_alive_thread = threading.Thread(target=self._keep_alive)
                self._keep_alive_thread.start()
            else:
                self._logger.notice("keepalive is disabled")

            self._log_thread_state("after")

            # send the configurationsetting message
            self._logger.notice("Sending ConfigurationSettings...")
            ret_send_cs = self.send(str(self._settings))
            if not ret_send_cs:
                self._logger.error("ConfigurationSettings failed")

                err_error: ErrorResponse = ErrorResponse(
                    "Exception in AgentWebSocketClient.start",
                    "ConfigurationSettings failed to send",
                    "Exception",
                )
                self._emit(
                    AgentWebSocketEvents(AgentWebSocketEvents.Error),
                    error=err_error,
                    **dict(cast(Dict[Any, Any], self._kwargs)),
                )

                self._logger.debug("AgentWebSocketClient.start LEAVE")
                return False

            self._logger.notice("start succeeded")
            self._logger.debug("AgentWebSocketClient.start LEAVE")
            return True

        except Exception as e:  # pylint: disable=broad-except
            self._logger.error(
                "WebSocketException in AgentWebSocketClient.start: %s", e
            )
            self._logger.debug("AgentWebSocketClient.start LEAVE")
            if self._config.options.get("termination_exception_connect") is True:
                raise
            return False

    # pylint: enable=too-many-statements,too-many-branches

    def on(self, event: AgentWebSocketEvents, handler: Callable) -> None:
        """
        Registers event handlers for specific events.

        The handler is appended only when ``event`` is an AgentWebSocketEvents
        member and ``handler`` is callable; otherwise the call is ignored.
        """
        self._logger.info("event subscribed: %s", event)
        if event in AgentWebSocketEvents.__members__.values() and callable(handler):
            self._event_handlers[event].append(handler)

    def _emit(self, event: AgentWebSocketEvents, *args, **kwargs) -> None:
        """
        Emits events to the registered event handlers.

        Each handler registered for ``event`` is called synchronously, in
        registration order, as ``handler(self, *args, **kwargs)``. Exceptions
        raised by a handler propagate to the caller.
        """
        self._logger.debug("AgentWebSocketClient._emit ENTER")
        self._logger.debug("callback handlers for: %s", event)

        self._log_thread_state("after")

        for handler in self._event_handlers[event]:
            handler(self, *args, **kwargs)

        self._log_thread_state("after")

        self._logger.debug("AgentWebSocketClient._emit LEAVE")

    def _process_text(self, message: str) -> None:
        """
        Processes messages received over the WebSocket connection.

        The JSON ``type`` field is matched against ``_TEXT_RESPONSE_TYPES``.
        On a match, the message is parsed with that entry's response class and
        emitted as the corresponding event. Unknown types are emitted as
        ``Unhandled`` with the raw message. Any exception is emitted as an
        ``Error`` event and signals the connection to exit. The exception is
        re-raised only when the ``termination_exception`` option is True.

        Args:
            message (str): the raw text frame received from the server.
        """
        self._logger.debug("AgentWebSocketClient._process_text ENTER")

        try:
            self._logger.debug("Text data received")
            if not message:
                self._logger.debug("message is empty")
                self._logger.debug("AgentWebSocketClient._process_text LEAVE")
                return

            data = json.loads(message)
            response_type = data.get("type")
            self._logger.debug("response_type: %s, data: %s", response_type, data)

            # matched with ``==`` against each event, in table order
            spec = next(
                (
                    entry
                    for entry in self._TEXT_RESPONSE_TYPES
                    if response_type == entry[0]
                ),
                None,
            )

            if spec is None:
                self._logger.warning(
                    "Unknown Message: response_type: %s, data: %s",
                    response_type,
                    data,
                )
                unhandled_error: UnhandledResponse = UnhandledResponse(
                    type=AgentWebSocketEvents(AgentWebSocketEvents.Unhandled),
                    raw=message,
                )
                self._emit(
                    AgentWebSocketEvents(AgentWebSocketEvents.Unhandled),
                    unhandled=unhandled_error,
                    **dict(cast(Dict[Any, Any], self._kwargs)),
                )
            else:
                event, response_cls, payload_name, label = spec
                result = response_cls.from_json(message)
                self._logger.verbose("%s: %s", label, result)
                self._emit(
                    AgentWebSocketEvents(event),
                    **{payload_name: result},
                    **dict(cast(Dict[Any, Any], self._kwargs)),
                )

            self._logger.notice("_process_text Succeeded")
            self._logger.debug("AgentWebSocketClient._process_text LEAVE")

        except Exception as e:  # pylint: disable=broad-except
            self._logger.error("Exception in AgentWebSocketClient._process_text: %s", e)
            e_error: ErrorResponse = ErrorResponse(
                "Exception in AgentWebSocketClient._process_text",
                f"{e}",
                "Exception",
            )
            self._emit(
                AgentWebSocketEvents(AgentWebSocketEvents.Error),
                error=e_error,
                **dict(cast(Dict[Any, Any], self._kwargs)),
            )

            # signal exit and close
            super()._signal_exit()

            self._logger.debug("AgentWebSocketClient._process_text LEAVE")

            if self._config.options.get("termination_exception") is True:
                raise
            return

    def _process_binary(self, message: bytes) -> None:
        """
        Emits a binary frame received over the WebSocket as an ``AudioData``
        event, passing the payload to handlers as ``data``.

        Args:
            message (bytes): the raw binary payload.
        """
        self._logger.debug("AgentWebSocketClient._process_binary ENTER")
        self._logger.debug("Binary data received")

        self._emit(
            AgentWebSocketEvents(AgentWebSocketEvents.AudioData),
            data=message,
            **dict(cast(Dict[Any, Any], self._kwargs)),
        )

        self._logger.notice("_process_binary Succeeded")
        self._logger.debug("AgentWebSocketClient._process_binary LEAVE")

    # pylint: disable=too-many-return-statements
    def _keep_alive(self) -> None:
        """
        Sends keepalive messages to the WebSocket connection.

        Runs on the keep-alive thread. Each iteration waits up to ONE_SECOND on
        the exit event and returns once it is set. Every DEEPGRAM_INTERVAL
        iterations it calls ``keep_alive()``. An exception is emitted as an
        ``Error`` event and signals exit. It is re-raised only when the
        ``termination_exception`` option is True.
        """
        self._logger.debug("AgentWebSocketClient._keep_alive ENTER")

        counter = 0
        while True:
            try:
                counter += 1
                self._exit_event.wait(timeout=ONE_SECOND)

                if self._exit_event.is_set():
                    self._logger.notice("_keep_alive exiting gracefully")
                    self._logger.debug("AgentWebSocketClient._keep_alive LEAVE")
                    return

                # deepgram keepalive
                if counter % DEEPGRAM_INTERVAL == 0:
                    self.keep_alive()

            except Exception as e:  # pylint: disable=broad-except
                self._logger.error(
                    "Exception in AgentWebSocketClient._keep_alive: %s", e
                )
                e_error: ErrorResponse = ErrorResponse(
                    "Exception in AgentWebSocketClient._keep_alive",
                    f"{e}",
                    "Exception",
                )
                self._emit(
                    AgentWebSocketEvents(AgentWebSocketEvents.Error),
                    error=e_error,
                    **dict(cast(Dict[Any, Any], self._kwargs)),
                )

                # signal exit and close
                super()._signal_exit()

                self._logger.debug("AgentWebSocketClient._keep_alive LEAVE")

                if self._config.options.get("termination_exception") is True:
                    raise
                return

    def keep_alive(self) -> bool:
        """
        Sends a KeepAlive message.

        Returns:
            bool: the truthiness of ``send``'s result (False when sending failed).
        """
        self._logger.spam("AgentWebSocketClient.keep_alive ENTER")

        self._logger.notice("Sending KeepAlive...")
        ret = self.send(json.dumps({"type": "KeepAlive"}))

        if not ret:
            self._logger.error("keep_alive failed")
            self._logger.spam("AgentWebSocketClient.keep_alive LEAVE")
            return False

        self._logger.notice("keep_alive succeeded")
        self._logger.spam("AgentWebSocketClient.keep_alive LEAVE")

        return True

    def _close_message(self) -> bool:
        """Returns True without sending anything (no close message is defined here)."""
        # TODO: No known API close message # pylint: disable=fixme
        # return self.send(json.dumps({"type": "Close"}))
        return True

    # closes the WebSocket connection gracefully
    def finish(self) -> bool:
        """
        Closes the WebSocket connection gracefully.

        Calls the parent ``finish`` (a failure there is only logged), finishes
        the microphone and speaker this client created, then joins the
        keep-alive and listening threads.

        Returns:
            bool: always True.
        """
        self._logger.spam("AgentWebSocketClient.finish ENTER")

        # call parent finish
        if super().finish() is False:
            self._logger.error("AgentWebSocketClient.finish failed")

        if self._microphone is not None and self._microphone_created:
            self._microphone.finish()
            self._microphone_created = False

        if self._speaker is not None and self._speaker_created:
            self._speaker.finish()
            self._speaker_created = False

        self._log_thread_state("before")

        # stop the threads
        self._logger.verbose("cancelling tasks...")
        if self._keep_alive_thread is not None:
            self._keep_alive_thread.join()
            self._keep_alive_thread = None
            self._logger.notice("processing _keep_alive_thread thread joined")

        if self._listen_thread is not None:
            self._listen_thread.join()
            self._listen_thread = None
            self._logger.notice("listening thread joined")

        self._speaker = None
        self._microphone = None

        self._log_thread_state("before")

        self._logger.notice("finish succeeded")
        self._logger.spam("AgentWebSocketClient.finish LEAVE")
        return True
ONE_SECOND = 1
HALF_SECOND = 0.5
DEEPGRAM_INTERVAL = 5
class
AgentWebSocketClient(deepgram.clients.common.v1.abstract_sync_websocket.AbstractSyncWebSocketClient):
60class AgentWebSocketClient( 61 AbstractSyncWebSocketClient 62): # pylint: disable=too-many-instance-attributes 63 """ 64 Client for interacting with Deepgram's live transcription services over WebSockets. 65 66 This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events. 67 68 Args: 69 config (DeepgramClientOptions): all the options for the client. 70 """ 71 72 _logger: verboselogs.VerboseLogger 73 _config: DeepgramClientOptions 74 _endpoint: str 75 76 _event_handlers: Dict[AgentWebSocketEvents, list] 77 78 _keep_alive_thread: Union[threading.Thread, None] 79 80 _kwargs: Optional[Dict] = None 81 _addons: Optional[Dict] = None 82 # note the distinction here. We can't use _config because it's already used in the parent 83 _settings: Optional[SettingsConfigurationOptions] = None 84 _headers: Optional[Dict] = None 85 86 _speaker_created: bool = False 87 _speaker: Optional[Speaker] = None 88 _microphone_created: bool = False 89 _microphone: Optional[Microphone] = None 90 91 def __init__(self, config: DeepgramClientOptions): 92 if config is None: 93 raise DeepgramError("Config is required") 94 95 self._logger = verboselogs.VerboseLogger(__name__) 96 self._logger.addHandler(logging.StreamHandler()) 97 self._logger.setLevel(config.verbose) 98 99 self._config = config 100 101 # needs to be "wss://agent.deepgram.com/agent" 102 self._endpoint = "agent" 103 104 # override the endpoint since it needs to be "wss://agent.deepgram.com/agent" 105 self._config.url = "agent.deepgram.com" 106 107 self._keep_alive_thread = None 108 109 # init handlers 110 self._event_handlers = { 111 event: [] for event in AgentWebSocketEvents.__members__.values() 112 } 113 114 if self._config.options.get("microphone_record") == "true": 115 self._logger.info("microphone_record is enabled") 116 rate = self._config.options.get("microphone_record_rate", MICROPHONE_RATE) 117 channels = self._config.options.get( 118 
"microphone_record_channels", MICROPHONE_CHANNELS 119 ) 120 device_index = self._config.options.get("microphone_record_device_index") 121 122 self._logger.debug("rate: %s", rate) 123 self._logger.debug("channels: %s", channels) 124 125 self._microphone_created = True 126 127 if device_index is not None: 128 self._logger.debug("device_index: %s", device_index) 129 self._microphone = Microphone( 130 rate=rate, 131 channels=channels, 132 verbose=self._config.verbose, 133 input_device_index=device_index, 134 ) 135 else: 136 self._microphone = Microphone( 137 rate=rate, 138 channels=channels, 139 verbose=self._config.verbose, 140 ) 141 142 if self._config.options.get("speaker_playback") == "true": 143 self._logger.info("speaker_playback is enabled") 144 rate = self._config.options.get("speaker_playback_rate", SPEAKER_RATE) 145 channels = self._config.options.get( 146 "speaker_playback_channels", SPEAKER_CHANNELS 147 ) 148 playback_delta_in_ms = self._config.options.get( 149 "speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA 150 ) 151 device_index = self._config.options.get("speaker_playback_device_index") 152 153 self._logger.debug("rate: %s", rate) 154 self._logger.debug("channels: %s", channels) 155 156 self._speaker_created = True 157 158 if device_index is not None: 159 self._logger.debug("device_index: %s", device_index) 160 161 self._speaker = Speaker( 162 rate=rate, 163 channels=channels, 164 last_play_delta_in_ms=playback_delta_in_ms, 165 verbose=self._config.verbose, 166 output_device_index=device_index, 167 microphone=self._microphone, 168 ) 169 else: 170 self._speaker = Speaker( 171 rate=rate, 172 channels=channels, 173 last_play_delta_in_ms=playback_delta_in_ms, 174 verbose=self._config.verbose, 175 microphone=self._microphone, 176 ) 177 178 # call the parent constructor 179 super().__init__(self._config, self._endpoint) 180 181 # pylint: disable=too-many-statements,too-many-branches 182 def start( 183 self, 184 options: 
Optional[SettingsConfigurationOptions] = None, 185 addons: Optional[Dict] = None, 186 headers: Optional[Dict] = None, 187 members: Optional[Dict] = None, 188 **kwargs, 189 ) -> bool: 190 """ 191 Starts the WebSocket connection for agent API. 192 """ 193 self._logger.debug("AgentWebSocketClient.start ENTER") 194 self._logger.info("settings: %s", options) 195 self._logger.info("addons: %s", addons) 196 self._logger.info("headers: %s", headers) 197 self._logger.info("members: %s", members) 198 self._logger.info("kwargs: %s", kwargs) 199 200 if isinstance(options, SettingsConfigurationOptions) and not options.check(): 201 self._logger.error("settings.check failed") 202 self._logger.debug("AgentWebSocketClient.start LEAVE") 203 raise DeepgramError("Fatal agent settings error") 204 205 self._addons = addons 206 self._headers = headers 207 208 # add "members" as members of the class 209 if members is not None: 210 self.__dict__.update(members) 211 212 # set kwargs as members of the class 213 if kwargs is not None: 214 self._kwargs = kwargs 215 else: 216 self._kwargs = {} 217 218 if isinstance(options, SettingsConfigurationOptions): 219 self._logger.info("options is class") 220 self._settings = options 221 elif isinstance(options, dict): 222 self._logger.info("options is dict") 223 self._settings = SettingsConfigurationOptions.from_dict(options) 224 elif isinstance(options, str): 225 self._logger.info("options is json") 226 self._settings = SettingsConfigurationOptions.from_json(options) 227 else: 228 raise DeepgramError("Invalid options type") 229 230 if self._settings.agent.listen.keyterms is not None and self._settings.agent.listen.model is not None and not self._settings.agent.listen.model.startswith("nova-3"): 231 raise DeepgramError("Keyterms are only supported for nova-3 models") 232 233 try: 234 # speaker substitutes the listening thread 235 if self._speaker is not None: 236 self._logger.notice("passing speaker to delegate_listening") 237 
super().delegate_listening(self._speaker) 238 239 # call parent start 240 if ( 241 super().start( 242 {}, 243 self._addons, 244 self._headers, 245 **dict(cast(Dict[Any, Any], self._kwargs)), 246 ) 247 is False 248 ): 249 self._logger.error("AgentWebSocketClient.start failed") 250 self._logger.debug("AgentWebSocketClient.start LEAVE") 251 return False 252 253 if self._speaker is not None: 254 self._logger.notice("speaker is delegate_listening. Starting speaker") 255 self._speaker.start() 256 257 if self._speaker is not None and self._microphone is not None: 258 self._logger.notice( 259 "speaker is delegate_listening. Starting microphone" 260 ) 261 self._microphone.set_callback(self.send) 262 self._microphone.start() 263 264 # debug the threads 265 for thread in threading.enumerate(): 266 self._logger.debug("after running thread: %s", thread.name) 267 self._logger.debug("number of active threads: %s", threading.active_count()) 268 269 # keepalive thread 270 if self._config.is_keep_alive_enabled(): 271 self._logger.notice("keepalive is enabled") 272 self._keep_alive_thread = threading.Thread(target=self._keep_alive) 273 self._keep_alive_thread.start() 274 else: 275 self._logger.notice("keepalive is disabled") 276 277 # debug the threads 278 for thread in threading.enumerate(): 279 self._logger.debug("after running thread: %s", thread.name) 280 self._logger.debug("number of active threads: %s", threading.active_count()) 281 282 # send the configurationsetting message 283 self._logger.notice("Sending ConfigurationSettings...") 284 ret_send_cs = self.send(str(self._settings)) 285 if not ret_send_cs: 286 self._logger.error("ConfigurationSettings failed") 287 288 err_error: ErrorResponse = ErrorResponse( 289 "Exception in AgentWebSocketClient.start", 290 "ConfigurationSettings failed to send", 291 "Exception", 292 ) 293 self._emit( 294 AgentWebSocketEvents(AgentWebSocketEvents.Error), 295 error=err_error, 296 **dict(cast(Dict[Any, Any], self._kwargs)), 297 ) 298 299 
self._logger.debug("AgentWebSocketClient.start LEAVE") 300 return False 301 302 self._logger.notice("start succeeded") 303 self._logger.debug("AgentWebSocketClient.start LEAVE") 304 return True 305 306 except Exception as e: # pylint: disable=broad-except 307 self._logger.error( 308 "WebSocketException in AgentWebSocketClient.start: %s", e 309 ) 310 self._logger.debug("AgentWebSocketClient.start LEAVE") 311 if self._config.options.get("termination_exception_connect") is True: 312 raise e 313 return False 314 315 # pylint: enable=too-many-statements,too-many-branches 316 317 def on(self, event: AgentWebSocketEvents, handler: Callable) -> None: 318 """ 319 Registers event handlers for specific events. 320 """ 321 self._logger.info("event subscribed: %s", event) 322 if event in AgentWebSocketEvents.__members__.values() and callable(handler): 323 self._event_handlers[event].append(handler) 324 325 def _emit(self, event: AgentWebSocketEvents, *args, **kwargs) -> None: 326 """ 327 Emits events to the registered event handlers. 
328 """ 329 self._logger.debug("AgentWebSocketClient._emit ENTER") 330 self._logger.debug("callback handlers for: %s", event) 331 332 # debug the threads 333 for thread in threading.enumerate(): 334 self._logger.debug("after running thread: %s", thread.name) 335 self._logger.debug("number of active threads: %s", threading.active_count()) 336 337 self._logger.debug("callback handlers for: %s", event) 338 for handler in self._event_handlers[event]: 339 handler(self, *args, **kwargs) 340 341 # debug the threads 342 for thread in threading.enumerate(): 343 self._logger.debug("after running thread: %s", thread.name) 344 self._logger.debug("number of active threads: %s", threading.active_count()) 345 346 self._logger.debug("AgentWebSocketClient._emit LEAVE") 347 348 # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches 349 def _process_text(self, message: str) -> None: 350 """ 351 Processes messages received over the WebSocket connection. 352 """ 353 self._logger.debug("AgentWebSocketClient._process_text ENTER") 354 355 try: 356 self._logger.debug("Text data received") 357 if len(message) == 0: 358 self._logger.debug("message is empty") 359 self._logger.debug("AgentWebSocketClient._process_text LEAVE") 360 return 361 362 data = json.loads(message) 363 response_type = data.get("type") 364 self._logger.debug("response_type: %s, data: %s", response_type, data) 365 366 match response_type: 367 case AgentWebSocketEvents.Open: 368 open_result: OpenResponse = OpenResponse.from_json(message) 369 self._logger.verbose("OpenResponse: %s", open_result) 370 self._emit( 371 AgentWebSocketEvents(AgentWebSocketEvents.Open), 372 open=open_result, 373 **dict(cast(Dict[Any, Any], self._kwargs)), 374 ) 375 case AgentWebSocketEvents.Welcome: 376 welcome_result: WelcomeResponse = WelcomeResponse.from_json(message) 377 self._logger.verbose("WelcomeResponse: %s", welcome_result) 378 self._emit( 379 AgentWebSocketEvents(AgentWebSocketEvents.Welcome), 
380 welcome=welcome_result, 381 **dict(cast(Dict[Any, Any], self._kwargs)), 382 ) 383 case AgentWebSocketEvents.SettingsApplied: 384 settings_applied_result: SettingsAppliedResponse = ( 385 SettingsAppliedResponse.from_json(message) 386 ) 387 self._logger.verbose( 388 "SettingsAppliedResponse: %s", settings_applied_result 389 ) 390 self._emit( 391 AgentWebSocketEvents(AgentWebSocketEvents.SettingsApplied), 392 settings_applied=settings_applied_result, 393 **dict(cast(Dict[Any, Any], self._kwargs)), 394 ) 395 case AgentWebSocketEvents.ConversationText: 396 conversation_text_result: ConversationTextResponse = ( 397 ConversationTextResponse.from_json(message) 398 ) 399 self._logger.verbose( 400 "ConversationTextResponse: %s", conversation_text_result 401 ) 402 self._emit( 403 AgentWebSocketEvents(AgentWebSocketEvents.ConversationText), 404 conversation_text=conversation_text_result, 405 **dict(cast(Dict[Any, Any], self._kwargs)), 406 ) 407 case AgentWebSocketEvents.UserStartedSpeaking: 408 user_started_speaking_result: UserStartedSpeakingResponse = ( 409 UserStartedSpeakingResponse.from_json(message) 410 ) 411 self._logger.verbose( 412 "UserStartedSpeakingResponse: %s", user_started_speaking_result 413 ) 414 self._emit( 415 AgentWebSocketEvents(AgentWebSocketEvents.UserStartedSpeaking), 416 user_started_speaking=user_started_speaking_result, 417 **dict(cast(Dict[Any, Any], self._kwargs)), 418 ) 419 case AgentWebSocketEvents.AgentThinking: 420 agent_thinking_result: AgentThinkingResponse = ( 421 AgentThinkingResponse.from_json(message) 422 ) 423 self._logger.verbose( 424 "AgentThinkingResponse: %s", agent_thinking_result 425 ) 426 self._emit( 427 AgentWebSocketEvents(AgentWebSocketEvents.AgentThinking), 428 agent_thinking=agent_thinking_result, 429 **dict(cast(Dict[Any, Any], self._kwargs)), 430 ) 431 case AgentWebSocketEvents.FunctionCalling: 432 function_calling_result: FunctionCalling = ( 433 FunctionCalling.from_json(message) 434 ) 435 
self._logger.verbose("FunctionCalling: %s", function_calling_result) 436 self._emit( 437 AgentWebSocketEvents(AgentWebSocketEvents.FunctionCalling), 438 function_calling=function_calling_result, 439 **dict(cast(Dict[Any, Any], self._kwargs)), 440 ) 441 case AgentWebSocketEvents.FunctionCallRequest: 442 function_call_request_result: FunctionCallRequest = ( 443 FunctionCallRequest.from_json(message) 444 ) 445 self._logger.verbose( 446 "FunctionCallRequest: %s", function_call_request_result 447 ) 448 self._emit( 449 AgentWebSocketEvents(AgentWebSocketEvents.FunctionCallRequest), 450 function_call_request=function_call_request_result, 451 **dict(cast(Dict[Any, Any], self._kwargs)), 452 ) 453 case AgentWebSocketEvents.AgentStartedSpeaking: 454 agent_started_speaking_result: AgentStartedSpeakingResponse = ( 455 AgentStartedSpeakingResponse.from_json(message) 456 ) 457 self._logger.verbose( 458 "AgentStartedSpeakingResponse: %s", 459 agent_started_speaking_result, 460 ) 461 self._emit( 462 AgentWebSocketEvents(AgentWebSocketEvents.AgentStartedSpeaking), 463 agent_started_speaking=agent_started_speaking_result, 464 **dict(cast(Dict[Any, Any], self._kwargs)), 465 ) 466 case AgentWebSocketEvents.AgentAudioDone: 467 agent_audio_done_result: AgentAudioDoneResponse = ( 468 AgentAudioDoneResponse.from_json(message) 469 ) 470 self._logger.verbose( 471 "AgentAudioDoneResponse: %s", agent_audio_done_result 472 ) 473 self._emit( 474 AgentWebSocketEvents(AgentWebSocketEvents.AgentAudioDone), 475 agent_audio_done=agent_audio_done_result, 476 **dict(cast(Dict[Any, Any], self._kwargs)), 477 ) 478 case AgentWebSocketEvents.InjectionRefused: 479 injection_refused_result: InjectionRefusedResponse = ( 480 InjectionRefusedResponse.from_json(message) 481 ) 482 self._logger.verbose( 483 "InjectionRefused: %s", injection_refused_result 484 ) 485 self._emit( 486 AgentWebSocketEvents(AgentWebSocketEvents.InjectionRefused), 487 injection_refused=injection_refused_result, 488 **dict(cast(Dict[Any, 
Any], self._kwargs)), 489 ) 490 case AgentWebSocketEvents.Close: 491 close_result: CloseResponse = CloseResponse.from_json(message) 492 self._logger.verbose("CloseResponse: %s", close_result) 493 self._emit( 494 AgentWebSocketEvents(AgentWebSocketEvents.Close), 495 close=close_result, 496 **dict(cast(Dict[Any, Any], self._kwargs)), 497 ) 498 case AgentWebSocketEvents.Error: 499 err_error: ErrorResponse = ErrorResponse.from_json(message) 500 self._logger.verbose("ErrorResponse: %s", err_error) 501 self._emit( 502 AgentWebSocketEvents(AgentWebSocketEvents.Error), 503 error=err_error, 504 **dict(cast(Dict[Any, Any], self._kwargs)), 505 ) 506 case _: 507 self._logger.warning( 508 "Unknown Message: response_type: %s, data: %s", 509 response_type, 510 data, 511 ) 512 unhandled_error: UnhandledResponse = UnhandledResponse( 513 type=AgentWebSocketEvents(AgentWebSocketEvents.Unhandled), 514 raw=message, 515 ) 516 self._emit( 517 AgentWebSocketEvents(AgentWebSocketEvents.Unhandled), 518 unhandled=unhandled_error, 519 **dict(cast(Dict[Any, Any], self._kwargs)), 520 ) 521 522 self._logger.notice("_process_text Succeeded") 523 self._logger.debug("SpeakStreamClient._process_text LEAVE") 524 525 except Exception as e: # pylint: disable=broad-except 526 self._logger.error("Exception in AgentWebSocketClient._process_text: %s", e) 527 e_error: ErrorResponse = ErrorResponse( 528 "Exception in AgentWebSocketClient._process_text", 529 f"{e}", 530 "Exception", 531 ) 532 self._logger.error( 533 "Exception in AgentWebSocketClient._process_text: %s", str(e) 534 ) 535 self._emit( 536 AgentWebSocketEvents(AgentWebSocketEvents.Error), 537 error=e_error, 538 **dict(cast(Dict[Any, Any], self._kwargs)), 539 ) 540 541 # signal exit and close 542 super()._signal_exit() 543 544 self._logger.debug("AgentWebSocketClient._process_text LEAVE") 545 546 if self._config.options.get("termination_exception") is True: 547 raise 548 return 549 550 # pylint: 
enable=too-many-return-statements,too-many-statements 551 552 def _process_binary(self, message: bytes) -> None: 553 self._logger.debug("AgentWebSocketClient._process_binary ENTER") 554 self._logger.debug("Binary data received") 555 556 self._emit( 557 AgentWebSocketEvents(AgentWebSocketEvents.AudioData), 558 data=message, 559 **dict(cast(Dict[Any, Any], self._kwargs)), 560 ) 561 562 self._logger.notice("_process_binary Succeeded") 563 self._logger.debug("AgentWebSocketClient._process_binary LEAVE") 564 565 # pylint: disable=too-many-return-statements 566 def _keep_alive(self) -> None: 567 """ 568 Sends keepalive messages to the WebSocket connection. 569 """ 570 self._logger.debug("AgentWebSocketClient._keep_alive ENTER") 571 572 counter = 0 573 while True: 574 try: 575 counter += 1 576 self._exit_event.wait(timeout=ONE_SECOND) 577 578 if self._exit_event.is_set(): 579 self._logger.notice("_keep_alive exiting gracefully") 580 self._logger.debug("AgentWebSocketClient._keep_alive LEAVE") 581 return 582 583 # deepgram keepalive 584 if counter % DEEPGRAM_INTERVAL == 0: 585 self.keep_alive() 586 587 except Exception as e: # pylint: disable=broad-except 588 self._logger.error( 589 "Exception in AgentWebSocketClient._keep_alive: %s", e 590 ) 591 e_error: ErrorResponse = ErrorResponse( 592 "Exception in AgentWebSocketClient._keep_alive", 593 f"{e}", 594 "Exception", 595 ) 596 self._logger.error( 597 "Exception in AgentWebSocketClient._keep_alive: %s", str(e) 598 ) 599 self._emit( 600 AgentWebSocketEvents(AgentWebSocketEvents.Error), 601 error=e_error, 602 **dict(cast(Dict[Any, Any], self._kwargs)), 603 ) 604 605 # signal exit and close 606 super()._signal_exit() 607 608 self._logger.debug("AgentWebSocketClient._keep_alive LEAVE") 609 610 if self._config.options.get("termination_exception") is True: 611 raise 612 return 613 614 def keep_alive(self) -> bool: 615 """ 616 Sends a KeepAlive message 617 """ 618 self._logger.spam("AgentWebSocketClient.keep_alive ENTER") 619 620 
self._logger.notice("Sending KeepAlive...") 621 ret = self.send(json.dumps({"type": "KeepAlive"})) 622 623 if not ret: 624 self._logger.error("keep_alive failed") 625 self._logger.spam("AgentWebSocketClient.keep_alive LEAVE") 626 return False 627 628 self._logger.notice("keep_alive succeeded") 629 self._logger.spam("AgentWebSocketClient.keep_alive LEAVE") 630 631 return True 632 633 def _close_message(self) -> bool: 634 # TODO: No known API close message # pylint: disable=fixme 635 # return self.send(json.dumps({"type": "Close"})) 636 return True 637 638 # closes the WebSocket connection gracefully 639 def finish(self) -> bool: 640 """ 641 Closes the WebSocket connection gracefully. 642 """ 643 self._logger.spam("AgentWebSocketClient.finish ENTER") 644 645 # call parent finish 646 if super().finish() is False: 647 self._logger.error("AgentWebSocketClient.finish failed") 648 649 if self._microphone is not None and self._microphone_created: 650 self._microphone.finish() 651 self._microphone_created = False 652 653 if self._speaker is not None and self._speaker_created: 654 self._speaker.finish() 655 self._speaker_created = False 656 657 # debug the threads 658 for thread in threading.enumerate(): 659 self._logger.debug("before running thread: %s", thread.name) 660 self._logger.debug("number of active threads: %s", threading.active_count()) 661 662 # stop the threads 663 self._logger.verbose("cancelling tasks...") 664 if self._keep_alive_thread is not None: 665 self._keep_alive_thread.join() 666 self._keep_alive_thread = None 667 self._logger.notice("processing _keep_alive_thread thread joined") 668 669 if self._listen_thread is not None: 670 self._listen_thread.join() 671 self._listen_thread = None 672 self._logger.notice("listening thread joined") 673 674 self._speaker = None 675 self._microphone = None 676 677 # debug the threads 678 for thread in threading.enumerate(): 679 self._logger.debug("before running thread: %s", thread.name) 680 self._logger.debug("number 
of active threads: %s", threading.active_count()) 681 682 self._logger.notice("finish succeeded") 683 self._logger.spam("AgentWebSocketClient.finish LEAVE") 684 return True
Client for interacting with Deepgram's agent API over WebSockets.
This class provides methods to establish a WebSocket connection to the agent API, send agent settings and audio, and handle real-time agent events.
Args: config (DeepgramClientOptions): all the options for the client.
AgentWebSocketClient(config: deepgram.options.DeepgramClientOptions)
def __init__(self, config: DeepgramClientOptions):
    """
    Creates an agent WebSocket client.

    The connection target is forced to wss://agent.deepgram.com/agent. That URL
    override is applied to a shallow copy of config, so the caller's options
    object (which may be shared with other clients) is not modified.

    Local audio devices are created when config.options enables them:
    "microphone_record" == "true" creates a Microphone, and
    "speaker_playback" == "true" creates a Speaker (handed the microphone, if
    any). Rate, channels, device index and playback delta may be overridden via
    the corresponding "microphone_record_*" / "speaker_playback_*" options;
    otherwise the audio module defaults are used.

    Args:
        config (DeepgramClientOptions): all the options for the client.

    Raises:
        DeepgramError: if config is None.
    """
    import copy  # pylint: disable=import-outside-toplevel

    if config is None:
        raise DeepgramError("Config is required")

    self._logger = verboselogs.VerboseLogger(__name__)
    self._logger.addHandler(logging.StreamHandler())
    self._logger.setLevel(config.verbose)

    # Shallow copy: the URL override below must not leak into the caller's
    # options object.
    self._config = copy.copy(config)

    # needs to be "wss://agent.deepgram.com/agent"
    self._endpoint = "agent"

    # override the endpoint since it needs to be "wss://agent.deepgram.com/agent"
    self._config.url = "agent.deepgram.com"

    self._keep_alive_thread = None

    # init handlers
    self._event_handlers = {
        event: [] for event in AgentWebSocketEvents.__members__.values()
    }

    options = self._config.options

    if options.get("microphone_record") == "true":
        self._logger.info("microphone_record is enabled")
        rate = options.get("microphone_record_rate", MICROPHONE_RATE)
        channels = options.get("microphone_record_channels", MICROPHONE_CHANNELS)
        device_index = options.get("microphone_record_device_index")

        self._logger.debug("rate: %s", rate)
        self._logger.debug("channels: %s", channels)

        # only pass input_device_index when configured, so Microphone keeps
        # its own default otherwise
        mic_device: Dict[str, Any] = {}
        if device_index is not None:
            self._logger.debug("device_index: %s", device_index)
            mic_device["input_device_index"] = device_index

        self._microphone_created = True
        self._microphone = Microphone(
            rate=rate,
            channels=channels,
            verbose=self._config.verbose,
            **mic_device,
        )

    if options.get("speaker_playback") == "true":
        self._logger.info("speaker_playback is enabled")
        rate = options.get("speaker_playback_rate", SPEAKER_RATE)
        channels = options.get("speaker_playback_channels", SPEAKER_CHANNELS)
        playback_delta_in_ms = options.get(
            "speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA
        )
        device_index = options.get("speaker_playback_device_index")

        self._logger.debug("rate: %s", rate)
        self._logger.debug("channels: %s", channels)

        # only pass output_device_index when configured, so Speaker keeps its
        # own default otherwise
        speaker_device: Dict[str, Any] = {}
        if device_index is not None:
            self._logger.debug("device_index: %s", device_index)
            speaker_device["output_device_index"] = device_index

        self._speaker_created = True
        self._speaker = Speaker(
            rate=rate,
            channels=channels,
            last_play_delta_in_ms=playback_delta_in_ms,
            verbose=self._config.verbose,
            microphone=self._microphone,
            **speaker_device,
        )

    # call the parent constructor
    super().__init__(self._config, self._endpoint)
def
start( self, options: Optional[deepgram.clients.agent.v1.websocket.options.SettingsConfigurationOptions] = None, addons: Optional[Dict] = None, headers: Optional[Dict] = None, members: Optional[Dict] = None, **kwargs) -> bool:
def start(
    self,
    options: Optional[Union[SettingsConfigurationOptions, Dict, str]] = None,
    addons: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    members: Optional[Dict] = None,
    **kwargs,
) -> bool:
    """
    Starts the WebSocket connection to the agent API and sends the agent settings.

    Args:
        options: agent settings as a SettingsConfigurationOptions instance, a
            dict, or a JSON string. Every form is validated with check().
        addons: passed through to the parent start().
        headers: passed through to the parent start().
        members: attributes set directly on this client instance.
        **kwargs: stored and forwarded to the parent start() and to every event
            handler.

    Returns:
        bool: True on success. False if the parent start() fails, if the
        settings message cannot be sent (an Error event is emitted), or if an
        exception occurs and "termination_exception_connect" is not True.

    Raises:
        DeepgramError: if options is of an unsupported type, fails check(), or
            sets keyterms for a listen model that is not nova-3.
    """
    self._logger.debug("AgentWebSocketClient.start ENTER")
    self._logger.info("settings: %s", options)
    self._logger.info("addons: %s", addons)
    self._logger.info("headers: %s", headers)
    self._logger.info("members: %s", members)
    self._logger.info("kwargs: %s", kwargs)

    # normalize every accepted form of options into one settings object
    settings: SettingsConfigurationOptions
    if isinstance(options, SettingsConfigurationOptions):
        self._logger.info("options is class")
        settings = options
    elif isinstance(options, dict):
        self._logger.info("options is dict")
        settings = SettingsConfigurationOptions.from_dict(options)
    elif isinstance(options, str):
        self._logger.info("options is json")
        settings = SettingsConfigurationOptions.from_json(options)
    else:
        raise DeepgramError("Invalid options type")

    # validate the settings whatever form they arrived in, before touching state
    if not settings.check():
        self._logger.error("settings.check failed")
        self._logger.debug("AgentWebSocketClient.start LEAVE")
        raise DeepgramError("Fatal agent settings error")

    listen = settings.agent.listen
    if (
        listen.keyterms is not None
        and listen.model is not None
        and not listen.model.startswith("nova-3")
    ):
        raise DeepgramError("Keyterms are only supported for nova-3 models")

    self._addons = addons
    self._headers = headers

    # add "members" as members of the class
    if members is not None:
        self.__dict__.update(members)

    # kwargs are stored and forwarded to every event handler
    self._kwargs = kwargs

    # assigned after "members" so the validated settings take precedence
    self._settings = settings

    try:
        # speaker substitutes the listening thread
        if self._speaker is not None:
            self._logger.notice("passing speaker to delegate_listening")
            super().delegate_listening(self._speaker)

        # call parent start
        if (
            super().start(
                {},
                self._addons,
                self._headers,
                **dict(cast(Dict[Any, Any], self._kwargs)),
            )
            is False
        ):
            self._logger.error("AgentWebSocketClient.start failed")
            self._logger.debug("AgentWebSocketClient.start LEAVE")
            return False

        if self._speaker is not None:
            self._logger.notice("speaker is delegate_listening. Starting speaker")
            self._speaker.start()

        if self._speaker is not None and self._microphone is not None:
            self._logger.notice(
                "speaker is delegate_listening. Starting microphone"
            )
            self._microphone.set_callback(self.send)
            self._microphone.start()

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        # keepalive thread
        if self._config.is_keep_alive_enabled():
            self._logger.notice("keepalive is enabled")
            self._keep_alive_thread = threading.Thread(target=self._keep_alive)
            self._keep_alive_thread.start()
        else:
            self._logger.notice("keepalive is disabled")

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        # send the configurationsetting message
        self._logger.notice("Sending ConfigurationSettings...")
        ret_send_cs = self.send(str(self._settings))
        if not ret_send_cs:
            self._logger.error("ConfigurationSettings failed")

            err_error: ErrorResponse = ErrorResponse(
                "Exception in AgentWebSocketClient.start",
                "ConfigurationSettings failed to send",
                "Exception",
            )
            self._emit(
                AgentWebSocketEvents(AgentWebSocketEvents.Error),
                error=err_error,
                **dict(cast(Dict[Any, Any], self._kwargs)),
            )

            self._logger.debug("AgentWebSocketClient.start LEAVE")
            return False

        self._logger.notice("start succeeded")
        self._logger.debug("AgentWebSocketClient.start LEAVE")
        return True

    except Exception as e:  # pylint: disable=broad-except
        self._logger.error(
            "WebSocketException in AgentWebSocketClient.start: %s", e
        )
        self._logger.debug("AgentWebSocketClient.start LEAVE")
        if self._config.options.get("termination_exception_connect") is True:
            raise
        return False
Starts the WebSocket connection to the agent API.
def on(self, event: AgentWebSocketEvents, handler: Callable) -> None:
    """
    Registers a handler for an agent event.

    Handlers are later invoked as handler(client, *args, **kwargs) when the
    event is emitted. A handler is only registered when event is one of the
    AgentWebSocketEvents members and handler is callable; otherwise a warning
    is logged and nothing is registered.

    Args:
        event (AgentWebSocketEvents): the event to subscribe to.
        handler (Callable): the callback to invoke for that event.
    """
    self._logger.info("event subscribed: %s", event)
    if event in AgentWebSocketEvents.__members__.values() and callable(handler):
        self._event_handlers[event].append(handler)
    else:
        # previously ignored silently; surface the mistake to the caller
        self._logger.warning(
            "handler not registered: event %s is unknown or handler is not callable",
            event,
        )
Registers event handlers for specific events.
def
keep_alive(self) -> bool:
def keep_alive(self) -> bool:
    """
    Sends one {"type": "KeepAlive"} message over the connection.

    Returns:
        bool: True when send() reports success, False otherwise.
    """
    self._logger.spam("AgentWebSocketClient.keep_alive ENTER")
    self._logger.notice("Sending KeepAlive...")

    payload = json.dumps({"type": "KeepAlive"})
    delivered = bool(self.send(payload))

    if delivered:
        self._logger.notice("keep_alive succeeded")
    else:
        self._logger.error("keep_alive failed")

    self._logger.spam("AgentWebSocketClient.keep_alive LEAVE")
    return delivered
Sends a KeepAlive message
def
finish(self) -> bool:
def finish(self) -> bool:
    """
    Closes the WebSocket connection gracefully.

    Calls the parent finish(), stops the microphone and speaker this client
    created, joins the keep-alive and listening threads, and drops the device
    references. A failed parent finish() is logged but does not stop cleanup.

    Returns:
        bool: always True.
    """
    self._logger.spam("AgentWebSocketClient.finish ENTER")

    # call parent finish
    if super().finish() is False:
        self._logger.error("AgentWebSocketClient.finish failed")

    if self._microphone is not None and self._microphone_created:
        self._microphone.finish()
        self._microphone_created = False

    if self._speaker is not None and self._speaker_created:
        self._speaker.finish()
        self._speaker_created = False

    # debug the threads
    for thread in threading.enumerate():
        self._logger.debug("before running thread: %s", thread.name)
    self._logger.debug("number of active threads: %s", threading.active_count())

    # stop the threads
    self._logger.verbose("cancelling tasks...")
    if self._keep_alive_thread is not None:
        self._keep_alive_thread.join()
        self._keep_alive_thread = None
        self._logger.notice("processing _keep_alive_thread thread joined")

    if self._listen_thread is not None:
        self._listen_thread.join()
        self._listen_thread = None
    self._logger.notice("listening thread joined")

    self._speaker = None
    self._microphone = None

    # debug the threads (state after the joins above)
    for thread in threading.enumerate():
        self._logger.debug("after running thread: %s", thread.name)
    self._logger.debug("number of active threads: %s", threading.active_count())

    self._logger.notice("finish succeeded")
    self._logger.spam("AgentWebSocketClient.finish LEAVE")
    return True
Closes the WebSocket connection gracefully.