deepgram.clients.agent.client
# Copyright 2024 Deepgram SDK contributors. All Rights Reserved.
# Use of this source code is governed by a MIT license that can be found in the LICENSE file.
# SPDX-License-Identifier: MIT

# websocket
from .v1 import (
    AgentWebSocketClient as LatestAgentWebSocketClient,
    AsyncAgentWebSocketClient as LatestAsyncAgentWebSocketClient,
)

from .v1 import (
    #### common websocket response
    BaseResponse as LatestBaseResponse,
    OpenResponse as LatestOpenResponse,
    CloseResponse as LatestCloseResponse,
    ErrorResponse as LatestErrorResponse,
    UnhandledResponse as LatestUnhandledResponse,
    #### unique
    WelcomeResponse as LatestWelcomeResponse,
    SettingsAppliedResponse as LatestSettingsAppliedResponse,
    ConversationTextResponse as LatestConversationTextResponse,
    UserStartedSpeakingResponse as LatestUserStartedSpeakingResponse,
    AgentThinkingResponse as LatestAgentThinkingResponse,
    FunctionCalling as LatestFunctionCalling,
    FunctionCallRequest as LatestFunctionCallRequest,
    AgentStartedSpeakingResponse as LatestAgentStartedSpeakingResponse,
    AgentAudioDoneResponse as LatestAgentAudioDoneResponse,
    InjectionRefusedResponse as LatestInjectionRefusedResponse,
)

from .v1 import (
    # top level
    SettingsConfigurationOptions as LatestSettingsConfigurationOptions,
    UpdateInstructionsOptions as LatestUpdateInstructionsOptions,
    UpdateSpeakOptions as LatestUpdateSpeakOptions,
    InjectAgentMessageOptions as LatestInjectAgentMessageOptions,
    FunctionCallResponse as LatestFunctionCallResponse,
    AgentKeepAlive as LatestAgentKeepAlive,
    # sub level
    Listen as LatestListen,
    Speak as LatestSpeak,
    Header as LatestHeader,
    Item as LatestItem,
    Properties as LatestProperties,
    Parameters as LatestParameters,
    Function as LatestFunction,
    Provider as LatestProvider,
    Think as LatestThink,
    Agent as LatestAgent,
    Input as LatestInput,
    Output as LatestOutput,
    Audio as LatestAudio,
    Context as LatestContext,
)


# The vX/client.py points to the current supported version in the SDK.
# Older versions are supported in the SDK for backwards compatibility.
# Each public name below is an unversioned alias of the matching "Latest*"
# import, so callers can import from this module without naming a version.

AgentWebSocketClient = LatestAgentWebSocketClient
AsyncAgentWebSocketClient = LatestAsyncAgentWebSocketClient

# common websocket responses
# BaseResponse was imported above but never re-exported; alias it like the rest.
BaseResponse = LatestBaseResponse
OpenResponse = LatestOpenResponse
CloseResponse = LatestCloseResponse
ErrorResponse = LatestErrorResponse
UnhandledResponse = LatestUnhandledResponse

# agent-specific responses
WelcomeResponse = LatestWelcomeResponse
SettingsAppliedResponse = LatestSettingsAppliedResponse
ConversationTextResponse = LatestConversationTextResponse
UserStartedSpeakingResponse = LatestUserStartedSpeakingResponse
AgentThinkingResponse = LatestAgentThinkingResponse
FunctionCalling = LatestFunctionCalling
FunctionCallRequest = LatestFunctionCallRequest
AgentStartedSpeakingResponse = LatestAgentStartedSpeakingResponse
AgentAudioDoneResponse = LatestAgentAudioDoneResponse
InjectionRefusedResponse = LatestInjectionRefusedResponse


# top-level option messages
SettingsConfigurationOptions = LatestSettingsConfigurationOptions
UpdateInstructionsOptions = LatestUpdateInstructionsOptions
UpdateSpeakOptions = LatestUpdateSpeakOptions
InjectAgentMessageOptions = LatestInjectAgentMessageOptions
FunctionCallResponse = LatestFunctionCallResponse
AgentKeepAlive = LatestAgentKeepAlive

# sub-level option types
Listen = LatestListen
Speak = LatestSpeak
Header = LatestHeader
Item = LatestItem
Properties = LatestProperties
Parameters = LatestParameters
Function = LatestFunction
Provider = LatestProvider
Think = LatestThink
Agent = LatestAgent
Input = LatestInput
Output = LatestOutput
Audio = LatestAudio
Context = LatestContext
60class AgentWebSocketClient( 61 AbstractSyncWebSocketClient 62): # pylint: disable=too-many-instance-attributes 63 """ 64 Client for interacting with Deepgram's live transcription services over WebSockets. 65 66 This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events. 67 68 Args: 69 config (DeepgramClientOptions): all the options for the client. 70 """ 71 72 _logger: verboselogs.VerboseLogger 73 _config: DeepgramClientOptions 74 _endpoint: str 75 76 _event_handlers: Dict[AgentWebSocketEvents, list] 77 78 _keep_alive_thread: Union[threading.Thread, None] 79 80 _kwargs: Optional[Dict] = None 81 _addons: Optional[Dict] = None 82 # note the distinction here. We can't use _config because it's already used in the parent 83 _settings: Optional[SettingsConfigurationOptions] = None 84 _headers: Optional[Dict] = None 85 86 _speaker_created: bool = False 87 _speaker: Optional[Speaker] = None 88 _microphone_created: bool = False 89 _microphone: Optional[Microphone] = None 90 91 def __init__(self, config: DeepgramClientOptions): 92 if config is None: 93 raise DeepgramError("Config is required") 94 95 self._logger = verboselogs.VerboseLogger(__name__) 96 self._logger.addHandler(logging.StreamHandler()) 97 self._logger.setLevel(config.verbose) 98 99 self._config = config 100 101 # needs to be "wss://agent.deepgram.com/agent" 102 self._endpoint = "agent" 103 104 # override the endpoint since it needs to be "wss://agent.deepgram.com/agent" 105 self._config.url = "agent.deepgram.com" 106 107 self._keep_alive_thread = None 108 109 # init handlers 110 self._event_handlers = { 111 event: [] for event in AgentWebSocketEvents.__members__.values() 112 } 113 114 if self._config.options.get("microphone_record") == "true": 115 self._logger.info("microphone_record is enabled") 116 rate = self._config.options.get("microphone_record_rate", MICROPHONE_RATE) 117 channels = self._config.options.get( 118 
"microphone_record_channels", MICROPHONE_CHANNELS 119 ) 120 device_index = self._config.options.get("microphone_record_device_index") 121 122 self._logger.debug("rate: %s", rate) 123 self._logger.debug("channels: %s", channels) 124 125 self._microphone_created = True 126 127 if device_index is not None: 128 self._logger.debug("device_index: %s", device_index) 129 self._microphone = Microphone( 130 rate=rate, 131 channels=channels, 132 verbose=self._config.verbose, 133 input_device_index=device_index, 134 ) 135 else: 136 self._microphone = Microphone( 137 rate=rate, 138 channels=channels, 139 verbose=self._config.verbose, 140 ) 141 142 if self._config.options.get("speaker_playback") == "true": 143 self._logger.info("speaker_playback is enabled") 144 rate = self._config.options.get("speaker_playback_rate", SPEAKER_RATE) 145 channels = self._config.options.get( 146 "speaker_playback_channels", SPEAKER_CHANNELS 147 ) 148 playback_delta_in_ms = self._config.options.get( 149 "speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA 150 ) 151 device_index = self._config.options.get("speaker_playback_device_index") 152 153 self._logger.debug("rate: %s", rate) 154 self._logger.debug("channels: %s", channels) 155 156 self._speaker_created = True 157 158 if device_index is not None: 159 self._logger.debug("device_index: %s", device_index) 160 161 self._speaker = Speaker( 162 rate=rate, 163 channels=channels, 164 last_play_delta_in_ms=playback_delta_in_ms, 165 verbose=self._config.verbose, 166 output_device_index=device_index, 167 microphone=self._microphone, 168 ) 169 else: 170 self._speaker = Speaker( 171 rate=rate, 172 channels=channels, 173 last_play_delta_in_ms=playback_delta_in_ms, 174 verbose=self._config.verbose, 175 microphone=self._microphone, 176 ) 177 178 # call the parent constructor 179 super().__init__(self._config, self._endpoint) 180 181 # pylint: disable=too-many-statements,too-many-branches 182 def start( 183 self, 184 options: 
Optional[SettingsConfigurationOptions] = None, 185 addons: Optional[Dict] = None, 186 headers: Optional[Dict] = None, 187 members: Optional[Dict] = None, 188 **kwargs, 189 ) -> bool: 190 """ 191 Starts the WebSocket connection for agent API. 192 """ 193 self._logger.debug("AgentWebSocketClient.start ENTER") 194 self._logger.info("settings: %s", options) 195 self._logger.info("addons: %s", addons) 196 self._logger.info("headers: %s", headers) 197 self._logger.info("members: %s", members) 198 self._logger.info("kwargs: %s", kwargs) 199 200 if isinstance(options, SettingsConfigurationOptions) and not options.check(): 201 self._logger.error("settings.check failed") 202 self._logger.debug("AgentWebSocketClient.start LEAVE") 203 raise DeepgramError("Fatal agent settings error") 204 205 self._addons = addons 206 self._headers = headers 207 208 # add "members" as members of the class 209 if members is not None: 210 self.__dict__.update(members) 211 212 # set kwargs as members of the class 213 if kwargs is not None: 214 self._kwargs = kwargs 215 else: 216 self._kwargs = {} 217 218 if isinstance(options, SettingsConfigurationOptions): 219 self._logger.info("options is class") 220 self._settings = options 221 elif isinstance(options, dict): 222 self._logger.info("options is dict") 223 self._settings = SettingsConfigurationOptions.from_dict(options) 224 elif isinstance(options, str): 225 self._logger.info("options is json") 226 self._settings = SettingsConfigurationOptions.from_json(options) 227 else: 228 raise DeepgramError("Invalid options type") 229 230 if self._settings.agent.listen.keyterms is not None and self._settings.agent.listen.model is not None and not self._settings.agent.listen.model.startswith("nova-3"): 231 raise DeepgramError("Keyterms are only supported for nova-3 models") 232 233 try: 234 # speaker substitutes the listening thread 235 if self._speaker is not None: 236 self._logger.notice("passing speaker to delegate_listening") 237 
super().delegate_listening(self._speaker) 238 239 # call parent start 240 if ( 241 super().start( 242 {}, 243 self._addons, 244 self._headers, 245 **dict(cast(Dict[Any, Any], self._kwargs)), 246 ) 247 is False 248 ): 249 self._logger.error("AgentWebSocketClient.start failed") 250 self._logger.debug("AgentWebSocketClient.start LEAVE") 251 return False 252 253 if self._speaker is not None: 254 self._logger.notice("speaker is delegate_listening. Starting speaker") 255 self._speaker.start() 256 257 if self._speaker is not None and self._microphone is not None: 258 self._logger.notice( 259 "speaker is delegate_listening. Starting microphone" 260 ) 261 self._microphone.set_callback(self.send) 262 self._microphone.start() 263 264 # debug the threads 265 for thread in threading.enumerate(): 266 self._logger.debug("after running thread: %s", thread.name) 267 self._logger.debug("number of active threads: %s", threading.active_count()) 268 269 # keepalive thread 270 if self._config.is_keep_alive_enabled(): 271 self._logger.notice("keepalive is enabled") 272 self._keep_alive_thread = threading.Thread(target=self._keep_alive) 273 self._keep_alive_thread.start() 274 else: 275 self._logger.notice("keepalive is disabled") 276 277 # debug the threads 278 for thread in threading.enumerate(): 279 self._logger.debug("after running thread: %s", thread.name) 280 self._logger.debug("number of active threads: %s", threading.active_count()) 281 282 # send the configurationsetting message 283 self._logger.notice("Sending ConfigurationSettings...") 284 ret_send_cs = self.send(str(self._settings)) 285 if not ret_send_cs: 286 self._logger.error("ConfigurationSettings failed") 287 288 err_error: ErrorResponse = ErrorResponse( 289 "Exception in AgentWebSocketClient.start", 290 "ConfigurationSettings failed to send", 291 "Exception", 292 ) 293 self._emit( 294 AgentWebSocketEvents(AgentWebSocketEvents.Error), 295 error=err_error, 296 **dict(cast(Dict[Any, Any], self._kwargs)), 297 ) 298 299 
self._logger.debug("AgentWebSocketClient.start LEAVE") 300 return False 301 302 self._logger.notice("start succeeded") 303 self._logger.debug("AgentWebSocketClient.start LEAVE") 304 return True 305 306 except Exception as e: # pylint: disable=broad-except 307 self._logger.error( 308 "WebSocketException in AgentWebSocketClient.start: %s", e 309 ) 310 self._logger.debug("AgentWebSocketClient.start LEAVE") 311 if self._config.options.get("termination_exception_connect") is True: 312 raise e 313 return False 314 315 # pylint: enable=too-many-statements,too-many-branches 316 317 def on(self, event: AgentWebSocketEvents, handler: Callable) -> None: 318 """ 319 Registers event handlers for specific events. 320 """ 321 self._logger.info("event subscribed: %s", event) 322 if event in AgentWebSocketEvents.__members__.values() and callable(handler): 323 self._event_handlers[event].append(handler) 324 325 def _emit(self, event: AgentWebSocketEvents, *args, **kwargs) -> None: 326 """ 327 Emits events to the registered event handlers. 
328 """ 329 self._logger.debug("AgentWebSocketClient._emit ENTER") 330 self._logger.debug("callback handlers for: %s", event) 331 332 # debug the threads 333 for thread in threading.enumerate(): 334 self._logger.debug("after running thread: %s", thread.name) 335 self._logger.debug("number of active threads: %s", threading.active_count()) 336 337 self._logger.debug("callback handlers for: %s", event) 338 for handler in self._event_handlers[event]: 339 handler(self, *args, **kwargs) 340 341 # debug the threads 342 for thread in threading.enumerate(): 343 self._logger.debug("after running thread: %s", thread.name) 344 self._logger.debug("number of active threads: %s", threading.active_count()) 345 346 self._logger.debug("AgentWebSocketClient._emit LEAVE") 347 348 # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches 349 def _process_text(self, message: str) -> None: 350 """ 351 Processes messages received over the WebSocket connection. 352 """ 353 self._logger.debug("AgentWebSocketClient._process_text ENTER") 354 355 try: 356 self._logger.debug("Text data received") 357 if len(message) == 0: 358 self._logger.debug("message is empty") 359 self._logger.debug("AgentWebSocketClient._process_text LEAVE") 360 return 361 362 data = json.loads(message) 363 response_type = data.get("type") 364 self._logger.debug("response_type: %s, data: %s", response_type, data) 365 366 match response_type: 367 case AgentWebSocketEvents.Open: 368 open_result: OpenResponse = OpenResponse.from_json(message) 369 self._logger.verbose("OpenResponse: %s", open_result) 370 self._emit( 371 AgentWebSocketEvents(AgentWebSocketEvents.Open), 372 open=open_result, 373 **dict(cast(Dict[Any, Any], self._kwargs)), 374 ) 375 case AgentWebSocketEvents.Welcome: 376 welcome_result: WelcomeResponse = WelcomeResponse.from_json(message) 377 self._logger.verbose("WelcomeResponse: %s", welcome_result) 378 self._emit( 379 AgentWebSocketEvents(AgentWebSocketEvents.Welcome), 
380 welcome=welcome_result, 381 **dict(cast(Dict[Any, Any], self._kwargs)), 382 ) 383 case AgentWebSocketEvents.SettingsApplied: 384 settings_applied_result: SettingsAppliedResponse = ( 385 SettingsAppliedResponse.from_json(message) 386 ) 387 self._logger.verbose( 388 "SettingsAppliedResponse: %s", settings_applied_result 389 ) 390 self._emit( 391 AgentWebSocketEvents(AgentWebSocketEvents.SettingsApplied), 392 settings_applied=settings_applied_result, 393 **dict(cast(Dict[Any, Any], self._kwargs)), 394 ) 395 case AgentWebSocketEvents.ConversationText: 396 conversation_text_result: ConversationTextResponse = ( 397 ConversationTextResponse.from_json(message) 398 ) 399 self._logger.verbose( 400 "ConversationTextResponse: %s", conversation_text_result 401 ) 402 self._emit( 403 AgentWebSocketEvents(AgentWebSocketEvents.ConversationText), 404 conversation_text=conversation_text_result, 405 **dict(cast(Dict[Any, Any], self._kwargs)), 406 ) 407 case AgentWebSocketEvents.UserStartedSpeaking: 408 user_started_speaking_result: UserStartedSpeakingResponse = ( 409 UserStartedSpeakingResponse.from_json(message) 410 ) 411 self._logger.verbose( 412 "UserStartedSpeakingResponse: %s", user_started_speaking_result 413 ) 414 self._emit( 415 AgentWebSocketEvents(AgentWebSocketEvents.UserStartedSpeaking), 416 user_started_speaking=user_started_speaking_result, 417 **dict(cast(Dict[Any, Any], self._kwargs)), 418 ) 419 case AgentWebSocketEvents.AgentThinking: 420 agent_thinking_result: AgentThinkingResponse = ( 421 AgentThinkingResponse.from_json(message) 422 ) 423 self._logger.verbose( 424 "AgentThinkingResponse: %s", agent_thinking_result 425 ) 426 self._emit( 427 AgentWebSocketEvents(AgentWebSocketEvents.AgentThinking), 428 agent_thinking=agent_thinking_result, 429 **dict(cast(Dict[Any, Any], self._kwargs)), 430 ) 431 case AgentWebSocketEvents.FunctionCalling: 432 function_calling_result: FunctionCalling = ( 433 FunctionCalling.from_json(message) 434 ) 435 
self._logger.verbose("FunctionCalling: %s", function_calling_result) 436 self._emit( 437 AgentWebSocketEvents(AgentWebSocketEvents.FunctionCalling), 438 function_calling=function_calling_result, 439 **dict(cast(Dict[Any, Any], self._kwargs)), 440 ) 441 case AgentWebSocketEvents.FunctionCallRequest: 442 function_call_request_result: FunctionCallRequest = ( 443 FunctionCallRequest.from_json(message) 444 ) 445 self._logger.verbose( 446 "FunctionCallRequest: %s", function_call_request_result 447 ) 448 self._emit( 449 AgentWebSocketEvents(AgentWebSocketEvents.FunctionCallRequest), 450 function_call_request=function_call_request_result, 451 **dict(cast(Dict[Any, Any], self._kwargs)), 452 ) 453 case AgentWebSocketEvents.AgentStartedSpeaking: 454 agent_started_speaking_result: AgentStartedSpeakingResponse = ( 455 AgentStartedSpeakingResponse.from_json(message) 456 ) 457 self._logger.verbose( 458 "AgentStartedSpeakingResponse: %s", 459 agent_started_speaking_result, 460 ) 461 self._emit( 462 AgentWebSocketEvents(AgentWebSocketEvents.AgentStartedSpeaking), 463 agent_started_speaking=agent_started_speaking_result, 464 **dict(cast(Dict[Any, Any], self._kwargs)), 465 ) 466 case AgentWebSocketEvents.AgentAudioDone: 467 agent_audio_done_result: AgentAudioDoneResponse = ( 468 AgentAudioDoneResponse.from_json(message) 469 ) 470 self._logger.verbose( 471 "AgentAudioDoneResponse: %s", agent_audio_done_result 472 ) 473 self._emit( 474 AgentWebSocketEvents(AgentWebSocketEvents.AgentAudioDone), 475 agent_audio_done=agent_audio_done_result, 476 **dict(cast(Dict[Any, Any], self._kwargs)), 477 ) 478 case AgentWebSocketEvents.InjectionRefused: 479 injection_refused_result: InjectionRefusedResponse = ( 480 InjectionRefusedResponse.from_json(message) 481 ) 482 self._logger.verbose( 483 "InjectionRefused: %s", injection_refused_result 484 ) 485 self._emit( 486 AgentWebSocketEvents(AgentWebSocketEvents.InjectionRefused), 487 injection_refused=injection_refused_result, 488 **dict(cast(Dict[Any, 
Any], self._kwargs)), 489 ) 490 case AgentWebSocketEvents.Close: 491 close_result: CloseResponse = CloseResponse.from_json(message) 492 self._logger.verbose("CloseResponse: %s", close_result) 493 self._emit( 494 AgentWebSocketEvents(AgentWebSocketEvents.Close), 495 close=close_result, 496 **dict(cast(Dict[Any, Any], self._kwargs)), 497 ) 498 case AgentWebSocketEvents.Error: 499 err_error: ErrorResponse = ErrorResponse.from_json(message) 500 self._logger.verbose("ErrorResponse: %s", err_error) 501 self._emit( 502 AgentWebSocketEvents(AgentWebSocketEvents.Error), 503 error=err_error, 504 **dict(cast(Dict[Any, Any], self._kwargs)), 505 ) 506 case _: 507 self._logger.warning( 508 "Unknown Message: response_type: %s, data: %s", 509 response_type, 510 data, 511 ) 512 unhandled_error: UnhandledResponse = UnhandledResponse( 513 type=AgentWebSocketEvents(AgentWebSocketEvents.Unhandled), 514 raw=message, 515 ) 516 self._emit( 517 AgentWebSocketEvents(AgentWebSocketEvents.Unhandled), 518 unhandled=unhandled_error, 519 **dict(cast(Dict[Any, Any], self._kwargs)), 520 ) 521 522 self._logger.notice("_process_text Succeeded") 523 self._logger.debug("SpeakStreamClient._process_text LEAVE") 524 525 except Exception as e: # pylint: disable=broad-except 526 self._logger.error("Exception in AgentWebSocketClient._process_text: %s", e) 527 e_error: ErrorResponse = ErrorResponse( 528 "Exception in AgentWebSocketClient._process_text", 529 f"{e}", 530 "Exception", 531 ) 532 self._logger.error( 533 "Exception in AgentWebSocketClient._process_text: %s", str(e) 534 ) 535 self._emit( 536 AgentWebSocketEvents(AgentWebSocketEvents.Error), 537 error=e_error, 538 **dict(cast(Dict[Any, Any], self._kwargs)), 539 ) 540 541 # signal exit and close 542 super()._signal_exit() 543 544 self._logger.debug("AgentWebSocketClient._process_text LEAVE") 545 546 if self._config.options.get("termination_exception") is True: 547 raise 548 return 549 550 # pylint: 
enable=too-many-return-statements,too-many-statements 551 552 def _process_binary(self, message: bytes) -> None: 553 self._logger.debug("AgentWebSocketClient._process_binary ENTER") 554 self._logger.debug("Binary data received") 555 556 self._emit( 557 AgentWebSocketEvents(AgentWebSocketEvents.AudioData), 558 data=message, 559 **dict(cast(Dict[Any, Any], self._kwargs)), 560 ) 561 562 self._logger.notice("_process_binary Succeeded") 563 self._logger.debug("AgentWebSocketClient._process_binary LEAVE") 564 565 # pylint: disable=too-many-return-statements 566 def _keep_alive(self) -> None: 567 """ 568 Sends keepalive messages to the WebSocket connection. 569 """ 570 self._logger.debug("AgentWebSocketClient._keep_alive ENTER") 571 572 counter = 0 573 while True: 574 try: 575 counter += 1 576 self._exit_event.wait(timeout=ONE_SECOND) 577 578 if self._exit_event.is_set(): 579 self._logger.notice("_keep_alive exiting gracefully") 580 self._logger.debug("AgentWebSocketClient._keep_alive LEAVE") 581 return 582 583 # deepgram keepalive 584 if counter % DEEPGRAM_INTERVAL == 0: 585 self.keep_alive() 586 587 except Exception as e: # pylint: disable=broad-except 588 self._logger.error( 589 "Exception in AgentWebSocketClient._keep_alive: %s", e 590 ) 591 e_error: ErrorResponse = ErrorResponse( 592 "Exception in AgentWebSocketClient._keep_alive", 593 f"{e}", 594 "Exception", 595 ) 596 self._logger.error( 597 "Exception in AgentWebSocketClient._keep_alive: %s", str(e) 598 ) 599 self._emit( 600 AgentWebSocketEvents(AgentWebSocketEvents.Error), 601 error=e_error, 602 **dict(cast(Dict[Any, Any], self._kwargs)), 603 ) 604 605 # signal exit and close 606 super()._signal_exit() 607 608 self._logger.debug("AgentWebSocketClient._keep_alive LEAVE") 609 610 if self._config.options.get("termination_exception") is True: 611 raise 612 return 613 614 def keep_alive(self) -> bool: 615 """ 616 Sends a KeepAlive message 617 """ 618 self._logger.spam("AgentWebSocketClient.keep_alive ENTER") 619 620 
self._logger.notice("Sending KeepAlive...") 621 ret = self.send(json.dumps({"type": "KeepAlive"})) 622 623 if not ret: 624 self._logger.error("keep_alive failed") 625 self._logger.spam("AgentWebSocketClient.keep_alive LEAVE") 626 return False 627 628 self._logger.notice("keep_alive succeeded") 629 self._logger.spam("AgentWebSocketClient.keep_alive LEAVE") 630 631 return True 632 633 def _close_message(self) -> bool: 634 # TODO: No known API close message # pylint: disable=fixme 635 # return self.send(json.dumps({"type": "Close"})) 636 return True 637 638 # closes the WebSocket connection gracefully 639 def finish(self) -> bool: 640 """ 641 Closes the WebSocket connection gracefully. 642 """ 643 self._logger.spam("AgentWebSocketClient.finish ENTER") 644 645 # call parent finish 646 if super().finish() is False: 647 self._logger.error("AgentWebSocketClient.finish failed") 648 649 if self._microphone is not None and self._microphone_created: 650 self._microphone.finish() 651 self._microphone_created = False 652 653 if self._speaker is not None and self._speaker_created: 654 self._speaker.finish() 655 self._speaker_created = False 656 657 # debug the threads 658 for thread in threading.enumerate(): 659 self._logger.debug("before running thread: %s", thread.name) 660 self._logger.debug("number of active threads: %s", threading.active_count()) 661 662 # stop the threads 663 self._logger.verbose("cancelling tasks...") 664 if self._keep_alive_thread is not None: 665 self._keep_alive_thread.join() 666 self._keep_alive_thread = None 667 self._logger.notice("processing _keep_alive_thread thread joined") 668 669 if self._listen_thread is not None: 670 self._listen_thread.join() 671 self._listen_thread = None 672 self._logger.notice("listening thread joined") 673 674 self._speaker = None 675 self._microphone = None 676 677 # debug the threads 678 for thread in threading.enumerate(): 679 self._logger.debug("before running thread: %s", thread.name) 680 self._logger.debug("number 
of active threads: %s", threading.active_count()) 681 682 self._logger.notice("finish succeeded") 683 self._logger.spam("AgentWebSocketClient.finish LEAVE") 684 return True
Client for interacting with Deepgram's Voice Agent API over WebSockets.
This class provides methods to establish a WebSocket connection to the agent API and handle the agent's real-time events.
Args: config (DeepgramClientOptions): all the options for the client.
def __init__(self, config: DeepgramClientOptions):
    """
    Prepare the agent client: logger, endpoint, event table and optional audio devices.

    Args:
        config (DeepgramClientOptions): client options. Setting
            ``microphone_record`` or ``speaker_playback`` to the string "true"
            in ``config.options`` creates a local Microphone or Speaker.

    Raises:
        DeepgramError: when ``config`` is None.
    """
    if config is None:
        raise DeepgramError("Config is required")

    logger = verboselogs.VerboseLogger(__name__)
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(config.verbose)
    self._logger = logger

    self._config = config

    # agent traffic goes to "wss://agent.deepgram.com/agent"
    self._endpoint = "agent"
    self._config.url = "agent.deepgram.com"

    self._keep_alive_thread = None

    # start with an empty handler list for every known event
    handlers = {}
    for known_event in AgentWebSocketEvents.__members__.values():
        handlers[known_event] = []
    self._event_handlers = handlers

    opts = self._config.options

    if opts.get("microphone_record") == "true":
        self._logger.info("microphone_record is enabled")
        mic_rate = opts.get("microphone_record_rate", MICROPHONE_RATE)
        mic_channels = opts.get("microphone_record_channels", MICROPHONE_CHANNELS)
        mic_device = opts.get("microphone_record_device_index")

        self._logger.debug("rate: %s", mic_rate)
        self._logger.debug("channels: %s", mic_channels)

        self._microphone_created = True

        mic_args = {
            "rate": mic_rate,
            "channels": mic_channels,
            "verbose": self._config.verbose,
        }
        if mic_device is not None:
            self._logger.debug("device_index: %s", mic_device)
            mic_args["input_device_index"] = mic_device
        self._microphone = Microphone(**mic_args)

    if opts.get("speaker_playback") == "true":
        self._logger.info("speaker_playback is enabled")
        spk_rate = opts.get("speaker_playback_rate", SPEAKER_RATE)
        spk_channels = opts.get("speaker_playback_channels", SPEAKER_CHANNELS)
        spk_delta = opts.get("speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA)
        spk_device = opts.get("speaker_playback_device_index")

        self._logger.debug("rate: %s", spk_rate)
        self._logger.debug("channels: %s", spk_channels)

        self._speaker_created = True

        spk_args = {
            "rate": spk_rate,
            "channels": spk_channels,
            "last_play_delta_in_ms": spk_delta,
            "verbose": self._config.verbose,
        }
        if spk_device is not None:
            self._logger.debug("device_index: %s", spk_device)
            spk_args["output_device_index"] = spk_device
        # the speaker receives whatever microphone was created above (may be None)
        spk_args["microphone"] = self._microphone
        self._speaker = Speaker(**spk_args)

    # hand the configured options and endpoint to the base websocket client
    super().__init__(self._config, self._endpoint)
def start(
    self,
    options: Optional[Union[SettingsConfigurationOptions, Dict, str]] = None,
    addons: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    members: Optional[Dict] = None,
    **kwargs,
) -> bool:
    """
    Starts the WebSocket connection to the agent API.

    Args:
        options: agent settings as a SettingsConfigurationOptions instance, a
            dict, or a JSON string. They are validated with ``check()`` in every form.
        addons: passed through to the parent ``start``.
        headers: passed through to the parent ``start``.
        members: attributes copied onto this instance.
        **kwargs: forwarded to the parent ``start`` and to every emitted event.

    Returns:
        bool: True on success. False if the parent start or the settings send
        fails, or if an exception is raised while connecting (unless the
        ``termination_exception_connect`` option is True, in which case the
        exception is re-raised).

    Raises:
        DeepgramError: if ``options`` has an unsupported type, fails
            ``check()``, or sets keyterms for a non-nova-3 listen model.
    """
    self._logger.debug("AgentWebSocketClient.start ENTER")
    self._logger.info("settings: %s", options)
    self._logger.info("addons: %s", addons)
    self._logger.info("headers: %s", headers)
    self._logger.info("members: %s", members)
    self._logger.info("kwargs: %s", kwargs)

    # normalize the settings into a SettingsConfigurationOptions instance
    if isinstance(options, SettingsConfigurationOptions):
        self._logger.info("options is class")
        settings = options
    elif isinstance(options, dict):
        self._logger.info("options is dict")
        settings = SettingsConfigurationOptions.from_dict(options)
    elif isinstance(options, str):
        self._logger.info("options is json")
        settings = SettingsConfigurationOptions.from_json(options)
    else:
        raise DeepgramError("Invalid options type")

    # validate in every case, not only when a class instance was passed in,
    # and before any instance state is modified
    if not settings.check():
        self._logger.error("settings.check failed")
        self._logger.debug("AgentWebSocketClient.start LEAVE")
        raise DeepgramError("Fatal agent settings error")

    listen = settings.agent.listen
    if (
        listen.keyterms is not None
        and listen.model is not None
        and not listen.model.startswith("nova-3")
    ):
        raise DeepgramError("Keyterms are only supported for nova-3 models")

    self._addons = addons
    self._headers = headers

    # add "members" as members of the class
    if members is not None:
        self.__dict__.update(members)

    self._settings = settings

    # **kwargs is always a (possibly empty) dict
    self._kwargs = kwargs

    try:
        # speaker substitutes the listening thread
        if self._speaker is not None:
            self._logger.notice("passing speaker to delegate_listening")
            super().delegate_listening(self._speaker)

        # call parent start
        if (
            super().start(
                {},
                self._addons,
                self._headers,
                **dict(cast(Dict[Any, Any], self._kwargs)),
            )
            is False
        ):
            self._logger.error("AgentWebSocketClient.start failed")
            self._logger.debug("AgentWebSocketClient.start LEAVE")
            return False

        if self._speaker is not None:
            self._logger.notice("speaker is delegate_listening. Starting speaker")
            self._speaker.start()

        # the microphone is only started when a speaker is also present
        if self._speaker is not None and self._microphone is not None:
            self._logger.notice(
                "speaker is delegate_listening. Starting microphone"
            )
            self._microphone.set_callback(self.send)
            self._microphone.start()

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        # keepalive thread
        if self._config.is_keep_alive_enabled():
            self._logger.notice("keepalive is enabled")
            self._keep_alive_thread = threading.Thread(target=self._keep_alive)
            self._keep_alive_thread.start()
        else:
            self._logger.notice("keepalive is disabled")

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        # send the configurationsetting message
        self._logger.notice("Sending ConfigurationSettings...")
        ret_send_cs = self.send(str(self._settings))
        if not ret_send_cs:
            self._logger.error("ConfigurationSettings failed")

            err_error: ErrorResponse = ErrorResponse(
                "Exception in AgentWebSocketClient.start",
                "ConfigurationSettings failed to send",
                "Exception",
            )
            self._emit(
                AgentWebSocketEvents(AgentWebSocketEvents.Error),
                error=err_error,
                **dict(cast(Dict[Any, Any], self._kwargs)),
            )

            self._logger.debug("AgentWebSocketClient.start LEAVE")
            return False

        self._logger.notice("start succeeded")
        self._logger.debug("AgentWebSocketClient.start LEAVE")
        return True

    except Exception as e:  # pylint: disable=broad-except
        self._logger.error(
            "WebSocketException in AgentWebSocketClient.start: %s", e
        )
        self._logger.debug("AgentWebSocketClient.start LEAVE")
        if self._config.options.get("termination_exception_connect") is True:
            raise e
        return False
Starts the WebSocket connection for agent API.
def on(self, event: AgentWebSocketEvents, handler: Callable) -> None:
    """
    Subscribe ``handler`` to ``event``.

    The subscription attempt is always logged. The handler is appended to
    ``self._event_handlers[event]`` only when ``event`` is a member of
    ``AgentWebSocketEvents`` and ``handler`` is callable. Anything else is
    ignored without raising.
    """
    self._logger.info("event subscribed: %s", event)

    is_known_event = event in AgentWebSocketEvents.__members__.values()
    if not (is_known_event and callable(handler)):
        # invalid subscriptions are deliberately dropped without an error
        return

    self._event_handlers[event].append(handler)
Registers event handlers for specific events.
def keep_alive(self) -> bool:
    """
    Send one ``{"type": "KeepAlive"}`` text message over the socket.

    Returns:
        True when ``self.send`` reports success (truthy result), False
        otherwise. The failure is also logged at error level.
    """
    log = self._logger
    log.spam("AgentWebSocketClient.keep_alive ENTER")

    log.notice("Sending KeepAlive...")
    message = json.dumps({"type": "KeepAlive"})
    if self.send(message):
        log.notice("keep_alive succeeded")
        result = True
    else:
        log.error("keep_alive failed")
        result = False

    log.spam("AgentWebSocketClient.keep_alive LEAVE")
    return result
Sends a KeepAlive message
def finish(self) -> bool:
    """
    Closes the WebSocket connection gracefully.

    Steps:
      1. Call the parent ``finish()``. A failure there is logged but does not
         stop the local cleanup below.
      2. Finish the microphone / speaker, but only if this client created them.
      3. Join the keep-alive and listen threads and drop the
         microphone / speaker references.

    Returns:
        Always True.
    """
    self._logger.spam("AgentWebSocketClient.finish ENTER")

    # call parent finish
    if super().finish() is False:
        self._logger.error("AgentWebSocketClient.finish failed")

    # only tear down devices this client instantiated itself
    if self._microphone is not None and self._microphone_created:
        self._microphone.finish()
        self._microphone_created = False

    if self._speaker is not None and self._speaker_created:
        self._speaker.finish()
        self._speaker_created = False

    # debug the threads
    for thread in threading.enumerate():
        self._logger.debug("before running thread: %s", thread.name)
    self._logger.debug("number of active threads: %s", threading.active_count())

    # stop the threads
    # NOTE(review): join() has no timeout, so this blocks until each worker
    # exits; presumably the parent finish() signals them to stop -- confirm.
    self._logger.verbose("cancelling tasks...")
    if self._keep_alive_thread is not None:
        self._keep_alive_thread.join()
        self._keep_alive_thread = None
        self._logger.notice("processing _keep_alive_thread thread joined")

    if self._listen_thread is not None:
        self._listen_thread.join()
        self._listen_thread = None
        self._logger.notice("listening thread joined")

    self._speaker = None
    self._microphone = None

    # debug the threads that remain after the workers were joined
    for thread in threading.enumerate():
        self._logger.debug("after running thread: %s", thread.name)
    self._logger.debug("number of active threads: %s", threading.active_count())

    self._logger.notice("finish succeeded")
    self._logger.spam("AgentWebSocketClient.finish LEAVE")
    return True
Closes the WebSocket connection gracefully.
60class AsyncAgentWebSocketClient( 61 AbstractAsyncWebSocketClient 62): # pylint: disable=too-many-instance-attributes 63 """ 64 Client for interacting with Deepgram's live transcription services over WebSockets. 65 66 This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events. 67 68 Args: 69 config (DeepgramClientOptions): all the options for the client. 70 """ 71 72 _logger: verboselogs.VerboseLogger 73 _config: DeepgramClientOptions 74 _endpoint: str 75 76 _event_handlers: Dict[AgentWebSocketEvents, list] 77 78 _keep_alive_thread: Union[asyncio.Task, None] 79 80 _kwargs: Optional[Dict] = None 81 _addons: Optional[Dict] = None 82 # note the distinction here. We can't use _config because it's already used in the parent 83 _settings: Optional[SettingsConfigurationOptions] = None 84 _headers: Optional[Dict] = None 85 86 _speaker_created: bool = False 87 _speaker: Optional[Speaker] = None 88 _microphone_created: bool = False 89 _microphone: Optional[Microphone] = None 90 91 def __init__(self, config: DeepgramClientOptions): 92 if config is None: 93 raise DeepgramError("Config is required") 94 95 self._logger = verboselogs.VerboseLogger(__name__) 96 self._logger.addHandler(logging.StreamHandler()) 97 self._logger.setLevel(config.verbose) 98 99 self._config = config 100 101 # needs to be "wss://agent.deepgram.com/agent" 102 self._endpoint = "agent" 103 104 # override the endpoint since it needs to be "wss://agent.deepgram.com/agent" 105 self._config.url = "agent.deepgram.com" 106 self._keep_alive_thread = None 107 108 # init handlers 109 self._event_handlers = { 110 event: [] for event in AgentWebSocketEvents.__members__.values() 111 } 112 113 if self._config.options.get("microphone_record") == "true": 114 self._logger.info("microphone_record is enabled") 115 rate = self._config.options.get("microphone_record_rate", MICROPHONE_RATE) 116 channels = self._config.options.get( 117 
"microphone_record_channels", MICROPHONE_CHANNELS 118 ) 119 device_index = self._config.options.get("microphone_record_device_index") 120 121 self._logger.debug("rate: %s", rate) 122 self._logger.debug("channels: %s", channels) 123 if device_index is not None: 124 self._logger.debug("device_index: %s", device_index) 125 126 self._microphone_created = True 127 128 if device_index is not None: 129 self._microphone = Microphone( 130 rate=rate, 131 channels=channels, 132 verbose=self._config.verbose, 133 input_device_index=device_index, 134 ) 135 else: 136 self._microphone = Microphone( 137 rate=rate, 138 channels=channels, 139 verbose=self._config.verbose, 140 ) 141 142 if self._config.options.get("speaker_playback") == "true": 143 self._logger.info("speaker_playback is enabled") 144 rate = self._config.options.get("speaker_playback_rate", SPEAKER_RATE) 145 channels = self._config.options.get( 146 "speaker_playback_channels", SPEAKER_CHANNELS 147 ) 148 playback_delta_in_ms = self._config.options.get( 149 "speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA 150 ) 151 device_index = self._config.options.get("speaker_playback_device_index") 152 153 self._logger.debug("rate: %s", rate) 154 self._logger.debug("channels: %s", channels) 155 156 self._speaker_created = True 157 158 if device_index is not None: 159 self._logger.debug("device_index: %s", device_index) 160 161 self._speaker = Speaker( 162 rate=rate, 163 channels=channels, 164 last_play_delta_in_ms=playback_delta_in_ms, 165 verbose=self._config.verbose, 166 output_device_index=device_index, 167 microphone=self._microphone, 168 ) 169 else: 170 self._speaker = Speaker( 171 rate=rate, 172 channels=channels, 173 last_play_delta_in_ms=playback_delta_in_ms, 174 verbose=self._config.verbose, 175 microphone=self._microphone, 176 ) 177 # call the parent constructor 178 super().__init__(self._config, self._endpoint) 179 180 # pylint: disable=too-many-branches,too-many-statements 181 async def start( 182 self, 183 
options: Optional[SettingsConfigurationOptions] = None, 184 addons: Optional[Dict] = None, 185 headers: Optional[Dict] = None, 186 members: Optional[Dict] = None, 187 **kwargs, 188 ) -> bool: 189 """ 190 Starts the WebSocket connection for agent API. 191 """ 192 self._logger.debug("AsyncAgentWebSocketClient.start ENTER") 193 self._logger.info("settings: %s", options) 194 self._logger.info("addons: %s", addons) 195 self._logger.info("headers: %s", headers) 196 self._logger.info("members: %s", members) 197 self._logger.info("kwargs: %s", kwargs) 198 199 if isinstance(options, SettingsConfigurationOptions) and not options.check(): 200 self._logger.error("settings.check failed") 201 self._logger.debug("AsyncAgentWebSocketClient.start LEAVE") 202 raise DeepgramError("Fatal agent settings error") 203 204 self._addons = addons 205 self._headers = headers 206 207 # add "members" as members of the class 208 if members is not None: 209 self.__dict__.update(members) 210 211 # set kwargs as members of the class 212 if kwargs is not None: 213 self._kwargs = kwargs 214 else: 215 self._kwargs = {} 216 217 if isinstance(options, SettingsConfigurationOptions): 218 self._logger.info("options is class") 219 self._settings = options 220 elif isinstance(options, dict): 221 self._logger.info("options is dict") 222 self._settings = SettingsConfigurationOptions.from_dict(options) 223 elif isinstance(options, str): 224 self._logger.info("options is json") 225 self._settings = SettingsConfigurationOptions.from_json(options) 226 else: 227 raise DeepgramError("Invalid options type") 228 229 if self._settings.agent.listen.keyterms is not None and self._settings.agent.listen.model is not None and not self._settings.agent.listen.model.startswith("nova-3"): 230 raise DeepgramError("Keyterms are only supported for nova-3 models") 231 232 try: 233 # speaker substitutes the listening thread 234 if self._speaker is not None: 235 self._logger.notice("passing speaker to delegate_listening") 236 
super().delegate_listening(self._speaker) 237 238 # call parent start 239 if ( 240 await super().start( 241 {}, 242 self._addons, 243 self._headers, 244 **dict(cast(Dict[Any, Any], self._kwargs)), 245 ) 246 is False 247 ): 248 self._logger.error("AsyncAgentWebSocketClient.start failed") 249 self._logger.debug("AsyncAgentWebSocketClient.start LEAVE") 250 return False 251 252 if self._speaker is not None: 253 self._logger.notice("speaker is delegate_listening. Starting speaker") 254 self._speaker.start() 255 256 if self._speaker is not None and self._microphone is not None: 257 self._logger.notice( 258 "speaker is delegate_listening. Starting microphone" 259 ) 260 self._microphone.set_callback(self.send) 261 self._microphone.start() 262 263 # debug the threads 264 for thread in threading.enumerate(): 265 self._logger.debug("after running thread: %s", thread.name) 266 self._logger.debug("number of active threads: %s", threading.active_count()) 267 268 # keepalive thread 269 if self._config.is_keep_alive_enabled(): 270 self._logger.notice("keepalive is enabled") 271 self._keep_alive_thread = asyncio.create_task(self._keep_alive()) 272 else: 273 self._logger.notice("keepalive is disabled") 274 275 # debug the threads 276 for thread in threading.enumerate(): 277 self._logger.debug("after running thread: %s", thread.name) 278 self._logger.debug("number of active threads: %s", threading.active_count()) 279 280 # send the configurationsetting message 281 self._logger.notice("Sending ConfigurationSettings...") 282 ret_send_cs = await self.send(str(self._settings)) 283 if not ret_send_cs: 284 self._logger.error("ConfigurationSettings failed") 285 286 err_error: ErrorResponse = ErrorResponse( 287 "Exception in AsyncAgentWebSocketClient.start", 288 "ConfigurationSettings failed to send", 289 "Exception", 290 ) 291 await self._emit( 292 AgentWebSocketEvents(AgentWebSocketEvents.Error), 293 error=err_error, 294 **dict(cast(Dict[Any, Any], self._kwargs)), 295 ) 296 297 
self._logger.debug("AgentWebSocketClient.start LEAVE") 298 return False 299 300 self._logger.notice("start succeeded") 301 self._logger.debug("AsyncAgentWebSocketClient.start LEAVE") 302 return True 303 304 except Exception as e: # pylint: disable=broad-except 305 self._logger.error( 306 "WebSocketException in AsyncAgentWebSocketClient.start: %s", e 307 ) 308 self._logger.debug("AsyncAgentWebSocketClient.start LEAVE") 309 if self._config.options.get("termination_exception_connect") is True: 310 raise e 311 return False 312 313 # pylint: enable=too-many-branches,too-many-statements 314 315 def on(self, event: AgentWebSocketEvents, handler: Callable) -> None: 316 """ 317 Registers event handlers for specific events. 318 """ 319 self._logger.info("event subscribed: %s", event) 320 if event in AgentWebSocketEvents.__members__.values() and callable(handler): 321 self._event_handlers[event].append(handler) 322 323 async def _emit(self, event: AgentWebSocketEvents, *args, **kwargs) -> None: 324 """ 325 Emits events to the registered event handlers. 
326 """ 327 self._logger.debug("AsyncAgentWebSocketClient._emit ENTER") 328 self._logger.debug("callback handlers for: %s", event) 329 330 # debug the threads 331 for thread in threading.enumerate(): 332 self._logger.debug("after running thread: %s", thread.name) 333 self._logger.debug("number of active threads: %s", threading.active_count()) 334 335 self._logger.debug("callback handlers for: %s", event) 336 tasks = [] 337 for handler in self._event_handlers[event]: 338 task = asyncio.create_task(handler(self, *args, **kwargs)) 339 tasks.append(task) 340 341 if tasks: 342 self._logger.debug("waiting for tasks to finish...") 343 await asyncio.gather(*tasks, return_exceptions=True) 344 tasks.clear() 345 346 # debug the threads 347 for thread in threading.enumerate(): 348 self._logger.debug("after running thread: %s", thread.name) 349 self._logger.debug("number of active threads: %s", threading.active_count()) 350 351 self._logger.debug("AsyncAgentWebSocketClient._emit LEAVE") 352 353 # pylint: disable=too-many-locals,too-many-statements 354 async def _process_text(self, message: str) -> None: 355 """ 356 Processes messages received over the WebSocket connection. 
357 """ 358 self._logger.debug("AsyncAgentWebSocketClient._process_text ENTER") 359 360 try: 361 self._logger.debug("Text data received") 362 if len(message) == 0: 363 self._logger.debug("message is empty") 364 self._logger.debug("AsyncAgentWebSocketClient._process_text LEAVE") 365 return 366 367 data = json.loads(message) 368 response_type = data.get("type") 369 self._logger.debug("response_type: %s, data: %s", response_type, data) 370 371 match response_type: 372 case AgentWebSocketEvents.Open: 373 open_result: OpenResponse = OpenResponse.from_json(message) 374 self._logger.verbose("OpenResponse: %s", open_result) 375 await self._emit( 376 AgentWebSocketEvents(AgentWebSocketEvents.Open), 377 open=open_result, 378 **dict(cast(Dict[Any, Any], self._kwargs)), 379 ) 380 case AgentWebSocketEvents.Welcome: 381 welcome_result: WelcomeResponse = WelcomeResponse.from_json(message) 382 self._logger.verbose("WelcomeResponse: %s", welcome_result) 383 await self._emit( 384 AgentWebSocketEvents(AgentWebSocketEvents.Welcome), 385 welcome=welcome_result, 386 **dict(cast(Dict[Any, Any], self._kwargs)), 387 ) 388 case AgentWebSocketEvents.SettingsApplied: 389 settings_applied_result: SettingsAppliedResponse = ( 390 SettingsAppliedResponse.from_json(message) 391 ) 392 self._logger.verbose( 393 "SettingsAppliedResponse: %s", settings_applied_result 394 ) 395 await self._emit( 396 AgentWebSocketEvents(AgentWebSocketEvents.SettingsApplied), 397 settings_applied=settings_applied_result, 398 **dict(cast(Dict[Any, Any], self._kwargs)), 399 ) 400 case AgentWebSocketEvents.ConversationText: 401 conversation_text_result: ConversationTextResponse = ( 402 ConversationTextResponse.from_json(message) 403 ) 404 self._logger.verbose( 405 "ConversationTextResponse: %s", conversation_text_result 406 ) 407 await self._emit( 408 AgentWebSocketEvents(AgentWebSocketEvents.ConversationText), 409 conversation_text=conversation_text_result, 410 **dict(cast(Dict[Any, Any], self._kwargs)), 411 ) 412 case 
AgentWebSocketEvents.UserStartedSpeaking: 413 user_started_speaking_result: UserStartedSpeakingResponse = ( 414 UserStartedSpeakingResponse.from_json(message) 415 ) 416 self._logger.verbose( 417 "UserStartedSpeakingResponse: %s", user_started_speaking_result 418 ) 419 await self._emit( 420 AgentWebSocketEvents(AgentWebSocketEvents.UserStartedSpeaking), 421 user_started_speaking=user_started_speaking_result, 422 **dict(cast(Dict[Any, Any], self._kwargs)), 423 ) 424 case AgentWebSocketEvents.AgentThinking: 425 agent_thinking_result: AgentThinkingResponse = ( 426 AgentThinkingResponse.from_json(message) 427 ) 428 self._logger.verbose( 429 "AgentThinkingResponse: %s", agent_thinking_result 430 ) 431 await self._emit( 432 AgentWebSocketEvents(AgentWebSocketEvents.AgentThinking), 433 agent_thinking=agent_thinking_result, 434 **dict(cast(Dict[Any, Any], self._kwargs)), 435 ) 436 case AgentWebSocketEvents.FunctionCalling: 437 function_calling_result: FunctionCalling = ( 438 FunctionCalling.from_json(message) 439 ) 440 self._logger.verbose("FunctionCalling: %s", function_calling_result) 441 await self._emit( 442 AgentWebSocketEvents(AgentWebSocketEvents.FunctionCalling), 443 function_calling=function_calling_result, 444 **dict(cast(Dict[Any, Any], self._kwargs)), 445 ) 446 case AgentWebSocketEvents.FunctionCallRequest: 447 function_call_request_result: FunctionCallRequest = ( 448 FunctionCallRequest.from_json(message) 449 ) 450 self._logger.verbose( 451 "FunctionCallRequest: %s", function_call_request_result 452 ) 453 await self._emit( 454 AgentWebSocketEvents(AgentWebSocketEvents.FunctionCallRequest), 455 function_call_request=function_call_request_result, 456 **dict(cast(Dict[Any, Any], self._kwargs)), 457 ) 458 case AgentWebSocketEvents.AgentStartedSpeaking: 459 agent_started_speaking_result: AgentStartedSpeakingResponse = ( 460 AgentStartedSpeakingResponse.from_json(message) 461 ) 462 self._logger.verbose( 463 "AgentStartedSpeakingResponse: %s", 464 
agent_started_speaking_result, 465 ) 466 await self._emit( 467 AgentWebSocketEvents(AgentWebSocketEvents.AgentStartedSpeaking), 468 agent_started_speaking=agent_started_speaking_result, 469 **dict(cast(Dict[Any, Any], self._kwargs)), 470 ) 471 case AgentWebSocketEvents.AgentAudioDone: 472 agent_audio_done_result: AgentAudioDoneResponse = ( 473 AgentAudioDoneResponse.from_json(message) 474 ) 475 self._logger.verbose( 476 "AgentAudioDoneResponse: %s", agent_audio_done_result 477 ) 478 await self._emit( 479 AgentWebSocketEvents(AgentWebSocketEvents.AgentAudioDone), 480 agent_audio_done=agent_audio_done_result, 481 **dict(cast(Dict[Any, Any], self._kwargs)), 482 ) 483 case AgentWebSocketEvents.InjectionRefused: 484 injection_refused_result: InjectionRefusedResponse = ( 485 InjectionRefusedResponse.from_json(message) 486 ) 487 self._logger.verbose( 488 "InjectionRefused: %s", injection_refused_result 489 ) 490 await self._emit( 491 AgentWebSocketEvents(AgentWebSocketEvents.InjectionRefused), 492 injection_refused=injection_refused_result, 493 **dict(cast(Dict[Any, Any], self._kwargs)), 494 ) 495 case AgentWebSocketEvents.Close: 496 close_result: CloseResponse = CloseResponse.from_json(message) 497 self._logger.verbose("CloseResponse: %s", close_result) 498 await self._emit( 499 AgentWebSocketEvents(AgentWebSocketEvents.Close), 500 close=close_result, 501 **dict(cast(Dict[Any, Any], self._kwargs)), 502 ) 503 case AgentWebSocketEvents.Error: 504 err_error: ErrorResponse = ErrorResponse.from_json(message) 505 self._logger.verbose("ErrorResponse: %s", err_error) 506 await self._emit( 507 AgentWebSocketEvents(AgentWebSocketEvents.Error), 508 error=err_error, 509 **dict(cast(Dict[Any, Any], self._kwargs)), 510 ) 511 case _: 512 self._logger.warning( 513 "Unknown Message: response_type: %s, data: %s", 514 response_type, 515 data, 516 ) 517 unhandled_error: UnhandledResponse = UnhandledResponse( 518 type=AgentWebSocketEvents(AgentWebSocketEvents.Unhandled), 519 raw=message, 520 
) 521 await self._emit( 522 AgentWebSocketEvents(AgentWebSocketEvents.Unhandled), 523 unhandled=unhandled_error, 524 **dict(cast(Dict[Any, Any], self._kwargs)), 525 ) 526 527 self._logger.notice("_process_text Succeeded") 528 self._logger.debug("AsyncAgentWebSocketClient._process_text LEAVE") 529 530 except Exception as e: # pylint: disable=broad-except 531 self._logger.error( 532 "Exception in AsyncAgentWebSocketClient._process_text: %s", e 533 ) 534 e_error: ErrorResponse = ErrorResponse( 535 "Exception in AsyncAgentWebSocketClient._process_text", 536 f"{e}", 537 "Exception", 538 ) 539 await self._emit( 540 AgentWebSocketEvents(AgentWebSocketEvents.Error), 541 error=e_error, 542 **dict(cast(Dict[Any, Any], self._kwargs)), 543 ) 544 545 # signal exit and close 546 await super()._signal_exit() 547 548 self._logger.debug("AsyncAgentWebSocketClient._process_text LEAVE") 549 550 if self._config.options.get("termination_exception") is True: 551 raise 552 return 553 554 # pylint: enable=too-many-locals,too-many-statements 555 556 async def _process_binary(self, message: bytes) -> None: 557 self._logger.debug("AsyncAgentWebSocketClient._process_binary ENTER") 558 self._logger.debug("Binary data received") 559 560 await self._emit( 561 AgentWebSocketEvents(AgentWebSocketEvents.AudioData), 562 data=message, 563 **dict(cast(Dict[Any, Any], self._kwargs)), 564 ) 565 566 self._logger.notice("_process_binary Succeeded") 567 self._logger.debug("AsyncAgentWebSocketClient._process_binary LEAVE") 568 569 # pylint: disable=too-many-return-statements 570 async def _keep_alive(self) -> None: 571 """ 572 Sends keepalive messages to the WebSocket connection. 
573 """ 574 self._logger.debug("AsyncAgentWebSocketClient._keep_alive ENTER") 575 576 counter = 0 577 while True: 578 try: 579 counter += 1 580 await asyncio.sleep(ONE_SECOND) 581 582 if self._exit_event.is_set(): 583 self._logger.notice("_keep_alive exiting gracefully") 584 self._logger.debug("AsyncAgentWebSocketClient._keep_alive LEAVE") 585 return 586 587 # deepgram keepalive 588 if counter % DEEPGRAM_INTERVAL == 0: 589 await self.keep_alive() 590 591 except Exception as e: # pylint: disable=broad-except 592 self._logger.error( 593 "Exception in AsyncAgentWebSocketClient._keep_alive: %s", e 594 ) 595 e_error: ErrorResponse = ErrorResponse( 596 "Exception in AsyncAgentWebSocketClient._keep_alive", 597 f"{e}", 598 "Exception", 599 ) 600 self._logger.error( 601 "Exception in AsyncAgentWebSocketClient._keep_alive: %s", str(e) 602 ) 603 await self._emit( 604 AgentWebSocketEvents(AgentWebSocketEvents.Error), 605 error=e_error, 606 **dict(cast(Dict[Any, Any], self._kwargs)), 607 ) 608 609 # signal exit and close 610 await super()._signal_exit() 611 612 self._logger.debug("AsyncAgentWebSocketClient._keep_alive LEAVE") 613 614 if self._config.options.get("termination_exception") is True: 615 raise 616 return 617 618 async def keep_alive(self) -> bool: 619 """ 620 Sends a KeepAlive message 621 """ 622 self._logger.spam("AsyncAgentWebSocketClient.keep_alive ENTER") 623 624 self._logger.notice("Sending KeepAlive...") 625 ret = await self.send(json.dumps({"type": "KeepAlive"})) 626 627 if not ret: 628 self._logger.error("keep_alive failed") 629 self._logger.spam("AsyncAgentWebSocketClient.keep_alive LEAVE") 630 return False 631 632 self._logger.notice("keep_alive succeeded") 633 self._logger.spam("AsyncAgentWebSocketClient.keep_alive LEAVE") 634 635 return True 636 637 async def _close_message(self) -> bool: 638 # TODO: No known API close message # pylint: disable=fixme 639 # return await self.send(json.dumps({"type": "Close"})) 640 return True 641 642 async def finish(self) 
-> bool: 643 """ 644 Closes the WebSocket connection gracefully. 645 """ 646 self._logger.debug("AsyncAgentWebSocketClient.finish ENTER") 647 648 # stop the threads 649 self._logger.verbose("cancelling tasks...") 650 try: 651 # call parent finish 652 if await super().finish() is False: 653 self._logger.error("AsyncAgentWebSocketClient.finish failed") 654 655 if self._microphone is not None and self._microphone_created: 656 self._microphone.finish() 657 self._microphone_created = False 658 659 if self._speaker is not None and self._speaker_created: 660 self._speaker.finish() 661 self._speaker_created = False 662 663 # Before cancelling, check if the tasks were created 664 # debug the threads 665 for thread in threading.enumerate(): 666 self._logger.debug("before running thread: %s", thread.name) 667 self._logger.debug("number of active threads: %s", threading.active_count()) 668 669 tasks = [] 670 if self._keep_alive_thread is not None: 671 self._keep_alive_thread.cancel() 672 tasks.append(self._keep_alive_thread) 673 self._logger.notice("processing _keep_alive_thread cancel...") 674 675 # Use asyncio.gather to wait for tasks to be cancelled 676 # Prevent indefinite waiting by setting a timeout 677 await asyncio.wait_for(asyncio.gather(*tasks), timeout=10) 678 self._logger.notice("threads joined") 679 680 self._speaker = None 681 self._microphone = None 682 683 # debug the threads 684 for thread in threading.enumerate(): 685 self._logger.debug("after running thread: %s", thread.name) 686 self._logger.debug("number of active threads: %s", threading.active_count()) 687 688 self._logger.notice("finish succeeded") 689 self._logger.spam("AsyncAgentWebSocketClient.finish LEAVE") 690 return True 691 692 except asyncio.CancelledError as e: 693 self._logger.error("tasks cancelled error: %s", e) 694 self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE") 695 return False 696 697 except asyncio.TimeoutError as e: 698 self._logger.error("tasks cancellation timed out: %s", 
e) 699 self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE") 700 return False
Client for interacting with Deepgram's agent API over WebSockets.
This class provides methods to establish a WebSocket connection to the agent API and handle real-time agent events.
Args: config (DeepgramClientOptions): all the options for the client.
def __init__(self, config: DeepgramClientOptions):
    """
    Build the agent client from ``config``.

    Side effect: ``config.url`` is overwritten with ``"agent.deepgram.com"``,
    so the caller's options object is modified in place. A ``Microphone``
    and/or ``Speaker`` is created when the ``microphone_record`` /
    ``speaker_playback`` options equal the string ``"true"``.

    Raises:
        DeepgramError: if ``config`` is None.
    """
    if config is None:
        raise DeepgramError("Config is required")

    logger = verboselogs.VerboseLogger(__name__)
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(config.verbose)
    self._logger = logger

    self._config = config
    opts = self._config.options

    # the socket must end up at "wss://agent.deepgram.com/agent"
    self._endpoint = "agent"
    self._config.url = "agent.deepgram.com"
    self._keep_alive_thread = None

    # one (initially empty) handler list per known event
    self._event_handlers = {
        evt: [] for evt in AgentWebSocketEvents.__members__.values()
    }

    if opts.get("microphone_record") == "true":
        logger.info("microphone_record is enabled")
        mic_rate = opts.get("microphone_record_rate", MICROPHONE_RATE)
        mic_channels = opts.get("microphone_record_channels", MICROPHONE_CHANNELS)
        mic_device = opts.get("microphone_record_device_index")

        logger.debug("rate: %s", mic_rate)
        logger.debug("channels: %s", mic_channels)

        mic_args = {
            "rate": mic_rate,
            "channels": mic_channels,
            "verbose": self._config.verbose,
        }
        if mic_device is not None:
            logger.debug("device_index: %s", mic_device)
            # pass a device index only when one was configured
            mic_args["input_device_index"] = mic_device

        self._microphone_created = True
        self._microphone = Microphone(**mic_args)

    if opts.get("speaker_playback") == "true":
        logger.info("speaker_playback is enabled")
        spk_rate = opts.get("speaker_playback_rate", SPEAKER_RATE)
        spk_channels = opts.get("speaker_playback_channels", SPEAKER_CHANNELS)
        spk_delta = opts.get("speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA)
        spk_device = opts.get("speaker_playback_device_index")

        logger.debug("rate: %s", spk_rate)
        logger.debug("channels: %s", spk_channels)

        self._speaker_created = True

        spk_args = {
            "rate": spk_rate,
            "channels": spk_channels,
            "last_play_delta_in_ms": spk_delta,
            "verbose": self._config.verbose,
            "microphone": self._microphone,
        }
        if spk_device is not None:
            logger.debug("device_index: %s", spk_device)
            spk_args["output_device_index"] = spk_device

        self._speaker = Speaker(**spk_args)

    # parent constructor runs last, after all local state is prepared
    super().__init__(self._config, self._endpoint)
async def start(
    self,
    options: Optional[SettingsConfigurationOptions] = None,
    addons: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    members: Optional[Dict] = None,
    **kwargs,
) -> bool:
    """
    Starts the WebSocket connection for agent API.

    Args:
        options: agent settings. Accepted as a SettingsConfigurationOptions
            instance, a dict, or a JSON string, and sent to the server once
            the socket is open.
        addons: forwarded unchanged to the parent client's ``start``.
        headers: forwarded unchanged to the parent client's ``start``.
        members: merged into this instance's ``__dict__``.
        **kwargs: stored and passed along with every emitted event (and to the
            parent ``start``).

    Returns:
        True on success. False if the parent start fails, the settings message
        cannot be sent, or an exception occurs (unless
        ``termination_exception_connect`` is set, in which case it is
        re-raised).

    Raises:
        DeepgramError: if the settings fail ``check()``, ``options`` is of an
            unsupported type (including None), or keyterms are used with a
            non nova-3 listen model.
    """
    self._logger.debug("AsyncAgentWebSocketClient.start ENTER")
    self._logger.info("settings: %s", options)
    self._logger.info("addons: %s", addons)
    self._logger.info("headers: %s", headers)
    self._logger.info("members: %s", members)
    self._logger.info("kwargs: %s", kwargs)

    if isinstance(options, SettingsConfigurationOptions) and not options.check():
        self._logger.error("settings.check failed")
        self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
        raise DeepgramError("Fatal agent settings error")

    self._addons = addons
    self._headers = headers

    # add "members" as members of the class
    if members is not None:
        self.__dict__.update(members)

    # **kwargs is always a (possibly empty) dict
    self._kwargs = kwargs

    if isinstance(options, SettingsConfigurationOptions):
        self._logger.info("options is class")
        self._settings = options
    elif isinstance(options, dict):
        self._logger.info("options is dict")
        self._settings = SettingsConfigurationOptions.from_dict(options)
    elif isinstance(options, str):
        self._logger.info("options is json")
        self._settings = SettingsConfigurationOptions.from_json(options)
    else:
        raise DeepgramError("Invalid options type")

    listen = self._settings.agent.listen
    if (
        listen.keyterms is not None
        and listen.model is not None
        and not listen.model.startswith("nova-3")
    ):
        raise DeepgramError("Keyterms are only supported for nova-3 models")

    try:
        # speaker substitutes the listening thread
        if self._speaker is not None:
            self._logger.notice("passing speaker to delegate_listening")
            super().delegate_listening(self._speaker)

        # call parent start
        if (
            await super().start(
                {},
                self._addons,
                self._headers,
                **dict(cast(Dict[Any, Any], self._kwargs)),
            )
            is False
        ):
            self._logger.error("AsyncAgentWebSocketClient.start failed")
            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
            return False

        if self._speaker is not None:
            self._logger.notice("speaker is delegate_listening. Starting speaker")
            self._speaker.start()

        if self._speaker is not None and self._microphone is not None:
            self._logger.notice(
                "speaker is delegate_listening. Starting microphone"
            )
            self._microphone.set_callback(self.send)
            self._microphone.start()

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        # keepalive runs as an asyncio task, not an OS thread
        if self._config.is_keep_alive_enabled():
            self._logger.notice("keepalive is enabled")
            self._keep_alive_thread = asyncio.create_task(self._keep_alive())
        else:
            self._logger.notice("keepalive is disabled")

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        # send the configurationsetting message
        self._logger.notice("Sending ConfigurationSettings...")
        ret_send_cs = await self.send(str(self._settings))
        if not ret_send_cs:
            self._logger.error("ConfigurationSettings failed")

            err_error: ErrorResponse = ErrorResponse(
                "Exception in AsyncAgentWebSocketClient.start",
                "ConfigurationSettings failed to send",
                "Exception",
            )
            await self._emit(
                AgentWebSocketEvents(AgentWebSocketEvents.Error),
                error=err_error,
                **dict(cast(Dict[Any, Any], self._kwargs)),
            )

            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
            return False

        self._logger.notice("start succeeded")
        self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
        return True

    except Exception as e:  # pylint: disable=broad-except
        self._logger.error(
            "WebSocketException in AsyncAgentWebSocketClient.start: %s", e
        )
        self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
        if self._config.options.get("termination_exception_connect") is True:
            raise e
        return False
Starts the WebSocket connection for agent API.
def on(self, event: AgentWebSocketEvents, handler: Callable) -> None:
    """
    Subscribe ``handler`` to ``event``.

    The subscription attempt is always logged. The handler is stored only when
    ``event`` is a member of ``AgentWebSocketEvents`` and ``handler`` is
    callable; otherwise the call has no further effect.
    """
    self._logger.info("event subscribed: %s", event)

    is_known_event = event in AgentWebSocketEvents.__members__.values()
    if not (is_known_event and callable(handler)):
        return

    self._event_handlers[event].append(handler)
Registers event handlers for specific events.
async def keep_alive(self) -> bool:
    """
    Send a single ``KeepAlive`` JSON message over the websocket.

    Returns:
        bool: True when ``self.send`` reports success, False otherwise.
    """
    self._logger.spam("AsyncAgentWebSocketClient.keep_alive ENTER")
    self._logger.notice("Sending KeepAlive...")

    payload = json.dumps({"type": "KeepAlive"})
    sent_ok = await self.send(payload)

    if sent_ok:
        self._logger.notice("keep_alive succeeded")
    else:
        self._logger.error("keep_alive failed")
    self._logger.spam("AsyncAgentWebSocketClient.keep_alive LEAVE")

    return bool(sent_ok)
Sends a KeepAlive message
async def finish(self) -> bool:
    """
    Closes the WebSocket connection gracefully.

    Steps, in order:
      1. Call the parent ``finish``. A failure is logged and cleanup continues.
      2. Finish the microphone/speaker, but only the ones this client created
         itself (tracked by ``_microphone_created`` / ``_speaker_created``).
      3. Cancel the keepalive task and wait up to 10 seconds for it to end.
      4. Drop the references to the keepalive task, speaker and microphone.

    Returns:
        bool: True when cleanup completed; False if the wait on the background
        tasks was cancelled or timed out.
    """
    self._logger.debug("AsyncAgentWebSocketClient.finish ENTER")

    # stop the threads
    self._logger.verbose("cancelling tasks...")
    try:
        # call parent finish
        if await super().finish() is False:
            self._logger.error("AsyncAgentWebSocketClient.finish failed")

        if self._microphone is not None and self._microphone_created:
            self._microphone.finish()
            self._microphone_created = False

        if self._speaker is not None and self._speaker_created:
            self._speaker.finish()
            self._speaker_created = False

        # Before cancelling, check if the tasks were created
        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("before running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        tasks = []
        if self._keep_alive_thread is not None:
            self._keep_alive_thread.cancel()
            tasks.append(self._keep_alive_thread)
            self._logger.notice("processing _keep_alive_thread cancel...")

        # A task we just cancelled completes with CancelledError. With the
        # default return_exceptions=False, gather() re-raises it here, so a
        # normal shutdown would fall into the CancelledError branch and report
        # failure. Collect outcomes instead; the timeout still prevents
        # waiting indefinitely.
        results = await asyncio.wait_for(
            asyncio.gather(*tasks, return_exceptions=True), timeout=10
        )
        for result in results:
            # cancellation is the expected outcome; surface anything else
            if isinstance(result, BaseException) and not isinstance(
                result, asyncio.CancelledError
            ):
                self._logger.error("background task raised: %s", result)
        self._logger.notice("threads joined")

        self._keep_alive_thread = None
        self._speaker = None
        self._microphone = None

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        self._logger.notice("finish succeeded")
        self._logger.spam("AsyncAgentWebSocketClient.finish LEAVE")
        return True

    except asyncio.CancelledError as e:
        # now reached when finish() itself (or a call it awaits) is cancelled
        self._logger.error("tasks cancelled error: %s", e)
        self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE")
        return False

    except asyncio.TimeoutError as e:
        self._logger.error("tasks cancellation timed out: %s", e)
        self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE")
        return False
Closes the WebSocket connection gracefully.
@dataclass
class OpenResponse(BaseResponse):
    """
    Represents the Open message from the Deepgram Platform.
    """

    # message type; defaults to an empty string
    type: str = field(default="")
Open Message from the Deepgram Platform
Inherited Members
@dataclass
class CloseResponse(BaseResponse):
    """
    Represents the Close message from the Deepgram Platform.
    """

    # message type; defaults to an empty string
    type: str = field(default="")
Close Message from the Deepgram Platform
Inherited Members
@dataclass
class ErrorResponse(BaseResponse):
    """
    Represents an Error message from the Deepgram Platform.

    All text fields default to empty strings. ``variant`` defaults to None and
    is configured to be excluded from serialization when it is None.
    """

    description: str = field(default="")
    message: str = field(default="")
    type: str = field(default="")
    variant: Optional[str] = field(
        default=None,
        metadata=dataclass_config(exclude=lambda value: value is None),
    )
Error Message from the Deepgram Platform
Inherited Members
@dataclass
class UnhandledResponse(BaseResponse):
    """
    Represents a message from the Deepgram Platform that has no dedicated type.

    ``raw`` holds the message as a string.
    """

    type: str = field(default="")
    raw: str = field(default="")
Unhandled Message from the Deepgram Platform
Inherited Members
@dataclass
class WelcomeResponse(BaseResponse):
    """
    Welcome message, sent by the server as soon as the websocket opens.

    Both fields are required.
    """

    # message type
    type: str
    # identifier of the session
    session_id: str
The server will send a Welcome message as soon as the websocket opens.
Inherited Members
@dataclass
class SettingsAppliedResponse(BaseResponse):
    """
    SettingsApplied message, sent by the server once the settings are applied.
    """

    # message type (required)
    type: str
The server will send a SettingsApplied message as soon as the settings are applied.
Inherited Members
@dataclass
class ConversationTextResponse(BaseResponse):
    """
    ConversationText message.

    The server sends one whenever the agent hears the user say something and
    whenever the agent itself speaks.
    """

    # message type
    type: str
    # who produced the text
    role: str
    # the text itself
    content: str
The server will send a ConversationText message every time the agent hears the user say something, and every time the agent speaks something itself.
Inherited Members
@dataclass
class UserStartedSpeakingResponse(BaseResponse):
    """
    UserStartedSpeaking message, sent by the server each time the user begins
    a new utterance.
    """

    # message type (required)
    type: str
The server will send a UserStartedSpeaking message every time the user begins a new utterance.
Inherited Members
@dataclass
class AgentThinkingResponse(BaseResponse):
    """
    AgentThinking message, used by the server to inform the client of a
    non-verbalized agent thought.
    """

    # message type
    type: str
    # the thought's text
    content: str
The server will send an AgentThinking message to inform the client of a non-verbalized agent thought.
Inherited Members
@dataclass
class FunctionCalling(BaseResponse):
    """
    FunctionCalling message.

    The server sometimes sends these while making function calls, to help
    client developers debug function-calling workflows.
    """

    # message type (required)
    type: str
The server will sometimes send FunctionCalling messages when making function calls to help the client developer debug function calling workflows.
Inherited Members
@dataclass
class FunctionCallRequest(BaseResponse):
    """
    FunctionCallRequest message: the server asks the client to call a function.

    All fields are required.
    """

    # message type
    type: str
    # name of the function to call
    function_name: str
    # identifier for this call
    function_call_id: str
    # function input, carried as a string
    input: str
The FunctionCallRequest message is used to call a function from the server to the client.
Inherited Members
@dataclass
class AgentStartedSpeakingResponse(BaseResponse):
    """
    AgentStartedSpeaking message, sent when the server begins streaming an
    agent audio response to the client for playback.

    Carries three required latency values as floats.
    """

    # NOTE(review): unlike the sibling response classes, no `type` field is
    # declared here -- confirm this is intentional.
    total_latency: float
    tts_latency: float
    ttt_latency: float
The server will send an AgentStartedSpeaking message when it begins streaming an agent audio response to the client for playback.
Inherited Members
@dataclass
class AgentAudioDoneResponse(BaseResponse):
    """
    AgentAudioDone message, sent immediately after the last audio message of
    a piece of agent speech.
    """

    # message type (required)
    type: str
The server will send an AgentAudioDone message immediately after it sends the last audio message in a piece of agent speech.
Inherited Members
@dataclass
class InjectionRefusedResponse(BaseResponse):
    """
    InjectionRefused message.

    The server sends this when an InjectAgentMessage request is ignored because
    it arrived while the user was speaking or while the server was sending
    audio for an agent response.
    """

    # message type (required)
    type: str
The server will send an InjectionRefused message when an InjectAgentMessage request is ignored because it arrived while the user was speaking or while the server was sending audio for an agent response.
Inherited Members
@dataclass
class SettingsConfigurationOptions(BaseResponse):
    """
    The client should send a SettingsConfiguration message immediately after opening the websocket and before sending any audio.
    """

    type: str = str(AgentWebSocketEvents.SettingsConfiguration)
    audio: Audio = field(default_factory=Audio)
    agent: Agent = field(default_factory=Agent)
    # excluded from serialization when None
    context: Optional[Context] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )

    def __getitem__(self, key):
        """
        Return the serialized value stored under ``key``.

        ``audio``, ``agent`` and ``context`` each hold ONE nested object, so
        each is rebuilt from its serialized dict with a single ``from_dict``
        call. Iterating over that dict would walk its keys and pass bare
        strings to ``from_dict``, which fails.

        Raises:
            KeyError: if ``key`` is not present in the serialized form.
        """
        _dict = self.to_dict()
        if _dict.get("audio") is not None:
            _dict["audio"] = Audio.from_dict(_dict["audio"])
        if _dict.get("agent") is not None:
            _dict["agent"] = Agent.from_dict(_dict["agent"])
        if _dict.get("context") is not None:
            _dict["context"] = Context.from_dict(_dict["context"])
        return _dict[key]

    def check(self):
        """
        Check the options for any deprecated or soon-to-be-deprecated options.

        No checks are implemented yet; this always returns True.
        """
        logger = verboselogs.VerboseLogger(__name__)
        logger.addHandler(logging.StreamHandler())
        prev = logger.level
        logger.setLevel(verboselogs.ERROR)

        # do we need to check anything here?

        logger.setLevel(prev)

        return True
The client should send a SettingsConfiguration message immediately after opening the websocket and before sending any audio.
def check(self):
    """
    Validate the options, flagging deprecated or soon-to-be-deprecated settings.

    No checks are implemented yet. The method sets up a temporary logger at
    ERROR level, restores its previous level, and returns True.
    """
    logger = verboselogs.VerboseLogger(__name__)
    logger.addHandler(logging.StreamHandler())
    saved_level = logger.level
    logger.setLevel(verboselogs.ERROR)

    # placeholder: deprecated-option checks would be logged here

    logger.setLevel(saved_level)
    return True
Check the options for any deprecated or soon-to-be-deprecated options.
Inherited Members
@dataclass
class UpdateInstructionsOptions(BaseResponse):
    """
    UpdateInstructions message.

    The client may send it mid-conversation to give the Think model
    additional instructions.
    """

    type: str = str(AgentWebSocketEvents.UpdateInstructions)
    # the extra instructions; empty by default
    instructions: str = ""
The client can send an UpdateInstructions message to give additional instructions to the Think model in the middle of a conversation.
Inherited Members
@dataclass
class UpdateSpeakOptions(BaseResponse):
    """
    UpdateSpeak message.

    The client may send it mid-conversation to switch the Speak model.
    """

    type: str = str(AgentWebSocketEvents.UpdateSpeak)
    # name of the Speak model to switch to; empty by default
    model: str = ""
The client can send an UpdateSpeak message to change the Speak model in the middle of a conversation.
Inherited Members
@dataclass
class InjectAgentMessageOptions(BaseResponse):
    """
    InjectAgentMessage message: asks the agent to make a statement immediately.

    If the request arrives while the user is speaking, or while the server is
    sending audio for an agent response, it is ignored and the server replies
    with InjectionRefused.
    """

    type: str = str(AgentWebSocketEvents.InjectAgentMessage)
    # text the agent should say; empty by default
    message: str = ""
The client can send an InjectAgentMessage to immediately trigger an agent statement. If the injection request arrives while the user is speaking, or while the server is in the middle of sending audio for an agent response, then the request will be ignored and the server will reply with an InjectionRefused.
Inherited Members
@dataclass
class FunctionCallResponse(BaseResponse):
    """
    The FunctionCallResponse message is the JSON command the client should
    send in reply to every FunctionCallRequest it receives.
    """

    type: str = field(default="FunctionCallResponse")
    # identifier of the FunctionCallRequest being answered
    function_call_id: str = ""
    # result of the function call, as a string
    output: str = ""
The FunctionCallResponse message is a JSON command that the client should send in reply to every FunctionCallRequest it receives.
Inherited Members
@dataclass
class AgentKeepAlive(BaseResponse):
    """
    KeepAlive JSON command, which the client can send to ensure the server does
    not close the connection.
    """

    type: str = field(default="KeepAlive")
The KeepAlive message is a JSON command that you can use to ensure that the server does not close the connection.
Inherited Members
@dataclass
class Listen(BaseResponse):
    """
    Configuration settings for the Listen model.

    Both settings default to None and are excluded from serialization while
    unset.
    """

    model: Optional[str] = field(
        default=None,
        metadata=dataclass_config(exclude=lambda value: value is None),
    )
    keyterms: Optional[List[str]] = field(
        default=None,
        metadata=dataclass_config(exclude=lambda value: value is None),
    )
This class defines any configuration settings for the Listen model.
Inherited Members
@dataclass
class Speak(BaseResponse):
    """
    Configuration settings for the Speak model.

    ``model`` defaults to ``"aura-asteria-en"``. Every setting is excluded from
    serialization when it is None.
    """

    model: Optional[str] = field(
        default="aura-asteria-en",
        metadata=dataclass_config(exclude=lambda value: value is None),
    )
    provider: Optional[str] = field(
        default=None,
        metadata=dataclass_config(exclude=lambda value: value is None),
    )
    voice_id: Optional[str] = field(
        default=None,
        metadata=dataclass_config(exclude=lambda value: value is None),
    )
This class defines any configuration settings for the Speak model.
Inherited Members
@dataclass
class Header(BaseResponse):
    """
    A single header, expressed as a required key/value pair.
    """

    # header name
    key: str
    # header value
    value: str
This class defines a single key/value pair for a header.
Inherited Members
@dataclass
class Item(BaseResponse):
    """
    One entry in a list of items, described by a type and a description.
    """

    type: str
    description: str
This class defines a single item in a list of items.
Inherited Members
@dataclass
class Properties(BaseResponse):
    """
    This class defines the properties of a function's parameters.

    It holds a single ``Item``.
    """

    item: Item

    def __getitem__(self, key):
        """
        Return the serialized value stored under ``key``.

        ``item`` is a single nested object, so it is rebuilt with one
        ``Item.from_dict`` call. Iterating over its serialized dict would pass
        the dict's keys (strings) to ``from_dict``, which fails.

        Raises:
            KeyError: if ``key`` is not present in the serialized form.
        """
        _dict = self.to_dict()
        if _dict.get("item") is not None:
            _dict["item"] = Item.from_dict(_dict["item"])
        return _dict[key]
This class defines the properties, which are just a list of items.
Inherited Members
@dataclass
class Parameters(BaseResponse):
    """
    Parameter specification for a function.

    Holds the parameter ``type``, its ``properties``, and the list of
    ``required`` names.
    """

    type: str
    properties: Properties
    required: List[str]

    def __getitem__(self, key):
        """
        Return the serialized value for ``key``.

        The ``properties`` entry is returned as a shallow copy of its serialized
        mapping.
        """
        serialized = self.to_dict()
        if "properties" in serialized:
            nested = serialized["properties"]
            serialized["properties"] = nested.copy()
        return serialized[key]
This class defines the parameters for a function.
Inherited Members
@dataclass
class Function(BaseResponse):
    """
    This class defines a function for the Think model.

    ``headers`` and ``parameters`` are optional and are excluded from
    serialization when None.
    """

    name: str
    description: str
    url: str
    method: str
    headers: Optional[List[Header]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    parameters: Optional[Parameters] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )

    def __getitem__(self, key):
        """
        Return the serialized value stored under ``key``, rebuilding nested objects.

        ``parameters`` is a single nested object, so it is rebuilt with one
        ``Parameters.from_dict`` call. Iterating over its dict would hand the
        dict's keys to ``from_dict``. ``headers`` is a list, so each entry is
        rebuilt individually.

        Raises:
            KeyError: if ``key`` is not present in the serialized form.
        """
        _dict = self.to_dict()
        if _dict.get("parameters") is not None:
            _dict["parameters"] = Parameters.from_dict(_dict["parameters"])
        if _dict.get("headers") is not None:
            _dict["headers"] = [
                Header.from_dict(header) for header in _dict["headers"]
            ]
        return _dict[key]
This class defines a function for the Think model.
Inherited Members
@dataclass
class Provider(BaseResponse):
    """
    Provider selection for the Think model.

    ``type`` defaults to None and is excluded from serialization while unset.
    """

    type: Optional[str] = field(
        default=None,
        metadata=dataclass_config(exclude=lambda value: value is None),
    )
This class defines the provider for the Think model.
Inherited Members
@dataclass
class Think(BaseResponse):
    """
    This class defines any configuration settings for the Think model.

    ``model``, ``instructions`` and ``functions`` are optional and are excluded
    from serialization when None.
    """

    provider: Provider = field(default_factory=Provider)
    model: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    instructions: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    functions: Optional[List[Function]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )

    def __getitem__(self, key):
        """
        Return the serialized value stored under ``key``, rebuilding nested objects.

        ``provider`` is a single nested object, so it is rebuilt with one
        ``Provider.from_dict`` call. Iterating over its dict would hand the
        dict's keys to ``from_dict``. ``functions`` is a list, so each entry is
        rebuilt individually.

        Raises:
            KeyError: if ``key`` is not present in the serialized form.
        """
        _dict = self.to_dict()
        if _dict.get("provider") is not None:
            _dict["provider"] = Provider.from_dict(_dict["provider"])
        if _dict.get("functions") is not None:
            _dict["functions"] = [
                Function.from_dict(function) for function in _dict["functions"]
            ]
        return _dict[key]
This class defines any configuration settings for the Think model.
Inherited Members
@dataclass
class Agent(BaseResponse):
    """
    This class defines any configuration settings for the Agent model.

    It groups one Listen, one Think and one Speak configuration.
    """

    listen: Listen = field(default_factory=Listen)
    think: Think = field(default_factory=Think)
    speak: Speak = field(default_factory=Speak)

    def __getitem__(self, key):
        """
        Return the serialized value stored under ``key``, rebuilding nested objects.

        ``listen``, ``think`` and ``speak`` each hold ONE nested object, so each
        is rebuilt with a single ``from_dict`` call. Iterating over the
        serialized dict would pass its keys (strings) to ``from_dict``, which
        fails.

        Raises:
            KeyError: if ``key`` is not present in the serialized form.
        """
        _dict = self.to_dict()
        if _dict.get("listen") is not None:
            _dict["listen"] = Listen.from_dict(_dict["listen"])
        if _dict.get("think") is not None:
            _dict["think"] = Think.from_dict(_dict["think"])
        if _dict.get("speak") is not None:
            _dict["speak"] = Speak.from_dict(_dict["speak"])
        return _dict[key]
This class defines any configuration settings for the Agent model.
Inherited Members
@dataclass
class Input(BaseResponse):
    """
    Configuration settings for the input audio.

    Defaults: ``encoding="linear16"``, ``sample_rate=16000``.
    """

    encoding: Optional[str] = "linear16"
    sample_rate: int = 16000
This class defines any configuration settings for the input audio.
Inherited Members
@dataclass
class Output(BaseResponse):
    """
    Configuration settings for the output audio.

    Defaults: ``encoding="linear16"``, ``sample_rate=16000``, ``container="none"``.
    ``bitrate`` defaults to None and is excluded from serialization while unset.
    """

    encoding: Optional[str] = "linear16"
    sample_rate: Optional[int] = 16000
    bitrate: Optional[int] = field(
        default=None,
        metadata=dataclass_config(exclude=lambda value: value is None),
    )
    container: Optional[str] = "none"
This class defines any configuration settings for the output audio.
Inherited Members
@dataclass
class Audio(BaseResponse):
    """
    This class defines any configuration settings for the audio.

    It groups one ``Input`` and one ``Output`` configuration.
    """

    input: Optional[Input] = field(default_factory=Input)
    output: Optional[Output] = field(default_factory=Output)

    def __getitem__(self, key):
        """
        Return the serialized value stored under ``key``, rebuilding nested objects.

        ``input`` and ``output`` each hold ONE nested object (or None), so each
        is rebuilt with a single ``from_dict`` call. Iterating over the
        serialized dict would pass its keys (strings) to ``from_dict``, which
        fails.

        Raises:
            KeyError: if ``key`` is not present in the serialized form.
        """
        _dict = self.to_dict()
        if _dict.get("input") is not None:
            _dict["input"] = Input.from_dict(_dict["input"])
        if _dict.get("output") is not None:
            _dict["output"] = Output.from_dict(_dict["output"])
        return _dict[key]
This class defines any configuration settings for the audio.
Inherited Members
@dataclass
class Context(BaseResponse):
    """
    Configuration settings for the conversation context.

    ``messages`` is a list of string pairs. It defaults to None and is excluded
    from serialization while unset. ``replay`` defaults to False.
    """

    messages: Optional[List[Tuple[str, str]]] = field(
        default=None,
        metadata=dataclass_config(exclude=lambda value: value is None),
    )
    replay: Optional[bool] = False

    def __getitem__(self, key):
        """
        Return the serialized value for ``key``.

        The ``messages`` entry is returned as a shallow copy of its serialized
        list.
        """
        serialized = self.to_dict()
        if "messages" in serialized:
            history = serialized["messages"]
            serialized["messages"] = history.copy()
        return serialized[key]
This class defines any configuration settings for the context.