deepgram.clients.agent.v1.websocket.async_client

  1# Copyright 2024 Deepgram SDK contributors. All Rights Reserved.
  2# Use of this source code is governed by a MIT license that can be found in the LICENSE file.
  3# SPDX-License-Identifier: MIT
  4
  5import asyncio
  6import json
  7import logging
  8from typing import Dict, Union, Optional, cast, Any, Callable
  9import threading
 10
 11from .....utils import verboselogs
 12from .....options import DeepgramClientOptions
 13from ...enums import AgentWebSocketEvents
 14from ....common import AbstractAsyncWebSocketClient
 15from ....common import DeepgramError
 16
 17from .response import (
 18    OpenResponse,
 19    WelcomeResponse,
 20    SettingsAppliedResponse,
 21    ConversationTextResponse,
 22    UserStartedSpeakingResponse,
 23    AgentThinkingResponse,
 24    FunctionCalling,
 25    FunctionCallRequest,
 26    AgentStartedSpeakingResponse,
 27    AgentAudioDoneResponse,
 28    InjectionRefusedResponse,
 29    CloseResponse,
 30    ErrorResponse,
 31    UnhandledResponse,
 32)
 33from .options import (
 34    SettingsConfigurationOptions,
 35    UpdateInstructionsOptions,
 36    UpdateSpeakOptions,
 37    InjectAgentMessageOptions,
 38    FunctionCallResponse,
 39    AgentKeepAlive,
 40)
 41
 42from .....audio.speaker import (
 43    Speaker,
 44    RATE as SPEAKER_RATE,
 45    CHANNELS as SPEAKER_CHANNELS,
 46    PLAYBACK_DELTA as SPEAKER_PLAYBACK_DELTA,
 47)
 48from .....audio.microphone import (
 49    Microphone,
 50    RATE as MICROPHONE_RATE,
 51    CHANNELS as MICROPHONE_CHANNELS,
 52)
 53
 54ONE_SECOND = 1
 55HALF_SECOND = 0.5
 56DEEPGRAM_INTERVAL = 5
 57
 58
 59class AsyncAgentWebSocketClient(
 60    AbstractAsyncWebSocketClient
 61):  # pylint: disable=too-many-instance-attributes
 62    """
 63    Client for interacting with Deepgram's live transcription services over WebSockets.
 64
 65     This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events.
 66
 67     Args:
 68         config (DeepgramClientOptions): all the options for the client.
 69    """
 70
 71    _logger: verboselogs.VerboseLogger
 72    _config: DeepgramClientOptions
 73    _endpoint: str
 74
 75    _event_handlers: Dict[AgentWebSocketEvents, list]
 76
 77    _keep_alive_thread: Union[asyncio.Task, None]
 78
 79    _kwargs: Optional[Dict] = None
 80    _addons: Optional[Dict] = None
 81    # note the distinction here. We can't use _config because it's already used in the parent
 82    _settings: Optional[SettingsConfigurationOptions] = None
 83    _headers: Optional[Dict] = None
 84
 85    _speaker_created: bool = False
 86    _speaker: Optional[Speaker] = None
 87    _microphone_created: bool = False
 88    _microphone: Optional[Microphone] = None
 89
 90    def __init__(self, config: DeepgramClientOptions):
 91        if config is None:
 92            raise DeepgramError("Config is required")
 93
 94        self._logger = verboselogs.VerboseLogger(__name__)
 95        self._logger.addHandler(logging.StreamHandler())
 96        self._logger.setLevel(config.verbose)
 97
 98        self._config = config
 99
100        # needs to be "wss://agent.deepgram.com/agent"
101        self._endpoint = "agent"
102
103        # override the endpoint since it needs to be "wss://agent.deepgram.com/agent"
104        self._config.url = "agent.deepgram.com"
105        self._keep_alive_thread = None
106
107        # init handlers
108        self._event_handlers = {
109            event: [] for event in AgentWebSocketEvents.__members__.values()
110        }
111
112        if self._config.options.get("microphone_record") == "true":
113            self._logger.info("microphone_record is enabled")
114            rate = self._config.options.get("microphone_record_rate", MICROPHONE_RATE)
115            channels = self._config.options.get(
116                "microphone_record_channels", MICROPHONE_CHANNELS
117            )
118            device_index = self._config.options.get("microphone_record_device_index")
119
120            self._logger.debug("rate: %s", rate)
121            self._logger.debug("channels: %s", channels)
122            if device_index is not None:
123                self._logger.debug("device_index: %s", device_index)
124
125            self._microphone_created = True
126
127            if device_index is not None:
128                self._microphone = Microphone(
129                    rate=rate,
130                    channels=channels,
131                    verbose=self._config.verbose,
132                    input_device_index=device_index,
133                )
134            else:
135                self._microphone = Microphone(
136                    rate=rate,
137                    channels=channels,
138                    verbose=self._config.verbose,
139                )
140
141        if self._config.options.get("speaker_playback") == "true":
142            self._logger.info("speaker_playback is enabled")
143            rate = self._config.options.get("speaker_playback_rate", SPEAKER_RATE)
144            channels = self._config.options.get(
145                "speaker_playback_channels", SPEAKER_CHANNELS
146            )
147            playback_delta_in_ms = self._config.options.get(
148                "speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA
149            )
150            device_index = self._config.options.get("speaker_playback_device_index")
151
152            self._logger.debug("rate: %s", rate)
153            self._logger.debug("channels: %s", channels)
154
155            self._speaker_created = True
156
157            if device_index is not None:
158                self._logger.debug("device_index: %s", device_index)
159
160                self._speaker = Speaker(
161                    rate=rate,
162                    channels=channels,
163                    last_play_delta_in_ms=playback_delta_in_ms,
164                    verbose=self._config.verbose,
165                    output_device_index=device_index,
166                    microphone=self._microphone,
167                )
168            else:
169                self._speaker = Speaker(
170                    rate=rate,
171                    channels=channels,
172                    last_play_delta_in_ms=playback_delta_in_ms,
173                    verbose=self._config.verbose,
174                    microphone=self._microphone,
175                )
176        # call the parent constructor
177        super().__init__(self._config, self._endpoint)
178
179    # pylint: disable=too-many-branches,too-many-statements
180    async def start(
181        self,
182        options: Optional[SettingsConfigurationOptions] = None,
183        addons: Optional[Dict] = None,
184        headers: Optional[Dict] = None,
185        members: Optional[Dict] = None,
186        **kwargs,
187    ) -> bool:
188        """
189        Starts the WebSocket connection for agent API.
190        """
191        self._logger.debug("AsyncAgentWebSocketClient.start ENTER")
192        self._logger.info("settings: %s", options)
193        self._logger.info("addons: %s", addons)
194        self._logger.info("headers: %s", headers)
195        self._logger.info("members: %s", members)
196        self._logger.info("kwargs: %s", kwargs)
197
198        if isinstance(options, SettingsConfigurationOptions) and not options.check():
199            self._logger.error("settings.check failed")
200            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
201            raise DeepgramError("Fatal agent settings error")
202
203        self._addons = addons
204        self._headers = headers
205
206        # add "members" as members of the class
207        if members is not None:
208            self.__dict__.update(members)
209
210        # set kwargs as members of the class
211        if kwargs is not None:
212            self._kwargs = kwargs
213        else:
214            self._kwargs = {}
215
216        if isinstance(options, SettingsConfigurationOptions):
217            self._logger.info("options is class")
218            self._settings = options
219        elif isinstance(options, dict):
220            self._logger.info("options is dict")
221            self._settings = SettingsConfigurationOptions.from_dict(options)
222        elif isinstance(options, str):
223            self._logger.info("options is json")
224            self._settings = SettingsConfigurationOptions.from_json(options)
225        else:
226            raise DeepgramError("Invalid options type")
227
228        if self._settings.agent.listen.keyterms is not None and self._settings.agent.listen.model is not None and not self._settings.agent.listen.model.startswith("nova-3"):
229            raise DeepgramError("Keyterms are only supported for nova-3 models")
230
231        try:
232            # speaker substitutes the listening thread
233            if self._speaker is not None:
234                self._logger.notice("passing speaker to delegate_listening")
235                super().delegate_listening(self._speaker)
236
237            # call parent start
238            if (
239                await super().start(
240                    {},
241                    self._addons,
242                    self._headers,
243                    **dict(cast(Dict[Any, Any], self._kwargs)),
244                )
245                is False
246            ):
247                self._logger.error("AsyncAgentWebSocketClient.start failed")
248                self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
249                return False
250
251            if self._speaker is not None:
252                self._logger.notice("speaker is delegate_listening. Starting speaker")
253                self._speaker.start()
254
255            if self._speaker is not None and self._microphone is not None:
256                self._logger.notice(
257                    "speaker is delegate_listening. Starting microphone"
258                )
259                self._microphone.set_callback(self.send)
260                self._microphone.start()
261
262            # debug the threads
263            for thread in threading.enumerate():
264                self._logger.debug("after running thread: %s", thread.name)
265            self._logger.debug("number of active threads: %s", threading.active_count())
266
267            # keepalive thread
268            if self._config.is_keep_alive_enabled():
269                self._logger.notice("keepalive is enabled")
270                self._keep_alive_thread = asyncio.create_task(self._keep_alive())
271            else:
272                self._logger.notice("keepalive is disabled")
273
274            # debug the threads
275            for thread in threading.enumerate():
276                self._logger.debug("after running thread: %s", thread.name)
277            self._logger.debug("number of active threads: %s", threading.active_count())
278
279            # send the configurationsetting message
280            self._logger.notice("Sending ConfigurationSettings...")
281            ret_send_cs = await self.send(str(self._settings))
282            if not ret_send_cs:
283                self._logger.error("ConfigurationSettings failed")
284
285                err_error: ErrorResponse = ErrorResponse(
286                    "Exception in AsyncAgentWebSocketClient.start",
287                    "ConfigurationSettings failed to send",
288                    "Exception",
289                )
290                await self._emit(
291                    AgentWebSocketEvents(AgentWebSocketEvents.Error),
292                    error=err_error,
293                    **dict(cast(Dict[Any, Any], self._kwargs)),
294                )
295
296                self._logger.debug("AgentWebSocketClient.start LEAVE")
297                return False
298
299            self._logger.notice("start succeeded")
300            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
301            return True
302
303        except Exception as e:  # pylint: disable=broad-except
304            self._logger.error(
305                "WebSocketException in AsyncAgentWebSocketClient.start: %s", e
306            )
307            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
308            if self._config.options.get("termination_exception_connect") is True:
309                raise e
310            return False
311
312    # pylint: enable=too-many-branches,too-many-statements
313
314    def on(self, event: AgentWebSocketEvents, handler: Callable) -> None:
315        """
316        Registers event handlers for specific events.
317        """
318        self._logger.info("event subscribed: %s", event)
319        if event in AgentWebSocketEvents.__members__.values() and callable(handler):
320            self._event_handlers[event].append(handler)
321
322    async def _emit(self, event: AgentWebSocketEvents, *args, **kwargs) -> None:
323        """
324        Emits events to the registered event handlers.
325        """
326        self._logger.debug("AsyncAgentWebSocketClient._emit ENTER")
327        self._logger.debug("callback handlers for: %s", event)
328
329        # debug the threads
330        for thread in threading.enumerate():
331            self._logger.debug("after running thread: %s", thread.name)
332        self._logger.debug("number of active threads: %s", threading.active_count())
333
334        self._logger.debug("callback handlers for: %s", event)
335        tasks = []
336        for handler in self._event_handlers[event]:
337            task = asyncio.create_task(handler(self, *args, **kwargs))
338            tasks.append(task)
339
340        if tasks:
341            self._logger.debug("waiting for tasks to finish...")
342            await asyncio.gather(*tasks, return_exceptions=True)
343            tasks.clear()
344
345        # debug the threads
346        for thread in threading.enumerate():
347            self._logger.debug("after running thread: %s", thread.name)
348        self._logger.debug("number of active threads: %s", threading.active_count())
349
350        self._logger.debug("AsyncAgentWebSocketClient._emit LEAVE")
351
352    # pylint: disable=too-many-locals,too-many-statements
353    async def _process_text(self, message: str) -> None:
354        """
355        Processes messages received over the WebSocket connection.
356        """
357        self._logger.debug("AsyncAgentWebSocketClient._process_text ENTER")
358
359        try:
360            self._logger.debug("Text data received")
361            if len(message) == 0:
362                self._logger.debug("message is empty")
363                self._logger.debug("AsyncAgentWebSocketClient._process_text LEAVE")
364                return
365
366            data = json.loads(message)
367            response_type = data.get("type")
368            self._logger.debug("response_type: %s, data: %s", response_type, data)
369
370            match response_type:
371                case AgentWebSocketEvents.Open:
372                    open_result: OpenResponse = OpenResponse.from_json(message)
373                    self._logger.verbose("OpenResponse: %s", open_result)
374                    await self._emit(
375                        AgentWebSocketEvents(AgentWebSocketEvents.Open),
376                        open=open_result,
377                        **dict(cast(Dict[Any, Any], self._kwargs)),
378                    )
379                case AgentWebSocketEvents.Welcome:
380                    welcome_result: WelcomeResponse = WelcomeResponse.from_json(message)
381                    self._logger.verbose("WelcomeResponse: %s", welcome_result)
382                    await self._emit(
383                        AgentWebSocketEvents(AgentWebSocketEvents.Welcome),
384                        welcome=welcome_result,
385                        **dict(cast(Dict[Any, Any], self._kwargs)),
386                    )
387                case AgentWebSocketEvents.SettingsApplied:
388                    settings_applied_result: SettingsAppliedResponse = (
389                        SettingsAppliedResponse.from_json(message)
390                    )
391                    self._logger.verbose(
392                        "SettingsAppliedResponse: %s", settings_applied_result
393                    )
394                    await self._emit(
395                        AgentWebSocketEvents(AgentWebSocketEvents.SettingsApplied),
396                        settings_applied=settings_applied_result,
397                        **dict(cast(Dict[Any, Any], self._kwargs)),
398                    )
399                case AgentWebSocketEvents.ConversationText:
400                    conversation_text_result: ConversationTextResponse = (
401                        ConversationTextResponse.from_json(message)
402                    )
403                    self._logger.verbose(
404                        "ConversationTextResponse: %s", conversation_text_result
405                    )
406                    await self._emit(
407                        AgentWebSocketEvents(AgentWebSocketEvents.ConversationText),
408                        conversation_text=conversation_text_result,
409                        **dict(cast(Dict[Any, Any], self._kwargs)),
410                    )
411                case AgentWebSocketEvents.UserStartedSpeaking:
412                    user_started_speaking_result: UserStartedSpeakingResponse = (
413                        UserStartedSpeakingResponse.from_json(message)
414                    )
415                    self._logger.verbose(
416                        "UserStartedSpeakingResponse: %s", user_started_speaking_result
417                    )
418                    await self._emit(
419                        AgentWebSocketEvents(AgentWebSocketEvents.UserStartedSpeaking),
420                        user_started_speaking=user_started_speaking_result,
421                        **dict(cast(Dict[Any, Any], self._kwargs)),
422                    )
423                case AgentWebSocketEvents.AgentThinking:
424                    agent_thinking_result: AgentThinkingResponse = (
425                        AgentThinkingResponse.from_json(message)
426                    )
427                    self._logger.verbose(
428                        "AgentThinkingResponse: %s", agent_thinking_result
429                    )
430                    await self._emit(
431                        AgentWebSocketEvents(AgentWebSocketEvents.AgentThinking),
432                        agent_thinking=agent_thinking_result,
433                        **dict(cast(Dict[Any, Any], self._kwargs)),
434                    )
435                case AgentWebSocketEvents.FunctionCalling:
436                    function_calling_result: FunctionCalling = (
437                        FunctionCalling.from_json(message)
438                    )
439                    self._logger.verbose("FunctionCalling: %s", function_calling_result)
440                    await self._emit(
441                        AgentWebSocketEvents(AgentWebSocketEvents.FunctionCalling),
442                        function_calling=function_calling_result,
443                        **dict(cast(Dict[Any, Any], self._kwargs)),
444                    )
445                case AgentWebSocketEvents.FunctionCallRequest:
446                    function_call_request_result: FunctionCallRequest = (
447                        FunctionCallRequest.from_json(message)
448                    )
449                    self._logger.verbose(
450                        "FunctionCallRequest: %s", function_call_request_result
451                    )
452                    await self._emit(
453                        AgentWebSocketEvents(AgentWebSocketEvents.FunctionCallRequest),
454                        function_call_request=function_call_request_result,
455                        **dict(cast(Dict[Any, Any], self._kwargs)),
456                    )
457                case AgentWebSocketEvents.AgentStartedSpeaking:
458                    agent_started_speaking_result: AgentStartedSpeakingResponse = (
459                        AgentStartedSpeakingResponse.from_json(message)
460                    )
461                    self._logger.verbose(
462                        "AgentStartedSpeakingResponse: %s",
463                        agent_started_speaking_result,
464                    )
465                    await self._emit(
466                        AgentWebSocketEvents(AgentWebSocketEvents.AgentStartedSpeaking),
467                        agent_started_speaking=agent_started_speaking_result,
468                        **dict(cast(Dict[Any, Any], self._kwargs)),
469                    )
470                case AgentWebSocketEvents.AgentAudioDone:
471                    agent_audio_done_result: AgentAudioDoneResponse = (
472                        AgentAudioDoneResponse.from_json(message)
473                    )
474                    self._logger.verbose(
475                        "AgentAudioDoneResponse: %s", agent_audio_done_result
476                    )
477                    await self._emit(
478                        AgentWebSocketEvents(AgentWebSocketEvents.AgentAudioDone),
479                        agent_audio_done=agent_audio_done_result,
480                        **dict(cast(Dict[Any, Any], self._kwargs)),
481                    )
482                case AgentWebSocketEvents.InjectionRefused:
483                    injection_refused_result: InjectionRefusedResponse = (
484                        InjectionRefusedResponse.from_json(message)
485                    )
486                    self._logger.verbose(
487                        "InjectionRefused: %s", injection_refused_result
488                    )
489                    await self._emit(
490                        AgentWebSocketEvents(AgentWebSocketEvents.InjectionRefused),
491                        injection_refused=injection_refused_result,
492                        **dict(cast(Dict[Any, Any], self._kwargs)),
493                    )
494                case AgentWebSocketEvents.Close:
495                    close_result: CloseResponse = CloseResponse.from_json(message)
496                    self._logger.verbose("CloseResponse: %s", close_result)
497                    await self._emit(
498                        AgentWebSocketEvents(AgentWebSocketEvents.Close),
499                        close=close_result,
500                        **dict(cast(Dict[Any, Any], self._kwargs)),
501                    )
502                case AgentWebSocketEvents.Error:
503                    err_error: ErrorResponse = ErrorResponse.from_json(message)
504                    self._logger.verbose("ErrorResponse: %s", err_error)
505                    await self._emit(
506                        AgentWebSocketEvents(AgentWebSocketEvents.Error),
507                        error=err_error,
508                        **dict(cast(Dict[Any, Any], self._kwargs)),
509                    )
510                case _:
511                    self._logger.warning(
512                        "Unknown Message: response_type: %s, data: %s",
513                        response_type,
514                        data,
515                    )
516                    unhandled_error: UnhandledResponse = UnhandledResponse(
517                        type=AgentWebSocketEvents(AgentWebSocketEvents.Unhandled),
518                        raw=message,
519                    )
520                    await self._emit(
521                        AgentWebSocketEvents(AgentWebSocketEvents.Unhandled),
522                        unhandled=unhandled_error,
523                        **dict(cast(Dict[Any, Any], self._kwargs)),
524                    )
525
526            self._logger.notice("_process_text Succeeded")
527            self._logger.debug("AsyncAgentWebSocketClient._process_text LEAVE")
528
529        except Exception as e:  # pylint: disable=broad-except
530            self._logger.error(
531                "Exception in AsyncAgentWebSocketClient._process_text: %s", e
532            )
533            e_error: ErrorResponse = ErrorResponse(
534                "Exception in AsyncAgentWebSocketClient._process_text",
535                f"{e}",
536                "Exception",
537            )
538            await self._emit(
539                AgentWebSocketEvents(AgentWebSocketEvents.Error),
540                error=e_error,
541                **dict(cast(Dict[Any, Any], self._kwargs)),
542            )
543
544            # signal exit and close
545            await super()._signal_exit()
546
547            self._logger.debug("AsyncAgentWebSocketClient._process_text LEAVE")
548
549            if self._config.options.get("termination_exception") is True:
550                raise
551            return
552
553    # pylint: enable=too-many-locals,too-many-statements
554
555    async def _process_binary(self, message: bytes) -> None:
556        self._logger.debug("AsyncAgentWebSocketClient._process_binary ENTER")
557        self._logger.debug("Binary data received")
558
559        await self._emit(
560            AgentWebSocketEvents(AgentWebSocketEvents.AudioData),
561            data=message,
562            **dict(cast(Dict[Any, Any], self._kwargs)),
563        )
564
565        self._logger.notice("_process_binary Succeeded")
566        self._logger.debug("AsyncAgentWebSocketClient._process_binary LEAVE")
567
568    # pylint: disable=too-many-return-statements
569    async def _keep_alive(self) -> None:
570        """
571        Sends keepalive messages to the WebSocket connection.
572        """
573        self._logger.debug("AsyncAgentWebSocketClient._keep_alive ENTER")
574
575        counter = 0
576        while True:
577            try:
578                counter += 1
579                await asyncio.sleep(ONE_SECOND)
580
581                if self._exit_event.is_set():
582                    self._logger.notice("_keep_alive exiting gracefully")
583                    self._logger.debug("AsyncAgentWebSocketClient._keep_alive LEAVE")
584                    return
585
586                # deepgram keepalive
587                if counter % DEEPGRAM_INTERVAL == 0:
588                    await self.keep_alive()
589
590            except Exception as e:  # pylint: disable=broad-except
591                self._logger.error(
592                    "Exception in AsyncAgentWebSocketClient._keep_alive: %s", e
593                )
594                e_error: ErrorResponse = ErrorResponse(
595                    "Exception in AsyncAgentWebSocketClient._keep_alive",
596                    f"{e}",
597                    "Exception",
598                )
599                self._logger.error(
600                    "Exception in AsyncAgentWebSocketClient._keep_alive: %s", str(e)
601                )
602                await self._emit(
603                    AgentWebSocketEvents(AgentWebSocketEvents.Error),
604                    error=e_error,
605                    **dict(cast(Dict[Any, Any], self._kwargs)),
606                )
607
608                # signal exit and close
609                await super()._signal_exit()
610
611                self._logger.debug("AsyncAgentWebSocketClient._keep_alive LEAVE")
612
613                if self._config.options.get("termination_exception") is True:
614                    raise
615                return
616
617    async def keep_alive(self) -> bool:
618        """
619        Sends a KeepAlive message
620        """
621        self._logger.spam("AsyncAgentWebSocketClient.keep_alive ENTER")
622
623        self._logger.notice("Sending KeepAlive...")
624        ret = await self.send(json.dumps({"type": "KeepAlive"}))
625
626        if not ret:
627            self._logger.error("keep_alive failed")
628            self._logger.spam("AsyncAgentWebSocketClient.keep_alive LEAVE")
629            return False
630
631        self._logger.notice("keep_alive succeeded")
632        self._logger.spam("AsyncAgentWebSocketClient.keep_alive LEAVE")
633
634        return True
635
636    async def _close_message(self) -> bool:
637        # TODO: No known API close message # pylint: disable=fixme
638        # return await self.send(json.dumps({"type": "Close"}))
639        return True
640
641    async def finish(self) -> bool:
642        """
643        Closes the WebSocket connection gracefully.
644        """
645        self._logger.debug("AsyncAgentWebSocketClient.finish ENTER")
646
647        # stop the threads
648        self._logger.verbose("cancelling tasks...")
649        try:
650            # call parent finish
651            if await super().finish() is False:
652                self._logger.error("AsyncAgentWebSocketClient.finish failed")
653
654            if self._microphone is not None and self._microphone_created:
655                self._microphone.finish()
656                self._microphone_created = False
657
658            if self._speaker is not None and self._speaker_created:
659                self._speaker.finish()
660                self._speaker_created = False
661
662            # Before cancelling, check if the tasks were created
663            # debug the threads
664            for thread in threading.enumerate():
665                self._logger.debug("before running thread: %s", thread.name)
666            self._logger.debug("number of active threads: %s", threading.active_count())
667
668            tasks = []
669            if self._keep_alive_thread is not None:
670                self._keep_alive_thread.cancel()
671                tasks.append(self._keep_alive_thread)
672                self._logger.notice("processing _keep_alive_thread cancel...")
673
674            # Use asyncio.gather to wait for tasks to be cancelled
675            # Prevent indefinite waiting by setting a timeout
676            await asyncio.wait_for(asyncio.gather(*tasks), timeout=10)
677            self._logger.notice("threads joined")
678
679            self._speaker = None
680            self._microphone = None
681
682            # debug the threads
683            for thread in threading.enumerate():
684                self._logger.debug("after running thread: %s", thread.name)
685            self._logger.debug("number of active threads: %s", threading.active_count())
686
687            self._logger.notice("finish succeeded")
688            self._logger.spam("AsyncAgentWebSocketClient.finish LEAVE")
689            return True
690
691        except asyncio.CancelledError as e:
692            self._logger.error("tasks cancelled error: %s", e)
693            self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE")
694            return False
695
696        except asyncio.TimeoutError as e:
697            self._logger.error("tasks cancellation timed out: %s", e)
698            self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE")
699            return False
ONE_SECOND = 1  # sleep length (seconds) of one keep-alive loop tick
HALF_SECOND = 0.5  # half a second; unused by the code in this listing
DEEPGRAM_INTERVAL = 5  # ticks between consecutive KeepAlive messages
 60class AsyncAgentWebSocketClient(
 61    AbstractAsyncWebSocketClient
 62):  # pylint: disable=too-many-instance-attributes
 63    """
 64    Client for interacting with Deepgram's live transcription services over WebSockets.
 65
 66     This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events.
 67
 68     Args:
 69         config (DeepgramClientOptions): all the options for the client.
 70    """
 71
 72    _logger: verboselogs.VerboseLogger
 73    _config: DeepgramClientOptions
 74    _endpoint: str
 75
 76    _event_handlers: Dict[AgentWebSocketEvents, list]
 77
 78    _keep_alive_thread: Union[asyncio.Task, None]
 79
 80    _kwargs: Optional[Dict] = None
 81    _addons: Optional[Dict] = None
 82    # note the distinction here. We can't use _config because it's already used in the parent
 83    _settings: Optional[SettingsConfigurationOptions] = None
 84    _headers: Optional[Dict] = None
 85
 86    _speaker_created: bool = False
 87    _speaker: Optional[Speaker] = None
 88    _microphone_created: bool = False
 89    _microphone: Optional[Microphone] = None
 90
 91    def __init__(self, config: DeepgramClientOptions):
 92        if config is None:
 93            raise DeepgramError("Config is required")
 94
 95        self._logger = verboselogs.VerboseLogger(__name__)
 96        self._logger.addHandler(logging.StreamHandler())
 97        self._logger.setLevel(config.verbose)
 98
 99        self._config = config
100
101        # needs to be "wss://agent.deepgram.com/agent"
102        self._endpoint = "agent"
103
104        # override the endpoint since it needs to be "wss://agent.deepgram.com/agent"
105        self._config.url = "agent.deepgram.com"
106        self._keep_alive_thread = None
107
108        # init handlers
109        self._event_handlers = {
110            event: [] for event in AgentWebSocketEvents.__members__.values()
111        }
112
113        if self._config.options.get("microphone_record") == "true":
114            self._logger.info("microphone_record is enabled")
115            rate = self._config.options.get("microphone_record_rate", MICROPHONE_RATE)
116            channels = self._config.options.get(
117                "microphone_record_channels", MICROPHONE_CHANNELS
118            )
119            device_index = self._config.options.get("microphone_record_device_index")
120
121            self._logger.debug("rate: %s", rate)
122            self._logger.debug("channels: %s", channels)
123            if device_index is not None:
124                self._logger.debug("device_index: %s", device_index)
125
126            self._microphone_created = True
127
128            if device_index is not None:
129                self._microphone = Microphone(
130                    rate=rate,
131                    channels=channels,
132                    verbose=self._config.verbose,
133                    input_device_index=device_index,
134                )
135            else:
136                self._microphone = Microphone(
137                    rate=rate,
138                    channels=channels,
139                    verbose=self._config.verbose,
140                )
141
142        if self._config.options.get("speaker_playback") == "true":
143            self._logger.info("speaker_playback is enabled")
144            rate = self._config.options.get("speaker_playback_rate", SPEAKER_RATE)
145            channels = self._config.options.get(
146                "speaker_playback_channels", SPEAKER_CHANNELS
147            )
148            playback_delta_in_ms = self._config.options.get(
149                "speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA
150            )
151            device_index = self._config.options.get("speaker_playback_device_index")
152
153            self._logger.debug("rate: %s", rate)
154            self._logger.debug("channels: %s", channels)
155
156            self._speaker_created = True
157
158            if device_index is not None:
159                self._logger.debug("device_index: %s", device_index)
160
161                self._speaker = Speaker(
162                    rate=rate,
163                    channels=channels,
164                    last_play_delta_in_ms=playback_delta_in_ms,
165                    verbose=self._config.verbose,
166                    output_device_index=device_index,
167                    microphone=self._microphone,
168                )
169            else:
170                self._speaker = Speaker(
171                    rate=rate,
172                    channels=channels,
173                    last_play_delta_in_ms=playback_delta_in_ms,
174                    verbose=self._config.verbose,
175                    microphone=self._microphone,
176                )
177        # call the parent constructor
178        super().__init__(self._config, self._endpoint)
179
180    # pylint: disable=too-many-branches,too-many-statements
181    async def start(
182        self,
183        options: Optional[SettingsConfigurationOptions] = None,
184        addons: Optional[Dict] = None,
185        headers: Optional[Dict] = None,
186        members: Optional[Dict] = None,
187        **kwargs,
188    ) -> bool:
189        """
190        Starts the WebSocket connection for agent API.
191        """
192        self._logger.debug("AsyncAgentWebSocketClient.start ENTER")
193        self._logger.info("settings: %s", options)
194        self._logger.info("addons: %s", addons)
195        self._logger.info("headers: %s", headers)
196        self._logger.info("members: %s", members)
197        self._logger.info("kwargs: %s", kwargs)
198
199        if isinstance(options, SettingsConfigurationOptions) and not options.check():
200            self._logger.error("settings.check failed")
201            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
202            raise DeepgramError("Fatal agent settings error")
203
204        self._addons = addons
205        self._headers = headers
206
207        # add "members" as members of the class
208        if members is not None:
209            self.__dict__.update(members)
210
211        # set kwargs as members of the class
212        if kwargs is not None:
213            self._kwargs = kwargs
214        else:
215            self._kwargs = {}
216
217        if isinstance(options, SettingsConfigurationOptions):
218            self._logger.info("options is class")
219            self._settings = options
220        elif isinstance(options, dict):
221            self._logger.info("options is dict")
222            self._settings = SettingsConfigurationOptions.from_dict(options)
223        elif isinstance(options, str):
224            self._logger.info("options is json")
225            self._settings = SettingsConfigurationOptions.from_json(options)
226        else:
227            raise DeepgramError("Invalid options type")
228
229        if self._settings.agent.listen.keyterms is not None and self._settings.agent.listen.model is not None and not self._settings.agent.listen.model.startswith("nova-3"):
230            raise DeepgramError("Keyterms are only supported for nova-3 models")
231
232        try:
233            # speaker substitutes the listening thread
234            if self._speaker is not None:
235                self._logger.notice("passing speaker to delegate_listening")
236                super().delegate_listening(self._speaker)
237
238            # call parent start
239            if (
240                await super().start(
241                    {},
242                    self._addons,
243                    self._headers,
244                    **dict(cast(Dict[Any, Any], self._kwargs)),
245                )
246                is False
247            ):
248                self._logger.error("AsyncAgentWebSocketClient.start failed")
249                self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
250                return False
251
252            if self._speaker is not None:
253                self._logger.notice("speaker is delegate_listening. Starting speaker")
254                self._speaker.start()
255
256            if self._speaker is not None and self._microphone is not None:
257                self._logger.notice(
258                    "speaker is delegate_listening. Starting microphone"
259                )
260                self._microphone.set_callback(self.send)
261                self._microphone.start()
262
263            # debug the threads
264            for thread in threading.enumerate():
265                self._logger.debug("after running thread: %s", thread.name)
266            self._logger.debug("number of active threads: %s", threading.active_count())
267
268            # keepalive thread
269            if self._config.is_keep_alive_enabled():
270                self._logger.notice("keepalive is enabled")
271                self._keep_alive_thread = asyncio.create_task(self._keep_alive())
272            else:
273                self._logger.notice("keepalive is disabled")
274
275            # debug the threads
276            for thread in threading.enumerate():
277                self._logger.debug("after running thread: %s", thread.name)
278            self._logger.debug("number of active threads: %s", threading.active_count())
279
280            # send the configurationsetting message
281            self._logger.notice("Sending ConfigurationSettings...")
282            ret_send_cs = await self.send(str(self._settings))
283            if not ret_send_cs:
284                self._logger.error("ConfigurationSettings failed")
285
286                err_error: ErrorResponse = ErrorResponse(
287                    "Exception in AsyncAgentWebSocketClient.start",
288                    "ConfigurationSettings failed to send",
289                    "Exception",
290                )
291                await self._emit(
292                    AgentWebSocketEvents(AgentWebSocketEvents.Error),
293                    error=err_error,
294                    **dict(cast(Dict[Any, Any], self._kwargs)),
295                )
296
297                self._logger.debug("AgentWebSocketClient.start LEAVE")
298                return False
299
300            self._logger.notice("start succeeded")
301            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
302            return True
303
304        except Exception as e:  # pylint: disable=broad-except
305            self._logger.error(
306                "WebSocketException in AsyncAgentWebSocketClient.start: %s", e
307            )
308            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
309            if self._config.options.get("termination_exception_connect") is True:
310                raise e
311            return False
312
313    # pylint: enable=too-many-branches,too-many-statements
314
315    def on(self, event: AgentWebSocketEvents, handler: Callable) -> None:
316        """
317        Registers event handlers for specific events.
318        """
319        self._logger.info("event subscribed: %s", event)
320        if event in AgentWebSocketEvents.__members__.values() and callable(handler):
321            self._event_handlers[event].append(handler)
322
323    async def _emit(self, event: AgentWebSocketEvents, *args, **kwargs) -> None:
324        """
325        Emits events to the registered event handlers.
326        """
327        self._logger.debug("AsyncAgentWebSocketClient._emit ENTER")
328        self._logger.debug("callback handlers for: %s", event)
329
330        # debug the threads
331        for thread in threading.enumerate():
332            self._logger.debug("after running thread: %s", thread.name)
333        self._logger.debug("number of active threads: %s", threading.active_count())
334
335        self._logger.debug("callback handlers for: %s", event)
336        tasks = []
337        for handler in self._event_handlers[event]:
338            task = asyncio.create_task(handler(self, *args, **kwargs))
339            tasks.append(task)
340
341        if tasks:
342            self._logger.debug("waiting for tasks to finish...")
343            await asyncio.gather(*tasks, return_exceptions=True)
344            tasks.clear()
345
346        # debug the threads
347        for thread in threading.enumerate():
348            self._logger.debug("after running thread: %s", thread.name)
349        self._logger.debug("number of active threads: %s", threading.active_count())
350
351        self._logger.debug("AsyncAgentWebSocketClient._emit LEAVE")
352
353    # pylint: disable=too-many-locals,too-many-statements
354    async def _process_text(self, message: str) -> None:
355        """
356        Processes messages received over the WebSocket connection.
357        """
358        self._logger.debug("AsyncAgentWebSocketClient._process_text ENTER")
359
360        try:
361            self._logger.debug("Text data received")
362            if len(message) == 0:
363                self._logger.debug("message is empty")
364                self._logger.debug("AsyncAgentWebSocketClient._process_text LEAVE")
365                return
366
367            data = json.loads(message)
368            response_type = data.get("type")
369            self._logger.debug("response_type: %s, data: %s", response_type, data)
370
371            match response_type:
372                case AgentWebSocketEvents.Open:
373                    open_result: OpenResponse = OpenResponse.from_json(message)
374                    self._logger.verbose("OpenResponse: %s", open_result)
375                    await self._emit(
376                        AgentWebSocketEvents(AgentWebSocketEvents.Open),
377                        open=open_result,
378                        **dict(cast(Dict[Any, Any], self._kwargs)),
379                    )
380                case AgentWebSocketEvents.Welcome:
381                    welcome_result: WelcomeResponse = WelcomeResponse.from_json(message)
382                    self._logger.verbose("WelcomeResponse: %s", welcome_result)
383                    await self._emit(
384                        AgentWebSocketEvents(AgentWebSocketEvents.Welcome),
385                        welcome=welcome_result,
386                        **dict(cast(Dict[Any, Any], self._kwargs)),
387                    )
388                case AgentWebSocketEvents.SettingsApplied:
389                    settings_applied_result: SettingsAppliedResponse = (
390                        SettingsAppliedResponse.from_json(message)
391                    )
392                    self._logger.verbose(
393                        "SettingsAppliedResponse: %s", settings_applied_result
394                    )
395                    await self._emit(
396                        AgentWebSocketEvents(AgentWebSocketEvents.SettingsApplied),
397                        settings_applied=settings_applied_result,
398                        **dict(cast(Dict[Any, Any], self._kwargs)),
399                    )
400                case AgentWebSocketEvents.ConversationText:
401                    conversation_text_result: ConversationTextResponse = (
402                        ConversationTextResponse.from_json(message)
403                    )
404                    self._logger.verbose(
405                        "ConversationTextResponse: %s", conversation_text_result
406                    )
407                    await self._emit(
408                        AgentWebSocketEvents(AgentWebSocketEvents.ConversationText),
409                        conversation_text=conversation_text_result,
410                        **dict(cast(Dict[Any, Any], self._kwargs)),
411                    )
412                case AgentWebSocketEvents.UserStartedSpeaking:
413                    user_started_speaking_result: UserStartedSpeakingResponse = (
414                        UserStartedSpeakingResponse.from_json(message)
415                    )
416                    self._logger.verbose(
417                        "UserStartedSpeakingResponse: %s", user_started_speaking_result
418                    )
419                    await self._emit(
420                        AgentWebSocketEvents(AgentWebSocketEvents.UserStartedSpeaking),
421                        user_started_speaking=user_started_speaking_result,
422                        **dict(cast(Dict[Any, Any], self._kwargs)),
423                    )
424                case AgentWebSocketEvents.AgentThinking:
425                    agent_thinking_result: AgentThinkingResponse = (
426                        AgentThinkingResponse.from_json(message)
427                    )
428                    self._logger.verbose(
429                        "AgentThinkingResponse: %s", agent_thinking_result
430                    )
431                    await self._emit(
432                        AgentWebSocketEvents(AgentWebSocketEvents.AgentThinking),
433                        agent_thinking=agent_thinking_result,
434                        **dict(cast(Dict[Any, Any], self._kwargs)),
435                    )
436                case AgentWebSocketEvents.FunctionCalling:
437                    function_calling_result: FunctionCalling = (
438                        FunctionCalling.from_json(message)
439                    )
440                    self._logger.verbose("FunctionCalling: %s", function_calling_result)
441                    await self._emit(
442                        AgentWebSocketEvents(AgentWebSocketEvents.FunctionCalling),
443                        function_calling=function_calling_result,
444                        **dict(cast(Dict[Any, Any], self._kwargs)),
445                    )
446                case AgentWebSocketEvents.FunctionCallRequest:
447                    function_call_request_result: FunctionCallRequest = (
448                        FunctionCallRequest.from_json(message)
449                    )
450                    self._logger.verbose(
451                        "FunctionCallRequest: %s", function_call_request_result
452                    )
453                    await self._emit(
454                        AgentWebSocketEvents(AgentWebSocketEvents.FunctionCallRequest),
455                        function_call_request=function_call_request_result,
456                        **dict(cast(Dict[Any, Any], self._kwargs)),
457                    )
458                case AgentWebSocketEvents.AgentStartedSpeaking:
459                    agent_started_speaking_result: AgentStartedSpeakingResponse = (
460                        AgentStartedSpeakingResponse.from_json(message)
461                    )
462                    self._logger.verbose(
463                        "AgentStartedSpeakingResponse: %s",
464                        agent_started_speaking_result,
465                    )
466                    await self._emit(
467                        AgentWebSocketEvents(AgentWebSocketEvents.AgentStartedSpeaking),
468                        agent_started_speaking=agent_started_speaking_result,
469                        **dict(cast(Dict[Any, Any], self._kwargs)),
470                    )
471                case AgentWebSocketEvents.AgentAudioDone:
472                    agent_audio_done_result: AgentAudioDoneResponse = (
473                        AgentAudioDoneResponse.from_json(message)
474                    )
475                    self._logger.verbose(
476                        "AgentAudioDoneResponse: %s", agent_audio_done_result
477                    )
478                    await self._emit(
479                        AgentWebSocketEvents(AgentWebSocketEvents.AgentAudioDone),
480                        agent_audio_done=agent_audio_done_result,
481                        **dict(cast(Dict[Any, Any], self._kwargs)),
482                    )
483                case AgentWebSocketEvents.InjectionRefused:
484                    injection_refused_result: InjectionRefusedResponse = (
485                        InjectionRefusedResponse.from_json(message)
486                    )
487                    self._logger.verbose(
488                        "InjectionRefused: %s", injection_refused_result
489                    )
490                    await self._emit(
491                        AgentWebSocketEvents(AgentWebSocketEvents.InjectionRefused),
492                        injection_refused=injection_refused_result,
493                        **dict(cast(Dict[Any, Any], self._kwargs)),
494                    )
495                case AgentWebSocketEvents.Close:
496                    close_result: CloseResponse = CloseResponse.from_json(message)
497                    self._logger.verbose("CloseResponse: %s", close_result)
498                    await self._emit(
499                        AgentWebSocketEvents(AgentWebSocketEvents.Close),
500                        close=close_result,
501                        **dict(cast(Dict[Any, Any], self._kwargs)),
502                    )
503                case AgentWebSocketEvents.Error:
504                    err_error: ErrorResponse = ErrorResponse.from_json(message)
505                    self._logger.verbose("ErrorResponse: %s", err_error)
506                    await self._emit(
507                        AgentWebSocketEvents(AgentWebSocketEvents.Error),
508                        error=err_error,
509                        **dict(cast(Dict[Any, Any], self._kwargs)),
510                    )
511                case _:
512                    self._logger.warning(
513                        "Unknown Message: response_type: %s, data: %s",
514                        response_type,
515                        data,
516                    )
517                    unhandled_error: UnhandledResponse = UnhandledResponse(
518                        type=AgentWebSocketEvents(AgentWebSocketEvents.Unhandled),
519                        raw=message,
520                    )
521                    await self._emit(
522                        AgentWebSocketEvents(AgentWebSocketEvents.Unhandled),
523                        unhandled=unhandled_error,
524                        **dict(cast(Dict[Any, Any], self._kwargs)),
525                    )
526
527            self._logger.notice("_process_text Succeeded")
528            self._logger.debug("AsyncAgentWebSocketClient._process_text LEAVE")
529
530        except Exception as e:  # pylint: disable=broad-except
531            self._logger.error(
532                "Exception in AsyncAgentWebSocketClient._process_text: %s", e
533            )
534            e_error: ErrorResponse = ErrorResponse(
535                "Exception in AsyncAgentWebSocketClient._process_text",
536                f"{e}",
537                "Exception",
538            )
539            await self._emit(
540                AgentWebSocketEvents(AgentWebSocketEvents.Error),
541                error=e_error,
542                **dict(cast(Dict[Any, Any], self._kwargs)),
543            )
544
545            # signal exit and close
546            await super()._signal_exit()
547
548            self._logger.debug("AsyncAgentWebSocketClient._process_text LEAVE")
549
550            if self._config.options.get("termination_exception") is True:
551                raise
552            return
553
554    # pylint: enable=too-many-locals,too-many-statements
555
556    async def _process_binary(self, message: bytes) -> None:
557        self._logger.debug("AsyncAgentWebSocketClient._process_binary ENTER")
558        self._logger.debug("Binary data received")
559
560        await self._emit(
561            AgentWebSocketEvents(AgentWebSocketEvents.AudioData),
562            data=message,
563            **dict(cast(Dict[Any, Any], self._kwargs)),
564        )
565
566        self._logger.notice("_process_binary Succeeded")
567        self._logger.debug("AsyncAgentWebSocketClient._process_binary LEAVE")
568
569    # pylint: disable=too-many-return-statements
570    async def _keep_alive(self) -> None:
571        """
572        Sends keepalive messages to the WebSocket connection.
573        """
574        self._logger.debug("AsyncAgentWebSocketClient._keep_alive ENTER")
575
576        counter = 0
577        while True:
578            try:
579                counter += 1
580                await asyncio.sleep(ONE_SECOND)
581
582                if self._exit_event.is_set():
583                    self._logger.notice("_keep_alive exiting gracefully")
584                    self._logger.debug("AsyncAgentWebSocketClient._keep_alive LEAVE")
585                    return
586
587                # deepgram keepalive
588                if counter % DEEPGRAM_INTERVAL == 0:
589                    await self.keep_alive()
590
591            except Exception as e:  # pylint: disable=broad-except
592                self._logger.error(
593                    "Exception in AsyncAgentWebSocketClient._keep_alive: %s", e
594                )
595                e_error: ErrorResponse = ErrorResponse(
596                    "Exception in AsyncAgentWebSocketClient._keep_alive",
597                    f"{e}",
598                    "Exception",
599                )
600                self._logger.error(
601                    "Exception in AsyncAgentWebSocketClient._keep_alive: %s", str(e)
602                )
603                await self._emit(
604                    AgentWebSocketEvents(AgentWebSocketEvents.Error),
605                    error=e_error,
606                    **dict(cast(Dict[Any, Any], self._kwargs)),
607                )
608
609                # signal exit and close
610                await super()._signal_exit()
611
612                self._logger.debug("AsyncAgentWebSocketClient._keep_alive LEAVE")
613
614                if self._config.options.get("termination_exception") is True:
615                    raise
616                return
617
618    async def keep_alive(self) -> bool:
619        """
620        Sends a KeepAlive message
621        """
622        self._logger.spam("AsyncAgentWebSocketClient.keep_alive ENTER")
623
624        self._logger.notice("Sending KeepAlive...")
625        ret = await self.send(json.dumps({"type": "KeepAlive"}))
626
627        if not ret:
628            self._logger.error("keep_alive failed")
629            self._logger.spam("AsyncAgentWebSocketClient.keep_alive LEAVE")
630            return False
631
632        self._logger.notice("keep_alive succeeded")
633        self._logger.spam("AsyncAgentWebSocketClient.keep_alive LEAVE")
634
635        return True
636
637    async def _close_message(self) -> bool:
638        # TODO: No known API close message # pylint: disable=fixme
639        # return await self.send(json.dumps({"type": "Close"}))
640        return True
641
642    async def finish(self) -> bool:
643        """
644        Closes the WebSocket connection gracefully.
645        """
646        self._logger.debug("AsyncAgentWebSocketClient.finish ENTER")
647
648        # stop the threads
649        self._logger.verbose("cancelling tasks...")
650        try:
651            # call parent finish
652            if await super().finish() is False:
653                self._logger.error("AsyncAgentWebSocketClient.finish failed")
654
655            if self._microphone is not None and self._microphone_created:
656                self._microphone.finish()
657                self._microphone_created = False
658
659            if self._speaker is not None and self._speaker_created:
660                self._speaker.finish()
661                self._speaker_created = False
662
663            # Before cancelling, check if the tasks were created
664            # debug the threads
665            for thread in threading.enumerate():
666                self._logger.debug("before running thread: %s", thread.name)
667            self._logger.debug("number of active threads: %s", threading.active_count())
668
669            tasks = []
670            if self._keep_alive_thread is not None:
671                self._keep_alive_thread.cancel()
672                tasks.append(self._keep_alive_thread)
673                self._logger.notice("processing _keep_alive_thread cancel...")
674
675            # Use asyncio.gather to wait for tasks to be cancelled
676            # Prevent indefinite waiting by setting a timeout
677            await asyncio.wait_for(asyncio.gather(*tasks), timeout=10)
678            self._logger.notice("threads joined")
679
680            self._speaker = None
681            self._microphone = None
682
683            # debug the threads
684            for thread in threading.enumerate():
685                self._logger.debug("after running thread: %s", thread.name)
686            self._logger.debug("number of active threads: %s", threading.active_count())
687
688            self._logger.notice("finish succeeded")
689            self._logger.spam("AsyncAgentWebSocketClient.finish LEAVE")
690            return True
691
692        except asyncio.CancelledError as e:
693            self._logger.error("tasks cancelled error: %s", e)
694            self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE")
695            return False
696
697        except asyncio.TimeoutError as e:
698            self._logger.error("tasks cancellation timed out: %s", e)
699            self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE")
700            return False

Client for interacting with Deepgram's Voice Agent API over WebSockets.

This class provides methods to establish a WebSocket connection to the agent service and handle real-time agent events.

Args: config (DeepgramClientOptions): all the options for the client.

AsyncAgentWebSocketClient(config: deepgram.options.DeepgramClientOptions)
 91    def __init__(self, config: DeepgramClientOptions):
 92        if config is None:
 93            raise DeepgramError("Config is required")
 94
 95        self._logger = verboselogs.VerboseLogger(__name__)
 96        self._logger.addHandler(logging.StreamHandler())
 97        self._logger.setLevel(config.verbose)
 98
 99        self._config = config
100
101        # needs to be "wss://agent.deepgram.com/agent"
102        self._endpoint = "agent"
103
104        # override the endpoint since it needs to be "wss://agent.deepgram.com/agent"
105        self._config.url = "agent.deepgram.com"
106        self._keep_alive_thread = None
107
108        # init handlers
109        self._event_handlers = {
110            event: [] for event in AgentWebSocketEvents.__members__.values()
111        }
112
113        if self._config.options.get("microphone_record") == "true":
114            self._logger.info("microphone_record is enabled")
115            rate = self._config.options.get("microphone_record_rate", MICROPHONE_RATE)
116            channels = self._config.options.get(
117                "microphone_record_channels", MICROPHONE_CHANNELS
118            )
119            device_index = self._config.options.get("microphone_record_device_index")
120
121            self._logger.debug("rate: %s", rate)
122            self._logger.debug("channels: %s", channels)
123            if device_index is not None:
124                self._logger.debug("device_index: %s", device_index)
125
126            self._microphone_created = True
127
128            if device_index is not None:
129                self._microphone = Microphone(
130                    rate=rate,
131                    channels=channels,
132                    verbose=self._config.verbose,
133                    input_device_index=device_index,
134                )
135            else:
136                self._microphone = Microphone(
137                    rate=rate,
138                    channels=channels,
139                    verbose=self._config.verbose,
140                )
141
142        if self._config.options.get("speaker_playback") == "true":
143            self._logger.info("speaker_playback is enabled")
144            rate = self._config.options.get("speaker_playback_rate", SPEAKER_RATE)
145            channels = self._config.options.get(
146                "speaker_playback_channels", SPEAKER_CHANNELS
147            )
148            playback_delta_in_ms = self._config.options.get(
149                "speaker_playback_delta_in_ms", SPEAKER_PLAYBACK_DELTA
150            )
151            device_index = self._config.options.get("speaker_playback_device_index")
152
153            self._logger.debug("rate: %s", rate)
154            self._logger.debug("channels: %s", channels)
155
156            self._speaker_created = True
157
158            if device_index is not None:
159                self._logger.debug("device_index: %s", device_index)
160
161                self._speaker = Speaker(
162                    rate=rate,
163                    channels=channels,
164                    last_play_delta_in_ms=playback_delta_in_ms,
165                    verbose=self._config.verbose,
166                    output_device_index=device_index,
167                    microphone=self._microphone,
168                )
169            else:
170                self._speaker = Speaker(
171                    rate=rate,
172                    channels=channels,
173                    last_play_delta_in_ms=playback_delta_in_ms,
174                    verbose=self._config.verbose,
175                    microphone=self._microphone,
176                )
177        # call the parent constructor
178        super().__init__(self._config, self._endpoint)
async def start( self, options: Optional[deepgram.clients.agent.v1.websocket.options.SettingsConfigurationOptions] = None, addons: Optional[Dict] = None, headers: Optional[Dict] = None, members: Optional[Dict] = None, **kwargs) -> bool:
181    async def start(
182        self,
183        options: Optional[SettingsConfigurationOptions] = None,
184        addons: Optional[Dict] = None,
185        headers: Optional[Dict] = None,
186        members: Optional[Dict] = None,
187        **kwargs,
188    ) -> bool:
189        """
190        Starts the WebSocket connection for agent API.
191        """
192        self._logger.debug("AsyncAgentWebSocketClient.start ENTER")
193        self._logger.info("settings: %s", options)
194        self._logger.info("addons: %s", addons)
195        self._logger.info("headers: %s", headers)
196        self._logger.info("members: %s", members)
197        self._logger.info("kwargs: %s", kwargs)
198
199        if isinstance(options, SettingsConfigurationOptions) and not options.check():
200            self._logger.error("settings.check failed")
201            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
202            raise DeepgramError("Fatal agent settings error")
203
204        self._addons = addons
205        self._headers = headers
206
207        # add "members" as members of the class
208        if members is not None:
209            self.__dict__.update(members)
210
211        # set kwargs as members of the class
212        if kwargs is not None:
213            self._kwargs = kwargs
214        else:
215            self._kwargs = {}
216
217        if isinstance(options, SettingsConfigurationOptions):
218            self._logger.info("options is class")
219            self._settings = options
220        elif isinstance(options, dict):
221            self._logger.info("options is dict")
222            self._settings = SettingsConfigurationOptions.from_dict(options)
223        elif isinstance(options, str):
224            self._logger.info("options is json")
225            self._settings = SettingsConfigurationOptions.from_json(options)
226        else:
227            raise DeepgramError("Invalid options type")
228
229        if self._settings.agent.listen.keyterms is not None and self._settings.agent.listen.model is not None and not self._settings.agent.listen.model.startswith("nova-3"):
230            raise DeepgramError("Keyterms are only supported for nova-3 models")
231
232        try:
233            # speaker substitutes the listening thread
234            if self._speaker is not None:
235                self._logger.notice("passing speaker to delegate_listening")
236                super().delegate_listening(self._speaker)
237
238            # call parent start
239            if (
240                await super().start(
241                    {},
242                    self._addons,
243                    self._headers,
244                    **dict(cast(Dict[Any, Any], self._kwargs)),
245                )
246                is False
247            ):
248                self._logger.error("AsyncAgentWebSocketClient.start failed")
249                self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
250                return False
251
252            if self._speaker is not None:
253                self._logger.notice("speaker is delegate_listening. Starting speaker")
254                self._speaker.start()
255
256            if self._speaker is not None and self._microphone is not None:
257                self._logger.notice(
258                    "speaker is delegate_listening. Starting microphone"
259                )
260                self._microphone.set_callback(self.send)
261                self._microphone.start()
262
263            # debug the threads
264            for thread in threading.enumerate():
265                self._logger.debug("after running thread: %s", thread.name)
266            self._logger.debug("number of active threads: %s", threading.active_count())
267
268            # keepalive thread
269            if self._config.is_keep_alive_enabled():
270                self._logger.notice("keepalive is enabled")
271                self._keep_alive_thread = asyncio.create_task(self._keep_alive())
272            else:
273                self._logger.notice("keepalive is disabled")
274
275            # debug the threads
276            for thread in threading.enumerate():
277                self._logger.debug("after running thread: %s", thread.name)
278            self._logger.debug("number of active threads: %s", threading.active_count())
279
280            # send the configurationsetting message
281            self._logger.notice("Sending ConfigurationSettings...")
282            ret_send_cs = await self.send(str(self._settings))
283            if not ret_send_cs:
284                self._logger.error("ConfigurationSettings failed")
285
286                err_error: ErrorResponse = ErrorResponse(
287                    "Exception in AsyncAgentWebSocketClient.start",
288                    "ConfigurationSettings failed to send",
289                    "Exception",
290                )
291                await self._emit(
292                    AgentWebSocketEvents(AgentWebSocketEvents.Error),
293                    error=err_error,
294                    **dict(cast(Dict[Any, Any], self._kwargs)),
295                )
296
297                self._logger.debug("AgentWebSocketClient.start LEAVE")
298                return False
299
300            self._logger.notice("start succeeded")
301            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
302            return True
303
304        except Exception as e:  # pylint: disable=broad-except
305            self._logger.error(
306                "WebSocketException in AsyncAgentWebSocketClient.start: %s", e
307            )
308            self._logger.debug("AsyncAgentWebSocketClient.start LEAVE")
309            if self._config.options.get("termination_exception_connect") is True:
310                raise e
311            return False

Starts the WebSocket connection for the Agent API.

def on( self, event: deepgram.clients.agent.enums.AgentWebSocketEvents, handler: Callable) -> None:
315    def on(self, event: AgentWebSocketEvents, handler: Callable) -> None:
316        """
317        Registers event handlers for specific events.
318        """
319        self._logger.info("event subscribed: %s", event)
320        if event in AgentWebSocketEvents.__members__.values() and callable(handler):
321            self._event_handlers[event].append(handler)

Registers event handlers for specific events.

async def keep_alive(self) -> bool:
618    async def keep_alive(self) -> bool:
619        """
620        Sends a KeepAlive message
621        """
622        self._logger.spam("AsyncAgentWebSocketClient.keep_alive ENTER")
623
624        self._logger.notice("Sending KeepAlive...")
625        ret = await self.send(json.dumps({"type": "KeepAlive"}))
626
627        if not ret:
628            self._logger.error("keep_alive failed")
629            self._logger.spam("AsyncAgentWebSocketClient.keep_alive LEAVE")
630            return False
631
632        self._logger.notice("keep_alive succeeded")
633        self._logger.spam("AsyncAgentWebSocketClient.keep_alive LEAVE")
634
635        return True

Sends a KeepAlive message.

async def finish(self) -> bool:
642    async def finish(self) -> bool:
643        """
644        Closes the WebSocket connection gracefully.
645        """
646        self._logger.debug("AsyncAgentWebSocketClient.finish ENTER")
647
648        # stop the threads
649        self._logger.verbose("cancelling tasks...")
650        try:
651            # call parent finish
652            if await super().finish() is False:
653                self._logger.error("AsyncAgentWebSocketClient.finish failed")
654
655            if self._microphone is not None and self._microphone_created:
656                self._microphone.finish()
657                self._microphone_created = False
658
659            if self._speaker is not None and self._speaker_created:
660                self._speaker.finish()
661                self._speaker_created = False
662
663            # Before cancelling, check if the tasks were created
664            # debug the threads
665            for thread in threading.enumerate():
666                self._logger.debug("before running thread: %s", thread.name)
667            self._logger.debug("number of active threads: %s", threading.active_count())
668
669            tasks = []
670            if self._keep_alive_thread is not None:
671                self._keep_alive_thread.cancel()
672                tasks.append(self._keep_alive_thread)
673                self._logger.notice("processing _keep_alive_thread cancel...")
674
675            # Use asyncio.gather to wait for tasks to be cancelled
676            # Prevent indefinite waiting by setting a timeout
677            await asyncio.wait_for(asyncio.gather(*tasks), timeout=10)
678            self._logger.notice("threads joined")
679
680            self._speaker = None
681            self._microphone = None
682
683            # debug the threads
684            for thread in threading.enumerate():
685                self._logger.debug("after running thread: %s", thread.name)
686            self._logger.debug("number of active threads: %s", threading.active_count())
687
688            self._logger.notice("finish succeeded")
689            self._logger.spam("AsyncAgentWebSocketClient.finish LEAVE")
690            return True
691
692        except asyncio.CancelledError as e:
693            self._logger.error("tasks cancelled error: %s", e)
694            self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE")
695            return False
696
697        except asyncio.TimeoutError as e:
698            self._logger.error("tasks cancellation timed out: %s", e)
699            self._logger.debug("AsyncAgentWebSocketClient.finish LEAVE")
700            return False

Closes the WebSocket connection gracefully.