deepgram.clients.listen.v1.websocket.client

  1# Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved.
  2# Use of this source code is governed by a MIT license that can be found in the LICENSE file.
  3# SPDX-License-Identifier: MIT
  4import json
  5import time
  6import logging
  7from typing import Dict, Union, Optional, cast, Any, Callable, Type
  8from datetime import datetime
  9import threading
 10
 11from .....utils import verboselogs
 12from .....options import DeepgramClientOptions
 13from ...enums import LiveTranscriptionEvents
 14from ....common import AbstractSyncWebSocketClient
 15from ....common import DeepgramError
 16
 17from .response import (
 18    OpenResponse,
 19    LiveResultResponse,
 20    MetadataResponse,
 21    SpeechStartedResponse,
 22    UtteranceEndResponse,
 23    CloseResponse,
 24    ErrorResponse,
 25    UnhandledResponse,
 26)
 27from .options import ListenWebSocketOptions
 28
 29ONE_SECOND = 1
 30HALF_SECOND = 0.5
 31DEEPGRAM_INTERVAL = 5
 32PING_INTERVAL = 20
 33
 34
 35class ListenWebSocketClient(
 36    AbstractSyncWebSocketClient
 37):  # pylint: disable=too-many-instance-attributes
 38    """
 39    Client for interacting with Deepgram's live transcription services over WebSockets.
 40
 41    This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events.
 42
 43    Args:
 44        config (DeepgramClientOptions): all the options for the client.
 45        thread_cls (Type[threading.Thread]): optional thread class to use for creating threads,
 46        defaults to threading.Thread. Useful for custom thread management like ContextVar support.
 47    """
 48
 49    _logger: verboselogs.VerboseLogger
 50    _config: DeepgramClientOptions
 51    _endpoint: str
 52
 53    _lock_flush: threading.Lock
 54    _event_handlers: Dict[LiveTranscriptionEvents, list]
 55
 56    _keep_alive_thread: Union[threading.Thread, None]
 57    _flush_thread: Union[threading.Thread, None]
 58    _last_datagram: Optional[datetime] = None
 59
 60    _thread_cls: Type[threading.Thread]
 61
 62    _kwargs: Optional[Dict] = None
 63    _addons: Optional[Dict] = None
 64    _options: Optional[Dict] = None
 65    _headers: Optional[Dict] = None
 66
 67    def __init__(
 68        self,
 69        config: DeepgramClientOptions,
 70        thread_cls: Type[threading.Thread] = threading.Thread,
 71    ):
 72        if config is None:
 73            raise DeepgramError("Config is required")
 74
 75        self._logger = verboselogs.VerboseLogger(__name__)
 76        self._logger.addHandler(logging.StreamHandler())
 77        self._logger.setLevel(config.verbose)
 78
 79        self._config = config
 80        self._endpoint = "v1/listen"
 81
 82        self._flush_thread = None
 83        self._keep_alive_thread = None
 84
 85        # auto flush
 86        self._last_datagram = None
 87        self._lock_flush = threading.Lock()
 88
 89        self._thread_cls = thread_cls
 90
 91        # init handlers
 92        self._event_handlers = {
 93            event: [] for event in LiveTranscriptionEvents.__members__.values()
 94        }
 95
 96        # call the parent constructor
 97        super().__init__(
 98            config=self._config,
 99            endpoint=self._endpoint,
100            thread_cls=self._thread_cls,
101        )
102
103    # pylint: disable=too-many-statements,too-many-branches
104    def start(
105        self,
106        options: Optional[Union[ListenWebSocketOptions, Dict]] = None,
107        addons: Optional[Dict] = None,
108        headers: Optional[Dict] = None,
109        members: Optional[Dict] = None,
110        **kwargs,
111    ) -> bool:
112        """
113        Starts the WebSocket connection for live transcription.
114        """
115        self._logger.debug("ListenWebSocketClient.start ENTER")
116        self._logger.info("options: %s", options)
117        self._logger.info("addons: %s", addons)
118        self._logger.info("headers: %s", headers)
119        self._logger.info("members: %s", members)
120        self._logger.info("kwargs: %s", kwargs)
121
122        if isinstance(options, ListenWebSocketOptions) and not options.check():
123            self._logger.error("options.check failed")
124            self._logger.debug("ListenWebSocketClient.start LEAVE")
125            raise DeepgramError("Fatal transcription options error")
126
127        self._addons = addons
128        self._headers = headers
129
130        # add "members" as members of the class
131        if members is not None:
132            self.__dict__.update(members)
133
134        # set kwargs as members of the class
135        if kwargs is not None:
136            self._kwargs = kwargs
137        else:
138            self._kwargs = {}
139
140        if isinstance(options, ListenWebSocketOptions):
141            self._logger.info("ListenWebSocketOptions switching class -> dict")
142            self._options = options.to_dict()
143        elif options is not None:
144            self._options = options
145        else:
146            self._options = {}
147
148        try:
149            # call parent start
150            if (
151                super().start(
152                    self._options,
153                    self._addons,
154                    self._headers,
155                    **dict(cast(Dict[Any, Any], self._kwargs)),
156                )
157                is False
158            ):
159                self._logger.error("ListenWebSocketClient.start failed")
160                self._logger.debug("ListenWebSocketClient.start LEAVE")
161                return False
162
163            # debug the threads
164            for thread in threading.enumerate():
165                self._logger.debug("after running thread: %s", thread.name)
166            self._logger.debug("number of active threads: %s", threading.active_count())
167
168            # keepalive thread
169            if self._config.is_keep_alive_enabled():
170                self._logger.notice("keepalive is enabled")
171                self._keep_alive_thread = self._thread_cls(target=self._keep_alive)
172                self._keep_alive_thread.start()
173            else:
174                self._logger.notice("keepalive is disabled")
175
176            # flush thread
177            if self._config.is_auto_flush_reply_enabled():
178                self._logger.notice("autoflush is enabled")
179                self._flush_thread = self._thread_cls(target=self._flush)
180                self._flush_thread.start()
181            else:
182                self._logger.notice("autoflush is disabled")
183
184            # debug the threads
185            for thread in threading.enumerate():
186                self._logger.debug("after running thread: %s", thread.name)
187            self._logger.debug("number of active threads: %s", threading.active_count())
188
189            self._logger.notice("start succeeded")
190            self._logger.debug("ListenWebSocketClient.start LEAVE")
191            return True
192
193        except Exception as e:  # pylint: disable=broad-except
194            self._logger.error(
195                "WebSocketException in ListenWebSocketClient.start: %s", e
196            )
197            self._logger.debug("ListenWebSocketClient.start LEAVE")
198            if self._config.options.get("termination_exception_connect") is True:
199                raise e
200            return False
201
202    # pylint: enable=too-many-statements,too-many-branches
203
204    def on(
205        self, event: LiveTranscriptionEvents, handler: Callable
206    ) -> None:  # registers event handlers for specific events
207        """
208        Registers event handlers for specific events.
209        """
210        self._logger.info("event subscribed: %s", event)
211        if event in LiveTranscriptionEvents.__members__.values() and callable(handler):
212            self._event_handlers[event].append(handler)
213
214    def _emit(self, event: LiveTranscriptionEvents, *args, **kwargs) -> None:
215        """
216        Emits events to the registered event handlers.
217        """
218        self._logger.debug("ListenWebSocketClient._emit ENTER")
219        self._logger.debug("callback handlers for: %s", event)
220
221        # debug the threads
222        for thread in threading.enumerate():
223            self._logger.debug("after running thread: %s", thread.name)
224        self._logger.debug("number of active threads: %s", threading.active_count())
225
226        self._logger.debug("callback handlers for: %s", event)
227        for handler in self._event_handlers[event]:
228            handler(self, *args, **kwargs)
229
230        # debug the threads
231        for thread in threading.enumerate():
232            self._logger.debug("after running thread: %s", thread.name)
233        self._logger.debug("number of active threads: %s", threading.active_count())
234
235        self._logger.debug("ListenWebSocketClient._emit LEAVE")
236
237    # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches
238    def _process_text(self, message: str) -> None:
239        """
240        Processes messages received over the WebSocket connection.
241        """
242        self._logger.debug("ListenWebSocketClient._process_text ENTER")
243
244        try:
245            if len(message) == 0:
246                self._logger.debug("message is empty")
247                self._logger.debug("ListenWebSocketClient._process_text LEAVE")
248                return
249
250            data = json.loads(message)
251            response_type = data.get("type")
252            self._logger.debug("response_type: %s, data: %s", response_type, data)
253
254            match response_type:
255                case LiveTranscriptionEvents.Open:
256                    open_result: OpenResponse = OpenResponse.from_json(message)
257                    self._logger.verbose("OpenResponse: %s", open_result)
258                    self._emit(
259                        LiveTranscriptionEvents(LiveTranscriptionEvents.Open),
260                        open=open_result,
261                        **dict(cast(Dict[Any, Any], self._kwargs)),
262                    )
263                case LiveTranscriptionEvents.Transcript:
264                    msg_result: LiveResultResponse = LiveResultResponse.from_json(
265                        message
266                    )
267                    self._logger.verbose("LiveResultResponse: %s", msg_result)
268
269                    #  auto flush
270                    if self._config.is_inspecting_listen():
271                        inspect_res = self._inspect(msg_result)
272                        if not inspect_res:
273                            self._logger.error("inspect_res failed")
274
275                    self._emit(
276                        LiveTranscriptionEvents(LiveTranscriptionEvents.Transcript),
277                        result=msg_result,
278                        **dict(cast(Dict[Any, Any], self._kwargs)),
279                    )
280                case LiveTranscriptionEvents.Metadata:
281                    meta_result: MetadataResponse = MetadataResponse.from_json(message)
282                    self._logger.verbose("MetadataResponse: %s", meta_result)
283                    self._emit(
284                        LiveTranscriptionEvents(LiveTranscriptionEvents.Metadata),
285                        metadata=meta_result,
286                        **dict(cast(Dict[Any, Any], self._kwargs)),
287                    )
288                case LiveTranscriptionEvents.SpeechStarted:
289                    ss_result: SpeechStartedResponse = SpeechStartedResponse.from_json(
290                        message
291                    )
292                    self._logger.verbose("SpeechStartedResponse: %s", ss_result)
293                    self._emit(
294                        LiveTranscriptionEvents(LiveTranscriptionEvents.SpeechStarted),
295                        speech_started=ss_result,
296                        **dict(cast(Dict[Any, Any], self._kwargs)),
297                    )
298                case LiveTranscriptionEvents.UtteranceEnd:
299                    ue_result: UtteranceEndResponse = UtteranceEndResponse.from_json(
300                        message
301                    )
302                    self._logger.verbose("UtteranceEndResponse: %s", ue_result)
303                    self._emit(
304                        LiveTranscriptionEvents(LiveTranscriptionEvents.UtteranceEnd),
305                        utterance_end=ue_result,
306                        **dict(cast(Dict[Any, Any], self._kwargs)),
307                    )
308                case LiveTranscriptionEvents.Close:
309                    close_result: CloseResponse = CloseResponse.from_json(message)
310                    self._logger.verbose("CloseResponse: %s", close_result)
311                    self._emit(
312                        LiveTranscriptionEvents(LiveTranscriptionEvents.Close),
313                        close=close_result,
314                        **dict(cast(Dict[Any, Any], self._kwargs)),
315                    )
316                case LiveTranscriptionEvents.Error:
317                    err_error: ErrorResponse = ErrorResponse.from_json(message)
318                    self._logger.verbose("ErrorResponse: %s", err_error)
319                    self._emit(
320                        LiveTranscriptionEvents(LiveTranscriptionEvents.Error),
321                        error=err_error,
322                        **dict(cast(Dict[Any, Any], self._kwargs)),
323                    )
324                case _:
325                    self._logger.warning(
326                        "Unknown Message: response_type: %s, data: %s",
327                        response_type,
328                        data,
329                    )
330                    unhandled_error: UnhandledResponse = UnhandledResponse(
331                        type=LiveTranscriptionEvents(LiveTranscriptionEvents.Unhandled),
332                        raw=message,
333                    )
334                    self._emit(
335                        LiveTranscriptionEvents(LiveTranscriptionEvents.Unhandled),
336                        unhandled=unhandled_error,
337                        **dict(cast(Dict[Any, Any], self._kwargs)),
338                    )
339
340            self._logger.notice("_process_text Succeeded")
341            self._logger.debug("SpeakStreamClient._process_text LEAVE")
342
343        except Exception as e:  # pylint: disable=broad-except
344            self._logger.error(
345                "Exception in ListenWebSocketClient._process_text: %s", e
346            )
347            e_error: ErrorResponse = ErrorResponse(
348                "Exception in ListenWebSocketClient._process_text",
349                f"{e}",
350                "Exception",
351            )
352            self._logger.error(
353                "Exception in ListenWebSocketClient._process_text: %s", str(e)
354            )
355            self._emit(
356                LiveTranscriptionEvents(LiveTranscriptionEvents.Error),
357                e_error,
358                **dict(cast(Dict[Any, Any], self._kwargs)),
359            )
360
361            # signal exit and close
362            super()._signal_exit()
363
364            self._logger.debug("ListenWebSocketClient._process_text LEAVE")
365
366            if self._config.options.get("termination_exception") is True:
367                raise
368            return
369
370    # pylint: enable=too-many-return-statements,too-many-statements
371
372    def _process_binary(self, message: bytes) -> None:
373        raise NotImplementedError("no _process_binary method should be called")
374
375    # pylint: disable=too-many-return-statements
376    def _keep_alive(self) -> None:
377        self._logger.debug("ListenWebSocketClient._keep_alive ENTER")
378
379        counter = 0
380        while True:
381            try:
382                counter += 1
383                self._exit_event.wait(timeout=ONE_SECOND)
384
385                if self._exit_event.is_set():
386                    self._logger.notice("_keep_alive exiting gracefully")
387                    self._logger.debug("ListenWebSocketClient._keep_alive LEAVE")
388                    return
389
390                # deepgram keepalive
391                if counter % DEEPGRAM_INTERVAL == 0:
392                    self.keep_alive()
393
394            except Exception as e:  # pylint: disable=broad-except
395                self._logger.error(
396                    "Exception in ListenWebSocketClient._keep_alive: %s", e
397                )
398                e_error: ErrorResponse = ErrorResponse(
399                    "Exception in ListenWebSocketClient._keep_alive",
400                    f"{e}",
401                    "Exception",
402                )
403                self._logger.error(
404                    "Exception in ListenWebSocketClient._keep_alive: %s", str(e)
405                )
406                self._emit(
407                    LiveTranscriptionEvents(LiveTranscriptionEvents.Error),
408                    e_error,
409                    **dict(cast(Dict[Any, Any], self._kwargs)),
410                )
411
412                # signal exit and close
413                super()._signal_exit()
414
415                self._logger.debug("ListenWebSocketClient._keep_alive LEAVE")
416
417                if self._config.options.get("termination_exception") is True:
418                    raise
419                return
420
421    ## pylint: disable=too-many-return-statements,too-many-statements
422    def _flush(self) -> None:
423        self._logger.debug("ListenWebSocketClient._flush ENTER")
424
425        delta_in_ms_str = self._config.options.get("auto_flush_reply_delta")
426        if delta_in_ms_str is None:
427            self._logger.error("auto_flush_reply_delta is None")
428            self._logger.debug("ListenWebSocketClient._flush LEAVE")
429            return
430        delta_in_ms = float(delta_in_ms_str)
431
432        _flush_event = threading.Event()
433        while True:
434            try:
435                _flush_event.wait(timeout=HALF_SECOND)
436
437                if self._exit_event.is_set():
438                    self._logger.notice("_flush exiting gracefully")
439                    self._logger.debug("ListenWebSocketClient._flush LEAVE")
440                    return
441
442                with self._lock_flush:
443                    if self._last_datagram is None:
444                        self._logger.debug("AutoFlush last_datagram is None")
445                        continue
446
447                    delta = datetime.now() - self._last_datagram
448                    diff_in_ms = delta.total_seconds() * 1000
449                    self._logger.debug("AutoFlush delta: %f", diff_in_ms)
450                    if diff_in_ms < delta_in_ms:
451                        self._logger.debug("AutoFlush delta is less than threshold")
452                        continue
453
454                    with self._lock_flush:
455                        self._last_datagram = None
456                    self.finalize()
457
458            except Exception as e:  # pylint: disable=broad-except
459                self._logger.error("Exception in ListenWebSocketClient._flush: %s", e)
460                e_error: ErrorResponse = ErrorResponse(
461                    "Exception in ListenWebSocketClient._flush",
462                    f"{e}",
463                    "Exception",
464                )
465                self._logger.error(
466                    "Exception in ListenWebSocketClient._flush: %s", str(e)
467                )
468                self._emit(
469                    LiveTranscriptionEvents(LiveTranscriptionEvents.Error),
470                    e_error,
471                    **dict(cast(Dict[Any, Any], self._kwargs)),
472                )
473
474                # signal exit and close
475                super()._signal_exit()
476
477                self._logger.debug("ListenWebSocketClient._flush LEAVE")
478
479                if self._config.options.get("termination_exception") is True:
480                    raise
481                return
482
483    # pylint: enable=too-many-return-statements
484
485    def keep_alive(self) -> bool:
486        """
487        Sends a KeepAlive message
488        """
489        self._logger.spam("ListenWebSocketClient.keep_alive ENTER")
490
491        self._logger.notice("Sending KeepAlive...")
492        ret = self.send(json.dumps({"type": "KeepAlive"}))
493
494        if not ret:
495            self._logger.error("keep_alive failed")
496            self._logger.spam("ListenWebSocketClient.keep_alive LEAVE")
497            return False
498
499        self._logger.notice("keep_alive succeeded")
500        self._logger.spam("ListenWebSocketClient.keep_alive LEAVE")
501
502        return True
503
504    def finalize(self) -> bool:
505        """
506        Finalizes the Transcript connection by flushing it
507        """
508        self._logger.spam("ListenWebSocketClient.finalize ENTER")
509
510        self._logger.notice("Sending Finalize...")
511        ret = self.send(json.dumps({"type": "Finalize"}))
512
513        if not ret:
514            self._logger.error("finalize failed")
515            self._logger.spam("ListenWebSocketClient.finalize LEAVE")
516            return False
517
518        self._logger.notice("finalize succeeded")
519        self._logger.spam("ListenWebSocketClient.finalize LEAVE")
520
521        return True
522
523    def _close_message(self) -> bool:
524        return self.send(json.dumps({"type": "CloseStream"}))
525
526    # closes the WebSocket connection gracefully
527    def finish(self) -> bool:
528        """
529        Closes the WebSocket connection gracefully.
530        """
531        self._logger.spam("ListenWebSocketClient.finish ENTER")
532
533        # call parent finish
534        if super().finish() is False:
535            self._logger.error("ListenWebSocketClient.finish failed")
536
537        # debug the threads
538        for thread in threading.enumerate():
539            self._logger.debug("before running thread: %s", thread.name)
540        self._logger.debug("number of active threads: %s", threading.active_count())
541
542        # stop the threads
543        self._logger.verbose("cancelling tasks...")
544        if self._flush_thread is not None:
545            self._flush_thread.join()
546            self._flush_thread = None
547            self._logger.notice("processing _flush_thread thread joined")
548
549        if self._keep_alive_thread is not None:
550            self._keep_alive_thread.join()
551            self._keep_alive_thread = None
552            self._logger.notice("processing _keep_alive_thread thread joined")
553
554        if self._listen_thread is not None:
555            self._listen_thread.join()
556            self._listen_thread = None
557        self._logger.notice("listening thread joined")
558
559        # debug the threads
560        for thread in threading.enumerate():
561            self._logger.debug("before running thread: %s", thread.name)
562        self._logger.debug("number of active threads: %s", threading.active_count())
563
564        self._logger.notice("finish succeeded")
565        self._logger.spam("ListenWebSocketClient.finish LEAVE")
566        return True
567
568    def _inspect(self, msg_result: LiveResultResponse) -> bool:
569        # auto flush_inspect is generically used to track any messages you might want to snoop on
570        # place additional logic here to inspect messages of interest
571
572        # for auto flush functionality
573        # set the last datagram
574        sentence = msg_result.channel.alternatives[0].transcript
575        if len(sentence) == 0:
576            return True
577
578        if msg_result.is_final:
579            with self._lock_flush:
580                self._logger.debug("AutoFlush is_final received")
581                self._last_datagram = None
582        else:
583            with self._lock_flush:
584                self._last_datagram = datetime.now()
585                self._logger.debug(
586                    "AutoFlush interim received: %s",
587                    str(self._last_datagram),
588                )
589
590        return True
# Timing constants for the background threads of ListenWebSocketClient.
ONE_SECOND: int = 1  # keep-alive loop tick: Event.wait timeout, in seconds
HALF_SECOND: float = 0.5  # auto-flush polling period: Event.wait timeout, in seconds
DEEPGRAM_INTERVAL: int = 5  # a KeepAlive is sent every DEEPGRAM_INTERVAL keep-alive ticks
PING_INTERVAL: int = 20  # NOTE(review): not referenced in this module; confirm external use
 36class ListenWebSocketClient(
 37    AbstractSyncWebSocketClient
 38):  # pylint: disable=too-many-instance-attributes
 39    """
 40    Client for interacting with Deepgram's live transcription services over WebSockets.
 41
 42    This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events.
 43
 44    Args:
 45        config (DeepgramClientOptions): all the options for the client.
 46        thread_cls (Type[threading.Thread]): optional thread class to use for creating threads,
 47        defaults to threading.Thread. Useful for custom thread management like ContextVar support.
 48    """
 49
 50    _logger: verboselogs.VerboseLogger
 51    _config: DeepgramClientOptions
 52    _endpoint: str
 53
 54    _lock_flush: threading.Lock
 55    _event_handlers: Dict[LiveTranscriptionEvents, list]
 56
 57    _keep_alive_thread: Union[threading.Thread, None]
 58    _flush_thread: Union[threading.Thread, None]
 59    _last_datagram: Optional[datetime] = None
 60
 61    _thread_cls: Type[threading.Thread]
 62
 63    _kwargs: Optional[Dict] = None
 64    _addons: Optional[Dict] = None
 65    _options: Optional[Dict] = None
 66    _headers: Optional[Dict] = None
 67
 68    def __init__(
 69        self,
 70        config: DeepgramClientOptions,
 71        thread_cls: Type[threading.Thread] = threading.Thread,
 72    ):
 73        if config is None:
 74            raise DeepgramError("Config is required")
 75
 76        self._logger = verboselogs.VerboseLogger(__name__)
 77        self._logger.addHandler(logging.StreamHandler())
 78        self._logger.setLevel(config.verbose)
 79
 80        self._config = config
 81        self._endpoint = "v1/listen"
 82
 83        self._flush_thread = None
 84        self._keep_alive_thread = None
 85
 86        # auto flush
 87        self._last_datagram = None
 88        self._lock_flush = threading.Lock()
 89
 90        self._thread_cls = thread_cls
 91
 92        # init handlers
 93        self._event_handlers = {
 94            event: [] for event in LiveTranscriptionEvents.__members__.values()
 95        }
 96
 97        # call the parent constructor
 98        super().__init__(
 99            config=self._config,
100            endpoint=self._endpoint,
101            thread_cls=self._thread_cls,
102        )
103
104    # pylint: disable=too-many-statements,too-many-branches
105    def start(
106        self,
107        options: Optional[Union[ListenWebSocketOptions, Dict]] = None,
108        addons: Optional[Dict] = None,
109        headers: Optional[Dict] = None,
110        members: Optional[Dict] = None,
111        **kwargs,
112    ) -> bool:
113        """
114        Starts the WebSocket connection for live transcription.
115        """
116        self._logger.debug("ListenWebSocketClient.start ENTER")
117        self._logger.info("options: %s", options)
118        self._logger.info("addons: %s", addons)
119        self._logger.info("headers: %s", headers)
120        self._logger.info("members: %s", members)
121        self._logger.info("kwargs: %s", kwargs)
122
123        if isinstance(options, ListenWebSocketOptions) and not options.check():
124            self._logger.error("options.check failed")
125            self._logger.debug("ListenWebSocketClient.start LEAVE")
126            raise DeepgramError("Fatal transcription options error")
127
128        self._addons = addons
129        self._headers = headers
130
131        # add "members" as members of the class
132        if members is not None:
133            self.__dict__.update(members)
134
135        # set kwargs as members of the class
136        if kwargs is not None:
137            self._kwargs = kwargs
138        else:
139            self._kwargs = {}
140
141        if isinstance(options, ListenWebSocketOptions):
142            self._logger.info("ListenWebSocketOptions switching class -> dict")
143            self._options = options.to_dict()
144        elif options is not None:
145            self._options = options
146        else:
147            self._options = {}
148
149        try:
150            # call parent start
151            if (
152                super().start(
153                    self._options,
154                    self._addons,
155                    self._headers,
156                    **dict(cast(Dict[Any, Any], self._kwargs)),
157                )
158                is False
159            ):
160                self._logger.error("ListenWebSocketClient.start failed")
161                self._logger.debug("ListenWebSocketClient.start LEAVE")
162                return False
163
164            # debug the threads
165            for thread in threading.enumerate():
166                self._logger.debug("after running thread: %s", thread.name)
167            self._logger.debug("number of active threads: %s", threading.active_count())
168
169            # keepalive thread
170            if self._config.is_keep_alive_enabled():
171                self._logger.notice("keepalive is enabled")
172                self._keep_alive_thread = self._thread_cls(target=self._keep_alive)
173                self._keep_alive_thread.start()
174            else:
175                self._logger.notice("keepalive is disabled")
176
177            # flush thread
178            if self._config.is_auto_flush_reply_enabled():
179                self._logger.notice("autoflush is enabled")
180                self._flush_thread = self._thread_cls(target=self._flush)
181                self._flush_thread.start()
182            else:
183                self._logger.notice("autoflush is disabled")
184
185            # debug the threads
186            for thread in threading.enumerate():
187                self._logger.debug("after running thread: %s", thread.name)
188            self._logger.debug("number of active threads: %s", threading.active_count())
189
190            self._logger.notice("start succeeded")
191            self._logger.debug("ListenWebSocketClient.start LEAVE")
192            return True
193
194        except Exception as e:  # pylint: disable=broad-except
195            self._logger.error(
196                "WebSocketException in ListenWebSocketClient.start: %s", e
197            )
198            self._logger.debug("ListenWebSocketClient.start LEAVE")
199            if self._config.options.get("termination_exception_connect") is True:
200                raise e
201            return False
202
203    # pylint: enable=too-many-statements,too-many-branches
204
205    def on(
206        self, event: LiveTranscriptionEvents, handler: Callable
207    ) -> None:  # registers event handlers for specific events
208        """
209        Registers event handlers for specific events.
210        """
211        self._logger.info("event subscribed: %s", event)
212        if event in LiveTranscriptionEvents.__members__.values() and callable(handler):
213            self._event_handlers[event].append(handler)
214
215    def _emit(self, event: LiveTranscriptionEvents, *args, **kwargs) -> None:
216        """
217        Emits events to the registered event handlers.
218        """
219        self._logger.debug("ListenWebSocketClient._emit ENTER")
220        self._logger.debug("callback handlers for: %s", event)
221
222        # debug the threads
223        for thread in threading.enumerate():
224            self._logger.debug("after running thread: %s", thread.name)
225        self._logger.debug("number of active threads: %s", threading.active_count())
226
227        self._logger.debug("callback handlers for: %s", event)
228        for handler in self._event_handlers[event]:
229            handler(self, *args, **kwargs)
230
231        # debug the threads
232        for thread in threading.enumerate():
233            self._logger.debug("after running thread: %s", thread.name)
234        self._logger.debug("number of active threads: %s", threading.active_count())
235
236        self._logger.debug("ListenWebSocketClient._emit LEAVE")
237
238    # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches
239    def _process_text(self, message: str) -> None:
240        """
241        Processes messages received over the WebSocket connection.
242        """
243        self._logger.debug("ListenWebSocketClient._process_text ENTER")
244
245        try:
246            if len(message) == 0:
247                self._logger.debug("message is empty")
248                self._logger.debug("ListenWebSocketClient._process_text LEAVE")
249                return
250
251            data = json.loads(message)
252            response_type = data.get("type")
253            self._logger.debug("response_type: %s, data: %s", response_type, data)
254
255            match response_type:
256                case LiveTranscriptionEvents.Open:
257                    open_result: OpenResponse = OpenResponse.from_json(message)
258                    self._logger.verbose("OpenResponse: %s", open_result)
259                    self._emit(
260                        LiveTranscriptionEvents(LiveTranscriptionEvents.Open),
261                        open=open_result,
262                        **dict(cast(Dict[Any, Any], self._kwargs)),
263                    )
264                case LiveTranscriptionEvents.Transcript:
265                    msg_result: LiveResultResponse = LiveResultResponse.from_json(
266                        message
267                    )
268                    self._logger.verbose("LiveResultResponse: %s", msg_result)
269
270                    #  auto flush
271                    if self._config.is_inspecting_listen():
272                        inspect_res = self._inspect(msg_result)
273                        if not inspect_res:
274                            self._logger.error("inspect_res failed")
275
276                    self._emit(
277                        LiveTranscriptionEvents(LiveTranscriptionEvents.Transcript),
278                        result=msg_result,
279                        **dict(cast(Dict[Any, Any], self._kwargs)),
280                    )
281                case LiveTranscriptionEvents.Metadata:
282                    meta_result: MetadataResponse = MetadataResponse.from_json(message)
283                    self._logger.verbose("MetadataResponse: %s", meta_result)
284                    self._emit(
285                        LiveTranscriptionEvents(LiveTranscriptionEvents.Metadata),
286                        metadata=meta_result,
287                        **dict(cast(Dict[Any, Any], self._kwargs)),
288                    )
289                case LiveTranscriptionEvents.SpeechStarted:
290                    ss_result: SpeechStartedResponse = SpeechStartedResponse.from_json(
291                        message
292                    )
293                    self._logger.verbose("SpeechStartedResponse: %s", ss_result)
294                    self._emit(
295                        LiveTranscriptionEvents(LiveTranscriptionEvents.SpeechStarted),
296                        speech_started=ss_result,
297                        **dict(cast(Dict[Any, Any], self._kwargs)),
298                    )
299                case LiveTranscriptionEvents.UtteranceEnd:
300                    ue_result: UtteranceEndResponse = UtteranceEndResponse.from_json(
301                        message
302                    )
303                    self._logger.verbose("UtteranceEndResponse: %s", ue_result)
304                    self._emit(
305                        LiveTranscriptionEvents(LiveTranscriptionEvents.UtteranceEnd),
306                        utterance_end=ue_result,
307                        **dict(cast(Dict[Any, Any], self._kwargs)),
308                    )
309                case LiveTranscriptionEvents.Close:
310                    close_result: CloseResponse = CloseResponse.from_json(message)
311                    self._logger.verbose("CloseResponse: %s", close_result)
312                    self._emit(
313                        LiveTranscriptionEvents(LiveTranscriptionEvents.Close),
314                        close=close_result,
315                        **dict(cast(Dict[Any, Any], self._kwargs)),
316                    )
317                case LiveTranscriptionEvents.Error:
318                    err_error: ErrorResponse = ErrorResponse.from_json(message)
319                    self._logger.verbose("ErrorResponse: %s", err_error)
320                    self._emit(
321                        LiveTranscriptionEvents(LiveTranscriptionEvents.Error),
322                        error=err_error,
323                        **dict(cast(Dict[Any, Any], self._kwargs)),
324                    )
325                case _:
326                    self._logger.warning(
327                        "Unknown Message: response_type: %s, data: %s",
328                        response_type,
329                        data,
330                    )
331                    unhandled_error: UnhandledResponse = UnhandledResponse(
332                        type=LiveTranscriptionEvents(LiveTranscriptionEvents.Unhandled),
333                        raw=message,
334                    )
335                    self._emit(
336                        LiveTranscriptionEvents(LiveTranscriptionEvents.Unhandled),
337                        unhandled=unhandled_error,
338                        **dict(cast(Dict[Any, Any], self._kwargs)),
339                    )
340
341            self._logger.notice("_process_text Succeeded")
342            self._logger.debug("SpeakStreamClient._process_text LEAVE")
343
344        except Exception as e:  # pylint: disable=broad-except
345            self._logger.error(
346                "Exception in ListenWebSocketClient._process_text: %s", e
347            )
348            e_error: ErrorResponse = ErrorResponse(
349                "Exception in ListenWebSocketClient._process_text",
350                f"{e}",
351                "Exception",
352            )
353            self._logger.error(
354                "Exception in ListenWebSocketClient._process_text: %s", str(e)
355            )
356            self._emit(
357                LiveTranscriptionEvents(LiveTranscriptionEvents.Error),
358                e_error,
359                **dict(cast(Dict[Any, Any], self._kwargs)),
360            )
361
362            # signal exit and close
363            super()._signal_exit()
364
365            self._logger.debug("ListenWebSocketClient._process_text LEAVE")
366
367            if self._config.options.get("termination_exception") is True:
368                raise
369            return
370
371    # pylint: enable=too-many-return-statements,too-many-statements
372
373    def _process_binary(self, message: bytes) -> None:
374        raise NotImplementedError("no _process_binary method should be called")
375
376    # pylint: disable=too-many-return-statements
377    def _keep_alive(self) -> None:
378        self._logger.debug("ListenWebSocketClient._keep_alive ENTER")
379
380        counter = 0
381        while True:
382            try:
383                counter += 1
384                self._exit_event.wait(timeout=ONE_SECOND)
385
386                if self._exit_event.is_set():
387                    self._logger.notice("_keep_alive exiting gracefully")
388                    self._logger.debug("ListenWebSocketClient._keep_alive LEAVE")
389                    return
390
391                # deepgram keepalive
392                if counter % DEEPGRAM_INTERVAL == 0:
393                    self.keep_alive()
394
395            except Exception as e:  # pylint: disable=broad-except
396                self._logger.error(
397                    "Exception in ListenWebSocketClient._keep_alive: %s", e
398                )
399                e_error: ErrorResponse = ErrorResponse(
400                    "Exception in ListenWebSocketClient._keep_alive",
401                    f"{e}",
402                    "Exception",
403                )
404                self._logger.error(
405                    "Exception in ListenWebSocketClient._keep_alive: %s", str(e)
406                )
407                self._emit(
408                    LiveTranscriptionEvents(LiveTranscriptionEvents.Error),
409                    e_error,
410                    **dict(cast(Dict[Any, Any], self._kwargs)),
411                )
412
413                # signal exit and close
414                super()._signal_exit()
415
416                self._logger.debug("ListenWebSocketClient._keep_alive LEAVE")
417
418                if self._config.options.get("termination_exception") is True:
419                    raise
420                return
421
422    ## pylint: disable=too-many-return-statements,too-many-statements
423    def _flush(self) -> None:
424        self._logger.debug("ListenWebSocketClient._flush ENTER")
425
426        delta_in_ms_str = self._config.options.get("auto_flush_reply_delta")
427        if delta_in_ms_str is None:
428            self._logger.error("auto_flush_reply_delta is None")
429            self._logger.debug("ListenWebSocketClient._flush LEAVE")
430            return
431        delta_in_ms = float(delta_in_ms_str)
432
433        _flush_event = threading.Event()
434        while True:
435            try:
436                _flush_event.wait(timeout=HALF_SECOND)
437
438                if self._exit_event.is_set():
439                    self._logger.notice("_flush exiting gracefully")
440                    self._logger.debug("ListenWebSocketClient._flush LEAVE")
441                    return
442
443                with self._lock_flush:
444                    if self._last_datagram is None:
445                        self._logger.debug("AutoFlush last_datagram is None")
446                        continue
447
448                    delta = datetime.now() - self._last_datagram
449                    diff_in_ms = delta.total_seconds() * 1000
450                    self._logger.debug("AutoFlush delta: %f", diff_in_ms)
451                    if diff_in_ms < delta_in_ms:
452                        self._logger.debug("AutoFlush delta is less than threshold")
453                        continue
454
455                    with self._lock_flush:
456                        self._last_datagram = None
457                    self.finalize()
458
459            except Exception as e:  # pylint: disable=broad-except
460                self._logger.error("Exception in ListenWebSocketClient._flush: %s", e)
461                e_error: ErrorResponse = ErrorResponse(
462                    "Exception in ListenWebSocketClient._flush",
463                    f"{e}",
464                    "Exception",
465                )
466                self._logger.error(
467                    "Exception in ListenWebSocketClient._flush: %s", str(e)
468                )
469                self._emit(
470                    LiveTranscriptionEvents(LiveTranscriptionEvents.Error),
471                    e_error,
472                    **dict(cast(Dict[Any, Any], self._kwargs)),
473                )
474
475                # signal exit and close
476                super()._signal_exit()
477
478                self._logger.debug("ListenWebSocketClient._flush LEAVE")
479
480                if self._config.options.get("termination_exception") is True:
481                    raise
482                return
483
484    # pylint: enable=too-many-return-statements
485
486    def keep_alive(self) -> bool:
487        """
488        Sends a KeepAlive message
489        """
490        self._logger.spam("ListenWebSocketClient.keep_alive ENTER")
491
492        self._logger.notice("Sending KeepAlive...")
493        ret = self.send(json.dumps({"type": "KeepAlive"}))
494
495        if not ret:
496            self._logger.error("keep_alive failed")
497            self._logger.spam("ListenWebSocketClient.keep_alive LEAVE")
498            return False
499
500        self._logger.notice("keep_alive succeeded")
501        self._logger.spam("ListenWebSocketClient.keep_alive LEAVE")
502
503        return True
504
505    def finalize(self) -> bool:
506        """
507        Finalizes the Transcript connection by flushing it
508        """
509        self._logger.spam("ListenWebSocketClient.finalize ENTER")
510
511        self._logger.notice("Sending Finalize...")
512        ret = self.send(json.dumps({"type": "Finalize"}))
513
514        if not ret:
515            self._logger.error("finalize failed")
516            self._logger.spam("ListenWebSocketClient.finalize LEAVE")
517            return False
518
519        self._logger.notice("finalize succeeded")
520        self._logger.spam("ListenWebSocketClient.finalize LEAVE")
521
522        return True
523
524    def _close_message(self) -> bool:
525        return self.send(json.dumps({"type": "CloseStream"}))
526
527    # closes the WebSocket connection gracefully
528    def finish(self) -> bool:
529        """
530        Closes the WebSocket connection gracefully.
531        """
532        self._logger.spam("ListenWebSocketClient.finish ENTER")
533
534        # call parent finish
535        if super().finish() is False:
536            self._logger.error("ListenWebSocketClient.finish failed")
537
538        # debug the threads
539        for thread in threading.enumerate():
540            self._logger.debug("before running thread: %s", thread.name)
541        self._logger.debug("number of active threads: %s", threading.active_count())
542
543        # stop the threads
544        self._logger.verbose("cancelling tasks...")
545        if self._flush_thread is not None:
546            self._flush_thread.join()
547            self._flush_thread = None
548            self._logger.notice("processing _flush_thread thread joined")
549
550        if self._keep_alive_thread is not None:
551            self._keep_alive_thread.join()
552            self._keep_alive_thread = None
553            self._logger.notice("processing _keep_alive_thread thread joined")
554
555        if self._listen_thread is not None:
556            self._listen_thread.join()
557            self._listen_thread = None
558        self._logger.notice("listening thread joined")
559
560        # debug the threads
561        for thread in threading.enumerate():
562            self._logger.debug("before running thread: %s", thread.name)
563        self._logger.debug("number of active threads: %s", threading.active_count())
564
565        self._logger.notice("finish succeeded")
566        self._logger.spam("ListenWebSocketClient.finish LEAVE")
567        return True
568
569    def _inspect(self, msg_result: LiveResultResponse) -> bool:
570        # auto flush_inspect is generically used to track any messages you might want to snoop on
571        # place additional logic here to inspect messages of interest
572
573        # for auto flush functionality
574        # set the last datagram
575        sentence = msg_result.channel.alternatives[0].transcript
576        if len(sentence) == 0:
577            return True
578
579        if msg_result.is_final:
580            with self._lock_flush:
581                self._logger.debug("AutoFlush is_final received")
582                self._last_datagram = None
583        else:
584            with self._lock_flush:
585                self._last_datagram = datetime.now()
586                self._logger.debug(
587                    "AutoFlush interim received: %s",
588                    str(self._last_datagram),
589                )
590
591        return True

Client for interacting with Deepgram's live transcription services over WebSockets.

This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events.

Args: config (DeepgramClientOptions): all options for the client. thread_cls (Type[threading.Thread]): optional thread class used to create the client's threads; defaults to threading.Thread. Useful for custom thread management, such as ContextVar support.

ListenWebSocketClient( config: deepgram.options.DeepgramClientOptions, thread_cls: Type[threading.Thread] = <class 'threading.Thread'>)
 68    def __init__(
 69        self,
 70        config: DeepgramClientOptions,
 71        thread_cls: Type[threading.Thread] = threading.Thread,
 72    ):
 73        if config is None:
 74            raise DeepgramError("Config is required")
 75
 76        self._logger = verboselogs.VerboseLogger(__name__)
 77        self._logger.addHandler(logging.StreamHandler())
 78        self._logger.setLevel(config.verbose)
 79
 80        self._config = config
 81        self._endpoint = "v1/listen"
 82
 83        self._flush_thread = None
 84        self._keep_alive_thread = None
 85
 86        # auto flush
 87        self._last_datagram = None
 88        self._lock_flush = threading.Lock()
 89
 90        self._thread_cls = thread_cls
 91
 92        # init handlers
 93        self._event_handlers = {
 94            event: [] for event in LiveTranscriptionEvents.__members__.values()
 95        }
 96
 97        # call the parent constructor
 98        super().__init__(
 99            config=self._config,
100            endpoint=self._endpoint,
101            thread_cls=self._thread_cls,
102        )
def start( self, options: Union[deepgram.clients.listen.v1.websocket.options.LiveOptions, Dict, NoneType] = None, addons: Optional[Dict] = None, headers: Optional[Dict] = None, members: Optional[Dict] = None, **kwargs) -> bool:
105    def start(
106        self,
107        options: Optional[Union[ListenWebSocketOptions, Dict]] = None,
108        addons: Optional[Dict] = None,
109        headers: Optional[Dict] = None,
110        members: Optional[Dict] = None,
111        **kwargs,
112    ) -> bool:
113        """
114        Starts the WebSocket connection for live transcription.
115        """
116        self._logger.debug("ListenWebSocketClient.start ENTER")
117        self._logger.info("options: %s", options)
118        self._logger.info("addons: %s", addons)
119        self._logger.info("headers: %s", headers)
120        self._logger.info("members: %s", members)
121        self._logger.info("kwargs: %s", kwargs)
122
123        if isinstance(options, ListenWebSocketOptions) and not options.check():
124            self._logger.error("options.check failed")
125            self._logger.debug("ListenWebSocketClient.start LEAVE")
126            raise DeepgramError("Fatal transcription options error")
127
128        self._addons = addons
129        self._headers = headers
130
131        # add "members" as members of the class
132        if members is not None:
133            self.__dict__.update(members)
134
135        # set kwargs as members of the class
136        if kwargs is not None:
137            self._kwargs = kwargs
138        else:
139            self._kwargs = {}
140
141        if isinstance(options, ListenWebSocketOptions):
142            self._logger.info("ListenWebSocketOptions switching class -> dict")
143            self._options = options.to_dict()
144        elif options is not None:
145            self._options = options
146        else:
147            self._options = {}
148
149        try:
150            # call parent start
151            if (
152                super().start(
153                    self._options,
154                    self._addons,
155                    self._headers,
156                    **dict(cast(Dict[Any, Any], self._kwargs)),
157                )
158                is False
159            ):
160                self._logger.error("ListenWebSocketClient.start failed")
161                self._logger.debug("ListenWebSocketClient.start LEAVE")
162                return False
163
164            # debug the threads
165            for thread in threading.enumerate():
166                self._logger.debug("after running thread: %s", thread.name)
167            self._logger.debug("number of active threads: %s", threading.active_count())
168
169            # keepalive thread
170            if self._config.is_keep_alive_enabled():
171                self._logger.notice("keepalive is enabled")
172                self._keep_alive_thread = self._thread_cls(target=self._keep_alive)
173                self._keep_alive_thread.start()
174            else:
175                self._logger.notice("keepalive is disabled")
176
177            # flush thread
178            if self._config.is_auto_flush_reply_enabled():
179                self._logger.notice("autoflush is enabled")
180                self._flush_thread = self._thread_cls(target=self._flush)
181                self._flush_thread.start()
182            else:
183                self._logger.notice("autoflush is disabled")
184
185            # debug the threads
186            for thread in threading.enumerate():
187                self._logger.debug("after running thread: %s", thread.name)
188            self._logger.debug("number of active threads: %s", threading.active_count())
189
190            self._logger.notice("start succeeded")
191            self._logger.debug("ListenWebSocketClient.start LEAVE")
192            return True
193
194        except Exception as e:  # pylint: disable=broad-except
195            self._logger.error(
196                "WebSocketException in ListenWebSocketClient.start: %s", e
197            )
198            self._logger.debug("ListenWebSocketClient.start LEAVE")
199            if self._config.options.get("termination_exception_connect") is True:
200                raise e
201            return False

Starts the WebSocket connection for live transcription.

def on( self, event: deepgram.clients.listen.enums.LiveTranscriptionEvents, handler: Callable) -> None:
205    def on(
206        self, event: LiveTranscriptionEvents, handler: Callable
207    ) -> None:  # registers event handlers for specific events
208        """
209        Registers event handlers for specific events.
210        """
211        self._logger.info("event subscribed: %s", event)
212        if event in LiveTranscriptionEvents.__members__.values() and callable(handler):
213            self._event_handlers[event].append(handler)

Registers event handlers for specific events.

def keep_alive(self) -> bool:
486    def keep_alive(self) -> bool:
487        """
488        Sends a KeepAlive message
489        """
490        self._logger.spam("ListenWebSocketClient.keep_alive ENTER")
491
492        self._logger.notice("Sending KeepAlive...")
493        ret = self.send(json.dumps({"type": "KeepAlive"}))
494
495        if not ret:
496            self._logger.error("keep_alive failed")
497            self._logger.spam("ListenWebSocketClient.keep_alive LEAVE")
498            return False
499
500        self._logger.notice("keep_alive succeeded")
501        self._logger.spam("ListenWebSocketClient.keep_alive LEAVE")
502
503        return True

Sends a KeepAlive message

def finalize(self) -> bool:
505    def finalize(self) -> bool:
506        """
507        Finalizes the Transcript connection by flushing it
508        """
509        self._logger.spam("ListenWebSocketClient.finalize ENTER")
510
511        self._logger.notice("Sending Finalize...")
512        ret = self.send(json.dumps({"type": "Finalize"}))
513
514        if not ret:
515            self._logger.error("finalize failed")
516            self._logger.spam("ListenWebSocketClient.finalize LEAVE")
517            return False
518
519        self._logger.notice("finalize succeeded")
520        self._logger.spam("ListenWebSocketClient.finalize LEAVE")
521
522        return True

Sends a Finalize message to flush the pending transcript.

def finish(self) -> bool:
528    def finish(self) -> bool:
529        """
530        Closes the WebSocket connection gracefully.
531        """
532        self._logger.spam("ListenWebSocketClient.finish ENTER")
533
534        # call parent finish
535        if super().finish() is False:
536            self._logger.error("ListenWebSocketClient.finish failed")
537
538        # debug the threads
539        for thread in threading.enumerate():
540            self._logger.debug("before running thread: %s", thread.name)
541        self._logger.debug("number of active threads: %s", threading.active_count())
542
543        # stop the threads
544        self._logger.verbose("cancelling tasks...")
545        if self._flush_thread is not None:
546            self._flush_thread.join()
547            self._flush_thread = None
548            self._logger.notice("processing _flush_thread thread joined")
549
550        if self._keep_alive_thread is not None:
551            self._keep_alive_thread.join()
552            self._keep_alive_thread = None
553            self._logger.notice("processing _keep_alive_thread thread joined")
554
555        if self._listen_thread is not None:
556            self._listen_thread.join()
557            self._listen_thread = None
558        self._logger.notice("listening thread joined")
559
560        # debug the threads
561        for thread in threading.enumerate():
562            self._logger.debug("before running thread: %s", thread.name)
563        self._logger.debug("number of active threads: %s", threading.active_count())
564
565        self._logger.notice("finish succeeded")
566        self._logger.spam("ListenWebSocketClient.finish LEAVE")
567        return True

Closes the WebSocket connection gracefully.