deepgram.audio.speaker.speaker

  1# Copyright 2024 Deepgram SDK contributors. All Rights Reserved.
  2# Use of this source code is governed by a MIT license that can be found in the LICENSE file.
  3# SPDX-License-Identifier: MIT
  4
  5import asyncio
  6import inspect
  7import queue
  8import threading
  9from typing import Optional, Callable, Union, TYPE_CHECKING
 10import logging
 11from datetime import datetime
 12
 13import websockets
 14
 15from ...utils import verboselogs
 16from .constants import LOGGING, CHANNELS, RATE, CHUNK, TIMEOUT, PLAYBACK_DELTA
 17
 18from ..microphone import Microphone
 19
 20if TYPE_CHECKING:
 21    import pyaudio
 22
 23HALF_SECOND = 0.5
 24
 25
class Speaker:  # pylint: disable=too-many-instance-attributes
    """
    This implements a speaker for local audio output. This uses PyAudio under the hood.

    Audio chunks are placed on an internal queue and written to the PyAudio output
    stream by a background playback thread. A second thread pulls messages through
    the pull callback and forwards them to the push callback.
    """

    _logger: verboselogs.VerboseLogger

    # PyAudio handle (created in __init__) and the output stream (opened in start())
    _audio: Optional["pyaudio.PyAudio"] = None
    _stream: Optional["pyaudio.Stream"] = None

    # output stream parameters handed to PyAudio when the stream is opened
    _chunk: int
    _rate: int
    _channels: int
    _output_device_index: Optional[int] = None

    # last time we received audio
    # NOTE: this class-level default is evaluated once, when the class body runs;
    # __init__ replaces it with a per-instance timestamp.
    _last_datagram: datetime = datetime.now()
    # quiet period (in milliseconds) after which playback is considered finished
    _last_play_delta_in_ms: int
    # guards reads and writes of _last_datagram
    _lock_wait: threading.Lock

    # audio chunks waiting to be played, and the flag that tells the threads to stop
    _queue: queue.Queue
    _exit: threading.Event

    # playback thread
    _thread: Optional[threading.Thread] = None
    # _asyncio_loop: asyncio.AbstractEventLoop
    # _asyncio_thread: threading.Thread
    # thread that launches the receiver, and the loop the asyncio receiver is scheduled on
    _receiver_thread: Optional[threading.Thread] = None
    _loop: Optional[asyncio.AbstractEventLoop] = None

    # the *_org values are set by the constructor and the set_*_callback methods;
    # start() copies them into the active _push_callback / _pull_callback
    _push_callback_org: Optional[Callable] = None
    _push_callback: Optional[Callable] = None
    _pull_callback_org: Optional[Callable] = None
    _pull_callback: Optional[Callable] = None

    # optional microphone that the playback thread mutes while audio is playing
    _microphone: Optional[Microphone] = None
 61
 62    def __init__(
 63        self,
 64        pull_callback: Optional[Callable] = None,
 65        push_callback: Optional[Callable] = None,
 66        verbose: int = LOGGING,
 67        rate: int = RATE,
 68        chunk: int = CHUNK,
 69        channels: int = CHANNELS,
 70        last_play_delta_in_ms: int = PLAYBACK_DELTA,
 71        output_device_index: Optional[int] = None,
 72        microphone: Optional[Microphone] = None,
 73    ):  # pylint: disable=too-many-positional-arguments
 74        # dynamic import of pyaudio as not to force the requirements on the SDK (and users)
 75        import pyaudio  # pylint: disable=import-outside-toplevel
 76
 77        self._logger = verboselogs.VerboseLogger(__name__)
 78        self._logger.addHandler(logging.StreamHandler())
 79        self._logger.setLevel(verbose)
 80
 81        self._exit = threading.Event()
 82        self._queue = queue.Queue()
 83
 84        self._last_datagram = datetime.now()
 85        self._lock_wait = threading.Lock()
 86
 87        self._microphone = microphone
 88
 89        self._audio = pyaudio.PyAudio()
 90        self._chunk = chunk
 91        self._rate = rate
 92        self._format = pyaudio.paInt16
 93        self._channels = channels
 94        self._last_play_delta_in_ms = last_play_delta_in_ms
 95        self._output_device_index = output_device_index
 96
 97        self._push_callback_org = push_callback
 98        self._pull_callback_org = pull_callback
 99
100    def set_push_callback(self, push_callback: Callable) -> None:
101        """
102        set_push_callback - sets the callback function to be called when data is sent.
103
104        Args:
105            push_callback (Callable): The callback function to be called when data is send.
106                                      This should be the websocket handle message function.
107
108        Returns:
109            None
110        """
111        self._push_callback_org = push_callback
112
113    def set_pull_callback(self, pull_callback: Callable) -> None:
114        """
115        set_pull_callback - sets the callback function to be called when data is received.
116
117        Args:
118            pull_callback (Callable): The callback function to be called when data is received.
119                                      This should be the websocket recv function.
120
121        Returns:
122            None
123        """
124        self._pull_callback_org = pull_callback
125
126    def start(self, active_loop: Optional[asyncio.AbstractEventLoop] = None) -> bool:
127        """
128        starts - starts the Speaker stream
129
130        Args:
131            socket (Union[SyncClientConnection, AsyncClientConnection]): The socket to receive audio data from.
132
133        Returns:
134            bool: True if the stream was started, False otherwise
135        """
136        self._logger.debug("Speaker.start ENTER")
137
138        self._logger.info("format: %s", self._format)
139        self._logger.info("channels: %d", self._channels)
140        self._logger.info("rate: %d", self._rate)
141        self._logger.info("chunk: %d", self._chunk)
142        # self._logger.info("output_device_id: %d", self._output_device_index)
143
144        # Automatically get the current running event loop
145        if inspect.iscoroutinefunction(self._push_callback_org) and active_loop is None:
146            self._logger.verbose("get default running asyncio loop")
147            self._loop = asyncio.get_running_loop()
148
149        self._exit.clear()
150        self._queue = queue.Queue()
151
152        if self._audio is not None:
153            self._stream = self._audio.open(
154                format=self._format,
155                channels=self._channels,
156                rate=self._rate,
157                input=False,
158                output=True,
159                frames_per_buffer=self._chunk,
160                output_device_index=self._output_device_index,
161            )
162
163        if self._stream is None:
164            self._logger.error("start failed. No stream created.")
165            self._logger.debug("Speaker.start LEAVE")
166            return False
167
168        self._push_callback = self._push_callback_org
169        self._pull_callback = self._pull_callback_org
170
171        # start the play thread
172        self._thread = threading.Thread(
173            target=self._play, args=(self._queue, self._stream, self._exit), daemon=True
174        )
175        self._thread.start()
176
177        # Start the stream
178        if self._stream is not None:
179            self._stream.start_stream()
180
181        # Start the receiver thread within the start function
182        self._logger.verbose("Starting receiver thread...")
183        self._receiver_thread = threading.Thread(target=self._start_receiver)
184        self._receiver_thread.start()
185
186        self._logger.notice("start succeeded")
187        self._logger.debug("Speaker.start LEAVE")
188
189        return True
190
191    def wait_for_complete_with_mute(self, mic: Microphone):
192        """
193        This method will mute/unmute a Microphone and block until the speak is done playing sound.
194        """
195        self._logger.debug("Speaker.wait_for_complete ENTER")
196
197        if self._microphone is not None:
198            mic.mute()
199        self.wait_for_complete()
200        if self._microphone is not None:
201            mic.unmute()
202
203        self._logger.debug("Speaker.wait_for_complete LEAVE")
204
205    def wait_for_complete(self):
206        """
207        This method will block until the speak is done playing sound.
208        """
209        self._logger.debug("Speaker.wait_for_complete ENTER")
210
211        delta_in_ms = float(self._last_play_delta_in_ms)
212        self._logger.debug("Last Play delta: %f", delta_in_ms)
213
214        # set to now
215        with self._lock_wait:
216            self._last_datagram = datetime.now()
217
218        while True:
219            # sleep for a bit
220            self._exit.wait(HALF_SECOND)
221
222            # check if we should exit
223            if self._exit.is_set():
224                self._logger.debug("Exiting wait_for_complete _exit is set")
225                break
226
227            # check the time
228            with self._lock_wait:
229                delta = datetime.now() - self._last_datagram
230                diff_in_ms = delta.total_seconds() * 1000
231                if diff_in_ms < delta_in_ms:
232                    self._logger.debug("LastPlay delta is less than threshold")
233                    continue
234
235            # if we get here, we are done playing audio
236            self._logger.debug("LastPlay delta is greater than threshold. Exit wait!")
237            break
238
239        self._logger.debug("Speaker.wait_for_complete LEAVE")
240
241    def _start_receiver(self):
242        # Check if the socket is an asyncio WebSocket
243        if inspect.iscoroutinefunction(self._pull_callback_org):
244            self._logger.verbose("Starting asyncio receiver...")
245            asyncio.run_coroutine_threadsafe(self._start_asyncio_receiver(), self._loop)
246        else:
247            self._logger.verbose("Starting threaded receiver...")
248            self._start_threaded_receiver()
249
250    async def _start_asyncio_receiver(self):
251        try:
252            while True:
253                if self._exit.is_set():
254                    self._logger.verbose("Exiting receiver thread...")
255                    break
256
257                message = await self._pull_callback()
258                if message is None:
259                    self._logger.verbose("No message received...")
260                    continue
261
262                if isinstance(message, str):
263                    self._logger.verbose("Received control message...")
264                    await self._push_callback(message)
265                elif isinstance(message, bytes):
266                    self._logger.verbose("Received audio data...")
267                    await self._push_callback(message)
268                    self.add_audio_to_queue(message)
269        except websockets.exceptions.ConnectionClosedOK as e:
270            self._logger.debug("send() exiting gracefully: %d", e.code)
271        except websockets.exceptions.ConnectionClosed as e:
272            if e.code in [1000, 1001]:
273                self._logger.debug("send() exiting gracefully: %d", e.code)
274                return
275            self._logger.error("_start_asyncio_receiver - ConnectionClosed: %s", str(e))
276        except websockets.exceptions.WebSocketException as e:
277            self._logger.error(
278                "_start_asyncio_receiver- WebSocketException: %s", str(e)
279            )
280        except Exception as e:  # pylint: disable=broad-except
281            self._logger.error("_start_asyncio_receiver exception: %s", str(e))
282
283    def _start_threaded_receiver(self):
284        try:
285            while True:
286                if self._exit.is_set():
287                    self._logger.verbose("Exiting receiver thread...")
288                    break
289
290                message = self._pull_callback()
291                if message is None:
292                    self._logger.verbose("No message received...")
293                    continue
294
295                if isinstance(message, str):
296                    self._logger.verbose("Received control message...")
297                    self._push_callback(message)
298                elif isinstance(message, bytes):
299                    self._logger.verbose("Received audio data...")
300                    self._push_callback(message)
301                    self.add_audio_to_queue(message)
302        except Exception as e:  # pylint: disable=broad-except
303            self._logger.notice("_start_threaded_receiver exception: %s", str(e))
304
305    def add_audio_to_queue(self, data: bytes) -> None:
306        """
307        add_audio_to_queue - adds audio data to the Speaker queue
308
309        Args:
310            data (bytes): The audio data to add to the queue
311        """
312        self._queue.put(data)
313
314    def finish(self) -> bool:
315        """
316        finish - stops the Speaker stream
317
318        Returns:
319            bool: True if the stream was stopped, False otherwise
320        """
321        self._logger.debug("Speaker.finish ENTER")
322
323        self._logger.notice("signal exit")
324        self._exit.set()
325
326        if self._stream is not None:
327            self._logger.notice("stopping stream...")
328            self._stream.stop_stream()
329            self._stream.close()
330            self._logger.notice("stream stopped")
331
332        if self._thread is not None:
333            self._logger.notice("joining _thread...")
334            self._thread.join()
335            self._logger.notice("thread stopped")
336
337        if self._receiver_thread is not None:
338            self._logger.notice("stopping _receiver_thread...")
339            self._receiver_thread.join()
340            self._logger.notice("_receiver_thread joined")
341
342        with self._queue.mutex:
343            self._queue.queue.clear()
344
345        self._stream = None
346        self._thread = None
347        self._receiver_thread = None
348
349        self._logger.notice("finish succeeded")
350        self._logger.debug("Speaker.finish LEAVE")
351
352        return True
353
354    def _play(self, audio_out, stream, stop):
355        """
356        _play - plays audio data from the Speaker queue callback for portaudio
357        """
358        while not stop.is_set():
359            try:
360                if self._microphone is not None and self._microphone.is_muted():
361                    with self._lock_wait:
362                        delta = datetime.now() - self._last_datagram
363                        diff_in_ms = delta.total_seconds() * 1000
364                        if diff_in_ms > float(self._last_play_delta_in_ms):
365                            self._logger.debug(
366                                "LastPlay delta is greater than threshold. Unmute!"
367                            )
368                            self._microphone.unmute()
369
370                data = audio_out.get(True, TIMEOUT)
371                with self._lock_wait:
372                    self._last_datagram = datetime.now()
373                    if self._microphone is not None and not self._microphone.is_muted():
374                        self._logger.debug("New speaker sound detected. Mute!")
375                        self._microphone.mute()
376                stream.write(data)
377            except queue.Empty:
378                pass
379            except Exception as e:  # pylint: disable=broad-except
380                self._logger.error("_play exception: %s", str(e))
HALF_SECOND = 0.5
class Speaker:
 27class Speaker:  # pylint: disable=too-many-instance-attributes
 28    """
 29    This implements a speaker for local audio output. This uses PyAudio under the hood.
 30    """
 31
 32    _logger: verboselogs.VerboseLogger
 33
 34    _audio: Optional["pyaudio.PyAudio"] = None
 35    _stream: Optional["pyaudio.Stream"] = None
 36
 37    _chunk: int
 38    _rate: int
 39    _channels: int
 40    _output_device_index: Optional[int] = None
 41
 42    # last time we received audio
 43    _last_datagram: datetime = datetime.now()
 44    _last_play_delta_in_ms: int
 45    _lock_wait: threading.Lock
 46
 47    _queue: queue.Queue
 48    _exit: threading.Event
 49
 50    _thread: Optional[threading.Thread] = None
 51    # _asyncio_loop: asyncio.AbstractEventLoop
 52    # _asyncio_thread: threading.Thread
 53    _receiver_thread: Optional[threading.Thread] = None
 54    _loop: Optional[asyncio.AbstractEventLoop] = None
 55
 56    _push_callback_org: Optional[Callable] = None
 57    _push_callback: Optional[Callable] = None
 58    _pull_callback_org: Optional[Callable] = None
 59    _pull_callback: Optional[Callable] = None
 60
 61    _microphone: Optional[Microphone] = None
 62
 63    def __init__(
 64        self,
 65        pull_callback: Optional[Callable] = None,
 66        push_callback: Optional[Callable] = None,
 67        verbose: int = LOGGING,
 68        rate: int = RATE,
 69        chunk: int = CHUNK,
 70        channels: int = CHANNELS,
 71        last_play_delta_in_ms: int = PLAYBACK_DELTA,
 72        output_device_index: Optional[int] = None,
 73        microphone: Optional[Microphone] = None,
 74    ):  # pylint: disable=too-many-positional-arguments
 75        # dynamic import of pyaudio as not to force the requirements on the SDK (and users)
 76        import pyaudio  # pylint: disable=import-outside-toplevel
 77
 78        self._logger = verboselogs.VerboseLogger(__name__)
 79        self._logger.addHandler(logging.StreamHandler())
 80        self._logger.setLevel(verbose)
 81
 82        self._exit = threading.Event()
 83        self._queue = queue.Queue()
 84
 85        self._last_datagram = datetime.now()
 86        self._lock_wait = threading.Lock()
 87
 88        self._microphone = microphone
 89
 90        self._audio = pyaudio.PyAudio()
 91        self._chunk = chunk
 92        self._rate = rate
 93        self._format = pyaudio.paInt16
 94        self._channels = channels
 95        self._last_play_delta_in_ms = last_play_delta_in_ms
 96        self._output_device_index = output_device_index
 97
 98        self._push_callback_org = push_callback
 99        self._pull_callback_org = pull_callback
100
101    def set_push_callback(self, push_callback: Callable) -> None:
102        """
103        set_push_callback - sets the callback function to be called when data is sent.
104
105        Args:
106            push_callback (Callable): The callback function to be called when data is send.
107                                      This should be the websocket handle message function.
108
109        Returns:
110            None
111        """
112        self._push_callback_org = push_callback
113
114    def set_pull_callback(self, pull_callback: Callable) -> None:
115        """
116        set_pull_callback - sets the callback function to be called when data is received.
117
118        Args:
119            pull_callback (Callable): The callback function to be called when data is received.
120                                      This should be the websocket recv function.
121
122        Returns:
123            None
124        """
125        self._pull_callback_org = pull_callback
126
127    def start(self, active_loop: Optional[asyncio.AbstractEventLoop] = None) -> bool:
128        """
129        starts - starts the Speaker stream
130
131        Args:
132            socket (Union[SyncClientConnection, AsyncClientConnection]): The socket to receive audio data from.
133
134        Returns:
135            bool: True if the stream was started, False otherwise
136        """
137        self._logger.debug("Speaker.start ENTER")
138
139        self._logger.info("format: %s", self._format)
140        self._logger.info("channels: %d", self._channels)
141        self._logger.info("rate: %d", self._rate)
142        self._logger.info("chunk: %d", self._chunk)
143        # self._logger.info("output_device_id: %d", self._output_device_index)
144
145        # Automatically get the current running event loop
146        if inspect.iscoroutinefunction(self._push_callback_org) and active_loop is None:
147            self._logger.verbose("get default running asyncio loop")
148            self._loop = asyncio.get_running_loop()
149
150        self._exit.clear()
151        self._queue = queue.Queue()
152
153        if self._audio is not None:
154            self._stream = self._audio.open(
155                format=self._format,
156                channels=self._channels,
157                rate=self._rate,
158                input=False,
159                output=True,
160                frames_per_buffer=self._chunk,
161                output_device_index=self._output_device_index,
162            )
163
164        if self._stream is None:
165            self._logger.error("start failed. No stream created.")
166            self._logger.debug("Speaker.start LEAVE")
167            return False
168
169        self._push_callback = self._push_callback_org
170        self._pull_callback = self._pull_callback_org
171
172        # start the play thread
173        self._thread = threading.Thread(
174            target=self._play, args=(self._queue, self._stream, self._exit), daemon=True
175        )
176        self._thread.start()
177
178        # Start the stream
179        if self._stream is not None:
180            self._stream.start_stream()
181
182        # Start the receiver thread within the start function
183        self._logger.verbose("Starting receiver thread...")
184        self._receiver_thread = threading.Thread(target=self._start_receiver)
185        self._receiver_thread.start()
186
187        self._logger.notice("start succeeded")
188        self._logger.debug("Speaker.start LEAVE")
189
190        return True
191
192    def wait_for_complete_with_mute(self, mic: Microphone):
193        """
194        This method will mute/unmute a Microphone and block until the speak is done playing sound.
195        """
196        self._logger.debug("Speaker.wait_for_complete ENTER")
197
198        if self._microphone is not None:
199            mic.mute()
200        self.wait_for_complete()
201        if self._microphone is not None:
202            mic.unmute()
203
204        self._logger.debug("Speaker.wait_for_complete LEAVE")
205
206    def wait_for_complete(self):
207        """
208        This method will block until the speak is done playing sound.
209        """
210        self._logger.debug("Speaker.wait_for_complete ENTER")
211
212        delta_in_ms = float(self._last_play_delta_in_ms)
213        self._logger.debug("Last Play delta: %f", delta_in_ms)
214
215        # set to now
216        with self._lock_wait:
217            self._last_datagram = datetime.now()
218
219        while True:
220            # sleep for a bit
221            self._exit.wait(HALF_SECOND)
222
223            # check if we should exit
224            if self._exit.is_set():
225                self._logger.debug("Exiting wait_for_complete _exit is set")
226                break
227
228            # check the time
229            with self._lock_wait:
230                delta = datetime.now() - self._last_datagram
231                diff_in_ms = delta.total_seconds() * 1000
232                if diff_in_ms < delta_in_ms:
233                    self._logger.debug("LastPlay delta is less than threshold")
234                    continue
235
236            # if we get here, we are done playing audio
237            self._logger.debug("LastPlay delta is greater than threshold. Exit wait!")
238            break
239
240        self._logger.debug("Speaker.wait_for_complete LEAVE")
241
242    def _start_receiver(self):
243        # Check if the socket is an asyncio WebSocket
244        if inspect.iscoroutinefunction(self._pull_callback_org):
245            self._logger.verbose("Starting asyncio receiver...")
246            asyncio.run_coroutine_threadsafe(self._start_asyncio_receiver(), self._loop)
247        else:
248            self._logger.verbose("Starting threaded receiver...")
249            self._start_threaded_receiver()
250
251    async def _start_asyncio_receiver(self):
252        try:
253            while True:
254                if self._exit.is_set():
255                    self._logger.verbose("Exiting receiver thread...")
256                    break
257
258                message = await self._pull_callback()
259                if message is None:
260                    self._logger.verbose("No message received...")
261                    continue
262
263                if isinstance(message, str):
264                    self._logger.verbose("Received control message...")
265                    await self._push_callback(message)
266                elif isinstance(message, bytes):
267                    self._logger.verbose("Received audio data...")
268                    await self._push_callback(message)
269                    self.add_audio_to_queue(message)
270        except websockets.exceptions.ConnectionClosedOK as e:
271            self._logger.debug("send() exiting gracefully: %d", e.code)
272        except websockets.exceptions.ConnectionClosed as e:
273            if e.code in [1000, 1001]:
274                self._logger.debug("send() exiting gracefully: %d", e.code)
275                return
276            self._logger.error("_start_asyncio_receiver - ConnectionClosed: %s", str(e))
277        except websockets.exceptions.WebSocketException as e:
278            self._logger.error(
279                "_start_asyncio_receiver- WebSocketException: %s", str(e)
280            )
281        except Exception as e:  # pylint: disable=broad-except
282            self._logger.error("_start_asyncio_receiver exception: %s", str(e))
283
284    def _start_threaded_receiver(self):
285        try:
286            while True:
287                if self._exit.is_set():
288                    self._logger.verbose("Exiting receiver thread...")
289                    break
290
291                message = self._pull_callback()
292                if message is None:
293                    self._logger.verbose("No message received...")
294                    continue
295
296                if isinstance(message, str):
297                    self._logger.verbose("Received control message...")
298                    self._push_callback(message)
299                elif isinstance(message, bytes):
300                    self._logger.verbose("Received audio data...")
301                    self._push_callback(message)
302                    self.add_audio_to_queue(message)
303        except Exception as e:  # pylint: disable=broad-except
304            self._logger.notice("_start_threaded_receiver exception: %s", str(e))
305
306    def add_audio_to_queue(self, data: bytes) -> None:
307        """
308        add_audio_to_queue - adds audio data to the Speaker queue
309
310        Args:
311            data (bytes): The audio data to add to the queue
312        """
313        self._queue.put(data)
314
315    def finish(self) -> bool:
316        """
317        finish - stops the Speaker stream
318
319        Returns:
320            bool: True if the stream was stopped, False otherwise
321        """
322        self._logger.debug("Speaker.finish ENTER")
323
324        self._logger.notice("signal exit")
325        self._exit.set()
326
327        if self._stream is not None:
328            self._logger.notice("stopping stream...")
329            self._stream.stop_stream()
330            self._stream.close()
331            self._logger.notice("stream stopped")
332
333        if self._thread is not None:
334            self._logger.notice("joining _thread...")
335            self._thread.join()
336            self._logger.notice("thread stopped")
337
338        if self._receiver_thread is not None:
339            self._logger.notice("stopping _receiver_thread...")
340            self._receiver_thread.join()
341            self._logger.notice("_receiver_thread joined")
342
343        with self._queue.mutex:
344            self._queue.queue.clear()
345
346        self._stream = None
347        self._thread = None
348        self._receiver_thread = None
349
350        self._logger.notice("finish succeeded")
351        self._logger.debug("Speaker.finish LEAVE")
352
353        return True
354
355    def _play(self, audio_out, stream, stop):
356        """
357        _play - plays audio data from the Speaker queue callback for portaudio
358        """
359        while not stop.is_set():
360            try:
361                if self._microphone is not None and self._microphone.is_muted():
362                    with self._lock_wait:
363                        delta = datetime.now() - self._last_datagram
364                        diff_in_ms = delta.total_seconds() * 1000
365                        if diff_in_ms > float(self._last_play_delta_in_ms):
366                            self._logger.debug(
367                                "LastPlay delta is greater than threshold. Unmute!"
368                            )
369                            self._microphone.unmute()
370
371                data = audio_out.get(True, TIMEOUT)
372                with self._lock_wait:
373                    self._last_datagram = datetime.now()
374                    if self._microphone is not None and not self._microphone.is_muted():
375                        self._logger.debug("New speaker sound detected. Mute!")
376                        self._microphone.mute()
377                stream.write(data)
378            except queue.Empty:
379                pass
380            except Exception as e:  # pylint: disable=broad-except
381                self._logger.error("_play exception: %s", str(e))

This implements a speaker for local audio output. This uses PyAudio under the hood.

Speaker( pull_callback: Optional[Callable] = None, push_callback: Optional[Callable] = None, verbose: int = 30, rate: int = 16000, chunk: int = 8194, channels: int = 1, last_play_delta_in_ms: int = 2000, output_device_index: Optional[int] = None, microphone: Optional[deepgram.audio.microphone.microphone.Microphone] = None)
63    def __init__(
64        self,
65        pull_callback: Optional[Callable] = None,
66        push_callback: Optional[Callable] = None,
67        verbose: int = LOGGING,
68        rate: int = RATE,
69        chunk: int = CHUNK,
70        channels: int = CHANNELS,
71        last_play_delta_in_ms: int = PLAYBACK_DELTA,
72        output_device_index: Optional[int] = None,
73        microphone: Optional[Microphone] = None,
74    ):  # pylint: disable=too-many-positional-arguments
75        # dynamic import of pyaudio as not to force the requirements on the SDK (and users)
76        import pyaudio  # pylint: disable=import-outside-toplevel
77
78        self._logger = verboselogs.VerboseLogger(__name__)
79        self._logger.addHandler(logging.StreamHandler())
80        self._logger.setLevel(verbose)
81
82        self._exit = threading.Event()
83        self._queue = queue.Queue()
84
85        self._last_datagram = datetime.now()
86        self._lock_wait = threading.Lock()
87
88        self._microphone = microphone
89
90        self._audio = pyaudio.PyAudio()
91        self._chunk = chunk
92        self._rate = rate
93        self._format = pyaudio.paInt16
94        self._channels = channels
95        self._last_play_delta_in_ms = last_play_delta_in_ms
96        self._output_device_index = output_device_index
97
98        self._push_callback_org = push_callback
99        self._pull_callback_org = pull_callback
def set_push_callback(self, push_callback: Callable) -> None:
101    def set_push_callback(self, push_callback: Callable) -> None:
102        """
103        set_push_callback - sets the callback function to be called when data is sent.
104
105        Args:
106            push_callback (Callable): The callback function to be called when data is send.
107                                      This should be the websocket handle message function.
108
109        Returns:
110            None
111        """
112        self._push_callback_org = push_callback

set_push_callback - sets the callback function to be called when data is sent.

Args: push_callback (Callable): The callback function to be called when data is sent. This should be the websocket handle message function.

Returns: None

def set_pull_callback(self, pull_callback: Callable) -> None:
114    def set_pull_callback(self, pull_callback: Callable) -> None:
115        """
116        set_pull_callback - sets the callback function to be called when data is received.
117
118        Args:
119            pull_callback (Callable): The callback function to be called when data is received.
120                                      This should be the websocket recv function.
121
122        Returns:
123            None
124        """
125        self._pull_callback_org = pull_callback

set_pull_callback - sets the callback function to be called when data is received.

Args: pull_callback (Callable): The callback function to be called when data is received. This should be the websocket recv function.

Returns: None

def start( self, active_loop: Optional[asyncio.events.AbstractEventLoop] = None) -> bool:
127    def start(self, active_loop: Optional[asyncio.AbstractEventLoop] = None) -> bool:
128        """
129        starts - starts the Speaker stream
130
131        Args:
132            socket (Union[SyncClientConnection, AsyncClientConnection]): The socket to receive audio data from.
133
134        Returns:
135            bool: True if the stream was started, False otherwise
136        """
137        self._logger.debug("Speaker.start ENTER")
138
139        self._logger.info("format: %s", self._format)
140        self._logger.info("channels: %d", self._channels)
141        self._logger.info("rate: %d", self._rate)
142        self._logger.info("chunk: %d", self._chunk)
143        # self._logger.info("output_device_id: %d", self._output_device_index)
144
145        # Automatically get the current running event loop
146        if inspect.iscoroutinefunction(self._push_callback_org) and active_loop is None:
147            self._logger.verbose("get default running asyncio loop")
148            self._loop = asyncio.get_running_loop()
149
150        self._exit.clear()
151        self._queue = queue.Queue()
152
153        if self._audio is not None:
154            self._stream = self._audio.open(
155                format=self._format,
156                channels=self._channels,
157                rate=self._rate,
158                input=False,
159                output=True,
160                frames_per_buffer=self._chunk,
161                output_device_index=self._output_device_index,
162            )
163
164        if self._stream is None:
165            self._logger.error("start failed. No stream created.")
166            self._logger.debug("Speaker.start LEAVE")
167            return False
168
169        self._push_callback = self._push_callback_org
170        self._pull_callback = self._pull_callback_org
171
172        # start the play thread
173        self._thread = threading.Thread(
174            target=self._play, args=(self._queue, self._stream, self._exit), daemon=True
175        )
176        self._thread.start()
177
178        # Start the stream
179        if self._stream is not None:
180            self._stream.start_stream()
181
182        # Start the receiver thread within the start function
183        self._logger.verbose("Starting receiver thread...")
184        self._receiver_thread = threading.Thread(target=self._start_receiver)
185        self._receiver_thread.start()
186
187        self._logger.notice("start succeeded")
188        self._logger.debug("Speaker.start LEAVE")
189
190        return True

start - starts the Speaker stream

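Args: active_loop (Optional[asyncio.AbstractEventLoop]): An event loop supplied by the caller. When omitted and the push callback is a coroutine function, the currently running event loop is looked up instead.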

Returns: bool: True if the stream was started, False otherwise

def wait_for_complete_with_mute(self, mic: deepgram.audio.microphone.microphone.Microphone):
192    def wait_for_complete_with_mute(self, mic: Microphone):
193        """
194        This method will mute/unmute a Microphone and block until the speak is done playing sound.
195        """
196        self._logger.debug("Speaker.wait_for_complete ENTER")
197
198        if self._microphone is not None:
199            mic.mute()
200        self.wait_for_complete()
201        if self._microphone is not None:
202            mic.unmute()
203
204        self._logger.debug("Speaker.wait_for_complete LEAVE")

This method will mute/unmute a Microphone and block until the speaker is done playing sound.

def wait_for_complete(self):
206    def wait_for_complete(self):
207        """
208        This method will block until the speak is done playing sound.
209        """
210        self._logger.debug("Speaker.wait_for_complete ENTER")
211
212        delta_in_ms = float(self._last_play_delta_in_ms)
213        self._logger.debug("Last Play delta: %f", delta_in_ms)
214
215        # set to now
216        with self._lock_wait:
217            self._last_datagram = datetime.now()
218
219        while True:
220            # sleep for a bit
221            self._exit.wait(HALF_SECOND)
222
223            # check if we should exit
224            if self._exit.is_set():
225                self._logger.debug("Exiting wait_for_complete _exit is set")
226                break
227
228            # check the time
229            with self._lock_wait:
230                delta = datetime.now() - self._last_datagram
231                diff_in_ms = delta.total_seconds() * 1000
232                if diff_in_ms < delta_in_ms:
233                    self._logger.debug("LastPlay delta is less than threshold")
234                    continue
235
236            # if we get here, we are done playing audio
237            self._logger.debug("LastPlay delta is greater than threshold. Exit wait!")
238            break
239
240        self._logger.debug("Speaker.wait_for_complete LEAVE")

This method will block until the speaker is done playing sound.

def add_audio_to_queue(self, data: bytes) -> None:
306    def add_audio_to_queue(self, data: bytes) -> None:
307        """
308        add_audio_to_queue - adds audio data to the Speaker queue
309
310        Args:
311            data (bytes): The audio data to add to the queue
312        """
313        self._queue.put(data)

add_audio_to_queue - adds audio data to the Speaker queue

Args: data (bytes): The audio data to add to the queue

def finish(self) -> bool:
315    def finish(self) -> bool:
316        """
317        finish - stops the Speaker stream
318
319        Returns:
320            bool: True if the stream was stopped, False otherwise
321        """
322        self._logger.debug("Speaker.finish ENTER")
323
324        self._logger.notice("signal exit")
325        self._exit.set()
326
327        if self._stream is not None:
328            self._logger.notice("stopping stream...")
329            self._stream.stop_stream()
330            self._stream.close()
331            self._logger.notice("stream stopped")
332
333        if self._thread is not None:
334            self._logger.notice("joining _thread...")
335            self._thread.join()
336            self._logger.notice("thread stopped")
337
338        if self._receiver_thread is not None:
339            self._logger.notice("stopping _receiver_thread...")
340            self._receiver_thread.join()
341            self._logger.notice("_receiver_thread joined")
342
343        with self._queue.mutex:
344            self._queue.queue.clear()
345
346        self._stream = None
347        self._thread = None
348        self._receiver_thread = None
349
350        self._logger.notice("finish succeeded")
351        self._logger.debug("Speaker.finish LEAVE")
352
353        return True

finish - stops the Speaker stream

Returns: bool: True if the stream was stopped, False otherwise