deepgram.clients.speak.v1.websocket.client
1# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. 2# Use of this source code is governed by a MIT license that can be found in the LICENSE file. 3# SPDX-License-Identifier: MIT 4 5import json 6import time 7import logging 8from typing import Dict, Union, Optional, cast, Any, Callable 9from datetime import datetime 10import threading 11 12from .....utils import verboselogs 13from .....options import DeepgramClientOptions 14from ...enums import SpeakWebSocketEvents, SpeakWebSocketMessage 15from ....common import AbstractSyncWebSocketClient 16from ....common import DeepgramError 17 18from .response import ( 19 OpenResponse, 20 MetadataResponse, 21 FlushedResponse, 22 ClearedResponse, 23 CloseResponse, 24 WarningResponse, 25 ErrorResponse, 26 UnhandledResponse, 27) 28from .options import SpeakWSOptions 29 30from .....audio.microphone import Microphone 31from .....audio.speaker import Speaker, RATE, CHANNELS, PLAYBACK_DELTA 32 33ONE_SECOND = 1 34HALF_SECOND = 0.5 35DEEPGRAM_INTERVAL = 5 36PING_INTERVAL = 20 37 38 39class SpeakWSClient( 40 AbstractSyncWebSocketClient 41): # pylint: disable=too-many-instance-attributes 42 """ 43 Client for interacting with Deepgram's text-to-speech services over WebSockets. 44 45 This class provides methods to establish a WebSocket connection for TTS synthesis and handle real-time TTS synthesis events. 46 47 Args: 48 config (DeepgramClientOptions): all the options for the client. 
49 """ 50 51 _logger: verboselogs.VerboseLogger 52 _config: DeepgramClientOptions 53 _endpoint: str 54 55 _event_handlers: Dict[SpeakWebSocketEvents, list] 56 57 _flush_thread: Union[threading.Thread, None] 58 _lock_flush: threading.Lock 59 _last_datagram: Optional[datetime] = None 60 _flush_count: int 61 62 _kwargs: Optional[Dict] = None 63 _addons: Optional[Dict] = None 64 _options: Optional[Dict] = None 65 _headers: Optional[Dict] = None 66 67 _speaker_created: bool = False 68 _speaker: Optional[Speaker] = None 69 _microphone: Optional[Microphone] = None 70 71 def __init__( 72 self, config: DeepgramClientOptions, microphone: Optional[Microphone] = None 73 ): 74 if config is None: 75 raise DeepgramError("Config is required") 76 77 self._logger = verboselogs.VerboseLogger(__name__) 78 self._logger.addHandler(logging.StreamHandler()) 79 self._logger.setLevel(config.verbose) 80 81 self._config = config 82 self._endpoint = "v1/speak" 83 self._lock_flush = threading.Lock() 84 85 self._flush_thread = None 86 87 # auto flush 88 self._last_datagram = None 89 self._flush_count = 0 90 91 # microphone 92 self._microphone = microphone 93 94 # init handlers 95 self._event_handlers = { 96 event: [] for event in SpeakWebSocketEvents.__members__.values() 97 } 98 99 if self._config.options.get("speaker_playback") == "true": 100 self._logger.info("speaker_playback is enabled") 101 rate = self._config.options.get("speaker_playback_rate") 102 if rate is None: 103 rate = RATE 104 channels = self._config.options.get("speaker_playback_channels") 105 if channels is None: 106 channels = CHANNELS 107 playback_delta_in_ms = self._config.options.get( 108 "speaker_playback_delta_in_ms" 109 ) 110 if playback_delta_in_ms is None: 111 playback_delta_in_ms = PLAYBACK_DELTA 112 device_index = self._config.options.get("speaker_playback_device_index") 113 114 self._logger.debug("rate: %s", rate) 115 self._logger.debug("channels: %s", channels) 116 self._logger.debug("device_index: %s", 
device_index) 117 118 self._speaker_created = True 119 120 if device_index is not None: 121 self._speaker = Speaker( 122 rate=rate, 123 channels=channels, 124 last_play_delta_in_ms=playback_delta_in_ms, 125 verbose=self._config.verbose, 126 output_device_index=device_index, 127 microphone=self._microphone, 128 ) 129 else: 130 self._speaker = Speaker( 131 rate=rate, 132 channels=channels, 133 last_play_delta_in_ms=playback_delta_in_ms, 134 verbose=self._config.verbose, 135 microphone=self._microphone, 136 ) 137 138 # call the parent constructor 139 super().__init__(self._config, self._endpoint) 140 141 # pylint: disable=too-many-statements,too-many-branches 142 def start( 143 self, 144 options: Optional[Union[SpeakWSOptions, Dict]] = None, 145 addons: Optional[Dict] = None, 146 headers: Optional[Dict] = None, 147 members: Optional[Dict] = None, 148 **kwargs, 149 ) -> bool: 150 """ 151 Starts the WebSocket connection for text-to-speech synthesis. 152 """ 153 self._logger.debug("SpeakWebSocketClient.start ENTER") 154 self._logger.info("options: %s", options) 155 self._logger.info("addons: %s", addons) 156 self._logger.info("headers: %s", headers) 157 self._logger.info("members: %s", members) 158 self._logger.info("kwargs: %s", kwargs) 159 160 if isinstance(options, SpeakWSOptions) and not options.check(): 161 self._logger.error("options.check failed") 162 self._logger.debug("SpeakWebSocketClient.start LEAVE") 163 raise DeepgramError("Fatal text-to-speech options error") 164 165 self._addons = addons 166 self._headers = headers 167 168 # add "members" as members of the class 169 if members is not None: 170 self.__dict__.update(members) 171 172 # set kwargs as members of the class 173 if kwargs is not None: 174 self._kwargs = kwargs 175 else: 176 self._kwargs = {} 177 178 if isinstance(options, SpeakWSOptions): 179 self._logger.info("SpeakWSOptions switching class -> dict") 180 self._options = options.to_dict() 181 elif options is not None: 182 self._options = options 
183 else: 184 self._options = {} 185 186 try: 187 # speaker substitutes the listening thread 188 if self._speaker is not None: 189 self._logger.notice("passing speaker to delegate_listening") 190 super().delegate_listening(self._speaker) 191 192 # call parent start 193 if ( 194 super().start( 195 self._options, 196 self._addons, 197 self._headers, 198 **dict(cast(Dict[Any, Any], self._kwargs)), 199 ) 200 is False 201 ): 202 self._logger.error("SpeakWebSocketClient.start failed") 203 self._logger.debug("SpeakWebSocketClient.start LEAVE") 204 return False 205 206 if self._speaker is not None: 207 self._logger.notice("start delegate_listening thread") 208 self._speaker.start() 209 210 # debug the threads 211 for thread in threading.enumerate(): 212 self._logger.debug("after running thread: %s", thread.name) 213 self._logger.debug("number of active threads: %s", threading.active_count()) 214 215 # flush thread 216 if self._config.is_auto_flush_speak_enabled(): 217 self._logger.notice("autoflush is enabled") 218 self._flush_thread = threading.Thread(target=self._flush) 219 self._flush_thread.start() 220 else: 221 self._logger.notice("autoflush is disabled") 222 223 # debug the threads 224 for thread in threading.enumerate(): 225 self._logger.debug("after running thread: %s", thread.name) 226 self._logger.debug("number of active threads: %s", threading.active_count()) 227 228 self._logger.notice("start succeeded") 229 self._logger.debug("SpeakWebSocketClient.start LEAVE") 230 return True 231 232 except Exception as e: # pylint: disable=broad-except 233 self._logger.error( 234 "WebSocketException in SpeakWebSocketClient.start: %s", e 235 ) 236 self._logger.debug("SpeakWebSocketClient.start LEAVE") 237 if self._config.options.get("termination_exception_connect") is True: 238 raise 239 return False 240 241 # pylint: enable=too-many-statements,too-many-branches 242 243 def on(self, event: SpeakWebSocketEvents, handler: Callable) -> None: 244 """ 245 Registers event handlers 
for specific events. 246 """ 247 self._logger.info("event subscribed: %s", event) 248 if event in SpeakWebSocketEvents.__members__.values() and callable(handler): 249 self._event_handlers[event].append(handler) 250 251 def _emit(self, event: SpeakWebSocketEvents, *args, **kwargs) -> None: 252 """ 253 Emits events to the registered event handlers. 254 """ 255 self._logger.debug("SpeakWebSocketClient._emit ENTER") 256 self._logger.debug("callback handlers for: %s", event) 257 258 # debug the threads 259 for thread in threading.enumerate(): 260 self._logger.debug("after running thread: %s", thread.name) 261 self._logger.debug("number of active threads: %s", threading.active_count()) 262 263 self._logger.debug("callback handlers for: %s", event) 264 for handler in self._event_handlers[event]: 265 handler(self, *args, **kwargs) 266 267 # debug the threads 268 for thread in threading.enumerate(): 269 self._logger.debug("after running thread: %s", thread.name) 270 self._logger.debug("number of active threads: %s", threading.active_count()) 271 272 self._logger.debug("ListenWebSocketClient._emit LEAVE") 273 274 def _process_text(self, message: str) -> None: 275 """ 276 Processes messages received over the WebSocket connection. 
277 """ 278 self._logger.debug("SpeakWebSocketClient._process_text ENTER") 279 280 try: 281 self._logger.debug("Text data received") 282 283 if len(message) == 0: 284 self._logger.debug("message is empty") 285 self._logger.debug("SpeakWebSocketClient._process_text LEAVE") 286 return 287 288 data = json.loads(message) 289 response_type = data.get("type") 290 self._logger.debug("response_type: %s, data: %s", response_type, data) 291 292 match response_type: 293 case SpeakWebSocketEvents.Open: 294 open_result: OpenResponse = OpenResponse.from_json(message) 295 self._logger.verbose("OpenResponse: %s", open_result) 296 self._emit( 297 SpeakWebSocketEvents(SpeakWebSocketEvents.Open), 298 open=open_result, 299 **dict(cast(Dict[Any, Any], self._kwargs)), 300 ) 301 case SpeakWebSocketEvents.Metadata: 302 meta_result: MetadataResponse = MetadataResponse.from_json(message) 303 self._logger.verbose("MetadataResponse: %s", meta_result) 304 self._emit( 305 SpeakWebSocketEvents(SpeakWebSocketEvents.Metadata), 306 metadata=meta_result, 307 **dict(cast(Dict[Any, Any], self._kwargs)), 308 ) 309 case SpeakWebSocketEvents.Flushed: 310 fl_result: FlushedResponse = FlushedResponse.from_json(message) 311 self._logger.verbose("FlushedResponse: %s", fl_result) 312 313 # auto flush 314 if self._config.is_inspecting_speak(): 315 with self._lock_flush: 316 self._flush_count -= 1 317 self._logger.debug( 318 "Decrement Flush count: %d", 319 self._flush_count, 320 ) 321 322 self._emit( 323 SpeakWebSocketEvents(SpeakWebSocketEvents.Flushed), 324 flushed=fl_result, 325 **dict(cast(Dict[Any, Any], self._kwargs)), 326 ) 327 case SpeakWebSocketEvents.Cleared: 328 clear_result: ClearedResponse = ClearedResponse.from_json(message) 329 self._logger.verbose("ClearedResponse: %s", clear_result) 330 self._emit( 331 SpeakWebSocketEvents(SpeakWebSocketEvents.Cleared), 332 cleared=clear_result, 333 **dict(cast(Dict[Any, Any], self._kwargs)), 334 ) 335 case SpeakWebSocketEvents.Close: 336 close_result: 
CloseResponse = CloseResponse.from_json(message) 337 self._logger.verbose("CloseResponse: %s", close_result) 338 self._emit( 339 SpeakWebSocketEvents(SpeakWebSocketEvents.Close), 340 close=close_result, 341 **dict(cast(Dict[Any, Any], self._kwargs)), 342 ) 343 case SpeakWebSocketEvents.Warning: 344 war_warning: WarningResponse = WarningResponse.from_json(message) 345 self._logger.verbose("WarningResponse: %s", war_warning) 346 self._emit( 347 SpeakWebSocketEvents(SpeakWebSocketEvents.Warning), 348 warning=war_warning, 349 **dict(cast(Dict[Any, Any], self._kwargs)), 350 ) 351 case SpeakWebSocketEvents.Error: 352 err_error: ErrorResponse = ErrorResponse.from_json(message) 353 self._logger.verbose("ErrorResponse: %s", err_error) 354 self._emit( 355 SpeakWebSocketEvents(SpeakWebSocketEvents.Error), 356 error=err_error, 357 **dict(cast(Dict[Any, Any], self._kwargs)), 358 ) 359 case _: 360 self._logger.warning( 361 "Unknown Message: response_type: %s, data: %s", 362 response_type, 363 data, 364 ) 365 unhandled_error: UnhandledResponse = UnhandledResponse( 366 type=SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled), 367 raw=message, 368 ) 369 self._emit( 370 SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled), 371 unhandled=unhandled_error, 372 **dict(cast(Dict[Any, Any], self._kwargs)), 373 ) 374 375 self._logger.notice("_process_text Succeeded") 376 self._logger.debug("SpeakWebSocketClient._process_text LEAVE") 377 378 except Exception as e: # pylint: disable=broad-except 379 self._logger.error("Exception in SpeakWebSocketClient._process_text: %s", e) 380 e_error: ErrorResponse = ErrorResponse( 381 "Exception in SpeakWebSocketClient._process_text", 382 f"{e}", 383 "Exception", 384 ) 385 self._logger.error( 386 "Exception in SpeakWebSocketClient._process_text: %s", str(e) 387 ) 388 self._emit( 389 SpeakWebSocketEvents(SpeakWebSocketEvents.Error), 390 e_error, 391 **dict(cast(Dict[Any, Any], self._kwargs)), 392 ) 393 394 # signal exit and close 395 
super()._signal_exit() 396 397 self._logger.debug("SpeakWebSocketClient._process_text LEAVE") 398 399 if self._config.options.get("termination_exception") is True: 400 raise 401 return 402 403 # pylint: enable=too-many-return-statements,too-many-statements 404 405 def _process_binary(self, message: bytes) -> None: 406 self._logger.debug("SpeakWebSocketClient._process_binary ENTER") 407 self._logger.debug("Binary data received") 408 409 self._emit( 410 SpeakWebSocketEvents(SpeakWebSocketEvents.AudioData), 411 data=message, 412 **dict(cast(Dict[Any, Any], self._kwargs)), 413 ) 414 415 self._logger.notice("_process_binary Succeeded") 416 self._logger.debug("SpeakWebSocketClient._process_binary LEAVE") 417 418 # pylint: disable=too-many-return-statements 419 def _flush(self) -> None: 420 self._logger.debug("SpeakWebSocketClient._flush ENTER") 421 422 delta_in_ms_str = self._config.options.get("auto_flush_speak_delta") 423 if delta_in_ms_str is None: 424 self._logger.error("auto_flush_speak_delta is None") 425 self._logger.debug("SpeakWebSocketClient._flush LEAVE") 426 return 427 delta_in_ms = float(delta_in_ms_str) 428 429 while True: 430 try: 431 self._exit_event.wait(timeout=HALF_SECOND) 432 433 if self._exit_event.is_set(): 434 self._logger.notice("_flush exiting gracefully") 435 self._logger.debug("ListenWebSocketClient._flush LEAVE") 436 return 437 438 if self._last_datagram is None: 439 self._logger.debug("AutoFlush last_datagram is None") 440 continue 441 442 with self._lock_flush: 443 delta = datetime.now() - self._last_datagram 444 diff_in_ms = delta.total_seconds() * 1000 445 self._logger.debug("AutoFlush delta: %f", diff_in_ms) 446 if diff_in_ms < delta_in_ms: 447 self._logger.debug("AutoFlush delta is less than threshold") 448 continue 449 450 self.flush() 451 452 except Exception as e: # pylint: disable=broad-except 453 self._logger.error("Exception in SpeakWebSocketClient._flush: %s", e) 454 e_error: ErrorResponse = ErrorResponse( 455 "Exception in 
SpeakWebSocketClient._flush", 456 f"{e}", 457 "Exception", 458 ) 459 self._logger.error( 460 "Exception in SpeakWebSocketClient._flush: %s", str(e) 461 ) 462 self._emit( 463 SpeakWebSocketEvents(SpeakWebSocketEvents.Error), 464 error=e_error, 465 **dict(cast(Dict[Any, Any], self._kwargs)), 466 ) 467 468 # signal exit and close 469 super()._signal_exit() 470 471 self._logger.debug("SpeakWebSocketClient._flush LEAVE") 472 473 if self._config.options.get("termination_exception") is True: 474 raise 475 return 476 477 # pylint: enable=too-many-return-statements 478 479 def send_text(self, text_input: str) -> bool: 480 """ 481 Sends text to the WebSocket connection to generate audio. 482 483 Args: 484 text_input (str): The raw text to be synthesized. This function will automatically wrap 485 the text in a JSON object of type "Speak" with the key "text". 486 487 Returns: 488 bool: True if the text was successfully sent, False otherwise. 489 """ 490 return self.send_raw(json.dumps({"type": "Speak", "text": text_input})) 491 492 def send(self, data: Union[str, bytes]) -> bool: 493 """ 494 Alias for send_text. Please see send_text for more information. 495 """ 496 if isinstance(data, bytes): 497 self._logger.error("send() failed - data is bytes") 498 return False 499 500 return self.send_text(data) 501 502 # pylint: disable=unused-argument 503 def send_control( 504 self, msg_type: Union[SpeakWebSocketMessage, str], data: Optional[str] = "" 505 ) -> bool: 506 """ 507 Sends a control message consisting of type SpeakWebSocketEvents over the WebSocket connection. 508 509 Args: 510 msg_type (SpeakWebSocketEvents): The type of control message to send. 511 (Optional) data (str): The data to send with the control message. 512 513 Returns: 514 bool: True if the control message was successfully sent, False otherwise. 
515 """ 516 control_msg = json.dumps({"type": msg_type}) 517 return self.send_raw(control_msg) 518 519 # pylint: enable=unused-argument 520 521 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements 522 def send_raw(self, msg: str) -> bool: 523 """ 524 Sends a raw/control message over the WebSocket connection. This message must contain a valid JSON object. 525 526 Args: 527 msg (str): The raw message to send over the WebSocket connection. 528 529 Returns: 530 bool: True if the message was successfully sent, False otherwise. 531 """ 532 self._logger.spam("SpeakWebSocketClient.send_raw ENTER") 533 534 if self._config.is_inspecting_speak(): 535 try: 536 _tmp_json = json.loads(msg) 537 if "type" in _tmp_json: 538 self._logger.debug( 539 "Inspecting Message: Sending %s", _tmp_json["type"] 540 ) 541 match _tmp_json["type"]: 542 case SpeakWebSocketMessage.Speak: 543 inspect_res = self._inspect() 544 if not inspect_res: 545 self._logger.error("inspect_res failed") 546 case SpeakWebSocketMessage.Flush: 547 with self._lock_flush: 548 self._last_datagram = None 549 self._flush_count += 1 550 self._logger.debug( 551 "Increment Flush count: %d", self._flush_count 552 ) 553 except Exception as e: # pylint: disable=broad-except 554 self._logger.error("send_raw() failed - Exception: %s", str(e)) 555 556 try: 557 if super().send(msg) is False: 558 self._logger.error("send_raw() failed") 559 self._logger.spam("SpeakWebSocketClient.send_raw LEAVE") 560 return False 561 self._logger.spam("send_raw() succeeded") 562 self._logger.spam("SpeakWebSocketClient.send_raw LEAVE") 563 return True 564 except Exception as e: # pylint: disable=broad-except 565 self._logger.error("send_raw() failed - Exception: %s", str(e)) 566 self._logger.spam("SpeakWebSocketClient.send_raw LEAVE") 567 if self._config.options.get("termination_exception_send") is True: 568 raise 569 return False 570 571 # pylint: enable=too-many-return-statements,too-many-branches 572 573 def 
flush(self) -> bool: 574 """ 575 Flushes the current buffer and returns generated audio 576 """ 577 self._logger.spam("SpeakWebSocketClient.flush ENTER") 578 579 self._logger.notice("Sending Flush...") 580 ret = self.send_control(SpeakWebSocketMessage.Flush) 581 582 if not ret: 583 self._logger.error("flush failed") 584 self._logger.spam("SpeakWebSocketClient.flush LEAVE") 585 return False 586 587 self._logger.notice("flush succeeded") 588 self._logger.spam("SpeakWebSocketClient.flush LEAVE") 589 590 return True 591 592 def clear(self) -> bool: 593 """ 594 Clears the current buffer on the server 595 """ 596 self._logger.spam("SpeakWebSocketClient.clear ENTER") 597 598 self._logger.notice("Sending Clear...") 599 ret = self.send_control(SpeakWebSocketMessage.Clear) 600 601 if not ret: 602 self._logger.error("clear failed") 603 self._logger.spam("SpeakWebSocketClient.clear LEAVE") 604 return False 605 606 self._logger.notice("clear succeeded") 607 self._logger.spam("SpeakWebSocketClient.clear LEAVE") 608 609 return True 610 611 def wait_for_complete(self): 612 """ 613 This method will block until the speak is done playing sound. 614 """ 615 self._logger.spam("SpeakWebSocketClient.wait_for_complete ENTER") 616 617 if self._speaker is None: 618 self._logger.error("speaker is None. Return immediately") 619 raise DeepgramError("Speaker is not initialized") 620 621 self._speaker.wait_for_complete() 622 self._logger.notice("wait_for_complete succeeded") 623 self._logger.spam("SpeakWebSocketClient.wait_for_complete LEAVE") 624 625 def _close_message(self) -> bool: 626 return self.send_control(SpeakWebSocketMessage.Close) 627 628 # closes the WebSocket connection gracefully 629 def finish(self) -> bool: 630 """ 631 Closes the WebSocket connection gracefully. 
632 """ 633 self._logger.spam("SpeakWebSocketClient.finish ENTER") 634 635 # call parent finish which calls signal_exit 636 if super().finish() is False: 637 self._logger.error("ListenWebSocketClient.finish failed") 638 639 if self._speaker is not None and self._speaker_created: 640 self._speaker.finish() 641 self._speaker_created = False 642 643 # debug the threads 644 for thread in threading.enumerate(): 645 self._logger.debug("before running thread: %s", thread.name) 646 self._logger.debug("number of active threads: %s", threading.active_count()) 647 648 # stop the threads 649 if self._speaker is not None: 650 self._logger.verbose("stopping speaker...") 651 self._speaker.finish() 652 self._speaker = None 653 self._logger.notice("speaker stopped") 654 655 if self._flush_thread is not None: 656 self._logger.verbose("sdtopping _flush_thread...") 657 self._flush_thread.join() 658 self._flush_thread = None 659 self._logger.notice("_flush_thread joined") 660 661 # debug the threads 662 for thread in threading.enumerate(): 663 self._logger.debug("before running thread: %s", thread.name) 664 self._logger.debug("number of active threads: %s", threading.active_count()) 665 666 self._logger.notice("finish succeeded") 667 self._logger.spam("SpeakWebSocketClient.finish LEAVE") 668 return True 669 670 def _inspect(self) -> bool: 671 # auto flush_inspect is generically used to track any messages you might want to snoop on 672 # place additional logic here to inspect messages of interest 673 674 # for auto flush functionality 675 # set the last datagram 676 with self._lock_flush: 677 self._last_datagram = datetime.now() 678 self._logger.debug( 679 "AutoFlush last received: %s", 680 str(self._last_datagram), 681 ) 682 683 return True 684 685 686SpeakWebSocketClient = SpeakWSClient
40class SpeakWSClient( 41 AbstractSyncWebSocketClient 42): # pylint: disable=too-many-instance-attributes 43 """ 44 Client for interacting with Deepgram's text-to-speech services over WebSockets. 45 46 This class provides methods to establish a WebSocket connection for TTS synthesis and handle real-time TTS synthesis events. 47 48 Args: 49 config (DeepgramClientOptions): all the options for the client. 50 """ 51 52 _logger: verboselogs.VerboseLogger 53 _config: DeepgramClientOptions 54 _endpoint: str 55 56 _event_handlers: Dict[SpeakWebSocketEvents, list] 57 58 _flush_thread: Union[threading.Thread, None] 59 _lock_flush: threading.Lock 60 _last_datagram: Optional[datetime] = None 61 _flush_count: int 62 63 _kwargs: Optional[Dict] = None 64 _addons: Optional[Dict] = None 65 _options: Optional[Dict] = None 66 _headers: Optional[Dict] = None 67 68 _speaker_created: bool = False 69 _speaker: Optional[Speaker] = None 70 _microphone: Optional[Microphone] = None 71 72 def __init__( 73 self, config: DeepgramClientOptions, microphone: Optional[Microphone] = None 74 ): 75 if config is None: 76 raise DeepgramError("Config is required") 77 78 self._logger = verboselogs.VerboseLogger(__name__) 79 self._logger.addHandler(logging.StreamHandler()) 80 self._logger.setLevel(config.verbose) 81 82 self._config = config 83 self._endpoint = "v1/speak" 84 self._lock_flush = threading.Lock() 85 86 self._flush_thread = None 87 88 # auto flush 89 self._last_datagram = None 90 self._flush_count = 0 91 92 # microphone 93 self._microphone = microphone 94 95 # init handlers 96 self._event_handlers = { 97 event: [] for event in SpeakWebSocketEvents.__members__.values() 98 } 99 100 if self._config.options.get("speaker_playback") == "true": 101 self._logger.info("speaker_playback is enabled") 102 rate = self._config.options.get("speaker_playback_rate") 103 if rate is None: 104 rate = RATE 105 channels = self._config.options.get("speaker_playback_channels") 106 if channels is None: 107 channels = 
CHANNELS 108 playback_delta_in_ms = self._config.options.get( 109 "speaker_playback_delta_in_ms" 110 ) 111 if playback_delta_in_ms is None: 112 playback_delta_in_ms = PLAYBACK_DELTA 113 device_index = self._config.options.get("speaker_playback_device_index") 114 115 self._logger.debug("rate: %s", rate) 116 self._logger.debug("channels: %s", channels) 117 self._logger.debug("device_index: %s", device_index) 118 119 self._speaker_created = True 120 121 if device_index is not None: 122 self._speaker = Speaker( 123 rate=rate, 124 channels=channels, 125 last_play_delta_in_ms=playback_delta_in_ms, 126 verbose=self._config.verbose, 127 output_device_index=device_index, 128 microphone=self._microphone, 129 ) 130 else: 131 self._speaker = Speaker( 132 rate=rate, 133 channels=channels, 134 last_play_delta_in_ms=playback_delta_in_ms, 135 verbose=self._config.verbose, 136 microphone=self._microphone, 137 ) 138 139 # call the parent constructor 140 super().__init__(self._config, self._endpoint) 141 142 # pylint: disable=too-many-statements,too-many-branches 143 def start( 144 self, 145 options: Optional[Union[SpeakWSOptions, Dict]] = None, 146 addons: Optional[Dict] = None, 147 headers: Optional[Dict] = None, 148 members: Optional[Dict] = None, 149 **kwargs, 150 ) -> bool: 151 """ 152 Starts the WebSocket connection for text-to-speech synthesis. 
153 """ 154 self._logger.debug("SpeakWebSocketClient.start ENTER") 155 self._logger.info("options: %s", options) 156 self._logger.info("addons: %s", addons) 157 self._logger.info("headers: %s", headers) 158 self._logger.info("members: %s", members) 159 self._logger.info("kwargs: %s", kwargs) 160 161 if isinstance(options, SpeakWSOptions) and not options.check(): 162 self._logger.error("options.check failed") 163 self._logger.debug("SpeakWebSocketClient.start LEAVE") 164 raise DeepgramError("Fatal text-to-speech options error") 165 166 self._addons = addons 167 self._headers = headers 168 169 # add "members" as members of the class 170 if members is not None: 171 self.__dict__.update(members) 172 173 # set kwargs as members of the class 174 if kwargs is not None: 175 self._kwargs = kwargs 176 else: 177 self._kwargs = {} 178 179 if isinstance(options, SpeakWSOptions): 180 self._logger.info("SpeakWSOptions switching class -> dict") 181 self._options = options.to_dict() 182 elif options is not None: 183 self._options = options 184 else: 185 self._options = {} 186 187 try: 188 # speaker substitutes the listening thread 189 if self._speaker is not None: 190 self._logger.notice("passing speaker to delegate_listening") 191 super().delegate_listening(self._speaker) 192 193 # call parent start 194 if ( 195 super().start( 196 self._options, 197 self._addons, 198 self._headers, 199 **dict(cast(Dict[Any, Any], self._kwargs)), 200 ) 201 is False 202 ): 203 self._logger.error("SpeakWebSocketClient.start failed") 204 self._logger.debug("SpeakWebSocketClient.start LEAVE") 205 return False 206 207 if self._speaker is not None: 208 self._logger.notice("start delegate_listening thread") 209 self._speaker.start() 210 211 # debug the threads 212 for thread in threading.enumerate(): 213 self._logger.debug("after running thread: %s", thread.name) 214 self._logger.debug("number of active threads: %s", threading.active_count()) 215 216 # flush thread 217 if 
self._config.is_auto_flush_speak_enabled(): 218 self._logger.notice("autoflush is enabled") 219 self._flush_thread = threading.Thread(target=self._flush) 220 self._flush_thread.start() 221 else: 222 self._logger.notice("autoflush is disabled") 223 224 # debug the threads 225 for thread in threading.enumerate(): 226 self._logger.debug("after running thread: %s", thread.name) 227 self._logger.debug("number of active threads: %s", threading.active_count()) 228 229 self._logger.notice("start succeeded") 230 self._logger.debug("SpeakWebSocketClient.start LEAVE") 231 return True 232 233 except Exception as e: # pylint: disable=broad-except 234 self._logger.error( 235 "WebSocketException in SpeakWebSocketClient.start: %s", e 236 ) 237 self._logger.debug("SpeakWebSocketClient.start LEAVE") 238 if self._config.options.get("termination_exception_connect") is True: 239 raise 240 return False 241 242 # pylint: enable=too-many-statements,too-many-branches 243 244 def on(self, event: SpeakWebSocketEvents, handler: Callable) -> None: 245 """ 246 Registers event handlers for specific events. 247 """ 248 self._logger.info("event subscribed: %s", event) 249 if event in SpeakWebSocketEvents.__members__.values() and callable(handler): 250 self._event_handlers[event].append(handler) 251 252 def _emit(self, event: SpeakWebSocketEvents, *args, **kwargs) -> None: 253 """ 254 Emits events to the registered event handlers. 
255 """ 256 self._logger.debug("SpeakWebSocketClient._emit ENTER") 257 self._logger.debug("callback handlers for: %s", event) 258 259 # debug the threads 260 for thread in threading.enumerate(): 261 self._logger.debug("after running thread: %s", thread.name) 262 self._logger.debug("number of active threads: %s", threading.active_count()) 263 264 self._logger.debug("callback handlers for: %s", event) 265 for handler in self._event_handlers[event]: 266 handler(self, *args, **kwargs) 267 268 # debug the threads 269 for thread in threading.enumerate(): 270 self._logger.debug("after running thread: %s", thread.name) 271 self._logger.debug("number of active threads: %s", threading.active_count()) 272 273 self._logger.debug("ListenWebSocketClient._emit LEAVE") 274 275 def _process_text(self, message: str) -> None: 276 """ 277 Processes messages received over the WebSocket connection. 278 """ 279 self._logger.debug("SpeakWebSocketClient._process_text ENTER") 280 281 try: 282 self._logger.debug("Text data received") 283 284 if len(message) == 0: 285 self._logger.debug("message is empty") 286 self._logger.debug("SpeakWebSocketClient._process_text LEAVE") 287 return 288 289 data = json.loads(message) 290 response_type = data.get("type") 291 self._logger.debug("response_type: %s, data: %s", response_type, data) 292 293 match response_type: 294 case SpeakWebSocketEvents.Open: 295 open_result: OpenResponse = OpenResponse.from_json(message) 296 self._logger.verbose("OpenResponse: %s", open_result) 297 self._emit( 298 SpeakWebSocketEvents(SpeakWebSocketEvents.Open), 299 open=open_result, 300 **dict(cast(Dict[Any, Any], self._kwargs)), 301 ) 302 case SpeakWebSocketEvents.Metadata: 303 meta_result: MetadataResponse = MetadataResponse.from_json(message) 304 self._logger.verbose("MetadataResponse: %s", meta_result) 305 self._emit( 306 SpeakWebSocketEvents(SpeakWebSocketEvents.Metadata), 307 metadata=meta_result, 308 **dict(cast(Dict[Any, Any], self._kwargs)), 309 ) 310 case 
SpeakWebSocketEvents.Flushed: 311 fl_result: FlushedResponse = FlushedResponse.from_json(message) 312 self._logger.verbose("FlushedResponse: %s", fl_result) 313 314 # auto flush 315 if self._config.is_inspecting_speak(): 316 with self._lock_flush: 317 self._flush_count -= 1 318 self._logger.debug( 319 "Decrement Flush count: %d", 320 self._flush_count, 321 ) 322 323 self._emit( 324 SpeakWebSocketEvents(SpeakWebSocketEvents.Flushed), 325 flushed=fl_result, 326 **dict(cast(Dict[Any, Any], self._kwargs)), 327 ) 328 case SpeakWebSocketEvents.Cleared: 329 clear_result: ClearedResponse = ClearedResponse.from_json(message) 330 self._logger.verbose("ClearedResponse: %s", clear_result) 331 self._emit( 332 SpeakWebSocketEvents(SpeakWebSocketEvents.Cleared), 333 cleared=clear_result, 334 **dict(cast(Dict[Any, Any], self._kwargs)), 335 ) 336 case SpeakWebSocketEvents.Close: 337 close_result: CloseResponse = CloseResponse.from_json(message) 338 self._logger.verbose("CloseResponse: %s", close_result) 339 self._emit( 340 SpeakWebSocketEvents(SpeakWebSocketEvents.Close), 341 close=close_result, 342 **dict(cast(Dict[Any, Any], self._kwargs)), 343 ) 344 case SpeakWebSocketEvents.Warning: 345 war_warning: WarningResponse = WarningResponse.from_json(message) 346 self._logger.verbose("WarningResponse: %s", war_warning) 347 self._emit( 348 SpeakWebSocketEvents(SpeakWebSocketEvents.Warning), 349 warning=war_warning, 350 **dict(cast(Dict[Any, Any], self._kwargs)), 351 ) 352 case SpeakWebSocketEvents.Error: 353 err_error: ErrorResponse = ErrorResponse.from_json(message) 354 self._logger.verbose("ErrorResponse: %s", err_error) 355 self._emit( 356 SpeakWebSocketEvents(SpeakWebSocketEvents.Error), 357 error=err_error, 358 **dict(cast(Dict[Any, Any], self._kwargs)), 359 ) 360 case _: 361 self._logger.warning( 362 "Unknown Message: response_type: %s, data: %s", 363 response_type, 364 data, 365 ) 366 unhandled_error: UnhandledResponse = UnhandledResponse( 367 
type=SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled), 368 raw=message, 369 ) 370 self._emit( 371 SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled), 372 unhandled=unhandled_error, 373 **dict(cast(Dict[Any, Any], self._kwargs)), 374 ) 375 376 self._logger.notice("_process_text Succeeded") 377 self._logger.debug("SpeakWebSocketClient._process_text LEAVE") 378 379 except Exception as e: # pylint: disable=broad-except 380 self._logger.error("Exception in SpeakWebSocketClient._process_text: %s", e) 381 e_error: ErrorResponse = ErrorResponse( 382 "Exception in SpeakWebSocketClient._process_text", 383 f"{e}", 384 "Exception", 385 ) 386 self._logger.error( 387 "Exception in SpeakWebSocketClient._process_text: %s", str(e) 388 ) 389 self._emit( 390 SpeakWebSocketEvents(SpeakWebSocketEvents.Error), 391 e_error, 392 **dict(cast(Dict[Any, Any], self._kwargs)), 393 ) 394 395 # signal exit and close 396 super()._signal_exit() 397 398 self._logger.debug("SpeakWebSocketClient._process_text LEAVE") 399 400 if self._config.options.get("termination_exception") is True: 401 raise 402 return 403 404 # pylint: enable=too-many-return-statements,too-many-statements 405 406 def _process_binary(self, message: bytes) -> None: 407 self._logger.debug("SpeakWebSocketClient._process_binary ENTER") 408 self._logger.debug("Binary data received") 409 410 self._emit( 411 SpeakWebSocketEvents(SpeakWebSocketEvents.AudioData), 412 data=message, 413 **dict(cast(Dict[Any, Any], self._kwargs)), 414 ) 415 416 self._logger.notice("_process_binary Succeeded") 417 self._logger.debug("SpeakWebSocketClient._process_binary LEAVE") 418 419 # pylint: disable=too-many-return-statements 420 def _flush(self) -> None: 421 self._logger.debug("SpeakWebSocketClient._flush ENTER") 422 423 delta_in_ms_str = self._config.options.get("auto_flush_speak_delta") 424 if delta_in_ms_str is None: 425 self._logger.error("auto_flush_speak_delta is None") 426 self._logger.debug("SpeakWebSocketClient._flush LEAVE") 427 return 
428 delta_in_ms = float(delta_in_ms_str) 429 430 while True: 431 try: 432 self._exit_event.wait(timeout=HALF_SECOND) 433 434 if self._exit_event.is_set(): 435 self._logger.notice("_flush exiting gracefully") 436 self._logger.debug("ListenWebSocketClient._flush LEAVE") 437 return 438 439 if self._last_datagram is None: 440 self._logger.debug("AutoFlush last_datagram is None") 441 continue 442 443 with self._lock_flush: 444 delta = datetime.now() - self._last_datagram 445 diff_in_ms = delta.total_seconds() * 1000 446 self._logger.debug("AutoFlush delta: %f", diff_in_ms) 447 if diff_in_ms < delta_in_ms: 448 self._logger.debug("AutoFlush delta is less than threshold") 449 continue 450 451 self.flush() 452 453 except Exception as e: # pylint: disable=broad-except 454 self._logger.error("Exception in SpeakWebSocketClient._flush: %s", e) 455 e_error: ErrorResponse = ErrorResponse( 456 "Exception in SpeakWebSocketClient._flush", 457 f"{e}", 458 "Exception", 459 ) 460 self._logger.error( 461 "Exception in SpeakWebSocketClient._flush: %s", str(e) 462 ) 463 self._emit( 464 SpeakWebSocketEvents(SpeakWebSocketEvents.Error), 465 error=e_error, 466 **dict(cast(Dict[Any, Any], self._kwargs)), 467 ) 468 469 # signal exit and close 470 super()._signal_exit() 471 472 self._logger.debug("SpeakWebSocketClient._flush LEAVE") 473 474 if self._config.options.get("termination_exception") is True: 475 raise 476 return 477 478 # pylint: enable=too-many-return-statements 479 480 def send_text(self, text_input: str) -> bool: 481 """ 482 Sends text to the WebSocket connection to generate audio. 483 484 Args: 485 text_input (str): The raw text to be synthesized. This function will automatically wrap 486 the text in a JSON object of type "Speak" with the key "text". 487 488 Returns: 489 bool: True if the text was successfully sent, False otherwise. 
490 """ 491 return self.send_raw(json.dumps({"type": "Speak", "text": text_input})) 492 493 def send(self, data: Union[str, bytes]) -> bool: 494 """ 495 Alias for send_text. Please see send_text for more information. 496 """ 497 if isinstance(data, bytes): 498 self._logger.error("send() failed - data is bytes") 499 return False 500 501 return self.send_text(data) 502 503 # pylint: disable=unused-argument 504 def send_control( 505 self, msg_type: Union[SpeakWebSocketMessage, str], data: Optional[str] = "" 506 ) -> bool: 507 """ 508 Sends a control message consisting of type SpeakWebSocketEvents over the WebSocket connection. 509 510 Args: 511 msg_type (SpeakWebSocketEvents): The type of control message to send. 512 (Optional) data (str): The data to send with the control message. 513 514 Returns: 515 bool: True if the control message was successfully sent, False otherwise. 516 """ 517 control_msg = json.dumps({"type": msg_type}) 518 return self.send_raw(control_msg) 519 520 # pylint: enable=unused-argument 521 522 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements 523 def send_raw(self, msg: str) -> bool: 524 """ 525 Sends a raw/control message over the WebSocket connection. This message must contain a valid JSON object. 526 527 Args: 528 msg (str): The raw message to send over the WebSocket connection. 529 530 Returns: 531 bool: True if the message was successfully sent, False otherwise. 
532 """ 533 self._logger.spam("SpeakWebSocketClient.send_raw ENTER") 534 535 if self._config.is_inspecting_speak(): 536 try: 537 _tmp_json = json.loads(msg) 538 if "type" in _tmp_json: 539 self._logger.debug( 540 "Inspecting Message: Sending %s", _tmp_json["type"] 541 ) 542 match _tmp_json["type"]: 543 case SpeakWebSocketMessage.Speak: 544 inspect_res = self._inspect() 545 if not inspect_res: 546 self._logger.error("inspect_res failed") 547 case SpeakWebSocketMessage.Flush: 548 with self._lock_flush: 549 self._last_datagram = None 550 self._flush_count += 1 551 self._logger.debug( 552 "Increment Flush count: %d", self._flush_count 553 ) 554 except Exception as e: # pylint: disable=broad-except 555 self._logger.error("send_raw() failed - Exception: %s", str(e)) 556 557 try: 558 if super().send(msg) is False: 559 self._logger.error("send_raw() failed") 560 self._logger.spam("SpeakWebSocketClient.send_raw LEAVE") 561 return False 562 self._logger.spam("send_raw() succeeded") 563 self._logger.spam("SpeakWebSocketClient.send_raw LEAVE") 564 return True 565 except Exception as e: # pylint: disable=broad-except 566 self._logger.error("send_raw() failed - Exception: %s", str(e)) 567 self._logger.spam("SpeakWebSocketClient.send_raw LEAVE") 568 if self._config.options.get("termination_exception_send") is True: 569 raise 570 return False 571 572 # pylint: enable=too-many-return-statements,too-many-branches 573 574 def flush(self) -> bool: 575 """ 576 Flushes the current buffer and returns generated audio 577 """ 578 self._logger.spam("SpeakWebSocketClient.flush ENTER") 579 580 self._logger.notice("Sending Flush...") 581 ret = self.send_control(SpeakWebSocketMessage.Flush) 582 583 if not ret: 584 self._logger.error("flush failed") 585 self._logger.spam("SpeakWebSocketClient.flush LEAVE") 586 return False 587 588 self._logger.notice("flush succeeded") 589 self._logger.spam("SpeakWebSocketClient.flush LEAVE") 590 591 return True 592 593 def clear(self) -> bool: 594 """ 595 
Clears the current buffer on the server 596 """ 597 self._logger.spam("SpeakWebSocketClient.clear ENTER") 598 599 self._logger.notice("Sending Clear...") 600 ret = self.send_control(SpeakWebSocketMessage.Clear) 601 602 if not ret: 603 self._logger.error("clear failed") 604 self._logger.spam("SpeakWebSocketClient.clear LEAVE") 605 return False 606 607 self._logger.notice("clear succeeded") 608 self._logger.spam("SpeakWebSocketClient.clear LEAVE") 609 610 return True 611 612 def wait_for_complete(self): 613 """ 614 This method will block until the speak is done playing sound. 615 """ 616 self._logger.spam("SpeakWebSocketClient.wait_for_complete ENTER") 617 618 if self._speaker is None: 619 self._logger.error("speaker is None. Return immediately") 620 raise DeepgramError("Speaker is not initialized") 621 622 self._speaker.wait_for_complete() 623 self._logger.notice("wait_for_complete succeeded") 624 self._logger.spam("SpeakWebSocketClient.wait_for_complete LEAVE") 625 626 def _close_message(self) -> bool: 627 return self.send_control(SpeakWebSocketMessage.Close) 628 629 # closes the WebSocket connection gracefully 630 def finish(self) -> bool: 631 """ 632 Closes the WebSocket connection gracefully. 
633 """ 634 self._logger.spam("SpeakWebSocketClient.finish ENTER") 635 636 # call parent finish which calls signal_exit 637 if super().finish() is False: 638 self._logger.error("ListenWebSocketClient.finish failed") 639 640 if self._speaker is not None and self._speaker_created: 641 self._speaker.finish() 642 self._speaker_created = False 643 644 # debug the threads 645 for thread in threading.enumerate(): 646 self._logger.debug("before running thread: %s", thread.name) 647 self._logger.debug("number of active threads: %s", threading.active_count()) 648 649 # stop the threads 650 if self._speaker is not None: 651 self._logger.verbose("stopping speaker...") 652 self._speaker.finish() 653 self._speaker = None 654 self._logger.notice("speaker stopped") 655 656 if self._flush_thread is not None: 657 self._logger.verbose("sdtopping _flush_thread...") 658 self._flush_thread.join() 659 self._flush_thread = None 660 self._logger.notice("_flush_thread joined") 661 662 # debug the threads 663 for thread in threading.enumerate(): 664 self._logger.debug("before running thread: %s", thread.name) 665 self._logger.debug("number of active threads: %s", threading.active_count()) 666 667 self._logger.notice("finish succeeded") 668 self._logger.spam("SpeakWebSocketClient.finish LEAVE") 669 return True 670 671 def _inspect(self) -> bool: 672 # auto flush_inspect is generically used to track any messages you might want to snoop on 673 # place additional logic here to inspect messages of interest 674 675 # for auto flush functionality 676 # set the last datagram 677 with self._lock_flush: 678 self._last_datagram = datetime.now() 679 self._logger.debug( 680 "AutoFlush last received: %s", 681 str(self._last_datagram), 682 ) 683 684 return True
Client for interacting with Deepgram's text-to-speech services over WebSockets.
This class provides methods to establish a WebSocket connection for TTS synthesis and handle real-time TTS synthesis events.
Args: config (DeepgramClientOptions): all the options for the client.
def __init__(
    self, config: DeepgramClientOptions, microphone: Optional[Microphone] = None
):
    """
    Builds the text-to-speech WebSocket client and, when enabled, its Speaker.

    Args:
        config (DeepgramClientOptions): all the options for the client.
        microphone (Optional[Microphone]): microphone handed to the Speaker
            when speaker playback is enabled.

    Raises:
        DeepgramError: if config is None.
    """
    if config is None:
        raise DeepgramError("Config is required")

    logger = verboselogs.VerboseLogger(__name__)
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(config.verbose)
    self._logger = logger

    self._config = config
    self._endpoint = "v1/speak"

    # auto flush bookkeeping
    self._lock_flush = threading.Lock()
    self._flush_thread = None
    self._last_datagram = None
    self._flush_count = 0

    # microphone
    self._microphone = microphone

    # one (initially empty) handler list per known event
    self._event_handlers = {
        event: [] for event in SpeakWebSocketEvents.__members__.values()
    }

    options = self._config.options
    if options.get("speaker_playback") == "true":
        self._logger.info("speaker_playback is enabled")

        def _option_or(key, fallback):
            # A key that is present but set to None also falls back.
            value = options.get(key)
            return fallback if value is None else value

        rate = _option_or("speaker_playback_rate", RATE)
        channels = _option_or("speaker_playback_channels", CHANNELS)
        playback_delta_in_ms = _option_or(
            "speaker_playback_delta_in_ms", PLAYBACK_DELTA
        )
        device_index = options.get("speaker_playback_device_index")

        self._logger.debug("rate: %s", rate)
        self._logger.debug("channels: %s", channels)
        self._logger.debug("device_index: %s", device_index)

        self._speaker_created = True

        speaker_kwargs = {
            "rate": rate,
            "channels": channels,
            "last_play_delta_in_ms": playback_delta_in_ms,
            "verbose": self._config.verbose,
            "microphone": self._microphone,
        }
        # Only name an output device when one was configured.
        if device_index is not None:
            speaker_kwargs["output_device_index"] = device_index
        self._speaker = Speaker(**speaker_kwargs)

    # call the parent constructor
    super().__init__(self._config, self._endpoint)
def start(
    self,
    options: Optional[Union[SpeakWSOptions, Dict]] = None,
    addons: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    members: Optional[Dict] = None,
    **kwargs,
) -> bool:
    """
    Starts the WebSocket connection for text-to-speech synthesis.

    Args:
        options (Optional[Union[SpeakWSOptions, Dict]]): connection options. A
            SpeakWSOptions instance is validated with check() and converted
            with to_dict(); a dict is used as-is; None becomes {}.
        addons (Optional[Dict]): forwarded to the parent start().
        headers (Optional[Dict]): forwarded to the parent start().
        members (Optional[Dict]): entries merged into this instance's __dict__.
        **kwargs: stored on the client and forwarded to the parent start().

    Returns:
        bool: True when the connection (and optional speaker / auto-flush
        thread) started; False when the parent start() returned False or an
        exception was caught.

    Raises:
        DeepgramError: if options is a SpeakWSOptions that fails check().
        Exception: re-raised from the start sequence only when the
            "termination_exception_connect" option is True.
    """
    self._logger.debug("SpeakWebSocketClient.start ENTER")
    self._logger.info("options: %s", options)
    self._logger.info("addons: %s", addons)
    self._logger.info("headers: %s", headers)
    self._logger.info("members: %s", members)
    self._logger.info("kwargs: %s", kwargs)

    if isinstance(options, SpeakWSOptions) and not options.check():
        self._logger.error("options.check failed")
        self._logger.debug("SpeakWebSocketClient.start LEAVE")
        raise DeepgramError("Fatal text-to-speech options error")

    self._addons = addons
    self._headers = headers

    # add "members" as members of the class
    if members is not None:
        self.__dict__.update(members)

    # **kwargs is always a dict (possibly empty), never None, so it can be
    # stored directly; it is replayed to the parent start() below.
    self._kwargs = kwargs

    if isinstance(options, SpeakWSOptions):
        self._logger.info("SpeakWSOptions switching class -> dict")
        self._options = options.to_dict()
    elif options is not None:
        self._options = options
    else:
        self._options = {}

    try:
        # speaker substitutes the listening thread
        if self._speaker is not None:
            self._logger.notice("passing speaker to delegate_listening")
            super().delegate_listening(self._speaker)

        # call parent start
        if (
            super().start(
                self._options,
                self._addons,
                self._headers,
                **dict(cast(Dict[Any, Any], self._kwargs)),
            )
            is False
        ):
            self._logger.error("SpeakWebSocketClient.start failed")
            self._logger.debug("SpeakWebSocketClient.start LEAVE")
            return False

        if self._speaker is not None:
            self._logger.notice("start delegate_listening thread")
            self._speaker.start()

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        # flush thread
        if self._config.is_auto_flush_speak_enabled():
            self._logger.notice("autoflush is enabled")
            self._flush_thread = threading.Thread(target=self._flush)
            self._flush_thread.start()
        else:
            self._logger.notice("autoflush is disabled")

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        self._logger.notice("start succeeded")
        self._logger.debug("SpeakWebSocketClient.start LEAVE")
        return True

    except Exception as e:  # pylint: disable=broad-except
        self._logger.error(
            "WebSocketException in SpeakWebSocketClient.start: %s", e
        )
        self._logger.debug("SpeakWebSocketClient.start LEAVE")
        if self._config.options.get("termination_exception_connect") is True:
            raise
        return False
Starts the WebSocket connection for text-to-speech synthesis.
def on(self, event: SpeakWebSocketEvents, handler: Callable) -> None:
    """
    Registers an event handler for a specific event.

    Args:
        event (SpeakWebSocketEvents): the event to subscribe to; must be one
            of the SpeakWebSocketEvents members.
        handler (Callable): the callable appended to the event's handler list.

    A registration with an unknown event or a non-callable handler is not
    stored; it is reported with a warning instead of being dropped silently.
    """
    if event not in SpeakWebSocketEvents.__members__.values() or not callable(
        handler
    ):
        self._logger.warning(
            "event subscription ignored: event: %s, handler: %s", event, handler
        )
        return

    # Only report the subscription once the handler is really registered.
    self._logger.info("event subscribed: %s", event)
    self._event_handlers[event].append(handler)
Registers event handlers for specific events.
def send_text(self, text_input: str) -> bool:
    """
    Wraps text in a "Speak" message and sends it to generate audio.

    Args:
        text_input (str): The raw text to be synthesized. It is placed under
            the "text" key of a JSON object whose "type" is "Speak".

    Returns:
        bool: The result of send_raw: True if the text was successfully sent,
        False otherwise.
    """
    speak_message = {"type": "Speak", "text": text_input}
    payload = json.dumps(speak_message)
    return self.send_raw(payload)
Sends text to the WebSocket connection to generate audio.
Args: text_input (str): The raw text to be synthesized. This function will automatically wrap the text in a JSON object of type "Speak" with the key "text".
Returns: bool: True if the text was successfully sent, False otherwise.
def send(self, data: Union[str, bytes]) -> bool:
    """
    Alias for send_text that refuses binary payloads.

    Args:
        data (Union[str, bytes]): the text to synthesize. Bytes are rejected.

    Returns:
        bool: False (after logging an error) when data is bytes; otherwise
        whatever send_text returns.
    """
    if not isinstance(data, bytes):
        return self.send_text(data)

    self._logger.error("send() failed - data is bytes")
    return False
Alias for send_text. Please see send_text for more information.
def send_control(
    self, msg_type: Union[SpeakWebSocketMessage, str], data: Optional[str] = ""
) -> bool:
    """
    Sends a control message of the given type over the WebSocket connection.

    Args:
        msg_type (Union[SpeakWebSocketMessage, str]): The type of control
            message to send; it becomes the "type" field of the JSON payload.
        data (Optional[str]): Accepted but not used; it is not included in
            the payload.

    Returns:
        bool: The result of send_raw: True if the control message was
        successfully sent, False otherwise.
    """
    payload = {"type": msg_type}
    return self.send_raw(json.dumps(payload))
Sends a control message of type SpeakWebSocketMessage over the WebSocket connection.
Args: msg_type (SpeakWebSocketMessage or str): The type of control message to send. (Optional) data (str): Accepted for compatibility; the current implementation does not include it in the control message.
Returns: bool: True if the control message was successfully sent, False otherwise.
def send_raw(self, msg: str) -> bool:
    """
    Sends a raw/control message over the WebSocket connection. This message must contain a valid JSON object.

    When message inspection is enabled, the message is parsed first so that
    "Speak" and "Flush" messages can update the auto-flush bookkeeping. A
    failure while inspecting is logged and does not prevent the send.

    Args:
        msg (str): The raw message to send over the WebSocket connection.

    Returns:
        bool: True if the message was successfully sent, False otherwise.

    Raises:
        Exception: re-raised from the send only when the
            "termination_exception_send" option is True.
    """
    log = self._logger
    log.spam("SpeakWebSocketClient.send_raw ENTER")

    if self._config.is_inspecting_speak():
        try:
            parsed = json.loads(msg)
            if "type" in parsed:
                outgoing_type = parsed["type"]
                log.debug("Inspecting Message: Sending %s", outgoing_type)
                if outgoing_type == SpeakWebSocketMessage.Speak:
                    # records when text was last sent, for auto flush
                    if not self._inspect():
                        log.error("inspect_res failed")
                elif outgoing_type == SpeakWebSocketMessage.Flush:
                    with self._lock_flush:
                        self._last_datagram = None
                        self._flush_count += 1
                        log.debug("Increment Flush count: %d", self._flush_count)
        except Exception as e:  # pylint: disable=broad-except
            log.error("send_raw() failed - Exception: %s", str(e))

    try:
        if super().send(msg) is not False:
            log.spam("send_raw() succeeded")
            log.spam("SpeakWebSocketClient.send_raw LEAVE")
            return True

        log.error("send_raw() failed")
        log.spam("SpeakWebSocketClient.send_raw LEAVE")
        return False
    except Exception as e:  # pylint: disable=broad-except
        log.error("send_raw() failed - Exception: %s", str(e))
        log.spam("SpeakWebSocketClient.send_raw LEAVE")
        if self._config.options.get("termination_exception_send") is True:
            raise
        return False
Sends a raw/control message over the WebSocket connection. This message must contain a valid JSON object.
Args: msg (str): The raw message to send over the WebSocket connection.
Returns: bool: True if the message was successfully sent, False otherwise.
def flush(self) -> bool:
    """
    Flushes the current buffer and returns generated audio

    Returns:
        bool: True if the Flush control message was sent, False otherwise.
    """
    self._logger.spam("SpeakWebSocketClient.flush ENTER")
    self._logger.notice("Sending Flush...")

    if self.send_control(SpeakWebSocketMessage.Flush):
        self._logger.notice("flush succeeded")
        self._logger.spam("SpeakWebSocketClient.flush LEAVE")
        return True

    self._logger.error("flush failed")
    self._logger.spam("SpeakWebSocketClient.flush LEAVE")
    return False
Flushes the current buffer and returns generated audio
def clear(self) -> bool:
    """
    Clears the current buffer on the server

    Returns:
        bool: True if the Clear control message was sent, False otherwise.
    """
    self._logger.spam("SpeakWebSocketClient.clear ENTER")
    self._logger.notice("Sending Clear...")

    if self.send_control(SpeakWebSocketMessage.Clear):
        self._logger.notice("clear succeeded")
        self._logger.spam("SpeakWebSocketClient.clear LEAVE")
        return True

    self._logger.error("clear failed")
    self._logger.spam("SpeakWebSocketClient.clear LEAVE")
    return False
Clears the current buffer on the server
def wait_for_complete(self):
    """
    Blocks by delegating to the speaker's own wait_for_complete().

    Raises:
        DeepgramError: if no speaker is attached to this client.
    """
    self._logger.spam("SpeakWebSocketClient.wait_for_complete ENTER")

    speaker = self._speaker
    if speaker is None:
        self._logger.error("speaker is None. Return immediately")
        raise DeepgramError("Speaker is not initialized")

    speaker.wait_for_complete()

    self._logger.notice("wait_for_complete succeeded")
    self._logger.spam("SpeakWebSocketClient.wait_for_complete LEAVE")
This method will block until the speaker is done playing sound.
def finish(self) -> bool:
    """
    Closes the WebSocket connection gracefully.

    Calls the parent finish(), stops the speaker (if any), and joins the
    auto-flush thread (if one was started).

    Returns:
        bool: always True; a failure of the parent finish() is only logged.
    """
    self._logger.spam("SpeakWebSocketClient.finish ENTER")

    # call parent finish which calls signal_exit
    if super().finish() is False:
        self._logger.error("SpeakWebSocketClient.finish failed")

    # debug the threads
    for thread in threading.enumerate():
        self._logger.debug("before running thread: %s", thread.name)
    self._logger.debug("number of active threads: %s", threading.active_count())

    # stop the speaker exactly once; dropping the reference and the
    # created-flag means a repeated finish() will not stop it again
    if self._speaker is not None:
        self._logger.verbose("stopping speaker...")
        self._speaker.finish()
        self._speaker = None
        self._speaker_created = False
        self._logger.notice("speaker stopped")

    if self._flush_thread is not None:
        self._logger.verbose("stopping _flush_thread...")
        self._flush_thread.join()
        self._flush_thread = None
        self._logger.notice("_flush_thread joined")

    # debug the threads
    for thread in threading.enumerate():
        self._logger.debug("after running thread: %s", thread.name)
    self._logger.debug("number of active threads: %s", threading.active_count())

    self._logger.notice("finish succeeded")
    self._logger.spam("SpeakWebSocketClient.finish LEAVE")
    return True
Closes the WebSocket connection gracefully.