deepgram.clients.speak.v1.websocket.async_client

  1# Copyright 2024 Deepgram SDK contributors. All Rights Reserved.
  2# Use of this source code is governed by a MIT license that can be found in the LICENSE file.
  3# SPDX-License-Identifier: MIT
  4
  5import asyncio
  6import json
  7import logging
  8from typing import Dict, Union, Optional, cast, Any, Callable
  9from datetime import datetime
 10import threading
 11
 12from .....utils import verboselogs
 13from .....options import DeepgramClientOptions
 14from ...enums import SpeakWebSocketEvents, SpeakWebSocketMessage
 15from ....common import AbstractAsyncWebSocketClient
 16from ....common import DeepgramError
 17
 18from .response import (
 19    OpenResponse,
 20    MetadataResponse,
 21    FlushedResponse,
 22    ClearedResponse,
 23    CloseResponse,
 24    WarningResponse,
 25    ErrorResponse,
 26    UnhandledResponse,
 27)
 28from .options import SpeakWSOptions
 29
 30from .....audio.microphone import Microphone
 31from .....audio.speaker import Speaker, RATE, CHANNELS, PLAYBACK_DELTA
 32
# Timing constants for this module.
# HALF_SECOND is the delay, in seconds, that AsyncSpeakWSClient._flush passes to
# asyncio.sleep() between auto-flush checks. The other three are not referenced in
# the visible part of this module.
# NOTE(review): DEEPGRAM_INTERVAL and PING_INTERVAL are presumably seconds too - confirm.
ONE_SECOND = 1
HALF_SECOND = 0.5
DEEPGRAM_INTERVAL = 5
PING_INTERVAL = 20
 37
 38
class AsyncSpeakWSClient(
    AbstractAsyncWebSocketClient
):  # pylint: disable=too-many-instance-attributes
    """
    Client for interacting with Deepgram's text-to-speech services over WebSockets.

    This class provides methods to establish a WebSocket connection for TTS synthesis and handle real-time TTS synthesis events.

    Args:
        config (DeepgramClientOptions): all the options for the client.
        microphone (Optional[Microphone]): optional microphone; it is stored on the client and
            passed to the Speaker when the "speaker_playback" option is "true".
    """

    _logger: verboselogs.VerboseLogger
    _config: DeepgramClientOptions
    # relative endpoint handed to the parent constructor ("v1/speak")
    _endpoint: str

    # one list of registered callbacks per SpeakWebSocketEvents member
    _event_handlers: Dict[SpeakWebSocketEvents, list]

    # asyncio task running _flush(); created by start() when auto flush is enabled
    _flush_thread: Union[asyncio.Task, None]
    # set by _inspect() when a Speak message is sent, reset to None when a Flush is sent
    _last_datagram: Optional[datetime] = None
    # incremented per Flush sent and decremented per Flushed received (only while inspecting)
    _flush_count: int

    # values captured by start(); _kwargs is forwarded to every event handler
    _kwargs: Optional[Dict] = None
    _addons: Optional[Dict] = None
    _options: Optional[Dict] = None
    _headers: Optional[Dict] = None

    # True once __init__ has created a Speaker; cleared again by finish()
    _speaker_created: bool = False
    _speaker: Optional[Speaker] = None
    _microphone: Optional[Microphone] = None
 69
 70    def __init__(
 71        self, config: DeepgramClientOptions, microphone: Optional[Microphone] = None
 72    ):
 73        if config is None:
 74            raise DeepgramError("Config is required")
 75        self._logger = verboselogs.VerboseLogger(__name__)
 76        self._logger.addHandler(logging.StreamHandler())
 77        self._logger.setLevel(config.verbose)
 78
 79        self._config = config
 80        self._endpoint = "v1/speak"
 81
 82        self._flush_thread = None
 83
 84        # auto flush
 85        self._last_datagram = None
 86        self._flush_count = 0
 87
 88        # microphone
 89        self._microphone = microphone
 90
 91        # init handlers
 92        self._event_handlers = {
 93            event: [] for event in SpeakWebSocketEvents.__members__.values()
 94        }
 95
 96        if self._config.options.get("speaker_playback") == "true":
 97            self._logger.info("speaker_playback is enabled")
 98            rate = self._config.options.get("speaker_playback_rate")
 99            if rate is None:
100                rate = RATE
101            channels = self._config.options.get("speaker_playback_channels")
102            if channels is None:
103                channels = CHANNELS
104            playback_delta_in_ms = self._config.options.get(
105                "speaker_playback_delta_in_ms"
106            )
107            if playback_delta_in_ms is None:
108                playback_delta_in_ms = PLAYBACK_DELTA
109            device_index = self._config.options.get("speaker_playback_device_index")
110
111            self._logger.debug("rate: %s", rate)
112            self._logger.debug("channels: %s", channels)
113            self._logger.debug("device_index: %s", device_index)
114
115            self._speaker_created = True
116
117            if device_index is not None:
118                self._speaker = Speaker(
119                    rate=rate,
120                    channels=channels,
121                    last_play_delta_in_ms=playback_delta_in_ms,
122                    verbose=self._config.verbose,
123                    output_device_index=device_index,
124                    microphone=self._microphone,
125                )
126            else:
127                self._speaker = Speaker(
128                    rate=rate,
129                    channels=channels,
130                    last_play_delta_in_ms=playback_delta_in_ms,
131                    verbose=self._config.verbose,
132                    microphone=self._microphone,
133                )
134
135        # call the parent constructor
136        super().__init__(self._config, self._endpoint)
137
138    # pylint: disable=too-many-branches,too-many-statements
139    async def start(
140        self,
141        options: Optional[Union[SpeakWSOptions, Dict]] = None,
142        addons: Optional[Dict] = None,
143        headers: Optional[Dict] = None,
144        members: Optional[Dict] = None,
145        **kwargs,
146    ) -> bool:
147        """
148        Starts the WebSocket connection for text-to-speech synthesis.
149        """
150        self._logger.debug("AsyncSpeakWebSocketClient.start ENTER")
151        self._logger.info("options: %s", options)
152        self._logger.info("addons: %s", addons)
153        self._logger.info("headers: %s", headers)
154        self._logger.info("members: %s", members)
155        self._logger.info("kwargs: %s", kwargs)
156
157        if isinstance(options, SpeakWSOptions) and not options.check():
158            self._logger.error("options.check failed")
159            self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE")
160            raise DeepgramError("Fatal text-to-speech options error")
161
162        self._addons = addons
163        self._headers = headers
164
165        # add "members" as members of the class
166        if members is not None:
167            self.__dict__.update(members)
168
169        # set kwargs as members of the class
170        if kwargs is not None:
171            self._kwargs = kwargs
172        else:
173            self._kwargs = {}
174
175        if isinstance(options, SpeakWSOptions):
176            self._logger.info("SpeakWSOptions switching class -> dict")
177            self._options = options.to_dict()
178        elif options is not None:
179            self._options = options
180        else:
181            self._options = {}
182
183        try:
184            # speaker substitutes the listening thread
185            if self._speaker is not None:
186                self._logger.notice("passing speaker to delegate_listening")
187                super().delegate_listening(self._speaker)
188
189            # call parent start
190            if (
191                await super().start(
192                    self._options,
193                    self._addons,
194                    self._headers,
195                    **dict(cast(Dict[Any, Any], self._kwargs)),
196                )
197                is False
198            ):
199                self._logger.error("AsyncSpeakWebSocketClient.start failed")
200                self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE")
201                return False
202
203            if self._speaker is not None:
204                self._logger.notice("start delegate_listening thread")
205                self._speaker.start()
206
207            # debug the threads
208            for thread in threading.enumerate():
209                self._logger.debug("after running thread: %s", thread.name)
210            self._logger.debug("number of active threads: %s", threading.active_count())
211
212            # flush thread
213            if self._config.is_auto_flush_speak_enabled():
214                self._logger.notice("autoflush is enabled")
215                self._flush_thread = asyncio.create_task(self._flush())
216            else:
217                self._logger.notice("autoflush is disabled")
218
219            # debug the threads
220            for thread in threading.enumerate():
221                self._logger.debug("after running thread: %s", thread.name)
222            self._logger.debug("number of active threads: %s", threading.active_count())
223
224            self._logger.notice("start succeeded")
225            self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE")
226            return True
227
228        except Exception as e:  # pylint: disable=broad-except
229            self._logger.error(
230                "WebSocketException in AsyncSpeakWebSocketClient.start: %s", e
231            )
232            self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE")
233            if self._config.options.get("termination_exception_connect") is True:
234                raise
235            return False
236
237    # pylint: enable=too-many-branches,too-many-statements
238
239    def on(self, event: SpeakWebSocketEvents, handler: Callable) -> None:
240        """
241        Registers event handlers for specific events.
242        """
243        self._logger.info("event subscribed: %s", event)
244        if event in SpeakWebSocketEvents.__members__.values() and callable(handler):
245            self._event_handlers[event].append(handler)
246
247    # triggers the registered event handlers for a specific event
248    async def _emit(self, event: SpeakWebSocketEvents, *args, **kwargs) -> None:
249        """
250        Emits events to the registered event handlers.
251        """
252        self._logger.debug("AsyncSpeakWebSocketClient._emit ENTER")
253        self._logger.debug("callback handlers for: %s", event)
254
255        # debug the threads
256        for thread in threading.enumerate():
257            self._logger.debug("after running thread: %s", thread.name)
258        self._logger.debug("number of active threads: %s", threading.active_count())
259
260        tasks = []
261        for handler in self._event_handlers[event]:
262            task = asyncio.create_task(handler(self, *args, **kwargs))
263            tasks.append(task)
264
265        if tasks:
266            self._logger.debug("waiting for tasks to finish...")
267            await asyncio.gather(*filter(None, tasks), return_exceptions=True)
268            tasks.clear()
269
270        # debug the threads
271        for thread in threading.enumerate():
272            self._logger.debug("after running thread: %s", thread.name)
273        self._logger.debug("number of active threads: %s", threading.active_count())
274
275        self._logger.debug("AsyncSpeakWebSocketClient._emit LEAVE")
276
277    async def _process_text(self, message: Union[str, bytes]) -> None:
278        """
279        Processes messages received over the WebSocket connection.
280        """
281        self._logger.debug("AsyncSpeakWebSocketClient._process_text ENTER")
282
283        try:
284            self._logger.debug("Text data received")
285
286            if len(message) == 0:
287                self._logger.debug("message is empty")
288                self._logger.debug("AsyncSpeakWebSocketClient._process_text LEAVE")
289                return
290
291            data = json.loads(message)
292            response_type = data.get("type")
293            self._logger.debug("response_type: %s, data: %s", response_type, data)
294
295            match response_type:
296                case SpeakWebSocketEvents.Open:
297                    open_result: OpenResponse = OpenResponse.from_json(message)
298                    self._logger.verbose("OpenResponse: %s", open_result)
299                    await self._emit(
300                        SpeakWebSocketEvents(SpeakWebSocketEvents.Open),
301                        open=open_result,
302                        **dict(cast(Dict[Any, Any], self._kwargs)),
303                    )
304                case SpeakWebSocketEvents.Metadata:
305                    meta_result: MetadataResponse = MetadataResponse.from_json(message)
306                    self._logger.verbose("MetadataResponse: %s", meta_result)
307                    await self._emit(
308                        SpeakWebSocketEvents(SpeakWebSocketEvents.Metadata),
309                        metadata=meta_result,
310                        **dict(cast(Dict[Any, Any], self._kwargs)),
311                    )
312                case SpeakWebSocketEvents.Flushed:
313                    fl_result: FlushedResponse = FlushedResponse.from_json(message)
314                    self._logger.verbose("FlushedResponse: %s", fl_result)
315
316                    # auto flush
317                    if self._config.is_inspecting_speak():
318                        self._flush_count -= 1
319                        self._logger.debug(
320                            "Decrement AutoFlush count: %d",
321                            self._flush_count,
322                        )
323
324                    await self._emit(
325                        SpeakWebSocketEvents(SpeakWebSocketEvents.Flushed),
326                        flushed=fl_result,
327                        **dict(cast(Dict[Any, Any], self._kwargs)),
328                    )
329                case SpeakWebSocketEvents.Cleared:
330                    clear_result: ClearedResponse = ClearedResponse.from_json(message)
331                    self._logger.verbose("ClearedResponse: %s", clear_result)
332                    await self._emit(
333                        SpeakWebSocketEvents(SpeakWebSocketEvents.Cleared),
334                        cleared=clear_result,
335                        **dict(cast(Dict[Any, Any], self._kwargs)),
336                    )
337                case SpeakWebSocketEvents.Close:
338                    close_result: CloseResponse = CloseResponse.from_json(message)
339                    self._logger.verbose("CloseResponse: %s", close_result)
340                    await self._emit(
341                        SpeakWebSocketEvents(SpeakWebSocketEvents.Close),
342                        close=close_result,
343                        **dict(cast(Dict[Any, Any], self._kwargs)),
344                    )
345                case SpeakWebSocketEvents.Warning:
346                    war_warning: WarningResponse = WarningResponse.from_json(message)
347                    self._logger.verbose("WarningResponse: %s", war_warning)
348                    await self._emit(
349                        SpeakWebSocketEvents(SpeakWebSocketEvents.Warning),
350                        warning=war_warning,
351                        **dict(cast(Dict[Any, Any], self._kwargs)),
352                    )
353                case SpeakWebSocketEvents.Error:
354                    err_error: ErrorResponse = ErrorResponse.from_json(message)
355                    self._logger.verbose("ErrorResponse: %s", err_error)
356                    await self._emit(
357                        SpeakWebSocketEvents(SpeakWebSocketEvents.Error),
358                        error=err_error,
359                        **dict(cast(Dict[Any, Any], self._kwargs)),
360                    )
361                case _:
362                    self._logger.warning(
363                        "Unknown Message: response_type: %s, data: %s",
364                        response_type,
365                        data,
366                    )
367                    unhandled_error: UnhandledResponse = UnhandledResponse(
368                        type=SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled),
369                        raw=str(message),
370                    )
371                    await self._emit(
372                        SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled),
373                        unhandled=unhandled_error,
374                        **dict(cast(Dict[Any, Any], self._kwargs)),
375                    )
376
377            self._logger.notice("_process_text Succeeded")
378            self._logger.debug("AsyncSpeakWebSocketClient._process_text LEAVE")
379
380        except Exception as e:  # pylint: disable=broad-except
381            self._logger.error(
382                "Exception in AsyncSpeakWebSocketClient._process_text: %s", e
383            )
384            e_error: ErrorResponse = ErrorResponse(
385                "Exception in AsyncSpeakWebSocketClient._process_text",
386                f"{e}",
387                "Exception",
388            )
389            await self._emit(
390                SpeakWebSocketEvents(SpeakWebSocketEvents.Error),
391                error=e_error,
392                **dict(cast(Dict[Any, Any], self._kwargs)),
393            )
394
395            # signal exit and close
396            await super()._signal_exit()
397
398            self._logger.debug("AsyncSpeakWebSocketClient._process_text LEAVE")
399
400            if self._config.options.get("termination_exception") is True:
401                raise
402            return
403
404    # pylint: enable=too-many-return-statements,too-many-statements
405
406    async def _process_binary(self, message: bytes) -> None:
407        self._logger.debug("SpeakWebSocketClient._process_binary ENTER")
408        self._logger.debug("Binary data received")
409
410        await self._emit(
411            SpeakWebSocketEvents(SpeakWebSocketEvents.AudioData),
412            data=message,
413            **dict(cast(Dict[Any, Any], self._kwargs)),
414        )
415
416        self._logger.notice("_process_binary Succeeded")
417        self._logger.debug("SpeakWebSocketClient._process_binary LEAVE")
418
419    ## pylint: disable=too-many-return-statements
420    async def _flush(self) -> None:
421        self._logger.debug("AsyncSpeakWebSocketClient._flush ENTER")
422
423        delta_in_ms_str = self._config.options.get("auto_flush_speak_delta")
424        if delta_in_ms_str is None:
425            self._logger.error("auto_flush_speak_delta is None")
426            self._logger.debug("AsyncSpeakWebSocketClient._flush LEAVE")
427            return
428        delta_in_ms = float(delta_in_ms_str)
429
430        while True:
431            try:
432                await asyncio.sleep(HALF_SECOND)
433
434                if self._exit_event.is_set():
435                    self._logger.notice("_flush exiting gracefully")
436                    self._logger.debug("AsyncSpeakWebSocketClient._flush LEAVE")
437                    return
438
439                if self._last_datagram is None:
440                    self._logger.debug("AutoFlush last_datagram is None")
441                    continue
442
443                delta = datetime.now() - self._last_datagram
444                diff_in_ms = delta.total_seconds() * 1000
445                self._logger.debug("AutoFlush delta: %f", diff_in_ms)
446                if diff_in_ms < delta_in_ms:
447                    self._logger.debug("AutoFlush delta is less than threshold")
448                    continue
449
450                await self.flush()
451
452            except Exception as e:  # pylint: disable=broad-except
453                self._logger.error(
454                    "Exception in AsyncSpeakWebSocketClient._flush: %s", e
455                )
456                e_error: ErrorResponse = ErrorResponse(
457                    "Exception in AsyncSpeakWebSocketClient._flush",
458                    f"{e}",
459                    "Exception",
460                )
461                self._logger.error(
462                    "Exception in AsyncSpeakWebSocketClient._flush: %s", str(e)
463                )
464                await self._emit(
465                    SpeakWebSocketEvents(SpeakWebSocketEvents.Error),
466                    error=e_error,
467                    **dict(cast(Dict[Any, Any], self._kwargs)),
468                )
469
470                # signal exit and close
471                await super()._signal_exit()
472
473                self._logger.debug("AsyncSpeakWebSocketClient._flush LEAVE")
474
475                if self._config.options.get("termination_exception") is True:
476                    raise
477                return
478
479    # pylint: enable=too-many-return-statements
480
481    async def send_text(self, text_input: str) -> bool:
482        """
483        Sends text to the WebSocket connection to generate audio.
484
485        Args:
486            text_input (str): The raw text to be synthesized. This function will automatically wrap
487                the text in a JSON object of type "Speak" with the key "text".
488
489        Returns:
490            bool: True if the text was successfully sent, False otherwise.
491        """
492        return await self.send_raw(json.dumps({"type": "Speak", "text": text_input}))
493
494    async def send(self, data: Union[bytes, str]) -> bool:
495        """
496        Alias for send_text. Please see send_text for more information.
497        """
498        if isinstance(data, bytes):
499            self._logger.error("send() failed - data is bytes")
500            return False
501
502        return await self.send_text(data)
503
504    # pylint: disable=unused-argument
505    async def send_control(
506        self, msg_type: Union[SpeakWebSocketMessage, str], data: Optional[str] = ""
507    ) -> bool:
508        """
509        Sends a control message consisting of type SpeakWebSocketEvents over the WebSocket connection.
510
511        Args:
512            msg_type (SpeakWebSocketEvents): The type of control message to send.
513            (Optional) data (str): The data to send with the control message.
514
515        Returns:
516            bool: True if the control message was successfully sent, False otherwise.
517        """
518        control_msg = json.dumps({"type": msg_type})
519        return await self.send_raw(control_msg)
520
521    # pylint: enable=unused-argument
522
523    # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
524    async def send_raw(self, msg: str) -> bool:
525        """
526        Sends a raw/control message over the WebSocket connection. This message must contain a valid JSON object.
527
528        Args:
529            msg (str): The raw message to send over the WebSocket connection.
530
531        Returns:
532            bool: True if the message was successfully sent, False otherwise.
533        """
534        self._logger.spam("AsyncSpeakWebSocketClient.send_raw ENTER")
535
536        if self._config.is_inspecting_speak():
537            try:
538                _tmp_json = json.loads(msg)
539                if "type" in _tmp_json:
540                    self._logger.debug(
541                        "Inspecting Message: Sending %s", _tmp_json["type"]
542                    )
543                    match _tmp_json["type"]:
544                        case SpeakWebSocketMessage.Speak:
545                            inspect_res = await self._inspect()
546                            if not inspect_res:
547                                self._logger.error("inspect_res failed")
548                        case SpeakWebSocketMessage.Flush:
549                            self._last_datagram = None
550                            self._flush_count += 1
551                            self._logger.debug(
552                                "Increment Flush count: %d", self._flush_count
553                            )
554            except Exception as e:  # pylint: disable=broad-except
555                self._logger.error("send_raw() failed - Exception: %s", str(e))
556
557        try:
558            if await super().send(msg) is False:
559                self._logger.error("send_raw() failed")
560                self._logger.spam("AsyncSpeakWebSocketClient.send_raw LEAVE")
561                return False
562            self._logger.spam("send_raw() succeeded")
563            self._logger.spam("AsyncSpeakWebSocketClient.send_raw LEAVE")
564            return True
565        except Exception as e:  # pylint: disable=broad-except
566            self._logger.error("send_raw() failed - Exception: %s", str(e))
567            self._logger.spam("AsyncSpeakWebSocketClient.send_raw LEAVE")
568            if self._config.options.get("termination_exception_send") is True:
569                raise
570            return False
571
572    # pylint: enable=too-many-return-statements,too-many-branches
573
574    async def flush(self) -> bool:
575        """
576        Flushes the current buffer and returns generated audio
577        """
578        self._logger.spam("AsyncSpeakWebSocketClient.flush ENTER")
579
580        self._logger.notice("Sending Flush...")
581        ret = await self.send_control(SpeakWebSocketMessage.Flush)
582
583        if not ret:
584            self._logger.error("flush failed")
585            self._logger.spam("AsyncSpeakWebSocketClient.flush LEAVE")
586            return False
587
588        self._logger.notice("flush succeeded")
589        self._logger.spam("AsyncSpeakWebSocketClient.flush LEAVE")
590
591        return True
592
593    async def clear(self) -> bool:
594        """
595        Clears the current buffer on the server
596        """
597        self._logger.spam("AsyncSpeakWebSocketClient.clear ENTER")
598
599        self._logger.notice("Sending Clear...")
600        ret = await self.send_control(SpeakWebSocketMessage.Clear)
601
602        if not ret:
603            self._logger.error("clear failed")
604            self._logger.spam("AsyncSpeakWebSocketClient.clear LEAVE")
605            return False
606
607        self._logger.notice("clear succeeded")
608        self._logger.spam("AsyncSpeakWebSocketClient.clear LEAVE")
609
610        return True
611
612    async def wait_for_complete(self):
613        """
614        This method will block until the speak is done playing sound.
615        """
616        self._logger.spam("AsyncSpeakWebSocketClient.wait_for_complete ENTER")
617
618        if self._speaker is None:
619            self._logger.error("speaker is None. Return immediately")
620            return
621
622        loop = asyncio.get_event_loop()
623        await loop.run_in_executor(None, self._speaker.wait_for_complete)
624        self._logger.notice("wait_for_complete succeeded")
625        self._logger.spam("AsyncSpeakWebSocketClient.wait_for_complete LEAVE")
626
627    async def _close_message(self) -> bool:
628        return await self.send_control(SpeakWebSocketMessage.Close)
629
630    async def finish(self) -> bool:
631        """
632        Closes the WebSocket connection gracefully.
633        """
634        self._logger.debug("AsyncSpeakWebSocketClient.finish ENTER")
635
636        # stop the threads
637        self._logger.verbose("cancelling tasks...")
638        try:
639            # call parent finish
640            if await super().finish() is False:
641                self._logger.error("AsyncListenWebSocketClient.finish failed")
642
643            if self._speaker is not None and self._speaker_created:
644                self._speaker.finish()
645                self._speaker_created = False
646
647            # Before cancelling, check if the tasks were created
648            # debug the threads
649            for thread in threading.enumerate():
650                self._logger.debug("before running thread: %s", thread.name)
651            self._logger.debug("number of active threads: %s", threading.active_count())
652
653            tasks = []
654
655            if self._speaker is not None:
656                self._logger.notice("stopping speaker...")
657                self._speaker.finish()
658                self._speaker = None
659                self._logger.notice("speaker stopped")
660
661            if self._flush_thread is not None:
662                self._logger.notice("stopping _flush_thread...")
663                self._flush_thread.cancel()
664                tasks.append(self._flush_thread)
665                self._logger.notice("_flush_thread cancelled")
666
667            # Use asyncio.gather to wait for tasks to be cancelled
668            # Prevent indefinite waiting by setting a timeout
669            await asyncio.wait_for(asyncio.gather(*tasks), timeout=10)
670            self._logger.notice("threads joined")
671
672            # debug the threads
673            for thread in threading.enumerate():
674                self._logger.debug("after running thread: %s", thread.name)
675            self._logger.debug("number of active threads: %s", threading.active_count())
676
677            self._logger.notice("finish succeeded")
678            self._logger.spam("AsyncSpeakWebSocketClient.finish LEAVE")
679            return True
680
681        except asyncio.CancelledError:
682            self._logger.debug("tasks cancelled")
683            self._logger.debug("AsyncSpeakWebSocketClient.finish LEAVE")
684            return False
685
686        except asyncio.TimeoutError as e:
687            self._logger.error("tasks cancellation timed out: %s", e)
688            self._logger.debug("AsyncSpeakWebSocketClient.finish LEAVE")
689            return False
690
691    async def _inspect(self) -> bool:
692        # auto flush_inspect is generically used to track any messages you might want to snoop on
693        # place additional logic here to inspect messages of interest
694
695        # for auto flush functionality
696        # set the last datagram
697        self._last_datagram = datetime.now()
698        self._logger.debug(
699            "AutoFlush last received: %s",
700            str(self._last_datagram),
701        )
702
703        return True
704
705
# Alias: the class is also reachable as AsyncSpeakWebSocketClient, the name used in
# this module's log messages.
AsyncSpeakWebSocketClient = AsyncSpeakWSClient
# Timing constants (this run repeats the definitions near the top of the module).
# HALF_SECOND is the delay, in seconds, that AsyncSpeakWSClient._flush passes to
# asyncio.sleep() between auto-flush checks.
# NOTE(review): DEEPGRAM_INTERVAL and PING_INTERVAL are presumably seconds too - confirm.
ONE_SECOND = 1
HALF_SECOND = 0.5
DEEPGRAM_INTERVAL = 5
PING_INTERVAL = 20
 40class AsyncSpeakWSClient(
 41    AbstractAsyncWebSocketClient
 42):  # pylint: disable=too-many-instance-attributes
 43    """
 44    Client for interacting with Deepgram's text-to-speech services over WebSockets.
 45
 46     This class provides methods to establish a WebSocket connection for TTS synthesis and handle real-time TTS synthesis events.
 47
 48     Args:
 49         config (DeepgramClientOptions): all the options for the client.
 50    """
 51
 52    _logger: verboselogs.VerboseLogger
 53    _config: DeepgramClientOptions
 54    _endpoint: str
 55
 56    _event_handlers: Dict[SpeakWebSocketEvents, list]
 57
 58    _flush_thread: Union[asyncio.Task, None]
 59    _last_datagram: Optional[datetime] = None
 60    _flush_count: int
 61
 62    _kwargs: Optional[Dict] = None
 63    _addons: Optional[Dict] = None
 64    _options: Optional[Dict] = None
 65    _headers: Optional[Dict] = None
 66
 67    _speaker_created: bool = False
 68    _speaker: Optional[Speaker] = None
 69    _microphone: Optional[Microphone] = None
 70
 71    def __init__(
 72        self, config: DeepgramClientOptions, microphone: Optional[Microphone] = None
 73    ):
 74        if config is None:
 75            raise DeepgramError("Config is required")
 76        self._logger = verboselogs.VerboseLogger(__name__)
 77        self._logger.addHandler(logging.StreamHandler())
 78        self._logger.setLevel(config.verbose)
 79
 80        self._config = config
 81        self._endpoint = "v1/speak"
 82
 83        self._flush_thread = None
 84
 85        # auto flush
 86        self._last_datagram = None
 87        self._flush_count = 0
 88
 89        # microphone
 90        self._microphone = microphone
 91
 92        # init handlers
 93        self._event_handlers = {
 94            event: [] for event in SpeakWebSocketEvents.__members__.values()
 95        }
 96
 97        if self._config.options.get("speaker_playback") == "true":
 98            self._logger.info("speaker_playback is enabled")
 99            rate = self._config.options.get("speaker_playback_rate")
100            if rate is None:
101                rate = RATE
102            channels = self._config.options.get("speaker_playback_channels")
103            if channels is None:
104                channels = CHANNELS
105            playback_delta_in_ms = self._config.options.get(
106                "speaker_playback_delta_in_ms"
107            )
108            if playback_delta_in_ms is None:
109                playback_delta_in_ms = PLAYBACK_DELTA
110            device_index = self._config.options.get("speaker_playback_device_index")
111
112            self._logger.debug("rate: %s", rate)
113            self._logger.debug("channels: %s", channels)
114            self._logger.debug("device_index: %s", device_index)
115
116            self._speaker_created = True
117
118            if device_index is not None:
119                self._speaker = Speaker(
120                    rate=rate,
121                    channels=channels,
122                    last_play_delta_in_ms=playback_delta_in_ms,
123                    verbose=self._config.verbose,
124                    output_device_index=device_index,
125                    microphone=self._microphone,
126                )
127            else:
128                self._speaker = Speaker(
129                    rate=rate,
130                    channels=channels,
131                    last_play_delta_in_ms=playback_delta_in_ms,
132                    verbose=self._config.verbose,
133                    microphone=self._microphone,
134                )
135
136        # call the parent constructor
137        super().__init__(self._config, self._endpoint)
138
139    # pylint: disable=too-many-branches,too-many-statements
140    async def start(
141        self,
142        options: Optional[Union[SpeakWSOptions, Dict]] = None,
143        addons: Optional[Dict] = None,
144        headers: Optional[Dict] = None,
145        members: Optional[Dict] = None,
146        **kwargs,
147    ) -> bool:
148        """
149        Starts the WebSocket connection for text-to-speech synthesis.
150        """
151        self._logger.debug("AsyncSpeakWebSocketClient.start ENTER")
152        self._logger.info("options: %s", options)
153        self._logger.info("addons: %s", addons)
154        self._logger.info("headers: %s", headers)
155        self._logger.info("members: %s", members)
156        self._logger.info("kwargs: %s", kwargs)
157
158        if isinstance(options, SpeakWSOptions) and not options.check():
159            self._logger.error("options.check failed")
160            self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE")
161            raise DeepgramError("Fatal text-to-speech options error")
162
163        self._addons = addons
164        self._headers = headers
165
166        # add "members" as members of the class
167        if members is not None:
168            self.__dict__.update(members)
169
170        # set kwargs as members of the class
171        if kwargs is not None:
172            self._kwargs = kwargs
173        else:
174            self._kwargs = {}
175
176        if isinstance(options, SpeakWSOptions):
177            self._logger.info("SpeakWSOptions switching class -> dict")
178            self._options = options.to_dict()
179        elif options is not None:
180            self._options = options
181        else:
182            self._options = {}
183
184        try:
185            # speaker substitutes the listening thread
186            if self._speaker is not None:
187                self._logger.notice("passing speaker to delegate_listening")
188                super().delegate_listening(self._speaker)
189
190            # call parent start
191            if (
192                await super().start(
193                    self._options,
194                    self._addons,
195                    self._headers,
196                    **dict(cast(Dict[Any, Any], self._kwargs)),
197                )
198                is False
199            ):
200                self._logger.error("AsyncSpeakWebSocketClient.start failed")
201                self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE")
202                return False
203
204            if self._speaker is not None:
205                self._logger.notice("start delegate_listening thread")
206                self._speaker.start()
207
208            # debug the threads
209            for thread in threading.enumerate():
210                self._logger.debug("after running thread: %s", thread.name)
211            self._logger.debug("number of active threads: %s", threading.active_count())
212
213            # flush thread
214            if self._config.is_auto_flush_speak_enabled():
215                self._logger.notice("autoflush is enabled")
216                self._flush_thread = asyncio.create_task(self._flush())
217            else:
218                self._logger.notice("autoflush is disabled")
219
220            # debug the threads
221            for thread in threading.enumerate():
222                self._logger.debug("after running thread: %s", thread.name)
223            self._logger.debug("number of active threads: %s", threading.active_count())
224
225            self._logger.notice("start succeeded")
226            self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE")
227            return True
228
229        except Exception as e:  # pylint: disable=broad-except
230            self._logger.error(
231                "WebSocketException in AsyncSpeakWebSocketClient.start: %s", e
232            )
233            self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE")
234            if self._config.options.get("termination_exception_connect") is True:
235                raise
236            return False
237
238    # pylint: enable=too-many-branches,too-many-statements
239
240    def on(self, event: SpeakWebSocketEvents, handler: Callable) -> None:
241        """
242        Registers event handlers for specific events.
243        """
244        self._logger.info("event subscribed: %s", event)
245        if event in SpeakWebSocketEvents.__members__.values() and callable(handler):
246            self._event_handlers[event].append(handler)
247
248    # triggers the registered event handlers for a specific event
249    async def _emit(self, event: SpeakWebSocketEvents, *args, **kwargs) -> None:
250        """
251        Emits events to the registered event handlers.
252        """
253        self._logger.debug("AsyncSpeakWebSocketClient._emit ENTER")
254        self._logger.debug("callback handlers for: %s", event)
255
256        # debug the threads
257        for thread in threading.enumerate():
258            self._logger.debug("after running thread: %s", thread.name)
259        self._logger.debug("number of active threads: %s", threading.active_count())
260
261        tasks = []
262        for handler in self._event_handlers[event]:
263            task = asyncio.create_task(handler(self, *args, **kwargs))
264            tasks.append(task)
265
266        if tasks:
267            self._logger.debug("waiting for tasks to finish...")
268            await asyncio.gather(*filter(None, tasks), return_exceptions=True)
269            tasks.clear()
270
271        # debug the threads
272        for thread in threading.enumerate():
273            self._logger.debug("after running thread: %s", thread.name)
274        self._logger.debug("number of active threads: %s", threading.active_count())
275
276        self._logger.debug("AsyncSpeakWebSocketClient._emit LEAVE")
277
278    async def _process_text(self, message: Union[str, bytes]) -> None:
279        """
280        Processes messages received over the WebSocket connection.
281        """
282        self._logger.debug("AsyncSpeakWebSocketClient._process_text ENTER")
283
284        try:
285            self._logger.debug("Text data received")
286
287            if len(message) == 0:
288                self._logger.debug("message is empty")
289                self._logger.debug("AsyncSpeakWebSocketClient._process_text LEAVE")
290                return
291
292            data = json.loads(message)
293            response_type = data.get("type")
294            self._logger.debug("response_type: %s, data: %s", response_type, data)
295
296            match response_type:
297                case SpeakWebSocketEvents.Open:
298                    open_result: OpenResponse = OpenResponse.from_json(message)
299                    self._logger.verbose("OpenResponse: %s", open_result)
300                    await self._emit(
301                        SpeakWebSocketEvents(SpeakWebSocketEvents.Open),
302                        open=open_result,
303                        **dict(cast(Dict[Any, Any], self._kwargs)),
304                    )
305                case SpeakWebSocketEvents.Metadata:
306                    meta_result: MetadataResponse = MetadataResponse.from_json(message)
307                    self._logger.verbose("MetadataResponse: %s", meta_result)
308                    await self._emit(
309                        SpeakWebSocketEvents(SpeakWebSocketEvents.Metadata),
310                        metadata=meta_result,
311                        **dict(cast(Dict[Any, Any], self._kwargs)),
312                    )
313                case SpeakWebSocketEvents.Flushed:
314                    fl_result: FlushedResponse = FlushedResponse.from_json(message)
315                    self._logger.verbose("FlushedResponse: %s", fl_result)
316
317                    # auto flush
318                    if self._config.is_inspecting_speak():
319                        self._flush_count -= 1
320                        self._logger.debug(
321                            "Decrement AutoFlush count: %d",
322                            self._flush_count,
323                        )
324
325                    await self._emit(
326                        SpeakWebSocketEvents(SpeakWebSocketEvents.Flushed),
327                        flushed=fl_result,
328                        **dict(cast(Dict[Any, Any], self._kwargs)),
329                    )
330                case SpeakWebSocketEvents.Cleared:
331                    clear_result: ClearedResponse = ClearedResponse.from_json(message)
332                    self._logger.verbose("ClearedResponse: %s", clear_result)
333                    await self._emit(
334                        SpeakWebSocketEvents(SpeakWebSocketEvents.Cleared),
335                        cleared=clear_result,
336                        **dict(cast(Dict[Any, Any], self._kwargs)),
337                    )
338                case SpeakWebSocketEvents.Close:
339                    close_result: CloseResponse = CloseResponse.from_json(message)
340                    self._logger.verbose("CloseResponse: %s", close_result)
341                    await self._emit(
342                        SpeakWebSocketEvents(SpeakWebSocketEvents.Close),
343                        close=close_result,
344                        **dict(cast(Dict[Any, Any], self._kwargs)),
345                    )
346                case SpeakWebSocketEvents.Warning:
347                    war_warning: WarningResponse = WarningResponse.from_json(message)
348                    self._logger.verbose("WarningResponse: %s", war_warning)
349                    await self._emit(
350                        SpeakWebSocketEvents(SpeakWebSocketEvents.Warning),
351                        warning=war_warning,
352                        **dict(cast(Dict[Any, Any], self._kwargs)),
353                    )
354                case SpeakWebSocketEvents.Error:
355                    err_error: ErrorResponse = ErrorResponse.from_json(message)
356                    self._logger.verbose("ErrorResponse: %s", err_error)
357                    await self._emit(
358                        SpeakWebSocketEvents(SpeakWebSocketEvents.Error),
359                        error=err_error,
360                        **dict(cast(Dict[Any, Any], self._kwargs)),
361                    )
362                case _:
363                    self._logger.warning(
364                        "Unknown Message: response_type: %s, data: %s",
365                        response_type,
366                        data,
367                    )
368                    unhandled_error: UnhandledResponse = UnhandledResponse(
369                        type=SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled),
370                        raw=str(message),
371                    )
372                    await self._emit(
373                        SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled),
374                        unhandled=unhandled_error,
375                        **dict(cast(Dict[Any, Any], self._kwargs)),
376                    )
377
378            self._logger.notice("_process_text Succeeded")
379            self._logger.debug("AsyncSpeakWebSocketClient._process_text LEAVE")
380
381        except Exception as e:  # pylint: disable=broad-except
382            self._logger.error(
383                "Exception in AsyncSpeakWebSocketClient._process_text: %s", e
384            )
385            e_error: ErrorResponse = ErrorResponse(
386                "Exception in AsyncSpeakWebSocketClient._process_text",
387                f"{e}",
388                "Exception",
389            )
390            await self._emit(
391                SpeakWebSocketEvents(SpeakWebSocketEvents.Error),
392                error=e_error,
393                **dict(cast(Dict[Any, Any], self._kwargs)),
394            )
395
396            # signal exit and close
397            await super()._signal_exit()
398
399            self._logger.debug("AsyncSpeakWebSocketClient._process_text LEAVE")
400
401            if self._config.options.get("termination_exception") is True:
402                raise
403            return
404
405    # pylint: enable=too-many-return-statements,too-many-statements
406
407    async def _process_binary(self, message: bytes) -> None:
408        self._logger.debug("SpeakWebSocketClient._process_binary ENTER")
409        self._logger.debug("Binary data received")
410
411        await self._emit(
412            SpeakWebSocketEvents(SpeakWebSocketEvents.AudioData),
413            data=message,
414            **dict(cast(Dict[Any, Any], self._kwargs)),
415        )
416
417        self._logger.notice("_process_binary Succeeded")
418        self._logger.debug("SpeakWebSocketClient._process_binary LEAVE")
419
420    ## pylint: disable=too-many-return-statements
421    async def _flush(self) -> None:
422        self._logger.debug("AsyncSpeakWebSocketClient._flush ENTER")
423
424        delta_in_ms_str = self._config.options.get("auto_flush_speak_delta")
425        if delta_in_ms_str is None:
426            self._logger.error("auto_flush_speak_delta is None")
427            self._logger.debug("AsyncSpeakWebSocketClient._flush LEAVE")
428            return
429        delta_in_ms = float(delta_in_ms_str)
430
431        while True:
432            try:
433                await asyncio.sleep(HALF_SECOND)
434
435                if self._exit_event.is_set():
436                    self._logger.notice("_flush exiting gracefully")
437                    self._logger.debug("AsyncSpeakWebSocketClient._flush LEAVE")
438                    return
439
440                if self._last_datagram is None:
441                    self._logger.debug("AutoFlush last_datagram is None")
442                    continue
443
444                delta = datetime.now() - self._last_datagram
445                diff_in_ms = delta.total_seconds() * 1000
446                self._logger.debug("AutoFlush delta: %f", diff_in_ms)
447                if diff_in_ms < delta_in_ms:
448                    self._logger.debug("AutoFlush delta is less than threshold")
449                    continue
450
451                await self.flush()
452
453            except Exception as e:  # pylint: disable=broad-except
454                self._logger.error(
455                    "Exception in AsyncSpeakWebSocketClient._flush: %s", e
456                )
457                e_error: ErrorResponse = ErrorResponse(
458                    "Exception in AsyncSpeakWebSocketClient._flush",
459                    f"{e}",
460                    "Exception",
461                )
462                self._logger.error(
463                    "Exception in AsyncSpeakWebSocketClient._flush: %s", str(e)
464                )
465                await self._emit(
466                    SpeakWebSocketEvents(SpeakWebSocketEvents.Error),
467                    error=e_error,
468                    **dict(cast(Dict[Any, Any], self._kwargs)),
469                )
470
471                # signal exit and close
472                await super()._signal_exit()
473
474                self._logger.debug("AsyncSpeakWebSocketClient._flush LEAVE")
475
476                if self._config.options.get("termination_exception") is True:
477                    raise
478                return
479
480    # pylint: enable=too-many-return-statements
481
482    async def send_text(self, text_input: str) -> bool:
483        """
484        Sends text to the WebSocket connection to generate audio.
485
486        Args:
487            text_input (str): The raw text to be synthesized. This function will automatically wrap
488                the text in a JSON object of type "Speak" with the key "text".
489
490        Returns:
491            bool: True if the text was successfully sent, False otherwise.
492        """
493        return await self.send_raw(json.dumps({"type": "Speak", "text": text_input}))
494
495    async def send(self, data: Union[bytes, str]) -> bool:
496        """
497        Alias for send_text. Please see send_text for more information.
498        """
499        if isinstance(data, bytes):
500            self._logger.error("send() failed - data is bytes")
501            return False
502
503        return await self.send_text(data)
504
505    # pylint: disable=unused-argument
506    async def send_control(
507        self, msg_type: Union[SpeakWebSocketMessage, str], data: Optional[str] = ""
508    ) -> bool:
509        """
510        Sends a control message consisting of type SpeakWebSocketEvents over the WebSocket connection.
511
512        Args:
513            msg_type (SpeakWebSocketEvents): The type of control message to send.
514            (Optional) data (str): The data to send with the control message.
515
516        Returns:
517            bool: True if the control message was successfully sent, False otherwise.
518        """
519        control_msg = json.dumps({"type": msg_type})
520        return await self.send_raw(control_msg)
521
522    # pylint: enable=unused-argument
523
524    # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
525    async def send_raw(self, msg: str) -> bool:
526        """
527        Sends a raw/control message over the WebSocket connection. This message must contain a valid JSON object.
528
529        Args:
530            msg (str): The raw message to send over the WebSocket connection.
531
532        Returns:
533            bool: True if the message was successfully sent, False otherwise.
534        """
535        self._logger.spam("AsyncSpeakWebSocketClient.send_raw ENTER")
536
537        if self._config.is_inspecting_speak():
538            try:
539                _tmp_json = json.loads(msg)
540                if "type" in _tmp_json:
541                    self._logger.debug(
542                        "Inspecting Message: Sending %s", _tmp_json["type"]
543                    )
544                    match _tmp_json["type"]:
545                        case SpeakWebSocketMessage.Speak:
546                            inspect_res = await self._inspect()
547                            if not inspect_res:
548                                self._logger.error("inspect_res failed")
549                        case SpeakWebSocketMessage.Flush:
550                            self._last_datagram = None
551                            self._flush_count += 1
552                            self._logger.debug(
553                                "Increment Flush count: %d", self._flush_count
554                            )
555            except Exception as e:  # pylint: disable=broad-except
556                self._logger.error("send_raw() failed - Exception: %s", str(e))
557
558        try:
559            if await super().send(msg) is False:
560                self._logger.error("send_raw() failed")
561                self._logger.spam("AsyncSpeakWebSocketClient.send_raw LEAVE")
562                return False
563            self._logger.spam("send_raw() succeeded")
564            self._logger.spam("AsyncSpeakWebSocketClient.send_raw LEAVE")
565            return True
566        except Exception as e:  # pylint: disable=broad-except
567            self._logger.error("send_raw() failed - Exception: %s", str(e))
568            self._logger.spam("AsyncSpeakWebSocketClient.send_raw LEAVE")
569            if self._config.options.get("termination_exception_send") is True:
570                raise
571            return False
572
573    # pylint: enable=too-many-return-statements,too-many-branches
574
575    async def flush(self) -> bool:
576        """
577        Flushes the current buffer and returns generated audio
578        """
579        self._logger.spam("AsyncSpeakWebSocketClient.flush ENTER")
580
581        self._logger.notice("Sending Flush...")
582        ret = await self.send_control(SpeakWebSocketMessage.Flush)
583
584        if not ret:
585            self._logger.error("flush failed")
586            self._logger.spam("AsyncSpeakWebSocketClient.flush LEAVE")
587            return False
588
589        self._logger.notice("flush succeeded")
590        self._logger.spam("AsyncSpeakWebSocketClient.flush LEAVE")
591
592        return True
593
594    async def clear(self) -> bool:
595        """
596        Clears the current buffer on the server
597        """
598        self._logger.spam("AsyncSpeakWebSocketClient.clear ENTER")
599
600        self._logger.notice("Sending Clear...")
601        ret = await self.send_control(SpeakWebSocketMessage.Clear)
602
603        if not ret:
604            self._logger.error("clear failed")
605            self._logger.spam("AsyncSpeakWebSocketClient.clear LEAVE")
606            return False
607
608        self._logger.notice("clear succeeded")
609        self._logger.spam("AsyncSpeakWebSocketClient.clear LEAVE")
610
611        return True
612
613    async def wait_for_complete(self):
614        """
615        This method will block until the speak is done playing sound.
616        """
617        self._logger.spam("AsyncSpeakWebSocketClient.wait_for_complete ENTER")
618
619        if self._speaker is None:
620            self._logger.error("speaker is None. Return immediately")
621            return
622
623        loop = asyncio.get_event_loop()
624        await loop.run_in_executor(None, self._speaker.wait_for_complete)
625        self._logger.notice("wait_for_complete succeeded")
626        self._logger.spam("AsyncSpeakWebSocketClient.wait_for_complete LEAVE")
627
628    async def _close_message(self) -> bool:
629        return await self.send_control(SpeakWebSocketMessage.Close)
630
631    async def finish(self) -> bool:
632        """
633        Closes the WebSocket connection gracefully.
634        """
635        self._logger.debug("AsyncSpeakWebSocketClient.finish ENTER")
636
637        # stop the threads
638        self._logger.verbose("cancelling tasks...")
639        try:
640            # call parent finish
641            if await super().finish() is False:
642                self._logger.error("AsyncListenWebSocketClient.finish failed")
643
644            if self._speaker is not None and self._speaker_created:
645                self._speaker.finish()
646                self._speaker_created = False
647
648            # Before cancelling, check if the tasks were created
649            # debug the threads
650            for thread in threading.enumerate():
651                self._logger.debug("before running thread: %s", thread.name)
652            self._logger.debug("number of active threads: %s", threading.active_count())
653
654            tasks = []
655
656            if self._speaker is not None:
657                self._logger.notice("stopping speaker...")
658                self._speaker.finish()
659                self._speaker = None
660                self._logger.notice("speaker stopped")
661
662            if self._flush_thread is not None:
663                self._logger.notice("stopping _flush_thread...")
664                self._flush_thread.cancel()
665                tasks.append(self._flush_thread)
666                self._logger.notice("_flush_thread cancelled")
667
668            # Use asyncio.gather to wait for tasks to be cancelled
669            # Prevent indefinite waiting by setting a timeout
670            await asyncio.wait_for(asyncio.gather(*tasks), timeout=10)
671            self._logger.notice("threads joined")
672
673            # debug the threads
674            for thread in threading.enumerate():
675                self._logger.debug("after running thread: %s", thread.name)
676            self._logger.debug("number of active threads: %s", threading.active_count())
677
678            self._logger.notice("finish succeeded")
679            self._logger.spam("AsyncSpeakWebSocketClient.finish LEAVE")
680            return True
681
682        except asyncio.CancelledError:
683            self._logger.debug("tasks cancelled")
684            self._logger.debug("AsyncSpeakWebSocketClient.finish LEAVE")
685            return False
686
687        except asyncio.TimeoutError as e:
688            self._logger.error("tasks cancellation timed out: %s", e)
689            self._logger.debug("AsyncSpeakWebSocketClient.finish LEAVE")
690            return False
691
692    async def _inspect(self) -> bool:
693        # auto flush_inspect is generically used to track any messages you might want to snoop on
694        # place additional logic here to inspect messages of interest
695
696        # for auto flush functionality
697        # set the last datagram
698        self._last_datagram = datetime.now()
699        self._logger.debug(
700            "AutoFlush last received: %s",
701            str(self._last_datagram),
702        )
703
704        return True

Client for interacting with Deepgram's text-to-speech services over WebSockets.

This class provides methods to establish a WebSocket connection for TTS synthesis and handle real-time TTS synthesis events.

Args: config (DeepgramClientOptions): all the options for the client.

AsyncSpeakWSClient( config: deepgram.options.DeepgramClientOptions, microphone: Optional[deepgram.audio.microphone.microphone.Microphone] = None)
 71    def __init__(
 72        self, config: DeepgramClientOptions, microphone: Optional[Microphone] = None
 73    ):
 74        if config is None:
 75            raise DeepgramError("Config is required")
 76        self._logger = verboselogs.VerboseLogger(__name__)
 77        self._logger.addHandler(logging.StreamHandler())
 78        self._logger.setLevel(config.verbose)
 79
 80        self._config = config
 81        self._endpoint = "v1/speak"
 82
 83        self._flush_thread = None
 84
 85        # auto flush
 86        self._last_datagram = None
 87        self._flush_count = 0
 88
 89        # microphone
 90        self._microphone = microphone
 91
 92        # init handlers
 93        self._event_handlers = {
 94            event: [] for event in SpeakWebSocketEvents.__members__.values()
 95        }
 96
 97        if self._config.options.get("speaker_playback") == "true":
 98            self._logger.info("speaker_playback is enabled")
 99            rate = self._config.options.get("speaker_playback_rate")
100            if rate is None:
101                rate = RATE
102            channels = self._config.options.get("speaker_playback_channels")
103            if channels is None:
104                channels = CHANNELS
105            playback_delta_in_ms = self._config.options.get(
106                "speaker_playback_delta_in_ms"
107            )
108            if playback_delta_in_ms is None:
109                playback_delta_in_ms = PLAYBACK_DELTA
110            device_index = self._config.options.get("speaker_playback_device_index")
111
112            self._logger.debug("rate: %s", rate)
113            self._logger.debug("channels: %s", channels)
114            self._logger.debug("device_index: %s", device_index)
115
116            self._speaker_created = True
117
118            if device_index is not None:
119                self._speaker = Speaker(
120                    rate=rate,
121                    channels=channels,
122                    last_play_delta_in_ms=playback_delta_in_ms,
123                    verbose=self._config.verbose,
124                    output_device_index=device_index,
125                    microphone=self._microphone,
126                )
127            else:
128                self._speaker = Speaker(
129                    rate=rate,
130                    channels=channels,
131                    last_play_delta_in_ms=playback_delta_in_ms,
132                    verbose=self._config.verbose,
133                    microphone=self._microphone,
134                )
135
136        # call the parent constructor
137        super().__init__(self._config, self._endpoint)
async def start( self, options: Union[deepgram.clients.speak.v1.websocket.options.SpeakWSOptions, Dict, NoneType] = None, addons: Optional[Dict] = None, headers: Optional[Dict] = None, members: Optional[Dict] = None, **kwargs) -> bool:
140    async def start(
141        self,
142        options: Optional[Union[SpeakWSOptions, Dict]] = None,
143        addons: Optional[Dict] = None,
144        headers: Optional[Dict] = None,
145        members: Optional[Dict] = None,
146        **kwargs,
147    ) -> bool:
148        """
149        Starts the WebSocket connection for text-to-speech synthesis.
150        """
151        self._logger.debug("AsyncSpeakWebSocketClient.start ENTER")
152        self._logger.info("options: %s", options)
153        self._logger.info("addons: %s", addons)
154        self._logger.info("headers: %s", headers)
155        self._logger.info("members: %s", members)
156        self._logger.info("kwargs: %s", kwargs)
157
158        if isinstance(options, SpeakWSOptions) and not options.check():
159            self._logger.error("options.check failed")
160            self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE")
161            raise DeepgramError("Fatal text-to-speech options error")
162
163        self._addons = addons
164        self._headers = headers
165
166        # add "members" as members of the class
167        if members is not None:
168            self.__dict__.update(members)
169
170        # set kwargs as members of the class
171        if kwargs is not None:
172            self._kwargs = kwargs
173        else:
174            self._kwargs = {}
175
176        if isinstance(options, SpeakWSOptions):
177            self._logger.info("SpeakWSOptions switching class -> dict")
178            self._options = options.to_dict()
179        elif options is not None:
180            self._options = options
181        else:
182            self._options = {}
183
184        try:
185            # speaker substitutes the listening thread
186            if self._speaker is not None:
187                self._logger.notice("passing speaker to delegate_listening")
188                super().delegate_listening(self._speaker)
189
190            # call parent start
191            if (
192                await super().start(
193                    self._options,
194                    self._addons,
195                    self._headers,
196                    **dict(cast(Dict[Any, Any], self._kwargs)),
197                )
198                is False
199            ):
200                self._logger.error("AsyncSpeakWebSocketClient.start failed")
201                self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE")
202                return False
203
204            if self._speaker is not None:
205                self._logger.notice("start delegate_listening thread")
206                self._speaker.start()
207
208            # debug the threads
209            for thread in threading.enumerate():
210                self._logger.debug("after running thread: %s", thread.name)
211            self._logger.debug("number of active threads: %s", threading.active_count())
212
213            # flush thread
214            if self._config.is_auto_flush_speak_enabled():
215                self._logger.notice("autoflush is enabled")
216                self._flush_thread = asyncio.create_task(self._flush())
217            else:
218                self._logger.notice("autoflush is disabled")
219
220            # debug the threads
221            for thread in threading.enumerate():
222                self._logger.debug("after running thread: %s", thread.name)
223            self._logger.debug("number of active threads: %s", threading.active_count())
224
225            self._logger.notice("start succeeded")
226            self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE")
227            return True
228
229        except Exception as e:  # pylint: disable=broad-except
230            self._logger.error(
231                "WebSocketException in AsyncSpeakWebSocketClient.start: %s", e
232            )
233            self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE")
234            if self._config.options.get("termination_exception_connect") is True:
235                raise
236            return False

Starts the WebSocket connection for text-to-speech synthesis.

def on( self, event: deepgram.clients.speak.enums.SpeakWebSocketEvents, handler: Callable) -> None:
240    def on(self, event: SpeakWebSocketEvents, handler: Callable) -> None:
241        """
242        Registers event handlers for specific events.
243        """
244        self._logger.info("event subscribed: %s", event)
245        if event in SpeakWebSocketEvents.__members__.values() and callable(handler):
246            self._event_handlers[event].append(handler)

Registers event handlers for specific events.

async def send_text(self, text_input: str) -> bool:
482    async def send_text(self, text_input: str) -> bool:
483        """
484        Sends text to the WebSocket connection to generate audio.
485
486        Args:
487            text_input (str): The raw text to be synthesized. This function will automatically wrap
488                the text in a JSON object of type "Speak" with the key "text".
489
490        Returns:
491            bool: True if the text was successfully sent, False otherwise.
492        """
493        return await self.send_raw(json.dumps({"type": "Speak", "text": text_input}))

Sends text to the WebSocket connection to generate audio.

Args: text_input (str): The raw text to be synthesized. This function will automatically wrap the text in a JSON object of type "Speak" with the key "text".

Returns: bool: True if the text was successfully sent, False otherwise.

async def send(self, data: Union[bytes, str]) -> bool:
495    async def send(self, data: Union[bytes, str]) -> bool:
496        """
497        Alias for send_text. Please see send_text for more information.
498        """
499        if isinstance(data, bytes):
500            self._logger.error("send() failed - data is bytes")
501            return False
502
503        return await self.send_text(data)

Alias for send_text. Please see send_text for more information.

async def send_control( self, msg_type: Union[deepgram.clients.speak.enums.SpeakWebSocketMessage, str], data: Optional[str] = '') -> bool:
506    async def send_control(
507        self, msg_type: Union[SpeakWebSocketMessage, str], data: Optional[str] = ""
508    ) -> bool:
509        """
510        Sends a control message consisting of type SpeakWebSocketEvents over the WebSocket connection.
511
512        Args:
513            msg_type (SpeakWebSocketEvents): The type of control message to send.
514            (Optional) data (str): The data to send with the control message.
515
516        Returns:
517            bool: True if the control message was successfully sent, False otherwise.
518        """
519        control_msg = json.dumps({"type": msg_type})
520        return await self.send_raw(control_msg)

Sends a control message of type SpeakWebSocketMessage over the WebSocket connection.

Args: msg_type (Union[SpeakWebSocketMessage, str]): The type of control message to send. (Optional) data (str): The data to send with the control message.

Returns: bool: True if the control message was successfully sent, False otherwise.

async def send_raw(self, msg: str) -> bool:
525    async def send_raw(self, msg: str) -> bool:
526        """
527        Sends a raw/control message over the WebSocket connection. This message must contain a valid JSON object.
528
529        Args:
530            msg (str): The raw message to send over the WebSocket connection.
531
532        Returns:
533            bool: True if the message was successfully sent, False otherwise.
534        """
535        self._logger.spam("AsyncSpeakWebSocketClient.send_raw ENTER")
536
537        if self._config.is_inspecting_speak():
538            try:
539                _tmp_json = json.loads(msg)
540                if "type" in _tmp_json:
541                    self._logger.debug(
542                        "Inspecting Message: Sending %s", _tmp_json["type"]
543                    )
544                    match _tmp_json["type"]:
545                        case SpeakWebSocketMessage.Speak:
546                            inspect_res = await self._inspect()
547                            if not inspect_res:
548                                self._logger.error("inspect_res failed")
549                        case SpeakWebSocketMessage.Flush:
550                            self._last_datagram = None
551                            self._flush_count += 1
552                            self._logger.debug(
553                                "Increment Flush count: %d", self._flush_count
554                            )
555            except Exception as e:  # pylint: disable=broad-except
556                self._logger.error("send_raw() failed - Exception: %s", str(e))
557
558        try:
559            if await super().send(msg) is False:
560                self._logger.error("send_raw() failed")
561                self._logger.spam("AsyncSpeakWebSocketClient.send_raw LEAVE")
562                return False
563            self._logger.spam("send_raw() succeeded")
564            self._logger.spam("AsyncSpeakWebSocketClient.send_raw LEAVE")
565            return True
566        except Exception as e:  # pylint: disable=broad-except
567            self._logger.error("send_raw() failed - Exception: %s", str(e))
568            self._logger.spam("AsyncSpeakWebSocketClient.send_raw LEAVE")
569            if self._config.options.get("termination_exception_send") is True:
570                raise
571            return False

Sends a raw/control message over the WebSocket connection. This message must contain a valid JSON object.

Args: msg (str): The raw message to send over the WebSocket connection.

Returns: bool: True if the message was successfully sent, False otherwise.

async def flush(self) -> bool:
575    async def flush(self) -> bool:
576        """
577        Flushes the current buffer and returns generated audio
578        """
579        self._logger.spam("AsyncSpeakWebSocketClient.flush ENTER")
580
581        self._logger.notice("Sending Flush...")
582        ret = await self.send_control(SpeakWebSocketMessage.Flush)
583
584        if not ret:
585            self._logger.error("flush failed")
586            self._logger.spam("AsyncSpeakWebSocketClient.flush LEAVE")
587            return False
588
589        self._logger.notice("flush succeeded")
590        self._logger.spam("AsyncSpeakWebSocketClient.flush LEAVE")
591
592        return True

Flushes the current buffer and returns generated audio

async def clear(self) -> bool:
594    async def clear(self) -> bool:
595        """
596        Clears the current buffer on the server
597        """
598        self._logger.spam("AsyncSpeakWebSocketClient.clear ENTER")
599
600        self._logger.notice("Sending Clear...")
601        ret = await self.send_control(SpeakWebSocketMessage.Clear)
602
603        if not ret:
604            self._logger.error("clear failed")
605            self._logger.spam("AsyncSpeakWebSocketClient.clear LEAVE")
606            return False
607
608        self._logger.notice("clear succeeded")
609        self._logger.spam("AsyncSpeakWebSocketClient.clear LEAVE")
610
611        return True

Clears the current buffer on the server

async def wait_for_complete(self):
613    async def wait_for_complete(self):
614        """
615        This method will block until the speak is done playing sound.
616        """
617        self._logger.spam("AsyncSpeakWebSocketClient.wait_for_complete ENTER")
618
619        if self._speaker is None:
620            self._logger.error("speaker is None. Return immediately")
621            return
622
623        loop = asyncio.get_event_loop()
624        await loop.run_in_executor(None, self._speaker.wait_for_complete)
625        self._logger.notice("wait_for_complete succeeded")
626        self._logger.spam("AsyncSpeakWebSocketClient.wait_for_complete LEAVE")

This method blocks until the speaker has finished playing audio.

async def finish(self) -> bool:
631    async def finish(self) -> bool:
632        """
633        Closes the WebSocket connection gracefully.
634        """
635        self._logger.debug("AsyncSpeakWebSocketClient.finish ENTER")
636
637        # stop the threads
638        self._logger.verbose("cancelling tasks...")
639        try:
640            # call parent finish
641            if await super().finish() is False:
642                self._logger.error("AsyncListenWebSocketClient.finish failed")
643
644            if self._speaker is not None and self._speaker_created:
645                self._speaker.finish()
646                self._speaker_created = False
647
648            # Before cancelling, check if the tasks were created
649            # debug the threads
650            for thread in threading.enumerate():
651                self._logger.debug("before running thread: %s", thread.name)
652            self._logger.debug("number of active threads: %s", threading.active_count())
653
654            tasks = []
655
656            if self._speaker is not None:
657                self._logger.notice("stopping speaker...")
658                self._speaker.finish()
659                self._speaker = None
660                self._logger.notice("speaker stopped")
661
662            if self._flush_thread is not None:
663                self._logger.notice("stopping _flush_thread...")
664                self._flush_thread.cancel()
665                tasks.append(self._flush_thread)
666                self._logger.notice("_flush_thread cancelled")
667
668            # Use asyncio.gather to wait for tasks to be cancelled
669            # Prevent indefinite waiting by setting a timeout
670            await asyncio.wait_for(asyncio.gather(*tasks), timeout=10)
671            self._logger.notice("threads joined")
672
673            # debug the threads
674            for thread in threading.enumerate():
675                self._logger.debug("after running thread: %s", thread.name)
676            self._logger.debug("number of active threads: %s", threading.active_count())
677
678            self._logger.notice("finish succeeded")
679            self._logger.spam("AsyncSpeakWebSocketClient.finish LEAVE")
680            return True
681
682        except asyncio.CancelledError:
683            self._logger.debug("tasks cancelled")
684            self._logger.debug("AsyncSpeakWebSocketClient.finish LEAVE")
685            return False
686
687        except asyncio.TimeoutError as e:
688            self._logger.error("tasks cancellation timed out: %s", e)
689            self._logger.debug("AsyncSpeakWebSocketClient.finish LEAVE")
690            return False

Closes the WebSocket connection gracefully.

AsyncSpeakWebSocketClient = <class 'AsyncSpeakWSClient'>