deepgram.clients.speak.v1.websocket.client

  1# Copyright 2024 Deepgram SDK contributors. All Rights Reserved.
  2# Use of this source code is governed by a MIT license that can be found in the LICENSE file.
  3# SPDX-License-Identifier: MIT
  4
  5import json
  6import time
  7import logging
  8from typing import Dict, Union, Optional, cast, Any, Callable
  9from datetime import datetime
 10import threading
 11
 12from .....utils import verboselogs
 13from .....options import DeepgramClientOptions
 14from ...enums import SpeakWebSocketEvents, SpeakWebSocketMessage
 15from ....common import AbstractSyncWebSocketClient
 16from ....common import DeepgramError
 17
 18from .response import (
 19    OpenResponse,
 20    MetadataResponse,
 21    FlushedResponse,
 22    ClearedResponse,
 23    CloseResponse,
 24    WarningResponse,
 25    ErrorResponse,
 26    UnhandledResponse,
 27)
 28from .options import SpeakWSOptions
 29
 30from .....audio.microphone import Microphone
 31from .....audio.speaker import Speaker, RATE, CHANNELS, PLAYBACK_DELTA
 32
# Module-level timing constants. Units are presumably seconds — TODO confirm.
ONE_SECOND = 1
# Passed as the timeout of the exit-event wait in SpeakWSClient._flush.
HALF_SECOND = 0.5
# NOTE(review): the two intervals below are not referenced by the code visible in this module.
DEEPGRAM_INTERVAL = 5
PING_INTERVAL = 20
 37
 38
 39class SpeakWSClient(
 40    AbstractSyncWebSocketClient
 41):  # pylint: disable=too-many-instance-attributes
 42    """
 43    Client for interacting with Deepgram's text-to-speech services over WebSockets.
 44
 45     This class provides methods to establish a WebSocket connection for TTS synthesis and handle real-time TTS synthesis events.
 46
 47     Args:
 48         config (DeepgramClientOptions): all the options for the client.
 49    """
 50
 51    _logger: verboselogs.VerboseLogger
 52    _config: DeepgramClientOptions
 53    _endpoint: str
 54
 55    _event_handlers: Dict[SpeakWebSocketEvents, list]
 56
 57    _flush_thread: Union[threading.Thread, None]
 58    _lock_flush: threading.Lock
 59    _last_datagram: Optional[datetime] = None
 60    _flush_count: int
 61
 62    _kwargs: Optional[Dict] = None
 63    _addons: Optional[Dict] = None
 64    _options: Optional[Dict] = None
 65    _headers: Optional[Dict] = None
 66
 67    _speaker_created: bool = False
 68    _speaker: Optional[Speaker] = None
 69    _microphone: Optional[Microphone] = None
 70
 71    def __init__(
 72        self, config: DeepgramClientOptions, microphone: Optional[Microphone] = None
 73    ):
 74        if config is None:
 75            raise DeepgramError("Config is required")
 76
 77        self._logger = verboselogs.VerboseLogger(__name__)
 78        self._logger.addHandler(logging.StreamHandler())
 79        self._logger.setLevel(config.verbose)
 80
 81        self._config = config
 82        self._endpoint = "v1/speak"
 83        self._lock_flush = threading.Lock()
 84
 85        self._flush_thread = None
 86
 87        # auto flush
 88        self._last_datagram = None
 89        self._flush_count = 0
 90
 91        # microphone
 92        self._microphone = microphone
 93
 94        # init handlers
 95        self._event_handlers = {
 96            event: [] for event in SpeakWebSocketEvents.__members__.values()
 97        }
 98
 99        if self._config.options.get("speaker_playback") == "true":
100            self._logger.info("speaker_playback is enabled")
101            rate = self._config.options.get("speaker_playback_rate")
102            if rate is None:
103                rate = RATE
104            channels = self._config.options.get("speaker_playback_channels")
105            if channels is None:
106                channels = CHANNELS
107            playback_delta_in_ms = self._config.options.get(
108                "speaker_playback_delta_in_ms"
109            )
110            if playback_delta_in_ms is None:
111                playback_delta_in_ms = PLAYBACK_DELTA
112            device_index = self._config.options.get("speaker_playback_device_index")
113
114            self._logger.debug("rate: %s", rate)
115            self._logger.debug("channels: %s", channels)
116            self._logger.debug("device_index: %s", device_index)
117
118            self._speaker_created = True
119
120            if device_index is not None:
121                self._speaker = Speaker(
122                    rate=rate,
123                    channels=channels,
124                    last_play_delta_in_ms=playback_delta_in_ms,
125                    verbose=self._config.verbose,
126                    output_device_index=device_index,
127                    microphone=self._microphone,
128                )
129            else:
130                self._speaker = Speaker(
131                    rate=rate,
132                    channels=channels,
133                    last_play_delta_in_ms=playback_delta_in_ms,
134                    verbose=self._config.verbose,
135                    microphone=self._microphone,
136                )
137
138        # call the parent constructor
139        super().__init__(self._config, self._endpoint)
140
141    # pylint: disable=too-many-statements,too-many-branches
142    def start(
143        self,
144        options: Optional[Union[SpeakWSOptions, Dict]] = None,
145        addons: Optional[Dict] = None,
146        headers: Optional[Dict] = None,
147        members: Optional[Dict] = None,
148        **kwargs,
149    ) -> bool:
150        """
151        Starts the WebSocket connection for text-to-speech synthesis.
152        """
153        self._logger.debug("SpeakWebSocketClient.start ENTER")
154        self._logger.info("options: %s", options)
155        self._logger.info("addons: %s", addons)
156        self._logger.info("headers: %s", headers)
157        self._logger.info("members: %s", members)
158        self._logger.info("kwargs: %s", kwargs)
159
160        if isinstance(options, SpeakWSOptions) and not options.check():
161            self._logger.error("options.check failed")
162            self._logger.debug("SpeakWebSocketClient.start LEAVE")
163            raise DeepgramError("Fatal text-to-speech options error")
164
165        self._addons = addons
166        self._headers = headers
167
168        # add "members" as members of the class
169        if members is not None:
170            self.__dict__.update(members)
171
172        # set kwargs as members of the class
173        if kwargs is not None:
174            self._kwargs = kwargs
175        else:
176            self._kwargs = {}
177
178        if isinstance(options, SpeakWSOptions):
179            self._logger.info("SpeakWSOptions switching class -> dict")
180            self._options = options.to_dict()
181        elif options is not None:
182            self._options = options
183        else:
184            self._options = {}
185
186        try:
187            # speaker substitutes the listening thread
188            if self._speaker is not None:
189                self._logger.notice("passing speaker to delegate_listening")
190                super().delegate_listening(self._speaker)
191
192            # call parent start
193            if (
194                super().start(
195                    self._options,
196                    self._addons,
197                    self._headers,
198                    **dict(cast(Dict[Any, Any], self._kwargs)),
199                )
200                is False
201            ):
202                self._logger.error("SpeakWebSocketClient.start failed")
203                self._logger.debug("SpeakWebSocketClient.start LEAVE")
204                return False
205
206            if self._speaker is not None:
207                self._logger.notice("start delegate_listening thread")
208                self._speaker.start()
209
210            # debug the threads
211            for thread in threading.enumerate():
212                self._logger.debug("after running thread: %s", thread.name)
213            self._logger.debug("number of active threads: %s", threading.active_count())
214
215            # flush thread
216            if self._config.is_auto_flush_speak_enabled():
217                self._logger.notice("autoflush is enabled")
218                self._flush_thread = threading.Thread(target=self._flush)
219                self._flush_thread.start()
220            else:
221                self._logger.notice("autoflush is disabled")
222
223            # debug the threads
224            for thread in threading.enumerate():
225                self._logger.debug("after running thread: %s", thread.name)
226            self._logger.debug("number of active threads: %s", threading.active_count())
227
228            self._logger.notice("start succeeded")
229            self._logger.debug("SpeakWebSocketClient.start LEAVE")
230            return True
231
232        except Exception as e:  # pylint: disable=broad-except
233            self._logger.error(
234                "WebSocketException in SpeakWebSocketClient.start: %s", e
235            )
236            self._logger.debug("SpeakWebSocketClient.start LEAVE")
237            if self._config.options.get("termination_exception_connect") is True:
238                raise
239            return False
240
241    # pylint: enable=too-many-statements,too-many-branches
242
243    def on(self, event: SpeakWebSocketEvents, handler: Callable) -> None:
244        """
245        Registers event handlers for specific events.
246        """
247        self._logger.info("event subscribed: %s", event)
248        if event in SpeakWebSocketEvents.__members__.values() and callable(handler):
249            self._event_handlers[event].append(handler)
250
251    def _emit(self, event: SpeakWebSocketEvents, *args, **kwargs) -> None:
252        """
253        Emits events to the registered event handlers.
254        """
255        self._logger.debug("SpeakWebSocketClient._emit ENTER")
256        self._logger.debug("callback handlers for: %s", event)
257
258        # debug the threads
259        for thread in threading.enumerate():
260            self._logger.debug("after running thread: %s", thread.name)
261        self._logger.debug("number of active threads: %s", threading.active_count())
262
263        self._logger.debug("callback handlers for: %s", event)
264        for handler in self._event_handlers[event]:
265            handler(self, *args, **kwargs)
266
267        # debug the threads
268        for thread in threading.enumerate():
269            self._logger.debug("after running thread: %s", thread.name)
270        self._logger.debug("number of active threads: %s", threading.active_count())
271
272        self._logger.debug("ListenWebSocketClient._emit LEAVE")
273
274    def _process_text(self, message: str) -> None:
275        """
276        Processes messages received over the WebSocket connection.
277        """
278        self._logger.debug("SpeakWebSocketClient._process_text ENTER")
279
280        try:
281            self._logger.debug("Text data received")
282
283            if len(message) == 0:
284                self._logger.debug("message is empty")
285                self._logger.debug("SpeakWebSocketClient._process_text LEAVE")
286                return
287
288            data = json.loads(message)
289            response_type = data.get("type")
290            self._logger.debug("response_type: %s, data: %s", response_type, data)
291
292            match response_type:
293                case SpeakWebSocketEvents.Open:
294                    open_result: OpenResponse = OpenResponse.from_json(message)
295                    self._logger.verbose("OpenResponse: %s", open_result)
296                    self._emit(
297                        SpeakWebSocketEvents(SpeakWebSocketEvents.Open),
298                        open=open_result,
299                        **dict(cast(Dict[Any, Any], self._kwargs)),
300                    )
301                case SpeakWebSocketEvents.Metadata:
302                    meta_result: MetadataResponse = MetadataResponse.from_json(message)
303                    self._logger.verbose("MetadataResponse: %s", meta_result)
304                    self._emit(
305                        SpeakWebSocketEvents(SpeakWebSocketEvents.Metadata),
306                        metadata=meta_result,
307                        **dict(cast(Dict[Any, Any], self._kwargs)),
308                    )
309                case SpeakWebSocketEvents.Flushed:
310                    fl_result: FlushedResponse = FlushedResponse.from_json(message)
311                    self._logger.verbose("FlushedResponse: %s", fl_result)
312
313                    # auto flush
314                    if self._config.is_inspecting_speak():
315                        with self._lock_flush:
316                            self._flush_count -= 1
317                            self._logger.debug(
318                                "Decrement Flush count: %d",
319                                self._flush_count,
320                            )
321
322                    self._emit(
323                        SpeakWebSocketEvents(SpeakWebSocketEvents.Flushed),
324                        flushed=fl_result,
325                        **dict(cast(Dict[Any, Any], self._kwargs)),
326                    )
327                case SpeakWebSocketEvents.Cleared:
328                    clear_result: ClearedResponse = ClearedResponse.from_json(message)
329                    self._logger.verbose("ClearedResponse: %s", clear_result)
330                    self._emit(
331                        SpeakWebSocketEvents(SpeakWebSocketEvents.Cleared),
332                        cleared=clear_result,
333                        **dict(cast(Dict[Any, Any], self._kwargs)),
334                    )
335                case SpeakWebSocketEvents.Close:
336                    close_result: CloseResponse = CloseResponse.from_json(message)
337                    self._logger.verbose("CloseResponse: %s", close_result)
338                    self._emit(
339                        SpeakWebSocketEvents(SpeakWebSocketEvents.Close),
340                        close=close_result,
341                        **dict(cast(Dict[Any, Any], self._kwargs)),
342                    )
343                case SpeakWebSocketEvents.Warning:
344                    war_warning: WarningResponse = WarningResponse.from_json(message)
345                    self._logger.verbose("WarningResponse: %s", war_warning)
346                    self._emit(
347                        SpeakWebSocketEvents(SpeakWebSocketEvents.Warning),
348                        warning=war_warning,
349                        **dict(cast(Dict[Any, Any], self._kwargs)),
350                    )
351                case SpeakWebSocketEvents.Error:
352                    err_error: ErrorResponse = ErrorResponse.from_json(message)
353                    self._logger.verbose("ErrorResponse: %s", err_error)
354                    self._emit(
355                        SpeakWebSocketEvents(SpeakWebSocketEvents.Error),
356                        error=err_error,
357                        **dict(cast(Dict[Any, Any], self._kwargs)),
358                    )
359                case _:
360                    self._logger.warning(
361                        "Unknown Message: response_type: %s, data: %s",
362                        response_type,
363                        data,
364                    )
365                    unhandled_error: UnhandledResponse = UnhandledResponse(
366                        type=SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled),
367                        raw=message,
368                    )
369                    self._emit(
370                        SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled),
371                        unhandled=unhandled_error,
372                        **dict(cast(Dict[Any, Any], self._kwargs)),
373                    )
374
375            self._logger.notice("_process_text Succeeded")
376            self._logger.debug("SpeakWebSocketClient._process_text LEAVE")
377
378        except Exception as e:  # pylint: disable=broad-except
379            self._logger.error("Exception in SpeakWebSocketClient._process_text: %s", e)
380            e_error: ErrorResponse = ErrorResponse(
381                "Exception in SpeakWebSocketClient._process_text",
382                f"{e}",
383                "Exception",
384            )
385            self._logger.error(
386                "Exception in SpeakWebSocketClient._process_text: %s", str(e)
387            )
388            self._emit(
389                SpeakWebSocketEvents(SpeakWebSocketEvents.Error),
390                e_error,
391                **dict(cast(Dict[Any, Any], self._kwargs)),
392            )
393
394            # signal exit and close
395            super()._signal_exit()
396
397            self._logger.debug("SpeakWebSocketClient._process_text LEAVE")
398
399            if self._config.options.get("termination_exception") is True:
400                raise
401            return
402
403    # pylint: enable=too-many-return-statements,too-many-statements
404
405    def _process_binary(self, message: bytes) -> None:
406        self._logger.debug("SpeakWebSocketClient._process_binary ENTER")
407        self._logger.debug("Binary data received")
408
409        self._emit(
410            SpeakWebSocketEvents(SpeakWebSocketEvents.AudioData),
411            data=message,
412            **dict(cast(Dict[Any, Any], self._kwargs)),
413        )
414
415        self._logger.notice("_process_binary Succeeded")
416        self._logger.debug("SpeakWebSocketClient._process_binary LEAVE")
417
418    # pylint: disable=too-many-return-statements
419    def _flush(self) -> None:
420        self._logger.debug("SpeakWebSocketClient._flush ENTER")
421
422        delta_in_ms_str = self._config.options.get("auto_flush_speak_delta")
423        if delta_in_ms_str is None:
424            self._logger.error("auto_flush_speak_delta is None")
425            self._logger.debug("SpeakWebSocketClient._flush LEAVE")
426            return
427        delta_in_ms = float(delta_in_ms_str)
428
429        while True:
430            try:
431                self._exit_event.wait(timeout=HALF_SECOND)
432
433                if self._exit_event.is_set():
434                    self._logger.notice("_flush exiting gracefully")
435                    self._logger.debug("ListenWebSocketClient._flush LEAVE")
436                    return
437
438                if self._last_datagram is None:
439                    self._logger.debug("AutoFlush last_datagram is None")
440                    continue
441
442                with self._lock_flush:
443                    delta = datetime.now() - self._last_datagram
444                    diff_in_ms = delta.total_seconds() * 1000
445                    self._logger.debug("AutoFlush delta: %f", diff_in_ms)
446                    if diff_in_ms < delta_in_ms:
447                        self._logger.debug("AutoFlush delta is less than threshold")
448                        continue
449
450                self.flush()
451
452            except Exception as e:  # pylint: disable=broad-except
453                self._logger.error("Exception in SpeakWebSocketClient._flush: %s", e)
454                e_error: ErrorResponse = ErrorResponse(
455                    "Exception in SpeakWebSocketClient._flush",
456                    f"{e}",
457                    "Exception",
458                )
459                self._logger.error(
460                    "Exception in SpeakWebSocketClient._flush: %s", str(e)
461                )
462                self._emit(
463                    SpeakWebSocketEvents(SpeakWebSocketEvents.Error),
464                    error=e_error,
465                    **dict(cast(Dict[Any, Any], self._kwargs)),
466                )
467
468                # signal exit and close
469                super()._signal_exit()
470
471                self._logger.debug("SpeakWebSocketClient._flush LEAVE")
472
473                if self._config.options.get("termination_exception") is True:
474                    raise
475                return
476
477    # pylint: enable=too-many-return-statements
478
479    def send_text(self, text_input: str) -> bool:
480        """
481        Sends text to the WebSocket connection to generate audio.
482
483        Args:
484            text_input (str): The raw text to be synthesized. This function will automatically wrap
485                the text in a JSON object of type "Speak" with the key "text".
486
487        Returns:
488            bool: True if the text was successfully sent, False otherwise.
489        """
490        return self.send_raw(json.dumps({"type": "Speak", "text": text_input}))
491
492    def send(self, data: Union[str, bytes]) -> bool:
493        """
494        Alias for send_text. Please see send_text for more information.
495        """
496        if isinstance(data, bytes):
497            self._logger.error("send() failed - data is bytes")
498            return False
499
500        return self.send_text(data)
501
502    # pylint: disable=unused-argument
503    def send_control(
504        self, msg_type: Union[SpeakWebSocketMessage, str], data: Optional[str] = ""
505    ) -> bool:
506        """
507        Sends a control message consisting of type SpeakWebSocketEvents over the WebSocket connection.
508
509        Args:
510            msg_type (SpeakWebSocketEvents): The type of control message to send.
511            (Optional) data (str): The data to send with the control message.
512
513        Returns:
514            bool: True if the control message was successfully sent, False otherwise.
515        """
516        control_msg = json.dumps({"type": msg_type})
517        return self.send_raw(control_msg)
518
519    # pylint: enable=unused-argument
520
521    # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
522    def send_raw(self, msg: str) -> bool:
523        """
524        Sends a raw/control message over the WebSocket connection. This message must contain a valid JSON object.
525
526        Args:
527            msg (str): The raw message to send over the WebSocket connection.
528
529        Returns:
530            bool: True if the message was successfully sent, False otherwise.
531        """
532        self._logger.spam("SpeakWebSocketClient.send_raw ENTER")
533
534        if self._config.is_inspecting_speak():
535            try:
536                _tmp_json = json.loads(msg)
537                if "type" in _tmp_json:
538                    self._logger.debug(
539                        "Inspecting Message: Sending %s", _tmp_json["type"]
540                    )
541                    match _tmp_json["type"]:
542                        case SpeakWebSocketMessage.Speak:
543                            inspect_res = self._inspect()
544                            if not inspect_res:
545                                self._logger.error("inspect_res failed")
546                        case SpeakWebSocketMessage.Flush:
547                            with self._lock_flush:
548                                self._last_datagram = None
549                                self._flush_count += 1
550                                self._logger.debug(
551                                    "Increment Flush count: %d", self._flush_count
552                                )
553            except Exception as e:  # pylint: disable=broad-except
554                self._logger.error("send_raw() failed - Exception: %s", str(e))
555
556        try:
557            if super().send(msg) is False:
558                self._logger.error("send_raw() failed")
559                self._logger.spam("SpeakWebSocketClient.send_raw LEAVE")
560                return False
561            self._logger.spam("send_raw() succeeded")
562            self._logger.spam("SpeakWebSocketClient.send_raw LEAVE")
563            return True
564        except Exception as e:  # pylint: disable=broad-except
565            self._logger.error("send_raw() failed - Exception: %s", str(e))
566            self._logger.spam("SpeakWebSocketClient.send_raw LEAVE")
567            if self._config.options.get("termination_exception_send") is True:
568                raise
569            return False
570
571    # pylint: enable=too-many-return-statements,too-many-branches
572
573    def flush(self) -> bool:
574        """
575        Flushes the current buffer and returns generated audio
576        """
577        self._logger.spam("SpeakWebSocketClient.flush ENTER")
578
579        self._logger.notice("Sending Flush...")
580        ret = self.send_control(SpeakWebSocketMessage.Flush)
581
582        if not ret:
583            self._logger.error("flush failed")
584            self._logger.spam("SpeakWebSocketClient.flush LEAVE")
585            return False
586
587        self._logger.notice("flush succeeded")
588        self._logger.spam("SpeakWebSocketClient.flush LEAVE")
589
590        return True
591
592    def clear(self) -> bool:
593        """
594        Clears the current buffer on the server
595        """
596        self._logger.spam("SpeakWebSocketClient.clear ENTER")
597
598        self._logger.notice("Sending Clear...")
599        ret = self.send_control(SpeakWebSocketMessage.Clear)
600
601        if not ret:
602            self._logger.error("clear failed")
603            self._logger.spam("SpeakWebSocketClient.clear LEAVE")
604            return False
605
606        self._logger.notice("clear succeeded")
607        self._logger.spam("SpeakWebSocketClient.clear LEAVE")
608
609        return True
610
611    def wait_for_complete(self):
612        """
613        This method will block until the speak is done playing sound.
614        """
615        self._logger.spam("SpeakWebSocketClient.wait_for_complete ENTER")
616
617        if self._speaker is None:
618            self._logger.error("speaker is None. Return immediately")
619            raise DeepgramError("Speaker is not initialized")
620
621        self._speaker.wait_for_complete()
622        self._logger.notice("wait_for_complete succeeded")
623        self._logger.spam("SpeakWebSocketClient.wait_for_complete LEAVE")
624
625    def _close_message(self) -> bool:
626        return self.send_control(SpeakWebSocketMessage.Close)
627
628    # closes the WebSocket connection gracefully
629    def finish(self) -> bool:
630        """
631        Closes the WebSocket connection gracefully.
632        """
633        self._logger.spam("SpeakWebSocketClient.finish ENTER")
634
635        # call parent finish which calls signal_exit
636        if super().finish() is False:
637            self._logger.error("ListenWebSocketClient.finish failed")
638
639        if self._speaker is not None and self._speaker_created:
640            self._speaker.finish()
641            self._speaker_created = False
642
643        # debug the threads
644        for thread in threading.enumerate():
645            self._logger.debug("before running thread: %s", thread.name)
646        self._logger.debug("number of active threads: %s", threading.active_count())
647
648        # stop the threads
649        if self._speaker is not None:
650            self._logger.verbose("stopping speaker...")
651            self._speaker.finish()
652            self._speaker = None
653            self._logger.notice("speaker stopped")
654
655        if self._flush_thread is not None:
656            self._logger.verbose("sdtopping _flush_thread...")
657            self._flush_thread.join()
658            self._flush_thread = None
659            self._logger.notice("_flush_thread joined")
660
661        # debug the threads
662        for thread in threading.enumerate():
663            self._logger.debug("before running thread: %s", thread.name)
664        self._logger.debug("number of active threads: %s", threading.active_count())
665
666        self._logger.notice("finish succeeded")
667        self._logger.spam("SpeakWebSocketClient.finish LEAVE")
668        return True
669
670    def _inspect(self) -> bool:
671        # auto flush_inspect is generically used to track any messages you might want to snoop on
672        # place additional logic here to inspect messages of interest
673
674        # for auto flush functionality
675        # set the last datagram
676        with self._lock_flush:
677            self._last_datagram = datetime.now()
678            self._logger.debug(
679                "AutoFlush last received: %s",
680                str(self._last_datagram),
681            )
682
683        return True
684
685
# Alias: exposes SpeakWSClient under the name SpeakWebSocketClient as well.
SpeakWebSocketClient = SpeakWSClient
# Module-level timing constants. Units are presumably seconds — TODO confirm.
ONE_SECOND = 1
# Passed as the timeout of the exit-event wait in SpeakWSClient._flush.
HALF_SECOND = 0.5
# NOTE(review): the two intervals below are not referenced by the code visible in this module.
DEEPGRAM_INTERVAL = 5
PING_INTERVAL = 20
 40class SpeakWSClient(
 41    AbstractSyncWebSocketClient
 42):  # pylint: disable=too-many-instance-attributes
 43    """
 44    Client for interacting with Deepgram's text-to-speech services over WebSockets.
 45
 46     This class provides methods to establish a WebSocket connection for TTS synthesis and handle real-time TTS synthesis events.
 47
 48     Args:
 49         config (DeepgramClientOptions): all the options for the client.
 50    """
 51
 52    _logger: verboselogs.VerboseLogger
 53    _config: DeepgramClientOptions
 54    _endpoint: str
 55
 56    _event_handlers: Dict[SpeakWebSocketEvents, list]
 57
 58    _flush_thread: Union[threading.Thread, None]
 59    _lock_flush: threading.Lock
 60    _last_datagram: Optional[datetime] = None
 61    _flush_count: int
 62
 63    _kwargs: Optional[Dict] = None
 64    _addons: Optional[Dict] = None
 65    _options: Optional[Dict] = None
 66    _headers: Optional[Dict] = None
 67
 68    _speaker_created: bool = False
 69    _speaker: Optional[Speaker] = None
 70    _microphone: Optional[Microphone] = None
 71
 72    def __init__(
 73        self, config: DeepgramClientOptions, microphone: Optional[Microphone] = None
 74    ):
 75        if config is None:
 76            raise DeepgramError("Config is required")
 77
 78        self._logger = verboselogs.VerboseLogger(__name__)
 79        self._logger.addHandler(logging.StreamHandler())
 80        self._logger.setLevel(config.verbose)
 81
 82        self._config = config
 83        self._endpoint = "v1/speak"
 84        self._lock_flush = threading.Lock()
 85
 86        self._flush_thread = None
 87
 88        # auto flush
 89        self._last_datagram = None
 90        self._flush_count = 0
 91
 92        # microphone
 93        self._microphone = microphone
 94
 95        # init handlers
 96        self._event_handlers = {
 97            event: [] for event in SpeakWebSocketEvents.__members__.values()
 98        }
 99
100        if self._config.options.get("speaker_playback") == "true":
101            self._logger.info("speaker_playback is enabled")
102            rate = self._config.options.get("speaker_playback_rate")
103            if rate is None:
104                rate = RATE
105            channels = self._config.options.get("speaker_playback_channels")
106            if channels is None:
107                channels = CHANNELS
108            playback_delta_in_ms = self._config.options.get(
109                "speaker_playback_delta_in_ms"
110            )
111            if playback_delta_in_ms is None:
112                playback_delta_in_ms = PLAYBACK_DELTA
113            device_index = self._config.options.get("speaker_playback_device_index")
114
115            self._logger.debug("rate: %s", rate)
116            self._logger.debug("channels: %s", channels)
117            self._logger.debug("device_index: %s", device_index)
118
119            self._speaker_created = True
120
121            if device_index is not None:
122                self._speaker = Speaker(
123                    rate=rate,
124                    channels=channels,
125                    last_play_delta_in_ms=playback_delta_in_ms,
126                    verbose=self._config.verbose,
127                    output_device_index=device_index,
128                    microphone=self._microphone,
129                )
130            else:
131                self._speaker = Speaker(
132                    rate=rate,
133                    channels=channels,
134                    last_play_delta_in_ms=playback_delta_in_ms,
135                    verbose=self._config.verbose,
136                    microphone=self._microphone,
137                )
138
139        # call the parent constructor
140        super().__init__(self._config, self._endpoint)
141
142    # pylint: disable=too-many-statements,too-many-branches
143    def start(
144        self,
145        options: Optional[Union[SpeakWSOptions, Dict]] = None,
146        addons: Optional[Dict] = None,
147        headers: Optional[Dict] = None,
148        members: Optional[Dict] = None,
149        **kwargs,
150    ) -> bool:
151        """
152        Starts the WebSocket connection for text-to-speech synthesis.
153        """
154        self._logger.debug("SpeakWebSocketClient.start ENTER")
155        self._logger.info("options: %s", options)
156        self._logger.info("addons: %s", addons)
157        self._logger.info("headers: %s", headers)
158        self._logger.info("members: %s", members)
159        self._logger.info("kwargs: %s", kwargs)
160
161        if isinstance(options, SpeakWSOptions) and not options.check():
162            self._logger.error("options.check failed")
163            self._logger.debug("SpeakWebSocketClient.start LEAVE")
164            raise DeepgramError("Fatal text-to-speech options error")
165
166        self._addons = addons
167        self._headers = headers
168
169        # add "members" as members of the class
170        if members is not None:
171            self.__dict__.update(members)
172
173        # set kwargs as members of the class
174        if kwargs is not None:
175            self._kwargs = kwargs
176        else:
177            self._kwargs = {}
178
179        if isinstance(options, SpeakWSOptions):
180            self._logger.info("SpeakWSOptions switching class -> dict")
181            self._options = options.to_dict()
182        elif options is not None:
183            self._options = options
184        else:
185            self._options = {}
186
187        try:
188            # speaker substitutes the listening thread
189            if self._speaker is not None:
190                self._logger.notice("passing speaker to delegate_listening")
191                super().delegate_listening(self._speaker)
192
193            # call parent start
194            if (
195                super().start(
196                    self._options,
197                    self._addons,
198                    self._headers,
199                    **dict(cast(Dict[Any, Any], self._kwargs)),
200                )
201                is False
202            ):
203                self._logger.error("SpeakWebSocketClient.start failed")
204                self._logger.debug("SpeakWebSocketClient.start LEAVE")
205                return False
206
207            if self._speaker is not None:
208                self._logger.notice("start delegate_listening thread")
209                self._speaker.start()
210
211            # debug the threads
212            for thread in threading.enumerate():
213                self._logger.debug("after running thread: %s", thread.name)
214            self._logger.debug("number of active threads: %s", threading.active_count())
215
216            # flush thread
217            if self._config.is_auto_flush_speak_enabled():
218                self._logger.notice("autoflush is enabled")
219                self._flush_thread = threading.Thread(target=self._flush)
220                self._flush_thread.start()
221            else:
222                self._logger.notice("autoflush is disabled")
223
224            # debug the threads
225            for thread in threading.enumerate():
226                self._logger.debug("after running thread: %s", thread.name)
227            self._logger.debug("number of active threads: %s", threading.active_count())
228
229            self._logger.notice("start succeeded")
230            self._logger.debug("SpeakWebSocketClient.start LEAVE")
231            return True
232
233        except Exception as e:  # pylint: disable=broad-except
234            self._logger.error(
235                "WebSocketException in SpeakWebSocketClient.start: %s", e
236            )
237            self._logger.debug("SpeakWebSocketClient.start LEAVE")
238            if self._config.options.get("termination_exception_connect") is True:
239                raise
240            return False
241
242    # pylint: enable=too-many-statements,too-many-branches
243
244    def on(self, event: SpeakWebSocketEvents, handler: Callable) -> None:
245        """
246        Registers event handlers for specific events.
247        """
248        self._logger.info("event subscribed: %s", event)
249        if event in SpeakWebSocketEvents.__members__.values() and callable(handler):
250            self._event_handlers[event].append(handler)
251
252    def _emit(self, event: SpeakWebSocketEvents, *args, **kwargs) -> None:
253        """
254        Emits events to the registered event handlers.
255        """
256        self._logger.debug("SpeakWebSocketClient._emit ENTER")
257        self._logger.debug("callback handlers for: %s", event)
258
259        # debug the threads
260        for thread in threading.enumerate():
261            self._logger.debug("after running thread: %s", thread.name)
262        self._logger.debug("number of active threads: %s", threading.active_count())
263
264        self._logger.debug("callback handlers for: %s", event)
265        for handler in self._event_handlers[event]:
266            handler(self, *args, **kwargs)
267
268        # debug the threads
269        for thread in threading.enumerate():
270            self._logger.debug("after running thread: %s", thread.name)
271        self._logger.debug("number of active threads: %s", threading.active_count())
272
273        self._logger.debug("ListenWebSocketClient._emit LEAVE")
274
275    def _process_text(self, message: str) -> None:
276        """
277        Processes messages received over the WebSocket connection.
278        """
279        self._logger.debug("SpeakWebSocketClient._process_text ENTER")
280
281        try:
282            self._logger.debug("Text data received")
283
284            if len(message) == 0:
285                self._logger.debug("message is empty")
286                self._logger.debug("SpeakWebSocketClient._process_text LEAVE")
287                return
288
289            data = json.loads(message)
290            response_type = data.get("type")
291            self._logger.debug("response_type: %s, data: %s", response_type, data)
292
293            match response_type:
294                case SpeakWebSocketEvents.Open:
295                    open_result: OpenResponse = OpenResponse.from_json(message)
296                    self._logger.verbose("OpenResponse: %s", open_result)
297                    self._emit(
298                        SpeakWebSocketEvents(SpeakWebSocketEvents.Open),
299                        open=open_result,
300                        **dict(cast(Dict[Any, Any], self._kwargs)),
301                    )
302                case SpeakWebSocketEvents.Metadata:
303                    meta_result: MetadataResponse = MetadataResponse.from_json(message)
304                    self._logger.verbose("MetadataResponse: %s", meta_result)
305                    self._emit(
306                        SpeakWebSocketEvents(SpeakWebSocketEvents.Metadata),
307                        metadata=meta_result,
308                        **dict(cast(Dict[Any, Any], self._kwargs)),
309                    )
310                case SpeakWebSocketEvents.Flushed:
311                    fl_result: FlushedResponse = FlushedResponse.from_json(message)
312                    self._logger.verbose("FlushedResponse: %s", fl_result)
313
314                    # auto flush
315                    if self._config.is_inspecting_speak():
316                        with self._lock_flush:
317                            self._flush_count -= 1
318                            self._logger.debug(
319                                "Decrement Flush count: %d",
320                                self._flush_count,
321                            )
322
323                    self._emit(
324                        SpeakWebSocketEvents(SpeakWebSocketEvents.Flushed),
325                        flushed=fl_result,
326                        **dict(cast(Dict[Any, Any], self._kwargs)),
327                    )
328                case SpeakWebSocketEvents.Cleared:
329                    clear_result: ClearedResponse = ClearedResponse.from_json(message)
330                    self._logger.verbose("ClearedResponse: %s", clear_result)
331                    self._emit(
332                        SpeakWebSocketEvents(SpeakWebSocketEvents.Cleared),
333                        cleared=clear_result,
334                        **dict(cast(Dict[Any, Any], self._kwargs)),
335                    )
336                case SpeakWebSocketEvents.Close:
337                    close_result: CloseResponse = CloseResponse.from_json(message)
338                    self._logger.verbose("CloseResponse: %s", close_result)
339                    self._emit(
340                        SpeakWebSocketEvents(SpeakWebSocketEvents.Close),
341                        close=close_result,
342                        **dict(cast(Dict[Any, Any], self._kwargs)),
343                    )
344                case SpeakWebSocketEvents.Warning:
345                    war_warning: WarningResponse = WarningResponse.from_json(message)
346                    self._logger.verbose("WarningResponse: %s", war_warning)
347                    self._emit(
348                        SpeakWebSocketEvents(SpeakWebSocketEvents.Warning),
349                        warning=war_warning,
350                        **dict(cast(Dict[Any, Any], self._kwargs)),
351                    )
352                case SpeakWebSocketEvents.Error:
353                    err_error: ErrorResponse = ErrorResponse.from_json(message)
354                    self._logger.verbose("ErrorResponse: %s", err_error)
355                    self._emit(
356                        SpeakWebSocketEvents(SpeakWebSocketEvents.Error),
357                        error=err_error,
358                        **dict(cast(Dict[Any, Any], self._kwargs)),
359                    )
360                case _:
361                    self._logger.warning(
362                        "Unknown Message: response_type: %s, data: %s",
363                        response_type,
364                        data,
365                    )
366                    unhandled_error: UnhandledResponse = UnhandledResponse(
367                        type=SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled),
368                        raw=message,
369                    )
370                    self._emit(
371                        SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled),
372                        unhandled=unhandled_error,
373                        **dict(cast(Dict[Any, Any], self._kwargs)),
374                    )
375
376            self._logger.notice("_process_text Succeeded")
377            self._logger.debug("SpeakWebSocketClient._process_text LEAVE")
378
379        except Exception as e:  # pylint: disable=broad-except
380            self._logger.error("Exception in SpeakWebSocketClient._process_text: %s", e)
381            e_error: ErrorResponse = ErrorResponse(
382                "Exception in SpeakWebSocketClient._process_text",
383                f"{e}",
384                "Exception",
385            )
386            self._logger.error(
387                "Exception in SpeakWebSocketClient._process_text: %s", str(e)
388            )
389            self._emit(
390                SpeakWebSocketEvents(SpeakWebSocketEvents.Error),
391                e_error,
392                **dict(cast(Dict[Any, Any], self._kwargs)),
393            )
394
395            # signal exit and close
396            super()._signal_exit()
397
398            self._logger.debug("SpeakWebSocketClient._process_text LEAVE")
399
400            if self._config.options.get("termination_exception") is True:
401                raise
402            return
403
404    # pylint: enable=too-many-return-statements,too-many-statements
405
406    def _process_binary(self, message: bytes) -> None:
407        self._logger.debug("SpeakWebSocketClient._process_binary ENTER")
408        self._logger.debug("Binary data received")
409
410        self._emit(
411            SpeakWebSocketEvents(SpeakWebSocketEvents.AudioData),
412            data=message,
413            **dict(cast(Dict[Any, Any], self._kwargs)),
414        )
415
416        self._logger.notice("_process_binary Succeeded")
417        self._logger.debug("SpeakWebSocketClient._process_binary LEAVE")
418
419    # pylint: disable=too-many-return-statements
420    def _flush(self) -> None:
421        self._logger.debug("SpeakWebSocketClient._flush ENTER")
422
423        delta_in_ms_str = self._config.options.get("auto_flush_speak_delta")
424        if delta_in_ms_str is None:
425            self._logger.error("auto_flush_speak_delta is None")
426            self._logger.debug("SpeakWebSocketClient._flush LEAVE")
427            return
428        delta_in_ms = float(delta_in_ms_str)
429
430        while True:
431            try:
432                self._exit_event.wait(timeout=HALF_SECOND)
433
434                if self._exit_event.is_set():
435                    self._logger.notice("_flush exiting gracefully")
436                    self._logger.debug("ListenWebSocketClient._flush LEAVE")
437                    return
438
439                if self._last_datagram is None:
440                    self._logger.debug("AutoFlush last_datagram is None")
441                    continue
442
443                with self._lock_flush:
444                    delta = datetime.now() - self._last_datagram
445                    diff_in_ms = delta.total_seconds() * 1000
446                    self._logger.debug("AutoFlush delta: %f", diff_in_ms)
447                    if diff_in_ms < delta_in_ms:
448                        self._logger.debug("AutoFlush delta is less than threshold")
449                        continue
450
451                self.flush()
452
453            except Exception as e:  # pylint: disable=broad-except
454                self._logger.error("Exception in SpeakWebSocketClient._flush: %s", e)
455                e_error: ErrorResponse = ErrorResponse(
456                    "Exception in SpeakWebSocketClient._flush",
457                    f"{e}",
458                    "Exception",
459                )
460                self._logger.error(
461                    "Exception in SpeakWebSocketClient._flush: %s", str(e)
462                )
463                self._emit(
464                    SpeakWebSocketEvents(SpeakWebSocketEvents.Error),
465                    error=e_error,
466                    **dict(cast(Dict[Any, Any], self._kwargs)),
467                )
468
469                # signal exit and close
470                super()._signal_exit()
471
472                self._logger.debug("SpeakWebSocketClient._flush LEAVE")
473
474                if self._config.options.get("termination_exception") is True:
475                    raise
476                return
477
478    # pylint: enable=too-many-return-statements
479
480    def send_text(self, text_input: str) -> bool:
481        """
482        Sends text to the WebSocket connection to generate audio.
483
484        Args:
485            text_input (str): The raw text to be synthesized. This function will automatically wrap
486                the text in a JSON object of type "Speak" with the key "text".
487
488        Returns:
489            bool: True if the text was successfully sent, False otherwise.
490        """
491        return self.send_raw(json.dumps({"type": "Speak", "text": text_input}))
492
493    def send(self, data: Union[str, bytes]) -> bool:
494        """
495        Alias for send_text. Please see send_text for more information.
496        """
497        if isinstance(data, bytes):
498            self._logger.error("send() failed - data is bytes")
499            return False
500
501        return self.send_text(data)
502
503    # pylint: disable=unused-argument
504    def send_control(
505        self, msg_type: Union[SpeakWebSocketMessage, str], data: Optional[str] = ""
506    ) -> bool:
507        """
508        Sends a control message consisting of type SpeakWebSocketEvents over the WebSocket connection.
509
510        Args:
511            msg_type (SpeakWebSocketEvents): The type of control message to send.
512            (Optional) data (str): The data to send with the control message.
513
514        Returns:
515            bool: True if the control message was successfully sent, False otherwise.
516        """
517        control_msg = json.dumps({"type": msg_type})
518        return self.send_raw(control_msg)
519
520    # pylint: enable=unused-argument
521
522    # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements
523    def send_raw(self, msg: str) -> bool:
524        """
525        Sends a raw/control message over the WebSocket connection. This message must contain a valid JSON object.
526
527        Args:
528            msg (str): The raw message to send over the WebSocket connection.
529
530        Returns:
531            bool: True if the message was successfully sent, False otherwise.
532        """
533        self._logger.spam("SpeakWebSocketClient.send_raw ENTER")
534
535        if self._config.is_inspecting_speak():
536            try:
537                _tmp_json = json.loads(msg)
538                if "type" in _tmp_json:
539                    self._logger.debug(
540                        "Inspecting Message: Sending %s", _tmp_json["type"]
541                    )
542                    match _tmp_json["type"]:
543                        case SpeakWebSocketMessage.Speak:
544                            inspect_res = self._inspect()
545                            if not inspect_res:
546                                self._logger.error("inspect_res failed")
547                        case SpeakWebSocketMessage.Flush:
548                            with self._lock_flush:
549                                self._last_datagram = None
550                                self._flush_count += 1
551                                self._logger.debug(
552                                    "Increment Flush count: %d", self._flush_count
553                                )
554            except Exception as e:  # pylint: disable=broad-except
555                self._logger.error("send_raw() failed - Exception: %s", str(e))
556
557        try:
558            if super().send(msg) is False:
559                self._logger.error("send_raw() failed")
560                self._logger.spam("SpeakWebSocketClient.send_raw LEAVE")
561                return False
562            self._logger.spam("send_raw() succeeded")
563            self._logger.spam("SpeakWebSocketClient.send_raw LEAVE")
564            return True
565        except Exception as e:  # pylint: disable=broad-except
566            self._logger.error("send_raw() failed - Exception: %s", str(e))
567            self._logger.spam("SpeakWebSocketClient.send_raw LEAVE")
568            if self._config.options.get("termination_exception_send") is True:
569                raise
570            return False
571
572    # pylint: enable=too-many-return-statements,too-many-branches
573
574    def flush(self) -> bool:
575        """
576        Flushes the current buffer and returns generated audio
577        """
578        self._logger.spam("SpeakWebSocketClient.flush ENTER")
579
580        self._logger.notice("Sending Flush...")
581        ret = self.send_control(SpeakWebSocketMessage.Flush)
582
583        if not ret:
584            self._logger.error("flush failed")
585            self._logger.spam("SpeakWebSocketClient.flush LEAVE")
586            return False
587
588        self._logger.notice("flush succeeded")
589        self._logger.spam("SpeakWebSocketClient.flush LEAVE")
590
591        return True
592
593    def clear(self) -> bool:
594        """
595        Clears the current buffer on the server
596        """
597        self._logger.spam("SpeakWebSocketClient.clear ENTER")
598
599        self._logger.notice("Sending Clear...")
600        ret = self.send_control(SpeakWebSocketMessage.Clear)
601
602        if not ret:
603            self._logger.error("clear failed")
604            self._logger.spam("SpeakWebSocketClient.clear LEAVE")
605            return False
606
607        self._logger.notice("clear succeeded")
608        self._logger.spam("SpeakWebSocketClient.clear LEAVE")
609
610        return True
611
612    def wait_for_complete(self):
613        """
614        This method will block until the speak is done playing sound.
615        """
616        self._logger.spam("SpeakWebSocketClient.wait_for_complete ENTER")
617
618        if self._speaker is None:
619            self._logger.error("speaker is None. Return immediately")
620            raise DeepgramError("Speaker is not initialized")
621
622        self._speaker.wait_for_complete()
623        self._logger.notice("wait_for_complete succeeded")
624        self._logger.spam("SpeakWebSocketClient.wait_for_complete LEAVE")
625
626    def _close_message(self) -> bool:
627        return self.send_control(SpeakWebSocketMessage.Close)
628
629    # closes the WebSocket connection gracefully
630    def finish(self) -> bool:
631        """
632        Closes the WebSocket connection gracefully.
633        """
634        self._logger.spam("SpeakWebSocketClient.finish ENTER")
635
636        # call parent finish which calls signal_exit
637        if super().finish() is False:
638            self._logger.error("ListenWebSocketClient.finish failed")
639
640        if self._speaker is not None and self._speaker_created:
641            self._speaker.finish()
642            self._speaker_created = False
643
644        # debug the threads
645        for thread in threading.enumerate():
646            self._logger.debug("before running thread: %s", thread.name)
647        self._logger.debug("number of active threads: %s", threading.active_count())
648
649        # stop the threads
650        if self._speaker is not None:
651            self._logger.verbose("stopping speaker...")
652            self._speaker.finish()
653            self._speaker = None
654            self._logger.notice("speaker stopped")
655
656        if self._flush_thread is not None:
657            self._logger.verbose("sdtopping _flush_thread...")
658            self._flush_thread.join()
659            self._flush_thread = None
660            self._logger.notice("_flush_thread joined")
661
662        # debug the threads
663        for thread in threading.enumerate():
664            self._logger.debug("before running thread: %s", thread.name)
665        self._logger.debug("number of active threads: %s", threading.active_count())
666
667        self._logger.notice("finish succeeded")
668        self._logger.spam("SpeakWebSocketClient.finish LEAVE")
669        return True
670
671    def _inspect(self) -> bool:
672        # auto flush_inspect is generically used to track any messages you might want to snoop on
673        # place additional logic here to inspect messages of interest
674
675        # for auto flush functionality
676        # set the last datagram
677        with self._lock_flush:
678            self._last_datagram = datetime.now()
679            self._logger.debug(
680                "AutoFlush last received: %s",
681                str(self._last_datagram),
682            )
683
684        return True

Client for interacting with Deepgram's text-to-speech services over WebSockets.

This class provides methods to establish a WebSocket connection for TTS synthesis and handle real-time TTS synthesis events.

Args: config (DeepgramClientOptions): all the options for the client.

SpeakWSClient( config: deepgram.options.DeepgramClientOptions, microphone: Optional[deepgram.audio.microphone.microphone.Microphone] = None)
 72    def __init__(
 73        self, config: DeepgramClientOptions, microphone: Optional[Microphone] = None
 74    ):
 75        if config is None:
 76            raise DeepgramError("Config is required")
 77
 78        self._logger = verboselogs.VerboseLogger(__name__)
 79        self._logger.addHandler(logging.StreamHandler())
 80        self._logger.setLevel(config.verbose)
 81
 82        self._config = config
 83        self._endpoint = "v1/speak"
 84        self._lock_flush = threading.Lock()
 85
 86        self._flush_thread = None
 87
 88        # auto flush
 89        self._last_datagram = None
 90        self._flush_count = 0
 91
 92        # microphone
 93        self._microphone = microphone
 94
 95        # init handlers
 96        self._event_handlers = {
 97            event: [] for event in SpeakWebSocketEvents.__members__.values()
 98        }
 99
100        if self._config.options.get("speaker_playback") == "true":
101            self._logger.info("speaker_playback is enabled")
102            rate = self._config.options.get("speaker_playback_rate")
103            if rate is None:
104                rate = RATE
105            channels = self._config.options.get("speaker_playback_channels")
106            if channels is None:
107                channels = CHANNELS
108            playback_delta_in_ms = self._config.options.get(
109                "speaker_playback_delta_in_ms"
110            )
111            if playback_delta_in_ms is None:
112                playback_delta_in_ms = PLAYBACK_DELTA
113            device_index = self._config.options.get("speaker_playback_device_index")
114
115            self._logger.debug("rate: %s", rate)
116            self._logger.debug("channels: %s", channels)
117            self._logger.debug("device_index: %s", device_index)
118
119            self._speaker_created = True
120
121            if device_index is not None:
122                self._speaker = Speaker(
123                    rate=rate,
124                    channels=channels,
125                    last_play_delta_in_ms=playback_delta_in_ms,
126                    verbose=self._config.verbose,
127                    output_device_index=device_index,
128                    microphone=self._microphone,
129                )
130            else:
131                self._speaker = Speaker(
132                    rate=rate,
133                    channels=channels,
134                    last_play_delta_in_ms=playback_delta_in_ms,
135                    verbose=self._config.verbose,
136                    microphone=self._microphone,
137                )
138
139        # call the parent constructor
140        super().__init__(self._config, self._endpoint)
def start( self, options: Union[deepgram.clients.speak.v1.websocket.options.SpeakWSOptions, Dict, NoneType] = None, addons: Optional[Dict] = None, headers: Optional[Dict] = None, members: Optional[Dict] = None, **kwargs) -> bool:
143    def start(
144        self,
145        options: Optional[Union[SpeakWSOptions, Dict]] = None,
146        addons: Optional[Dict] = None,
147        headers: Optional[Dict] = None,
148        members: Optional[Dict] = None,
149        **kwargs,
150    ) -> bool:
151        """
152        Starts the WebSocket connection for text-to-speech synthesis.
153        """
154        self._logger.debug("SpeakWebSocketClient.start ENTER")
155        self._logger.info("options: %s", options)
156        self._logger.info("addons: %s", addons)
157        self._logger.info("headers: %s", headers)
158        self._logger.info("members: %s", members)
159        self._logger.info("kwargs: %s", kwargs)
160
161        if isinstance(options, SpeakWSOptions) and not options.check():
162            self._logger.error("options.check failed")
163            self._logger.debug("SpeakWebSocketClient.start LEAVE")
164            raise DeepgramError("Fatal text-to-speech options error")
165
166        self._addons = addons
167        self._headers = headers
168
169        # add "members" as members of the class
170        if members is not None:
171            self.__dict__.update(members)
172
173        # set kwargs as members of the class
174        if kwargs is not None:
175            self._kwargs = kwargs
176        else:
177            self._kwargs = {}
178
179        if isinstance(options, SpeakWSOptions):
180            self._logger.info("SpeakWSOptions switching class -> dict")
181            self._options = options.to_dict()
182        elif options is not None:
183            self._options = options
184        else:
185            self._options = {}
186
187        try:
188            # speaker substitutes the listening thread
189            if self._speaker is not None:
190                self._logger.notice("passing speaker to delegate_listening")
191                super().delegate_listening(self._speaker)
192
193            # call parent start
194            if (
195                super().start(
196                    self._options,
197                    self._addons,
198                    self._headers,
199                    **dict(cast(Dict[Any, Any], self._kwargs)),
200                )
201                is False
202            ):
203                self._logger.error("SpeakWebSocketClient.start failed")
204                self._logger.debug("SpeakWebSocketClient.start LEAVE")
205                return False
206
207            if self._speaker is not None:
208                self._logger.notice("start delegate_listening thread")
209                self._speaker.start()
210
211            # debug the threads
212            for thread in threading.enumerate():
213                self._logger.debug("after running thread: %s", thread.name)
214            self._logger.debug("number of active threads: %s", threading.active_count())
215
216            # flush thread
217            if self._config.is_auto_flush_speak_enabled():
218                self._logger.notice("autoflush is enabled")
219                self._flush_thread = threading.Thread(target=self._flush)
220                self._flush_thread.start()
221            else:
222                self._logger.notice("autoflush is disabled")
223
224            # debug the threads
225            for thread in threading.enumerate():
226                self._logger.debug("after running thread: %s", thread.name)
227            self._logger.debug("number of active threads: %s", threading.active_count())
228
229            self._logger.notice("start succeeded")
230            self._logger.debug("SpeakWebSocketClient.start LEAVE")
231            return True
232
233        except Exception as e:  # pylint: disable=broad-except
234            self._logger.error(
235                "WebSocketException in SpeakWebSocketClient.start: %s", e
236            )
237            self._logger.debug("SpeakWebSocketClient.start LEAVE")
238            if self._config.options.get("termination_exception_connect") is True:
239                raise
240            return False

Starts the WebSocket connection for text-to-speech synthesis.

def on( self, event: deepgram.clients.speak.enums.SpeakWebSocketEvents, handler: Callable) -> None:
244    def on(self, event: SpeakWebSocketEvents, handler: Callable) -> None:
245        """
246        Registers event handlers for specific events.
247        """
248        self._logger.info("event subscribed: %s", event)
249        if event in SpeakWebSocketEvents.__members__.values() and callable(handler):
250            self._event_handlers[event].append(handler)

Registers event handlers for specific events.

def send_text(self, text_input: str) -> bool:
480    def send_text(self, text_input: str) -> bool:
481        """
482        Sends text to the WebSocket connection to generate audio.
483
484        Args:
485            text_input (str): The raw text to be synthesized. This function will automatically wrap
486                the text in a JSON object of type "Speak" with the key "text".
487
488        Returns:
489            bool: True if the text was successfully sent, False otherwise.
490        """
491        return self.send_raw(json.dumps({"type": "Speak", "text": text_input}))

Sends text to the WebSocket connection to generate audio.

Args: text_input (str): The raw text to be synthesized. This function will automatically wrap the text in a JSON object of type "Speak" with the key "text".

Returns: bool: True if the text was successfully sent, False otherwise.

def send(self, data: Union[str, bytes]) -> bool:
493    def send(self, data: Union[str, bytes]) -> bool:
494        """
495        Alias for send_text. Please see send_text for more information.
496        """
497        if isinstance(data, bytes):
498            self._logger.error("send() failed - data is bytes")
499            return False
500
501        return self.send_text(data)

Alias for send_text. Please see send_text for more information.

def send_control( self, msg_type: Union[deepgram.clients.speak.enums.SpeakWebSocketMessage, str], data: Optional[str] = '') -> bool:
504    def send_control(
505        self, msg_type: Union[SpeakWebSocketMessage, str], data: Optional[str] = ""
506    ) -> bool:
507        """
508        Sends a control message consisting of type SpeakWebSocketEvents over the WebSocket connection.
509
510        Args:
511            msg_type (SpeakWebSocketEvents): The type of control message to send.
512            (Optional) data (str): The data to send with the control message.
513
514        Returns:
515            bool: True if the control message was successfully sent, False otherwise.
516        """
517        control_msg = json.dumps({"type": msg_type})
518        return self.send_raw(control_msg)

Sends a control message of the given type (a SpeakWebSocketMessage value or its string name) over the WebSocket connection.

Args: msg_type (Union[SpeakWebSocketMessage, str]): The type of control message to send. (Optional) data (str): The data to send with the control message.

Returns: bool: True if the control message was successfully sent, False otherwise.

def send_raw(self, msg: str) -> bool:
523    def send_raw(self, msg: str) -> bool:
524        """
525        Sends a raw/control message over the WebSocket connection. This message must contain a valid JSON object.
526
527        Args:
528            msg (str): The raw message to send over the WebSocket connection.
529
530        Returns:
531            bool: True if the message was successfully sent, False otherwise.
532        """
533        self._logger.spam("SpeakWebSocketClient.send_raw ENTER")
534
535        if self._config.is_inspecting_speak():
536            try:
537                _tmp_json = json.loads(msg)
538                if "type" in _tmp_json:
539                    self._logger.debug(
540                        "Inspecting Message: Sending %s", _tmp_json["type"]
541                    )
542                    match _tmp_json["type"]:
543                        case SpeakWebSocketMessage.Speak:
544                            inspect_res = self._inspect()
545                            if not inspect_res:
546                                self._logger.error("inspect_res failed")
547                        case SpeakWebSocketMessage.Flush:
548                            with self._lock_flush:
549                                self._last_datagram = None
550                                self._flush_count += 1
551                                self._logger.debug(
552                                    "Increment Flush count: %d", self._flush_count
553                                )
554            except Exception as e:  # pylint: disable=broad-except
555                self._logger.error("send_raw() failed - Exception: %s", str(e))
556
557        try:
558            if super().send(msg) is False:
559                self._logger.error("send_raw() failed")
560                self._logger.spam("SpeakWebSocketClient.send_raw LEAVE")
561                return False
562            self._logger.spam("send_raw() succeeded")
563            self._logger.spam("SpeakWebSocketClient.send_raw LEAVE")
564            return True
565        except Exception as e:  # pylint: disable=broad-except
566            self._logger.error("send_raw() failed - Exception: %s", str(e))
567            self._logger.spam("SpeakWebSocketClient.send_raw LEAVE")
568            if self._config.options.get("termination_exception_send") is True:
569                raise
570            return False

Sends a raw/control message over the WebSocket connection. This message must contain a valid JSON object.

Args: msg (str): The raw message to send over the WebSocket connection.

Returns: bool: True if the message was successfully sent, False otherwise.

def flush(self) -> bool:
574    def flush(self) -> bool:
575        """
576        Flushes the current buffer and returns generated audio
577        """
578        self._logger.spam("SpeakWebSocketClient.flush ENTER")
579
580        self._logger.notice("Sending Flush...")
581        ret = self.send_control(SpeakWebSocketMessage.Flush)
582
583        if not ret:
584            self._logger.error("flush failed")
585            self._logger.spam("SpeakWebSocketClient.flush LEAVE")
586            return False
587
588        self._logger.notice("flush succeeded")
589        self._logger.spam("SpeakWebSocketClient.flush LEAVE")
590
591        return True

Flushes the current buffer and returns generated audio

def clear(self) -> bool:
593    def clear(self) -> bool:
594        """
595        Clears the current buffer on the server
596        """
597        self._logger.spam("SpeakWebSocketClient.clear ENTER")
598
599        self._logger.notice("Sending Clear...")
600        ret = self.send_control(SpeakWebSocketMessage.Clear)
601
602        if not ret:
603            self._logger.error("clear failed")
604            self._logger.spam("SpeakWebSocketClient.clear LEAVE")
605            return False
606
607        self._logger.notice("clear succeeded")
608        self._logger.spam("SpeakWebSocketClient.clear LEAVE")
609
610        return True

Clears the current buffer on the server

def wait_for_complete(self):
612    def wait_for_complete(self):
613        """
614        This method will block until the speak is done playing sound.
615        """
616        self._logger.spam("SpeakWebSocketClient.wait_for_complete ENTER")
617
618        if self._speaker is None:
619            self._logger.error("speaker is None. Return immediately")
620            raise DeepgramError("Speaker is not initialized")
621
622        self._speaker.wait_for_complete()
623        self._logger.notice("wait_for_complete succeeded")
624        self._logger.spam("SpeakWebSocketClient.wait_for_complete LEAVE")

This method will block until the speak is done playing sound.

def finish(self) -> bool:
630    def finish(self) -> bool:
631        """
632        Closes the WebSocket connection gracefully.
633        """
634        self._logger.spam("SpeakWebSocketClient.finish ENTER")
635
636        # call parent finish which calls signal_exit
637        if super().finish() is False:
638            self._logger.error("ListenWebSocketClient.finish failed")
639
640        if self._speaker is not None and self._speaker_created:
641            self._speaker.finish()
642            self._speaker_created = False
643
644        # debug the threads
645        for thread in threading.enumerate():
646            self._logger.debug("before running thread: %s", thread.name)
647        self._logger.debug("number of active threads: %s", threading.active_count())
648
649        # stop the threads
650        if self._speaker is not None:
651            self._logger.verbose("stopping speaker...")
652            self._speaker.finish()
653            self._speaker = None
654            self._logger.notice("speaker stopped")
655
656        if self._flush_thread is not None:
657            self._logger.verbose("sdtopping _flush_thread...")
658            self._flush_thread.join()
659            self._flush_thread = None
660            self._logger.notice("_flush_thread joined")
661
662        # debug the threads
663        for thread in threading.enumerate():
664            self._logger.debug("before running thread: %s", thread.name)
665        self._logger.debug("number of active threads: %s", threading.active_count())
666
667        self._logger.notice("finish succeeded")
668        self._logger.spam("SpeakWebSocketClient.finish LEAVE")
669        return True

Closes the WebSocket connection gracefully.

SpeakWebSocketClient = <class 'SpeakWSClient'>