deepgram.clients.speak.v1.websocket.async_client
1# Copyright 2024 Deepgram SDK contributors. All Rights Reserved. 2# Use of this source code is governed by a MIT license that can be found in the LICENSE file. 3# SPDX-License-Identifier: MIT 4 5import asyncio 6import json 7import logging 8from typing import Dict, Union, Optional, cast, Any, Callable 9from datetime import datetime 10import threading 11 12from .....utils import verboselogs 13from .....options import DeepgramClientOptions 14from ...enums import SpeakWebSocketEvents, SpeakWebSocketMessage 15from ....common import AbstractAsyncWebSocketClient 16from ....common import DeepgramError 17 18from .response import ( 19 OpenResponse, 20 MetadataResponse, 21 FlushedResponse, 22 ClearedResponse, 23 CloseResponse, 24 WarningResponse, 25 ErrorResponse, 26 UnhandledResponse, 27) 28from .options import SpeakWSOptions 29 30from .....audio.microphone import Microphone 31from .....audio.speaker import Speaker, RATE, CHANNELS, PLAYBACK_DELTA 32 33ONE_SECOND = 1 34HALF_SECOND = 0.5 35DEEPGRAM_INTERVAL = 5 36PING_INTERVAL = 20 37 38 39class AsyncSpeakWSClient( 40 AbstractAsyncWebSocketClient 41): # pylint: disable=too-many-instance-attributes 42 """ 43 Client for interacting with Deepgram's text-to-speech services over WebSockets. 44 45 This class provides methods to establish a WebSocket connection for TTS synthesis and handle real-time TTS synthesis events. 46 47 Args: 48 config (DeepgramClientOptions): all the options for the client. 
49 """ 50 51 _logger: verboselogs.VerboseLogger 52 _config: DeepgramClientOptions 53 _endpoint: str 54 55 _event_handlers: Dict[SpeakWebSocketEvents, list] 56 57 _flush_thread: Union[asyncio.Task, None] 58 _last_datagram: Optional[datetime] = None 59 _flush_count: int 60 61 _kwargs: Optional[Dict] = None 62 _addons: Optional[Dict] = None 63 _options: Optional[Dict] = None 64 _headers: Optional[Dict] = None 65 66 _speaker_created: bool = False 67 _speaker: Optional[Speaker] = None 68 _microphone: Optional[Microphone] = None 69 70 def __init__( 71 self, config: DeepgramClientOptions, microphone: Optional[Microphone] = None 72 ): 73 if config is None: 74 raise DeepgramError("Config is required") 75 self._logger = verboselogs.VerboseLogger(__name__) 76 self._logger.addHandler(logging.StreamHandler()) 77 self._logger.setLevel(config.verbose) 78 79 self._config = config 80 self._endpoint = "v1/speak" 81 82 self._flush_thread = None 83 84 # auto flush 85 self._last_datagram = None 86 self._flush_count = 0 87 88 # microphone 89 self._microphone = microphone 90 91 # init handlers 92 self._event_handlers = { 93 event: [] for event in SpeakWebSocketEvents.__members__.values() 94 } 95 96 if self._config.options.get("speaker_playback") == "true": 97 self._logger.info("speaker_playback is enabled") 98 rate = self._config.options.get("speaker_playback_rate") 99 if rate is None: 100 rate = RATE 101 channels = self._config.options.get("speaker_playback_channels") 102 if channels is None: 103 channels = CHANNELS 104 playback_delta_in_ms = self._config.options.get( 105 "speaker_playback_delta_in_ms" 106 ) 107 if playback_delta_in_ms is None: 108 playback_delta_in_ms = PLAYBACK_DELTA 109 device_index = self._config.options.get("speaker_playback_device_index") 110 111 self._logger.debug("rate: %s", rate) 112 self._logger.debug("channels: %s", channels) 113 self._logger.debug("device_index: %s", device_index) 114 115 self._speaker_created = True 116 117 if device_index is not None: 118 
self._speaker = Speaker( 119 rate=rate, 120 channels=channels, 121 last_play_delta_in_ms=playback_delta_in_ms, 122 verbose=self._config.verbose, 123 output_device_index=device_index, 124 microphone=self._microphone, 125 ) 126 else: 127 self._speaker = Speaker( 128 rate=rate, 129 channels=channels, 130 last_play_delta_in_ms=playback_delta_in_ms, 131 verbose=self._config.verbose, 132 microphone=self._microphone, 133 ) 134 135 # call the parent constructor 136 super().__init__(self._config, self._endpoint) 137 138 # pylint: disable=too-many-branches,too-many-statements 139 async def start( 140 self, 141 options: Optional[Union[SpeakWSOptions, Dict]] = None, 142 addons: Optional[Dict] = None, 143 headers: Optional[Dict] = None, 144 members: Optional[Dict] = None, 145 **kwargs, 146 ) -> bool: 147 """ 148 Starts the WebSocket connection for text-to-speech synthesis. 149 """ 150 self._logger.debug("AsyncSpeakWebSocketClient.start ENTER") 151 self._logger.info("options: %s", options) 152 self._logger.info("addons: %s", addons) 153 self._logger.info("headers: %s", headers) 154 self._logger.info("members: %s", members) 155 self._logger.info("kwargs: %s", kwargs) 156 157 if isinstance(options, SpeakWSOptions) and not options.check(): 158 self._logger.error("options.check failed") 159 self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE") 160 raise DeepgramError("Fatal text-to-speech options error") 161 162 self._addons = addons 163 self._headers = headers 164 165 # add "members" as members of the class 166 if members is not None: 167 self.__dict__.update(members) 168 169 # set kwargs as members of the class 170 if kwargs is not None: 171 self._kwargs = kwargs 172 else: 173 self._kwargs = {} 174 175 if isinstance(options, SpeakWSOptions): 176 self._logger.info("SpeakWSOptions switching class -> dict") 177 self._options = options.to_dict() 178 elif options is not None: 179 self._options = options 180 else: 181 self._options = {} 182 183 try: 184 # speaker substitutes the 
listening thread 185 if self._speaker is not None: 186 self._logger.notice("passing speaker to delegate_listening") 187 super().delegate_listening(self._speaker) 188 189 # call parent start 190 if ( 191 await super().start( 192 self._options, 193 self._addons, 194 self._headers, 195 **dict(cast(Dict[Any, Any], self._kwargs)), 196 ) 197 is False 198 ): 199 self._logger.error("AsyncSpeakWebSocketClient.start failed") 200 self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE") 201 return False 202 203 if self._speaker is not None: 204 self._logger.notice("start delegate_listening thread") 205 self._speaker.start() 206 207 # debug the threads 208 for thread in threading.enumerate(): 209 self._logger.debug("after running thread: %s", thread.name) 210 self._logger.debug("number of active threads: %s", threading.active_count()) 211 212 # flush thread 213 if self._config.is_auto_flush_speak_enabled(): 214 self._logger.notice("autoflush is enabled") 215 self._flush_thread = asyncio.create_task(self._flush()) 216 else: 217 self._logger.notice("autoflush is disabled") 218 219 # debug the threads 220 for thread in threading.enumerate(): 221 self._logger.debug("after running thread: %s", thread.name) 222 self._logger.debug("number of active threads: %s", threading.active_count()) 223 224 self._logger.notice("start succeeded") 225 self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE") 226 return True 227 228 except Exception as e: # pylint: disable=broad-except 229 self._logger.error( 230 "WebSocketException in AsyncSpeakWebSocketClient.start: %s", e 231 ) 232 self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE") 233 if self._config.options.get("termination_exception_connect") is True: 234 raise 235 return False 236 237 # pylint: enable=too-many-branches,too-many-statements 238 239 def on(self, event: SpeakWebSocketEvents, handler: Callable) -> None: 240 """ 241 Registers event handlers for specific events. 
242 """ 243 self._logger.info("event subscribed: %s", event) 244 if event in SpeakWebSocketEvents.__members__.values() and callable(handler): 245 self._event_handlers[event].append(handler) 246 247 # triggers the registered event handlers for a specific event 248 async def _emit(self, event: SpeakWebSocketEvents, *args, **kwargs) -> None: 249 """ 250 Emits events to the registered event handlers. 251 """ 252 self._logger.debug("AsyncSpeakWebSocketClient._emit ENTER") 253 self._logger.debug("callback handlers for: %s", event) 254 255 # debug the threads 256 for thread in threading.enumerate(): 257 self._logger.debug("after running thread: %s", thread.name) 258 self._logger.debug("number of active threads: %s", threading.active_count()) 259 260 tasks = [] 261 for handler in self._event_handlers[event]: 262 task = asyncio.create_task(handler(self, *args, **kwargs)) 263 tasks.append(task) 264 265 if tasks: 266 self._logger.debug("waiting for tasks to finish...") 267 await asyncio.gather(*filter(None, tasks), return_exceptions=True) 268 tasks.clear() 269 270 # debug the threads 271 for thread in threading.enumerate(): 272 self._logger.debug("after running thread: %s", thread.name) 273 self._logger.debug("number of active threads: %s", threading.active_count()) 274 275 self._logger.debug("AsyncSpeakWebSocketClient._emit LEAVE") 276 277 async def _process_text(self, message: Union[str, bytes]) -> None: 278 """ 279 Processes messages received over the WebSocket connection. 
280 """ 281 self._logger.debug("AsyncSpeakWebSocketClient._process_text ENTER") 282 283 try: 284 self._logger.debug("Text data received") 285 286 if len(message) == 0: 287 self._logger.debug("message is empty") 288 self._logger.debug("AsyncSpeakWebSocketClient._process_text LEAVE") 289 return 290 291 data = json.loads(message) 292 response_type = data.get("type") 293 self._logger.debug("response_type: %s, data: %s", response_type, data) 294 295 match response_type: 296 case SpeakWebSocketEvents.Open: 297 open_result: OpenResponse = OpenResponse.from_json(message) 298 self._logger.verbose("OpenResponse: %s", open_result) 299 await self._emit( 300 SpeakWebSocketEvents(SpeakWebSocketEvents.Open), 301 open=open_result, 302 **dict(cast(Dict[Any, Any], self._kwargs)), 303 ) 304 case SpeakWebSocketEvents.Metadata: 305 meta_result: MetadataResponse = MetadataResponse.from_json(message) 306 self._logger.verbose("MetadataResponse: %s", meta_result) 307 await self._emit( 308 SpeakWebSocketEvents(SpeakWebSocketEvents.Metadata), 309 metadata=meta_result, 310 **dict(cast(Dict[Any, Any], self._kwargs)), 311 ) 312 case SpeakWebSocketEvents.Flushed: 313 fl_result: FlushedResponse = FlushedResponse.from_json(message) 314 self._logger.verbose("FlushedResponse: %s", fl_result) 315 316 # auto flush 317 if self._config.is_inspecting_speak(): 318 self._flush_count -= 1 319 self._logger.debug( 320 "Decrement AutoFlush count: %d", 321 self._flush_count, 322 ) 323 324 await self._emit( 325 SpeakWebSocketEvents(SpeakWebSocketEvents.Flushed), 326 flushed=fl_result, 327 **dict(cast(Dict[Any, Any], self._kwargs)), 328 ) 329 case SpeakWebSocketEvents.Cleared: 330 clear_result: ClearedResponse = ClearedResponse.from_json(message) 331 self._logger.verbose("ClearedResponse: %s", clear_result) 332 await self._emit( 333 SpeakWebSocketEvents(SpeakWebSocketEvents.Cleared), 334 cleared=clear_result, 335 **dict(cast(Dict[Any, Any], self._kwargs)), 336 ) 337 case SpeakWebSocketEvents.Close: 338 
close_result: CloseResponse = CloseResponse.from_json(message) 339 self._logger.verbose("CloseResponse: %s", close_result) 340 await self._emit( 341 SpeakWebSocketEvents(SpeakWebSocketEvents.Close), 342 close=close_result, 343 **dict(cast(Dict[Any, Any], self._kwargs)), 344 ) 345 case SpeakWebSocketEvents.Warning: 346 war_warning: WarningResponse = WarningResponse.from_json(message) 347 self._logger.verbose("WarningResponse: %s", war_warning) 348 await self._emit( 349 SpeakWebSocketEvents(SpeakWebSocketEvents.Warning), 350 warning=war_warning, 351 **dict(cast(Dict[Any, Any], self._kwargs)), 352 ) 353 case SpeakWebSocketEvents.Error: 354 err_error: ErrorResponse = ErrorResponse.from_json(message) 355 self._logger.verbose("ErrorResponse: %s", err_error) 356 await self._emit( 357 SpeakWebSocketEvents(SpeakWebSocketEvents.Error), 358 error=err_error, 359 **dict(cast(Dict[Any, Any], self._kwargs)), 360 ) 361 case _: 362 self._logger.warning( 363 "Unknown Message: response_type: %s, data: %s", 364 response_type, 365 data, 366 ) 367 unhandled_error: UnhandledResponse = UnhandledResponse( 368 type=SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled), 369 raw=str(message), 370 ) 371 await self._emit( 372 SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled), 373 unhandled=unhandled_error, 374 **dict(cast(Dict[Any, Any], self._kwargs)), 375 ) 376 377 self._logger.notice("_process_text Succeeded") 378 self._logger.debug("AsyncSpeakWebSocketClient._process_text LEAVE") 379 380 except Exception as e: # pylint: disable=broad-except 381 self._logger.error( 382 "Exception in AsyncSpeakWebSocketClient._process_text: %s", e 383 ) 384 e_error: ErrorResponse = ErrorResponse( 385 "Exception in AsyncSpeakWebSocketClient._process_text", 386 f"{e}", 387 "Exception", 388 ) 389 await self._emit( 390 SpeakWebSocketEvents(SpeakWebSocketEvents.Error), 391 error=e_error, 392 **dict(cast(Dict[Any, Any], self._kwargs)), 393 ) 394 395 # signal exit and close 396 await super()._signal_exit() 397 398 
self._logger.debug("AsyncSpeakWebSocketClient._process_text LEAVE") 399 400 if self._config.options.get("termination_exception") is True: 401 raise 402 return 403 404 # pylint: enable=too-many-return-statements,too-many-statements 405 406 async def _process_binary(self, message: bytes) -> None: 407 self._logger.debug("SpeakWebSocketClient._process_binary ENTER") 408 self._logger.debug("Binary data received") 409 410 await self._emit( 411 SpeakWebSocketEvents(SpeakWebSocketEvents.AudioData), 412 data=message, 413 **dict(cast(Dict[Any, Any], self._kwargs)), 414 ) 415 416 self._logger.notice("_process_binary Succeeded") 417 self._logger.debug("SpeakWebSocketClient._process_binary LEAVE") 418 419 ## pylint: disable=too-many-return-statements 420 async def _flush(self) -> None: 421 self._logger.debug("AsyncSpeakWebSocketClient._flush ENTER") 422 423 delta_in_ms_str = self._config.options.get("auto_flush_speak_delta") 424 if delta_in_ms_str is None: 425 self._logger.error("auto_flush_speak_delta is None") 426 self._logger.debug("AsyncSpeakWebSocketClient._flush LEAVE") 427 return 428 delta_in_ms = float(delta_in_ms_str) 429 430 while True: 431 try: 432 await asyncio.sleep(HALF_SECOND) 433 434 if self._exit_event.is_set(): 435 self._logger.notice("_flush exiting gracefully") 436 self._logger.debug("AsyncSpeakWebSocketClient._flush LEAVE") 437 return 438 439 if self._last_datagram is None: 440 self._logger.debug("AutoFlush last_datagram is None") 441 continue 442 443 delta = datetime.now() - self._last_datagram 444 diff_in_ms = delta.total_seconds() * 1000 445 self._logger.debug("AutoFlush delta: %f", diff_in_ms) 446 if diff_in_ms < delta_in_ms: 447 self._logger.debug("AutoFlush delta is less than threshold") 448 continue 449 450 await self.flush() 451 452 except Exception as e: # pylint: disable=broad-except 453 self._logger.error( 454 "Exception in AsyncSpeakWebSocketClient._flush: %s", e 455 ) 456 e_error: ErrorResponse = ErrorResponse( 457 "Exception in 
AsyncSpeakWebSocketClient._flush", 458 f"{e}", 459 "Exception", 460 ) 461 self._logger.error( 462 "Exception in AsyncSpeakWebSocketClient._flush: %s", str(e) 463 ) 464 await self._emit( 465 SpeakWebSocketEvents(SpeakWebSocketEvents.Error), 466 error=e_error, 467 **dict(cast(Dict[Any, Any], self._kwargs)), 468 ) 469 470 # signal exit and close 471 await super()._signal_exit() 472 473 self._logger.debug("AsyncSpeakWebSocketClient._flush LEAVE") 474 475 if self._config.options.get("termination_exception") is True: 476 raise 477 return 478 479 # pylint: enable=too-many-return-statements 480 481 async def send_text(self, text_input: str) -> bool: 482 """ 483 Sends text to the WebSocket connection to generate audio. 484 485 Args: 486 text_input (str): The raw text to be synthesized. This function will automatically wrap 487 the text in a JSON object of type "Speak" with the key "text". 488 489 Returns: 490 bool: True if the text was successfully sent, False otherwise. 491 """ 492 return await self.send_raw(json.dumps({"type": "Speak", "text": text_input})) 493 494 async def send(self, data: Union[bytes, str]) -> bool: 495 """ 496 Alias for send_text. Please see send_text for more information. 497 """ 498 if isinstance(data, bytes): 499 self._logger.error("send() failed - data is bytes") 500 return False 501 502 return await self.send_text(data) 503 504 # pylint: disable=unused-argument 505 async def send_control( 506 self, msg_type: Union[SpeakWebSocketMessage, str], data: Optional[str] = "" 507 ) -> bool: 508 """ 509 Sends a control message consisting of type SpeakWebSocketEvents over the WebSocket connection. 510 511 Args: 512 msg_type (SpeakWebSocketEvents): The type of control message to send. 513 (Optional) data (str): The data to send with the control message. 514 515 Returns: 516 bool: True if the control message was successfully sent, False otherwise. 
517 """ 518 control_msg = json.dumps({"type": msg_type}) 519 return await self.send_raw(control_msg) 520 521 # pylint: enable=unused-argument 522 523 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements 524 async def send_raw(self, msg: str) -> bool: 525 """ 526 Sends a raw/control message over the WebSocket connection. This message must contain a valid JSON object. 527 528 Args: 529 msg (str): The raw message to send over the WebSocket connection. 530 531 Returns: 532 bool: True if the message was successfully sent, False otherwise. 533 """ 534 self._logger.spam("AsyncSpeakWebSocketClient.send_raw ENTER") 535 536 if self._config.is_inspecting_speak(): 537 try: 538 _tmp_json = json.loads(msg) 539 if "type" in _tmp_json: 540 self._logger.debug( 541 "Inspecting Message: Sending %s", _tmp_json["type"] 542 ) 543 match _tmp_json["type"]: 544 case SpeakWebSocketMessage.Speak: 545 inspect_res = await self._inspect() 546 if not inspect_res: 547 self._logger.error("inspect_res failed") 548 case SpeakWebSocketMessage.Flush: 549 self._last_datagram = None 550 self._flush_count += 1 551 self._logger.debug( 552 "Increment Flush count: %d", self._flush_count 553 ) 554 except Exception as e: # pylint: disable=broad-except 555 self._logger.error("send_raw() failed - Exception: %s", str(e)) 556 557 try: 558 if await super().send(msg) is False: 559 self._logger.error("send_raw() failed") 560 self._logger.spam("AsyncSpeakWebSocketClient.send_raw LEAVE") 561 return False 562 self._logger.spam("send_raw() succeeded") 563 self._logger.spam("AsyncSpeakWebSocketClient.send_raw LEAVE") 564 return True 565 except Exception as e: # pylint: disable=broad-except 566 self._logger.error("send_raw() failed - Exception: %s", str(e)) 567 self._logger.spam("AsyncSpeakWebSocketClient.send_raw LEAVE") 568 if self._config.options.get("termination_exception_send") is True: 569 raise 570 return False 571 572 # pylint: enable=too-many-return-statements,too-many-branches 573 
574 async def flush(self) -> bool: 575 """ 576 Flushes the current buffer and returns generated audio 577 """ 578 self._logger.spam("AsyncSpeakWebSocketClient.flush ENTER") 579 580 self._logger.notice("Sending Flush...") 581 ret = await self.send_control(SpeakWebSocketMessage.Flush) 582 583 if not ret: 584 self._logger.error("flush failed") 585 self._logger.spam("AsyncSpeakWebSocketClient.flush LEAVE") 586 return False 587 588 self._logger.notice("flush succeeded") 589 self._logger.spam("AsyncSpeakWebSocketClient.flush LEAVE") 590 591 return True 592 593 async def clear(self) -> bool: 594 """ 595 Clears the current buffer on the server 596 """ 597 self._logger.spam("AsyncSpeakWebSocketClient.clear ENTER") 598 599 self._logger.notice("Sending Clear...") 600 ret = await self.send_control(SpeakWebSocketMessage.Clear) 601 602 if not ret: 603 self._logger.error("clear failed") 604 self._logger.spam("AsyncSpeakWebSocketClient.clear LEAVE") 605 return False 606 607 self._logger.notice("clear succeeded") 608 self._logger.spam("AsyncSpeakWebSocketClient.clear LEAVE") 609 610 return True 611 612 async def wait_for_complete(self): 613 """ 614 This method will block until the speak is done playing sound. 615 """ 616 self._logger.spam("AsyncSpeakWebSocketClient.wait_for_complete ENTER") 617 618 if self._speaker is None: 619 self._logger.error("speaker is None. Return immediately") 620 return 621 622 loop = asyncio.get_event_loop() 623 await loop.run_in_executor(None, self._speaker.wait_for_complete) 624 self._logger.notice("wait_for_complete succeeded") 625 self._logger.spam("AsyncSpeakWebSocketClient.wait_for_complete LEAVE") 626 627 async def _close_message(self) -> bool: 628 return await self.send_control(SpeakWebSocketMessage.Close) 629 630 async def finish(self) -> bool: 631 """ 632 Closes the WebSocket connection gracefully. 
633 """ 634 self._logger.debug("AsyncSpeakWebSocketClient.finish ENTER") 635 636 # stop the threads 637 self._logger.verbose("cancelling tasks...") 638 try: 639 # call parent finish 640 if await super().finish() is False: 641 self._logger.error("AsyncListenWebSocketClient.finish failed") 642 643 if self._speaker is not None and self._speaker_created: 644 self._speaker.finish() 645 self._speaker_created = False 646 647 # Before cancelling, check if the tasks were created 648 # debug the threads 649 for thread in threading.enumerate(): 650 self._logger.debug("before running thread: %s", thread.name) 651 self._logger.debug("number of active threads: %s", threading.active_count()) 652 653 tasks = [] 654 655 if self._speaker is not None: 656 self._logger.notice("stopping speaker...") 657 self._speaker.finish() 658 self._speaker = None 659 self._logger.notice("speaker stopped") 660 661 if self._flush_thread is not None: 662 self._logger.notice("stopping _flush_thread...") 663 self._flush_thread.cancel() 664 tasks.append(self._flush_thread) 665 self._logger.notice("_flush_thread cancelled") 666 667 # Use asyncio.gather to wait for tasks to be cancelled 668 # Prevent indefinite waiting by setting a timeout 669 await asyncio.wait_for(asyncio.gather(*tasks), timeout=10) 670 self._logger.notice("threads joined") 671 672 # debug the threads 673 for thread in threading.enumerate(): 674 self._logger.debug("after running thread: %s", thread.name) 675 self._logger.debug("number of active threads: %s", threading.active_count()) 676 677 self._logger.notice("finish succeeded") 678 self._logger.spam("AsyncSpeakWebSocketClient.finish LEAVE") 679 return True 680 681 except asyncio.CancelledError: 682 self._logger.debug("tasks cancelled") 683 self._logger.debug("AsyncSpeakWebSocketClient.finish LEAVE") 684 return False 685 686 except asyncio.TimeoutError as e: 687 self._logger.error("tasks cancellation timed out: %s", e) 688 self._logger.debug("AsyncSpeakWebSocketClient.finish LEAVE") 
689 return False 690 691 async def _inspect(self) -> bool: 692 # auto flush_inspect is generically used to track any messages you might want to snoop on 693 # place additional logic here to inspect messages of interest 694 695 # for auto flush functionality 696 # set the last datagram 697 self._last_datagram = datetime.now() 698 self._logger.debug( 699 "AutoFlush last received: %s", 700 str(self._last_datagram), 701 ) 702 703 return True 704 705 706AsyncSpeakWebSocketClient = AsyncSpeakWSClient
40class AsyncSpeakWSClient( 41 AbstractAsyncWebSocketClient 42): # pylint: disable=too-many-instance-attributes 43 """ 44 Client for interacting with Deepgram's text-to-speech services over WebSockets. 45 46 This class provides methods to establish a WebSocket connection for TTS synthesis and handle real-time TTS synthesis events. 47 48 Args: 49 config (DeepgramClientOptions): all the options for the client. 50 """ 51 52 _logger: verboselogs.VerboseLogger 53 _config: DeepgramClientOptions 54 _endpoint: str 55 56 _event_handlers: Dict[SpeakWebSocketEvents, list] 57 58 _flush_thread: Union[asyncio.Task, None] 59 _last_datagram: Optional[datetime] = None 60 _flush_count: int 61 62 _kwargs: Optional[Dict] = None 63 _addons: Optional[Dict] = None 64 _options: Optional[Dict] = None 65 _headers: Optional[Dict] = None 66 67 _speaker_created: bool = False 68 _speaker: Optional[Speaker] = None 69 _microphone: Optional[Microphone] = None 70 71 def __init__( 72 self, config: DeepgramClientOptions, microphone: Optional[Microphone] = None 73 ): 74 if config is None: 75 raise DeepgramError("Config is required") 76 self._logger = verboselogs.VerboseLogger(__name__) 77 self._logger.addHandler(logging.StreamHandler()) 78 self._logger.setLevel(config.verbose) 79 80 self._config = config 81 self._endpoint = "v1/speak" 82 83 self._flush_thread = None 84 85 # auto flush 86 self._last_datagram = None 87 self._flush_count = 0 88 89 # microphone 90 self._microphone = microphone 91 92 # init handlers 93 self._event_handlers = { 94 event: [] for event in SpeakWebSocketEvents.__members__.values() 95 } 96 97 if self._config.options.get("speaker_playback") == "true": 98 self._logger.info("speaker_playback is enabled") 99 rate = self._config.options.get("speaker_playback_rate") 100 if rate is None: 101 rate = RATE 102 channels = self._config.options.get("speaker_playback_channels") 103 if channels is None: 104 channels = CHANNELS 105 playback_delta_in_ms = self._config.options.get( 106 
"speaker_playback_delta_in_ms" 107 ) 108 if playback_delta_in_ms is None: 109 playback_delta_in_ms = PLAYBACK_DELTA 110 device_index = self._config.options.get("speaker_playback_device_index") 111 112 self._logger.debug("rate: %s", rate) 113 self._logger.debug("channels: %s", channels) 114 self._logger.debug("device_index: %s", device_index) 115 116 self._speaker_created = True 117 118 if device_index is not None: 119 self._speaker = Speaker( 120 rate=rate, 121 channels=channels, 122 last_play_delta_in_ms=playback_delta_in_ms, 123 verbose=self._config.verbose, 124 output_device_index=device_index, 125 microphone=self._microphone, 126 ) 127 else: 128 self._speaker = Speaker( 129 rate=rate, 130 channels=channels, 131 last_play_delta_in_ms=playback_delta_in_ms, 132 verbose=self._config.verbose, 133 microphone=self._microphone, 134 ) 135 136 # call the parent constructor 137 super().__init__(self._config, self._endpoint) 138 139 # pylint: disable=too-many-branches,too-many-statements 140 async def start( 141 self, 142 options: Optional[Union[SpeakWSOptions, Dict]] = None, 143 addons: Optional[Dict] = None, 144 headers: Optional[Dict] = None, 145 members: Optional[Dict] = None, 146 **kwargs, 147 ) -> bool: 148 """ 149 Starts the WebSocket connection for text-to-speech synthesis. 
150 """ 151 self._logger.debug("AsyncSpeakWebSocketClient.start ENTER") 152 self._logger.info("options: %s", options) 153 self._logger.info("addons: %s", addons) 154 self._logger.info("headers: %s", headers) 155 self._logger.info("members: %s", members) 156 self._logger.info("kwargs: %s", kwargs) 157 158 if isinstance(options, SpeakWSOptions) and not options.check(): 159 self._logger.error("options.check failed") 160 self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE") 161 raise DeepgramError("Fatal text-to-speech options error") 162 163 self._addons = addons 164 self._headers = headers 165 166 # add "members" as members of the class 167 if members is not None: 168 self.__dict__.update(members) 169 170 # set kwargs as members of the class 171 if kwargs is not None: 172 self._kwargs = kwargs 173 else: 174 self._kwargs = {} 175 176 if isinstance(options, SpeakWSOptions): 177 self._logger.info("SpeakWSOptions switching class -> dict") 178 self._options = options.to_dict() 179 elif options is not None: 180 self._options = options 181 else: 182 self._options = {} 183 184 try: 185 # speaker substitutes the listening thread 186 if self._speaker is not None: 187 self._logger.notice("passing speaker to delegate_listening") 188 super().delegate_listening(self._speaker) 189 190 # call parent start 191 if ( 192 await super().start( 193 self._options, 194 self._addons, 195 self._headers, 196 **dict(cast(Dict[Any, Any], self._kwargs)), 197 ) 198 is False 199 ): 200 self._logger.error("AsyncSpeakWebSocketClient.start failed") 201 self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE") 202 return False 203 204 if self._speaker is not None: 205 self._logger.notice("start delegate_listening thread") 206 self._speaker.start() 207 208 # debug the threads 209 for thread in threading.enumerate(): 210 self._logger.debug("after running thread: %s", thread.name) 211 self._logger.debug("number of active threads: %s", threading.active_count()) 212 213 # flush thread 214 if 
self._config.is_auto_flush_speak_enabled(): 215 self._logger.notice("autoflush is enabled") 216 self._flush_thread = asyncio.create_task(self._flush()) 217 else: 218 self._logger.notice("autoflush is disabled") 219 220 # debug the threads 221 for thread in threading.enumerate(): 222 self._logger.debug("after running thread: %s", thread.name) 223 self._logger.debug("number of active threads: %s", threading.active_count()) 224 225 self._logger.notice("start succeeded") 226 self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE") 227 return True 228 229 except Exception as e: # pylint: disable=broad-except 230 self._logger.error( 231 "WebSocketException in AsyncSpeakWebSocketClient.start: %s", e 232 ) 233 self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE") 234 if self._config.options.get("termination_exception_connect") is True: 235 raise 236 return False 237 238 # pylint: enable=too-many-branches,too-many-statements 239 240 def on(self, event: SpeakWebSocketEvents, handler: Callable) -> None: 241 """ 242 Registers event handlers for specific events. 243 """ 244 self._logger.info("event subscribed: %s", event) 245 if event in SpeakWebSocketEvents.__members__.values() and callable(handler): 246 self._event_handlers[event].append(handler) 247 248 # triggers the registered event handlers for a specific event 249 async def _emit(self, event: SpeakWebSocketEvents, *args, **kwargs) -> None: 250 """ 251 Emits events to the registered event handlers. 
252 """ 253 self._logger.debug("AsyncSpeakWebSocketClient._emit ENTER") 254 self._logger.debug("callback handlers for: %s", event) 255 256 # debug the threads 257 for thread in threading.enumerate(): 258 self._logger.debug("after running thread: %s", thread.name) 259 self._logger.debug("number of active threads: %s", threading.active_count()) 260 261 tasks = [] 262 for handler in self._event_handlers[event]: 263 task = asyncio.create_task(handler(self, *args, **kwargs)) 264 tasks.append(task) 265 266 if tasks: 267 self._logger.debug("waiting for tasks to finish...") 268 await asyncio.gather(*filter(None, tasks), return_exceptions=True) 269 tasks.clear() 270 271 # debug the threads 272 for thread in threading.enumerate(): 273 self._logger.debug("after running thread: %s", thread.name) 274 self._logger.debug("number of active threads: %s", threading.active_count()) 275 276 self._logger.debug("AsyncSpeakWebSocketClient._emit LEAVE") 277 278 async def _process_text(self, message: Union[str, bytes]) -> None: 279 """ 280 Processes messages received over the WebSocket connection. 
281 """ 282 self._logger.debug("AsyncSpeakWebSocketClient._process_text ENTER") 283 284 try: 285 self._logger.debug("Text data received") 286 287 if len(message) == 0: 288 self._logger.debug("message is empty") 289 self._logger.debug("AsyncSpeakWebSocketClient._process_text LEAVE") 290 return 291 292 data = json.loads(message) 293 response_type = data.get("type") 294 self._logger.debug("response_type: %s, data: %s", response_type, data) 295 296 match response_type: 297 case SpeakWebSocketEvents.Open: 298 open_result: OpenResponse = OpenResponse.from_json(message) 299 self._logger.verbose("OpenResponse: %s", open_result) 300 await self._emit( 301 SpeakWebSocketEvents(SpeakWebSocketEvents.Open), 302 open=open_result, 303 **dict(cast(Dict[Any, Any], self._kwargs)), 304 ) 305 case SpeakWebSocketEvents.Metadata: 306 meta_result: MetadataResponse = MetadataResponse.from_json(message) 307 self._logger.verbose("MetadataResponse: %s", meta_result) 308 await self._emit( 309 SpeakWebSocketEvents(SpeakWebSocketEvents.Metadata), 310 metadata=meta_result, 311 **dict(cast(Dict[Any, Any], self._kwargs)), 312 ) 313 case SpeakWebSocketEvents.Flushed: 314 fl_result: FlushedResponse = FlushedResponse.from_json(message) 315 self._logger.verbose("FlushedResponse: %s", fl_result) 316 317 # auto flush 318 if self._config.is_inspecting_speak(): 319 self._flush_count -= 1 320 self._logger.debug( 321 "Decrement AutoFlush count: %d", 322 self._flush_count, 323 ) 324 325 await self._emit( 326 SpeakWebSocketEvents(SpeakWebSocketEvents.Flushed), 327 flushed=fl_result, 328 **dict(cast(Dict[Any, Any], self._kwargs)), 329 ) 330 case SpeakWebSocketEvents.Cleared: 331 clear_result: ClearedResponse = ClearedResponse.from_json(message) 332 self._logger.verbose("ClearedResponse: %s", clear_result) 333 await self._emit( 334 SpeakWebSocketEvents(SpeakWebSocketEvents.Cleared), 335 cleared=clear_result, 336 **dict(cast(Dict[Any, Any], self._kwargs)), 337 ) 338 case SpeakWebSocketEvents.Close: 339 
close_result: CloseResponse = CloseResponse.from_json(message) 340 self._logger.verbose("CloseResponse: %s", close_result) 341 await self._emit( 342 SpeakWebSocketEvents(SpeakWebSocketEvents.Close), 343 close=close_result, 344 **dict(cast(Dict[Any, Any], self._kwargs)), 345 ) 346 case SpeakWebSocketEvents.Warning: 347 war_warning: WarningResponse = WarningResponse.from_json(message) 348 self._logger.verbose("WarningResponse: %s", war_warning) 349 await self._emit( 350 SpeakWebSocketEvents(SpeakWebSocketEvents.Warning), 351 warning=war_warning, 352 **dict(cast(Dict[Any, Any], self._kwargs)), 353 ) 354 case SpeakWebSocketEvents.Error: 355 err_error: ErrorResponse = ErrorResponse.from_json(message) 356 self._logger.verbose("ErrorResponse: %s", err_error) 357 await self._emit( 358 SpeakWebSocketEvents(SpeakWebSocketEvents.Error), 359 error=err_error, 360 **dict(cast(Dict[Any, Any], self._kwargs)), 361 ) 362 case _: 363 self._logger.warning( 364 "Unknown Message: response_type: %s, data: %s", 365 response_type, 366 data, 367 ) 368 unhandled_error: UnhandledResponse = UnhandledResponse( 369 type=SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled), 370 raw=str(message), 371 ) 372 await self._emit( 373 SpeakWebSocketEvents(SpeakWebSocketEvents.Unhandled), 374 unhandled=unhandled_error, 375 **dict(cast(Dict[Any, Any], self._kwargs)), 376 ) 377 378 self._logger.notice("_process_text Succeeded") 379 self._logger.debug("AsyncSpeakWebSocketClient._process_text LEAVE") 380 381 except Exception as e: # pylint: disable=broad-except 382 self._logger.error( 383 "Exception in AsyncSpeakWebSocketClient._process_text: %s", e 384 ) 385 e_error: ErrorResponse = ErrorResponse( 386 "Exception in AsyncSpeakWebSocketClient._process_text", 387 f"{e}", 388 "Exception", 389 ) 390 await self._emit( 391 SpeakWebSocketEvents(SpeakWebSocketEvents.Error), 392 error=e_error, 393 **dict(cast(Dict[Any, Any], self._kwargs)), 394 ) 395 396 # signal exit and close 397 await super()._signal_exit() 398 399 
self._logger.debug("AsyncSpeakWebSocketClient._process_text LEAVE") 400 401 if self._config.options.get("termination_exception") is True: 402 raise 403 return 404 405 # pylint: enable=too-many-return-statements,too-many-statements 406 407 async def _process_binary(self, message: bytes) -> None: 408 self._logger.debug("SpeakWebSocketClient._process_binary ENTER") 409 self._logger.debug("Binary data received") 410 411 await self._emit( 412 SpeakWebSocketEvents(SpeakWebSocketEvents.AudioData), 413 data=message, 414 **dict(cast(Dict[Any, Any], self._kwargs)), 415 ) 416 417 self._logger.notice("_process_binary Succeeded") 418 self._logger.debug("SpeakWebSocketClient._process_binary LEAVE") 419 420 ## pylint: disable=too-many-return-statements 421 async def _flush(self) -> None: 422 self._logger.debug("AsyncSpeakWebSocketClient._flush ENTER") 423 424 delta_in_ms_str = self._config.options.get("auto_flush_speak_delta") 425 if delta_in_ms_str is None: 426 self._logger.error("auto_flush_speak_delta is None") 427 self._logger.debug("AsyncSpeakWebSocketClient._flush LEAVE") 428 return 429 delta_in_ms = float(delta_in_ms_str) 430 431 while True: 432 try: 433 await asyncio.sleep(HALF_SECOND) 434 435 if self._exit_event.is_set(): 436 self._logger.notice("_flush exiting gracefully") 437 self._logger.debug("AsyncSpeakWebSocketClient._flush LEAVE") 438 return 439 440 if self._last_datagram is None: 441 self._logger.debug("AutoFlush last_datagram is None") 442 continue 443 444 delta = datetime.now() - self._last_datagram 445 diff_in_ms = delta.total_seconds() * 1000 446 self._logger.debug("AutoFlush delta: %f", diff_in_ms) 447 if diff_in_ms < delta_in_ms: 448 self._logger.debug("AutoFlush delta is less than threshold") 449 continue 450 451 await self.flush() 452 453 except Exception as e: # pylint: disable=broad-except 454 self._logger.error( 455 "Exception in AsyncSpeakWebSocketClient._flush: %s", e 456 ) 457 e_error: ErrorResponse = ErrorResponse( 458 "Exception in 
AsyncSpeakWebSocketClient._flush", 459 f"{e}", 460 "Exception", 461 ) 462 self._logger.error( 463 "Exception in AsyncSpeakWebSocketClient._flush: %s", str(e) 464 ) 465 await self._emit( 466 SpeakWebSocketEvents(SpeakWebSocketEvents.Error), 467 error=e_error, 468 **dict(cast(Dict[Any, Any], self._kwargs)), 469 ) 470 471 # signal exit and close 472 await super()._signal_exit() 473 474 self._logger.debug("AsyncSpeakWebSocketClient._flush LEAVE") 475 476 if self._config.options.get("termination_exception") is True: 477 raise 478 return 479 480 # pylint: enable=too-many-return-statements 481 482 async def send_text(self, text_input: str) -> bool: 483 """ 484 Sends text to the WebSocket connection to generate audio. 485 486 Args: 487 text_input (str): The raw text to be synthesized. This function will automatically wrap 488 the text in a JSON object of type "Speak" with the key "text". 489 490 Returns: 491 bool: True if the text was successfully sent, False otherwise. 492 """ 493 return await self.send_raw(json.dumps({"type": "Speak", "text": text_input})) 494 495 async def send(self, data: Union[bytes, str]) -> bool: 496 """ 497 Alias for send_text. Please see send_text for more information. 498 """ 499 if isinstance(data, bytes): 500 self._logger.error("send() failed - data is bytes") 501 return False 502 503 return await self.send_text(data) 504 505 # pylint: disable=unused-argument 506 async def send_control( 507 self, msg_type: Union[SpeakWebSocketMessage, str], data: Optional[str] = "" 508 ) -> bool: 509 """ 510 Sends a control message consisting of type SpeakWebSocketEvents over the WebSocket connection. 511 512 Args: 513 msg_type (SpeakWebSocketEvents): The type of control message to send. 514 (Optional) data (str): The data to send with the control message. 515 516 Returns: 517 bool: True if the control message was successfully sent, False otherwise. 
518 """ 519 control_msg = json.dumps({"type": msg_type}) 520 return await self.send_raw(control_msg) 521 522 # pylint: enable=unused-argument 523 524 # pylint: disable=too-many-return-statements,too-many-branches,too-many-statements 525 async def send_raw(self, msg: str) -> bool: 526 """ 527 Sends a raw/control message over the WebSocket connection. This message must contain a valid JSON object. 528 529 Args: 530 msg (str): The raw message to send over the WebSocket connection. 531 532 Returns: 533 bool: True if the message was successfully sent, False otherwise. 534 """ 535 self._logger.spam("AsyncSpeakWebSocketClient.send_raw ENTER") 536 537 if self._config.is_inspecting_speak(): 538 try: 539 _tmp_json = json.loads(msg) 540 if "type" in _tmp_json: 541 self._logger.debug( 542 "Inspecting Message: Sending %s", _tmp_json["type"] 543 ) 544 match _tmp_json["type"]: 545 case SpeakWebSocketMessage.Speak: 546 inspect_res = await self._inspect() 547 if not inspect_res: 548 self._logger.error("inspect_res failed") 549 case SpeakWebSocketMessage.Flush: 550 self._last_datagram = None 551 self._flush_count += 1 552 self._logger.debug( 553 "Increment Flush count: %d", self._flush_count 554 ) 555 except Exception as e: # pylint: disable=broad-except 556 self._logger.error("send_raw() failed - Exception: %s", str(e)) 557 558 try: 559 if await super().send(msg) is False: 560 self._logger.error("send_raw() failed") 561 self._logger.spam("AsyncSpeakWebSocketClient.send_raw LEAVE") 562 return False 563 self._logger.spam("send_raw() succeeded") 564 self._logger.spam("AsyncSpeakWebSocketClient.send_raw LEAVE") 565 return True 566 except Exception as e: # pylint: disable=broad-except 567 self._logger.error("send_raw() failed - Exception: %s", str(e)) 568 self._logger.spam("AsyncSpeakWebSocketClient.send_raw LEAVE") 569 if self._config.options.get("termination_exception_send") is True: 570 raise 571 return False 572 573 # pylint: enable=too-many-return-statements,too-many-branches 574 
575 async def flush(self) -> bool: 576 """ 577 Flushes the current buffer and returns generated audio 578 """ 579 self._logger.spam("AsyncSpeakWebSocketClient.flush ENTER") 580 581 self._logger.notice("Sending Flush...") 582 ret = await self.send_control(SpeakWebSocketMessage.Flush) 583 584 if not ret: 585 self._logger.error("flush failed") 586 self._logger.spam("AsyncSpeakWebSocketClient.flush LEAVE") 587 return False 588 589 self._logger.notice("flush succeeded") 590 self._logger.spam("AsyncSpeakWebSocketClient.flush LEAVE") 591 592 return True 593 594 async def clear(self) -> bool: 595 """ 596 Clears the current buffer on the server 597 """ 598 self._logger.spam("AsyncSpeakWebSocketClient.clear ENTER") 599 600 self._logger.notice("Sending Clear...") 601 ret = await self.send_control(SpeakWebSocketMessage.Clear) 602 603 if not ret: 604 self._logger.error("clear failed") 605 self._logger.spam("AsyncSpeakWebSocketClient.clear LEAVE") 606 return False 607 608 self._logger.notice("clear succeeded") 609 self._logger.spam("AsyncSpeakWebSocketClient.clear LEAVE") 610 611 return True 612 613 async def wait_for_complete(self): 614 """ 615 This method will block until the speak is done playing sound. 616 """ 617 self._logger.spam("AsyncSpeakWebSocketClient.wait_for_complete ENTER") 618 619 if self._speaker is None: 620 self._logger.error("speaker is None. Return immediately") 621 return 622 623 loop = asyncio.get_event_loop() 624 await loop.run_in_executor(None, self._speaker.wait_for_complete) 625 self._logger.notice("wait_for_complete succeeded") 626 self._logger.spam("AsyncSpeakWebSocketClient.wait_for_complete LEAVE") 627 628 async def _close_message(self) -> bool: 629 return await self.send_control(SpeakWebSocketMessage.Close) 630 631 async def finish(self) -> bool: 632 """ 633 Closes the WebSocket connection gracefully. 
634 """ 635 self._logger.debug("AsyncSpeakWebSocketClient.finish ENTER") 636 637 # stop the threads 638 self._logger.verbose("cancelling tasks...") 639 try: 640 # call parent finish 641 if await super().finish() is False: 642 self._logger.error("AsyncListenWebSocketClient.finish failed") 643 644 if self._speaker is not None and self._speaker_created: 645 self._speaker.finish() 646 self._speaker_created = False 647 648 # Before cancelling, check if the tasks were created 649 # debug the threads 650 for thread in threading.enumerate(): 651 self._logger.debug("before running thread: %s", thread.name) 652 self._logger.debug("number of active threads: %s", threading.active_count()) 653 654 tasks = [] 655 656 if self._speaker is not None: 657 self._logger.notice("stopping speaker...") 658 self._speaker.finish() 659 self._speaker = None 660 self._logger.notice("speaker stopped") 661 662 if self._flush_thread is not None: 663 self._logger.notice("stopping _flush_thread...") 664 self._flush_thread.cancel() 665 tasks.append(self._flush_thread) 666 self._logger.notice("_flush_thread cancelled") 667 668 # Use asyncio.gather to wait for tasks to be cancelled 669 # Prevent indefinite waiting by setting a timeout 670 await asyncio.wait_for(asyncio.gather(*tasks), timeout=10) 671 self._logger.notice("threads joined") 672 673 # debug the threads 674 for thread in threading.enumerate(): 675 self._logger.debug("after running thread: %s", thread.name) 676 self._logger.debug("number of active threads: %s", threading.active_count()) 677 678 self._logger.notice("finish succeeded") 679 self._logger.spam("AsyncSpeakWebSocketClient.finish LEAVE") 680 return True 681 682 except asyncio.CancelledError: 683 self._logger.debug("tasks cancelled") 684 self._logger.debug("AsyncSpeakWebSocketClient.finish LEAVE") 685 return False 686 687 except asyncio.TimeoutError as e: 688 self._logger.error("tasks cancellation timed out: %s", e) 689 self._logger.debug("AsyncSpeakWebSocketClient.finish LEAVE") 
690 return False 691 692 async def _inspect(self) -> bool: 693 # auto flush_inspect is generically used to track any messages you might want to snoop on 694 # place additional logic here to inspect messages of interest 695 696 # for auto flush functionality 697 # set the last datagram 698 self._last_datagram = datetime.now() 699 self._logger.debug( 700 "AutoFlush last received: %s", 701 str(self._last_datagram), 702 ) 703 704 return True
Client for interacting with Deepgram's text-to-speech services over WebSockets.
This class provides methods to establish a WebSocket connection for TTS synthesis and handle real-time TTS synthesis events.
Args: config (DeepgramClientOptions): all the options for the client.
def __init__(
    self, config: DeepgramClientOptions, microphone: Optional[Microphone] = None
):
    """
    Sets up the logger, endpoint, auto-flush bookkeeping and the event-handler
    registry, and creates a Speaker when the "speaker_playback" option is "true".

    Args:
        config (DeepgramClientOptions): all the options for the client.
        microphone (Optional[Microphone]): stored on the client and passed to the
            Speaker when speaker playback is enabled.

    Raises:
        DeepgramError: if config is None.
    """
    if config is None:
        raise DeepgramError("Config is required")

    logger = verboselogs.VerboseLogger(__name__)
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(config.verbose)
    self._logger = logger

    self._config = config
    self._endpoint = "v1/speak"

    self._flush_thread = None

    # auto flush state
    self._last_datagram = None
    self._flush_count = 0

    # microphone
    self._microphone = microphone

    # one (initially empty) handler list per known event
    self._event_handlers = {
        event: [] for event in SpeakWebSocketEvents.__members__.values()
    }

    if self._config.options.get("speaker_playback") == "true":
        self._logger.info("speaker_playback is enabled")

        # An option that is absent (or explicitly None) falls back to the
        # defaults exported by the speaker module.
        rate = self._config.options.get("speaker_playback_rate")
        rate = RATE if rate is None else rate
        channels = self._config.options.get("speaker_playback_channels")
        channels = CHANNELS if channels is None else channels
        playback_delta_in_ms = self._config.options.get(
            "speaker_playback_delta_in_ms"
        )
        playback_delta_in_ms = (
            PLAYBACK_DELTA if playback_delta_in_ms is None else playback_delta_in_ms
        )
        device_index = self._config.options.get("speaker_playback_device_index")

        self._logger.debug("rate: %s", rate)
        self._logger.debug("channels: %s", channels)
        self._logger.debug("device_index: %s", device_index)

        self._speaker_created = True

        speaker_kwargs = {
            "rate": rate,
            "channels": channels,
            "last_play_delta_in_ms": playback_delta_in_ms,
            "verbose": self._config.verbose,
            "microphone": self._microphone,
        }
        # Only name an output device when one was configured.
        if device_index is not None:
            speaker_kwargs["output_device_index"] = device_index
        self._speaker = Speaker(**speaker_kwargs)

    # call the parent constructor
    super().__init__(self._config, self._endpoint)
async def start(
    self,
    options: Optional[Union[SpeakWSOptions, Dict]] = None,
    addons: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    members: Optional[Dict] = None,
    **kwargs,
) -> bool:
    """
    Starts the WebSocket connection for text-to-speech synthesis.

    Args:
        options: SpeakWSOptions (validated with check() and converted to a dict)
            or a plain dict; None becomes an empty dict.
        addons: stored and forwarded to the parent start().
        headers: stored and forwarded to the parent start().
        members: when given, merged into this instance's attributes.
        **kwargs: stored and forwarded to the parent start() and to event handlers.

    Returns:
        bool: True if the connection was started, False otherwise.

    Raises:
        DeepgramError: if options is a SpeakWSOptions whose check() fails.
        Exception: re-raised from the start sequence only when the
            "termination_exception_connect" option is True.
    """
    self._logger.debug("AsyncSpeakWebSocketClient.start ENTER")
    self._logger.info("options: %s", options)
    self._logger.info("addons: %s", addons)
    self._logger.info("headers: %s", headers)
    self._logger.info("members: %s", members)
    self._logger.info("kwargs: %s", kwargs)

    typed_options = isinstance(options, SpeakWSOptions)
    if typed_options and not options.check():
        self._logger.error("options.check failed")
        self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE")
        raise DeepgramError("Fatal text-to-speech options error")

    self._addons = addons
    self._headers = headers

    # add "members" as members of the class
    if members is not None:
        self.__dict__.update(members)

    # keep kwargs so they can be handed to every emitted event
    self._kwargs = kwargs if kwargs is not None else {}

    if typed_options:
        self._logger.info("SpeakWSOptions switching class -> dict")
        self._options = options.to_dict()
    else:
        self._options = options if options is not None else {}

    try:
        # speaker substitutes the listening thread
        if self._speaker is not None:
            self._logger.notice("passing speaker to delegate_listening")
            super().delegate_listening(self._speaker)

        # call parent start
        started = await super().start(
            self._options,
            self._addons,
            self._headers,
            **dict(cast(Dict[Any, Any], self._kwargs)),
        )
        if started is False:
            self._logger.error("AsyncSpeakWebSocketClient.start failed")
            self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE")
            return False

        if self._speaker is not None:
            self._logger.notice("start delegate_listening thread")
            self._speaker.start()

        # debug the threads
        for running in threading.enumerate():
            self._logger.debug("after running thread: %s", running.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        # flush thread
        if self._config.is_auto_flush_speak_enabled():
            self._logger.notice("autoflush is enabled")
            self._flush_thread = asyncio.create_task(self._flush())
        else:
            self._logger.notice("autoflush is disabled")

        # debug the threads
        for running in threading.enumerate():
            self._logger.debug("after running thread: %s", running.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        self._logger.notice("start succeeded")
        self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE")
        return True

    except Exception as e:  # pylint: disable=broad-except
        self._logger.error(
            "WebSocketException in AsyncSpeakWebSocketClient.start: %s", e
        )
        self._logger.debug("AsyncSpeakWebSocketClient.start LEAVE")
        if self._config.options.get("termination_exception_connect") is True:
            raise
        return False
Starts the WebSocket connection for text-to-speech synthesis.
def on(self, event: SpeakWebSocketEvents, handler: Callable) -> None:
    """
    Registers an event handler for a specific event.

    The handler is appended to the event's handler list. An event that is not
    a member of SpeakWebSocketEvents, or a handler that is not callable, is
    ignored without raising.
    """
    self._logger.info("event subscribed: %s", event)

    if event not in SpeakWebSocketEvents.__members__.values():
        return
    if not callable(handler):
        return

    self._event_handlers[event].append(handler)
Registers event handlers for specific events.
async def send_text(self, text_input: str) -> bool:
    """
    Sends text to the WebSocket connection to generate audio.

    Args:
        text_input (str): The raw text to be synthesized. It is wrapped in a JSON
        object of type "Speak" with the key "text" before being sent.

    Returns:
        bool: True if the text was successfully sent, False otherwise.
    """
    speak_message = {"type": "Speak", "text": text_input}
    payload = json.dumps(speak_message)
    return await self.send_raw(payload)
Sends text to the WebSocket connection to generate audio.
Args: text_input (str): The raw text to be synthesized. This function will automatically wrap the text in a JSON object of type "Speak" with the key "text".
Returns: bool: True if the text was successfully sent, False otherwise.
async def send(self, data: Union[bytes, str]) -> bool:
    """
    Alias for send_text. Please see send_text for more information.

    Bytes are rejected: an error is logged and False is returned without
    sending anything.
    """
    if not isinstance(data, bytes):
        return await self.send_text(data)

    self._logger.error("send() failed - data is bytes")
    return False
Alias for send_text. Please see send_text for more information.
async def send_control(
    self, msg_type: Union[SpeakWebSocketMessage, str], data: Optional[str] = ""
) -> bool:
    """
    Sends a control message of the given type over the WebSocket connection.

    Args:
        msg_type (SpeakWebSocketMessage or str): The type of control message to send.
        (Optional) data (str): Accepted by the signature but not included in the
        message; only the "type" field is serialized.

    Returns:
        bool: True if the control message was successfully sent, False otherwise.
    """
    envelope = {"type": msg_type}
    return await self.send_raw(json.dumps(envelope))
Sends a control message of type SpeakWebSocketMessage over the WebSocket connection.
Args: msg_type (SpeakWebSocketMessage or str): The type of control message to send. (Optional) data (str): Accepted by the signature, but the current implementation sends only the message type and does not include this value.
Returns: bool: True if the control message was successfully sent, False otherwise.
525 async def send_raw(self, msg: str) -> bool: 526 """ 527 Sends a raw/control message over the WebSocket connection. This message must contain a valid JSON object. 528 529 Args: 530 msg (str): The raw message to send over the WebSocket connection. 531 532 Returns: 533 bool: True if the message was successfully sent, False otherwise. 534 """ 535 self._logger.spam("AsyncSpeakWebSocketClient.send_raw ENTER") 536 537 if self._config.is_inspecting_speak(): 538 try: 539 _tmp_json = json.loads(msg) 540 if "type" in _tmp_json: 541 self._logger.debug( 542 "Inspecting Message: Sending %s", _tmp_json["type"] 543 ) 544 match _tmp_json["type"]: 545 case SpeakWebSocketMessage.Speak: 546 inspect_res = await self._inspect() 547 if not inspect_res: 548 self._logger.error("inspect_res failed") 549 case SpeakWebSocketMessage.Flush: 550 self._last_datagram = None 551 self._flush_count += 1 552 self._logger.debug( 553 "Increment Flush count: %d", self._flush_count 554 ) 555 except Exception as e: # pylint: disable=broad-except 556 self._logger.error("send_raw() failed - Exception: %s", str(e)) 557 558 try: 559 if await super().send(msg) is False: 560 self._logger.error("send_raw() failed") 561 self._logger.spam("AsyncSpeakWebSocketClient.send_raw LEAVE") 562 return False 563 self._logger.spam("send_raw() succeeded") 564 self._logger.spam("AsyncSpeakWebSocketClient.send_raw LEAVE") 565 return True 566 except Exception as e: # pylint: disable=broad-except 567 self._logger.error("send_raw() failed - Exception: %s", str(e)) 568 self._logger.spam("AsyncSpeakWebSocketClient.send_raw LEAVE") 569 if self._config.options.get("termination_exception_send") is True: 570 raise 571 return False
Sends a raw/control message over the WebSocket connection. This message must contain a valid JSON object.
Args: msg (str): The raw message to send over the WebSocket connection.
Returns: bool: True if the message was successfully sent, False otherwise.
async def flush(self) -> bool:
    """
    Sends a Flush control message over the WebSocket connection.

    Returns:
        bool: True if the Flush message was successfully sent, False otherwise.
    """
    self._logger.spam("AsyncSpeakWebSocketClient.flush ENTER")

    self._logger.notice("Sending Flush...")
    sent = await self.send_control(SpeakWebSocketMessage.Flush)

    if sent:
        self._logger.notice("flush succeeded")
        self._logger.spam("AsyncSpeakWebSocketClient.flush LEAVE")
        return True

    self._logger.error("flush failed")
    self._logger.spam("AsyncSpeakWebSocketClient.flush LEAVE")
    return False
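Sends a Flush message so that the server flushes the current buffer and returns the generated audio. Returns True if the message was sent, False otherwise.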
async def clear(self) -> bool:
    """
    Sends a Clear control message over the WebSocket connection.

    Returns:
        bool: True if the Clear message was successfully sent, False otherwise.
    """
    self._logger.spam("AsyncSpeakWebSocketClient.clear ENTER")

    self._logger.notice("Sending Clear...")
    sent = await self.send_control(SpeakWebSocketMessage.Clear)

    if sent:
        self._logger.notice("clear succeeded")
        self._logger.spam("AsyncSpeakWebSocketClient.clear LEAVE")
        return True

    self._logger.error("clear failed")
    self._logger.spam("AsyncSpeakWebSocketClient.clear LEAVE")
    return False
Clears the current buffer on the server
async def wait_for_complete(self):
    """
    Waits until the speaker is done playing sound.

    The speaker's blocking wait_for_complete() runs in the default executor, so
    this coroutine is suspended without blocking the event loop. If no speaker
    is attached, an error is logged and the method returns immediately.
    """
    self._logger.spam("AsyncSpeakWebSocketClient.wait_for_complete ENTER")

    if self._speaker is None:
        self._logger.error("speaker is None. Return immediately")
        # Emit the LEAVE trace on this path too, so ENTER/LEAVE stay paired.
        self._logger.spam("AsyncSpeakWebSocketClient.wait_for_complete LEAVE")
        return

    # A coroutine always has a running loop; get_running_loop() is the
    # supported way to obtain it.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, self._speaker.wait_for_complete)
    self._logger.notice("wait_for_complete succeeded")
    self._logger.spam("AsyncSpeakWebSocketClient.wait_for_complete LEAVE")
This method will block until the speaker is done playing sound.
async def finish(self) -> bool:
    """
    Closes the WebSocket connection gracefully.

    Calls the parent finish(), stops the speaker (if any) exactly once, then
    cancels the auto-flush task (if any) and waits up to 10 seconds for it to
    wind down.

    Returns:
        bool: True when shutdown completed; False if this call was itself
        cancelled or the wait for the flush task timed out.
    """
    self._logger.debug("AsyncSpeakWebSocketClient.finish ENTER")

    # stop the threads
    self._logger.verbose("cancelling tasks...")
    try:
        # call parent finish
        if await super().finish() is False:
            self._logger.error("AsyncSpeakWebSocketClient.finish failed")

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("before running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        tasks = []

        # Stop the speaker once and drop the reference so that a repeated
        # finish() does not try to stop it again.
        if self._speaker is not None:
            self._logger.notice("stopping speaker...")
            self._speaker.finish()
            self._speaker = None
            self._speaker_created = False
            self._logger.notice("speaker stopped")

        if self._flush_thread is not None:
            self._logger.notice("stopping _flush_thread...")
            self._flush_thread.cancel()
            tasks.append(self._flush_thread)
            self._flush_thread = None
            self._logger.notice("_flush_thread cancelled")

        if tasks:
            # return_exceptions=True: the task was just cancelled on purpose,
            # so its CancelledError is collected as a result instead of being
            # raised here and turning a clean shutdown into a False return.
            # The timeout prevents waiting indefinitely.
            await asyncio.wait_for(
                asyncio.gather(*tasks, return_exceptions=True), timeout=10
            )
        self._logger.notice("threads joined")

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        self._logger.notice("finish succeeded")
        self._logger.debug("AsyncSpeakWebSocketClient.finish LEAVE")
        return True

    except asyncio.CancelledError:
        # Reached only when this finish() call itself is cancelled.
        self._logger.debug("tasks cancelled")
        self._logger.debug("AsyncSpeakWebSocketClient.finish LEAVE")
        return False

    except asyncio.TimeoutError as e:
        self._logger.error("tasks cancellation timed out: %s", e)
        self._logger.debug("AsyncSpeakWebSocketClient.finish LEAVE")
        return False
Closes the WebSocket connection gracefully.