deepgram.clients.agent.v1.websocket.client

  1# Copyright 2024 Deepgram SDK contributors. All Rights Reserved.
  2# Use of this source code is governed by a MIT license that can be found in the LICENSE file.
  3# SPDX-License-Identifier: MIT
  4
  5import json
  6import logging
  7from typing import Dict, Union, Optional, cast, Any, Callable
  8import threading
  9import time
 10
 11from .....utils import verboselogs
 12from .....options import DeepgramClientOptions
 13from ...enums import AgentWebSocketEvents
 14from ....common import AbstractSyncWebSocketClient
 15from ....common import DeepgramError
 16
 17from .response import (
 18    OpenResponse,
 19    WelcomeResponse,
 20    SettingsAppliedResponse,
 21    ConversationTextResponse,
 22    UserStartedSpeakingResponse,
 23    AgentThinkingResponse,
 24    FunctionCalling,
 25    FunctionCallRequest,
 26    AgentStartedSpeakingResponse,
 27    AgentAudioDoneResponse,
 28    InjectionRefusedResponse,
 29    CloseResponse,
 30    ErrorResponse,
 31    UnhandledResponse,
 32)
 33from .options import (
 34    SettingsConfigurationOptions,
 35    UpdateInstructionsOptions,
 36    UpdateSpeakOptions,
 37    InjectAgentMessageOptions,
 38    FunctionCallResponse,
 39    AgentKeepAlive,
 40)
 41
 42from .....audio.speaker import (
 43    Speaker,
 44    RATE as SPEAKER_RATE,
 45    CHANNELS as SPEAKER_CHANNELS,
 46    PLAYBACK_DELTA as SPEAKER_PLAYBACK_DELTA,
 47)
 48from .....audio.microphone import (
 49    Microphone,
 50    RATE as MICROPHONE_RATE,
 51    CHANNELS as MICROPHONE_CHANNELS,
 52)
 53
 54ONE_SECOND = 1
 55HALF_SECOND = 0.5
 56DEEPGRAM_INTERVAL = 5
 57
 58
 59class AgentWebSocketClient(
 60    AbstractSyncWebSocketClient
 61):  # pylint: disable=too-many-instance-attributes
 62    """
 63    Client for interacting with Deepgram's live transcription services over WebSockets.
 64
 65     This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events.
 66
 67     Args:
 68         config (DeepgramClientOptions): all the options for the client.
 69    """
 70
 71    _logger: verboselogs.VerboseLogger
 72    _config: DeepgramClientOptions
 73    _endpoint: str
 74
 75    _event_handlers: Dict[AgentWebSocketEvents, list]
 76
 77    _keep_alive_thread: Union[threading.Thread, None]
 78
 79    _kwargs: Optional[Dict] = None
 80    _addons: Optional[Dict] = None
 81    # note the distinction here. We can't use _config because it's already used in the parent
 82    _settings: Optional[SettingsConfigurationOptions] = None
 83    _headers: Optional[Dict] = None
 84
 85    _speaker_created: bool = False
 86    _speaker: Optional[Speaker] = None
 87    _microphone_created: bool = False
 88    _microphone: Optional[Microphone] = None
 89
 90    def __init__(self, config: DeepgramClientOptions):
 91        if config is None:
 92            raise DeepgramError("Config is required")
 93
 94        self._logger = verboselogs.VerboseLogger(__name__)
 95        self._logger.addHandler(logging.StreamHandler())
 96        self._logger.setLevel(config.verbose)
 97
 98        self._config = config
 99
100        # needs to be "wss://agent.deepgram.com/agent"
101        self._endpoint = "agent"
102
103        # override the endpoint since it needs to be "wss://agent.deepgram.com/agent"
104        self._config.url = "agent.deepgram.com"
105
106        self._keep_alive_thread = None
107
108        # init handlers
109        self._event_handlers = {
110            event: [] for event in AgentWebSocketEvents.__members__.values()
111        }
112
113        if self._config.options.get("microphone_record") == "true":
114            self._logger.info("microphone_record is enabled")
115            rate = self._config.options.get("microphone_record_rate", MICROPHONE_RATE)
116            channels = self._config.options.get(
117                "microphone_record_channels", MICROPHONE_CHANNELS
118            )
119            device_index = self._config.options.get("microphone_record_device_index")
120
121            self._logger.debug("rate: %s", rate)
122            self._logger.debug("channels: %s", channels)
123
124            self._microphone_created = True
125
126            if device_index is not None:
127                self._logger.debug("device_index: %s", device_index)
128                self._microphone = Microphone(
129                    rate=rate,
130                    channels=channels,
131                    verbose=self._config.verbose,
132                    input_device_index=device_index,
133                )
134            else:
135                self._microphone = Microphone(
136                    rate=rate,
137                    channels=channels,
138                    verbose=self._config.verbose,
139                )
140
141        if self._config.options.get("speaker_playback") == "true":
142            self._logger.info("speaker_playback is enabled")
143            rate = self._config.options.get("speaker_playback_rate", SPEAKER_RATE)
144            channels = self._config.options.get(
145                "speaker_playback_channels", SPEAKER_CHANNELS
146            )
147            playback_delta_in_ms = self._config.options.get(
148                "speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA
149            )
150            device_index = self._config.options.get("speaker_playback_device_index")
151
152            self._logger.debug("rate: %s", rate)
153            self._logger.debug("channels: %s", channels)
154
155            self._speaker_created = True
156
157            if device_index is not None:
158                self._logger.debug("device_index: %s", device_index)
159
160                self._speaker = Speaker(
161                    rate=rate,
162                    channels=channels,
163                    last_play_delta_in_ms=playback_delta_in_ms,
164                    verbose=self._config.verbose,
165                    output_device_index=device_index,
166                    microphone=self._microphone,
167                )
168            else:
169                self._speaker = Speaker(
170                    rate=rate,
171                    channels=channels,
172                    last_play_delta_in_ms=playback_delta_in_ms,
173                    verbose=self._config.verbose,
174                    microphone=self._microphone,
175                )
176
177        # call the parent constructor
178        super().__init__(self._config, self._endpoint)
179
180    # pylint: disable=too-many-statements,too-many-branches
181    def start(
182        self,
183        options: Optional[SettingsConfigurationOptions] = None,
184        addons: Optional[Dict] = None,
185        headers: Optional[Dict] = None,
186        members: Optional[Dict] = None,
187        **kwargs,
188    ) -> bool:
189        """
190        Starts the WebSocket connection for agent API.
191        """
192        self._logger.debug("AgentWebSocketClient.start ENTER")
193        self._logger.info("settings: %s", options)
194        self._logger.info("addons: %s", addons)
195        self._logger.info("headers: %s", headers)
196        self._logger.info("members: %s", members)
197        self._logger.info("kwargs: %s", kwargs)
198
199        if isinstance(options, SettingsConfigurationOptions) and not options.check():
200            self._logger.error("settings.check failed")
201            self._logger.debug("AgentWebSocketClient.start LEAVE")
202            raise DeepgramError("Fatal agent settings error")
203
204        self._addons = addons
205        self._headers = headers
206
207        # add "members" as members of the class
208        if members is not None:
209            self.__dict__.update(members)
210
211        # set kwargs as members of the class
212        if kwargs is not None:
213            self._kwargs = kwargs
214        else:
215            self._kwargs = {}
216
217        if isinstance(options, SettingsConfigurationOptions):
218            self._logger.info("options is class")
219            self._settings = options
220        elif isinstance(options, dict):
221            self._logger.info("options is dict")
222            self._settings = SettingsConfigurationOptions.from_dict(options)
223        elif isinstance(options, str):
224            self._logger.info("options is json")
225            self._settings = SettingsConfigurationOptions.from_json(options)
226        else:
227            raise DeepgramError("Invalid options type")
228
229        if self._settings.agent.listen.keyterms is not None and self._settings.agent.listen.model is not None and not self._settings.agent.listen.model.startswith("nova-3"):
230            raise DeepgramError("Keyterms are only supported for nova-3 models")
231
232        try:
233            # speaker substitutes the listening thread
234            if self._speaker is not None:
235                self._logger.notice("passing speaker to delegate_listening")
236                super().delegate_listening(self._speaker)
237
238            # call parent start
239            if (
240                super().start(
241                    {},
242                    self._addons,
243                    self._headers,
244                    **dict(cast(Dict[Any, Any], self._kwargs)),
245                )
246                is False
247            ):
248                self._logger.error("AgentWebSocketClient.start failed")
249                self._logger.debug("AgentWebSocketClient.start LEAVE")
250                return False
251
252            if self._speaker is not None:
253                self._logger.notice("speaker is delegate_listening. Starting speaker")
254                self._speaker.start()
255
256            if self._speaker is not None and self._microphone is not None:
257                self._logger.notice(
258                    "speaker is delegate_listening. Starting microphone"
259                )
260                self._microphone.set_callback(self.send)
261                self._microphone.start()
262
263            # debug the threads
264            for thread in threading.enumerate():
265                self._logger.debug("after running thread: %s", thread.name)
266            self._logger.debug("number of active threads: %s", threading.active_count())
267
268            # keepalive thread
269            if self._config.is_keep_alive_enabled():
270                self._logger.notice("keepalive is enabled")
271                self._keep_alive_thread = threading.Thread(target=self._keep_alive)
272                self._keep_alive_thread.start()
273            else:
274                self._logger.notice("keepalive is disabled")
275
276            # debug the threads
277            for thread in threading.enumerate():
278                self._logger.debug("after running thread: %s", thread.name)
279            self._logger.debug("number of active threads: %s", threading.active_count())
280
281            # send the configurationsetting message
282            self._logger.notice("Sending ConfigurationSettings...")
283            ret_send_cs = self.send(str(self._settings))
284            if not ret_send_cs:
285                self._logger.error("ConfigurationSettings failed")
286
287                err_error: ErrorResponse = ErrorResponse(
288                    "Exception in AgentWebSocketClient.start",
289                    "ConfigurationSettings failed to send",
290                    "Exception",
291                )
292                self._emit(
293                    AgentWebSocketEvents(AgentWebSocketEvents.Error),
294                    error=err_error,
295                    **dict(cast(Dict[Any, Any], self._kwargs)),
296                )
297
298                self._logger.debug("AgentWebSocketClient.start LEAVE")
299                return False
300
301            self._logger.notice("start succeeded")
302            self._logger.debug("AgentWebSocketClient.start LEAVE")
303            return True
304
305        except Exception as e:  # pylint: disable=broad-except
306            self._logger.error(
307                "WebSocketException in AgentWebSocketClient.start: %s", e
308            )
309            self._logger.debug("AgentWebSocketClient.start LEAVE")
310            if self._config.options.get("termination_exception_connect") is True:
311                raise e
312            return False
313
314    # pylint: enable=too-many-statements,too-many-branches
315
316    def on(self, event: AgentWebSocketEvents, handler: Callable) -> None:
317        """
318        Registers event handlers for specific events.
319        """
320        self._logger.info("event subscribed: %s", event)
321        if event in AgentWebSocketEvents.__members__.values() and callable(handler):
322            self._event_handlers[event].append(handler)
323
324    def _emit(self, event: AgentWebSocketEvents, *args, **kwargs) -> None:
325        """
326        Emits events to the registered event handlers.
327        """
328        self._logger.debug("AgentWebSocketClient._emit ENTER")
329        self._logger.debug("callback handlers for: %s", event)
330
331        # debug the threads
332        for thread in threading.enumerate():
333            self._logger.debug("after running thread: %s", thread.name)
334        self._logger.debug("number of active threads: %s", threading.active_count())
335
336        self._logger.debug("callback handlers for: %s", event)
337        for handler in self._event_handlers[event]:
338            handler(self, *args, **kwargs)
339
340        # debug the threads
341        for thread in threading.enumerate():
342            self._logger.debug("after running thread: %s", thread.name)
343        self._logger.debug("number of active threads: %s", threading.active_count())
344
345        self._logger.debug("AgentWebSocketClient._emit LEAVE")
346
347    # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches
348    def _process_text(self, message: str) -> None:
349        """
350        Processes messages received over the WebSocket connection.
351        """
352        self._logger.debug("AgentWebSocketClient._process_text ENTER")
353
354        try:
355            self._logger.debug("Text data received")
356            if len(message) == 0:
357                self._logger.debug("message is empty")
358                self._logger.debug("AgentWebSocketClient._process_text LEAVE")
359                return
360
361            data = json.loads(message)
362            response_type = data.get("type")
363            self._logger.debug("response_type: %s, data: %s", response_type, data)
364
365            match response_type:
366                case AgentWebSocketEvents.Open:
367                    open_result: OpenResponse = OpenResponse.from_json(message)
368                    self._logger.verbose("OpenResponse: %s", open_result)
369                    self._emit(
370                        AgentWebSocketEvents(AgentWebSocketEvents.Open),
371                        open=open_result,
372                        **dict(cast(Dict[Any, Any], self._kwargs)),
373                    )
374                case AgentWebSocketEvents.Welcome:
375                    welcome_result: WelcomeResponse = WelcomeResponse.from_json(message)
376                    self._logger.verbose("WelcomeResponse: %s", welcome_result)
377                    self._emit(
378                        AgentWebSocketEvents(AgentWebSocketEvents.Welcome),
379                        welcome=welcome_result,
380                        **dict(cast(Dict[Any, Any], self._kwargs)),
381                    )
382                case AgentWebSocketEvents.SettingsApplied:
383                    settings_applied_result: SettingsAppliedResponse = (
384                        SettingsAppliedResponse.from_json(message)
385                    )
386                    self._logger.verbose(
387                        "SettingsAppliedResponse: %s", settings_applied_result
388                    )
389                    self._emit(
390                        AgentWebSocketEvents(AgentWebSocketEvents.SettingsApplied),
391                        settings_applied=settings_applied_result,
392                        **dict(cast(Dict[Any, Any], self._kwargs)),
393                    )
394                case AgentWebSocketEvents.ConversationText:
395                    conversation_text_result: ConversationTextResponse = (
396                        ConversationTextResponse.from_json(message)
397                    )
398                    self._logger.verbose(
399                        "ConversationTextResponse: %s", conversation_text_result
400                    )
401                    self._emit(
402                        AgentWebSocketEvents(AgentWebSocketEvents.ConversationText),
403                        conversation_text=conversation_text_result,
404                        **dict(cast(Dict[Any, Any], self._kwargs)),
405                    )
406                case AgentWebSocketEvents.UserStartedSpeaking:
407                    user_started_speaking_result: UserStartedSpeakingResponse = (
408                        UserStartedSpeakingResponse.from_json(message)
409                    )
410                    self._logger.verbose(
411                        "UserStartedSpeakingResponse: %s", user_started_speaking_result
412                    )
413                    self._emit(
414                        AgentWebSocketEvents(AgentWebSocketEvents.UserStartedSpeaking),
415                        user_started_speaking=user_started_speaking_result,
416                        **dict(cast(Dict[Any, Any], self._kwargs)),
417                    )
418                case AgentWebSocketEvents.AgentThinking:
419                    agent_thinking_result: AgentThinkingResponse = (
420                        AgentThinkingResponse.from_json(message)
421                    )
422                    self._logger.verbose(
423                        "AgentThinkingResponse: %s", agent_thinking_result
424                    )
425                    self._emit(
426                        AgentWebSocketEvents(AgentWebSocketEvents.AgentThinking),
427                        agent_thinking=agent_thinking_result,
428                        **dict(cast(Dict[Any, Any], self._kwargs)),
429                    )
430                case AgentWebSocketEvents.FunctionCalling:
431                    function_calling_result: FunctionCalling = (
432                        FunctionCalling.from_json(message)
433                    )
434                    self._logger.verbose("FunctionCalling: %s", function_calling_result)
435                    self._emit(
436                        AgentWebSocketEvents(AgentWebSocketEvents.FunctionCalling),
437                        function_calling=function_calling_result,
438                        **dict(cast(Dict[Any, Any], self._kwargs)),
439                    )
440                case AgentWebSocketEvents.FunctionCallRequest:
441                    function_call_request_result: FunctionCallRequest = (
442                        FunctionCallRequest.from_json(message)
443                    )
444                    self._logger.verbose(
445                        "FunctionCallRequest: %s", function_call_request_result
446                    )
447                    self._emit(
448                        AgentWebSocketEvents(AgentWebSocketEvents.FunctionCallRequest),
449                        function_call_request=function_call_request_result,
450                        **dict(cast(Dict[Any, Any], self._kwargs)),
451                    )
452                case AgentWebSocketEvents.AgentStartedSpeaking:
453                    agent_started_speaking_result: AgentStartedSpeakingResponse = (
454                        AgentStartedSpeakingResponse.from_json(message)
455                    )
456                    self._logger.verbose(
457                        "AgentStartedSpeakingResponse: %s",
458                        agent_started_speaking_result,
459                    )
460                    self._emit(
461                        AgentWebSocketEvents(AgentWebSocketEvents.AgentStartedSpeaking),
462                        agent_started_speaking=agent_started_speaking_result,
463                        **dict(cast(Dict[Any, Any], self._kwargs)),
464                    )
465                case AgentWebSocketEvents.AgentAudioDone:
466                    agent_audio_done_result: AgentAudioDoneResponse = (
467                        AgentAudioDoneResponse.from_json(message)
468                    )
469                    self._logger.verbose(
470                        "AgentAudioDoneResponse: %s", agent_audio_done_result
471                    )
472                    self._emit(
473                        AgentWebSocketEvents(AgentWebSocketEvents.AgentAudioDone),
474                        agent_audio_done=agent_audio_done_result,
475                        **dict(cast(Dict[Any, Any], self._kwargs)),
476                    )
477                case AgentWebSocketEvents.InjectionRefused:
478                    injection_refused_result: InjectionRefusedResponse = (
479                        InjectionRefusedResponse.from_json(message)
480                    )
481                    self._logger.verbose(
482                        "InjectionRefused: %s", injection_refused_result
483                    )
484                    self._emit(
485                        AgentWebSocketEvents(AgentWebSocketEvents.InjectionRefused),
486                        injection_refused=injection_refused_result,
487                        **dict(cast(Dict[Any, Any], self._kwargs)),
488                    )
489                case AgentWebSocketEvents.Close:
490                    close_result: CloseResponse = CloseResponse.from_json(message)
491                    self._logger.verbose("CloseResponse: %s", close_result)
492                    self._emit(
493                        AgentWebSocketEvents(AgentWebSocketEvents.Close),
494                        close=close_result,
495                        **dict(cast(Dict[Any, Any], self._kwargs)),
496                    )
497                case AgentWebSocketEvents.Error:
498                    err_error: ErrorResponse = ErrorResponse.from_json(message)
499                    self._logger.verbose("ErrorResponse: %s", err_error)
500                    self._emit(
501                        AgentWebSocketEvents(AgentWebSocketEvents.Error),
502                        error=err_error,
503                        **dict(cast(Dict[Any, Any], self._kwargs)),
504                    )
505                case _:
506                    self._logger.warning(
507                        "Unknown Message: response_type: %s, data: %s",
508                        response_type,
509                        data,
510                    )
511                    unhandled_error: UnhandledResponse = UnhandledResponse(
512                        type=AgentWebSocketEvents(AgentWebSocketEvents.Unhandled),
513                        raw=message,
514                    )
515                    self._emit(
516                        AgentWebSocketEvents(AgentWebSocketEvents.Unhandled),
517                        unhandled=unhandled_error,
518                        **dict(cast(Dict[Any, Any], self._kwargs)),
519                    )
520
521            self._logger.notice("_process_text Succeeded")
522            self._logger.debug("SpeakStreamClient._process_text LEAVE")
523
524        except Exception as e:  # pylint: disable=broad-except
525            self._logger.error("Exception in AgentWebSocketClient._process_text: %s", e)
526            e_error: ErrorResponse = ErrorResponse(
527                "Exception in AgentWebSocketClient._process_text",
528                f"{e}",
529                "Exception",
530            )
531            self._logger.error(
532                "Exception in AgentWebSocketClient._process_text: %s", str(e)
533            )
534            self._emit(
535                AgentWebSocketEvents(AgentWebSocketEvents.Error),
536                error=e_error,
537                **dict(cast(Dict[Any, Any], self._kwargs)),
538            )
539
540            # signal exit and close
541            super()._signal_exit()
542
543            self._logger.debug("AgentWebSocketClient._process_text LEAVE")
544
545            if self._config.options.get("termination_exception") is True:
546                raise
547            return
548
549    # pylint: enable=too-many-return-statements,too-many-statements
550
551    def _process_binary(self, message: bytes) -> None:
552        self._logger.debug("AgentWebSocketClient._process_binary ENTER")
553        self._logger.debug("Binary data received")
554
555        self._emit(
556            AgentWebSocketEvents(AgentWebSocketEvents.AudioData),
557            data=message,
558            **dict(cast(Dict[Any, Any], self._kwargs)),
559        )
560
561        self._logger.notice("_process_binary Succeeded")
562        self._logger.debug("AgentWebSocketClient._process_binary LEAVE")
563
564    # pylint: disable=too-many-return-statements
565    def _keep_alive(self) -> None:
566        """
567        Sends keepalive messages to the WebSocket connection.
568        """
569        self._logger.debug("AgentWebSocketClient._keep_alive ENTER")
570
571        counter = 0
572        while True:
573            try:
574                counter += 1
575                self._exit_event.wait(timeout=ONE_SECOND)
576
577                if self._exit_event.is_set():
578                    self._logger.notice("_keep_alive exiting gracefully")
579                    self._logger.debug("AgentWebSocketClient._keep_alive LEAVE")
580                    return
581
582                # deepgram keepalive
583                if counter % DEEPGRAM_INTERVAL == 0:
584                    self.keep_alive()
585
586            except Exception as e:  # pylint: disable=broad-except
587                self._logger.error(
588                    "Exception in AgentWebSocketClient._keep_alive: %s", e
589                )
590                e_error: ErrorResponse = ErrorResponse(
591                    "Exception in AgentWebSocketClient._keep_alive",
592                    f"{e}",
593                    "Exception",
594                )
595                self._logger.error(
596                    "Exception in AgentWebSocketClient._keep_alive: %s", str(e)
597                )
598                self._emit(
599                    AgentWebSocketEvents(AgentWebSocketEvents.Error),
600                    error=e_error,
601                    **dict(cast(Dict[Any, Any], self._kwargs)),
602                )
603
604                # signal exit and close
605                super()._signal_exit()
606
607                self._logger.debug("AgentWebSocketClient._keep_alive LEAVE")
608
609                if self._config.options.get("termination_exception") is True:
610                    raise
611                return
612
613    def keep_alive(self) -> bool:
614        """
615        Sends a KeepAlive message
616        """
617        self._logger.spam("AgentWebSocketClient.keep_alive ENTER")
618
619        self._logger.notice("Sending KeepAlive...")
620        ret = self.send(json.dumps({"type": "KeepAlive"}))
621
622        if not ret:
623            self._logger.error("keep_alive failed")
624            self._logger.spam("AgentWebSocketClient.keep_alive LEAVE")
625            return False
626
627        self._logger.notice("keep_alive succeeded")
628        self._logger.spam("AgentWebSocketClient.keep_alive LEAVE")
629
630        return True
631
632    def _close_message(self) -> bool:
633        # TODO: No known API close message # pylint: disable=fixme
634        # return self.send(json.dumps({"type": "Close"}))
635        return True
636
637    # closes the WebSocket connection gracefully
638    def finish(self) -> bool:
639        """
640        Closes the WebSocket connection gracefully.
641        """
642        self._logger.spam("AgentWebSocketClient.finish ENTER")
643
644        # call parent finish
645        if super().finish() is False:
646            self._logger.error("AgentWebSocketClient.finish failed")
647
648        if self._microphone is not None and self._microphone_created:
649            self._microphone.finish()
650            self._microphone_created = False
651
652        if self._speaker is not None and self._speaker_created:
653            self._speaker.finish()
654            self._speaker_created = False
655
656        # debug the threads
657        for thread in threading.enumerate():
658            self._logger.debug("before running thread: %s", thread.name)
659        self._logger.debug("number of active threads: %s", threading.active_count())
660
661        # stop the threads
662        self._logger.verbose("cancelling tasks...")
663        if self._keep_alive_thread is not None:
664            self._keep_alive_thread.join()
665            self._keep_alive_thread = None
666            self._logger.notice("processing _keep_alive_thread thread joined")
667
668        if self._listen_thread is not None:
669            self._listen_thread.join()
670            self._listen_thread = None
671        self._logger.notice("listening thread joined")
672
673        self._speaker = None
674        self._microphone = None
675
676        # debug the threads
677        for thread in threading.enumerate():
678            self._logger.debug("before running thread: %s", thread.name)
679        self._logger.debug("number of active threads: %s", threading.active_count())
680
681        self._logger.notice("finish succeeded")
682        self._logger.spam("AgentWebSocketClient.finish LEAVE")
683        return True
# Timeout (seconds) for each wait on the exit event inside the keep-alive loop.
ONE_SECOND = 1
# Half-second interval; not referenced by AgentWebSocketClient's methods shown here.
HALF_SECOND = 0.5
# A KeepAlive message is sent once every DEEPGRAM_INTERVAL keep-alive loop ticks.
DEEPGRAM_INTERVAL = 5
 60class AgentWebSocketClient(
 61    AbstractSyncWebSocketClient
 62):  # pylint: disable=too-many-instance-attributes
 63    """
 64    Client for interacting with Deepgram's live transcription services over WebSockets.
 65
 66     This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events.
 67
 68     Args:
 69         config (DeepgramClientOptions): all the options for the client.
 70    """
 71
 72    _logger: verboselogs.VerboseLogger
 73    _config: DeepgramClientOptions
 74    _endpoint: str
 75
 76    _event_handlers: Dict[AgentWebSocketEvents, list]
 77
 78    _keep_alive_thread: Union[threading.Thread, None]
 79
 80    _kwargs: Optional[Dict] = None
 81    _addons: Optional[Dict] = None
 82    # note the distinction here. We can't use _config because it's already used in the parent
 83    _settings: Optional[SettingsConfigurationOptions] = None
 84    _headers: Optional[Dict] = None
 85
 86    _speaker_created: bool = False
 87    _speaker: Optional[Speaker] = None
 88    _microphone_created: bool = False
 89    _microphone: Optional[Microphone] = None
 90
 91    def __init__(self, config: DeepgramClientOptions):
 92        if config is None:
 93            raise DeepgramError("Config is required")
 94
 95        self._logger = verboselogs.VerboseLogger(__name__)
 96        self._logger.addHandler(logging.StreamHandler())
 97        self._logger.setLevel(config.verbose)
 98
 99        self._config = config
100
101        # needs to be "wss://agent.deepgram.com/agent"
102        self._endpoint = "agent"
103
104        # override the endpoint since it needs to be "wss://agent.deepgram.com/agent"
105        self._config.url = "agent.deepgram.com"
106
107        self._keep_alive_thread = None
108
109        # init handlers
110        self._event_handlers = {
111            event: [] for event in AgentWebSocketEvents.__members__.values()
112        }
113
114        if self._config.options.get("microphone_record") == "true":
115            self._logger.info("microphone_record is enabled")
116            rate = self._config.options.get("microphone_record_rate", MICROPHONE_RATE)
117            channels = self._config.options.get(
118                "microphone_record_channels", MICROPHONE_CHANNELS
119            )
120            device_index = self._config.options.get("microphone_record_device_index")
121
122            self._logger.debug("rate: %s", rate)
123            self._logger.debug("channels: %s", channels)
124
125            self._microphone_created = True
126
127            if device_index is not None:
128                self._logger.debug("device_index: %s", device_index)
129                self._microphone = Microphone(
130                    rate=rate,
131                    channels=channels,
132                    verbose=self._config.verbose,
133                    input_device_index=device_index,
134                )
135            else:
136                self._microphone = Microphone(
137                    rate=rate,
138                    channels=channels,
139                    verbose=self._config.verbose,
140                )
141
142        if self._config.options.get("speaker_playback") == "true":
143            self._logger.info("speaker_playback is enabled")
144            rate = self._config.options.get("speaker_playback_rate", SPEAKER_RATE)
145            channels = self._config.options.get(
146                "speaker_playback_channels", SPEAKER_CHANNELS
147            )
148            playback_delta_in_ms = self._config.options.get(
149                "speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA
150            )
151            device_index = self._config.options.get("speaker_playback_device_index")
152
153            self._logger.debug("rate: %s", rate)
154            self._logger.debug("channels: %s", channels)
155
156            self._speaker_created = True
157
158            if device_index is not None:
159                self._logger.debug("device_index: %s", device_index)
160
161                self._speaker = Speaker(
162                    rate=rate,
163                    channels=channels,
164                    last_play_delta_in_ms=playback_delta_in_ms,
165                    verbose=self._config.verbose,
166                    output_device_index=device_index,
167                    microphone=self._microphone,
168                )
169            else:
170                self._speaker = Speaker(
171                    rate=rate,
172                    channels=channels,
173                    last_play_delta_in_ms=playback_delta_in_ms,
174                    verbose=self._config.verbose,
175                    microphone=self._microphone,
176                )
177
178        # call the parent constructor
179        super().__init__(self._config, self._endpoint)
180
181    # pylint: disable=too-many-statements,too-many-branches
182    def start(
183        self,
184        options: Optional[SettingsConfigurationOptions] = None,
185        addons: Optional[Dict] = None,
186        headers: Optional[Dict] = None,
187        members: Optional[Dict] = None,
188        **kwargs,
189    ) -> bool:
190        """
191        Starts the WebSocket connection for agent API.
192        """
193        self._logger.debug("AgentWebSocketClient.start ENTER")
194        self._logger.info("settings: %s", options)
195        self._logger.info("addons: %s", addons)
196        self._logger.info("headers: %s", headers)
197        self._logger.info("members: %s", members)
198        self._logger.info("kwargs: %s", kwargs)
199
200        if isinstance(options, SettingsConfigurationOptions) and not options.check():
201            self._logger.error("settings.check failed")
202            self._logger.debug("AgentWebSocketClient.start LEAVE")
203            raise DeepgramError("Fatal agent settings error")
204
205        self._addons = addons
206        self._headers = headers
207
208        # add "members" as members of the class
209        if members is not None:
210            self.__dict__.update(members)
211
212        # set kwargs as members of the class
213        if kwargs is not None:
214            self._kwargs = kwargs
215        else:
216            self._kwargs = {}
217
218        if isinstance(options, SettingsConfigurationOptions):
219            self._logger.info("options is class")
220            self._settings = options
221        elif isinstance(options, dict):
222            self._logger.info("options is dict")
223            self._settings = SettingsConfigurationOptions.from_dict(options)
224        elif isinstance(options, str):
225            self._logger.info("options is json")
226            self._settings = SettingsConfigurationOptions.from_json(options)
227        else:
228            raise DeepgramError("Invalid options type")
229
230        if self._settings.agent.listen.keyterms is not None and self._settings.agent.listen.model is not None and not self._settings.agent.listen.model.startswith("nova-3"):
231            raise DeepgramError("Keyterms are only supported for nova-3 models")
232
233        try:
234            # speaker substitutes the listening thread
235            if self._speaker is not None:
236                self._logger.notice("passing speaker to delegate_listening")
237                super().delegate_listening(self._speaker)
238
239            # call parent start
240            if (
241                super().start(
242                    {},
243                    self._addons,
244                    self._headers,
245                    **dict(cast(Dict[Any, Any], self._kwargs)),
246                )
247                is False
248            ):
249                self._logger.error("AgentWebSocketClient.start failed")
250                self._logger.debug("AgentWebSocketClient.start LEAVE")
251                return False
252
253            if self._speaker is not None:
254                self._logger.notice("speaker is delegate_listening. Starting speaker")
255                self._speaker.start()
256
257            if self._speaker is not None and self._microphone is not None:
258                self._logger.notice(
259                    "speaker is delegate_listening. Starting microphone"
260                )
261                self._microphone.set_callback(self.send)
262                self._microphone.start()
263
264            # debug the threads
265            for thread in threading.enumerate():
266                self._logger.debug("after running thread: %s", thread.name)
267            self._logger.debug("number of active threads: %s", threading.active_count())
268
269            # keepalive thread
270            if self._config.is_keep_alive_enabled():
271                self._logger.notice("keepalive is enabled")
272                self._keep_alive_thread = threading.Thread(target=self._keep_alive)
273                self._keep_alive_thread.start()
274            else:
275                self._logger.notice("keepalive is disabled")
276
277            # debug the threads
278            for thread in threading.enumerate():
279                self._logger.debug("after running thread: %s", thread.name)
280            self._logger.debug("number of active threads: %s", threading.active_count())
281
282            # send the configurationsetting message
283            self._logger.notice("Sending ConfigurationSettings...")
284            ret_send_cs = self.send(str(self._settings))
285            if not ret_send_cs:
286                self._logger.error("ConfigurationSettings failed")
287
288                err_error: ErrorResponse = ErrorResponse(
289                    "Exception in AgentWebSocketClient.start",
290                    "ConfigurationSettings failed to send",
291                    "Exception",
292                )
293                self._emit(
294                    AgentWebSocketEvents(AgentWebSocketEvents.Error),
295                    error=err_error,
296                    **dict(cast(Dict[Any, Any], self._kwargs)),
297                )
298
299                self._logger.debug("AgentWebSocketClient.start LEAVE")
300                return False
301
302            self._logger.notice("start succeeded")
303            self._logger.debug("AgentWebSocketClient.start LEAVE")
304            return True
305
306        except Exception as e:  # pylint: disable=broad-except
307            self._logger.error(
308                "WebSocketException in AgentWebSocketClient.start: %s", e
309            )
310            self._logger.debug("AgentWebSocketClient.start LEAVE")
311            if self._config.options.get("termination_exception_connect") is True:
312                raise e
313            return False
314
315    # pylint: enable=too-many-statements,too-many-branches
316
317    def on(self, event: AgentWebSocketEvents, handler: Callable) -> None:
318        """
319        Registers event handlers for specific events.
320        """
321        self._logger.info("event subscribed: %s", event)
322        if event in AgentWebSocketEvents.__members__.values() and callable(handler):
323            self._event_handlers[event].append(handler)
324
325    def _emit(self, event: AgentWebSocketEvents, *args, **kwargs) -> None:
326        """
327        Emits events to the registered event handlers.
328        """
329        self._logger.debug("AgentWebSocketClient._emit ENTER")
330        self._logger.debug("callback handlers for: %s", event)
331
332        # debug the threads
333        for thread in threading.enumerate():
334            self._logger.debug("after running thread: %s", thread.name)
335        self._logger.debug("number of active threads: %s", threading.active_count())
336
337        self._logger.debug("callback handlers for: %s", event)
338        for handler in self._event_handlers[event]:
339            handler(self, *args, **kwargs)
340
341        # debug the threads
342        for thread in threading.enumerate():
343            self._logger.debug("after running thread: %s", thread.name)
344        self._logger.debug("number of active threads: %s", threading.active_count())
345
346        self._logger.debug("AgentWebSocketClient._emit LEAVE")
347
348    # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches
349    def _process_text(self, message: str) -> None:
350        """
351        Processes messages received over the WebSocket connection.
352        """
353        self._logger.debug("AgentWebSocketClient._process_text ENTER")
354
355        try:
356            self._logger.debug("Text data received")
357            if len(message) == 0:
358                self._logger.debug("message is empty")
359                self._logger.debug("AgentWebSocketClient._process_text LEAVE")
360                return
361
362            data = json.loads(message)
363            response_type = data.get("type")
364            self._logger.debug("response_type: %s, data: %s", response_type, data)
365
366            match response_type:
367                case AgentWebSocketEvents.Open:
368                    open_result: OpenResponse = OpenResponse.from_json(message)
369                    self._logger.verbose("OpenResponse: %s", open_result)
370                    self._emit(
371                        AgentWebSocketEvents(AgentWebSocketEvents.Open),
372                        open=open_result,
373                        **dict(cast(Dict[Any, Any], self._kwargs)),
374                    )
375                case AgentWebSocketEvents.Welcome:
376                    welcome_result: WelcomeResponse = WelcomeResponse.from_json(message)
377                    self._logger.verbose("WelcomeResponse: %s", welcome_result)
378                    self._emit(
379                        AgentWebSocketEvents(AgentWebSocketEvents.Welcome),
380                        welcome=welcome_result,
381                        **dict(cast(Dict[Any, Any], self._kwargs)),
382                    )
383                case AgentWebSocketEvents.SettingsApplied:
384                    settings_applied_result: SettingsAppliedResponse = (
385                        SettingsAppliedResponse.from_json(message)
386                    )
387                    self._logger.verbose(
388                        "SettingsAppliedResponse: %s", settings_applied_result
389                    )
390                    self._emit(
391                        AgentWebSocketEvents(AgentWebSocketEvents.SettingsApplied),
392                        settings_applied=settings_applied_result,
393                        **dict(cast(Dict[Any, Any], self._kwargs)),
394                    )
395                case AgentWebSocketEvents.ConversationText:
396                    conversation_text_result: ConversationTextResponse = (
397                        ConversationTextResponse.from_json(message)
398                    )
399                    self._logger.verbose(
400                        "ConversationTextResponse: %s", conversation_text_result
401                    )
402                    self._emit(
403                        AgentWebSocketEvents(AgentWebSocketEvents.ConversationText),
404                        conversation_text=conversation_text_result,
405                        **dict(cast(Dict[Any, Any], self._kwargs)),
406                    )
407                case AgentWebSocketEvents.UserStartedSpeaking:
408                    user_started_speaking_result: UserStartedSpeakingResponse = (
409                        UserStartedSpeakingResponse.from_json(message)
410                    )
411                    self._logger.verbose(
412                        "UserStartedSpeakingResponse: %s", user_started_speaking_result
413                    )
414                    self._emit(
415                        AgentWebSocketEvents(AgentWebSocketEvents.UserStartedSpeaking),
416                        user_started_speaking=user_started_speaking_result,
417                        **dict(cast(Dict[Any, Any], self._kwargs)),
418                    )
419                case AgentWebSocketEvents.AgentThinking:
420                    agent_thinking_result: AgentThinkingResponse = (
421                        AgentThinkingResponse.from_json(message)
422                    )
423                    self._logger.verbose(
424                        "AgentThinkingResponse: %s", agent_thinking_result
425                    )
426                    self._emit(
427                        AgentWebSocketEvents(AgentWebSocketEvents.AgentThinking),
428                        agent_thinking=agent_thinking_result,
429                        **dict(cast(Dict[Any, Any], self._kwargs)),
430                    )
431                case AgentWebSocketEvents.FunctionCalling:
432                    function_calling_result: FunctionCalling = (
433                        FunctionCalling.from_json(message)
434                    )
435                    self._logger.verbose("FunctionCalling: %s", function_calling_result)
436                    self._emit(
437                        AgentWebSocketEvents(AgentWebSocketEvents.FunctionCalling),
438                        function_calling=function_calling_result,
439                        **dict(cast(Dict[Any, Any], self._kwargs)),
440                    )
441                case AgentWebSocketEvents.FunctionCallRequest:
442                    function_call_request_result: FunctionCallRequest = (
443                        FunctionCallRequest.from_json(message)
444                    )
445                    self._logger.verbose(
446                        "FunctionCallRequest: %s", function_call_request_result
447                    )
448                    self._emit(
449                        AgentWebSocketEvents(AgentWebSocketEvents.FunctionCallRequest),
450                        function_call_request=function_call_request_result,
451                        **dict(cast(Dict[Any, Any], self._kwargs)),
452                    )
453                case AgentWebSocketEvents.AgentStartedSpeaking:
454                    agent_started_speaking_result: AgentStartedSpeakingResponse = (
455                        AgentStartedSpeakingResponse.from_json(message)
456                    )
457                    self._logger.verbose(
458                        "AgentStartedSpeakingResponse: %s",
459                        agent_started_speaking_result,
460                    )
461                    self._emit(
462                        AgentWebSocketEvents(AgentWebSocketEvents.AgentStartedSpeaking),
463                        agent_started_speaking=agent_started_speaking_result,
464                        **dict(cast(Dict[Any, Any], self._kwargs)),
465                    )
466                case AgentWebSocketEvents.AgentAudioDone:
467                    agent_audio_done_result: AgentAudioDoneResponse = (
468                        AgentAudioDoneResponse.from_json(message)
469                    )
470                    self._logger.verbose(
471                        "AgentAudioDoneResponse: %s", agent_audio_done_result
472                    )
473                    self._emit(
474                        AgentWebSocketEvents(AgentWebSocketEvents.AgentAudioDone),
475                        agent_audio_done=agent_audio_done_result,
476                        **dict(cast(Dict[Any, Any], self._kwargs)),
477                    )
478                case AgentWebSocketEvents.InjectionRefused:
479                    injection_refused_result: InjectionRefusedResponse = (
480                        InjectionRefusedResponse.from_json(message)
481                    )
482                    self._logger.verbose(
483                        "InjectionRefused: %s", injection_refused_result
484                    )
485                    self._emit(
486                        AgentWebSocketEvents(AgentWebSocketEvents.InjectionRefused),
487                        injection_refused=injection_refused_result,
488                        **dict(cast(Dict[Any, Any], self._kwargs)),
489                    )
490                case AgentWebSocketEvents.Close:
491                    close_result: CloseResponse = CloseResponse.from_json(message)
492                    self._logger.verbose("CloseResponse: %s", close_result)
493                    self._emit(
494                        AgentWebSocketEvents(AgentWebSocketEvents.Close),
495                        close=close_result,
496                        **dict(cast(Dict[Any, Any], self._kwargs)),
497                    )
498                case AgentWebSocketEvents.Error:
499                    err_error: ErrorResponse = ErrorResponse.from_json(message)
500                    self._logger.verbose("ErrorResponse: %s", err_error)
501                    self._emit(
502                        AgentWebSocketEvents(AgentWebSocketEvents.Error),
503                        error=err_error,
504                        **dict(cast(Dict[Any, Any], self._kwargs)),
505                    )
506                case _:
507                    self._logger.warning(
508                        "Unknown Message: response_type: %s, data: %s",
509                        response_type,
510                        data,
511                    )
512                    unhandled_error: UnhandledResponse = UnhandledResponse(
513                        type=AgentWebSocketEvents(AgentWebSocketEvents.Unhandled),
514                        raw=message,
515                    )
516                    self._emit(
517                        AgentWebSocketEvents(AgentWebSocketEvents.Unhandled),
518                        unhandled=unhandled_error,
519                        **dict(cast(Dict[Any, Any], self._kwargs)),
520                    )
521
522            self._logger.notice("_process_text Succeeded")
523            self._logger.debug("SpeakStreamClient._process_text LEAVE")
524
525        except Exception as e:  # pylint: disable=broad-except
526            self._logger.error("Exception in AgentWebSocketClient._process_text: %s", e)
527            e_error: ErrorResponse = ErrorResponse(
528                "Exception in AgentWebSocketClient._process_text",
529                f"{e}",
530                "Exception",
531            )
532            self._logger.error(
533                "Exception in AgentWebSocketClient._process_text: %s", str(e)
534            )
535            self._emit(
536                AgentWebSocketEvents(AgentWebSocketEvents.Error),
537                error=e_error,
538                **dict(cast(Dict[Any, Any], self._kwargs)),
539            )
540
541            # signal exit and close
542            super()._signal_exit()
543
544            self._logger.debug("AgentWebSocketClient._process_text LEAVE")
545
546            if self._config.options.get("termination_exception") is True:
547                raise
548            return
549
550    # pylint: enable=too-many-return-statements,too-many-statements
551
552    def _process_binary(self, message: bytes) -> None:
553        self._logger.debug("AgentWebSocketClient._process_binary ENTER")
554        self._logger.debug("Binary data received")
555
556        self._emit(
557            AgentWebSocketEvents(AgentWebSocketEvents.AudioData),
558            data=message,
559            **dict(cast(Dict[Any, Any], self._kwargs)),
560        )
561
562        self._logger.notice("_process_binary Succeeded")
563        self._logger.debug("AgentWebSocketClient._process_binary LEAVE")
564
565    # pylint: disable=too-many-return-statements
566    def _keep_alive(self) -> None:
567        """
568        Sends keepalive messages to the WebSocket connection.
569        """
570        self._logger.debug("AgentWebSocketClient._keep_alive ENTER")
571
572        counter = 0
573        while True:
574            try:
575                counter += 1
576                self._exit_event.wait(timeout=ONE_SECOND)
577
578                if self._exit_event.is_set():
579                    self._logger.notice("_keep_alive exiting gracefully")
580                    self._logger.debug("AgentWebSocketClient._keep_alive LEAVE")
581                    return
582
583                # deepgram keepalive
584                if counter % DEEPGRAM_INTERVAL == 0:
585                    self.keep_alive()
586
587            except Exception as e:  # pylint: disable=broad-except
588                self._logger.error(
589                    "Exception in AgentWebSocketClient._keep_alive: %s", e
590                )
591                e_error: ErrorResponse = ErrorResponse(
592                    "Exception in AgentWebSocketClient._keep_alive",
593                    f"{e}",
594                    "Exception",
595                )
596                self._logger.error(
597                    "Exception in AgentWebSocketClient._keep_alive: %s", str(e)
598                )
599                self._emit(
600                    AgentWebSocketEvents(AgentWebSocketEvents.Error),
601                    error=e_error,
602                    **dict(cast(Dict[Any, Any], self._kwargs)),
603                )
604
605                # signal exit and close
606                super()._signal_exit()
607
608                self._logger.debug("AgentWebSocketClient._keep_alive LEAVE")
609
610                if self._config.options.get("termination_exception") is True:
611                    raise
612                return
613
614    def keep_alive(self) -> bool:
615        """
616        Sends a KeepAlive message
617        """
618        self._logger.spam("AgentWebSocketClient.keep_alive ENTER")
619
620        self._logger.notice("Sending KeepAlive...")
621        ret = self.send(json.dumps({"type": "KeepAlive"}))
622
623        if not ret:
624            self._logger.error("keep_alive failed")
625            self._logger.spam("AgentWebSocketClient.keep_alive LEAVE")
626            return False
627
628        self._logger.notice("keep_alive succeeded")
629        self._logger.spam("AgentWebSocketClient.keep_alive LEAVE")
630
631        return True
632
633    def _close_message(self) -> bool:
634        # TODO: No known API close message # pylint: disable=fixme
635        # return self.send(json.dumps({"type": "Close"}))
636        return True
637
638    # closes the WebSocket connection gracefully
639    def finish(self) -> bool:
640        """
641        Closes the WebSocket connection gracefully.
642        """
643        self._logger.spam("AgentWebSocketClient.finish ENTER")
644
645        # call parent finish
646        if super().finish() is False:
647            self._logger.error("AgentWebSocketClient.finish failed")
648
649        if self._microphone is not None and self._microphone_created:
650            self._microphone.finish()
651            self._microphone_created = False
652
653        if self._speaker is not None and self._speaker_created:
654            self._speaker.finish()
655            self._speaker_created = False
656
657        # debug the threads
658        for thread in threading.enumerate():
659            self._logger.debug("before running thread: %s", thread.name)
660        self._logger.debug("number of active threads: %s", threading.active_count())
661
662        # stop the threads
663        self._logger.verbose("cancelling tasks...")
664        if self._keep_alive_thread is not None:
665            self._keep_alive_thread.join()
666            self._keep_alive_thread = None
667            self._logger.notice("processing _keep_alive_thread thread joined")
668
669        if self._listen_thread is not None:
670            self._listen_thread.join()
671            self._listen_thread = None
672        self._logger.notice("listening thread joined")
673
674        self._speaker = None
675        self._microphone = None
676
677        # debug the threads
678        for thread in threading.enumerate():
679            self._logger.debug("before running thread: %s", thread.name)
680        self._logger.debug("number of active threads: %s", threading.active_count())
681
682        self._logger.notice("finish succeeded")
683        self._logger.spam("AgentWebSocketClient.finish LEAVE")
684        return True

Client for interacting with Deepgram's agent API over WebSockets.

This class provides methods to establish a WebSocket connection to the agent endpoint and handle real-time agent events.

Args: config (DeepgramClientOptions): all the options for the client.

AgentWebSocketClient(config: deepgram.options.DeepgramClientOptions)
 91    def __init__(self, config: DeepgramClientOptions):
 92        if config is None:
 93            raise DeepgramError("Config is required")
 94
 95        self._logger = verboselogs.VerboseLogger(__name__)
 96        self._logger.addHandler(logging.StreamHandler())
 97        self._logger.setLevel(config.verbose)
 98
 99        self._config = config
100
101        # needs to be "wss://agent.deepgram.com/agent"
102        self._endpoint = "agent"
103
104        # override the endpoint since it needs to be "wss://agent.deepgram.com/agent"
105        self._config.url = "agent.deepgram.com"
106
107        self._keep_alive_thread = None
108
109        # init handlers
110        self._event_handlers = {
111            event: [] for event in AgentWebSocketEvents.__members__.values()
112        }
113
114        if self._config.options.get("microphone_record") == "true":
115            self._logger.info("microphone_record is enabled")
116            rate = self._config.options.get("microphone_record_rate", MICROPHONE_RATE)
117            channels = self._config.options.get(
118                "microphone_record_channels", MICROPHONE_CHANNELS
119            )
120            device_index = self._config.options.get("microphone_record_device_index")
121
122            self._logger.debug("rate: %s", rate)
123            self._logger.debug("channels: %s", channels)
124
125            self._microphone_created = True
126
127            if device_index is not None:
128                self._logger.debug("device_index: %s", device_index)
129                self._microphone = Microphone(
130                    rate=rate,
131                    channels=channels,
132                    verbose=self._config.verbose,
133                    input_device_index=device_index,
134                )
135            else:
136                self._microphone = Microphone(
137                    rate=rate,
138                    channels=channels,
139                    verbose=self._config.verbose,
140                )
141
142        if self._config.options.get("speaker_playback") == "true":
143            self._logger.info("speaker_playback is enabled")
144            rate = self._config.options.get("speaker_playback_rate", SPEAKER_RATE)
145            channels = self._config.options.get(
146                "speaker_playback_channels", SPEAKER_CHANNELS
147            )
148            playback_delta_in_ms = self._config.options.get(
149                "speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA
150            )
151            device_index = self._config.options.get("speaker_playback_device_index")
152
153            self._logger.debug("rate: %s", rate)
154            self._logger.debug("channels: %s", channels)
155
156            self._speaker_created = True
157
158            if device_index is not None:
159                self._logger.debug("device_index: %s", device_index)
160
161                self._speaker = Speaker(
162                    rate=rate,
163                    channels=channels,
164                    last_play_delta_in_ms=playback_delta_in_ms,
165                    verbose=self._config.verbose,
166                    output_device_index=device_index,
167                    microphone=self._microphone,
168                )
169            else:
170                self._speaker = Speaker(
171                    rate=rate,
172                    channels=channels,
173                    last_play_delta_in_ms=playback_delta_in_ms,
174                    verbose=self._config.verbose,
175                    microphone=self._microphone,
176                )
177
178        # call the parent constructor
179        super().__init__(self._config, self._endpoint)
def start( self, options: Optional[deepgram.clients.agent.v1.websocket.options.SettingsConfigurationOptions] = None, addons: Optional[Dict] = None, headers: Optional[Dict] = None, members: Optional[Dict] = None, **kwargs) -> bool:
182    def start(
183        self,
184        options: Optional[SettingsConfigurationOptions] = None,
185        addons: Optional[Dict] = None,
186        headers: Optional[Dict] = None,
187        members: Optional[Dict] = None,
188        **kwargs,
189    ) -> bool:
190        """
191        Starts the WebSocket connection for agent API.
192        """
193        self._logger.debug("AgentWebSocketClient.start ENTER")
194        self._logger.info("settings: %s", options)
195        self._logger.info("addons: %s", addons)
196        self._logger.info("headers: %s", headers)
197        self._logger.info("members: %s", members)
198        self._logger.info("kwargs: %s", kwargs)
199
200        if isinstance(options, SettingsConfigurationOptions) and not options.check():
201            self._logger.error("settings.check failed")
202            self._logger.debug("AgentWebSocketClient.start LEAVE")
203            raise DeepgramError("Fatal agent settings error")
204
205        self._addons = addons
206        self._headers = headers
207
208        # add "members" as members of the class
209        if members is not None:
210            self.__dict__.update(members)
211
212        # set kwargs as members of the class
213        if kwargs is not None:
214            self._kwargs = kwargs
215        else:
216            self._kwargs = {}
217
218        if isinstance(options, SettingsConfigurationOptions):
219            self._logger.info("options is class")
220            self._settings = options
221        elif isinstance(options, dict):
222            self._logger.info("options is dict")
223            self._settings = SettingsConfigurationOptions.from_dict(options)
224        elif isinstance(options, str):
225            self._logger.info("options is json")
226            self._settings = SettingsConfigurationOptions.from_json(options)
227        else:
228            raise DeepgramError("Invalid options type")
229
230        if self._settings.agent.listen.keyterms is not None and self._settings.agent.listen.model is not None and not self._settings.agent.listen.model.startswith("nova-3"):
231            raise DeepgramError("Keyterms are only supported for nova-3 models")
232
233        try:
234            # speaker substitutes the listening thread
235            if self._speaker is not None:
236                self._logger.notice("passing speaker to delegate_listening")
237                super().delegate_listening(self._speaker)
238
239            # call parent start
240            if (
241                super().start(
242                    {},
243                    self._addons,
244                    self._headers,
245                    **dict(cast(Dict[Any, Any], self._kwargs)),
246                )
247                is False
248            ):
249                self._logger.error("AgentWebSocketClient.start failed")
250                self._logger.debug("AgentWebSocketClient.start LEAVE")
251                return False
252
253            if self._speaker is not None:
254                self._logger.notice("speaker is delegate_listening. Starting speaker")
255                self._speaker.start()
256
257            if self._speaker is not None and self._microphone is not None:
258                self._logger.notice(
259                    "speaker is delegate_listening. Starting microphone"
260                )
261                self._microphone.set_callback(self.send)
262                self._microphone.start()
263
264            # debug the threads
265            for thread in threading.enumerate():
266                self._logger.debug("after running thread: %s", thread.name)
267            self._logger.debug("number of active threads: %s", threading.active_count())
268
269            # keepalive thread
270            if self._config.is_keep_alive_enabled():
271                self._logger.notice("keepalive is enabled")
272                self._keep_alive_thread = threading.Thread(target=self._keep_alive)
273                self._keep_alive_thread.start()
274            else:
275                self._logger.notice("keepalive is disabled")
276
277            # debug the threads
278            for thread in threading.enumerate():
279                self._logger.debug("after running thread: %s", thread.name)
280            self._logger.debug("number of active threads: %s", threading.active_count())
281
282            # send the configurationsetting message
283            self._logger.notice("Sending ConfigurationSettings...")
284            ret_send_cs = self.send(str(self._settings))
285            if not ret_send_cs:
286                self._logger.error("ConfigurationSettings failed")
287
288                err_error: ErrorResponse = ErrorResponse(
289                    "Exception in AgentWebSocketClient.start",
290                    "ConfigurationSettings failed to send",
291                    "Exception",
292                )
293                self._emit(
294                    AgentWebSocketEvents(AgentWebSocketEvents.Error),
295                    error=err_error,
296                    **dict(cast(Dict[Any, Any], self._kwargs)),
297                )
298
299                self._logger.debug("AgentWebSocketClient.start LEAVE")
300                return False
301
302            self._logger.notice("start succeeded")
303            self._logger.debug("AgentWebSocketClient.start LEAVE")
304            return True
305
306        except Exception as e:  # pylint: disable=broad-except
307            self._logger.error(
308                "WebSocketException in AgentWebSocketClient.start: %s", e
309            )
310            self._logger.debug("AgentWebSocketClient.start LEAVE")
311            if self._config.options.get("termination_exception_connect") is True:
312                raise e
313            return False

Starts the WebSocket connection for agent API.

def on( self, event: deepgram.clients.agent.enums.AgentWebSocketEvents, handler: Callable) -> None:
317    def on(self, event: AgentWebSocketEvents, handler: Callable) -> None:
318        """
319        Registers event handlers for specific events.
320        """
321        self._logger.info("event subscribed: %s", event)
322        if event in AgentWebSocketEvents.__members__.values() and callable(handler):
323            self._event_handlers[event].append(handler)

Registers event handlers for specific events.

def keep_alive(self) -> bool:
614    def keep_alive(self) -> bool:
615        """
616        Sends a KeepAlive message
617        """
618        self._logger.spam("AgentWebSocketClient.keep_alive ENTER")
619
620        self._logger.notice("Sending KeepAlive...")
621        ret = self.send(json.dumps({"type": "KeepAlive"}))
622
623        if not ret:
624            self._logger.error("keep_alive failed")
625            self._logger.spam("AgentWebSocketClient.keep_alive LEAVE")
626            return False
627
628        self._logger.notice("keep_alive succeeded")
629        self._logger.spam("AgentWebSocketClient.keep_alive LEAVE")
630
631        return True

Sends a KeepAlive message

def finish(self) -> bool:
639    def finish(self) -> bool:
640        """
641        Closes the WebSocket connection gracefully.
642        """
643        self._logger.spam("AgentWebSocketClient.finish ENTER")
644
645        # call parent finish
646        if super().finish() is False:
647            self._logger.error("AgentWebSocketClient.finish failed")
648
649        if self._microphone is not None and self._microphone_created:
650            self._microphone.finish()
651            self._microphone_created = False
652
653        if self._speaker is not None and self._speaker_created:
654            self._speaker.finish()
655            self._speaker_created = False
656
657        # debug the threads
658        for thread in threading.enumerate():
659            self._logger.debug("before running thread: %s", thread.name)
660        self._logger.debug("number of active threads: %s", threading.active_count())
661
662        # stop the threads
663        self._logger.verbose("cancelling tasks...")
664        if self._keep_alive_thread is not None:
665            self._keep_alive_thread.join()
666            self._keep_alive_thread = None
667            self._logger.notice("processing _keep_alive_thread thread joined")
668
669        if self._listen_thread is not None:
670            self._listen_thread.join()
671            self._listen_thread = None
672        self._logger.notice("listening thread joined")
673
674        self._speaker = None
675        self._microphone = None
676
677        # debug the threads
678        for thread in threading.enumerate():
679            self._logger.debug("before running thread: %s", thread.name)
680        self._logger.debug("number of active threads: %s", threading.active_count())
681
682        self._logger.notice("finish succeeded")
683        self._logger.spam("AgentWebSocketClient.finish LEAVE")
684        return True

Closes the WebSocket connection gracefully.