deepgram.audio.speaker.speaker
# Copyright 2024 Deepgram SDK contributors. All Rights Reserved.
# Use of this source code is governed by a MIT license that can be found in the LICENSE file.
# SPDX-License-Identifier: MIT

import asyncio
import inspect
import queue
import threading
from typing import Optional, Callable, Union, TYPE_CHECKING
import logging
from datetime import datetime

import websockets

from ...utils import verboselogs
from .constants import LOGGING, CHANNELS, RATE, CHUNK, TIMEOUT, PLAYBACK_DELTA

from ..microphone import Microphone

if TYPE_CHECKING:
    import pyaudio

HALF_SECOND = 0.5


class Speaker:  # pylint: disable=too-many-instance-attributes
    """
    This implements a speaker for local audio output. This uses PyAudio under the hood.

    After start(), a receiver thread repeatedly calls the pull callback, forwards each
    text/binary message to the push callback and queues binary messages; a play thread
    writes the queued audio to the PyAudio output stream.
    """

    _logger: verboselogs.VerboseLogger

    _audio: Optional["pyaudio.PyAudio"] = None
    _stream: Optional["pyaudio.Stream"] = None

    _chunk: int
    _rate: int
    _format: int
    _channels: int
    _output_device_index: Optional[int] = None

    # last time we received audio
    _last_datagram: datetime = datetime.now()
    _last_play_delta_in_ms: int
    _lock_wait: threading.Lock

    _queue: queue.Queue
    _exit: threading.Event

    _thread: Optional[threading.Thread] = None
    _receiver_thread: Optional[threading.Thread] = None
    # loop on which the asyncio receiver is scheduled (coroutine callbacks only)
    _loop: Optional[asyncio.AbstractEventLoop] = None

    _push_callback_org: Optional[Callable] = None
    _push_callback: Optional[Callable] = None
    _pull_callback_org: Optional[Callable] = None
    _pull_callback: Optional[Callable] = None

    _microphone: Optional[Microphone] = None

    def __init__(
        self,
        pull_callback: Optional[Callable] = None,
        push_callback: Optional[Callable] = None,
        verbose: int = LOGGING,
        rate: int = RATE,
        chunk: int = CHUNK,
        channels: int = CHANNELS,
        last_play_delta_in_ms: int = PLAYBACK_DELTA,
        output_device_index: Optional[int] = None,
        microphone: Optional[Microphone] = None,
    ):  # pylint: disable=too-many-positional-arguments
        """
        Creates the Speaker and its PyAudio instance; no stream is opened until start().

        Args:
            pull_callback (Optional[Callable]): Called repeatedly to fetch the next message.
            push_callback (Optional[Callable]): Called with every text/binary message fetched.
            verbose (int): Log level for this speaker's logger.
            rate (int): Sample rate passed to PyAudio.
            chunk (int): frames_per_buffer passed to PyAudio.
            channels (int): Channel count passed to PyAudio.
            last_play_delta_in_ms (int): Milliseconds without new audio after which
                playback is considered finished.
            output_device_index (Optional[int]): PyAudio output device; None for the default.
            microphone (Optional[Microphone]): Muted while audio is playing.
        """
        # dynamic import of pyaudio as not to force the requirements on the SDK (and users)
        import pyaudio  # pylint: disable=import-outside-toplevel

        self._logger = verboselogs.VerboseLogger(__name__)
        self._logger.addHandler(logging.StreamHandler())
        self._logger.setLevel(verbose)

        self._exit = threading.Event()
        self._queue = queue.Queue()

        self._last_datagram = datetime.now()
        self._lock_wait = threading.Lock()

        self._microphone = microphone

        self._audio = pyaudio.PyAudio()
        self._chunk = chunk
        self._rate = rate
        self._format = pyaudio.paInt16
        self._channels = channels
        self._last_play_delta_in_ms = last_play_delta_in_ms
        self._output_device_index = output_device_index

        self._push_callback_org = push_callback
        self._pull_callback_org = pull_callback

    def set_push_callback(self, push_callback: Callable) -> None:
        """
        set_push_callback - sets the callback function to be called when data is sent.

        Args:
            push_callback (Callable): The callback function to be called when data is sent.
                This should be the websocket handle message function. It takes effect on the
                next call to start().

        Returns:
            None
        """
        self._push_callback_org = push_callback

    def set_pull_callback(self, pull_callback: Callable) -> None:
        """
        set_pull_callback - sets the callback function to be called when data is received.

        Args:
            pull_callback (Callable): The callback function to be called when data is received.
                This should be the websocket recv function. It takes effect on the next call
                to start().

        Returns:
            None
        """
        self._pull_callback_org = pull_callback

    def start(self, active_loop: Optional[asyncio.AbstractEventLoop] = None) -> bool:
        """
        start - starts the Speaker stream

        Opens the PyAudio output stream and starts the play thread and the receiver thread.

        Args:
            active_loop (Optional[asyncio.AbstractEventLoop]): The asyncio event loop on which
                the receiver is scheduled when the callbacks are coroutine functions. If
                omitted, the loop running in the calling thread is used.

        Returns:
            bool: True if the stream was started, False otherwise

        Raises:
            RuntimeError: (from asyncio.get_running_loop) if a callback is a coroutine
                function, active_loop is None and no event loop is running in this thread.
        """
        self._logger.debug("Speaker.start ENTER")

        self._logger.info("format: %s", self._format)
        self._logger.info("channels: %d", self._channels)
        self._logger.info("rate: %d", self._rate)
        self._logger.info("chunk: %d", self._chunk)

        # An explicitly supplied loop wins; otherwise fall back to the running loop when
        # either callback has to be awaited.
        if active_loop is not None:
            self._loop = active_loop
        elif inspect.iscoroutinefunction(
            self._push_callback_org
        ) or inspect.iscoroutinefunction(self._pull_callback_org):
            self._logger.verbose("get default running asyncio loop")
            self._loop = asyncio.get_running_loop()

        self._exit.clear()
        self._queue = queue.Queue()

        if self._audio is not None:
            self._stream = self._audio.open(
                format=self._format,
                channels=self._channels,
                rate=self._rate,
                input=False,
                output=True,
                frames_per_buffer=self._chunk,
                output_device_index=self._output_device_index,
            )

        if self._stream is None:
            self._logger.error("start failed. No stream created.")
            self._logger.debug("Speaker.start LEAVE")
            return False

        self._push_callback = self._push_callback_org
        self._pull_callback = self._pull_callback_org

        # start the play thread
        self._thread = threading.Thread(
            target=self._play, args=(self._queue, self._stream, self._exit), daemon=True
        )
        self._thread.start()

        # Start the stream (known to be non-None at this point)
        self._stream.start_stream()

        # Start the receiver thread within the start function
        self._logger.verbose("Starting receiver thread...")
        self._receiver_thread = threading.Thread(target=self._start_receiver)
        self._receiver_thread.start()

        self._logger.notice("start succeeded")
        self._logger.debug("Speaker.start LEAVE")

        return True

    def wait_for_complete_with_mute(self, mic: Optional[Microphone]):
        """
        This method will mute/unmute a Microphone and block until the speaker is done playing sound.

        Args:
            mic (Optional[Microphone]): Muted while waiting and unmuted afterwards, even if
                waiting raises. If None, nothing is muted.
        """
        self._logger.debug("Speaker.wait_for_complete_with_mute ENTER")

        if mic is not None:
            mic.mute()
        try:
            self.wait_for_complete()
        finally:
            if mic is not None:
                mic.unmute()

        self._logger.debug("Speaker.wait_for_complete_with_mute LEAVE")

    def wait_for_complete(self):
        """
        This method will block until the speaker is done playing sound.

        "Done" means no audio has been played for last_play_delta_in_ms milliseconds; the
        timer is reset on entry and polled every HALF_SECOND. Returns early if _exit is set.
        """
        self._logger.debug("Speaker.wait_for_complete ENTER")

        delta_in_ms = float(self._last_play_delta_in_ms)
        self._logger.debug("Last Play delta: %f", delta_in_ms)

        # set to now
        with self._lock_wait:
            self._last_datagram = datetime.now()

        while True:
            # sleep for a bit (wakes early when _exit is set)
            self._exit.wait(HALF_SECOND)

            # check if we should exit
            if self._exit.is_set():
                self._logger.debug("Exiting wait_for_complete _exit is set")
                break

            # check the time
            with self._lock_wait:
                delta = datetime.now() - self._last_datagram
                diff_in_ms = delta.total_seconds() * 1000
                if diff_in_ms < delta_in_ms:
                    self._logger.debug("LastPlay delta is less than threshold")
                    continue

                # if we get here, we are done playing audio
                self._logger.debug("LastPlay delta is greater than threshold. Exit wait!")
                break

        self._logger.debug("Speaker.wait_for_complete LEAVE")

    def _start_receiver(self):
        """
        Body of the receiver thread: schedules the asyncio receiver on self._loop when the
        active pull callback is a coroutine function, otherwise runs the blocking loop here.
        """
        if inspect.iscoroutinefunction(self._pull_callback):
            self._logger.verbose("Starting asyncio receiver...")
            asyncio.run_coroutine_threadsafe(self._start_asyncio_receiver(), self._loop)
        else:
            self._logger.verbose("Starting threaded receiver...")
            self._start_threaded_receiver()

    async def _push_async(self, message: Union[str, bytes]) -> None:
        """
        Forwards message to the push callback, awaiting the result only when the callback
        returned an awaitable. Does nothing when no push callback is set.
        """
        if self._push_callback is None:
            return
        result = self._push_callback(message)
        if inspect.isawaitable(result):
            await result

    async def _start_asyncio_receiver(self):
        """
        Receive loop used when the pull callback is a coroutine function; runs on self._loop.

        _exit is checked between messages. Connection-closed and other errors end the loop
        and are logged.
        """
        try:
            while True:
                if self._exit.is_set():
                    self._logger.verbose("Exiting receiver thread...")
                    break

                message = await self._pull_callback()
                if message is None:
                    self._logger.verbose("No message received...")
                    continue

                if isinstance(message, str):
                    self._logger.verbose("Received control message...")
                    await self._push_async(message)
                elif isinstance(message, bytes):
                    self._logger.verbose("Received audio data...")
                    await self._push_async(message)
                    self.add_audio_to_queue(message)
        except websockets.exceptions.ConnectionClosedOK as e:
            self._logger.debug("_start_asyncio_receiver exiting gracefully: %d", e.code)
        except websockets.exceptions.ConnectionClosed as e:
            if e.code in [1000, 1001]:
                self._logger.debug(
                    "_start_asyncio_receiver exiting gracefully: %d", e.code
                )
                return
            self._logger.error("_start_asyncio_receiver - ConnectionClosed: %s", str(e))
        except websockets.exceptions.WebSocketException as e:
            self._logger.error(
                "_start_asyncio_receiver- WebSocketException: %s", str(e)
            )
        except Exception as e:  # pylint: disable=broad-except
            self._logger.error("_start_asyncio_receiver exception: %s", str(e))

    def _start_threaded_receiver(self):
        """
        Blocking receive loop used when the pull callback is a plain function.

        _exit is only checked between calls to the pull callback, so a blocked pull delays
        shutdown. Any exception ends the loop and is logged.
        """
        try:
            while True:
                if self._exit.is_set():
                    self._logger.verbose("Exiting receiver thread...")
                    break

                message = self._pull_callback()
                if message is None:
                    self._logger.verbose("No message received...")
                    continue

                if isinstance(message, str):
                    self._logger.verbose("Received control message...")
                    if self._push_callback is not None:
                        self._push_callback(message)
                elif isinstance(message, bytes):
                    self._logger.verbose("Received audio data...")
                    if self._push_callback is not None:
                        self._push_callback(message)
                    self.add_audio_to_queue(message)
        except Exception as e:  # pylint: disable=broad-except
            self._logger.notice("_start_threaded_receiver exception: %s", str(e))

    def add_audio_to_queue(self, data: bytes) -> None:
        """
        add_audio_to_queue - adds audio data to the Speaker queue

        Args:
            data (bytes): The audio data to add to the queue
        """
        self._queue.put(data)

    def finish(self) -> bool:
        """
        finish - stops the Speaker stream

        Signals both worker threads to exit. It waits for the play thread before closing
        the output stream, so that thread never writes to a closed stream. It then joins
        the receiver thread and discards any queued audio.

        Returns:
            bool: True if the stream was stopped (always True in this implementation)
        """
        self._logger.debug("Speaker.finish ENTER")

        self._logger.notice("signal exit")
        self._exit.set()

        # The play thread re-checks _exit between queue reads (which time out after
        # TIMEOUT), so join it before closing the stream it writes to.
        if self._thread is not None:
            self._logger.notice("joining _thread...")
            self._thread.join()
            self._logger.notice("thread stopped")

        if self._stream is not None:
            self._logger.notice("stopping stream...")
            self._stream.stop_stream()
            self._stream.close()
            self._logger.notice("stream stopped")

        # may block until the pull callback returns (threaded receiver)
        if self._receiver_thread is not None:
            self._logger.notice("stopping _receiver_thread...")
            self._receiver_thread.join()
            self._logger.notice("_receiver_thread joined")

        with self._queue.mutex:
            self._queue.queue.clear()

        self._stream = None
        self._thread = None
        self._receiver_thread = None

        self._logger.notice("finish succeeded")
        self._logger.debug("Speaker.finish LEAVE")

        return True

    def _play(self, audio_out, stream, stop):
        """
        _play - plays audio data from the Speaker queue (body of the play thread)

        Pulls chunks from audio_out and writes them to stream until stop is set. When a
        microphone is attached, it is muted as soon as audio arrives. It is unmuted once
        no audio has arrived for last_play_delta_in_ms milliseconds.
        """
        while not stop.is_set():
            try:
                if self._microphone is not None and self._microphone.is_muted():
                    with self._lock_wait:
                        delta = datetime.now() - self._last_datagram
                        diff_in_ms = delta.total_seconds() * 1000
                        if diff_in_ms > float(self._last_play_delta_in_ms):
                            self._logger.debug(
                                "LastPlay delta is greater than threshold. Unmute!"
                            )
                            self._microphone.unmute()

                data = audio_out.get(True, TIMEOUT)
                with self._lock_wait:
                    self._last_datagram = datetime.now()
                    if self._microphone is not None and not self._microphone.is_muted():
                        self._logger.debug("New speaker sound detected. Mute!")
                        self._microphone.mute()
                stream.write(data)
            except queue.Empty:
                pass
            except Exception as e:  # pylint: disable=broad-except
                self._logger.error("_play exception: %s", str(e))
class Speaker:  # pylint: disable=too-many-instance-attributes
    """
    This implements a speaker for local audio output. This uses PyAudio under the hood.

    After start(), a receiver thread repeatedly calls the pull callback, forwards each
    text/binary message to the push callback and queues binary messages; a play thread
    writes the queued audio to the PyAudio output stream.
    """

    _logger: verboselogs.VerboseLogger

    _audio: Optional["pyaudio.PyAudio"] = None
    _stream: Optional["pyaudio.Stream"] = None

    _chunk: int
    _rate: int
    _format: int
    _channels: int
    _output_device_index: Optional[int] = None

    # last time we received audio
    _last_datagram: datetime = datetime.now()
    _last_play_delta_in_ms: int
    _lock_wait: threading.Lock

    _queue: queue.Queue
    _exit: threading.Event

    _thread: Optional[threading.Thread] = None
    _receiver_thread: Optional[threading.Thread] = None
    # loop on which the asyncio receiver is scheduled (coroutine callbacks only)
    _loop: Optional[asyncio.AbstractEventLoop] = None

    _push_callback_org: Optional[Callable] = None
    _push_callback: Optional[Callable] = None
    _pull_callback_org: Optional[Callable] = None
    _pull_callback: Optional[Callable] = None

    _microphone: Optional[Microphone] = None

    def __init__(
        self,
        pull_callback: Optional[Callable] = None,
        push_callback: Optional[Callable] = None,
        verbose: int = LOGGING,
        rate: int = RATE,
        chunk: int = CHUNK,
        channels: int = CHANNELS,
        last_play_delta_in_ms: int = PLAYBACK_DELTA,
        output_device_index: Optional[int] = None,
        microphone: Optional[Microphone] = None,
    ):  # pylint: disable=too-many-positional-arguments
        """
        Creates the Speaker and its PyAudio instance; no stream is opened until start().

        Args:
            pull_callback (Optional[Callable]): Called repeatedly to fetch the next message.
            push_callback (Optional[Callable]): Called with every text/binary message fetched.
            verbose (int): Log level for this speaker's logger.
            rate (int): Sample rate passed to PyAudio.
            chunk (int): frames_per_buffer passed to PyAudio.
            channels (int): Channel count passed to PyAudio.
            last_play_delta_in_ms (int): Milliseconds without new audio after which
                playback is considered finished.
            output_device_index (Optional[int]): PyAudio output device; None for the default.
            microphone (Optional[Microphone]): Muted while audio is playing.
        """
        # dynamic import of pyaudio as not to force the requirements on the SDK (and users)
        import pyaudio  # pylint: disable=import-outside-toplevel

        self._logger = verboselogs.VerboseLogger(__name__)
        self._logger.addHandler(logging.StreamHandler())
        self._logger.setLevel(verbose)

        self._exit = threading.Event()
        self._queue = queue.Queue()

        self._last_datagram = datetime.now()
        self._lock_wait = threading.Lock()

        self._microphone = microphone

        self._audio = pyaudio.PyAudio()
        self._chunk = chunk
        self._rate = rate
        self._format = pyaudio.paInt16
        self._channels = channels
        self._last_play_delta_in_ms = last_play_delta_in_ms
        self._output_device_index = output_device_index

        self._push_callback_org = push_callback
        self._pull_callback_org = pull_callback

    def set_push_callback(self, push_callback: Callable) -> None:
        """
        set_push_callback - sets the callback function to be called when data is sent.

        Args:
            push_callback (Callable): The callback function to be called when data is sent.
                This should be the websocket handle message function. It takes effect on the
                next call to start().

        Returns:
            None
        """
        self._push_callback_org = push_callback

    def set_pull_callback(self, pull_callback: Callable) -> None:
        """
        set_pull_callback - sets the callback function to be called when data is received.

        Args:
            pull_callback (Callable): The callback function to be called when data is received.
                This should be the websocket recv function. It takes effect on the next call
                to start().

        Returns:
            None
        """
        self._pull_callback_org = pull_callback

    def start(self, active_loop: Optional[asyncio.AbstractEventLoop] = None) -> bool:
        """
        start - starts the Speaker stream

        Opens the PyAudio output stream and starts the play thread and the receiver thread.

        Args:
            active_loop (Optional[asyncio.AbstractEventLoop]): The asyncio event loop on which
                the receiver is scheduled when the callbacks are coroutine functions. If
                omitted, the loop running in the calling thread is used.

        Returns:
            bool: True if the stream was started, False otherwise

        Raises:
            RuntimeError: (from asyncio.get_running_loop) if a callback is a coroutine
                function, active_loop is None and no event loop is running in this thread.
        """
        self._logger.debug("Speaker.start ENTER")

        self._logger.info("format: %s", self._format)
        self._logger.info("channels: %d", self._channels)
        self._logger.info("rate: %d", self._rate)
        self._logger.info("chunk: %d", self._chunk)

        # An explicitly supplied loop wins; otherwise fall back to the running loop when
        # either callback has to be awaited.
        if active_loop is not None:
            self._loop = active_loop
        elif inspect.iscoroutinefunction(
            self._push_callback_org
        ) or inspect.iscoroutinefunction(self._pull_callback_org):
            self._logger.verbose("get default running asyncio loop")
            self._loop = asyncio.get_running_loop()

        self._exit.clear()
        self._queue = queue.Queue()

        if self._audio is not None:
            self._stream = self._audio.open(
                format=self._format,
                channels=self._channels,
                rate=self._rate,
                input=False,
                output=True,
                frames_per_buffer=self._chunk,
                output_device_index=self._output_device_index,
            )

        if self._stream is None:
            self._logger.error("start failed. No stream created.")
            self._logger.debug("Speaker.start LEAVE")
            return False

        self._push_callback = self._push_callback_org
        self._pull_callback = self._pull_callback_org

        # start the play thread
        self._thread = threading.Thread(
            target=self._play, args=(self._queue, self._stream, self._exit), daemon=True
        )
        self._thread.start()

        # Start the stream (known to be non-None at this point)
        self._stream.start_stream()

        # Start the receiver thread within the start function
        self._logger.verbose("Starting receiver thread...")
        self._receiver_thread = threading.Thread(target=self._start_receiver)
        self._receiver_thread.start()

        self._logger.notice("start succeeded")
        self._logger.debug("Speaker.start LEAVE")

        return True

    def wait_for_complete_with_mute(self, mic: Optional[Microphone]):
        """
        This method will mute/unmute a Microphone and block until the speaker is done playing sound.

        Args:
            mic (Optional[Microphone]): Muted while waiting and unmuted afterwards, even if
                waiting raises. If None, nothing is muted.
        """
        self._logger.debug("Speaker.wait_for_complete_with_mute ENTER")

        if mic is not None:
            mic.mute()
        try:
            self.wait_for_complete()
        finally:
            if mic is not None:
                mic.unmute()

        self._logger.debug("Speaker.wait_for_complete_with_mute LEAVE")

    def wait_for_complete(self):
        """
        This method will block until the speaker is done playing sound.

        "Done" means no audio has been played for last_play_delta_in_ms milliseconds; the
        timer is reset on entry and polled every HALF_SECOND. Returns early if _exit is set.
        """
        self._logger.debug("Speaker.wait_for_complete ENTER")

        delta_in_ms = float(self._last_play_delta_in_ms)
        self._logger.debug("Last Play delta: %f", delta_in_ms)

        # set to now
        with self._lock_wait:
            self._last_datagram = datetime.now()

        while True:
            # sleep for a bit (wakes early when _exit is set)
            self._exit.wait(HALF_SECOND)

            # check if we should exit
            if self._exit.is_set():
                self._logger.debug("Exiting wait_for_complete _exit is set")
                break

            # check the time
            with self._lock_wait:
                delta = datetime.now() - self._last_datagram
                diff_in_ms = delta.total_seconds() * 1000
                if diff_in_ms < delta_in_ms:
                    self._logger.debug("LastPlay delta is less than threshold")
                    continue

                # if we get here, we are done playing audio
                self._logger.debug("LastPlay delta is greater than threshold. Exit wait!")
                break

        self._logger.debug("Speaker.wait_for_complete LEAVE")

    def _start_receiver(self):
        """
        Body of the receiver thread: schedules the asyncio receiver on self._loop when the
        active pull callback is a coroutine function, otherwise runs the blocking loop here.
        """
        if inspect.iscoroutinefunction(self._pull_callback):
            self._logger.verbose("Starting asyncio receiver...")
            asyncio.run_coroutine_threadsafe(self._start_asyncio_receiver(), self._loop)
        else:
            self._logger.verbose("Starting threaded receiver...")
            self._start_threaded_receiver()

    async def _push_async(self, message: Union[str, bytes]) -> None:
        """
        Forwards message to the push callback, awaiting the result only when the callback
        returned an awaitable. Does nothing when no push callback is set.
        """
        if self._push_callback is None:
            return
        result = self._push_callback(message)
        if inspect.isawaitable(result):
            await result

    async def _start_asyncio_receiver(self):
        """
        Receive loop used when the pull callback is a coroutine function; runs on self._loop.

        _exit is checked between messages. Connection-closed and other errors end the loop
        and are logged.
        """
        try:
            while True:
                if self._exit.is_set():
                    self._logger.verbose("Exiting receiver thread...")
                    break

                message = await self._pull_callback()
                if message is None:
                    self._logger.verbose("No message received...")
                    continue

                if isinstance(message, str):
                    self._logger.verbose("Received control message...")
                    await self._push_async(message)
                elif isinstance(message, bytes):
                    self._logger.verbose("Received audio data...")
                    await self._push_async(message)
                    self.add_audio_to_queue(message)
        except websockets.exceptions.ConnectionClosedOK as e:
            self._logger.debug("_start_asyncio_receiver exiting gracefully: %d", e.code)
        except websockets.exceptions.ConnectionClosed as e:
            if e.code in [1000, 1001]:
                self._logger.debug(
                    "_start_asyncio_receiver exiting gracefully: %d", e.code
                )
                return
            self._logger.error("_start_asyncio_receiver - ConnectionClosed: %s", str(e))
        except websockets.exceptions.WebSocketException as e:
            self._logger.error(
                "_start_asyncio_receiver- WebSocketException: %s", str(e)
            )
        except Exception as e:  # pylint: disable=broad-except
            self._logger.error("_start_asyncio_receiver exception: %s", str(e))

    def _start_threaded_receiver(self):
        """
        Blocking receive loop used when the pull callback is a plain function.

        _exit is only checked between calls to the pull callback, so a blocked pull delays
        shutdown. Any exception ends the loop and is logged.
        """
        try:
            while True:
                if self._exit.is_set():
                    self._logger.verbose("Exiting receiver thread...")
                    break

                message = self._pull_callback()
                if message is None:
                    self._logger.verbose("No message received...")
                    continue

                if isinstance(message, str):
                    self._logger.verbose("Received control message...")
                    if self._push_callback is not None:
                        self._push_callback(message)
                elif isinstance(message, bytes):
                    self._logger.verbose("Received audio data...")
                    if self._push_callback is not None:
                        self._push_callback(message)
                    self.add_audio_to_queue(message)
        except Exception as e:  # pylint: disable=broad-except
            self._logger.notice("_start_threaded_receiver exception: %s", str(e))

    def add_audio_to_queue(self, data: bytes) -> None:
        """
        add_audio_to_queue - adds audio data to the Speaker queue

        Args:
            data (bytes): The audio data to add to the queue
        """
        self._queue.put(data)

    def finish(self) -> bool:
        """
        finish - stops the Speaker stream

        Signals both worker threads to exit. It waits for the play thread before closing
        the output stream, so that thread never writes to a closed stream. It then joins
        the receiver thread and discards any queued audio.

        Returns:
            bool: True if the stream was stopped (always True in this implementation)
        """
        self._logger.debug("Speaker.finish ENTER")

        self._logger.notice("signal exit")
        self._exit.set()

        # The play thread re-checks _exit between queue reads (which time out after
        # TIMEOUT), so join it before closing the stream it writes to.
        if self._thread is not None:
            self._logger.notice("joining _thread...")
            self._thread.join()
            self._logger.notice("thread stopped")

        if self._stream is not None:
            self._logger.notice("stopping stream...")
            self._stream.stop_stream()
            self._stream.close()
            self._logger.notice("stream stopped")

        # may block until the pull callback returns (threaded receiver)
        if self._receiver_thread is not None:
            self._logger.notice("stopping _receiver_thread...")
            self._receiver_thread.join()
            self._logger.notice("_receiver_thread joined")

        with self._queue.mutex:
            self._queue.queue.clear()

        self._stream = None
        self._thread = None
        self._receiver_thread = None

        self._logger.notice("finish succeeded")
        self._logger.debug("Speaker.finish LEAVE")

        return True

    def _play(self, audio_out, stream, stop):
        """
        _play - plays audio data from the Speaker queue (body of the play thread)

        Pulls chunks from audio_out and writes them to stream until stop is set. When a
        microphone is attached, it is muted as soon as audio arrives. It is unmuted once
        no audio has arrived for last_play_delta_in_ms milliseconds.
        """
        while not stop.is_set():
            try:
                if self._microphone is not None and self._microphone.is_muted():
                    with self._lock_wait:
                        delta = datetime.now() - self._last_datagram
                        diff_in_ms = delta.total_seconds() * 1000
                        if diff_in_ms > float(self._last_play_delta_in_ms):
                            self._logger.debug(
                                "LastPlay delta is greater than threshold. Unmute!"
                            )
                            self._microphone.unmute()

                data = audio_out.get(True, TIMEOUT)
                with self._lock_wait:
                    self._last_datagram = datetime.now()
                    if self._microphone is not None and not self._microphone.is_muted():
                        self._logger.debug("New speaker sound detected. Mute!")
                        self._microphone.mute()
                stream.write(data)
            except queue.Empty:
                pass
            except Exception as e:  # pylint: disable=broad-except
                self._logger.error("_play exception: %s", str(e))
This implements a speaker for local audio output. This uses PyAudio under the hood.
def __init__(
    self,
    pull_callback: Optional[Callable] = None,
    push_callback: Optional[Callable] = None,
    verbose: int = LOGGING,
    rate: int = RATE,
    chunk: int = CHUNK,
    channels: int = CHANNELS,
    last_play_delta_in_ms: int = PLAYBACK_DELTA,
    output_device_index: Optional[int] = None,
    microphone: Optional[Microphone] = None,
):  # pylint: disable=too-many-positional-arguments
    """
    Builds a Speaker and its PyAudio instance; the output stream is only opened by start().

    Args:
        pull_callback (Optional[Callable]): Polled by the receiver for the next message.
        push_callback (Optional[Callable]): Handed every text/binary message received.
        verbose (int): Log level applied to this speaker's logger.
        rate (int): Sample rate given to PyAudio.
        chunk (int): frames_per_buffer given to PyAudio.
        channels (int): Channel count given to PyAudio.
        last_play_delta_in_ms (int): Milliseconds of silence that mark playback as done.
        output_device_index (Optional[int]): PyAudio output device (None = default).
        microphone (Optional[Microphone]): Muted by the play thread while audio plays.
    """
    # pyaudio is imported lazily so the SDK (and its users) need not depend on it
    import pyaudio as pa  # pylint: disable=import-outside-toplevel

    logger = verboselogs.VerboseLogger(__name__)
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(verbose)
    self._logger = logger

    # shutdown signal, playback queue and the lock guarding _last_datagram
    self._exit, self._queue = threading.Event(), queue.Queue()
    self._lock_wait = threading.Lock()
    self._last_datagram = datetime.now()

    self._microphone = microphone

    # PyAudio handle and stream configuration
    self._audio = pa.PyAudio()
    self._format = pa.paInt16
    self._chunk, self._rate, self._channels = chunk, rate, channels
    self._last_play_delta_in_ms = last_play_delta_in_ms
    self._output_device_index = output_device_index

    # callbacks are only activated when start() runs
    self._pull_callback_org, self._push_callback_org = pull_callback, push_callback
def set_push_callback(self, push_callback: Callable) -> None:
    """
    set_push_callback - registers the function that received messages are pushed to.

    Args:
        push_callback (Callable): Called with every text or binary message the receiver
            pulls. This should be the websocket handle message function. The new
            callback is picked up the next time start() runs.

    Returns:
        None
    """
    setattr(self, "_push_callback_org", push_callback)
set_push_callback - sets the callback function to be called when data is sent.
Args: push_callback (Callable): The callback function to be called when data is sent. This should be the websocket handle message function.
Returns: None
def set_pull_callback(self, pull_callback: Callable) -> None:
    """
    set_pull_callback - registers the function the receiver polls for incoming data.

    Args:
        pull_callback (Callable): Called repeatedly to obtain the next message.
            This should be the websocket recv function. The new callback is
            picked up the next time start() runs.

    Returns:
        None
    """
    setattr(self, "_pull_callback_org", pull_callback)
set_pull_callback - sets the callback function to be called when data is received.
Args: pull_callback (Callable): The callback function to be called when data is received. This should be the websocket recv function.
Returns: None
def start(self, active_loop: Optional[asyncio.AbstractEventLoop] = None) -> bool:
    """
    start - starts the Speaker stream

    Opens the PyAudio output stream and starts the play thread and the receiver thread.

    Args:
        active_loop (Optional[asyncio.AbstractEventLoop]): The asyncio event loop on which
            the receiver is scheduled when the callbacks are coroutine functions. If
            omitted, the loop running in the calling thread is used.

    Returns:
        bool: True if the stream was started, False otherwise

    Raises:
        RuntimeError: (from asyncio.get_running_loop) if a callback is a coroutine
            function, active_loop is None and no event loop is running in this thread.
    """
    self._logger.debug("Speaker.start ENTER")

    self._logger.info("format: %s", self._format)
    self._logger.info("channels: %d", self._channels)
    self._logger.info("rate: %d", self._rate)
    self._logger.info("chunk: %d", self._chunk)

    # An explicitly supplied loop wins; otherwise fall back to the running loop when
    # either callback has to be awaited.
    if active_loop is not None:
        self._loop = active_loop
    elif inspect.iscoroutinefunction(
        self._push_callback_org
    ) or inspect.iscoroutinefunction(self._pull_callback_org):
        self._logger.verbose("get default running asyncio loop")
        self._loop = asyncio.get_running_loop()

    self._exit.clear()
    self._queue = queue.Queue()

    if self._audio is not None:
        self._stream = self._audio.open(
            format=self._format,
            channels=self._channels,
            rate=self._rate,
            input=False,
            output=True,
            frames_per_buffer=self._chunk,
            output_device_index=self._output_device_index,
        )

    if self._stream is None:
        self._logger.error("start failed. No stream created.")
        self._logger.debug("Speaker.start LEAVE")
        return False

    self._push_callback = self._push_callback_org
    self._pull_callback = self._pull_callback_org

    # start the play thread
    self._thread = threading.Thread(
        target=self._play, args=(self._queue, self._stream, self._exit), daemon=True
    )
    self._thread.start()

    # Start the stream (known to be non-None at this point)
    self._stream.start_stream()

    # Start the receiver thread within the start function
    self._logger.verbose("Starting receiver thread...")
    self._receiver_thread = threading.Thread(target=self._start_receiver)
    self._receiver_thread.start()

    self._logger.notice("start succeeded")
    self._logger.debug("Speaker.start LEAVE")

    return True
start - starts the Speaker stream
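Args: active_loop (Optional[asyncio.AbstractEventLoop]): The asyncio event loop on which the audio receiver is scheduled when the callbacks are coroutine functions. If omitted, the currently running event loop is used.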
Returns: bool: True if the stream was started, False otherwise
def wait_for_complete_with_mute(self, mic: Optional["Microphone"]):
    """
    This method will mute/unmute a Microphone and block until the speaker is done playing sound.

    Args:
        mic (Optional[Microphone]): Muted while waiting and unmuted afterwards, even if
            waiting raises. If None, nothing is muted.
    """
    self._logger.debug("Speaker.wait_for_complete_with_mute ENTER")

    # Act on the microphone that was passed in; the speaker's own _microphone has no
    # bearing on whether this one should be muted.
    if mic is not None:
        mic.mute()
    try:
        self.wait_for_complete()
    finally:
        if mic is not None:
            mic.unmute()

    self._logger.debug("Speaker.wait_for_complete_with_mute LEAVE")
This method will mute/unmute a Microphone and block until the speaker is done playing sound.
def wait_for_complete(self):
    """
    Blocks until the speaker has finished playing sound.

    Playback counts as finished once no audio has been played for
    last_play_delta_in_ms milliseconds. The silence timer is reset on entry and polled
    every HALF_SECOND; the wait also ends as soon as _exit is set.
    """
    self._logger.debug("Speaker.wait_for_complete ENTER")

    threshold_ms = float(self._last_play_delta_in_ms)
    self._logger.debug("Last Play delta: %f", threshold_ms)

    # restart the silence timer from now
    with self._lock_wait:
        self._last_datagram = datetime.now()

    finished = False
    while not finished:
        # poll; returns early if _exit gets set
        self._exit.wait(HALF_SECOND)

        if self._exit.is_set():
            self._logger.debug("Exiting wait_for_complete _exit is set")
            break

        with self._lock_wait:
            elapsed_ms = (datetime.now() - self._last_datagram).total_seconds() * 1000
            if elapsed_ms < threshold_ms:
                self._logger.debug("LastPlay delta is less than threshold")
            else:
                # quiet long enough: playback is over
                self._logger.debug("LastPlay delta is greater than threshold. Exit wait!")
                finished = True

    self._logger.debug("Speaker.wait_for_complete LEAVE")
This method will block until the speaker is done playing sound.
def add_audio_to_queue(self, data: bytes) -> None:
    """
    add_audio_to_queue - hands a chunk of audio to the Speaker's playback queue

    Args:
        data (bytes): Audio bytes to be written to the output stream by the play thread
    """
    pending = self._queue
    pending.put(data, block=True)
add_audio_to_queue - adds audio data to the Speaker queue
Args: data (bytes): The audio data to add to the queue
def finish(self) -> bool:
    """
    finish - stops the Speaker stream

    Signals both worker threads to exit. It waits for the play thread before closing
    the output stream, so that thread never writes to a closed stream. It then joins
    the receiver thread and discards any queued audio.

    Returns:
        bool: True if the stream was stopped (always True in this implementation)
    """
    self._logger.debug("Speaker.finish ENTER")

    self._logger.notice("signal exit")
    self._exit.set()

    # The play thread re-checks _exit between queue reads, so join it before closing
    # the stream it writes to.
    if self._thread is not None:
        self._logger.notice("joining _thread...")
        self._thread.join()
        self._logger.notice("thread stopped")

    if self._stream is not None:
        self._logger.notice("stopping stream...")
        self._stream.stop_stream()
        self._stream.close()
        self._logger.notice("stream stopped")

    # may block until the pull callback returns (threaded receiver)
    if self._receiver_thread is not None:
        self._logger.notice("stopping _receiver_thread...")
        self._receiver_thread.join()
        self._logger.notice("_receiver_thread joined")

    with self._queue.mutex:
        self._queue.queue.clear()

    self._stream = None
    self._thread = None
    self._receiver_thread = None

    self._logger.notice("finish succeeded")
    self._logger.debug("Speaker.finish LEAVE")

    return True
finish - stops the Speaker stream
Returns: bool: True if the stream was stopped, False otherwise