deepgram.clients.agent.client

  1# Copyright 2024 Deepgram SDK contributors. All Rights Reserved.
  2# Use of this source code is governed by a MIT license that can be found in the LICENSE file.
  3# SPDX-License-Identifier: MIT
  4
  5# websocket
  6from .v1 import (
  7    AgentWebSocketClient as LatestAgentWebSocketClient,
  8    AsyncAgentWebSocketClient as LatestAsyncAgentWebSocketClient,
  9)
 10
 11from .v1 import (
 12    #### common websocket response
 13    BaseResponse as LatestBaseResponse,
 14    OpenResponse as LatestOpenResponse,
 15    CloseResponse as LatestCloseResponse,
 16    ErrorResponse as LatestErrorResponse,
 17    UnhandledResponse as LatestUnhandledResponse,
 18    #### unique
 19    WelcomeResponse as LatestWelcomeResponse,
 20    SettingsAppliedResponse as LatestSettingsAppliedResponse,
 21    ConversationTextResponse as LatestConversationTextResponse,
 22    UserStartedSpeakingResponse as LatestUserStartedSpeakingResponse,
 23    AgentThinkingResponse as LatestAgentThinkingResponse,
 24    FunctionCalling as LatestFunctionCalling,
 25    FunctionCallRequest as LatestFunctionCallRequest,
 26    AgentStartedSpeakingResponse as LatestAgentStartedSpeakingResponse,
 27    AgentAudioDoneResponse as LatestAgentAudioDoneResponse,
 28    InjectionRefusedResponse as LatestInjectionRefusedResponse,
 29)
 30
 31from .v1 import (
 32    # top level
 33    SettingsConfigurationOptions as LatestSettingsConfigurationOptions,
 34    UpdateInstructionsOptions as LatestUpdateInstructionsOptions,
 35    UpdateSpeakOptions as LatestUpdateSpeakOptions,
 36    InjectAgentMessageOptions as LatestInjectAgentMessageOptions,
 37    FunctionCallResponse as LatestFunctionCallResponse,
 38    AgentKeepAlive as LatestAgentKeepAlive,
 39    # sub level
 40    Listen as LatestListen,
 41    Speak as LatestSpeak,
 42    Header as LatestHeader,
 43    Item as LatestItem,
 44    Properties as LatestProperties,
 45    Parameters as LatestParameters,
 46    Function as LatestFunction,
 47    Provider as LatestProvider,
 48    Think as LatestThink,
 49    Agent as LatestAgent,
 50    Input as LatestInput,
 51    Output as LatestOutput,
 52    Audio as LatestAudio,
 53    Context as LatestContext,
 54)
 55
 56
 57# The vX/client.py points to the current supported version in the SDK.
 58# Older versions are supported in the SDK for backwards compatibility.
 59
 60AgentWebSocketClient = LatestAgentWebSocketClient
 61AsyncAgentWebSocketClient = LatestAsyncAgentWebSocketClient
 62
 63OpenResponse = LatestOpenResponse
 64CloseResponse = LatestCloseResponse
 65ErrorResponse = LatestErrorResponse
 66UnhandledResponse = LatestUnhandledResponse
 67
 68WelcomeResponse = LatestWelcomeResponse
 69SettingsAppliedResponse = LatestSettingsAppliedResponse
 70ConversationTextResponse = LatestConversationTextResponse
 71UserStartedSpeakingResponse = LatestUserStartedSpeakingResponse
 72AgentThinkingResponse = LatestAgentThinkingResponse
 73FunctionCalling = LatestFunctionCalling
 74FunctionCallRequest = LatestFunctionCallRequest
 75AgentStartedSpeakingResponse = LatestAgentStartedSpeakingResponse
 76AgentAudioDoneResponse = LatestAgentAudioDoneResponse
 77InjectionRefusedResponse = LatestInjectionRefusedResponse
 78
 79
 80SettingsConfigurationOptions = LatestSettingsConfigurationOptions
 81UpdateInstructionsOptions = LatestUpdateInstructionsOptions
 82UpdateSpeakOptions = LatestUpdateSpeakOptions
 83InjectAgentMessageOptions = LatestInjectAgentMessageOptions
 84FunctionCallResponse = LatestFunctionCallResponse
 85AgentKeepAlive = LatestAgentKeepAlive
 86
 87Listen = LatestListen
 88Speak = LatestSpeak
 89Header = LatestHeader
 90Item = LatestItem
 91Properties = LatestProperties
 92Parameters = LatestParameters
 93Function = LatestFunction
 94Provider = LatestProvider
 95Think = LatestThink
 96Agent = LatestAgent
 97Input = LatestInput
 98Output = LatestOutput
 99Audio = LatestAudio
100Context = LatestContext
 60class AgentWebSocketClient(
 61    AbstractSyncWebSocketClient
 62):  # pylint: disable=too-many-instance-attributes
 63    """
 64    Client for interacting with Deepgram's agent API over WebSockets.
 65
 66     This class provides methods to establish a WebSocket connection to the agent endpoint and handle the events it sends in real time.
 67
 68     Args:
 69         config (DeepgramClientOptions): all the options for the client.
 70    """
 71
 72    _logger: verboselogs.VerboseLogger
 73    _config: DeepgramClientOptions
 74    _endpoint: str
 75
 76    _event_handlers: Dict[AgentWebSocketEvents, list]
 77
 78    _keep_alive_thread: Union[threading.Thread, None]
 79
 80    _kwargs: Optional[Dict] = None
 81    _addons: Optional[Dict] = None
 82    # note the distinction here. We can't use _config because it's already used in the parent
 83    _settings: Optional[SettingsConfigurationOptions] = None
 84    _headers: Optional[Dict] = None
 85
 86    _speaker_created: bool = False
 87    _speaker: Optional[Speaker] = None
 88    _microphone_created: bool = False
 89    _microphone: Optional[Microphone] = None
 90
 91    def __init__(self, config: DeepgramClientOptions):
 92        if config is None:
 93            raise DeepgramError("Config is required")
 94
 95        self._logger = verboselogs.VerboseLogger(__name__)
 96        self._logger.addHandler(logging.StreamHandler())
 97        self._logger.setLevel(config.verbose)
 98
 99        self._config = config
100
101        # needs to be "wss://agent.deepgram.com/agent"
102        self._endpoint = "agent"
103
104        # override the endpoint since it needs to be "wss://agent.deepgram.com/agent"
105        self._config.url = "agent.deepgram.com"
106
107        self._keep_alive_thread = None
108
109        # init handlers
110        self._event_handlers = {
111            event: [] for event in AgentWebSocketEvents.__members__.values()
112        }
113
114        if self._config.options.get("microphone_record") == "true":
115            self._logger.info("microphone_record is enabled")
116            rate = self._config.options.get("microphone_record_rate", MICROPHONE_RATE)
117            channels = self._config.options.get(
118                "microphone_record_channels", MICROPHONE_CHANNELS
119            )
120            device_index = self._config.options.get("microphone_record_device_index")
121
122            self._logger.debug("rate: %s", rate)
123            self._logger.debug("channels: %s", channels)
124
125            self._microphone_created = True
126
127            if device_index is not None:
128                self._logger.debug("device_index: %s", device_index)
129                self._microphone = Microphone(
130                    rate=rate,
131                    channels=channels,
132                    verbose=self._config.verbose,
133                    input_device_index=device_index,
134                )
135            else:
136                self._microphone = Microphone(
137                    rate=rate,
138                    channels=channels,
139                    verbose=self._config.verbose,
140                )
141
142        if self._config.options.get("speaker_playback") == "true":
143            self._logger.info("speaker_playback is enabled")
144            rate = self._config.options.get("speaker_playback_rate", SPEAKER_RATE)
145            channels = self._config.options.get(
146                "speaker_playback_channels", SPEAKER_CHANNELS
147            )
148            playback_delta_in_ms = self._config.options.get(
149                "speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA
150            )
151            device_index = self._config.options.get("speaker_playback_device_index")
152
153            self._logger.debug("rate: %s", rate)
154            self._logger.debug("channels: %s", channels)
155
156            self._speaker_created = True
157
158            if device_index is not None:
159                self._logger.debug("device_index: %s", device_index)
160
161                self._speaker = Speaker(
162                    rate=rate,
163                    channels=channels,
164                    last_play_delta_in_ms=playback_delta_in_ms,
165                    verbose=self._config.verbose,
166                    output_device_index=device_index,
167                    microphone=self._microphone,
168                )
169            else:
170                self._speaker = Speaker(
171                    rate=rate,
172                    channels=channels,
173                    last_play_delta_in_ms=playback_delta_in_ms,
174                    verbose=self._config.verbose,
175                    microphone=self._microphone,
176                )
177
178        # call the parent constructor
179        super().__init__(self._config, self._endpoint)
180
181    # pylint: disable=too-many-statements,too-many-branches
182    def start(
183        self,
184        options: Optional[SettingsConfigurationOptions] = None,
185        addons: Optional[Dict] = None,
186        headers: Optional[Dict] = None,
187        members: Optional[Dict] = None,
188        **kwargs,
189    ) -> bool:
190        """
191        Starts the WebSocket connection for agent API.
192        """
193        self._logger.debug("AgentWebSocketClient.start ENTER")
194        self._logger.info("settings: %s", options)
195        self._logger.info("addons: %s", addons)
196        self._logger.info("headers: %s", headers)
197        self._logger.info("members: %s", members)
198        self._logger.info("kwargs: %s", kwargs)
199
200        if isinstance(options, SettingsConfigurationOptions) and not options.check():
201            self._logger.error("settings.check failed")
202            self._logger.debug("AgentWebSocketClient.start LEAVE")
203            raise DeepgramError("Fatal agent settings error")
204
205        self._addons = addons
206        self._headers = headers
207
208        # add "members" as members of the class
209        if members is not None:
210            self.__dict__.update(members)
211
212        # set kwargs as members of the class
213        if kwargs is not None:
214            self._kwargs = kwargs
215        else:
216            self._kwargs = {}
217
218        if isinstance(options, SettingsConfigurationOptions):
219            self._logger.info("options is class")
220            self._settings = options
221        elif isinstance(options, dict):
222            self._logger.info("options is dict")
223            self._settings = SettingsConfigurationOptions.from_dict(options)
224        elif isinstance(options, str):
225            self._logger.info("options is json")
226            self._settings = SettingsConfigurationOptions.from_json(options)
227        else:
228            raise DeepgramError("Invalid options type")
229
230        if self._settings.agent.listen.keyterms is not None and self._settings.agent.listen.model is not None and not self._settings.agent.listen.model.startswith("nova-3"):
231            raise DeepgramError("Keyterms are only supported for nova-3 models")
232
233        try:
234            # speaker substitutes the listening thread
235            if self._speaker is not None:
236                self._logger.notice("passing speaker to delegate_listening")
237                super().delegate_listening(self._speaker)
238
239            # call parent start
240            if (
241                super().start(
242                    {},
243                    self._addons,
244                    self._headers,
245                    **dict(cast(Dict[Any, Any], self._kwargs)),
246                )
247                is False
248            ):
249                self._logger.error("AgentWebSocketClient.start failed")
250                self._logger.debug("AgentWebSocketClient.start LEAVE")
251                return False
252
253            if self._speaker is not None:
254                self._logger.notice("speaker is delegate_listening. Starting speaker")
255                self._speaker.start()
256
257            if self._speaker is not None and self._microphone is not None:
258                self._logger.notice(
259                    "speaker is delegate_listening. Starting microphone"
260                )
261                self._microphone.set_callback(self.send)
262                self._microphone.start()
263
264            # debug the threads
265            for thread in threading.enumerate():
266                self._logger.debug("after running thread: %s", thread.name)
267            self._logger.debug("number of active threads: %s", threading.active_count())
268
269            # keepalive thread
270            if self._config.is_keep_alive_enabled():
271                self._logger.notice("keepalive is enabled")
272                self._keep_alive_thread = threading.Thread(target=self._keep_alive)
273                self._keep_alive_thread.start()
274            else:
275                self._logger.notice("keepalive is disabled")
276
277            # debug the threads
278            for thread in threading.enumerate():
279                self._logger.debug("after running thread: %s", thread.name)
280            self._logger.debug("number of active threads: %s", threading.active_count())
281
282            # send the configurationsetting message
283            self._logger.notice("Sending ConfigurationSettings...")
284            ret_send_cs = self.send(str(self._settings))
285            if not ret_send_cs:
286                self._logger.error("ConfigurationSettings failed")
287
288                err_error: ErrorResponse = ErrorResponse(
289                    "Exception in AgentWebSocketClient.start",
290                    "ConfigurationSettings failed to send",
291                    "Exception",
292                )
293                self._emit(
294                    AgentWebSocketEvents(AgentWebSocketEvents.Error),
295                    error=err_error,
296                    **dict(cast(Dict[Any, Any], self._kwargs)),
297                )
298
299                self._logger.debug("AgentWebSocketClient.start LEAVE")
300                return False
301
302            self._logger.notice("start succeeded")
303            self._logger.debug("AgentWebSocketClient.start LEAVE")
304            return True
305
306        except Exception as e:  # pylint: disable=broad-except
307            self._logger.error(
308                "WebSocketException in AgentWebSocketClient.start: %s", e
309            )
310            self._logger.debug("AgentWebSocketClient.start LEAVE")
311            if self._config.options.get("termination_exception_connect") is True:
312                raise e
313            return False
314
315    # pylint: enable=too-many-statements,too-many-branches
316
317    def on(self, event: AgentWebSocketEvents, handler: Callable) -> None:
318        """
319        Registers event handlers for specific events.
320        """
321        self._logger.info("event subscribed: %s", event)
322        if event in AgentWebSocketEvents.__members__.values() and callable(handler):
323            self._event_handlers[event].append(handler)
324
325    def _emit(self, event: AgentWebSocketEvents, *args, **kwargs) -> None:
326        """
327        Emits events to the registered event handlers.
328        """
329        self._logger.debug("AgentWebSocketClient._emit ENTER")
330        self._logger.debug("callback handlers for: %s", event)
331
332        # debug the threads
333        for thread in threading.enumerate():
334            self._logger.debug("after running thread: %s", thread.name)
335        self._logger.debug("number of active threads: %s", threading.active_count())
336
337        self._logger.debug("callback handlers for: %s", event)
338        for handler in self._event_handlers[event]:
339            handler(self, *args, **kwargs)
340
341        # debug the threads
342        for thread in threading.enumerate():
343            self._logger.debug("after running thread: %s", thread.name)
344        self._logger.debug("number of active threads: %s", threading.active_count())
345
346        self._logger.debug("AgentWebSocketClient._emit LEAVE")
347
348    # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches
349    def _process_text(self, message: str) -> None:
350        """
351        Processes messages received over the WebSocket connection.
352        """
353        self._logger.debug("AgentWebSocketClient._process_text ENTER")
354
355        try:
356            self._logger.debug("Text data received")
357            if len(message) == 0:
358                self._logger.debug("message is empty")
359                self._logger.debug("AgentWebSocketClient._process_text LEAVE")
360                return
361
362            data = json.loads(message)
363            response_type = data.get("type")
364            self._logger.debug("response_type: %s, data: %s", response_type, data)
365
366            match response_type:
367                case AgentWebSocketEvents.Open:
368                    open_result: OpenResponse = OpenResponse.from_json(message)
369                    self._logger.verbose("OpenResponse: %s", open_result)
370                    self._emit(
371                        AgentWebSocketEvents(AgentWebSocketEvents.Open),
372                        open=open_result,
373                        **dict(cast(Dict[Any, Any], self._kwargs)),
374                    )
375                case AgentWebSocketEvents.Welcome:
376                    welcome_result: WelcomeResponse = WelcomeResponse.from_json(message)
377                    self._logger.verbose("WelcomeResponse: %s", welcome_result)
378                    self._emit(
379                        AgentWebSocketEvents(AgentWebSocketEvents.Welcome),
380                        welcome=welcome_result,
381                        **dict(cast(Dict[Any, Any], self._kwargs)),
382                    )
383                case AgentWebSocketEvents.SettingsApplied:
384                    settings_applied_result: SettingsAppliedResponse = (
385                        SettingsAppliedResponse.from_json(message)
386                    )
387                    self._logger.verbose(
388                        "SettingsAppliedResponse: %s", settings_applied_result
389                    )
390                    self._emit(
391                        AgentWebSocketEvents(AgentWebSocketEvents.SettingsApplied),
392                        settings_applied=settings_applied_result,
393                        **dict(cast(Dict[Any, Any], self._kwargs)),
394                    )
395                case AgentWebSocketEvents.ConversationText:
396                    conversation_text_result: ConversationTextResponse = (
397                        ConversationTextResponse.from_json(message)
398                    )
399                    self._logger.verbose(
400                        "ConversationTextResponse: %s", conversation_text_result
401                    )
402                    self._emit(
403                        AgentWebSocketEvents(AgentWebSocketEvents.ConversationText),
404                        conversation_text=conversation_text_result,
405                        **dict(cast(Dict[Any, Any], self._kwargs)),
406                    )
407                case AgentWebSocketEvents.UserStartedSpeaking:
408                    user_started_speaking_result: UserStartedSpeakingResponse = (
409                        UserStartedSpeakingResponse.from_json(message)
410                    )
411                    self._logger.verbose(
412                        "UserStartedSpeakingResponse: %s", user_started_speaking_result
413                    )
414                    self._emit(
415                        AgentWebSocketEvents(AgentWebSocketEvents.UserStartedSpeaking),
416                        user_started_speaking=user_started_speaking_result,
417                        **dict(cast(Dict[Any, Any], self._kwargs)),
418                    )
419                case AgentWebSocketEvents.AgentThinking:
420                    agent_thinking_result: AgentThinkingResponse = (
421                        AgentThinkingResponse.from_json(message)
422                    )
423                    self._logger.verbose(
424                        "AgentThinkingResponse: %s", agent_thinking_result
425                    )
426                    self._emit(
427                        AgentWebSocketEvents(AgentWebSocketEvents.AgentThinking),
428                        agent_thinking=agent_thinking_result,
429                        **dict(cast(Dict[Any, Any], self._kwargs)),
430                    )
431                case AgentWebSocketEvents.FunctionCalling:
432                    function_calling_result: FunctionCalling = (
433                        FunctionCalling.from_json(message)
434                    )
435                    self._logger.verbose("FunctionCalling: %s", function_calling_result)
436                    self._emit(
437                        AgentWebSocketEvents(AgentWebSocketEvents.FunctionCalling),
438                        function_calling=function_calling_result,
439                        **dict(cast(Dict[Any, Any], self._kwargs)),
440                    )
441                case AgentWebSocketEvents.FunctionCallRequest:
442                    function_call_request_result: FunctionCallRequest = (
443                        FunctionCallRequest.from_json(message)
444                    )
445                    self._logger.verbose(
446                        "FunctionCallRequest: %s", function_call_request_result
447                    )
448                    self._emit(
449                        AgentWebSocketEvents(AgentWebSocketEvents.FunctionCallRequest),
450                        function_call_request=function_call_request_result,
451                        **dict(cast(Dict[Any, Any], self._kwargs)),
452                    )
453                case AgentWebSocketEvents.AgentStartedSpeaking:
454                    agent_started_speaking_result: AgentStartedSpeakingResponse = (
455                        AgentStartedSpeakingResponse.from_json(message)
456                    )
457                    self._logger.verbose(
458                        "AgentStartedSpeakingResponse: %s",
459                        agent_started_speaking_result,
460                    )
461                    self._emit(
462                        AgentWebSocketEvents(AgentWebSocketEvents.AgentStartedSpeaking),
463                        agent_started_speaking=agent_started_speaking_result,
464                        **dict(cast(Dict[Any, Any], self._kwargs)),
465                    )
466                case AgentWebSocketEvents.AgentAudioDone:
467                    agent_audio_done_result: AgentAudioDoneResponse = (
468                        AgentAudioDoneResponse.from_json(message)
469                    )
470                    self._logger.verbose(
471                        "AgentAudioDoneResponse: %s", agent_audio_done_result
472                    )
473                    self._emit(
474                        AgentWebSocketEvents(AgentWebSocketEvents.AgentAudioDone),
475                        agent_audio_done=agent_audio_done_result,
476                        **dict(cast(Dict[Any, Any], self._kwargs)),
477                    )
478                case AgentWebSocketEvents.InjectionRefused:
479                    injection_refused_result: InjectionRefusedResponse = (
480                        InjectionRefusedResponse.from_json(message)
481                    )
482                    self._logger.verbose(
483                        "InjectionRefused: %s", injection_refused_result
484                    )
485                    self._emit(
486                        AgentWebSocketEvents(AgentWebSocketEvents.InjectionRefused),
487                        injection_refused=injection_refused_result,
488                        **dict(cast(Dict[Any, Any], self._kwargs)),
489                    )
490                case AgentWebSocketEvents.Close:
491                    close_result: CloseResponse = CloseResponse.from_json(message)
492                    self._logger.verbose("CloseResponse: %s", close_result)
493                    self._emit(
494                        AgentWebSocketEvents(AgentWebSocketEvents.Close),
495                        close=close_result,
496                        **dict(cast(Dict[Any, Any], self._kwargs)),
497                    )
498                case AgentWebSocketEvents.Error:
499                    err_error: ErrorResponse = ErrorResponse.from_json(message)
500                    self._logger.verbose("ErrorResponse: %s", err_error)
501                    self._emit(
502                        AgentWebSocketEvents(AgentWebSocketEvents.Error),
503                        error=err_error,
504                        **dict(cast(Dict[Any, Any], self._kwargs)),
505                    )
506                case _:
507                    self._logger.warning(
508                        "Unknown Message: response_type: %s, data: %s",
509                        response_type,
510                        data,
511                    )
512                    unhandled_error: UnhandledResponse = UnhandledResponse(
513                        type=AgentWebSocketEvents(AgentWebSocketEvents.Unhandled),
514                        raw=message,
515                    )
516                    self._emit(
517                        AgentWebSocketEvents(AgentWebSocketEvents.Unhandled),
518                        unhandled=unhandled_error,
519                        **dict(cast(Dict[Any, Any], self._kwargs)),
520                    )
521
522            self._logger.notice("_process_text Succeeded")
523            self._logger.debug("SpeakStreamClient._process_text LEAVE")
524
525        except Exception as e:  # pylint: disable=broad-except
526            self._logger.error("Exception in AgentWebSocketClient._process_text: %s", e)
527            e_error: ErrorResponse = ErrorResponse(
528                "Exception in AgentWebSocketClient._process_text",
529                f"{e}",
530                "Exception",
531            )
532            self._logger.error(
533                "Exception in AgentWebSocketClient._process_text: %s", str(e)
534            )
535            self._emit(
536                AgentWebSocketEvents(AgentWebSocketEvents.Error),
537                error=e_error,
538                **dict(cast(Dict[Any, Any], self._kwargs)),
539            )
540
541            # signal exit and close
542            super()._signal_exit()
543
544            self._logger.debug("AgentWebSocketClient._process_text LEAVE")
545
546            if self._config.options.get("termination_exception") is True:
547                raise
548            return
549
550    # pylint: enable=too-many-return-statements,too-many-statements
551
552    def _process_binary(self, message: bytes) -> None:
553        self._logger.debug("AgentWebSocketClient._process_binary ENTER")
554        self._logger.debug("Binary data received")
555
556        self._emit(
557            AgentWebSocketEvents(AgentWebSocketEvents.AudioData),
558            data=message,
559            **dict(cast(Dict[Any, Any], self._kwargs)),
560        )
561
562        self._logger.notice("_process_binary Succeeded")
563        self._logger.debug("AgentWebSocketClient._process_binary LEAVE")
564
565    # pylint: disable=too-many-return-statements
566    def _keep_alive(self) -> None:
567        """
568        Sends keepalive messages to the WebSocket connection.
569        """
570        self._logger.debug("AgentWebSocketClient._keep_alive ENTER")
571
572        counter = 0
573        while True:
574            try:
575                counter += 1
576                self._exit_event.wait(timeout=ONE_SECOND)
577
578                if self._exit_event.is_set():
579                    self._logger.notice("_keep_alive exiting gracefully")
580                    self._logger.debug("AgentWebSocketClient._keep_alive LEAVE")
581                    return
582
583                # deepgram keepalive
584                if counter % DEEPGRAM_INTERVAL == 0:
585                    self.keep_alive()
586
587            except Exception as e:  # pylint: disable=broad-except
588                self._logger.error(
589                    "Exception in AgentWebSocketClient._keep_alive: %s", e
590                )
591                e_error: ErrorResponse = ErrorResponse(
592                    "Exception in AgentWebSocketClient._keep_alive",
593                    f"{e}",
594                    "Exception",
595                )
596                self._logger.error(
597                    "Exception in AgentWebSocketClient._keep_alive: %s", str(e)
598                )
599                self._emit(
600                    AgentWebSocketEvents(AgentWebSocketEvents.Error),
601                    error=e_error,
602                    **dict(cast(Dict[Any, Any], self._kwargs)),
603                )
604
605                # signal exit and close
606                super()._signal_exit()
607
608                self._logger.debug("AgentWebSocketClient._keep_alive LEAVE")
609
610                if self._config.options.get("termination_exception") is True:
611                    raise
612                return
613
614    def keep_alive(self) -> bool:
615        """
616        Sends a KeepAlive message
617        """
618        self._logger.spam("AgentWebSocketClient.keep_alive ENTER")
619
620        self._logger.notice("Sending KeepAlive...")
621        ret = self.send(json.dumps({"type": "KeepAlive"}))
622
623        if not ret:
624            self._logger.error("keep_alive failed")
625            self._logger.spam("AgentWebSocketClient.keep_alive LEAVE")
626            return False
627
628        self._logger.notice("keep_alive succeeded")
629        self._logger.spam("AgentWebSocketClient.keep_alive LEAVE")
630
631        return True
632
633    def _close_message(self) -> bool:
634        # TODO: No known API close message # pylint: disable=fixme
635        # return self.send(json.dumps({"type": "Close"}))
636        return True
637
638    # closes the WebSocket connection gracefully
639    def finish(self) -> bool:
640        """
641        Closes the WebSocket connection gracefully.
642        """
643        self._logger.spam("AgentWebSocketClient.finish ENTER")
644
645        # call parent finish
646        if super().finish() is False:
647            self._logger.error("AgentWebSocketClient.finish failed")
648
649        if self._microphone is not None and self._microphone_created:
650            self._microphone.finish()
651            self._microphone_created = False
652
653        if self._speaker is not None and self._speaker_created:
654            self._speaker.finish()
655            self._speaker_created = False
656
657        # debug the threads
658        for thread in threading.enumerate():
659            self._logger.debug("before running thread: %s", thread.name)
660        self._logger.debug("number of active threads: %s", threading.active_count())
661
662        # stop the threads
663        self._logger.verbose("cancelling tasks...")
664        if self._keep_alive_thread is not None:
665            self._keep_alive_thread.join()
666            self._keep_alive_thread = None
667            self._logger.notice("processing _keep_alive_thread thread joined")
668
669        if self._listen_thread is not None:
670            self._listen_thread.join()
671            self._listen_thread = None
672        self._logger.notice("listening thread joined")
673
674        self._speaker = None
675        self._microphone = None
676
677        # debug the threads
678        for thread in threading.enumerate():
679            self._logger.debug("before running thread: %s", thread.name)
680        self._logger.debug("number of active threads: %s", threading.active_count())
681
682        self._logger.notice("finish succeeded")
683        self._logger.spam("AgentWebSocketClient.finish LEAVE")
684        return True

Client for interacting with Deepgram's Agent API over WebSockets.

This class provides methods to establish a WebSocket connection to the agent endpoint and handle the events it emits in real time.

Args: config (DeepgramClientOptions): all the options for the client.

AgentWebSocketClient(config: deepgram.options.DeepgramClientOptions)
 91    def __init__(self, config: DeepgramClientOptions):
 92        if config is None:
 93            raise DeepgramError("Config is required")
 94
 95        self._logger = verboselogs.VerboseLogger(__name__)
 96        self._logger.addHandler(logging.StreamHandler())
 97        self._logger.setLevel(config.verbose)
 98
 99        self._config = config
100
101        # needs to be "wss://agent.deepgram.com/agent"
102        self._endpoint = "agent"
103
104        # override the endpoint since it needs to be "wss://agent.deepgram.com/agent"
105        self._config.url = "agent.deepgram.com"
106
107        self._keep_alive_thread = None
108
109        # init handlers
110        self._event_handlers = {
111            event: [] for event in AgentWebSocketEvents.__members__.values()
112        }
113
114        if self._config.options.get("microphone_record") == "true":
115            self._logger.info("microphone_record is enabled")
116            rate = self._config.options.get("microphone_record_rate", MICROPHONE_RATE)
117            channels = self._config.options.get(
118                "microphone_record_channels", MICROPHONE_CHANNELS
119            )
120            device_index = self._config.options.get("microphone_record_device_index")
121
122            self._logger.debug("rate: %s", rate)
123            self._logger.debug("channels: %s", channels)
124
125            self._microphone_created = True
126
127            if device_index is not None:
128                self._logger.debug("device_index: %s", device_index)
129                self._microphone = Microphone(
130                    rate=rate,
131                    channels=channels,
132                    verbose=self._config.verbose,
133                    input_device_index=device_index,
134                )
135            else:
136                self._microphone = Microphone(
137                    rate=rate,
138                    channels=channels,
139                    verbose=self._config.verbose,
140                )
141
142        if self._config.options.get("speaker_playback") == "true":
143            self._logger.info("speaker_playback is enabled")
144            rate = self._config.options.get("speaker_playback_rate", SPEAKER_RATE)
145            channels = self._config.options.get(
146                "speaker_playback_channels", SPEAKER_CHANNELS
147            )
148            playback_delta_in_ms = self._config.options.get(
149                "speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA
150            )
151            device_index = self._config.options.get("speaker_playback_device_index")
152
153            self._logger.debug("rate: %s", rate)
154            self._logger.debug("channels: %s", channels)
155
156            self._speaker_created = True
157
158            if device_index is not None:
159                self._logger.debug("device_index: %s", device_index)
160
161                self._speaker = Speaker(
162                    rate=rate,
163                    channels=channels,
164                    last_play_delta_in_ms=playback_delta_in_ms,
165                    verbose=self._config.verbose,
166                    output_device_index=device_index,
167                    microphone=self._microphone,
168                )
169            else:
170                self._speaker = Speaker(
171                    rate=rate,
172                    channels=channels,
173                    last_play_delta_in_ms=playback_delta_in_ms,
174                    verbose=self._config.verbose,
175                    microphone=self._microphone,
176                )
177
178        # call the parent constructor
179        super().__init__(self._config, self._endpoint)
def start( self, options: Optional[SettingsConfigurationOptions] = None, addons: Optional[Dict] = None, headers: Optional[Dict] = None, members: Optional[Dict] = None, **kwargs) -> bool:
182    def start(
183        self,
184        options: Optional[SettingsConfigurationOptions] = None,
185        addons: Optional[Dict] = None,
186        headers: Optional[Dict] = None,
187        members: Optional[Dict] = None,
188        **kwargs,
189    ) -> bool:
190        """
191        Starts the WebSocket connection for agent API.
192        """
193        self._logger.debug("AgentWebSocketClient.start ENTER")
194        self._logger.info("settings: %s", options)
195        self._logger.info("addons: %s", addons)
196        self._logger.info("headers: %s", headers)
197        self._logger.info("members: %s", members)
198        self._logger.info("kwargs: %s", kwargs)
199
200        if isinstance(options, SettingsConfigurationOptions) and not options.check():
201            self._logger.error("settings.check failed")
202            self._logger.debug("AgentWebSocketClient.start LEAVE")
203            raise DeepgramError("Fatal agent settings error")
204
205        self._addons = addons
206        self._headers = headers
207
208        # add "members" as members of the class
209        if members is not None:
210            self.__dict__.update(members)
211
212        # set kwargs as members of the class
213        if kwargs is not None:
214            self._kwargs = kwargs
215        else:
216            self._kwargs = {}
217
218        if isinstance(options, SettingsConfigurationOptions):
219            self._logger.info("options is class")
220            self._settings = options
221        elif isinstance(options, dict):
222            self._logger.info("options is dict")
223            self._settings = SettingsConfigurationOptions.from_dict(options)
224        elif isinstance(options, str):
225            self._logger.info("options is json")
226            self._settings = SettingsConfigurationOptions.from_json(options)
227        else:
228            raise DeepgramError("Invalid options type")
229
230        if self._settings.agent.listen.keyterms is not None and self._settings.agent.listen.model is not None and not self._settings.agent.listen.model.startswith("nova-3"):
231            raise DeepgramError("Keyterms are only supported for nova-3 models")
232
233        try:
234            # speaker substitutes the listening thread
235            if self._speaker is not None:
236                self._logger.notice("passing speaker to delegate_listening")
237                super().delegate_listening(self._speaker)
238
239            # call parent start
240            if (
241                super().start(
242                    {},
243                    self._addons,
244                    self._headers,
245                    **dict(cast(Dict[Any, Any], self._kwargs)),
246                )
247                is False
248            ):
249                self._logger.error("AgentWebSocketClient.start failed")
250                self._logger.debug("AgentWebSocketClient.start LEAVE")
251                return False
252
253            if self._speaker is not None:
254                self._logger.notice("speaker is delegate_listening. Starting speaker")
255                self._speaker.start()
256
257            if self._speaker is not None and self._microphone is not None:
258                self._logger.notice(
259                    "speaker is delegate_listening. Starting microphone"
260                )
261                self._microphone.set_callback(self.send)
262                self._microphone.start()
263
264            # debug the threads
265            for thread in threading.enumerate():
266                self._logger.debug("after running thread: %s", thread.name)
267            self._logger.debug("number of active threads: %s", threading.active_count())
268
269            # keepalive thread
270            if self._config.is_keep_alive_enabled():
271                self._logger.notice("keepalive is enabled")
272                self._keep_alive_thread = threading.Thread(target=self._keep_alive)
273                self._keep_alive_thread.start()
274            else:
275                self._logger.notice("keepalive is disabled")
276
277            # debug the threads
278            for thread in threading.enumerate():
279                self._logger.debug("after running thread: %s", thread.name)
280            self._logger.debug("number of active threads: %s", threading.active_count())
281
282            # send the configurationsetting message
283            self._logger.notice("Sending ConfigurationSettings...")
284            ret_send_cs = self.send(str(self._settings))
285            if not ret_send_cs:
286                self._logger.error("ConfigurationSettings failed")
287
288                err_error: ErrorResponse = ErrorResponse(
289                    "Exception in AgentWebSocketClient.start",
290                    "ConfigurationSettings failed to send",
291                    "Exception",
292                )
293                self._emit(
294                    AgentWebSocketEvents(AgentWebSocketEvents.Error),
295                    error=err_error,
296                    **dict(cast(Dict[Any, Any], self._kwargs)),
297                )
298
299                self._logger.debug("AgentWebSocketClient.start LEAVE")
300                return False
301
302            self._logger.notice("start succeeded")
303            self._logger.debug("AgentWebSocketClient.start LEAVE")
304            return True
305
306        except Exception as e:  # pylint: disable=broad-except
307            self._logger.error(
308                "WebSocketException in AgentWebSocketClient.start: %s", e
309            )
310            self._logger.debug("AgentWebSocketClient.start LEAVE")
311            if self._config.options.get("termination_exception_connect") is True:
312                raise e
313            return False

Starts the WebSocket connection for agent API.

def on( self, event: deepgram.clients.agent.enums.AgentWebSocketEvents, handler: Callable) -> None:
317    def on(self, event: AgentWebSocketEvents, handler: Callable) -> None:
318        """
319        Registers event handlers for specific events.
320        """
321        self._logger.info("event subscribed: %s", event)
322        if event in AgentWebSocketEvents.__members__.values() and callable(handler):
323            self._event_handlers[event].append(handler)

Registers event handlers for specific events.

def keep_alive(self) -> bool:
614    def keep_alive(self) -> bool:
615        """
616        Sends a KeepAlive message
617        """
618        self._logger.spam("AgentWebSocketClient.keep_alive ENTER")
619
620        self._logger.notice("Sending KeepAlive...")
621        ret = self.send(json.dumps({"type": "KeepAlive"}))
622
623        if not ret:
624            self._logger.error("keep_alive failed")
625            self._logger.spam("AgentWebSocketClient.keep_alive LEAVE")
626            return False
627
628        self._logger.notice("keep_alive succeeded")
629        self._logger.spam("AgentWebSocketClient.keep_alive LEAVE")
630
631        return True

Sends a KeepAlive message

def finish(self) -> bool:
639    def finish(self) -> bool:
640        """
641        Closes the WebSocket connection gracefully.
642        """
643        self._logger.spam("AgentWebSocketClient.finish ENTER")
644
645        # call parent finish
646        if super().finish() is False:
647            self._logger.error("AgentWebSocketClient.finish failed")
648
649        if self._microphone is not None and self._microphone_created:
650            self._microphone.finish()
651            self._microphone_created = False
652
653        if self._speaker is not None and self._speaker_created:
654            self._speaker.finish()
655            self._speaker_created = False
656
657        # debug the threads
658        for thread in threading.enumerate():
659            self._logger.debug("before running thread: %s", thread.name)
660        self._logger.debug("number of active threads: %s", threading.active_count())
661
662        # stop the threads
663        self._logger.verbose("cancelling tasks...")
664        if self._keep_alive_thread is not None:
665            self._keep_alive_thread.join()
666            self._keep_alive_thread = None
667            self._logger.notice("processing _keep_alive_thread thread joined")
668
669        if self._listen_thread is not None:
670            self._listen_thread.join()
671            self._listen_thread = None
672        self._logger.notice("listening thread joined")
673
674        self._speaker = None
675        self._microphone = None
676
677        # debug the threads
678        for thread in threading.enumerate():
679            self._logger.debug("before running thread: %s", thread.name)
680        self._logger.debug("number of active threads: %s", threading.active_count())
681
682        self._logger.notice("finish succeeded")
683        self._logger.spam("AgentWebSocketClient.finish LEAVE")
684        return True

Closes the WebSocket connection gracefully.

 60class AsyncAgentWebSocketClient(
 61    AbstractAsyncWebSocketClient
 62):  # pylint: disable=too-many-instance-attributes
 63    """
 64    Client for interacting with Deepgram's live transcription services over WebSockets.
 65
 66     This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events.
 67
 68     Args:
 69         config (DeepgramClientOptions): all the options for the client.
 70    """
 71
 72    _logger: verboselogs.VerboseLogger
 73    _config: DeepgramClientOptions
 74    _endpoint: str
 75
 76    _event_handlers: Dict[AgentWebSocketEvents, list]
 77
 78    _keep_alive_thread: Union[asyncio.Task, None]
 79
 80    _kwargs: Optional[Dict] = None
 81    _addons: Optional[Dict] = None
 82    # note the distinction here. We can't use _config because it's already used in the parent
 83    _settings: Optional[SettingsConfigurationOptions] = None
 84    _headers: Optional[Dict] = None
 85
 86    _speaker_created: bool = False
 87    _speaker: Optional[Speaker] = None
 88    _microphone_created: bool = False
 89    _microphone: Optional[Microphone] = None
 90
 91    def __init__(self, config: DeepgramClientOptions):
 92        if config is None:
 93            raise DeepgramError("Config is required")
 94
 95        self._logger = verboselogs.VerboseLogger(__name__)
 96        self._logger.addHandler(logging.StreamHandler())
 97        self._logger.setLevel(config.verbose)
 98
 99        self._config = config
100
101        # needs to be "wss://agent.deepgram.com/agent"
102        self._endpoint = "agent"
103
104        # override the endpoint since it needs to be "wss://agent.deepgram.com/agent"
105        self._config.url = "agent.deepgram.com"
106        self._keep_alive_thread = None
107
108        # init handlers
109        self._event_handlers = {
110            event: [] for event in AgentWebSocketEvents.__members__.values()
111        }
112
113        if self._config.options.get("microphone_record") == "true":
114            self._logger.info("microphone_record is enabled")
115            rate = self._config.options.get("microphone_record_rate", MICROPHONE_RATE)
116            channels = self._config.options.get(
117                "microphone_record_channels", MICROPHONE_CHANNELS
118            )
119            device_index = self._config.options.get("microphone_record_device_index")
120
121            self._logger.debug("rate: %s", rate)
122            self._logger.debug("channels: %s", channels)
123            if device_index is not None:
124                self._logger.debug("device_index: %s", device_index)
125
126            self._microphone_created = True
127
128            if device_index is not None:
129                self._microphone = Microphone(
130                    rate=rate,
131                    channels=channels,
132                    verbose=self._config.verbose,
133                    input_device_index=device_index,
134                )
135            else:
136                self._microphone = Microphone(
137                    rate=rate,
138                    channels=channels,
139                    verbose=self._config.verbose,
140                )
141
142        if self._config.options.get("speaker_playback") == "true":
143            self._logger.info("speaker_playback is enabled")
144            rate = self._config.options.get("speaker_playback_rate", SPEAKER_RATE)
145            channels = self._config.options.get(
146                "speaker_playback_channels", SPEAKER_CHANNELS
147            )
148            playback_delta_in_ms = self._config.options.get(
149                "speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA
150            )
151            device_index = self._config.options.get("speaker_playback_device_index")
152
153            self._logger.debug("rate: %s", rate)
154            self._logger.debug("channels: %s", channels)
155
156            self._speaker_created = True
157
158            if device_index is not None:
159                self._logger.debug("device_index: %s", device_index)
160
161                self._speaker = Speaker(
162                    rate=rate,
163                    channels=channels,
164                    last_play_delta_in_ms=playback_delta_in_ms,
165                    verbose=self._config.verbose,
166                    output_device_index=device_index,
167                    microphone=self._microphone,
168                )
169            else:
170                self._speaker = Speaker(
171                    rate=rate,
172                    channels=channels,
173                    last_play_delta_in_ms=playback_delta_in_ms,
174                    verbose=self._config.verbose,
175                    microphone=self._microphone,
176                )
177        # call the parent constructor
178        super().__init__(self._config, self._endpoint)
179
180    # pylint: disable=too-many-branches,too-many-statements
181    async def start(
182        self,
183        options: Optional[SettingsConfigurationOptions] = None,
184        addons: Optional[Dict] = None,
185        headers: Optional[Dict] = None,
186        members: Optional[Dict] = None,
187        **kwargs,
188    ) -> bool:
189        """
190        Starts the WebSocket connection for agent API.
191        """
192        self._logger.debug("AsyncAgentWebSocketClient.start ENTER")
193        self._logger.info("settings: %s", options)
194        self._logger.info("addons: %s", addons)
195        self._logger.info("headers: %s", headers)
196        self._logger.info("members: %s", members)
197        self._logger.info("kwargs: %s", kwargs)
198
199        if isinstance(options, SettingsConfigurationOptions) and not options.check():
200            self._logger.error("settings.check failed")
201            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
202            raise DeepgramError("Fatal agent settings error")
203
204        self._addons = addons
205        self._headers = headers
206
207        # add "members" as members of the class
208        if members is not None:
209            self.__dict__.update(members)
210
211        # set kwargs as members of the class
212        if kwargs is not None:
213            self._kwargs = kwargs
214        else:
215            self._kwargs = {}
216
217        if isinstance(options, SettingsConfigurationOptions):
218            self._logger.info("options is class")
219            self._settings = options
220        elif isinstance(options, dict):
221            self._logger.info("options is dict")
222            self._settings = SettingsConfigurationOptions.from_dict(options)
223        elif isinstance(options, str):
224            self._logger.info("options is json")
225            self._settings = SettingsConfigurationOptions.from_json(options)
226        else:
227            raise DeepgramError("Invalid options type")
228
229        if self._settings.agent.listen.keyterms is not None and self._settings.agent.listen.model is not None and not self._settings.agent.listen.model.startswith("nova-3"):
230            raise DeepgramError("Keyterms are only supported for nova-3 models")
231
232        try:
233            # speaker substitutes the listening thread
234            if self._speaker is not None:
235                self._logger.notice("passing speaker to delegate_listening")
236                super().delegate_listening(self._speaker)
237
238            # call parent start
239            if (
240                await super().start(
241                    {},
242                    self._addons,
243                    self._headers,
244                    **dict(cast(Dict[Any, Any], self._kwargs)),
245                )
246                is False
247            ):
248                self._logger.error("AsyncAgentWebSocketClient.start failed")
249                self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
250                return False
251
252            if self._speaker is not None:
253                self._logger.notice("speaker is delegate_listening. Starting speaker")
254                self._speaker.start()
255
256            if self._speaker is not None and self._microphone is not None:
257                self._logger.notice(
258                    "speaker is delegate_listening. Starting microphone"
259                )
260                self._microphone.set_callback(self.send)
261                self._microphone.start()
262
263            # debug the threads
264            for thread in threading.enumerate():
265                self._logger.debug("after running thread: %s", thread.name)
266            self._logger.debug("number of active threads: %s", threading.active_count())
267
268            # keepalive thread
269            if self._config.is_keep_alive_enabled():
270                self._logger.notice("keepalive is enabled")
271                self._keep_alive_thread = asyncio.create_task(self._keep_alive())
272            else:
273                self._logger.notice("keepalive is disabled")
274
275            # debug the threads
276            for thread in threading.enumerate():
277                self._logger.debug("after running thread: %s", thread.name)
278            self._logger.debug("number of active threads: %s", threading.active_count())
279
280            # send the configurationsetting message
281            self._logger.notice("Sending ConfigurationSettings...")
282            ret_send_cs = await self.send(str(self._settings))
283            if not ret_send_cs:
284                self._logger.error("ConfigurationSettings failed")
285
286                err_error: ErrorResponse = ErrorResponse(
287                    "Exception in AsyncAgentWebSocketClient.start",
288                    "ConfigurationSettings failed to send",
289                    "Exception",
290                )
291                await self._emit(
292                    AgentWebSocketEvents(AgentWebSocketEvents.Error),
293                    error=err_error,
294                    **dict(cast(Dict[Any, Any], self._kwargs)),
295                )
296
297                self._logger.debug("AgentWebSocketClient.start LEAVE")
298                return False
299
300            self._logger.notice("start succeeded")
301            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
302            return True
303
304        except Exception as e:  # pylint: disable=broad-except
305            self._logger.error(
306                "WebSocketException in AsyncAgentWebSocketClient.start: %s", e
307            )
308            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
309            if self._config.options.get("termination_exception_connect") is True:
310                raise e
311            return False
312
313    # pylint: enable=too-many-branches,too-many-statements
314
315    def on(self, event: AgentWebSocketEvents, handler: Callable) -> None:
316        """
317        Registers event handlers for specific events.
318        """
319        self._logger.info("event subscribed: %s", event)
320        if event in AgentWebSocketEvents.__members__.values() and callable(handler):
321            self._event_handlers[event].append(handler)
322
323    async def _emit(self, event: AgentWebSocketEvents, *args, **kwargs) -> None:
324        """
325        Emits events to the registered event handlers.
326        """
327        self._logger.debug("AsyncAgentWebSocketClient._emit ENTER")
328        self._logger.debug("callback handlers for: %s", event)
329
330        # debug the threads
331        for thread in threading.enumerate():
332            self._logger.debug("after running thread: %s", thread.name)
333        self._logger.debug("number of active threads: %s", threading.active_count())
334
335        self._logger.debug("callback handlers for: %s", event)
336        tasks = []
337        for handler in self._event_handlers[event]:
338            task = asyncio.create_task(handler(self, *args, **kwargs))
339            tasks.append(task)
340
341        if tasks:
342            self._logger.debug("waiting for tasks to finish...")
343            await asyncio.gather(*tasks, return_exceptions=True)
344            tasks.clear()
345
346        # debug the threads
347        for thread in threading.enumerate():
348            self._logger.debug("after running thread: %s", thread.name)
349        self._logger.debug("number of active threads: %s", threading.active_count())
350
351        self._logger.debug("AsyncAgentWebSocketClient._emit LEAVE")
352
353    # pylint: disable=too-many-locals,too-many-statements
354    async def _process_text(self, message: str) -> None:
355        """
356        Processes messages received over the WebSocket connection.
357        """
358        self._logger.debug("AsyncAgentWebSocketClient._process_text ENTER")
359
360        try:
361            self._logger.debug("Text data received")
362            if len(message) == 0:
363                self._logger.debug("message is empty")
364                self._logger.debug("AsyncAgentWebSocketClient._process_text LEAVE")
365                return
366
367            data = json.loads(message)
368            response_type = data.get("type")
369            self._logger.debug("response_type: %s, data: %s", response_type, data)
370
371            match response_type:
372                case AgentWebSocketEvents.Open:
373                    open_result: OpenResponse = OpenResponse.from_json(message)
374                    self._logger.verbose("OpenResponse: %s", open_result)
375                    await self._emit(
376                        AgentWebSocketEvents(AgentWebSocketEvents.Open),
377                        open=open_result,
378                        **dict(cast(Dict[Any, Any], self._kwargs)),
379                    )
380                case AgentWebSocketEvents.Welcome:
381                    welcome_result: WelcomeResponse = WelcomeResponse.from_json(message)
382                    self._logger.verbose("WelcomeResponse: %s", welcome_result)
383                    await self._emit(
384                        AgentWebSocketEvents(AgentWebSocketEvents.Welcome),
385                        welcome=welcome_result,
386                        **dict(cast(Dict[Any, Any], self._kwargs)),
387                    )
388                case AgentWebSocketEvents.SettingsApplied:
389                    settings_applied_result: SettingsAppliedResponse = (
390                        SettingsAppliedResponse.from_json(message)
391                    )
392                    self._logger.verbose(
393                        "SettingsAppliedResponse: %s", settings_applied_result
394                    )
395                    await self._emit(
396                        AgentWebSocketEvents(AgentWebSocketEvents.SettingsApplied),
397                        settings_applied=settings_applied_result,
398                        **dict(cast(Dict[Any, Any], self._kwargs)),
399                    )
400                case AgentWebSocketEvents.ConversationText:
401                    conversation_text_result: ConversationTextResponse = (
402                        ConversationTextResponse.from_json(message)
403                    )
404                    self._logger.verbose(
405                        "ConversationTextResponse: %s", conversation_text_result
406                    )
407                    await self._emit(
408                        AgentWebSocketEvents(AgentWebSocketEvents.ConversationText),
409                        conversation_text=conversation_text_result,
410                        **dict(cast(Dict[Any, Any], self._kwargs)),
411                    )
412                case AgentWebSocketEvents.UserStartedSpeaking:
413                    user_started_speaking_result: UserStartedSpeakingResponse = (
414                        UserStartedSpeakingResponse.from_json(message)
415                    )
416                    self._logger.verbose(
417                        "UserStartedSpeakingResponse: %s", user_started_speaking_result
418                    )
419                    await self._emit(
420                        AgentWebSocketEvents(AgentWebSocketEvents.UserStartedSpeaking),
421                        user_started_speaking=user_started_speaking_result,
422                        **dict(cast(Dict[Any, Any], self._kwargs)),
423                    )
424                case AgentWebSocketEvents.AgentThinking:
425                    agent_thinking_result: AgentThinkingResponse = (
426                        AgentThinkingResponse.from_json(message)
427                    )
428                    self._logger.verbose(
429                        "AgentThinkingResponse: %s", agent_thinking_result
430                    )
431                    await self._emit(
432                        AgentWebSocketEvents(AgentWebSocketEvents.AgentThinking),
433                        agent_thinking=agent_thinking_result,
434                        **dict(cast(Dict[Any, Any], self._kwargs)),
435                    )
436                case AgentWebSocketEvents.FunctionCalling:
437                    function_calling_result: FunctionCalling = (
438                        FunctionCalling.from_json(message)
439                    )
440                    self._logger.verbose("FunctionCalling: %s", function_calling_result)
441                    await self._emit(
442                        AgentWebSocketEvents(AgentWebSocketEvents.FunctionCalling),
443                        function_calling=function_calling_result,
444                        **dict(cast(Dict[Any, Any], self._kwargs)),
445                    )
446                case AgentWebSocketEvents.FunctionCallRequest:
447                    function_call_request_result: FunctionCallRequest = (
448                        FunctionCallRequest.from_json(message)
449                    )
450                    self._logger.verbose(
451                        "FunctionCallRequest: %s", function_call_request_result
452                    )
453                    await self._emit(
454                        AgentWebSocketEvents(AgentWebSocketEvents.FunctionCallRequest),
455                        function_call_request=function_call_request_result,
456                        **dict(cast(Dict[Any, Any], self._kwargs)),
457                    )
458                case AgentWebSocketEvents.AgentStartedSpeaking:
459                    agent_started_speaking_result: AgentStartedSpeakingResponse = (
460                        AgentStartedSpeakingResponse.from_json(message)
461                    )
462                    self._logger.verbose(
463                        "AgentStartedSpeakingResponse: %s",
464                        agent_started_speaking_result,
465                    )
466                    await self._emit(
467                        AgentWebSocketEvents(AgentWebSocketEvents.AgentStartedSpeaking),
468                        agent_started_speaking=agent_started_speaking_result,
469                        **dict(cast(Dict[Any, Any], self._kwargs)),
470                    )
471                case AgentWebSocketEvents.AgentAudioDone:
472                    agent_audio_done_result: AgentAudioDoneResponse = (
473                        AgentAudioDoneResponse.from_json(message)
474                    )
475                    self._logger.verbose(
476                        "AgentAudioDoneResponse: %s", agent_audio_done_result
477                    )
478                    await self._emit(
479                        AgentWebSocketEvents(AgentWebSocketEvents.AgentAudioDone),
480                        agent_audio_done=agent_audio_done_result,
481                        **dict(cast(Dict[Any, Any], self._kwargs)),
482                    )
483                case AgentWebSocketEvents.InjectionRefused:
484                    injection_refused_result: InjectionRefusedResponse = (
485                        InjectionRefusedResponse.from_json(message)
486                    )
487                    self._logger.verbose(
488                        "InjectionRefused: %s", injection_refused_result
489                    )
490                    await self._emit(
491                        AgentWebSocketEvents(AgentWebSocketEvents.InjectionRefused),
492                        injection_refused=injection_refused_result,
493                        **dict(cast(Dict[Any, Any], self._kwargs)),
494                    )
495                case AgentWebSocketEvents.Close:
496                    close_result: CloseResponse = CloseResponse.from_json(message)
497                    self._logger.verbose("CloseResponse: %s", close_result)
498                    await self._emit(
499                        AgentWebSocketEvents(AgentWebSocketEvents.Close),
500                        close=close_result,
501                        **dict(cast(Dict[Any, Any], self._kwargs)),
502                    )
503                case AgentWebSocketEvents.Error:
504                    err_error: ErrorResponse = ErrorResponse.from_json(message)
505                    self._logger.verbose("ErrorResponse: %s", err_error)
506                    await self._emit(
507                        AgentWebSocketEvents(AgentWebSocketEvents.Error),
508                        error=err_error,
509                        **dict(cast(Dict[Any, Any], self._kwargs)),
510                    )
511                case _:
512                    self._logger.warning(
513                        "Unknown Message: response_type: %s, data: %s",
514                        response_type,
515                        data,
516                    )
517                    unhandled_error: UnhandledResponse = UnhandledResponse(
518                        type=AgentWebSocketEvents(AgentWebSocketEvents.Unhandled),
519                        raw=message,
520                    )
521                    await self._emit(
522                        AgentWebSocketEvents(AgentWebSocketEvents.Unhandled),
523                        unhandled=unhandled_error,
524                        **dict(cast(Dict[Any, Any], self._kwargs)),
525                    )
526
527            self._logger.notice("_process_text Succeeded")
528            self._logger.debug("AsyncAgentWebSocketClient._process_text LEAVE")
529
530        except Exception as e:  # pylint: disable=broad-except
531            self._logger.error(
532                "Exception in AsyncAgentWebSocketClient._process_text: %s", e
533            )
534            e_error: ErrorResponse = ErrorResponse(
535                "Exception in AsyncAgentWebSocketClient._process_text",
536                f"{e}",
537                "Exception",
538            )
539            await self._emit(
540                AgentWebSocketEvents(AgentWebSocketEvents.Error),
541                error=e_error,
542                **dict(cast(Dict[Any, Any], self._kwargs)),
543            )
544
545            # signal exit and close
546            await super()._signal_exit()
547
548            self._logger.debug("AsyncAgentWebSocketClient._process_text LEAVE")
549
550            if self._config.options.get("termination_exception") is True:
551                raise
552            return
553
554    # pylint: enable=too-many-locals,too-many-statements
555
556    async def _process_binary(self, message: bytes) -> None:
557        self._logger.debug("AsyncAgentWebSocketClient._process_binary ENTER")
558        self._logger.debug("Binary data received")
559
560        await self._emit(
561            AgentWebSocketEvents(AgentWebSocketEvents.AudioData),
562            data=message,
563            **dict(cast(Dict[Any, Any], self._kwargs)),
564        )
565
566        self._logger.notice("_process_binary Succeeded")
567        self._logger.debug("AsyncAgentWebSocketClient._process_binary LEAVE")
568
569    # pylint: disable=too-many-return-statements
570    async def _keep_alive(self) -> None:
571        """
572        Sends keepalive messages to the WebSocket connection.
573        """
574        self._logger.debug("AsyncAgentWebSocketClient._keep_alive ENTER")
575
576        counter = 0
577        while True:
578            try:
579                counter += 1
580                await asyncio.sleep(ONE_SECOND)
581
582                if self._exit_event.is_set():
583                    self._logger.notice("_keep_alive exiting gracefully")
584                    self._logger.debug("AsyncAgentWebSocketClient._keep_alive LEAVE")
585                    return
586
587                # deepgram keepalive
588                if counter % DEEPGRAM_INTERVAL == 0:
589                    await self.keep_alive()
590
591            except Exception as e:  # pylint: disable=broad-except
592                self._logger.error(
593                    "Exception in AsyncAgentWebSocketClient._keep_alive: %s", e
594                )
595                e_error: ErrorResponse = ErrorResponse(
596                    "Exception in AsyncAgentWebSocketClient._keep_alive",
597                    f"{e}",
598                    "Exception",
599                )
600                self._logger.error(
601                    "Exception in AsyncAgentWebSocketClient._keep_alive: %s", str(e)
602                )
603                await self._emit(
604                    AgentWebSocketEvents(AgentWebSocketEvents.Error),
605                    error=e_error,
606                    **dict(cast(Dict[Any, Any], self._kwargs)),
607                )
608
609                # signal exit and close
610                await super()._signal_exit()
611
612                self._logger.debug("AsyncAgentWebSocketClient._keep_alive LEAVE")
613
614                if self._config.options.get("termination_exception") is True:
615                    raise
616                return
617
618    async def keep_alive(self) -> bool:
619        """
620        Sends a KeepAlive message
621        """
622        self._logger.spam("AsyncAgentWebSocketClient.keep_alive ENTER")
623
624        self._logger.notice("Sending KeepAlive...")
625        ret = await self.send(json.dumps({"type": "KeepAlive"}))
626
627        if not ret:
628            self._logger.error("keep_alive failed")
629            self._logger.spam("AsyncAgentWebSocketClient.keep_alive LEAVE")
630            return False
631
632        self._logger.notice("keep_alive succeeded")
633        self._logger.spam("AsyncAgentWebSocketClient.keep_alive LEAVE")
634
635        return True
636
637    async def _close_message(self) -> bool:
638        # TODO: No known API close message # pylint: disable=fixme
639        # return await self.send(json.dumps({"type": "Close"}))
640        return True
641
642    async def finish(self) -> bool:
643        """
644        Closes the WebSocket connection gracefully.
645        """
646        self._logger.debug("AsyncAgentWebSocketClient.finish ENTER")
647
648        # stop the threads
649        self._logger.verbose("cancelling tasks...")
650        try:
651            # call parent finish
652            if await super().finish() is False:
653                self._logger.error("AsyncAgentWebSocketClient.finish failed")
654
655            if self._microphone is not None and self._microphone_created:
656                self._microphone.finish()
657                self._microphone_created = False
658
659            if self._speaker is not None and self._speaker_created:
660                self._speaker.finish()
661                self._speaker_created = False
662
663            # Before cancelling, check if the tasks were created
664            # debug the threads
665            for thread in threading.enumerate():
666                self._logger.debug("before running thread: %s", thread.name)
667            self._logger.debug("number of active threads: %s", threading.active_count())
668
669            tasks = []
670            if self._keep_alive_thread is not None:
671                self._keep_alive_thread.cancel()
672                tasks.append(self._keep_alive_thread)
673                self._logger.notice("processing _keep_alive_thread cancel...")
674
675            # Use asyncio.gather to wait for tasks to be cancelled
676            # Prevent indefinite waiting by setting a timeout
677            await asyncio.wait_for(asyncio.gather(*tasks), timeout=10)
678            self._logger.notice("threads joined")
679
680            self._speaker = None
681            self._microphone = None
682
683            # debug the threads
684            for thread in threading.enumerate():
685                self._logger.debug("after running thread: %s", thread.name)
686            self._logger.debug("number of active threads: %s", threading.active_count())
687
688            self._logger.notice("finish succeeded")
689            self._logger.spam("AsyncAgentWebSocketClient.finish LEAVE")
690            return True
691
692        except asyncio.CancelledError as e:
693            self._logger.error("tasks cancelled error: %s", e)
694            self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE")
695            return False
696
697        except asyncio.TimeoutError as e:
698            self._logger.error("tasks cancellation timed out: %s", e)
699            self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE")
700            return False

Client for interacting with Deepgram's agent API over WebSockets.

This class provides methods to establish a WebSocket connection to the agent endpoint and handle real-time agent events.

Args: config (DeepgramClientOptions): all the options for the client.

AsyncAgentWebSocketClient(config: deepgram.options.DeepgramClientOptions)
 91    def __init__(self, config: DeepgramClientOptions):
 92        if config is None:
 93            raise DeepgramError("Config is required")
 94
 95        self._logger = verboselogs.VerboseLogger(__name__)
 96        self._logger.addHandler(logging.StreamHandler())
 97        self._logger.setLevel(config.verbose)
 98
 99        self._config = config
100
101        # needs to be "wss://agent.deepgram.com/agent"
102        self._endpoint = "agent"
103
104        # override the endpoint since it needs to be "wss://agent.deepgram.com/agent"
105        self._config.url = "agent.deepgram.com"
106        self._keep_alive_thread = None
107
108        # init handlers
109        self._event_handlers = {
110            event: [] for event in AgentWebSocketEvents.__members__.values()
111        }
112
113        if self._config.options.get("microphone_record") == "true":
114            self._logger.info("microphone_record is enabled")
115            rate = self._config.options.get("microphone_record_rate", MICROPHONE_RATE)
116            channels = self._config.options.get(
117                "microphone_record_channels", MICROPHONE_CHANNELS
118            )
119            device_index = self._config.options.get("microphone_record_device_index")
120
121            self._logger.debug("rate: %s", rate)
122            self._logger.debug("channels: %s", channels)
123            if device_index is not None:
124                self._logger.debug("device_index: %s", device_index)
125
126            self._microphone_created = True
127
128            if device_index is not None:
129                self._microphone = Microphone(
130                    rate=rate,
131                    channels=channels,
132                    verbose=self._config.verbose,
133                    input_device_index=device_index,
134                )
135            else:
136                self._microphone = Microphone(
137                    rate=rate,
138                    channels=channels,
139                    verbose=self._config.verbose,
140                )
141
142        if self._config.options.get("speaker_playback") == "true":
143            self._logger.info("speaker_playback is enabled")
144            rate = self._config.options.get("speaker_playback_rate", SPEAKER_RATE)
145            channels = self._config.options.get(
146                "speaker_playback_channels", SPEAKER_CHANNELS
147            )
148            playback_delta_in_ms = self._config.options.get(
149                "speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA
150            )
151            device_index = self._config.options.get("speaker_playback_device_index")
152
153            self._logger.debug("rate: %s", rate)
154            self._logger.debug("channels: %s", channels)
155
156            self._speaker_created = True
157
158            if device_index is not None:
159                self._logger.debug("device_index: %s", device_index)
160
161                self._speaker = Speaker(
162                    rate=rate,
163                    channels=channels,
164                    last_play_delta_in_ms=playback_delta_in_ms,
165                    verbose=self._config.verbose,
166                    output_device_index=device_index,
167                    microphone=self._microphone,
168                )
169            else:
170                self._speaker = Speaker(
171                    rate=rate,
172                    channels=channels,
173                    last_play_delta_in_ms=playback_delta_in_ms,
174                    verbose=self._config.verbose,
175                    microphone=self._microphone,
176                )
177        # call the parent constructor
178        super().__init__(self._config, self._endpoint)
async def start( self, options: Optional[SettingsConfigurationOptions] = None, addons: Optional[Dict] = None, headers: Optional[Dict] = None, members: Optional[Dict] = None, **kwargs) -> bool:
181    async def start(
182        self,
183        options: Optional[SettingsConfigurationOptions] = None,
184        addons: Optional[Dict] = None,
185        headers: Optional[Dict] = None,
186        members: Optional[Dict] = None,
187        **kwargs,
188    ) -> bool:
189        """
190        Starts the WebSocket connection for agent API.
191        """
192        self._logger.debug("AsyncAgentWebSocketClient.start ENTER")
193        self._logger.info("settings: %s", options)
194        self._logger.info("addons: %s", addons)
195        self._logger.info("headers: %s", headers)
196        self._logger.info("members: %s", members)
197        self._logger.info("kwargs: %s", kwargs)
198
199        if isinstance(options, SettingsConfigurationOptions) and not options.check():
200            self._logger.error("settings.check failed")
201            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
202            raise DeepgramError("Fatal agent settings error")
203
204        self._addons = addons
205        self._headers = headers
206
207        # add "members" as members of the class
208        if members is not None:
209            self.__dict__.update(members)
210
211        # set kwargs as members of the class
212        if kwargs is not None:
213            self._kwargs = kwargs
214        else:
215            self._kwargs = {}
216
217        if isinstance(options, SettingsConfigurationOptions):
218            self._logger.info("options is class")
219            self._settings = options
220        elif isinstance(options, dict):
221            self._logger.info("options is dict")
222            self._settings = SettingsConfigurationOptions.from_dict(options)
223        elif isinstance(options, str):
224            self._logger.info("options is json")
225            self._settings = SettingsConfigurationOptions.from_json(options)
226        else:
227            raise DeepgramError("Invalid options type")
228
229        if self._settings.agent.listen.keyterms is not None and self._settings.agent.listen.model is not None and not self._settings.agent.listen.model.startswith("nova-3"):
230            raise DeepgramError("Keyterms are only supported for nova-3 models")
231
232        try:
233            # speaker substitutes the listening thread
234            if self._speaker is not None:
235                self._logger.notice("passing speaker to delegate_listening")
236                super().delegate_listening(self._speaker)
237
238            # call parent start
239            if (
240                await super().start(
241                    {},
242                    self._addons,
243                    self._headers,
244                    **dict(cast(Dict[Any, Any], self._kwargs)),
245                )
246                is False
247            ):
248                self._logger.error("AsyncAgentWebSocketClient.start failed")
249                self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
250                return False
251
252            if self._speaker is not None:
253                self._logger.notice("speaker is delegate_listening. Starting speaker")
254                self._speaker.start()
255
256            if self._speaker is not None and self._microphone is not None:
257                self._logger.notice(
258                    "speaker is delegate_listening. Starting microphone"
259                )
260                self._microphone.set_callback(self.send)
261                self._microphone.start()
262
263            # debug the threads
264            for thread in threading.enumerate():
265                self._logger.debug("after running thread: %s", thread.name)
266            self._logger.debug("number of active threads: %s", threading.active_count())
267
268            # keepalive thread
269            if self._config.is_keep_alive_enabled():
270                self._logger.notice("keepalive is enabled")
271                self._keep_alive_thread = asyncio.create_task(self._keep_alive())
272            else:
273                self._logger.notice("keepalive is disabled")
274
275            # debug the threads
276            for thread in threading.enumerate():
277                self._logger.debug("after running thread: %s", thread.name)
278            self._logger.debug("number of active threads: %s", threading.active_count())
279
280            # send the configurationsetting message
281            self._logger.notice("Sending ConfigurationSettings...")
282            ret_send_cs = await self.send(str(self._settings))
283            if not ret_send_cs:
284                self._logger.error("ConfigurationSettings failed")
285
286                err_error: ErrorResponse = ErrorResponse(
287                    "Exception in AsyncAgentWebSocketClient.start",
288                    "ConfigurationSettings failed to send",
289                    "Exception",
290                )
291                await self._emit(
292                    AgentWebSocketEvents(AgentWebSocketEvents.Error),
293                    error=err_error,
294                    **dict(cast(Dict[Any, Any], self._kwargs)),
295                )
296
297                self._logger.debug("AgentWebSocketClient.start LEAVE")
298                return False
299
300            self._logger.notice("start succeeded")
301            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
302            return True
303
304        except Exception as e:  # pylint: disable=broad-except
305            self._logger.error(
306                "WebSocketException in AsyncAgentWebSocketClient.start: %s", e
307            )
308            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
309            if self._config.options.get("termination_exception_connect") is True:
310                raise e
311            return False

Starts the WebSocket connection for the agent API.

def on( self, event: deepgram.clients.agent.enums.AgentWebSocketEvents, handler: Callable) -> None:
315    def on(self, event: AgentWebSocketEvents, handler: Callable) -> None:
316        """
317        Registers event handlers for specific events.
318        """
319        self._logger.info("event subscribed: %s", event)
320        if event in AgentWebSocketEvents.__members__.values() and callable(handler):
321            self._event_handlers[event].append(handler)

Registers event handlers for specific events.

async def keep_alive(self) -> bool:
618    async def keep_alive(self) -> bool:
619        """
620        Sends a KeepAlive message
621        """
622        self._logger.spam("AsyncAgentWebSocketClient.keep_alive ENTER")
623
624        self._logger.notice("Sending KeepAlive...")
625        ret = await self.send(json.dumps({"type": "KeepAlive"}))
626
627        if not ret:
628            self._logger.error("keep_alive failed")
629            self._logger.spam("AsyncAgentWebSocketClient.keep_alive LEAVE")
630            return False
631
632        self._logger.notice("keep_alive succeeded")
633        self._logger.spam("AsyncAgentWebSocketClient.keep_alive LEAVE")
634
635        return True

Sends a KeepAlive message

async def finish(self) -> bool:
642    async def finish(self) -> bool:
643        """
644        Closes the WebSocket connection gracefully.
645        """
646        self._logger.debug("AsyncAgentWebSocketClient.finish ENTER")
647
648        # stop the threads
649        self._logger.verbose("cancelling tasks...")
650        try:
651            # call parent finish
652            if await super().finish() is False:
653                self._logger.error("AsyncAgentWebSocketClient.finish failed")
654
655            if self._microphone is not None and self._microphone_created:
656                self._microphone.finish()
657                self._microphone_created = False
658
659            if self._speaker is not None and self._speaker_created:
660                self._speaker.finish()
661                self._speaker_created = False
662
663            # Before cancelling, check if the tasks were created
664            # debug the threads
665            for thread in threading.enumerate():
666                self._logger.debug("before running thread: %s", thread.name)
667            self._logger.debug("number of active threads: %s", threading.active_count())
668
669            tasks = []
670            if self._keep_alive_thread is not None:
671                self._keep_alive_thread.cancel()
672                tasks.append(self._keep_alive_thread)
673                self._logger.notice("processing _keep_alive_thread cancel...")
674
675            # Use asyncio.gather to wait for tasks to be cancelled
676            # Prevent indefinite waiting by setting a timeout
677            await asyncio.wait_for(asyncio.gather(*tasks), timeout=10)
678            self._logger.notice("threads joined")
679
680            self._speaker = None
681            self._microphone = None
682
683            # debug the threads
684            for thread in threading.enumerate():
685                self._logger.debug("after running thread: %s", thread.name)
686            self._logger.debug("number of active threads: %s", threading.active_count())
687
688            self._logger.notice("finish succeeded")
689            self._logger.spam("AsyncAgentWebSocketClient.finish LEAVE")
690            return True
691
692        except asyncio.CancelledError as e:
693            self._logger.error("tasks cancelled error: %s", e)
694            self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE")
695            return False
696
697        except asyncio.TimeoutError as e:
698            self._logger.error("tasks cancellation timed out: %s", e)
699            self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE")
700            return False

Closes the WebSocket connection gracefully.

@dataclass
class OpenResponse(deepgram.clients.common.v1.shared_response.BaseResponse):
@dataclass
class OpenResponse(BaseResponse):
    """
    Open Message from the Deepgram Platform.
    """

    # message type; empty string by default
    type: str = ""

Open Message from the Deepgram Platform

OpenResponse(type: str = '')
type: str = ''
@dataclass
class CloseResponse(deepgram.clients.common.v1.shared_response.BaseResponse):
@dataclass
class CloseResponse(BaseResponse):
    """
    Close Message from the Deepgram Platform.
    """

    # message type; empty string by default
    type: str = ""

Close Message from the Deepgram Platform

CloseResponse(type: str = '')
type: str = ''
@dataclass
class ErrorResponse(deepgram.clients.common.v1.shared_response.BaseResponse):
@dataclass
class ErrorResponse(BaseResponse):
    """
    Error Message from the Deepgram Platform.

    Fields are declared in the order description, message, type, variant,
    which is also the positional constructor order.
    """

    description: str = ""
    message: str = ""
    type: str = ""
    # optional; the exclude predicate matches when the value is None
    variant: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )

Error Message from the Deepgram Platform

ErrorResponse( description: str = '', message: str = '', type: str = '', variant: Optional[str] = None)
description: str = ''
message: str = ''
type: str = ''
variant: Optional[str] = None
@dataclass
class UnhandledResponse(deepgram.clients.common.v1.shared_response.BaseResponse):
@dataclass
class UnhandledResponse(BaseResponse):
    """
    Unhandled Message from the Deepgram Platform.
    """

    # message type; empty string by default
    type: str = ""
    # the unparsed message text
    raw: str = ""

Unhandled Message from the Deepgram Platform

UnhandledResponse(type: str = '', raw: str = '')
type: str = ''
raw: str = ''
@dataclass
class WelcomeResponse(deepgram.clients.common.v1.shared_response.BaseResponse):
@dataclass
class WelcomeResponse(BaseResponse):
    """
    Welcome message, sent by the server as soon as the websocket opens.
    """

    type: str
    session_id: str

The server will send a Welcome message as soon as the websocket opens.

WelcomeResponse(type: str, session_id: str)
type: str
session_id: str
@dataclass
class SettingsAppliedResponse(deepgram.clients.common.v1.shared_response.BaseResponse):
@dataclass
class SettingsAppliedResponse(BaseResponse):
    """
    SettingsApplied message, sent by the server as soon as the settings are applied.
    """

    type: str

The server will send a SettingsApplied message as soon as the settings are applied.

SettingsAppliedResponse(type: str)
type: str
@dataclass
class ConversationTextResponse(deepgram.clients.common.v1.shared_response.BaseResponse):
@dataclass
class ConversationTextResponse(BaseResponse):
    """
    ConversationText message, sent by the server every time the agent hears
    the user say something and every time the agent speaks something itself.
    """

    type: str
    role: str
    content: str

The server will send a ConversationText message every time the agent hears the user say something, and every time the agent speaks something itself.

ConversationTextResponse(type: str, role: str, content: str)
type: str
role: str
content: str
@dataclass
class UserStartedSpeakingResponse(deepgram.clients.common.v1.shared_response.BaseResponse):
@dataclass
class UserStartedSpeakingResponse(BaseResponse):
    """
    UserStartedSpeaking message, sent by the server every time the user
    begins a new utterance.
    """

    type: str

The server will send a UserStartedSpeaking message every time the user begins a new utterance.

UserStartedSpeakingResponse(type: str)
type: str
@dataclass
class AgentThinkingResponse(deepgram.clients.common.v1.shared_response.BaseResponse):
@dataclass
class AgentThinkingResponse(BaseResponse):
    """
    AgentThinking message, sent by the server to inform the client of a
    non-verbalized agent thought.
    """

    type: str
    content: str

The server will send an AgentThinking message to inform the client of a non-verbalized agent thought.

AgentThinkingResponse(type: str, content: str)
type: str
content: str
@dataclass
class FunctionCalling(deepgram.clients.common.v1.shared_response.BaseResponse):
@dataclass
class FunctionCalling(BaseResponse):
    """
    FunctionCalling message, sometimes sent by the server when making function
    calls to help the client developer debug function calling workflows.
    """

    type: str

The server will sometimes send FunctionCalling messages when making function calls to help the client developer debug function calling workflows.

FunctionCalling(type: str)
type: str
@dataclass
class FunctionCallRequest(deepgram.clients.common.v1.shared_response.BaseResponse):
@dataclass
class FunctionCallRequest(BaseResponse):
    """
    FunctionCallRequest message, used to call a function from the server to
    the client.
    """

    type: str
    function_name: str
    function_call_id: str
    input: str

The FunctionCallRequest message is used to call a function from the server to the client.

FunctionCallRequest(type: str, function_name: str, function_call_id: str, input: str)
type: str
function_name: str
function_call_id: str
input: str
@dataclass
class AgentStartedSpeakingResponse(deepgram.clients.common.v1.shared_response.BaseResponse):
 92@dataclass
 93class AgentStartedSpeakingResponse(BaseResponse):
 94    """
 95    The server will send an AgentStartedSpeaking message when it begins streaming an agent audio response to the client for playback.
 96    """
 97
 98    total_latency: float
 99    tts_latency: float
100    ttt_latency: float

The server will send an AgentStartedSpeaking message when it begins streaming an agent audio response to the client for playback.

AgentStartedSpeakingResponse(total_latency: float, tts_latency: float, ttt_latency: float)
total_latency: float
tts_latency: float
ttt_latency: float
@dataclass
class AgentAudioDoneResponse(deepgram.clients.common.v1.shared_response.BaseResponse):
@dataclass
class AgentAudioDoneResponse(BaseResponse):
    """
    AgentAudioDone message, sent by the server immediately after it sends the
    last audio message in a piece of agent speech.
    """

    type: str

The server will send an AgentAudioDone message immediately after it sends the last audio message in a piece of agent speech.

AgentAudioDoneResponse(type: str)
type: str
@dataclass
class InjectionRefusedResponse(deepgram.clients.common.v1.shared_response.BaseResponse):
@dataclass
class InjectionRefusedResponse(BaseResponse):
    """
    InjectionRefused message, sent by the server when an InjectAgentMessage
    request is ignored because it arrived while the user was speaking or while
    the server was sending audio for an agent response.
    """

    type: str

The server will send an InjectionRefused message when an InjectAgentMessage request is ignored because it arrived while the user was speaking or while the server was sending audio for an agent response.

InjectionRefusedResponse(type: str)
type: str
@dataclass
class SettingsConfigurationOptions(deepgram.clients.common.v1.shared_response.BaseResponse):
257@dataclass
258class SettingsConfigurationOptions(BaseResponse):
259    """
260    The client should send a SettingsConfiguration message immediately after opening the websocket and before sending any audio.
261    """
262
263    type: str = str(AgentWebSocketEvents.SettingsConfiguration)
264    audio: Audio = field(default_factory=Audio)
265    agent: Agent = field(default_factory=Agent)
266    context: Optional[Context] = field(
267        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
268    )
269
270    def __getitem__(self, key):
271        _dict = self.to_dict()
272        if "audio" in _dict:
273            _dict["audio"] = [Audio.from_dict(audio) for audio in _dict["audio"]]
274        if "agent" in _dict:
275            _dict["agent"] = [Agent.from_dict(agent) for agent in _dict["agent"]]
276        if "context" in _dict:
277            _dict["context"] = [
278                Context.from_dict(context) for context in _dict["context"]
279            ]
280        return _dict[key]
281
282    def check(self):
283        """
284        Check the options for any deprecated or soon-to-be-deprecated options.
285        """
286        logger = verboselogs.VerboseLogger(__name__)
287        logger.addHandler(logging.StreamHandler())
288        prev = logger.level
289        logger.setLevel(verboselogs.ERROR)
290
291        # do we need to check anything here?
292
293        logger.setLevel(prev)
294
295        return True

The client should send a SettingsConfiguration message immediately after opening the websocket and before sending any audio.

SettingsConfigurationOptions(type: str = 'SettingsConfiguration', audio: Audio = <factory>, agent: Agent = <factory>, context: Optional[Context] = None)
type: str = 'SettingsConfiguration'
audio: Audio
agent: Agent
context: Optional[Context] = None
def check(self):
282    def check(self):
283        """
284        Check the options for any deprecated or soon-to-be-deprecated options.
285        """
286        logger = verboselogs.VerboseLogger(__name__)
287        logger.addHandler(logging.StreamHandler())
288        prev = logger.level
289        logger.setLevel(verboselogs.ERROR)
290
291        # do we need to check anything here?
292
293        logger.setLevel(prev)
294
295        return True

Check the options for any deprecated or soon-to-be-deprecated options.

@dataclass
class UpdateInstructionsOptions(deepgram.clients.common.v1.shared_response.BaseResponse):
301@dataclass
302class UpdateInstructionsOptions(BaseResponse):
303    """
304    The client can send an UpdateInstructions message to give additional instructions to the Think model in the middle of a conversation.
305    """
306
307    type: str = str(AgentWebSocketEvents.UpdateInstructions)
308    instructions: str = field(default="")

The client can send an UpdateInstructions message to give additional instructions to the Think model in the middle of a conversation.

UpdateInstructionsOptions(type: str = 'UpdateInstructions', instructions: str = '')
type: str = 'UpdateInstructions'
instructions: str = ''
@dataclass
class UpdateSpeakOptions(deepgram.clients.common.v1.shared_response.BaseResponse):
314@dataclass
315class UpdateSpeakOptions(BaseResponse):
316    """
317    The client can send an UpdateSpeak message to change the Speak model in the middle of a conversation.
318    """
319
320    type: str = str(AgentWebSocketEvents.UpdateSpeak)
321    model: str = field(default="")

The client can send an UpdateSpeak message to change the Speak model in the middle of a conversation.

UpdateSpeakOptions(type: str = 'UpdateSpeak', model: str = '')
type: str = 'UpdateSpeak'
model: str = ''
@dataclass
class InjectAgentMessageOptions(deepgram.clients.common.v1.shared_response.BaseResponse):
327@dataclass
328class InjectAgentMessageOptions(BaseResponse):
329    """
330    The client can send an InjectAgentMessage to immediately trigger an agent statement. If the injection request arrives while the user is speaking, or while the server is in the middle of sending audio for an agent response, then the request will be ignored and the server will reply with an InjectionRefused.
331    """
332
333    type: str = str(AgentWebSocketEvents.InjectAgentMessage)
334    message: str = field(default="")

The client can send an InjectAgentMessage to immediately trigger an agent statement. If the injection request arrives while the user is speaking, or while the server is in the middle of sending audio for an agent response, then the request will be ignored and the server will reply with an InjectionRefused.

InjectAgentMessageOptions(type: str = 'InjectAgentMessage', message: str = '')
type: str = 'InjectAgentMessage'
message: str = ''
@dataclass
class FunctionCallResponse(deepgram.clients.common.v1.shared_response.BaseResponse):
340@dataclass
341class FunctionCallResponse(BaseResponse):
342    """
343    The FunctionCallResponse message is a JSON command that the client should reply with every time there is a FunctionCallRequest received.
344    """
345
346    type: str = "FunctionCallResponse"
347    function_call_id: str = field(default="")
348    output: str = field(default="")

The FunctionCallResponse message is a JSON command that the client should reply with every time there is a FunctionCallRequest received.

FunctionCallResponse(type: str = 'FunctionCallResponse', function_call_id: str = '', output: str = '')
type: str = 'FunctionCallResponse'
function_call_id: str = ''
output: str = ''
@dataclass
class AgentKeepAlive(deepgram.clients.common.v1.shared_response.BaseResponse):
354@dataclass
355class AgentKeepAlive(BaseResponse):
356    """
357    The KeepAlive message is a JSON command that you can use to ensure that the server does not close the connection.
358    """
359
360    type: str = "KeepAlive"

The KeepAlive message is a JSON command that you can use to ensure that the server does not close the connection.

AgentKeepAlive(type: str = 'KeepAlive')
type: str = 'KeepAlive'
@dataclass
class Listen(deepgram.clients.common.v1.shared_response.BaseResponse):
21@dataclass
22class Listen(BaseResponse):
23    """
24    This class defines any configuration settings for the Listen model.
25    """
26
27    model: Optional[str] = field(
28        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
29    )
30    keyterms: Optional[List[str]] = field(
31        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
32    )

This class defines any configuration settings for the Listen model.

Listen(model: Optional[str] = None, keyterms: Optional[List[str]] = None)
model: Optional[str] = None
keyterms: Optional[List[str]] = None
@dataclass
class Speak(deepgram.clients.common.v1.shared_response.BaseResponse):
35@dataclass
36class Speak(BaseResponse):
37    """
38    This class defines any configuration settings for the Speak model.
39    """
40
41    model: Optional[str] = field(
42        default="aura-asteria-en",
43        metadata=dataclass_config(exclude=lambda f: f is None),
44    )
45    provider: Optional[str] = field(
46        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
47    )
48    voice_id: Optional[str] = field(
49        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
50    )

This class defines any configuration settings for the Speak model.

Speak(model: Optional[str] = 'aura-asteria-en', provider: Optional[str] = None, voice_id: Optional[str] = None)
model: Optional[str] = 'aura-asteria-en'
provider: Optional[str] = None
voice_id: Optional[str] = None
@dataclass
class Item(deepgram.clients.common.v1.shared_response.BaseResponse):
63@dataclass
64class Item(BaseResponse):
65    """
66    This class defines a single item in a list of items.
67    """
68
69    type: str
70    description: str

This class defines a single item in a list of items.

Item(type: str, description: str)
type: str
description: str
@dataclass
class Properties(deepgram.clients.common.v1.shared_response.BaseResponse):
73@dataclass
74class Properties(BaseResponse):
75    """
 76    This class defines the properties, which are just a list of items.
77    """
78
79    item: Item
80
81    def __getitem__(self, key):
82        _dict = self.to_dict()
83        if "item" in _dict:
84            _dict["item"] = [Item.from_dict(item) for item in _dict["item"]]
85        return _dict[key]

This class defines the properties, which are just a list of items.

Properties(item: Item)
item: Item
@dataclass
class Parameters(deepgram.clients.common.v1.shared_response.BaseResponse):
 88@dataclass
 89class Parameters(BaseResponse):
 90    """
 91    This class defines the parameters for a function.
 92    """
 93
 94    type: str
 95    properties: Properties
 96    required: List[str]
 97
 98    def __getitem__(self, key):
 99        _dict = self.to_dict()
100        if "properties" in _dict:
101            _dict["properties"] = _dict["properties"].copy()
102        return _dict[key]

This class defines the parameters for a function.

Parameters(type: str, properties: Properties, required: List[str])
type: str
properties: Properties
required: List[str]
@dataclass
class Function(deepgram.clients.common.v1.shared_response.BaseResponse):
105@dataclass
106class Function(BaseResponse):
107    """
108    This class defines a function for the Think model.
109    """
110
111    name: str
112    description: str
113    url: str
114    method: str
115    headers: Optional[List[Header]] = field(
116        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
117    )
118    parameters: Optional[Parameters] = field(
119        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
120    )
121
122    def __getitem__(self, key):
123        _dict = self.to_dict()
124        if "parameters" in _dict:
125            _dict["parameters"] = [
126                Parameters.from_dict(parameters) for parameters in _dict["parameters"]
127            ]
128        if "headers" in _dict:
129            _dict["headers"] = [
130                Header.from_dict(headers) for headers in _dict["headers"]
131            ]
132        return _dict[key]

This class defines a function for the Think model.

Function(name: str, description: str, url: str, method: str, headers: Optional[List[Header]] = None, parameters: Optional[Parameters] = None)
name: str
description: str
url: str
method: str
headers: Optional[List[Header]] = None
parameters: Optional[Parameters] = None
@dataclass
class Provider(deepgram.clients.common.v1.shared_response.BaseResponse):
135@dataclass
136class Provider(BaseResponse):
137    """
138    This class defines the provider for the Think model.
139    """
140
141    type: Optional[str] = field(
142        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
143    )

This class defines the provider for the Think model.

Provider(type: Optional[str] = None)
type: Optional[str] = None
@dataclass
class Think(deepgram.clients.common.v1.shared_response.BaseResponse):
146@dataclass
147class Think(BaseResponse):
148    """
149    This class defines any configuration settings for the Think model.
150    """
151
152    provider: Provider = field(default_factory=Provider)
153    model: Optional[str] = field(
154        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
155    )
156    instructions: Optional[str] = field(
157        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
158    )
159    functions: Optional[List[Function]] = field(
160        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
161    )
162
163    def __getitem__(self, key):
164        _dict = self.to_dict()
165        if "provider" in _dict:
166            _dict["provider"] = [
167                Provider.from_dict(provider) for provider in _dict["provider"]
168            ]
169        if "functions" in _dict:
170            _dict["functions"] = [
171                Function.from_dict(functions) for functions in _dict["functions"]
172            ]
173        return _dict[key]

This class defines any configuration settings for the Think model.

Think(provider: Provider = <factory>, model: Optional[str] = None, instructions: Optional[str] = None, functions: Optional[List[Function]] = None)
provider: Provider
model: Optional[str] = None
instructions: Optional[str] = None
functions: Optional[List[Function]] = None
@dataclass
class Agent(deepgram.clients.common.v1.shared_response.BaseResponse):
176@dataclass
177class Agent(BaseResponse):
178    """
179    This class defines any configuration settings for the Agent model.
180    """
181
182    listen: Listen = field(default_factory=Listen)
183    think: Think = field(default_factory=Think)
184    speak: Speak = field(default_factory=Speak)
185
186    def __getitem__(self, key):
187        _dict = self.to_dict()
188        if "listen" in _dict:
189            _dict["listen"] = [Listen.from_dict(listen) for listen in _dict["listen"]]
190        if "think" in _dict:
191            _dict["think"] = [Think.from_dict(think) for think in _dict["think"]]
192        if "speak" in _dict:
193            _dict["speak"] = [Speak.from_dict(speak) for speak in _dict["speak"]]
194        return _dict[key]

This class defines any configuration settings for the Agent model.

Agent(listen: Listen = <factory>, think: Think = <factory>, speak: Speak = <factory>)
listen: Listen
think: Think
speak: Speak
@dataclass
class Input(deepgram.clients.common.v1.shared_response.BaseResponse):
197@dataclass
198class Input(BaseResponse):
199    """
200    This class defines any configuration settings for the input audio.
201    """
202
203    encoding: Optional[str] = field(default="linear16")
204    sample_rate: int = field(default=16000)

This class defines any configuration settings for the input audio.

Input(encoding: Optional[str] = 'linear16', sample_rate: int = 16000)
encoding: Optional[str] = 'linear16'
sample_rate: int = 16000
@dataclass
class Output(deepgram.clients.common.v1.shared_response.BaseResponse):
207@dataclass
208class Output(BaseResponse):
209    """
210    This class defines any configuration settings for the output audio.
211    """
212
213    encoding: Optional[str] = field(default="linear16")
214    sample_rate: Optional[int] = field(default=16000)
215    bitrate: Optional[int] = field(
216        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
217    )
218    container: Optional[str] = field(default="none")

This class defines any configuration settings for the output audio.

Output(encoding: Optional[str] = 'linear16', sample_rate: Optional[int] = 16000, bitrate: Optional[int] = None, container: Optional[str] = 'none')
encoding: Optional[str] = 'linear16'
sample_rate: Optional[int] = 16000
bitrate: Optional[int] = None
container: Optional[str] = 'none'
@dataclass
class Audio(deepgram.clients.common.v1.shared_response.BaseResponse):
221@dataclass
222class Audio(BaseResponse):
223    """
224    This class defines any configuration settings for the audio.
225    """
226
227    input: Optional[Input] = field(default_factory=Input)
228    output: Optional[Output] = field(default_factory=Output)
229
230    def __getitem__(self, key):
231        _dict = self.to_dict()
232        if "input" in _dict:
233            _dict["input"] = [Input.from_dict(input) for input in _dict["input"]]
234        if "output" in _dict:
235            _dict["output"] = [Output.from_dict(output) for output in _dict["output"]]
236        return _dict[key]

This class defines any configuration settings for the audio.

Audio(input: Optional[Input] = <factory>, output: Optional[Output] = <factory>)
input: Optional[Input]
output: Optional[Output]
@dataclass
class Context(deepgram.clients.common.v1.shared_response.BaseResponse):
239@dataclass
240class Context(BaseResponse):
241    """
242    This class defines any configuration settings for the context.
243    """
244
245    messages: Optional[List[Tuple[str, str]]] = field(
246        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
247    )
248    replay: Optional[bool] = field(default=False)
249
250    def __getitem__(self, key):
251        _dict = self.to_dict()
252        if "messages" in _dict:
253            _dict["messages"] = _dict["messages"].copy()
254        return _dict[key]

This class defines any configuration settings for the context.

Context(messages: Optional[List[Tuple[str, str]]] = None, replay: Optional[bool] = False)
messages: Optional[List[Tuple[str, str]]] = None
replay: Optional[bool] = False