deepgram.clients.listen.client
# Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved.
# Use of this source code is governed by a MIT license that can be found in the LICENSE file.
# SPDX-License-Identifier: MIT

# This module re-exports the names of the currently supported API version
# (``.v1``) under version-neutral names. Every import is aliased with a
# ``Latest`` suffix and then re-bound below without the suffix.

# rest
from .v1 import (
    ListenRESTClient as ListenRESTClientLatest,
    AsyncListenRESTClient as AsyncListenRESTClientLatest,
)
from .v1 import (
    PrerecordedOptions as PrerecordedOptionsLatest,
    ListenRESTOptions as ListenRESTOptionsLatest,
)

from .v1 import (
    UrlSource as UrlSourceLatest,
    TextSource as TextSourceLatest,
    BufferSource as BufferSourceLatest,
    StreamSource as StreamSourceLatest,
    FileSource as FileSourceLatest,
    PreRecordedStreamSource as PreRecordedStreamSourceLatest,
    PrerecordedSource as PrerecordedSourceLatest,
    ListenRestSource as ListenRestSourceLatest,
)
from .v1 import (
    AsyncPrerecordedResponse as AsyncPrerecordedResponseLatest,
    PrerecordedResponse as PrerecordedResponseLatest,
    SyncPrerecordedResponse as SyncPrerecordedResponseLatest,
    # shared
    Average as AverageLatest,
    Intent as IntentLatest,
    Intents as IntentsLatest,
    IntentsInfo as IntentsInfoLatest,
    Segment as SegmentLatest,
    SentimentInfo as SentimentInfoLatest,
    Sentiment as SentimentLatest,
    Sentiments as SentimentsLatest,
    SummaryInfo as SummaryInfoLatest,
    Topic as TopicLatest,
    Topics as TopicsLatest,
    TopicsInfo as TopicsInfoLatest,
    # between rest and websocket
    ModelInfo as ModelInfoLatest,
    Hit as HitLatest,
    Search as SearchLatest,
    # unique
    ListenRESTMetadata as ListenRESTMetadataLatest,
    Entity as EntityLatest,
    Paragraph as ParagraphLatest,
    Paragraphs as ParagraphsLatest,
    ListenRESTResults as ListenRESTResultsLatest,
    Sentence as SentenceLatest,
    Summaries as SummariesLatest,
    SummaryV1 as SummaryV1Latest,
    SummaryV2 as SummaryV2Latest,
    Translation as TranslationLatest,
    Utterance as UtteranceLatest,
    Warning as WarningLatest,
    ListenRESTAlternative as ListenRESTAlternativeLatest,
    ListenRESTChannel as ListenRESTChannelLatest,
    ListenRESTWord as ListenRESTWordLatest,
)

# websocket
from .v1 import (
    ListenWebSocketClient as ListenWebSocketClientLatest,
    AsyncListenWebSocketClient as AsyncListenWebSocketClientLatest,
)
from .v1 import (
    LiveOptions as LiveOptionsLatest,
    ListenWebSocketOptions as ListenWebSocketOptionsLatest,
)
from .v1 import (
    OpenResponse as OpenResponseLatest,
    LiveResultResponse as LiveResultResponseLatest,
    ListenWSMetadataResponse as ListenWSMetadataResponseLatest,
    SpeechStartedResponse as SpeechStartedResponseLatest,
    UtteranceEndResponse as UtteranceEndResponseLatest,
    CloseResponse as CloseResponseLatest,
    ErrorResponse as ErrorResponseLatest,
    UnhandledResponse as UnhandledResponseLatest,
    ListenWSMetadata as ListenWSMetadataLatest,
    ListenWSAlternative as ListenWSAlternativeLatest,
    ListenWSChannel as ListenWSChannelLatest,
    ListenWSWord as ListenWSWordLatest,
)

# The vX/client.py points to the current supported version in the SDK.
# Older versions are supported in the SDK for backwards compatibility.

# shared
Average = AverageLatest
Intent = IntentLatest
Intents = IntentsLatest
IntentsInfo = IntentsInfoLatest
Segment = SegmentLatest
SentimentInfo = SentimentInfoLatest
Sentiment = SentimentLatest
Sentiments = SentimentsLatest
SummaryInfo = SummaryInfoLatest
Topic = TopicLatest
Topics = TopicsLatest
TopicsInfo = TopicsInfoLatest

# between rest and websocket
Hit = HitLatest
ModelInfo = ModelInfoLatest
Search = SearchLatest

# websocket common
OpenResponse = OpenResponseLatest
CloseResponse = CloseResponseLatest
ErrorResponse = ErrorResponseLatest
UnhandledResponse = UnhandledResponseLatest


# backward compat
PreRecordedClient = ListenRESTClientLatest
AsyncPreRecordedClient = AsyncListenRESTClientLatest
LiveClient = ListenWebSocketClientLatest
# The async alias must point at the async client; it was previously bound to
# the synchronous ListenWebSocketClientLatest.
AsyncLiveClient = AsyncListenWebSocketClientLatest

# rest
## common
UrlSource = UrlSourceLatest
TextSource = TextSourceLatest
BufferSource = BufferSourceLatest
StreamSource = StreamSourceLatest
FileSource = FileSourceLatest

## input
ListenRESTOptions = ListenRESTOptionsLatest
PrerecordedOptions = PrerecordedOptionsLatest
PreRecordedStreamSource = PreRecordedStreamSourceLatest
PrerecordedSource = PrerecordedSourceLatest
ListenRestSource = ListenRestSourceLatest

## output
AsyncPrerecordedResponse = AsyncPrerecordedResponseLatest
PrerecordedResponse = PrerecordedResponseLatest
SyncPrerecordedResponse = SyncPrerecordedResponseLatest
# unique
Entity = EntityLatest
ListenRESTMetadata = ListenRESTMetadataLatest
Paragraph = ParagraphLatest
Paragraphs = ParagraphsLatest
ListenRESTResults = ListenRESTResultsLatest
Sentence = SentenceLatest
Summaries = SummariesLatest
SummaryV1 = SummaryV1Latest
SummaryV2 = SummaryV2Latest
Translation = TranslationLatest
Utterance = UtteranceLatest
Warning = WarningLatest
ListenRESTAlternative = ListenRESTAlternativeLatest
ListenRESTChannel = ListenRESTChannelLatest
ListenRESTWord = ListenRESTWordLatest

# websocket
## input
ListenWebSocketOptions = ListenWebSocketOptionsLatest
LiveOptions = LiveOptionsLatest

## output
LiveResultResponse = LiveResultResponseLatest
ListenWSMetadataResponse = ListenWSMetadataResponseLatest
SpeechStartedResponse = SpeechStartedResponseLatest
UtteranceEndResponse = UtteranceEndResponseLatest

## unique
ListenWSMetadata = ListenWSMetadataLatest
ListenWSAlternative = ListenWSAlternativeLatest
ListenWSChannel = ListenWSChannelLatest
ListenWSWord = ListenWSWordLatest

# clients
ListenRESTClient = ListenRESTClientLatest
AsyncListenRESTClient = AsyncListenRESTClientLatest
ListenWebSocketClient = ListenWebSocketClientLatest
AsyncListenWebSocketClient = AsyncListenWebSocketClientLatest
@dataclass
class Average(BaseResponse):
    """
    Average

    Attributes:
        sentiment (Sentiment): Sentiment label; required, no default declared.
        sentiment_score (float): Score paired with the label; defaults to 0.
    """

    sentiment: Sentiment
    sentiment_score: float = 0

    def __getitem__(self, key):
        """
        Look up ``key`` in the ``to_dict()`` form of this object.

        A non-None ``sentiment`` entry is converted to a ``Sentiment`` member
        before the lookup.
        """
        _dict = self.to_dict()
        if _dict.get("sentiment") is not None:
            # Sentiment is an enum, not a dataclass, so it has no from_dict();
            # resolve the member by value instead.
            _dict["sentiment"] = Sentiment(_dict["sentiment"])
        return _dict[key]
Average
Inherited Members
@dataclass
class Intent(BaseResponse):
    """
    Intent

    Attributes:
        intent (str): Intent text; defaults to an empty string.
        confidence_score (float): Score paired with the intent; defaults to 0.
    """

    intent: str = ""
    confidence_score: float = 0
Intent
Inherited Members
@dataclass
class Intents(BaseResponse):
    """
    Intents

    Attributes:
        segments (List[Segment]): Segments of the result; defaults to a new
            empty list per instance.
    """

    segments: List[Segment] = field(default_factory=list)

    def __getitem__(self, key):
        """
        Look up ``key`` in the ``to_dict()`` form of this object, after
        rebuilding every ``segments`` entry with ``Segment.from_dict``.
        """
        serialized = self.to_dict()
        if "segments" in serialized:
            serialized["segments"] = list(
                map(Segment.from_dict, serialized["segments"])
            )
        return serialized[key]
Intents
Inherited Members
@dataclass
class IntentsInfo(BaseResponse):
    """
    Intents Info

    Attributes:
        model_uuid (str): Defaults to an empty string.
        input_tokens (int): Defaults to 0.
        output_tokens (int): Defaults to 0.
    """

    model_uuid: str = ""
    input_tokens: int = 0
    output_tokens: int = 0
Intents Info
Inherited Members
@dataclass
class Segment(BaseResponse):
    """
    Segment

    Attributes:
        text (str): Segment text; defaults to an empty string.
        start_word (int): Defaults to 0.
        end_word (int): Defaults to 0.
        sentiment (Optional[Sentiment]): Defaults to None.
        sentiment_score (Optional[float]): Defaults to None.
        intents (Optional[List[Intent]]): Defaults to None.
        topics (Optional[List[Topic]]): Defaults to None.

    The optional fields carry an ``exclude`` predicate that matches None.
    """

    text: str = ""
    start_word: int = 0
    end_word: int = 0
    sentiment: Optional[Sentiment] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    sentiment_score: Optional[float] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    intents: Optional[List[Intent]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    topics: Optional[List[Topic]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )

    def __getitem__(self, key):
        """
        Look up ``key`` in the ``to_dict()`` form of this object.

        Before the lookup, a non-None ``sentiment`` becomes a ``Sentiment``
        member and each entry of ``intents`` / ``topics`` is rebuilt with the
        matching ``from_dict``.
        """
        _dict = self.to_dict()
        if _dict.get("sentiment") is not None:
            # Sentiment is an enum without from_dict(); resolve it by value.
            _dict["sentiment"] = Sentiment(_dict["sentiment"])
        if _dict.get("intents") is not None:
            # intents is declared as a list, so convert element by element.
            _dict["intents"] = [
                Intent.from_dict(intent) for intent in _dict["intents"]
            ]
        if _dict.get("topics") is not None:
            # topics is declared as a list, so convert element by element.
            _dict["topics"] = [Topic.from_dict(topic) for topic in _dict["topics"]]
        return _dict[key]
Segment
Inherited Members
@dataclass
class SentimentInfo(BaseResponse):
    """
    Sentiment Info

    Attributes:
        model_uuid (str): Defaults to an empty string.
        input_tokens (int): Defaults to 0.
        output_tokens (int): Defaults to 0.
    """

    model_uuid: str = ""
    input_tokens: int = 0
    output_tokens: int = 0
Sentiment Info
Inherited Members
class Sentiment(StrEnum):
    """
    Sentiment values.

    String-valued enumeration with four members; ``UNKNOWN`` is the empty
    string.
    """

    UNKNOWN: str = ""
    NEGATIVE: str = "negative"
    NEUTRAL: str = "neutral"
    POSITIVE: str = "positive"
Sentiment values.
@dataclass
class Sentiments(BaseResponse):
    """
    Sentiments

    Attributes:
        average (Average): Required; no default declared.
        segments (List[Segment]): Defaults to a new empty list per instance.
    """

    average: Average
    segments: List[Segment] = field(default_factory=list)

    def __getitem__(self, key):
        """
        Look up ``key`` in the ``to_dict()`` form of this object, after
        rebuilding ``segments`` entries and ``average`` with their
        ``from_dict`` constructors.
        """
        serialized = self.to_dict()
        if "segments" in serialized:
            serialized["segments"] = list(
                map(Segment.from_dict, serialized["segments"])
            )
        if "average" in serialized:
            rebuilt_average = Average.from_dict(serialized["average"])
            serialized["average"] = rebuilt_average
        return serialized[key]
Sentiments
Inherited Members
@dataclass
class SummaryInfo(BaseResponse):
    """
    Summary Info

    Attributes:
        model_uuid (str): Defaults to an empty string.
        input_tokens (int): Defaults to 0.
        output_tokens (int): Defaults to 0.
    """

    model_uuid: str = ""
    input_tokens: int = 0
    output_tokens: int = 0
Summary Info
Inherited Members
@dataclass
class Topic(BaseResponse):
    """
    Topic

    Attributes:
        topic (str): Topic text; defaults to an empty string.
        confidence_score (float): Score paired with the topic; defaults to 0.
    """

    topic: str = ""
    confidence_score: float = 0
Topic
Inherited Members
@dataclass
class Topics(BaseResponse):
    """
    Topics

    Attributes:
        segments (List[Segment]): Segments of the result; defaults to a new
            empty list per instance.
    """

    segments: List[Segment] = field(default_factory=list)

    def __getitem__(self, key):
        """
        Look up ``key`` in the ``to_dict()`` form of this object, after
        rebuilding every ``segments`` entry with ``Segment.from_dict``.
        """
        serialized = self.to_dict()
        if "segments" in serialized:
            serialized["segments"] = list(
                map(Segment.from_dict, serialized["segments"])
            )
        return serialized[key]
Topics
Inherited Members
@dataclass
class TopicsInfo(BaseResponse):
    """
    Topics Info

    Attributes:
        model_uuid (str): Defaults to an empty string.
        input_tokens (int): Defaults to 0.
        output_tokens (int): Defaults to 0.
    """

    model_uuid: str = ""
    input_tokens: int = 0
    output_tokens: int = 0
Topics Info
Inherited Members
@dataclass
class Hit(BaseResponse):
    """
    The hit information for the response.

    Attributes:
        confidence (float): Defaults to 0.
        start (float): Defaults to 0.
        end (float): Defaults to 0.
        snippet (Optional[str]): Defaults to an empty string.
    """

    confidence: float = 0
    start: float = 0
    end: float = 0
    snippet: Optional[str] = ""
The hit information for the response.
Inherited Members
@dataclass
class ModelInfo(BaseResponse):
    """
    ModelInfo object

    Attributes:
        name (str): Defaults to an empty string.
        version (str): Defaults to an empty string.
        arch (str): Defaults to an empty string.
    """

    name: str = ""
    version: str = ""
    arch: str = ""
ModelInfo object
Inherited Members
@dataclass
class Search(BaseResponse):
    """
    The search information for the response.

    Attributes:
        query (str): Defaults to an empty string.
        hits (List[Hit]): Defaults to a new empty list per instance.
    """

    query: str = ""
    hits: List[Hit] = field(default_factory=list)

    def __getitem__(self, key):
        """
        Look up ``key`` in the ``to_dict()`` form of this object, after
        rebuilding every ``hits`` entry with ``Hit.from_dict``.
        """
        serialized = self.to_dict()
        if "hits" in serialized:
            serialized["hits"] = list(map(Hit.from_dict, serialized["hits"]))
        return serialized[key]
The search information for the response.
Inherited Members
@dataclass
class OpenResponse(BaseResponse):
    """
    Open Message from the Deepgram Platform

    Attributes:
        type (str): Defaults to an empty string.
    """

    type: str = ""
Open Message from the Deepgram Platform
Inherited Members
@dataclass
class CloseResponse(BaseResponse):
    """
    Close Message from the Deepgram Platform

    Attributes:
        type (str): Defaults to an empty string.
    """

    type: str = ""
Close Message from the Deepgram Platform
Inherited Members
@dataclass
class ErrorResponse(BaseResponse):
    """
    Error Message from the Deepgram Platform

    Attributes:
        description (str): Defaults to an empty string.
        message (str): Defaults to an empty string.
        type (str): Defaults to an empty string.
        variant (Optional[str]): Defaults to None; carries an ``exclude``
            predicate that matches None.
    """

    description: str = ""
    message: str = ""
    type: str = ""
    variant: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
Error Message from the Deepgram Platform
Inherited Members
@dataclass
class UnhandledResponse(BaseResponse):
    """
    Unhandled Message from the Deepgram Platform

    Attributes:
        type (str): Defaults to an empty string.
        raw (str): Defaults to an empty string.
    """

    type: str = ""
    raw: str = ""
Unhandled Message from the Deepgram Platform
Inherited Members
class UrlSource(TypedDict):
    """
    Represents a data source for specifying the location of a file via a URL.

    This class is used to specify a hosted file URL, typically pointing to an
    externally hosted file, such as an audio file hosted on a server or the internet.

    Attributes:
        url (str): The URL pointing to the hosted file.
    """

    # The only key of this TypedDict.
    url: str
Represents a data source for specifying the location of a file via a URL.
This class is used to specify a hosted file URL, typically pointing to an externally hosted file, such as an audio file hosted on a server or the internet.
Attributes: url (str): The URL pointing to the hosted file.
class TextSource(TypedDict):
    """
    Represents a data source that supplies text as a string.

    This class is used to specify a source of text data that can be read from.

    Attributes:
        text (str): A string for reading text data.
    """

    # The only key of this TypedDict; a str, not bytes.
    text: str
Represents a data source for reading text data from a text-like source.
This class is used to specify a source of text data that can be read from.
Attributes: text (str): A string for reading text data.
class BufferSource(TypedDict):
    """
    Represents a data source for handling raw binary data.

    This class is used to specify raw binary data, such as audio data in its
    binary form, which can be captured from a microphone or generated synthetically.

    Attributes:
        buffer (bytes): The binary data.
    """

    # The only key of this TypedDict.
    buffer: bytes
Represents a data source for handling raw binary data.
This class is used to specify raw binary data, such as audio data in its binary form, which can be captured from a microphone or generated synthetically.
Attributes: buffer (bytes): The binary data.
class StreamSource(TypedDict):
    """
    Represents a data source for reading binary data from a stream-like source.

    This class is used to specify a source of binary data that can be read from
    a stream, such as an audio file in .wav format.

    Attributes:
        stream (BufferedReader): A BufferedReader object for reading binary data.
    """

    # The only key of this TypedDict.
    stream: BufferedReader
Represents a data source for reading binary data from a stream-like source.
This class is used to specify a source of binary data that can be read from a stream, such as an audio file in .wav format.
Attributes: stream (BufferedReader): A BufferedReader object for reading binary data.
@dataclass
class PrerecordedOptions(BaseResponse):  # pylint: disable=too-many-instance-attributes
    """
    Contains all the options for the PrerecordedClient.

    Every field is optional and, except for ``model``, defaults to None. Each
    field carries an ``exclude`` predicate that matches None.

    Reference:
    https://developers.deepgram.com/reference/pre-recorded
    """

    alternatives: Optional[int] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    channels: Optional[int] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    callback: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    callback_method: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    custom_intent: Optional[Union[List[str], str]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    custom_intent_mode: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    custom_topic: Optional[Union[List[str], str]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    custom_topic_mode: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    detect_entities: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    detect_language: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    detect_topics: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    diarize: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    diarize_version: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    dictation: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    encoding: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    extra: Optional[Union[List[str], str]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    filler_words: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    intents: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    keyterm: Optional[List[str]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    keywords: Optional[Union[List[str], str]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    language: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    measurements: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    # NOTE(review): the default here is the string "None", not the None
    # singleton, so the `f is None` predicate never matches the default.
    # Confirm whether this is intentional before changing it.
    model: Optional[str] = field(
        default="None", metadata=dataclass_config(exclude=lambda f: f is None)
    )
    multichannel: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    numerals: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    paragraphs: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    profanity_filter: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    punctuate: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    redact: Optional[Union[List[str], bool, str]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    replace: Optional[Union[List[str], str]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    sample_rate: Optional[int] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    search: Optional[Union[List[str], str]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    sentiment: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    smart_format: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    summarize: Optional[Union[bool, str]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    tag: Optional[List[str]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    # Deprecated: check() logs a notice when this is set.
    tier: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    topics: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    utt_split: Optional[float] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    utterances: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    version: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )

    def check(self):
        """
        Check the options for any deprecated fields or values.

        Logs an error-level notice through a freshly created logger when
        ``tier`` is set to a truthy value.

        Returns:
            bool: Always True.
        """
        # A new logger with its own stream handler is built on every call.
        logger = verboselogs.VerboseLogger(__name__)
        logger.addHandler(logging.StreamHandler())
        prev = logger.level
        logger.setLevel(verboselogs.ERROR)

        if self.tier:
            logger.error(
                "WARNING: Tier is deprecated. Will be removed in a future version."
            )

        # Put the logger's level back to what it was before this check.
        logger.setLevel(prev)

        return True
Contains all the options for the PrerecordedClient.
Reference: https://developers.deepgram.com/reference/pre-recorded
def check(self):
    """
    Report use of deprecated options.

    Logs an error-level notice through a freshly created logger when
    ``tier`` is set to a truthy value.

    Returns:
        bool: Always True.
    """
    log = verboselogs.VerboseLogger(__name__)
    log.addHandler(logging.StreamHandler())
    original_level = log.level
    log.setLevel(verboselogs.ERROR)

    tier_in_use = bool(self.tier)
    if tier_in_use:
        log.error(
            "WARNING: Tier is deprecated. Will be removed in a future version."
        )

    # Put the logger's level back to what it was before this check.
    log.setLevel(original_level)

    return True
Check the options for any deprecated fields or values.
Inherited Members
@dataclass
class AsyncPrerecordedResponse(BaseResponse):
    """
    The response object for the async prerecorded API.

    Attributes:
        request_id (str): Defaults to an empty string.
    """

    request_id: str = ""
The response object for the async prerecorded API.
Inherited Members
@dataclass
class PrerecordedResponse(BaseResponse):
    """
    The response object for the prerecorded API.

    Attributes:
        metadata (Optional[Metadata]): Defaults to None.
        results (Optional[Results]): Defaults to None.

    Both fields carry an ``exclude`` predicate that matches None.
    """

    metadata: Optional[Metadata] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    results: Optional[Results] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )

    def __getitem__(self, key):
        """
        Look up ``key`` in the ``to_dict()`` form of this object, after
        rebuilding ``metadata`` and ``results`` with their ``from_dict``
        constructors when those keys are present.
        """
        serialized = self.to_dict()
        if "metadata" in serialized:
            rebuilt_metadata = Metadata.from_dict(serialized["metadata"])
            serialized["metadata"] = rebuilt_metadata
        if "results" in serialized:
            rebuilt_results = Results.from_dict(serialized["results"])
            serialized["results"] = rebuilt_results
        return serialized[key]
The response object for the prerecorded API.
Inherited Members
@dataclass
class Entity(BaseResponse):
    """
    The entity information for the response.

    Attributes:
        label (str): Defaults to an empty string.
        value (str): Defaults to an empty string.
        confidence (float): Defaults to 0.
        start_word (float): Defaults to 0.
        end_word (float): Defaults to 0.
    """

    label: str = ""
    value: str = ""
    confidence: float = 0
    start_word: float = 0
    end_word: float = 0
The entity information for the response.
Inherited Members
@dataclass
class Paragraph(BaseResponse):
    """
    The paragraph information for the response.

    Attributes:
        sentences (List[Sentence]): Defaults to a new empty list per instance.
        start (float): Defaults to 0.
        end (float): Defaults to 0.
        num_words (int): Defaults to 0.
        speaker (Optional[int]): Defaults to None.
        sentiment (Optional[Sentiment]): Defaults to None.
        sentiment_score (Optional[float]): Defaults to None.

    The optional fields carry an ``exclude`` predicate that matches None.
    """

    sentences: List[Sentence] = field(default_factory=list)
    start: float = 0
    end: float = 0
    num_words: int = 0
    speaker: Optional[int] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    sentiment: Optional[Sentiment] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    sentiment_score: Optional[float] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )

    def __getitem__(self, key):
        """
        Look up ``key`` in the ``to_dict()`` form of this object.

        Before the lookup, each ``sentences`` entry is rebuilt with
        ``Sentence.from_dict`` and a non-None ``sentiment`` becomes a
        ``Sentiment`` member.
        """
        _dict = self.to_dict()
        if "sentences" in _dict:
            _dict["sentences"] = [
                Sentence.from_dict(sentences) for sentences in _dict["sentences"]
            ]
        if _dict.get("sentiment") is not None:
            # Sentiment is an enum without from_dict(); resolve it by value.
            _dict["sentiment"] = Sentiment(_dict["sentiment"])
        return _dict[key]
The paragraph information for the response.
Inherited Members
@dataclass
class Paragraphs(BaseResponse):
    """
    The paragraphs information for the response.

    Attributes:
        transcript (Optional[str]): Defaults to an empty string.
        paragraphs (Optional[List[Paragraph]]): Defaults to None; carries an
            ``exclude`` predicate that matches None.
    """

    transcript: Optional[str] = ""
    paragraphs: Optional[List[Paragraph]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )

    def __getitem__(self, key):
        """
        Look up ``key`` in the ``to_dict()`` form of this object, after
        rebuilding every ``paragraphs`` entry with ``Paragraph.from_dict``.
        """
        serialized = self.to_dict()
        if "paragraphs" in serialized:
            serialized["paragraphs"] = list(
                map(Paragraph.from_dict, serialized["paragraphs"])
            )
        return serialized[key]
The paragraphs information for the response.
Inherited Members
@dataclass
class Sentence(BaseResponse):
    """
    The sentence information for the response.

    Attributes:
        text (str): Defaults to an empty string.
        start (float): Defaults to 0.
        end (float): Defaults to 0.
        sentiment (Optional[Sentiment]): Defaults to None.
        sentiment_score (Optional[float]): Defaults to None.

    The optional fields carry an ``exclude`` predicate that matches None.
    """

    text: str = ""
    start: float = 0
    end: float = 0
    sentiment: Optional[Sentiment] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    sentiment_score: Optional[float] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )

    def __getitem__(self, key):
        """
        Look up ``key`` in the ``to_dict()`` form of this object, converting
        a non-None ``sentiment`` entry to a ``Sentiment`` member first.
        """
        _dict = self.to_dict()
        if _dict.get("sentiment") is not None:
            # Sentiment is an enum without from_dict(); resolve it by value.
            _dict["sentiment"] = Sentiment(_dict["sentiment"])
        return _dict[key]
The sentence information for the response.
Inherited Members
@dataclass
class SummaryV1(BaseResponse):
    """
    The summary information for the response.

    Attributes:
        summary (str): Defaults to an empty string.
        start_word (float): Defaults to 0.
        end_word (float): Defaults to 0.
    """

    summary: str = ""
    start_word: float = 0
    end_word: float = 0
The summary information for the response.
Inherited Members
@dataclass
class SummaryV2(BaseResponse):
    """
    The summary information for the response.

    Attributes:
        result (str): Defaults to an empty string.
        short (str): Defaults to an empty string.
    """

    result: str = ""
    short: str = ""
The summary information for the response.
Inherited Members
@dataclass
class Translation(BaseResponse):
    """
    The translation information for the response.

    Attributes:
        language (Optional[str]): Defaults to an empty string.
        translation (Optional[str]): Defaults to an empty string.
    """

    language: Optional[str] = ""
    translation: Optional[str] = ""
The translation information for the response.
Inherited Members
@dataclass
class Utterance(BaseResponse):  # pylint: disable=too-many-instance-attributes
    """
    The utterance information for the response.

    Attributes:
        start (float): Defaults to 0.
        end (float): Defaults to 0.
        confidence (float): Defaults to 0.
        channel (int): Defaults to 0.
        transcript (str): Defaults to an empty string.
        words (List[ListenRESTWord]): Defaults to a new empty list.
        speaker (Optional[int]): Defaults to None.
        sentiment (Optional[Sentiment]): Defaults to None.
        sentiment_score (Optional[float]): Defaults to None.
        id (str): Defaults to an empty string.

    The optional fields carry an ``exclude`` predicate that matches None.
    """

    start: float = 0
    end: float = 0
    confidence: float = 0
    channel: int = 0
    transcript: str = ""
    words: List[ListenRESTWord] = field(default_factory=list)
    speaker: Optional[int] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    sentiment: Optional[Sentiment] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    sentiment_score: Optional[float] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    id: str = ""

    def __getitem__(self, key):
        """
        Look up ``key`` in the ``to_dict()`` form of this object.

        Before the lookup, each ``words`` entry is rebuilt with
        ``ListenRESTWord.from_dict`` and a non-None ``sentiment`` becomes a
        ``Sentiment`` member.
        """
        _dict = self.to_dict()
        if "words" in _dict:
            _dict["words"] = [
                ListenRESTWord.from_dict(words) for words in _dict["words"]
            ]
        if _dict.get("sentiment") is not None:
            # Sentiment is an enum without from_dict(); resolve it by value.
            _dict["sentiment"] = Sentiment(_dict["sentiment"])
        return _dict[key]
The utterance information for the response.
Inherited Members
@dataclass
class Warning(BaseResponse):  # pylint: disable=used-before-assignment,redefined-builtin
    """
    The warning information for the response.

    Note that this class name shadows the builtin ``Warning`` (hence the
    pylint suppression on the class line).

    Attributes:
        parameter (str): Defaults to an empty string.
        type (str): Defaults to an empty string.
        message (str): Defaults to an empty string.
    """

    parameter: str = ""
    type: str = ""
    message: str = ""
The warning information for the response.
Inherited Members
@dataclass
class ListenRESTAlternative(
    BaseResponse
):  # pylint: disable=too-many-instance-attributes
    """
    The alternative information for the response.

    Attributes:
        transcript (str): Defaults to an empty string.
        confidence (float): Defaults to 0.
        words (List[ListenRESTWord]): Defaults to a new empty list.
        summaries (Optional[List[SummaryV1]]): Defaults to None.
        paragraphs (Optional[Paragraphs]): Defaults to None.
        entities (Optional[List[Entity]]): Defaults to None.
        translations (Optional[List[Translation]]): Defaults to None.
        languages (Optional[List[str]]): Defaults to None.

    The optional fields carry an ``exclude`` predicate that matches None.
    """

    transcript: str = ""
    confidence: float = 0
    words: List[ListenRESTWord] = field(default_factory=list)
    summaries: Optional[List[SummaryV1]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    paragraphs: Optional[Paragraphs] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    entities: Optional[List[Entity]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    translations: Optional[List[Translation]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    languages: Optional[List[str]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )

    def __getitem__(self, key):
        """
        Look up ``key`` in the ``to_dict()`` form of this object.

        Before the lookup, the entries of ``words``, ``summaries``,
        ``entities`` and ``translations`` and the ``paragraphs`` value are
        rebuilt with their ``from_dict`` constructors when present.
        """
        serialized = self.to_dict()
        if "words" in serialized:
            serialized["words"] = list(
                map(ListenRESTWord.from_dict, serialized["words"])
            )
        if "summaries" in serialized:
            serialized["summaries"] = list(
                map(SummaryV1.from_dict, serialized["summaries"])
            )
        if "paragraphs" in serialized:
            rebuilt_paragraphs = Paragraphs.from_dict(serialized["paragraphs"])
            serialized["paragraphs"] = rebuilt_paragraphs
        if "entities" in serialized:
            serialized["entities"] = list(
                map(Entity.from_dict, serialized["entities"])
            )
        if "translations" in serialized:
            serialized["translations"] = list(
                map(Translation.from_dict, serialized["translations"])
            )
        return serialized[key]
The alternative information for the response.
Inherited Members
@dataclass
class ListenRESTChannel(BaseResponse):
    """
    The channel information for the response.

    Attributes:
        search (Optional[List[Search]]): Defaults to None.
        alternatives (List[ListenRESTAlternative]): Defaults to a new empty
            list per instance.
        detected_language (Optional[str]): Defaults to None.
        language_confidence (Optional[float]): Defaults to None.

    The optional fields carry an ``exclude`` predicate that matches None.
    """

    search: Optional[List[Search]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    alternatives: List[ListenRESTAlternative] = field(default_factory=list)
    detected_language: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    language_confidence: Optional[float] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )

    def __getitem__(self, key):
        """
        Look up ``key`` in the ``to_dict()`` form of this object, after
        rebuilding the ``search`` and ``alternatives`` entries with their
        ``from_dict`` constructors when present.
        """
        serialized = self.to_dict()
        if "search" in serialized:
            serialized["search"] = list(map(Search.from_dict, serialized["search"]))
        if "alternatives" in serialized:
            serialized["alternatives"] = list(
                map(ListenRESTAlternative.from_dict, serialized["alternatives"])
            )
        return serialized[key]
The channel information for the response.
Inherited Members
@dataclass
class ListenRESTWord(BaseResponse):  # pylint: disable=too-many-instance-attributes
    """
    The word information for the response.

    Attributes:
        word (str): Defaults to an empty string.
        start (float): Defaults to 0.
        end (float): Defaults to 0.
        confidence (float): Defaults to 0.
        punctuated_word (Optional[str]): Defaults to None.
        speaker (Optional[int]): Defaults to None.
        language (Optional[str]): Defaults to None.
        speaker_confidence (Optional[float]): Defaults to None.
        sentiment (Optional[Sentiment]): Defaults to None.
        sentiment_score (Optional[float]): Defaults to None.

    The optional fields carry an ``exclude`` predicate that matches None.
    """

    word: str = ""
    start: float = 0
    end: float = 0
    confidence: float = 0
    punctuated_word: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    speaker: Optional[int] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    language: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    speaker_confidence: Optional[float] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    sentiment: Optional[Sentiment] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    sentiment_score: Optional[float] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )

    def __getitem__(self, key):
        """
        Look up ``key`` in the ``to_dict()`` form of this object, converting
        a non-None ``sentiment`` entry to a ``Sentiment`` member first.
        """
        _dict = self.to_dict()
        if _dict.get("sentiment") is not None:
            # Sentiment is an enum without from_dict(); resolve it by value.
            _dict["sentiment"] = Sentiment(_dict["sentiment"])
        return _dict[key]
The word information for the response.
Inherited Members
@dataclass
class LiveOptions(BaseResponse):  # pylint: disable=too-many-instance-attributes
    """
    Live Transcription Options for the Deepgram Platform.

    Every field is optional and, except for ``model``, defaults to None. Each
    field carries an ``exclude`` predicate that matches None.

    Please see the documentation for more information on each option:
    https://developers.deepgram.com/reference/streaming
    """

    alternatives: Optional[int] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    callback: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    callback_method: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    channels: Optional[int] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    diarize: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    diarize_version: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    dictation: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    encoding: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    # pylint: disable=W0511
    # TODO: endpointing's current type previous was `Optional[str]` which is incorrect
    # for backward compatibility we are keeping it as `Optional[Union[str, bool, int]]`
    # since it gets translated to a string to be placed as a query parameter, will keep `str` for now
    # but will change this to `Optional[Union[bool, int]]` in a future release
    endpointing: Optional[Union[str, bool, int]] = field(
        default=None,
        metadata=dataclass_config(exclude=lambda f: f is None),
    )
    # pylint: enable=W0511
    extra: Optional[Union[List[str], str]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    filler_words: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    interim_results: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    keywords: Optional[Union[List[str], str]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    keyterm: Optional[List[str]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    language: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    # NOTE(review): the default here is the string "None", not the None
    # singleton, so the `f is None` predicate never matches the default.
    # Confirm whether this is intentional before changing it.
    model: Optional[str] = field(
        default="None", metadata=dataclass_config(exclude=lambda f: f is None)
    )
    multichannel: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    no_delay: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    numerals: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    punctuate: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    profanity_filter: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    redact: Optional[Union[List[str], bool, str]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    replace: Optional[Union[List[str], str]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    sample_rate: Optional[int] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    search: Optional[Union[List[str], str]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    smart_format: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    tag: Optional[List[str]] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    # Deprecated: check() logs a notice when this is set.
    tier: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    utterance_end_ms: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    vad_events: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    version: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )

    def check(self):
        """
        Check the options for any deprecated or soon-to-be-deprecated options.

        Logs a warning-level notice through a freshly created logger when
        ``tier`` is set to a truthy value, and another when ``endpointing``
        is a ``str``.

        Returns:
            bool: Always True.
        """
        logger = verboselogs.VerboseLogger(__name__)
        logger.addHandler(logging.StreamHandler())
        prev = logger.level
        # The notices below are logged at WARNING level. The threshold used
        # to be ERROR, which filtered every one of them out.
        logger.setLevel(logging.WARNING)

        if self.tier:
            logger.warning(
                "WARNING: Tier is deprecated. Will be removed in a future version."
            )

        if isinstance(self.endpointing, str):
            logger.warning(
                "WARNING: endpointing's current type previous was `Optional[str]` which is incorrect"
                " for backward compatibility we are keeping it as `Optional[Union[str, bool, int]]`"
                " since it gets translated to a string to be placed as a query parameter, will keep `str` for now"
                " but will change this to `Optional[Union[bool, int]]` in a future release"
            )

        # Put the logger's level back to what it was before this check.
        logger.setLevel(prev)

        return True
Live Transcription Options for the Deepgram Platform.
Please see the documentation for more information on each option: https://developers.deepgram.com/reference/streaming
def check(self):
    """
    Check the options for any deprecated or soon-to-be-deprecated options.

    A warning is logged when `tier` is set and when `endpointing` is passed
    as a string.

    Returns:
        bool: Always True; this method only warns and never rejects options.
    """
    logger = verboselogs.VerboseLogger(__name__)
    logger.addHandler(logging.StreamHandler())
    prev = logger.level
    # The messages below are emitted at WARNING level, so the threshold must
    # not be higher than WARNING or they would all be filtered out.
    logger.setLevel(logging.WARNING)

    if self.tier:
        logger.warning(
            "WARNING: Tier is deprecated. Will be removed in a future version."
        )

    if isinstance(self.endpointing, str):
        logger.warning(
            "WARNING: endpointing's current type previous was `Optional[str]` which is incorrect"
            " for backward compatibility we are keeping it as `Optional[Union[str, bool, int]]`"
            " since it gets translated to a string to be placed as a query parameter, will keep `str` for now"
            " but will change this to `Optional[Union[bool, int]]` in a future release"
        )

    logger.setLevel(prev)

    return True
Check the options for any deprecated or soon-to-be-deprecated options.
Inherited Members
@dataclass
class LiveResultResponse(BaseResponse):  # pylint: disable=too-many-instance-attributes
    """
    Result Message from the Deepgram Platform

    `channel` and `metadata` are required; every other field has a default.
    `from_finalize` is excluded from serialization while it is None.
    """

    channel: ListenWSChannel
    metadata: Metadata
    type: str = ""
    channel_index: List[int] = field(default_factory=list)
    duration: float = 0
    start: float = 0
    is_final: bool = False
    from_finalize: Optional[bool] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    speech_final: bool = False

    def __getitem__(self, key):
        """
        Return the value stored under `key` in the serialized form of this message.

        `channel` and `metadata` are rebuilt into their dataclass types before
        the lookup. Raises KeyError when `key` is absent.
        """
        _dict = self.to_dict()
        # `channel` and `metadata` are single objects, not lists. Assuming
        # to_dict() serializes each one to a dict, iterating over it would feed
        # key strings to from_dict(); rebuild each object directly instead.
        if "channel" in _dict:
            _dict["channel"] = ListenWSChannel.from_dict(_dict["channel"])
        if "metadata" in _dict:
            _dict["metadata"] = Metadata.from_dict(_dict["metadata"])
        return _dict[key]
Result Message from the Deepgram Platform
Inherited Members
@dataclass
class SpeechStartedResponse(BaseResponse):
    """
    SpeechStartedResponse Message from the Deepgram Platform

    Every field has a default, so an instance can be built with no arguments.
    """

    type: str = ""
    # default_factory gives each instance its own empty list.
    channel: List[int] = field(default_factory=list)
    timestamp: float = 0
SpeechStartedResponse Message from the Deepgram Platform
Inherited Members
@dataclass
class UtteranceEndResponse(BaseResponse):
    """
    UtteranceEnd Message from the Deepgram Platform

    Every field has a default, so an instance can be built with no arguments.
    """

    type: str = ""
    # default_factory gives each instance its own empty list.
    channel: List[int] = field(default_factory=list)
    last_word_end: float = 0
UtteranceEnd Message from the Deepgram Platform
Inherited Members
@dataclass
class ListenWSAlternative(BaseResponse):
    """
    Alternative object

    Holds a transcript, its confidence, and the words that make it up.
    `languages` is excluded from serialization while it is None.
    """

    transcript: str = ""
    confidence: float = 0
    words: List[ListenWSWord] = field(default_factory=list)
    languages: Optional[List[str]] = field(
        default=None,
        metadata=dataclass_config(exclude=lambda value: value is None),
    )

    def __getitem__(self, key):
        """
        Look up `key` in the serialized form, with `words` rebuilt as ListenWSWord objects.
        """
        serialized = self.to_dict()
        if "words" not in serialized:
            return serialized[key]
        serialized["words"] = [
            ListenWSWord.from_dict(entry) for entry in serialized["words"]
        ]
        return serialized[key]
Alternative object
Inherited Members
@dataclass
class ListenWSChannel(BaseResponse):
    """
    Channel object

    Carries the list of alternatives and, optionally, search results.
    `search` is excluded from serialization while it is None.
    """

    search: Optional[List[Search]] = field(
        default=None,
        metadata=dataclass_config(exclude=lambda value: value is None),
    )
    alternatives: List[ListenWSAlternative] = field(default_factory=list)

    def __getitem__(self, key):
        """
        Look up `key` in the serialized form, with list fields rebuilt as typed objects.
        """
        serialized = self.to_dict()
        # Each list-valued field that is present is converted back to its type.
        typed_lists = (("search", Search), ("alternatives", ListenWSAlternative))
        for name, item_type in typed_lists:
            if name in serialized:
                serialized[name] = [
                    item_type.from_dict(entry) for entry in serialized[name]
                ]
        return serialized[key]
Channel object
Inherited Members
@dataclass
class ListenWSWord(BaseResponse):
    """
    Word object

    `word`, `start`, `end` and `confidence` always have a value; the remaining
    fields default to None and are excluded from serialization while None.
    """

    word: str = ""
    start: float = 0
    end: float = 0
    confidence: float = 0
    # Optional fields below: omitted from the serialized form when None.
    punctuated_word: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    speaker: Optional[int] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
    language: Optional[str] = field(
        default=None, metadata=dataclass_config(exclude=lambda f: f is None)
    )
Word object
Inherited Members
class ListenRESTClient(AbstractSyncRestClient):
    """
    A client class for handling pre-recorded audio data.
    Provides methods for transcribing audio from URLs and files.

    The four public ``transcribe_*`` methods differ only in how the request
    payload is taken from ``source`` and in which response class parses the
    reply; the steps they share live in the private helpers of this class.
    """

    _logger: verboselogs.VerboseLogger
    _config: DeepgramClientOptions

    def __init__(self, config: DeepgramClientOptions):
        self._logger = verboselogs.VerboseLogger(__name__)
        self._logger.addHandler(logging.StreamHandler())
        self._logger.setLevel(config.verbose)
        self._config = config
        super().__init__(config)

    # pylint: disable=too-many-positional-arguments

    @staticmethod
    def _callback_of(options):
        """Return the callback value carried by `options`, or None when there is none."""
        if isinstance(options, dict):
            return options.get("callback")
        if isinstance(options, ListenRESTOptions):
            return options.callback
        return None

    @staticmethod
    def _with_callback(options, callback):
        """Store `callback` on `options` (a new dict when `options` is None) and return it."""
        if options is None:
            options = {}
        if isinstance(options, ListenRESTOptions):
            options.callback = callback
        else:
            options["callback"] = callback
        return options

    def _reject_source(self, method: str):
        """Log and raise DeepgramTypeError for a source of unrecognized type."""
        self._logger.error("Unknown transcription source type")
        self._logger.debug("ListenRESTClient.%s LEAVE", method)
        raise DeepgramTypeError("Unknown transcription source type")

    def _url_payload(self, source, method: str) -> Dict:
        """Return the POST keyword carrying a URL source, sent as a JSON body."""
        if not is_url_source(source):
            self._reject_source(method)
        return {"json": source}

    def _file_payload(self, source, method: str) -> Dict:
        """Return the POST keyword carrying a buffer or stream source, sent as raw content."""
        if is_buffer_source(source):
            return {"content": source["buffer"]}  # type: ignore
        if is_readstream_source(source):
            return {"content": source["stream"]}  # type: ignore
        return self._reject_source(method)

    def _request(
        self,
        method: str,
        response_type,
        payload: Dict,
        options,
        addons,
        headers,
        timeout,
        endpoint: str,
        post_kwargs: Dict,
    ):
        """Validate options, POST `payload` to `endpoint`, and parse the reply as `response_type`."""
        url = f"{self._config.url}/{endpoint}"

        if isinstance(options, ListenRESTOptions) and not options.check():
            self._logger.error("options.check failed")
            self._logger.debug("ListenRESTClient.%s LEAVE", method)
            raise DeepgramError("Fatal transcription options error")

        self._logger.info("url: %s", url)
        # Only URL sources are logged; file payloads are raw audio content.
        if "json" in payload:
            self._logger.info("source: %s", payload["json"])
        if isinstance(options, ListenRESTOptions):
            self._logger.info("ListenRESTOptions switching class -> dict")
            options = options.to_dict()
        self._logger.info("options: %s", options)
        self._logger.info("addons: %s", addons)
        self._logger.info("headers: %s", headers)
        # Caller kwargs are passed as a dict so they cannot collide with this
        # helper's own parameter names.
        result = self.post(
            url,
            options=options,
            addons=addons,
            headers=headers,
            timeout=timeout,
            **payload,
            **post_kwargs,
        )
        self._logger.info("json: %s", result)
        res = response_type.from_json(result)
        self._logger.verbose("result: %s", res)
        self._logger.notice("%s succeeded", method)
        self._logger.debug("ListenRESTClient.%s LEAVE", method)
        return res

    def transcribe_url(
        self,
        source: UrlSource,
        options: Optional[Union[Dict, ListenRESTOptions]] = None,
        addons: Optional[Dict] = None,
        headers: Optional[Dict] = None,
        timeout: Optional[httpx.Timeout] = None,
        endpoint: str = "v1/listen",
        **kwargs,
    ) -> Union[AsyncPrerecordedResponse, PrerecordedResponse]:
        """
        Transcribes audio from a URL source.

        When `options` carries a non-None callback, the call is delegated to
        `transcribe_url_callback`.

        Args:
            source (UrlSource): The URL source of the audio to transcribe.
            options (ListenRESTOptions): Additional options for the transcription (default is None).
            addons (Dict): Forwarded unchanged to the POST request (default is None).
            headers (Dict): Forwarded unchanged to the POST request (default is None).
            timeout (httpx.Timeout): Forwarded unchanged to the POST request (default is None).
            endpoint (str): The API endpoint for the transcription (default is "v1/listen").
            **kwargs: Extra keyword arguments forwarded to the POST request.

        Returns:
            PrerecordedResponse: An object containing the transcription result, or an
            AsyncPrerecordedResponse when a callback was set in `options`.

        Raises:
            DeepgramTypeError: Raised when `source` is not a recognized URL source.
            DeepgramError: Raised when `options.check()` fails.
        """
        self._logger.debug("ListenRESTClient.transcribe_url ENTER")

        callback = self._callback_of(options)
        if callback is not None:
            self._logger.debug("ListenRESTClient.transcribe_url LEAVE")
            return self.transcribe_url_callback(
                source,
                callback=callback,
                options=options,
                addons=addons,
                headers=headers,
                timeout=timeout,
                endpoint=endpoint,
                **kwargs,
            )

        payload = self._url_payload(source, "transcribe_url")
        return self._request(
            "transcribe_url",
            PrerecordedResponse,
            payload,
            options,
            addons,
            headers,
            timeout,
            endpoint,
            kwargs,
        )

    def transcribe_url_callback(
        self,
        source: UrlSource,
        callback: str,
        options: Optional[Union[Dict, ListenRESTOptions]] = None,
        addons: Optional[Dict] = None,
        headers: Optional[Dict] = None,
        timeout: Optional[httpx.Timeout] = None,
        endpoint: str = "v1/listen",
        **kwargs,
    ) -> AsyncPrerecordedResponse:
        """
        Transcribes audio from a URL source and sends the result to a callback URL.

        `callback` is written onto `options` in place before the request is made.

        Args:
            source (UrlSource): The URL source of the audio to transcribe.
            callback (str): The callback URL where the transcription results will be sent.
            options (ListenRESTOptions): Additional options for the transcription (default is None).
            addons (Dict): Forwarded unchanged to the POST request (default is None).
            headers (Dict): Forwarded unchanged to the POST request (default is None).
            timeout (httpx.Timeout): Forwarded unchanged to the POST request (default is None).
            endpoint (str): The API endpoint for the transcription (default is "v1/listen").
            **kwargs: Extra keyword arguments forwarded to the POST request.

        Returns:
            AsyncPrerecordedResponse: An object containing the request_id or an error message.

        Raises:
            DeepgramTypeError: Raised when `source` is not a recognized URL source.
            DeepgramError: Raised when `options.check()` fails.
        """
        self._logger.debug("ListenRESTClient.transcribe_url_callback ENTER")

        options = self._with_callback(options, callback)
        payload = self._url_payload(source, "transcribe_url_callback")
        return self._request(
            "transcribe_url_callback",
            AsyncPrerecordedResponse,
            payload,
            options,
            addons,
            headers,
            timeout,
            endpoint,
            kwargs,
        )

    def transcribe_file(
        self,
        source: FileSource,
        options: Optional[Union[Dict, ListenRESTOptions]] = None,
        addons: Optional[Dict] = None,
        headers: Optional[Dict] = None,
        timeout: Optional[httpx.Timeout] = None,
        endpoint: str = "v1/listen",
        **kwargs,
    ) -> Union[AsyncPrerecordedResponse, PrerecordedResponse]:
        """
        Transcribes audio from a local file source.

        When `options` carries a non-None callback, the call is delegated to
        `transcribe_file_callback`.

        Args:
            source (FileSource): The local file source of the audio to transcribe.
            options (ListenRESTOptions): Additional options for the transcription (default is None).
            addons (Dict): Forwarded unchanged to the POST request (default is None).
            headers (Dict): Forwarded unchanged to the POST request (default is None).
            timeout (httpx.Timeout): Forwarded unchanged to the POST request (default is None).
            endpoint (str): The API endpoint for the transcription (default is "v1/listen").
            **kwargs: Extra keyword arguments forwarded to the POST request.

        Returns:
            PrerecordedResponse: An object containing the transcription result, or an
            AsyncPrerecordedResponse when a callback was set in `options`.

        Raises:
            DeepgramTypeError: Raised when `source` is neither a buffer nor a stream source.
            DeepgramError: Raised when `options.check()` fails.
        """
        self._logger.debug("ListenRESTClient.transcribe_file ENTER")

        callback = self._callback_of(options)
        if callback is not None:
            self._logger.debug("ListenRESTClient.transcribe_file LEAVE")
            return self.transcribe_file_callback(
                source,
                callback=callback,
                options=options,
                addons=addons,
                headers=headers,
                timeout=timeout,
                endpoint=endpoint,
                **kwargs,
            )

        payload = self._file_payload(source, "transcribe_file")
        return self._request(
            "transcribe_file",
            PrerecordedResponse,
            payload,
            options,
            addons,
            headers,
            timeout,
            endpoint,
            kwargs,
        )

    def transcribe_file_callback(
        self,
        source: FileSource,
        callback: str,
        options: Optional[Union[Dict, ListenRESTOptions]] = None,
        addons: Optional[Dict] = None,
        headers: Optional[Dict] = None,
        timeout: Optional[httpx.Timeout] = None,
        endpoint: str = "v1/listen",
        **kwargs,
    ) -> AsyncPrerecordedResponse:
        """
        Transcribes audio from a local file source and sends the result to a callback URL.

        `callback` is written onto `options` in place before the request is made.

        Args:
            source (FileSource): The local file source of the audio to transcribe.
            callback (str): The callback URL where the transcription results will be sent.
            options (ListenRESTOptions): Additional options for the transcription (default is None).
            addons (Dict): Forwarded unchanged to the POST request (default is None).
            headers (Dict): Forwarded unchanged to the POST request (default is None).
            timeout (httpx.Timeout): Forwarded unchanged to the POST request (default is None).
            endpoint (str): The API endpoint for the transcription (default is "v1/listen").
            **kwargs: Extra keyword arguments forwarded to the POST request.

        Returns:
            AsyncPrerecordedResponse: An object containing the request_id or an error message.

        Raises:
            DeepgramTypeError: Raised when `source` is neither a buffer nor a stream source.
            DeepgramError: Raised when `options.check()` fails.
        """
        self._logger.debug("ListenRESTClient.transcribe_file_callback ENTER")

        options = self._with_callback(options, callback)
        payload = self._file_payload(source, "transcribe_file_callback")
        return self._request(
            "transcribe_file_callback",
            AsyncPrerecordedResponse,
            payload,
            options,
            addons,
            headers,
            timeout,
            endpoint,
            kwargs,
        )

    # pylint: enable=too-many-positional-arguments
A client class for handling pre-recorded audio data. Provides methods for transcribing audio from URLs and files.
def transcribe_url(
    self,
    source: UrlSource,
    options: Optional[Union[Dict, ListenRESTOptions]] = None,
    addons: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    timeout: Optional[httpx.Timeout] = None,
    endpoint: str = "v1/listen",
    **kwargs,
) -> Union[AsyncPrerecordedResponse, PrerecordedResponse]:
    """
    Transcribe audio that lives at a URL.

    If `options` carries a non-None callback, the request is handed to
    `transcribe_url_callback` and its result is returned instead.

    Args:
        source (UrlSource): The URL source of the audio to transcribe.
        options (ListenRESTOptions): Additional options for the transcription (default is None).
        addons (Dict): Passed through to the POST request (default is None).
        headers (Dict): Passed through to the POST request (default is None).
        timeout (httpx.Timeout): Passed through to the POST request (default is None).
        endpoint (str): The API endpoint for the transcription (default is "v1/listen").
        **kwargs: Extra keyword arguments passed through to the POST request.

    Returns:
        PrerecordedResponse: The parsed transcription result.

    Raises:
        DeepgramTypeError: `source` is not a recognized URL source.
        DeepgramError: `options.check()` reported a failure.
    """
    log = self._logger
    log.debug("ListenRESTClient.transcribe_url ENTER")

    # Work out whether the caller asked for callback delivery.
    if isinstance(options, dict):
        wants_callback = options.get("callback") is not None
    else:
        wants_callback = (
            isinstance(options, ListenRESTOptions) and options.callback is not None
        )

    if wants_callback:
        log.debug("ListenRESTClient.transcribe_url LEAVE")
        return self.transcribe_url_callback(
            source,
            callback=options["callback"],
            options=options,
            addons=addons,
            headers=headers,
            timeout=timeout,
            endpoint=endpoint,
            **kwargs,
        )

    target = f"{self._config.url}/{endpoint}"

    if not is_url_source(source):
        log.error("Unknown transcription source type")
        log.debug("ListenRESTClient.transcribe_url LEAVE")
        raise DeepgramTypeError("Unknown transcription source type")

    typed_options = isinstance(options, ListenRESTOptions)
    if typed_options and not options.check():
        log.error("options.check failed")
        log.debug("ListenRESTClient.transcribe_url LEAVE")
        raise DeepgramError("Fatal transcription options error")

    log.info("url: %s", target)
    log.info("source: %s", source)
    if typed_options:
        log.info("ListenRESTOptions switching class -> dict")
        options = options.to_dict()
    log.info("options: %s", options)
    log.info("addons: %s", addons)
    log.info("headers: %s", headers)

    # A URL source is sent as the JSON body of the request.
    raw = self.post(
        target,
        options=options,
        addons=addons,
        headers=headers,
        json=source,
        timeout=timeout,
        **kwargs,
    )
    log.info("json: %s", raw)
    parsed = PrerecordedResponse.from_json(raw)
    log.verbose("result: %s", parsed)
    log.notice("transcribe_url succeeded")
    log.debug("ListenRESTClient.transcribe_url LEAVE")
    return parsed
Transcribes audio from a URL source.
Args: source (UrlSource): The URL source of the audio to transcribe. options (ListenRESTOptions): Additional options for the transcription (default is None). endpoint (str): The API endpoint for the transcription (default is "v1/listen").
Returns: PrerecordedResponse: An object containing the transcription result.
Raises: DeepgramTypeError: Raised when the source is not a recognized URL source. DeepgramError: Raised when the options fail their check() validation.
def transcribe_url_callback(
    self,
    source: UrlSource,
    callback: str,
    options: Optional[Union[Dict, ListenRESTOptions]] = None,
    addons: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    timeout: Optional[httpx.Timeout] = None,
    endpoint: str = "v1/listen",
    **kwargs,
) -> AsyncPrerecordedResponse:
    """
    Transcribe audio that lives at a URL and have the result sent to a callback URL.

    `callback` is written onto `options` in place (a new dict is used when
    `options` is None) before the request is made.

    Args:
        source (UrlSource): The URL source of the audio to transcribe.
        callback (str): The callback URL where the transcription results will be sent.
        options (ListenRESTOptions): Additional options for the transcription (default is None).
        addons (Dict): Passed through to the POST request (default is None).
        headers (Dict): Passed through to the POST request (default is None).
        timeout (httpx.Timeout): Passed through to the POST request (default is None).
        endpoint (str): The API endpoint for the transcription (default is "v1/listen").
        **kwargs: Extra keyword arguments passed through to the POST request.

    Returns:
        AsyncPrerecordedResponse: The parsed acknowledgement of the request.

    Raises:
        DeepgramTypeError: `source` is not a recognized URL source.
        DeepgramError: `options.check()` reported a failure.
    """
    log = self._logger
    log.debug("ListenRESTClient.transcribe_url_callback ENTER")

    target = f"{self._config.url}/{endpoint}"

    options = {} if options is None else options
    typed_options = isinstance(options, ListenRESTOptions)
    if typed_options:
        options.callback = callback
    else:
        options["callback"] = callback

    if not is_url_source(source):
        log.error("Unknown transcription source type")
        log.debug("ListenRESTClient.transcribe_url_callback LEAVE")
        raise DeepgramTypeError("Unknown transcription source type")

    if typed_options and not options.check():
        log.error("options.check failed")
        log.debug("ListenRESTClient.transcribe_url_callback LEAVE")
        raise DeepgramError("Fatal transcription options error")

    log.info("url: %s", target)
    log.info("source: %s", source)
    if typed_options:
        log.info("ListenRESTOptions switching class -> dict")
        options = options.to_dict()
    log.info("options: %s", options)
    log.info("addons: %s", addons)
    log.info("headers: %s", headers)

    # A URL source is sent as the JSON body of the request.
    raw = self.post(
        target,
        options=options,
        addons=addons,
        headers=headers,
        json=source,
        timeout=timeout,
        **kwargs,
    )
    log.info("json: %s", raw)
    parsed = AsyncPrerecordedResponse.from_json(raw)
    log.verbose("result: %s", parsed)
    log.notice("transcribe_url_callback succeeded")
    log.debug("ListenRESTClient.transcribe_url_callback LEAVE")
    return parsed
Transcribes audio from a URL source and sends the result to a callback URL.
Args: source (UrlSource): The URL source of the audio to transcribe. callback (str): The callback URL where the transcription results will be sent. options (ListenRESTOptions): Additional options for the transcription (default is None). endpoint (str): The API endpoint for the transcription (default is "v1/listen").
Returns: AsyncPrerecordedResponse: An object containing the request_id or an error message.
Raises: DeepgramTypeError: Raised when the source is not a recognized URL source. DeepgramError: Raised when the options fail their check() validation.
def transcribe_file(
    self,
    source: FileSource,
    options: Optional[Union[Dict, ListenRESTOptions]] = None,
    addons: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    timeout: Optional[httpx.Timeout] = None,
    endpoint: str = "v1/listen",
    **kwargs,
) -> Union[AsyncPrerecordedResponse, PrerecordedResponse]:
    """
    Transcribe audio supplied as an in-memory buffer or a readable stream.

    If `options` carries a non-None callback, the request is handed to
    `transcribe_file_callback` and its result is returned instead.

    Args:
        source (FileSource): The local file source of the audio to transcribe.
        options (ListenRESTOptions): Additional options for the transcription (default is None).
        addons (Dict): Passed through to the POST request (default is None).
        headers (Dict): Passed through to the POST request (default is None).
        timeout (httpx.Timeout): Passed through to the POST request (default is None).
        endpoint (str): The API endpoint for the transcription (default is "v1/listen").
        **kwargs: Extra keyword arguments passed through to the POST request.

    Returns:
        PrerecordedResponse: The parsed transcription result.

    Raises:
        DeepgramTypeError: `source` is neither a buffer nor a stream source.
        DeepgramError: `options.check()` reported a failure.
    """
    log = self._logger
    log.debug("ListenRESTClient.transcribe_file ENTER")

    # Work out whether the caller asked for callback delivery.
    if isinstance(options, dict):
        wants_callback = options.get("callback") is not None
    else:
        wants_callback = (
            isinstance(options, ListenRESTOptions) and options.callback is not None
        )

    if wants_callback:
        log.debug("ListenRESTClient.transcribe_file LEAVE")
        return self.transcribe_file_callback(
            source,
            callback=options["callback"],
            options=options,
            addons=addons,
            headers=headers,
            timeout=timeout,
            endpoint=endpoint,
            **kwargs,
        )

    target = f"{self._config.url}/{endpoint}"

    # The stream check only runs when the source is not a buffer.
    buffered = is_buffer_source(source)
    if not buffered and not is_readstream_source(source):
        log.error("Unknown transcription source type")
        log.debug("ListenRESTClient.transcribe_file LEAVE")
        raise DeepgramTypeError("Unknown transcription source type")
    audio = source["buffer"] if buffered else source["stream"]  # type: ignore

    typed_options = isinstance(options, ListenRESTOptions)
    if typed_options and not options.check():
        log.error("options.check failed")
        log.debug("ListenRESTClient.transcribe_file LEAVE")
        raise DeepgramError("Fatal transcription options error")

    log.info("url: %s", target)
    if typed_options:
        log.info("ListenRESTOptions switching class -> dict")
        options = options.to_dict()
    log.info("options: %s", options)
    log.info("addons: %s", addons)
    log.info("headers: %s", headers)

    # File audio is sent as the raw request content.
    raw = self.post(
        target,
        options=options,
        addons=addons,
        headers=headers,
        content=audio,
        timeout=timeout,
        **kwargs,
    )
    log.info("json: %s", raw)
    parsed = PrerecordedResponse.from_json(raw)
    log.verbose("result: %s", parsed)
    log.notice("transcribe_file succeeded")
    log.debug("ListenRESTClient.transcribe_file LEAVE")
    return parsed
Transcribes audio from a local file source.
Args: source (FileSource): The local file source of the audio to transcribe. options (ListenRESTOptions): Additional options for the transcription (default is None). endpoint (str): The API endpoint for the transcription (default is "v1/listen").
Returns: PrerecordedResponse: An object containing the transcription result or an error message.
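Raises: DeepgramTypeError: Raised when the source is neither a buffer source nor a stream source. DeepgramError: Raised when the options fail their check() validation.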
def transcribe_file_callback(
    self,
    source: FileSource,
    callback: str,
    options: Optional[Union[Dict, ListenRESTOptions]] = None,
    addons: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    timeout: Optional[httpx.Timeout] = None,
    endpoint: str = "v1/listen",
    **kwargs,
) -> AsyncPrerecordedResponse:
    """
    Transcribe buffer or stream audio and have the result sent to a callback URL.

    `callback` is written onto `options` in place (a new dict is used when
    `options` is None) before the request is made.

    Args:
        source (FileSource): The local file source of the audio to transcribe.
        callback (str): The callback URL where the transcription results will be sent.
        options (ListenRESTOptions): Additional options for the transcription (default is None).
        addons (Dict): Passed through to the POST request (default is None).
        headers (Dict): Passed through to the POST request (default is None).
        timeout (httpx.Timeout): Passed through to the POST request (default is None).
        endpoint (str): The API endpoint for the transcription (default is "v1/listen").
        **kwargs: Extra keyword arguments passed through to the POST request.

    Returns:
        AsyncPrerecordedResponse: The parsed acknowledgement of the request.

    Raises:
        DeepgramTypeError: `source` is neither a buffer nor a stream source.
        DeepgramError: `options.check()` reported a failure.
    """
    log = self._logger
    log.debug("ListenRESTClient.transcribe_file_callback ENTER")

    target = f"{self._config.url}/{endpoint}"

    options = {} if options is None else options
    typed_options = isinstance(options, ListenRESTOptions)
    if typed_options:
        options.callback = callback
    else:
        options["callback"] = callback

    # The stream check only runs when the source is not a buffer.
    buffered = is_buffer_source(source)
    if not buffered and not is_readstream_source(source):
        log.error("Unknown transcription source type")
        log.debug("ListenRESTClient.transcribe_file_callback LEAVE")
        raise DeepgramTypeError("Unknown transcription source type")
    audio = source["buffer"] if buffered else source["stream"]  # type: ignore

    if typed_options and not options.check():
        log.error("options.check failed")
        log.debug("ListenRESTClient.transcribe_file_callback LEAVE")
        raise DeepgramError("Fatal transcription options error")

    log.info("url: %s", target)
    if typed_options:
        log.info("ListenRESTOptions switching class -> dict")
        options = options.to_dict()
    log.info("options: %s", options)
    log.info("addons: %s", addons)
    log.info("headers: %s", headers)

    # File audio is sent as the raw request content.
    raw = self.post(
        target,
        options=options,
        addons=addons,
        headers=headers,
        content=audio,
        timeout=timeout,
        **kwargs,
    )
    log.info("json: %s", raw)
    parsed = AsyncPrerecordedResponse.from_json(raw)
    log.verbose("result: %s", parsed)
    log.notice("transcribe_file_callback succeeded")
    log.debug("ListenRESTClient.transcribe_file_callback LEAVE")
    return parsed
Transcribes audio from a local file source and sends the result to a callback URL.
Args: source (FileSource): The local file source of the audio to transcribe. callback (str): The callback URL where the transcription results will be sent. options (ListenRESTOptions): Additional options for the transcription (default is None). endpoint (str): The API endpoint for the transcription (default is "v1/listen").
Returns: AsyncPrerecordedResponse: An object containing the request_id or an error message.
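Raises: DeepgramTypeError: Raised when the source is neither a buffer source nor a stream source. DeepgramError: Raised when the options fail their check() validation.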
26class AsyncListenRESTClient(AbstractAsyncRestClient): 27 """ 28 A client class for handling pre-recorded audio data. 29 Provides methods for transcribing audio from URLs and files. 30 """ 31 32 _logger: verboselogs.VerboseLogger 33 _config: DeepgramClientOptions 34 35 def __init__(self, config: DeepgramClientOptions): 36 self._logger = verboselogs.VerboseLogger(__name__) 37 self._logger.addHandler(logging.StreamHandler()) 38 self._logger.setLevel(config.verbose) 39 self._config = config 40 super().__init__(config) 41 42 # pylint: disable=too-many-positional-arguments 43 44 async def transcribe_url( 45 self, 46 source: UrlSource, 47 options: Optional[Union[Dict, ListenRESTOptions]] = None, 48 addons: Optional[Dict] = None, 49 headers: Optional[Dict] = None, 50 timeout: Optional[httpx.Timeout] = None, 51 endpoint: str = "v1/listen", 52 **kwargs, 53 ) -> Union[AsyncPrerecordedResponse, PrerecordedResponse]: 54 """ 55 Transcribes audio from a URL source. 56 57 Args: 58 source (UrlSource): The URL source of the audio to transcribe. 59 options (ListenRESTOptions): Additional options for the transcription (default is None). 60 endpoint (str): The API endpoint for the transcription (default is "v1/listen"). 61 62 Returns: 63 PrerecordedResponse: An object containing the transcription result. 64 65 Raises: 66 DeepgramTypeError: Raised for known API errors. 
67 """ 68 self._logger.debug("ListenRESTClient.transcribe_url ENTER") 69 70 if ( 71 isinstance(options, dict) 72 and "callback" in options 73 and options["callback"] is not None 74 ) or (isinstance(options, ListenRESTOptions) and options.callback is not None): 75 self._logger.debug("ListenRESTClient.transcribe_url LEAVE") 76 return await self.transcribe_url_callback( 77 source, 78 callback=options["callback"], 79 options=options, 80 addons=addons, 81 headers=headers, 82 timeout=timeout, 83 endpoint=endpoint, 84 **kwargs, 85 ) 86 87 url = f"{self._config.url}/{endpoint}" 88 if is_url_source(source): 89 body = source 90 else: 91 self._logger.error("Unknown transcription source type") 92 self._logger.debug("ListenRESTClient.transcribe_url LEAVE") 93 raise DeepgramTypeError("Unknown transcription source type") 94 95 if isinstance(options, ListenRESTOptions) and not options.check(): 96 self._logger.error("options.check failed") 97 self._logger.debug("ListenRESTClient.transcribe_url LEAVE") 98 raise DeepgramError("Fatal transcription options error") 99 100 self._logger.info("url: %s", url) 101 self._logger.info("source: %s", source) 102 if isinstance(options, ListenRESTOptions): 103 self._logger.info("ListenRESTOptions switching class -> dict") 104 options = options.to_dict() 105 self._logger.info("options: %s", options) 106 self._logger.info("addons: %s", addons) 107 self._logger.info("headers: %s", headers) 108 result = await self.post( 109 url, 110 options=options, 111 addons=addons, 112 headers=headers, 113 json=body, 114 timeout=timeout, 115 **kwargs, 116 ) 117 self._logger.info("json: %s", result) 118 res = PrerecordedResponse.from_json(result) 119 self._logger.verbose("result: %s", res) 120 self._logger.notice("transcribe_url succeeded") 121 self._logger.debug("ListenRESTClient.transcribe_url LEAVE") 122 return res 123 124 async def transcribe_url_callback( 125 self, 126 source: UrlSource, 127 callback: str, 128 options: Optional[Union[Dict, ListenRESTOptions]] = 
None, 129 addons: Optional[Dict] = None, 130 headers: Optional[Dict] = None, 131 timeout: Optional[httpx.Timeout] = None, 132 endpoint: str = "v1/listen", 133 **kwargs, 134 ) -> AsyncPrerecordedResponse: 135 """ 136 Transcribes audio from a URL source and sends the result to a callback URL. 137 138 Args: 139 source (UrlSource): The URL source of the audio to transcribe. 140 callback (str): The callback URL where the transcription results will be sent. 141 options (ListenRESTOptions): Additional options for the transcription (default is None). 142 endpoint (str): The API endpoint for the transcription (default is "v1/listen"). 143 144 Returns: 145 AsyncPrerecordedResponse: An object containing the request_id or an error message. 146 147 Raises: 148 DeepgramTypeError: Raised for known API errors. 149 """ 150 self._logger.debug("ListenRESTClient.transcribe_url_callback ENTER") 151 152 url = f"{self._config.url}/{endpoint}" 153 if options is None: 154 options = {} 155 if isinstance(options, ListenRESTOptions): 156 options.callback = callback 157 else: 158 options["callback"] = callback 159 if is_url_source(source): 160 body = source 161 else: 162 self._logger.error("Unknown transcription source type") 163 self._logger.debug("ListenRESTClient.transcribe_url_callback LEAVE") 164 raise DeepgramTypeError("Unknown transcription source type") 165 166 if isinstance(options, ListenRESTOptions) and not options.check(): 167 self._logger.error("options.check failed") 168 self._logger.debug("ListenRESTClient.transcribe_url_callback LEAVE") 169 raise DeepgramError("Fatal transcription options error") 170 171 self._logger.info("url: %s", url) 172 self._logger.info("source: %s", source) 173 if isinstance(options, ListenRESTOptions): 174 self._logger.info("ListenRESTOptions switching class -> dict") 175 options = options.to_dict() 176 self._logger.info("options: %s", options) 177 self._logger.info("addons: %s", addons) 178 self._logger.info("headers: %s", headers) 179 result = await 
self.post( 180 url, 181 options=options, 182 addons=addons, 183 headers=headers, 184 json=body, 185 timeout=timeout, 186 **kwargs, 187 ) 188 self._logger.info("json: %s", result) 189 res = AsyncPrerecordedResponse.from_json(result) 190 self._logger.verbose("result: %s", res) 191 self._logger.notice("transcribe_url_callback succeeded") 192 self._logger.debug("ListenRESTClient.transcribe_url_callback LEAVE") 193 return res 194 195 async def transcribe_file( 196 self, 197 source: FileSource, 198 options: Optional[Union[Dict, ListenRESTOptions]] = None, 199 addons: Optional[Dict] = None, 200 headers: Optional[Dict] = None, 201 timeout: Optional[httpx.Timeout] = None, 202 endpoint: str = "v1/listen", 203 **kwargs, 204 ) -> Union[AsyncPrerecordedResponse, PrerecordedResponse]: 205 """ 206 Transcribes audio from a local file source. 207 208 Args: 209 source (FileSource): The local file source of the audio to transcribe. 210 options (ListenRESTOptions): Additional options for the transcription (default is None). 211 endpoint (str): The API endpoint for the transcription (default is "v1/listen"). 212 213 Returns: 214 PrerecordedResponse: An object containing the transcription result or an error message. 215 216 Raises: 217 DeepgramTypeError: Raised for known API errors. 
218 """ 219 self._logger.debug("ListenRESTClient.transcribe_file ENTER") 220 221 if ( 222 isinstance(options, dict) 223 and "callback" in options 224 and options["callback"] is not None 225 ) or (isinstance(options, ListenRESTOptions) and options.callback is not None): 226 self._logger.debug("ListenRESTClient.transcribe_file LEAVE") 227 return await self.transcribe_file_callback( 228 source, 229 callback=options["callback"], 230 options=options, 231 addons=addons, 232 headers=headers, 233 timeout=timeout, 234 endpoint=endpoint, 235 **kwargs, 236 ) 237 238 url = f"{self._config.url}/{endpoint}" 239 if is_buffer_source(source): 240 body = source["buffer"] # type: ignore 241 elif is_readstream_source(source): 242 body = source["stream"] # type: ignore 243 else: 244 self._logger.error("Unknown transcription source type") 245 self._logger.debug("ListenRESTClient.transcribe_file LEAVE") 246 raise DeepgramTypeError("Unknown transcription source type") 247 248 if isinstance(options, ListenRESTOptions) and not options.check(): 249 self._logger.error("options.check failed") 250 self._logger.debug("ListenRESTClient.transcribe_file LEAVE") 251 raise DeepgramError("Fatal transcription options error") 252 253 self._logger.info("url: %s", url) 254 if isinstance(options, ListenRESTOptions): 255 self._logger.info("ListenRESTOptions switching class -> dict") 256 options = options.to_dict() 257 self._logger.info("options: %s", options) 258 self._logger.info("addons: %s", addons) 259 self._logger.info("headers: %s", headers) 260 result = await self.post( 261 url, 262 options=options, 263 addons=addons, 264 headers=headers, 265 content=body, 266 timeout=timeout, 267 **kwargs, 268 ) 269 self._logger.info("json: %s", result) 270 res = PrerecordedResponse.from_json(result) 271 self._logger.verbose("result: %s", res) 272 self._logger.notice("transcribe_file succeeded") 273 self._logger.debug("ListenRESTClient.transcribe_file LEAVE") 274 return res 275 276 async def 
transcribe_file_callback( 277 self, 278 source: FileSource, 279 callback: str, 280 options: Optional[Union[Dict, ListenRESTOptions]] = None, 281 addons: Optional[Dict] = None, 282 headers: Optional[Dict] = None, 283 timeout: Optional[httpx.Timeout] = None, 284 endpoint: str = "v1/listen", 285 **kwargs, 286 ) -> AsyncPrerecordedResponse: 287 """ 288 Transcribes audio from a local file source and sends the result to a callback URL. 289 290 Args: 291 source (FileSource): The local file source of the audio to transcribe. 292 callback (str): The callback URL where the transcription results will be sent. 293 options (ListenRESTOptions): Additional options for the transcription (default is None). 294 endpoint (str): The API endpoint for the transcription (default is "v1/listen"). 295 296 Returns: 297 AsyncPrerecordedResponse: An object containing the request_id or an error message. 298 299 Raises: 300 DeepgramTypeError: Raised for known API errors. 301 """ 302 self._logger.debug("ListenRESTClient.transcribe_file_callback ENTER") 303 304 url = f"{self._config.url}/{endpoint}" 305 if options is None: 306 options = {} 307 if isinstance(options, ListenRESTOptions): 308 options.callback = callback 309 else: 310 options["callback"] = callback 311 if is_buffer_source(source): 312 body = source["buffer"] # type: ignore 313 elif is_readstream_source(source): 314 body = source["stream"] # type: ignore 315 else: 316 self._logger.error("Unknown transcription source type") 317 self._logger.debug("ListenRESTClient.transcribe_file_callback LEAVE") 318 raise DeepgramTypeError("Unknown transcription source type") 319 320 if isinstance(options, ListenRESTOptions) and not options.check(): 321 self._logger.error("options.check failed") 322 self._logger.debug("ListenRESTClient.transcribe_file_callback LEAVE") 323 raise DeepgramError("Fatal transcription options error") 324 325 self._logger.info("url: %s", url) 326 if isinstance(options, ListenRESTOptions): 327 
self._logger.info("ListenRESTOptions switching class -> dict") 328 options = options.to_dict() 329 self._logger.info("options: %s", options) 330 self._logger.info("addons: %s", addons) 331 self._logger.info("headers: %s", headers) 332 result = await self.post( 333 url, 334 options=options, 335 addons=addons, 336 headers=headers, 337 content=body, 338 timeout=timeout, 339 **kwargs, 340 ) 341 self._logger.info("json: %s", result) 342 res = AsyncPrerecordedResponse.from_json(result) 343 self._logger.verbose("result: %s", res) 344 self._logger.notice("transcribe_file_callback succeeded") 345 self._logger.debug("ListenRESTClient.transcribe_file_callback LEAVE") 346 return res 347 348 # pylint: enable=too-many-positional-arguments
A client class for handling pre-recorded audio data. Provides methods for transcribing audio from URLs and files.
async def transcribe_url(
    self,
    source: UrlSource,
    options: Optional[Union[Dict, ListenRESTOptions]] = None,
    addons: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    timeout: Optional[httpx.Timeout] = None,
    endpoint: str = "v1/listen",
    **kwargs,
) -> Union[AsyncPrerecordedResponse, PrerecordedResponse]:
    """
    Transcribe audio that is reachable at a URL.

    When ``options`` carries a non-``None`` callback (as a dict key or as the
    ``callback`` attribute of a ``ListenRESTOptions``), the request is handed
    to ``transcribe_url_callback`` and that method's response is returned.

    Args:
        source (UrlSource): The URL source of the audio to transcribe.
        options (ListenRESTOptions | dict): Transcription options (default is None).
        addons (dict): Forwarded unchanged to ``self.post`` (default is None).
        headers (dict): Forwarded unchanged to ``self.post`` (default is None).
        timeout (httpx.Timeout): Forwarded unchanged to ``self.post`` (default is None).
        endpoint (str): Path appended to the configured base URL (default is "v1/listen").
        **kwargs: Extra keyword arguments forwarded to ``self.post``.

    Returns:
        PrerecordedResponse: Parsed from the reply of the POST request, or the
        AsyncPrerecordedResponse produced by ``transcribe_url_callback`` when a
        callback is configured.

    Raises:
        DeepgramTypeError: ``source`` is not recognized by ``is_url_source``.
        DeepgramError: ``options`` is a ListenRESTOptions whose ``check()`` fails.
    """
    self._logger.debug("ListenRESTClient.transcribe_url ENTER")

    # A callback can arrive either as a dict entry or as an options attribute.
    callback_requested = False
    if isinstance(options, dict):
        callback_requested = options.get("callback") is not None
    if not callback_requested and isinstance(options, ListenRESTOptions):
        callback_requested = options.callback is not None

    if callback_requested:
        self._logger.debug("ListenRESTClient.transcribe_url LEAVE")
        return await self.transcribe_url_callback(
            source,
            callback=options["callback"],
            options=options,
            addons=addons,
            headers=headers,
            timeout=timeout,
            endpoint=endpoint,
            **kwargs,
        )

    request_url = f"{self._config.url}/{endpoint}"

    # Guard: only URL sources are accepted by this method.
    if not is_url_source(source):
        self._logger.error("Unknown transcription source type")
        self._logger.debug("ListenRESTClient.transcribe_url LEAVE")
        raise DeepgramTypeError("Unknown transcription source type")

    typed_options = isinstance(options, ListenRESTOptions)
    if typed_options and not options.check():
        self._logger.error("options.check failed")
        self._logger.debug("ListenRESTClient.transcribe_url LEAVE")
        raise DeepgramError("Fatal transcription options error")

    self._logger.info("url: %s", request_url)
    self._logger.info("source: %s", source)
    if typed_options:
        # The request layer is handed a plain dict, not the options object.
        self._logger.info("ListenRESTOptions switching class -> dict")
        options = options.to_dict()
    self._logger.info("options: %s", options)
    self._logger.info("addons: %s", addons)
    self._logger.info("headers: %s", headers)

    raw_json = await self.post(
        request_url,
        options=options,
        addons=addons,
        headers=headers,
        json=source,
        timeout=timeout,
        **kwargs,
    )
    self._logger.info("json: %s", raw_json)

    response = PrerecordedResponse.from_json(raw_json)
    self._logger.verbose("result: %s", response)
    self._logger.notice("transcribe_url succeeded")
    self._logger.debug("ListenRESTClient.transcribe_url LEAVE")
    return response
Transcribes audio from a URL source.
Args: source (UrlSource): The URL source of the audio to transcribe. options (ListenRESTOptions): Additional options for the transcription (default is None). endpoint (str): The API endpoint for the transcription (default is "v1/listen").
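Returns: PrerecordedResponse: An object containing the transcription result. When the options carry a callback, the call is delegated to transcribe_url_callback and an AsyncPrerecordedResponse is returned instead.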
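Raises: DeepgramTypeError: Raised when the source is not a recognized URL source. DeepgramError: Raised when the supplied ListenRESTOptions fail their check().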
async def transcribe_url_callback(
    self,
    source: UrlSource,
    callback: str,
    options: Optional[Union[Dict, ListenRESTOptions]] = None,
    addons: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    timeout: Optional[httpx.Timeout] = None,
    endpoint: str = "v1/listen",
    **kwargs,
) -> AsyncPrerecordedResponse:
    """
    Transcribe audio from a URL and have the result delivered to a callback URL.

    The callback is written into ``options`` (as the ``callback`` attribute of a
    ListenRESTOptions, or as the ``"callback"`` key otherwise) before the
    request is posted. An options object passed by the caller is updated in place.

    Args:
        source (UrlSource): The URL source of the audio to transcribe.
        callback (str): The callback URL where the transcription results will be sent.
        options (ListenRESTOptions | dict): Transcription options (default is None).
        addons (dict): Forwarded unchanged to ``self.post`` (default is None).
        headers (dict): Forwarded unchanged to ``self.post`` (default is None).
        timeout (httpx.Timeout): Forwarded unchanged to ``self.post`` (default is None).
        endpoint (str): Path appended to the configured base URL (default is "v1/listen").
        **kwargs: Extra keyword arguments forwarded to ``self.post``.

    Returns:
        AsyncPrerecordedResponse: Parsed from the reply of the POST request.

    Raises:
        DeepgramTypeError: ``source`` is not recognized by ``is_url_source``.
        DeepgramError: ``options`` is a ListenRESTOptions whose ``check()`` fails.
    """
    self._logger.debug("ListenRESTClient.transcribe_url_callback ENTER")

    request_url = f"{self._config.url}/{endpoint}"

    # Record the callback on whichever options representation is in use.
    options = {} if options is None else options
    if isinstance(options, ListenRESTOptions):
        options.callback = callback
    else:
        options["callback"] = callback

    # Guard: only URL sources are accepted by this method.
    if not is_url_source(source):
        self._logger.error("Unknown transcription source type")
        self._logger.debug("ListenRESTClient.transcribe_url_callback LEAVE")
        raise DeepgramTypeError("Unknown transcription source type")

    typed_options = isinstance(options, ListenRESTOptions)
    if typed_options and not options.check():
        self._logger.error("options.check failed")
        self._logger.debug("ListenRESTClient.transcribe_url_callback LEAVE")
        raise DeepgramError("Fatal transcription options error")

    self._logger.info("url: %s", request_url)
    self._logger.info("source: %s", source)
    if typed_options:
        # The request layer is handed a plain dict, not the options object.
        self._logger.info("ListenRESTOptions switching class -> dict")
        options = options.to_dict()
    self._logger.info("options: %s", options)
    self._logger.info("addons: %s", addons)
    self._logger.info("headers: %s", headers)

    raw_json = await self.post(
        request_url,
        options=options,
        addons=addons,
        headers=headers,
        json=source,
        timeout=timeout,
        **kwargs,
    )
    self._logger.info("json: %s", raw_json)

    response = AsyncPrerecordedResponse.from_json(raw_json)
    self._logger.verbose("result: %s", response)
    self._logger.notice("transcribe_url_callback succeeded")
    self._logger.debug("ListenRESTClient.transcribe_url_callback LEAVE")
    return response
Transcribes audio from a URL source and sends the result to a callback URL.
Args: source (UrlSource): The URL source of the audio to transcribe. callback (str): The callback URL where the transcription results will be sent. options (ListenRESTOptions): Additional options for the transcription (default is None). endpoint (str): The API endpoint for the transcription (default is "v1/listen").
Returns: AsyncPrerecordedResponse: An object containing the request_id or an error message.
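Raises: DeepgramTypeError: Raised when the source is not a recognized URL source. DeepgramError: Raised when the supplied ListenRESTOptions fail their check().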
async def transcribe_file(
    self,
    source: FileSource,
    options: Optional[Union[Dict, ListenRESTOptions]] = None,
    addons: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    timeout: Optional[httpx.Timeout] = None,
    endpoint: str = "v1/listen",
    **kwargs,
) -> Union[AsyncPrerecordedResponse, PrerecordedResponse]:
    """
    Transcribe audio supplied as an in-memory buffer or a readable stream.

    When ``options`` carries a non-``None`` callback (as a dict key or as the
    ``callback`` attribute of a ``ListenRESTOptions``), the request is handed
    to ``transcribe_file_callback`` and that method's response is returned.

    Args:
        source (FileSource): The local file source of the audio to transcribe;
            its ``"buffer"`` or ``"stream"`` entry becomes the request content.
        options (ListenRESTOptions | dict): Transcription options (default is None).
        addons (dict): Forwarded unchanged to ``self.post`` (default is None).
        headers (dict): Forwarded unchanged to ``self.post`` (default is None).
        timeout (httpx.Timeout): Forwarded unchanged to ``self.post`` (default is None).
        endpoint (str): Path appended to the configured base URL (default is "v1/listen").
        **kwargs: Extra keyword arguments forwarded to ``self.post``.

    Returns:
        PrerecordedResponse: Parsed from the reply of the POST request, or the
        AsyncPrerecordedResponse produced by ``transcribe_file_callback`` when a
        callback is configured.

    Raises:
        DeepgramTypeError: ``source`` is neither a buffer source nor a stream source.
        DeepgramError: ``options`` is a ListenRESTOptions whose ``check()`` fails.
    """
    self._logger.debug("ListenRESTClient.transcribe_file ENTER")

    # A callback can arrive either as a dict entry or as an options attribute.
    callback_requested = False
    if isinstance(options, dict):
        callback_requested = options.get("callback") is not None
    if not callback_requested and isinstance(options, ListenRESTOptions):
        callback_requested = options.callback is not None

    if callback_requested:
        self._logger.debug("ListenRESTClient.transcribe_file LEAVE")
        return await self.transcribe_file_callback(
            source,
            callback=options["callback"],
            options=options,
            addons=addons,
            headers=headers,
            timeout=timeout,
            endpoint=endpoint,
            **kwargs,
        )

    request_url = f"{self._config.url}/{endpoint}"

    # Pick the raw payload out of whichever file-source shape was supplied.
    if is_buffer_source(source):
        payload = source["buffer"]  # type: ignore
    elif is_readstream_source(source):
        payload = source["stream"]  # type: ignore
    else:
        self._logger.error("Unknown transcription source type")
        self._logger.debug("ListenRESTClient.transcribe_file LEAVE")
        raise DeepgramTypeError("Unknown transcription source type")

    typed_options = isinstance(options, ListenRESTOptions)
    if typed_options and not options.check():
        self._logger.error("options.check failed")
        self._logger.debug("ListenRESTClient.transcribe_file LEAVE")
        raise DeepgramError("Fatal transcription options error")

    self._logger.info("url: %s", request_url)
    if typed_options:
        # The request layer is handed a plain dict, not the options object.
        self._logger.info("ListenRESTOptions switching class -> dict")
        options = options.to_dict()
    self._logger.info("options: %s", options)
    self._logger.info("addons: %s", addons)
    self._logger.info("headers: %s", headers)

    raw_json = await self.post(
        request_url,
        options=options,
        addons=addons,
        headers=headers,
        content=payload,
        timeout=timeout,
        **kwargs,
    )
    self._logger.info("json: %s", raw_json)

    response = PrerecordedResponse.from_json(raw_json)
    self._logger.verbose("result: %s", response)
    self._logger.notice("transcribe_file succeeded")
    self._logger.debug("ListenRESTClient.transcribe_file LEAVE")
    return response
Transcribes audio from a local file source.
Args: source (FileSource): The local file source of the audio to transcribe. options (ListenRESTOptions): Additional options for the transcription (default is None). endpoint (str): The API endpoint for the transcription (default is "v1/listen").
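Returns: PrerecordedResponse: An object containing the transcription result or an error message. When the options carry a callback, the call is delegated to transcribe_file_callback and an AsyncPrerecordedResponse is returned instead.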
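Raises: DeepgramTypeError: Raised when the source is neither a buffer source nor a stream source. DeepgramError: Raised when the supplied ListenRESTOptions fail their check().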
async def transcribe_file_callback(
    self,
    source: FileSource,
    callback: str,
    options: Optional[Union[Dict, ListenRESTOptions]] = None,
    addons: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    timeout: Optional[httpx.Timeout] = None,
    endpoint: str = "v1/listen",
    **kwargs,
) -> AsyncPrerecordedResponse:
    """
    Transcribe a buffer or stream and have the result delivered to a callback URL.

    The callback is written into ``options`` (as the ``callback`` attribute of a
    ListenRESTOptions, or as the ``"callback"`` key otherwise) before the
    request is posted. An options object passed by the caller is updated in place.

    Args:
        source (FileSource): The local file source of the audio to transcribe;
            its ``"buffer"`` or ``"stream"`` entry becomes the request content.
        callback (str): The callback URL where the transcription results will be sent.
        options (ListenRESTOptions | dict): Transcription options (default is None).
        addons (dict): Forwarded unchanged to ``self.post`` (default is None).
        headers (dict): Forwarded unchanged to ``self.post`` (default is None).
        timeout (httpx.Timeout): Forwarded unchanged to ``self.post`` (default is None).
        endpoint (str): Path appended to the configured base URL (default is "v1/listen").
        **kwargs: Extra keyword arguments forwarded to ``self.post``.

    Returns:
        AsyncPrerecordedResponse: Parsed from the reply of the POST request.

    Raises:
        DeepgramTypeError: ``source`` is neither a buffer source nor a stream source.
        DeepgramError: ``options`` is a ListenRESTOptions whose ``check()`` fails.
    """
    self._logger.debug("ListenRESTClient.transcribe_file_callback ENTER")

    request_url = f"{self._config.url}/{endpoint}"

    # Record the callback on whichever options representation is in use.
    options = {} if options is None else options
    if isinstance(options, ListenRESTOptions):
        options.callback = callback
    else:
        options["callback"] = callback

    # Pick the raw payload out of whichever file-source shape was supplied.
    if is_buffer_source(source):
        payload = source["buffer"]  # type: ignore
    elif is_readstream_source(source):
        payload = source["stream"]  # type: ignore
    else:
        self._logger.error("Unknown transcription source type")
        self._logger.debug("ListenRESTClient.transcribe_file_callback LEAVE")
        raise DeepgramTypeError("Unknown transcription source type")

    typed_options = isinstance(options, ListenRESTOptions)
    if typed_options and not options.check():
        self._logger.error("options.check failed")
        self._logger.debug("ListenRESTClient.transcribe_file_callback LEAVE")
        raise DeepgramError("Fatal transcription options error")

    self._logger.info("url: %s", request_url)
    if typed_options:
        # The request layer is handed a plain dict, not the options object.
        self._logger.info("ListenRESTOptions switching class -> dict")
        options = options.to_dict()
    self._logger.info("options: %s", options)
    self._logger.info("addons: %s", addons)
    self._logger.info("headers: %s", headers)

    raw_json = await self.post(
        request_url,
        options=options,
        addons=addons,
        headers=headers,
        content=payload,
        timeout=timeout,
        **kwargs,
    )
    self._logger.info("json: %s", raw_json)

    response = AsyncPrerecordedResponse.from_json(raw_json)
    self._logger.verbose("result: %s", response)
    self._logger.notice("transcribe_file_callback succeeded")
    self._logger.debug("ListenRESTClient.transcribe_file_callback LEAVE")
    return response
Transcribes audio from a local file source and sends the result to a callback URL.
Args: source (FileSource): The local file source of the audio to transcribe. callback (str): The callback URL where the transcription results will be sent. options (ListenRESTOptions): Additional options for the transcription (default is None). endpoint (str): The API endpoint for the transcription (default is "v1/listen").
Returns: AsyncPrerecordedResponse: An object containing the request_id or an error message.
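Raises: DeepgramTypeError: Raised when the source is neither a buffer source nor a stream source. DeepgramError: Raised when the supplied ListenRESTOptions fail their check().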
36class ListenWebSocketClient( 37 AbstractSyncWebSocketClient 38): # pylint: disable=too-many-instance-attributes 39 """ 40 Client for interacting with Deepgram's live transcription services over WebSockets. 41 42 This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events. 43 44 Args: 45 config (DeepgramClientOptions): all the options for the client. 46 thread_cls (Type[threading.Thread]): optional thread class to use for creating threads, 47 defaults to threading.Thread. Useful for custom thread management like ContextVar support. 48 """ 49 50 _logger: verboselogs.VerboseLogger 51 _config: DeepgramClientOptions 52 _endpoint: str 53 54 _lock_flush: threading.Lock 55 _event_handlers: Dict[LiveTranscriptionEvents, list] 56 57 _keep_alive_thread: Union[threading.Thread, None] 58 _flush_thread: Union[threading.Thread, None] 59 _last_datagram: Optional[datetime] = None 60 61 _thread_cls: Type[threading.Thread] 62 63 _kwargs: Optional[Dict] = None 64 _addons: Optional[Dict] = None 65 _options: Optional[Dict] = None 66 _headers: Optional[Dict] = None 67 68 def __init__( 69 self, 70 config: DeepgramClientOptions, 71 thread_cls: Type[threading.Thread] = threading.Thread, 72 ): 73 if config is None: 74 raise DeepgramError("Config is required") 75 76 self._logger = verboselogs.VerboseLogger(__name__) 77 self._logger.addHandler(logging.StreamHandler()) 78 self._logger.setLevel(config.verbose) 79 80 self._config = config 81 self._endpoint = "v1/listen" 82 83 self._flush_thread = None 84 self._keep_alive_thread = None 85 86 # auto flush 87 self._last_datagram = None 88 self._lock_flush = threading.Lock() 89 90 self._thread_cls = thread_cls 91 92 # init handlers 93 self._event_handlers = { 94 event: [] for event in LiveTranscriptionEvents.__members__.values() 95 } 96 97 # call the parent constructor 98 super().__init__( 99 config=self._config, 100 endpoint=self._endpoint, 101 thread_cls=self._thread_cls, 102 ) 
103 104 # pylint: disable=too-many-statements,too-many-branches 105 def start( 106 self, 107 options: Optional[Union[ListenWebSocketOptions, Dict]] = None, 108 addons: Optional[Dict] = None, 109 headers: Optional[Dict] = None, 110 members: Optional[Dict] = None, 111 **kwargs, 112 ) -> bool: 113 """ 114 Starts the WebSocket connection for live transcription. 115 """ 116 self._logger.debug("ListenWebSocketClient.start ENTER") 117 self._logger.info("options: %s", options) 118 self._logger.info("addons: %s", addons) 119 self._logger.info("headers: %s", headers) 120 self._logger.info("members: %s", members) 121 self._logger.info("kwargs: %s", kwargs) 122 123 if isinstance(options, ListenWebSocketOptions) and not options.check(): 124 self._logger.error("options.check failed") 125 self._logger.debug("ListenWebSocketClient.start LEAVE") 126 raise DeepgramError("Fatal transcription options error") 127 128 self._addons = addons 129 self._headers = headers 130 131 # add "members" as members of the class 132 if members is not None: 133 self.__dict__.update(members) 134 135 # set kwargs as members of the class 136 if kwargs is not None: 137 self._kwargs = kwargs 138 else: 139 self._kwargs = {} 140 141 if isinstance(options, ListenWebSocketOptions): 142 self._logger.info("ListenWebSocketOptions switching class -> dict") 143 self._options = options.to_dict() 144 elif options is not None: 145 self._options = options 146 else: 147 self._options = {} 148 149 try: 150 # call parent start 151 if ( 152 super().start( 153 self._options, 154 self._addons, 155 self._headers, 156 **dict(cast(Dict[Any, Any], self._kwargs)), 157 ) 158 is False 159 ): 160 self._logger.error("ListenWebSocketClient.start failed") 161 self._logger.debug("ListenWebSocketClient.start LEAVE") 162 return False 163 164 # debug the threads 165 for thread in threading.enumerate(): 166 self._logger.debug("after running thread: %s", thread.name) 167 self._logger.debug("number of active threads: %s", 
threading.active_count()) 168 169 # keepalive thread 170 if self._config.is_keep_alive_enabled(): 171 self._logger.notice("keepalive is enabled") 172 self._keep_alive_thread = self._thread_cls(target=self._keep_alive) 173 self._keep_alive_thread.start() 174 else: 175 self._logger.notice("keepalive is disabled") 176 177 # flush thread 178 if self._config.is_auto_flush_reply_enabled(): 179 self._logger.notice("autoflush is enabled") 180 self._flush_thread = self._thread_cls(target=self._flush) 181 self._flush_thread.start() 182 else: 183 self._logger.notice("autoflush is disabled") 184 185 # debug the threads 186 for thread in threading.enumerate(): 187 self._logger.debug("after running thread: %s", thread.name) 188 self._logger.debug("number of active threads: %s", threading.active_count()) 189 190 self._logger.notice("start succeeded") 191 self._logger.debug("ListenWebSocketClient.start LEAVE") 192 return True 193 194 except Exception as e: # pylint: disable=broad-except 195 self._logger.error( 196 "WebSocketException in ListenWebSocketClient.start: %s", e 197 ) 198 self._logger.debug("ListenWebSocketClient.start LEAVE") 199 if self._config.options.get("termination_exception_connect") is True: 200 raise e 201 return False 202 203 # pylint: enable=too-many-statements,too-many-branches 204 205 def on( 206 self, event: LiveTranscriptionEvents, handler: Callable 207 ) -> None: # registers event handlers for specific events 208 """ 209 Registers event handlers for specific events. 210 """ 211 self._logger.info("event subscribed: %s", event) 212 if event in LiveTranscriptionEvents.__members__.values() and callable(handler): 213 self._event_handlers[event].append(handler) 214 215 def _emit(self, event: LiveTranscriptionEvents, *args, **kwargs) -> None: 216 """ 217 Emits events to the registered event handlers. 
218 """ 219 self._logger.debug("ListenWebSocketClient._emit ENTER") 220 self._logger.debug("callback handlers for: %s", event) 221 222 # debug the threads 223 for thread in threading.enumerate(): 224 self._logger.debug("after running thread: %s", thread.name) 225 self._logger.debug("number of active threads: %s", threading.active_count()) 226 227 self._logger.debug("callback handlers for: %s", event) 228 for handler in self._event_handlers[event]: 229 handler(self, *args, **kwargs) 230 231 # debug the threads 232 for thread in threading.enumerate(): 233 self._logger.debug("after running thread: %s", thread.name) 234 self._logger.debug("number of active threads: %s", threading.active_count()) 235 236 self._logger.debug("ListenWebSocketClient._emit LEAVE") 237 238 # pylint: disable=too-many-return-statements,too-many-statements,too-many-locals,too-many-branches 239 def _process_text(self, message: str) -> None: 240 """ 241 Processes messages received over the WebSocket connection. 242 """ 243 self._logger.debug("ListenWebSocketClient._process_text ENTER") 244 245 try: 246 if len(message) == 0: 247 self._logger.debug("message is empty") 248 self._logger.debug("ListenWebSocketClient._process_text LEAVE") 249 return 250 251 data = json.loads(message) 252 response_type = data.get("type") 253 self._logger.debug("response_type: %s, data: %s", response_type, data) 254 255 match response_type: 256 case LiveTranscriptionEvents.Open: 257 open_result: OpenResponse = OpenResponse.from_json(message) 258 self._logger.verbose("OpenResponse: %s", open_result) 259 self._emit( 260 LiveTranscriptionEvents(LiveTranscriptionEvents.Open), 261 open=open_result, 262 **dict(cast(Dict[Any, Any], self._kwargs)), 263 ) 264 case LiveTranscriptionEvents.Transcript: 265 msg_result: LiveResultResponse = LiveResultResponse.from_json( 266 message 267 ) 268 self._logger.verbose("LiveResultResponse: %s", msg_result) 269 270 # auto flush 271 if self._config.is_inspecting_listen(): 272 inspect_res = 
self._inspect(msg_result) 273 if not inspect_res: 274 self._logger.error("inspect_res failed") 275 276 self._emit( 277 LiveTranscriptionEvents(LiveTranscriptionEvents.Transcript), 278 result=msg_result, 279 **dict(cast(Dict[Any, Any], self._kwargs)), 280 ) 281 case LiveTranscriptionEvents.Metadata: 282 meta_result: MetadataResponse = MetadataResponse.from_json(message) 283 self._logger.verbose("MetadataResponse: %s", meta_result) 284 self._emit( 285 LiveTranscriptionEvents(LiveTranscriptionEvents.Metadata), 286 metadata=meta_result, 287 **dict(cast(Dict[Any, Any], self._kwargs)), 288 ) 289 case LiveTranscriptionEvents.SpeechStarted: 290 ss_result: SpeechStartedResponse = SpeechStartedResponse.from_json( 291 message 292 ) 293 self._logger.verbose("SpeechStartedResponse: %s", ss_result) 294 self._emit( 295 LiveTranscriptionEvents(LiveTranscriptionEvents.SpeechStarted), 296 speech_started=ss_result, 297 **dict(cast(Dict[Any, Any], self._kwargs)), 298 ) 299 case LiveTranscriptionEvents.UtteranceEnd: 300 ue_result: UtteranceEndResponse = UtteranceEndResponse.from_json( 301 message 302 ) 303 self._logger.verbose("UtteranceEndResponse: %s", ue_result) 304 self._emit( 305 LiveTranscriptionEvents(LiveTranscriptionEvents.UtteranceEnd), 306 utterance_end=ue_result, 307 **dict(cast(Dict[Any, Any], self._kwargs)), 308 ) 309 case LiveTranscriptionEvents.Close: 310 close_result: CloseResponse = CloseResponse.from_json(message) 311 self._logger.verbose("CloseResponse: %s", close_result) 312 self._emit( 313 LiveTranscriptionEvents(LiveTranscriptionEvents.Close), 314 close=close_result, 315 **dict(cast(Dict[Any, Any], self._kwargs)), 316 ) 317 case LiveTranscriptionEvents.Error: 318 err_error: ErrorResponse = ErrorResponse.from_json(message) 319 self._logger.verbose("ErrorResponse: %s", err_error) 320 self._emit( 321 LiveTranscriptionEvents(LiveTranscriptionEvents.Error), 322 error=err_error, 323 **dict(cast(Dict[Any, Any], self._kwargs)), 324 ) 325 case _: 326 self._logger.warning( 
327 "Unknown Message: response_type: %s, data: %s", 328 response_type, 329 data, 330 ) 331 unhandled_error: UnhandledResponse = UnhandledResponse( 332 type=LiveTranscriptionEvents(LiveTranscriptionEvents.Unhandled), 333 raw=message, 334 ) 335 self._emit( 336 LiveTranscriptionEvents(LiveTranscriptionEvents.Unhandled), 337 unhandled=unhandled_error, 338 **dict(cast(Dict[Any, Any], self._kwargs)), 339 ) 340 341 self._logger.notice("_process_text Succeeded") 342 self._logger.debug("SpeakStreamClient._process_text LEAVE") 343 344 except Exception as e: # pylint: disable=broad-except 345 self._logger.error( 346 "Exception in ListenWebSocketClient._process_text: %s", e 347 ) 348 e_error: ErrorResponse = ErrorResponse( 349 "Exception in ListenWebSocketClient._process_text", 350 f"{e}", 351 "Exception", 352 ) 353 self._logger.error( 354 "Exception in ListenWebSocketClient._process_text: %s", str(e) 355 ) 356 self._emit( 357 LiveTranscriptionEvents(LiveTranscriptionEvents.Error), 358 e_error, 359 **dict(cast(Dict[Any, Any], self._kwargs)), 360 ) 361 362 # signal exit and close 363 super()._signal_exit() 364 365 self._logger.debug("ListenWebSocketClient._process_text LEAVE") 366 367 if self._config.options.get("termination_exception") is True: 368 raise 369 return 370 371 # pylint: enable=too-many-return-statements,too-many-statements 372 373 def _process_binary(self, message: bytes) -> None: 374 raise NotImplementedError("no _process_binary method should be called") 375 376 # pylint: disable=too-many-return-statements 377 def _keep_alive(self) -> None: 378 self._logger.debug("ListenWebSocketClient._keep_alive ENTER") 379 380 counter = 0 381 while True: 382 try: 383 counter += 1 384 self._exit_event.wait(timeout=ONE_SECOND) 385 386 if self._exit_event.is_set(): 387 self._logger.notice("_keep_alive exiting gracefully") 388 self._logger.debug("ListenWebSocketClient._keep_alive LEAVE") 389 return 390 391 # deepgram keepalive 392 if counter % DEEPGRAM_INTERVAL == 0: 393 
self.keep_alive() 394 395 except Exception as e: # pylint: disable=broad-except 396 self._logger.error( 397 "Exception in ListenWebSocketClient._keep_alive: %s", e 398 ) 399 e_error: ErrorResponse = ErrorResponse( 400 "Exception in ListenWebSocketClient._keep_alive", 401 f"{e}", 402 "Exception", 403 ) 404 self._logger.error( 405 "Exception in ListenWebSocketClient._keep_alive: %s", str(e) 406 ) 407 self._emit( 408 LiveTranscriptionEvents(LiveTranscriptionEvents.Error), 409 e_error, 410 **dict(cast(Dict[Any, Any], self._kwargs)), 411 ) 412 413 # signal exit and close 414 super()._signal_exit() 415 416 self._logger.debug("ListenWebSocketClient._keep_alive LEAVE") 417 418 if self._config.options.get("termination_exception") is True: 419 raise 420 return 421 422 ## pylint: disable=too-many-return-statements,too-many-statements 423 def _flush(self) -> None: 424 self._logger.debug("ListenWebSocketClient._flush ENTER") 425 426 delta_in_ms_str = self._config.options.get("auto_flush_reply_delta") 427 if delta_in_ms_str is None: 428 self._logger.error("auto_flush_reply_delta is None") 429 self._logger.debug("ListenWebSocketClient._flush LEAVE") 430 return 431 delta_in_ms = float(delta_in_ms_str) 432 433 _flush_event = threading.Event() 434 while True: 435 try: 436 _flush_event.wait(timeout=HALF_SECOND) 437 438 if self._exit_event.is_set(): 439 self._logger.notice("_flush exiting gracefully") 440 self._logger.debug("ListenWebSocketClient._flush LEAVE") 441 return 442 443 with self._lock_flush: 444 if self._last_datagram is None: 445 self._logger.debug("AutoFlush last_datagram is None") 446 continue 447 448 delta = datetime.now() - self._last_datagram 449 diff_in_ms = delta.total_seconds() * 1000 450 self._logger.debug("AutoFlush delta: %f", diff_in_ms) 451 if diff_in_ms < delta_in_ms: 452 self._logger.debug("AutoFlush delta is less than threshold") 453 continue 454 455 with self._lock_flush: 456 self._last_datagram = None 457 self.finalize() 458 459 except Exception as e: # 
pylint: disable=broad-except 460 self._logger.error("Exception in ListenWebSocketClient._flush: %s", e) 461 e_error: ErrorResponse = ErrorResponse( 462 "Exception in ListenWebSocketClient._flush", 463 f"{e}", 464 "Exception", 465 ) 466 self._logger.error( 467 "Exception in ListenWebSocketClient._flush: %s", str(e) 468 ) 469 self._emit( 470 LiveTranscriptionEvents(LiveTranscriptionEvents.Error), 471 e_error, 472 **dict(cast(Dict[Any, Any], self._kwargs)), 473 ) 474 475 # signal exit and close 476 super()._signal_exit() 477 478 self._logger.debug("ListenWebSocketClient._flush LEAVE") 479 480 if self._config.options.get("termination_exception") is True: 481 raise 482 return 483 484 # pylint: enable=too-many-return-statements 485 486 def keep_alive(self) -> bool: 487 """ 488 Sends a KeepAlive message 489 """ 490 self._logger.spam("ListenWebSocketClient.keep_alive ENTER") 491 492 self._logger.notice("Sending KeepAlive...") 493 ret = self.send(json.dumps({"type": "KeepAlive"})) 494 495 if not ret: 496 self._logger.error("keep_alive failed") 497 self._logger.spam("ListenWebSocketClient.keep_alive LEAVE") 498 return False 499 500 self._logger.notice("keep_alive succeeded") 501 self._logger.spam("ListenWebSocketClient.keep_alive LEAVE") 502 503 return True 504 505 def finalize(self) -> bool: 506 """ 507 Finalizes the Transcript connection by flushing it 508 """ 509 self._logger.spam("ListenWebSocketClient.finalize ENTER") 510 511 self._logger.notice("Sending Finalize...") 512 ret = self.send(json.dumps({"type": "Finalize"})) 513 514 if not ret: 515 self._logger.error("finalize failed") 516 self._logger.spam("ListenWebSocketClient.finalize LEAVE") 517 return False 518 519 self._logger.notice("finalize succeeded") 520 self._logger.spam("ListenWebSocketClient.finalize LEAVE") 521 522 return True 523 524 def _close_message(self) -> bool: 525 return self.send(json.dumps({"type": "CloseStream"})) 526 527 # closes the WebSocket connection gracefully 528 def finish(self) -> bool: 
529 """ 530 Closes the WebSocket connection gracefully. 531 """ 532 self._logger.spam("ListenWebSocketClient.finish ENTER") 533 534 # call parent finish 535 if super().finish() is False: 536 self._logger.error("ListenWebSocketClient.finish failed") 537 538 # debug the threads 539 for thread in threading.enumerate(): 540 self._logger.debug("before running thread: %s", thread.name) 541 self._logger.debug("number of active threads: %s", threading.active_count()) 542 543 # stop the threads 544 self._logger.verbose("cancelling tasks...") 545 if self._flush_thread is not None: 546 self._flush_thread.join() 547 self._flush_thread = None 548 self._logger.notice("processing _flush_thread thread joined") 549 550 if self._keep_alive_thread is not None: 551 self._keep_alive_thread.join() 552 self._keep_alive_thread = None 553 self._logger.notice("processing _keep_alive_thread thread joined") 554 555 if self._listen_thread is not None: 556 self._listen_thread.join() 557 self._listen_thread = None 558 self._logger.notice("listening thread joined") 559 560 # debug the threads 561 for thread in threading.enumerate(): 562 self._logger.debug("before running thread: %s", thread.name) 563 self._logger.debug("number of active threads: %s", threading.active_count()) 564 565 self._logger.notice("finish succeeded") 566 self._logger.spam("ListenWebSocketClient.finish LEAVE") 567 return True 568 569 def _inspect(self, msg_result: LiveResultResponse) -> bool: 570 # auto flush_inspect is generically used to track any messages you might want to snoop on 571 # place additional logic here to inspect messages of interest 572 573 # for auto flush functionality 574 # set the last datagram 575 sentence = msg_result.channel.alternatives[0].transcript 576 if len(sentence) == 0: 577 return True 578 579 if msg_result.is_final: 580 with self._lock_flush: 581 self._logger.debug("AutoFlush is_final received") 582 self._last_datagram = None 583 else: 584 with self._lock_flush: 585 self._last_datagram = 
datetime.now() 586 self._logger.debug( 587 "AutoFlush interim received: %s", 588 str(self._last_datagram), 589 ) 590 591 return True
Client for interacting with Deepgram's live transcription services over WebSockets.
This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events.
Args: config (DeepgramClientOptions): all the options for the client. thread_cls (Type[threading.Thread]): optional thread class to use for creating threads, defaults to threading.Thread. Useful for custom thread management like ContextVar support.
def __init__(
    self,
    config: DeepgramClientOptions,
    thread_cls: Type[threading.Thread] = threading.Thread,
):
    """
    Prepares the client state and then initializes the parent client.

    Args:
        config (DeepgramClientOptions): all the options for the client.
        thread_cls (Type[threading.Thread]): class used to create the
            keepalive and autoflush threads, defaults to threading.Thread.

    Raises:
        DeepgramError: if ``config`` is None.
    """
    if config is None:
        raise DeepgramError("Config is required")

    # logger writes to a stream handler at the verbosity set in the config
    self._logger = log = verboselogs.VerboseLogger(__name__)
    log.addHandler(logging.StreamHandler())
    log.setLevel(config.verbose)

    self._config = config
    self._endpoint = "v1/listen"

    # background workers are only created in start()
    self._flush_thread = None
    self._keep_alive_thread = None

    # auto flush bookkeeping: time of the last interim result and its lock
    self._last_datagram = None
    self._lock_flush = threading.Lock()

    self._thread_cls = thread_cls

    # one (initially empty) handler list per known event
    handler_table = {}
    for known_event in LiveTranscriptionEvents.__members__.values():
        handler_table[known_event] = []
    self._event_handlers = handler_table

    # hand everything over to the parent constructor
    super().__init__(
        config=self._config,
        endpoint=self._endpoint,
        thread_cls=self._thread_cls,
    )
def start(
    self,
    options: Optional[Union[ListenWebSocketOptions, Dict]] = None,
    addons: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    members: Optional[Dict] = None,
    **kwargs,
) -> bool:
    """
    Starts the WebSocket connection for live transcription.

    Args:
        options: transcription options; a ListenWebSocketOptions is validated
            with ``check()`` and converted with ``to_dict()``, a dict is used
            as-is, and None becomes an empty dict.
        addons: stored on the instance and forwarded to the parent ``start``.
        headers: stored on the instance and forwarded to the parent ``start``.
        members: when given, merged into this instance's ``__dict__``.
        **kwargs: stored on the instance and forwarded to the parent ``start``.

    Returns:
        bool: True when the parent ``start`` succeeded and the optional
        keepalive/autoflush threads were launched, False otherwise.

    Raises:
        DeepgramError: if ``options`` is a ListenWebSocketOptions whose
            ``check()`` fails.
        Exception: any error raised while starting is re-raised only when the
            ``termination_exception_connect`` config option is True.
    """
    self._logger.debug("ListenWebSocketClient.start ENTER")
    self._logger.info("options: %s", options)
    self._logger.info("addons: %s", addons)
    self._logger.info("headers: %s", headers)
    self._logger.info("members: %s", members)
    self._logger.info("kwargs: %s", kwargs)

    if isinstance(options, ListenWebSocketOptions) and not options.check():
        self._logger.error("options.check failed")
        self._logger.debug("ListenWebSocketClient.start LEAVE")
        raise DeepgramError("Fatal transcription options error")

    self._addons = addons
    self._headers = headers

    # add "members" as members of the class
    if members is not None:
        self.__dict__.update(members)

    # **kwargs is always a dict (possibly empty), never None
    self._kwargs = kwargs

    if isinstance(options, ListenWebSocketOptions):
        self._logger.info("ListenWebSocketOptions switching class -> dict")
        self._options = options.to_dict()
    elif options is not None:
        self._options = options
    else:
        self._options = {}

    try:
        # call parent start
        if (
            super().start(
                self._options,
                self._addons,
                self._headers,
                **dict(cast(Dict[Any, Any], self._kwargs)),
            )
            is False
        ):
            self._logger.error("ListenWebSocketClient.start failed")
            self._logger.debug("ListenWebSocketClient.start LEAVE")
            return False

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        # keepalive thread
        if self._config.is_keep_alive_enabled():
            self._logger.notice("keepalive is enabled")
            self._keep_alive_thread = self._thread_cls(target=self._keep_alive)
            self._keep_alive_thread.start()
        else:
            self._logger.notice("keepalive is disabled")

        # flush thread
        if self._config.is_auto_flush_reply_enabled():
            self._logger.notice("autoflush is enabled")
            self._flush_thread = self._thread_cls(target=self._flush)
            self._flush_thread.start()
        else:
            self._logger.notice("autoflush is disabled")

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        self._logger.notice("start succeeded")
        self._logger.debug("ListenWebSocketClient.start LEAVE")
        return True

    except Exception as e:  # pylint: disable=broad-except
        self._logger.error(
            "WebSocketException in ListenWebSocketClient.start: %s", e
        )
        self._logger.debug("ListenWebSocketClient.start LEAVE")
        if self._config.options.get("termination_exception_connect") is True:
            # bare raise re-raises the active exception unchanged
            raise
        return False
Starts the WebSocket connection for live transcription.
def on(self, event: LiveTranscriptionEvents, handler: Callable) -> None:
    """
    Registers event handlers for specific events.

    The subscription is always logged. The handler is appended to the list
    for ``event`` only when ``event`` is a member of LiveTranscriptionEvents
    and ``handler`` is callable; otherwise the call does nothing further.
    """
    self._logger.info("event subscribed: %s", event)

    # guard clauses: unknown events and non-callables are ignored
    if event not in LiveTranscriptionEvents.__members__.values():
        return
    if not callable(handler):
        return

    self._event_handlers[event].append(handler)
Registers event handlers for specific events.
def keep_alive(self) -> bool:
    """
    Sends a KeepAlive message

    Returns:
        bool: True when ``send`` reports success, False otherwise.
    """
    self._logger.spam("ListenWebSocketClient.keep_alive ENTER")
    self._logger.notice("Sending KeepAlive...")

    payload = json.dumps({"type": "KeepAlive"})
    delivered = bool(self.send(payload))

    # log the outcome before the common LEAVE trace
    if delivered:
        self._logger.notice("keep_alive succeeded")
    else:
        self._logger.error("keep_alive failed")

    self._logger.spam("ListenWebSocketClient.keep_alive LEAVE")
    return delivered
Sends a KeepAlive message over the WebSocket connection.
def finalize(self) -> bool:
    """
    Finalizes the Transcript connection by flushing it

    Returns:
        bool: True when ``send`` reports success, False otherwise.
    """
    self._logger.spam("ListenWebSocketClient.finalize ENTER")
    self._logger.notice("Sending Finalize...")

    payload = json.dumps({"type": "Finalize"})
    delivered = bool(self.send(payload))

    # log the outcome before the common LEAVE trace
    if delivered:
        self._logger.notice("finalize succeeded")
    else:
        self._logger.error("finalize failed")

    self._logger.spam("ListenWebSocketClient.finalize LEAVE")
    return delivered
Finalizes the transcript connection by sending a Finalize message to flush it.
def finish(self) -> bool:
    """
    Closes the WebSocket connection gracefully.

    Calls the parent ``finish`` first, then joins the autoflush, keepalive
    and listening threads (whichever exist) and clears their references.

    Returns:
        bool: always True; a failing parent ``finish`` is only logged.
    """
    self._logger.spam("ListenWebSocketClient.finish ENTER")

    # call parent finish
    if super().finish() is False:
        self._logger.error("ListenWebSocketClient.finish failed")

    # debug the threads
    for thread in threading.enumerate():
        self._logger.debug("before running thread: %s", thread.name)
    self._logger.debug("number of active threads: %s", threading.active_count())

    # stop the threads
    self._logger.verbose("cancelling tasks...")
    if self._flush_thread is not None:
        self._flush_thread.join()
        self._flush_thread = None
        self._logger.notice("processing _flush_thread thread joined")

    if self._keep_alive_thread is not None:
        self._keep_alive_thread.join()
        self._keep_alive_thread = None
        self._logger.notice("processing _keep_alive_thread thread joined")

    if self._listen_thread is not None:
        self._listen_thread.join()
        self._listen_thread = None
        self._logger.notice("listening thread joined")

    # debug the threads; labelled "after" so it can be told apart from the
    # dump taken before the joins
    for thread in threading.enumerate():
        self._logger.debug("after running thread: %s", thread.name)
    self._logger.debug("number of active threads: %s", threading.active_count())

    self._logger.notice("finish succeeded")
    self._logger.spam("ListenWebSocketClient.finish LEAVE")
    return True
Closes the WebSocket connection gracefully.
36class AsyncListenWebSocketClient( 37 AbstractAsyncWebSocketClient 38): # pylint: disable=too-many-instance-attributes 39 """ 40 Client for interacting with Deepgram's live transcription services over WebSockets. 41 42 This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events. 43 44 Args: 45 config (DeepgramClientOptions): all the options for the client. 46 """ 47 48 _logger: verboselogs.VerboseLogger 49 _config: DeepgramClientOptions 50 _endpoint: str 51 52 _event_handlers: Dict[LiveTranscriptionEvents, list] 53 54 _keep_alive_thread: Union[asyncio.Task, None] 55 _flush_thread: Union[asyncio.Task, None] 56 _last_datagram: Optional[datetime] = None 57 58 _kwargs: Optional[Dict] = None 59 _addons: Optional[Dict] = None 60 _options: Optional[Dict] = None 61 _headers: Optional[Dict] = None 62 63 def __init__(self, config: DeepgramClientOptions): 64 if config is None: 65 raise DeepgramError("Config is required") 66 67 self._logger = verboselogs.VerboseLogger(__name__) 68 self._logger.addHandler(logging.StreamHandler()) 69 self._logger.setLevel(config.verbose) 70 71 self._config = config 72 self._endpoint = "v1/listen" 73 74 self._flush_thread = None 75 self._keep_alive_thread = None 76 77 # auto flush 78 self._last_datagram = None 79 self._lock_flush = threading.Lock() 80 81 # init handlers 82 self._event_handlers = { 83 event: [] for event in LiveTranscriptionEvents.__members__.values() 84 } 85 86 # call the parent constructor 87 super().__init__(self._config, self._endpoint) 88 89 # pylint: disable=too-many-branches,too-many-statements 90 async def start( 91 self, 92 options: Optional[Union[ListenWebSocketOptions, Dict]] = None, 93 addons: Optional[Dict] = None, 94 headers: Optional[Dict] = None, 95 members: Optional[Dict] = None, 96 **kwargs, 97 ) -> bool: 98 """ 99 Starts the WebSocket connection for live transcription. 
100 """ 101 self._logger.debug("AsyncListenWebSocketClient.start ENTER") 102 self._logger.info("options: %s", options) 103 self._logger.info("addons: %s", addons) 104 self._logger.info("headers: %s", headers) 105 self._logger.info("members: %s", members) 106 self._logger.info("kwargs: %s", kwargs) 107 108 if isinstance(options, ListenWebSocketOptions) and not options.check(): 109 self._logger.error("options.check failed") 110 self._logger.debug("AsyncListenWebSocketClient.start LEAVE") 111 raise DeepgramError("Fatal transcription options error") 112 113 self._addons = addons 114 self._headers = headers 115 116 # add "members" as members of the class 117 if members is not None: 118 self.__dict__.update(members) 119 120 # set kwargs as members of the class 121 if kwargs is not None: 122 self._kwargs = kwargs 123 else: 124 self._kwargs = {} 125 126 if isinstance(options, ListenWebSocketOptions): 127 self._logger.info("ListenWebSocketOptions switching class -> dict") 128 self._options = options.to_dict() 129 elif options is not None: 130 self._options = options 131 else: 132 self._options = {} 133 134 try: 135 # call parent start 136 if ( 137 await super().start( 138 self._options, 139 self._addons, 140 self._headers, 141 **dict(cast(Dict[Any, Any], self._kwargs)), 142 ) 143 is False 144 ): 145 self._logger.error("AsyncListenWebSocketClient.start failed") 146 self._logger.debug("AsyncListenWebSocketClient.start LEAVE") 147 return False 148 149 # debug the threads 150 for thread in threading.enumerate(): 151 self._logger.debug("after running thread: %s", thread.name) 152 self._logger.debug("number of active threads: %s", threading.active_count()) 153 154 # keepalive thread 155 if self._config.is_keep_alive_enabled(): 156 self._logger.notice("keepalive is enabled") 157 self._keep_alive_thread = asyncio.create_task(self._keep_alive()) 158 else: 159 self._logger.notice("keepalive is disabled") 160 161 # flush thread 162 if self._config.is_auto_flush_reply_enabled(): 163 
self._logger.notice("autoflush is enabled") 164 self._flush_thread = asyncio.create_task(self._flush()) 165 else: 166 self._logger.notice("autoflush is disabled") 167 168 # debug the threads 169 for thread in threading.enumerate(): 170 self._logger.debug("after running thread: %s", thread.name) 171 self._logger.debug("number of active threads: %s", threading.active_count()) 172 173 self._logger.notice("start succeeded") 174 self._logger.debug("AsyncListenWebSocketClient.start LEAVE") 175 return True 176 177 except Exception as e: # pylint: disable=broad-except 178 self._logger.error( 179 "WebSocketException in AsyncListenWebSocketClient.start: %s", e 180 ) 181 self._logger.debug("AsyncListenWebSocketClient.start LEAVE") 182 if self._config.options.get("termination_exception_connect") is True: 183 raise 184 return False 185 186 # pylint: enable=too-many-branches,too-many-statements 187 188 def on(self, event: LiveTranscriptionEvents, handler: Callable) -> None: 189 """ 190 Registers event handlers for specific events. 191 """ 192 self._logger.info("event subscribed: %s", event) 193 if event in LiveTranscriptionEvents.__members__.values() and callable(handler): 194 self._event_handlers[event].append(handler) 195 196 # triggers the registered event handlers for a specific event 197 async def _emit(self, event: LiveTranscriptionEvents, *args, **kwargs) -> None: 198 """ 199 Emits events to the registered event handlers. 
200 """ 201 self._logger.debug("AsyncListenWebSocketClient._emit ENTER") 202 self._logger.debug("callback handlers for: %s", event) 203 204 # debug the threads 205 for thread in threading.enumerate(): 206 self._logger.debug("after running thread: %s", thread.name) 207 self._logger.debug("number of active threads: %s", threading.active_count()) 208 209 tasks = [] 210 for handler in self._event_handlers[event]: 211 task = asyncio.create_task(handler(self, *args, **kwargs)) 212 tasks.append(task) 213 214 if tasks: 215 self._logger.debug("waiting for tasks to finish...") 216 await asyncio.gather(*tasks, return_exceptions=True) 217 tasks.clear() 218 219 # debug the threads 220 for thread in threading.enumerate(): 221 self._logger.debug("after running thread: %s", thread.name) 222 self._logger.debug("number of active threads: %s", threading.active_count()) 223 224 self._logger.debug("AsyncListenWebSocketClient._emit LEAVE") 225 226 async def _process_text(self, message: str) -> None: 227 """ 228 Processes messages received over the WebSocket connection. 
229 """ 230 self._logger.debug("AsyncListenWebSocketClient._process_text ENTER") 231 232 try: 233 self._logger.debug("Text data received") 234 if len(message) == 0: 235 self._logger.debug("message is empty") 236 self._logger.debug("AsyncListenWebSocketClient._process_text LEAVE") 237 return 238 239 data = json.loads(message) 240 response_type = data.get("type") 241 self._logger.debug("response_type: %s, data: %s", response_type, data) 242 243 match response_type: 244 case LiveTranscriptionEvents.Open: 245 open_result: OpenResponse = OpenResponse.from_json(message) 246 self._logger.verbose("OpenResponse: %s", open_result) 247 await self._emit( 248 LiveTranscriptionEvents(LiveTranscriptionEvents.Open), 249 open=open_result, 250 **dict(cast(Dict[Any, Any], self._kwargs)), 251 ) 252 case LiveTranscriptionEvents.Transcript: 253 msg_result: LiveResultResponse = LiveResultResponse.from_json( 254 message 255 ) 256 self._logger.verbose("LiveResultResponse: %s", msg_result) 257 258 # auto flush 259 if self._config.is_inspecting_listen(): 260 inspect_res = await self._inspect(msg_result) 261 if not inspect_res: 262 self._logger.error("inspect_res failed") 263 264 await self._emit( 265 LiveTranscriptionEvents(LiveTranscriptionEvents.Transcript), 266 result=msg_result, 267 **dict(cast(Dict[Any, Any], self._kwargs)), 268 ) 269 case LiveTranscriptionEvents.Metadata: 270 meta_result: MetadataResponse = MetadataResponse.from_json(message) 271 self._logger.verbose("MetadataResponse: %s", meta_result) 272 await self._emit( 273 LiveTranscriptionEvents(LiveTranscriptionEvents.Metadata), 274 metadata=meta_result, 275 **dict(cast(Dict[Any, Any], self._kwargs)), 276 ) 277 case LiveTranscriptionEvents.SpeechStarted: 278 ss_result: SpeechStartedResponse = SpeechStartedResponse.from_json( 279 message 280 ) 281 self._logger.verbose("SpeechStartedResponse: %s", ss_result) 282 await self._emit( 283 LiveTranscriptionEvents(LiveTranscriptionEvents.SpeechStarted), 284 speech_started=ss_result, 285 
**dict(cast(Dict[Any, Any], self._kwargs)), 286 ) 287 case LiveTranscriptionEvents.UtteranceEnd: 288 ue_result: UtteranceEndResponse = UtteranceEndResponse.from_json( 289 message 290 ) 291 self._logger.verbose("UtteranceEndResponse: %s", ue_result) 292 await self._emit( 293 LiveTranscriptionEvents(LiveTranscriptionEvents.UtteranceEnd), 294 utterance_end=ue_result, 295 **dict(cast(Dict[Any, Any], self._kwargs)), 296 ) 297 case LiveTranscriptionEvents.Close: 298 close_result: CloseResponse = CloseResponse.from_json(message) 299 self._logger.verbose("CloseResponse: %s", close_result) 300 await self._emit( 301 LiveTranscriptionEvents(LiveTranscriptionEvents.Close), 302 close=close_result, 303 **dict(cast(Dict[Any, Any], self._kwargs)), 304 ) 305 case LiveTranscriptionEvents.Error: 306 err_error: ErrorResponse = ErrorResponse.from_json(message) 307 self._logger.verbose("ErrorResponse: %s", err_error) 308 await self._emit( 309 LiveTranscriptionEvents(LiveTranscriptionEvents.Error), 310 error=err_error, 311 **dict(cast(Dict[Any, Any], self._kwargs)), 312 ) 313 case _: 314 self._logger.warning( 315 "Unknown Message: response_type: %s, data: %s", 316 response_type, 317 data, 318 ) 319 unhandled_error: UnhandledResponse = UnhandledResponse( 320 type=LiveTranscriptionEvents(LiveTranscriptionEvents.Unhandled), 321 raw=message, 322 ) 323 await self._emit( 324 LiveTranscriptionEvents(LiveTranscriptionEvents.Unhandled), 325 unhandled=unhandled_error, 326 **dict(cast(Dict[Any, Any], self._kwargs)), 327 ) 328 329 self._logger.notice("_process_text Succeeded") 330 self._logger.debug("AsyncListenWebSocketClient._process_text LEAVE") 331 332 except Exception as e: # pylint: disable=broad-except 333 self._logger.error( 334 "Exception in AsyncListenWebSocketClient._process_text: %s", e 335 ) 336 e_error: ErrorResponse = ErrorResponse( 337 "Exception in AsyncListenWebSocketClient._process_text", 338 f"{e}", 339 "Exception", 340 ) 341 await self._emit( 342 
LiveTranscriptionEvents(LiveTranscriptionEvents.Error), 343 error=e_error, 344 **dict(cast(Dict[Any, Any], self._kwargs)), 345 ) 346 347 # signal exit and close 348 await super()._signal_exit() 349 350 self._logger.debug("AsyncListenWebSocketClient._process_text LEAVE") 351 352 if self._config.options.get("termination_exception") is True: 353 raise 354 return 355 356 # pylint: enable=too-many-return-statements,too-many-statements 357 358 async def _process_binary(self, message: bytes) -> None: 359 raise NotImplementedError("no _process_binary method should be called") 360 361 # pylint: disable=too-many-return-statements 362 async def _keep_alive(self) -> None: 363 """ 364 Sends keepalive messages to the WebSocket connection. 365 """ 366 self._logger.debug("AsyncListenWebSocketClient._keep_alive ENTER") 367 368 counter = 0 369 while True: 370 try: 371 counter += 1 372 await asyncio.sleep(ONE_SECOND) 373 374 if self._exit_event.is_set(): 375 self._logger.notice("_keep_alive exiting gracefully") 376 self._logger.debug("AsyncListenWebSocketClient._keep_alive LEAVE") 377 return 378 379 # deepgram keepalive 380 if counter % DEEPGRAM_INTERVAL == 0: 381 await self.keep_alive() 382 383 except Exception as e: # pylint: disable=broad-except 384 self._logger.error( 385 "Exception in AsyncListenWebSocketClient._keep_alive: %s", e 386 ) 387 e_error: ErrorResponse = ErrorResponse( 388 "Exception in AsyncListenWebSocketClient._keep_alive", 389 f"{e}", 390 "Exception", 391 ) 392 self._logger.error( 393 "Exception in AsyncListenWebSocketClient._keep_alive: %s", str(e) 394 ) 395 await self._emit( 396 LiveTranscriptionEvents(LiveTranscriptionEvents.Error), 397 error=e_error, 398 **dict(cast(Dict[Any, Any], self._kwargs)), 399 ) 400 401 # signal exit and close 402 await super()._signal_exit() 403 404 self._logger.debug("AsyncListenWebSocketClient._keep_alive LEAVE") 405 406 if self._config.options.get("termination_exception") is True: 407 raise 408 return 409 410 ## pylint: 
disable=too-many-return-statements,too-many-statements 411 async def _flush(self) -> None: 412 self._logger.debug("AsyncListenWebSocketClient._flush ENTER") 413 414 delta_in_ms_str = self._config.options.get("auto_flush_reply_delta") 415 if delta_in_ms_str is None: 416 self._logger.error("auto_flush_reply_delta is None") 417 self._logger.debug("AsyncListenWebSocketClient._flush LEAVE") 418 return 419 delta_in_ms = float(delta_in_ms_str) 420 421 while True: 422 try: 423 await asyncio.sleep(HALF_SECOND) 424 425 if self._exit_event.is_set(): 426 self._logger.notice("_flush exiting gracefully") 427 self._logger.debug("AsyncListenWebSocketClient._flush LEAVE") 428 return 429 430 if self._last_datagram is None: 431 self._logger.debug("AutoFlush last_datagram is None") 432 continue 433 434 delta = datetime.now() - self._last_datagram 435 diff_in_ms = delta.total_seconds() * 1000 436 self._logger.debug("AutoFlush delta: %f", diff_in_ms) 437 if diff_in_ms < delta_in_ms: 438 self._logger.debug("AutoFlush delta is less than threshold") 439 continue 440 441 self._last_datagram = None 442 await self.finalize() 443 444 except Exception as e: # pylint: disable=broad-except 445 self._logger.error( 446 "Exception in AsyncListenWebSocketClient._flush: %s", e 447 ) 448 e_error: ErrorResponse = ErrorResponse( 449 "Exception in AsyncListenWebSocketClient._flush", 450 f"{e}", 451 "Exception", 452 ) 453 self._logger.error( 454 "Exception in AsyncListenWebSocketClient._flush: %s", str(e) 455 ) 456 await self._emit( 457 LiveTranscriptionEvents(LiveTranscriptionEvents.Error), 458 error=e_error, 459 **dict(cast(Dict[Any, Any], self._kwargs)), 460 ) 461 462 # signal exit and close 463 await super()._signal_exit() 464 465 self._logger.debug("AsyncListenWebSocketClient._flush LEAVE") 466 467 if self._config.options.get("termination_exception") is True: 468 raise 469 return 470 471 # pylint: enable=too-many-return-statements 472 473 async def keep_alive(self) -> bool: 474 """ 475 Sends a 
KeepAlive message 476 """ 477 self._logger.spam("AsyncListenWebSocketClient.keep_alive ENTER") 478 479 self._logger.notice("Sending KeepAlive...") 480 ret = await self.send(json.dumps({"type": "KeepAlive"})) 481 482 if not ret: 483 self._logger.error("keep_alive failed") 484 self._logger.spam("AsyncListenWebSocketClient.keep_alive LEAVE") 485 return False 486 487 self._logger.notice("keep_alive succeeded") 488 self._logger.spam("AsyncListenWebSocketClient.keep_alive LEAVE") 489 490 return True 491 492 async def finalize(self) -> bool: 493 """ 494 Finalizes the Transcript connection by flushing it 495 """ 496 self._logger.spam("AsyncListenWebSocketClient.finalize ENTER") 497 498 self._logger.notice("Sending Finalize...") 499 ret = await self.send(json.dumps({"type": "Finalize"})) 500 501 if not ret: 502 self._logger.error("finalize failed") 503 self._logger.spam("AsyncListenWebSocketClient.finalize LEAVE") 504 return False 505 506 self._logger.notice("finalize succeeded") 507 self._logger.spam("AsyncListenWebSocketClient.finalize LEAVE") 508 509 return True 510 511 async def _close_message(self) -> bool: 512 return await self.send(json.dumps({"type": "CloseStream"})) 513 514 async def finish(self) -> bool: 515 """ 516 Closes the WebSocket connection gracefully. 
517 """ 518 self._logger.debug("AsyncListenWebSocketClient.finish ENTER") 519 520 # stop the threads 521 self._logger.verbose("cancelling tasks...") 522 try: 523 # call parent finish 524 if await super().finish() is False: 525 self._logger.error("AsyncListenWebSocketClient.finish failed") 526 527 # Before cancelling, check if the tasks were created 528 # debug the threads 529 for thread in threading.enumerate(): 530 self._logger.debug("before running thread: %s", thread.name) 531 self._logger.debug("number of active threads: %s", threading.active_count()) 532 533 tasks = [] 534 if self._keep_alive_thread is not None: 535 self._keep_alive_thread.cancel() 536 tasks.append(self._keep_alive_thread) 537 self._logger.notice("processing _keep_alive_thread cancel...") 538 539 if self._flush_thread is not None: 540 self._flush_thread.cancel() 541 tasks.append(self._flush_thread) 542 self._logger.notice("processing _flush_thread cancel...") 543 544 # Use asyncio.gather to wait for tasks to be cancelled 545 # Prevent indefinite waiting by setting a timeout 546 await asyncio.wait_for(asyncio.gather(*tasks), timeout=10) 547 self._logger.notice("threads joined") 548 549 # debug the threads 550 for thread in threading.enumerate(): 551 self._logger.debug("after running thread: %s", thread.name) 552 self._logger.debug("number of active threads: %s", threading.active_count()) 553 554 self._logger.notice("finish succeeded") 555 self._logger.spam("AsyncListenWebSocketClient.finish LEAVE") 556 return True 557 558 except asyncio.CancelledError as e: 559 self._logger.error("tasks cancelled error: %s", e) 560 self._logger.debug("AsyncListenWebSocketClient.finish LEAVE") 561 return False 562 563 except asyncio.TimeoutError as e: 564 self._logger.error("tasks cancellation timed out: %s", e) 565 self._logger.debug("AsyncListenWebSocketClient.finish LEAVE") 566 return False 567 568 async def _inspect(self, msg_result: LiveResultResponse) -> bool: 569 # auto flush_inspect is generically used 
to track any messages you might want to snoop on 570 # place additional logic here to inspect messages of interest 571 572 # for auto flush functionality 573 # set the last datagram 574 sentence = msg_result.channel.alternatives[0].transcript 575 if len(sentence) == 0: 576 return True 577 578 if msg_result.is_final: 579 self._logger.debug("AutoFlush is_final received") 580 self._last_datagram = None 581 else: 582 self._last_datagram = datetime.now() 583 self._logger.debug( 584 "AutoFlush interim received: %s", 585 str(self._last_datagram), 586 ) 587 588 return True
Client for interacting with Deepgram's live transcription services over WebSockets.
This class provides methods to establish a WebSocket connection for live transcription and handle real-time transcription events.
Args: config (DeepgramClientOptions): all the options for the client.
def __init__(self, config: DeepgramClientOptions):
    """
    Sets up logging, task placeholders, auto-flush state and the event
    handler registry, then hands off to the parent constructor.

    Args:
        config (DeepgramClientOptions): all the options for the client.

    Raises:
        DeepgramError: if ``config`` is None.
    """
    if config is None:
        raise DeepgramError("Config is required")

    # logger emits through a stream handler at the level given by config.verbose
    self._logger = verboselogs.VerboseLogger(__name__)
    self._logger.addHandler(logging.StreamHandler())
    self._logger.setLevel(config.verbose)

    self._endpoint = "v1/listen"
    self._config = config

    # background task handles; None until a task is created
    self._keep_alive_thread = None
    self._flush_thread = None

    # auto flush state
    self._lock_flush = threading.Lock()
    self._last_datagram = None

    # every known event starts with its own empty handler list
    self._event_handlers = {
        member: [] for member in LiveTranscriptionEvents.__members__.values()
    }

    # parent constructor goes last, once the state above is in place
    super().__init__(self._config, self._endpoint)
async def start(
    self,
    options: Optional[Union[ListenWebSocketOptions, Dict]] = None,
    addons: Optional[Dict] = None,
    headers: Optional[Dict] = None,
    members: Optional[Dict] = None,
    **kwargs,
) -> bool:
    """
    Starts the WebSocket connection for live transcription.

    Args:
        options: transcription options, either a ListenWebSocketOptions
            (converted with ``to_dict()``) or a plain dict; None means ``{}``.
        addons: stored on the instance and forwarded to the parent ``start``.
        headers: stored on the instance and forwarded to the parent ``start``.
        members: if given, merged directly into the instance ``__dict__``.
        **kwargs: stored on the instance and forwarded to the parent ``start``.

    Returns:
        bool: True once the parent ``start`` succeeded and the optional
        keep-alive / auto-flush tasks were created; False if the parent
        ``start`` returned False or an exception was caught.

    Raises:
        DeepgramError: if ``options`` is a ListenWebSocketOptions whose
            ``check()`` fails.
        Exception: any caught error is re-raised when the config option
            ``termination_exception_connect`` is True.
    """

    def _log_threads() -> None:
        # dump every live thread plus the total count at debug level
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

    self._logger.debug("AsyncListenWebSocketClient.start ENTER")
    self._logger.info("options: %s", options)
    self._logger.info("addons: %s", addons)
    self._logger.info("headers: %s", headers)
    self._logger.info("members: %s", members)
    self._logger.info("kwargs: %s", kwargs)

    typed_options = isinstance(options, ListenWebSocketOptions)
    if typed_options and not options.check():
        self._logger.error("options.check failed")
        self._logger.debug("AsyncListenWebSocketClient.start LEAVE")
        raise DeepgramError("Fatal transcription options error")

    self._addons = addons
    self._headers = headers

    # "members" become attributes of this instance
    if members is not None:
        self.__dict__.update(members)

    # keep the extra keyword arguments for the parent call
    self._kwargs = kwargs if kwargs is not None else {}

    # normalize options to a dict
    if options is None:
        self._options = {}
    elif typed_options:
        self._logger.info("ListenWebSocketOptions switching class -> dict")
        self._options = options.to_dict()
    else:
        self._options = options

    try:
        # the parent opens the connection
        connected = await super().start(
            self._options,
            self._addons,
            self._headers,
            **dict(cast(Dict[Any, Any], self._kwargs)),
        )
        if connected is False:
            self._logger.error("AsyncListenWebSocketClient.start failed")
            self._logger.debug("AsyncListenWebSocketClient.start LEAVE")
            return False

        _log_threads()

        # keepalive task
        if not self._config.is_keep_alive_enabled():
            self._logger.notice("keepalive is disabled")
        else:
            self._logger.notice("keepalive is enabled")
            self._keep_alive_thread = asyncio.create_task(self._keep_alive())

        # flush task
        if not self._config.is_auto_flush_reply_enabled():
            self._logger.notice("autoflush is disabled")
        else:
            self._logger.notice("autoflush is enabled")
            self._flush_thread = asyncio.create_task(self._flush())

        _log_threads()

        self._logger.notice("start succeeded")
        self._logger.debug("AsyncListenWebSocketClient.start LEAVE")
        return True

    except Exception as e:  # pylint: disable=broad-except
        self._logger.error(
            "WebSocketException in AsyncListenWebSocketClient.start: %s", e
        )
        self._logger.debug("AsyncListenWebSocketClient.start LEAVE")
        # only propagate when the caller opted in through the config
        if self._config.options.get("termination_exception_connect") is True:
            raise
        return False
Starts the WebSocket connection for live transcription.
def on(self, event: LiveTranscriptionEvents, handler: Callable) -> None:
    """
    Registers event handlers for specific events.

    The subscription is always logged. The handler is only stored when
    ``event`` is one of the LiveTranscriptionEvents members and ``handler``
    is callable; otherwise the call does nothing further.

    Args:
        event (LiveTranscriptionEvents): the event to subscribe to.
        handler (Callable): appended to that event's handler list.
    """
    self._logger.info("event subscribed: %s", event)

    known_events = LiveTranscriptionEvents.__members__.values()
    if event not in known_events or not callable(handler):
        return

    self._event_handlers[event].append(handler)
Registers event handlers for specific events.
async def keep_alive(self) -> bool:
    """
    Sends a KeepAlive message.

    Returns:
        bool: True if ``send`` reported success (a truthy result),
        False otherwise.
    """
    self._logger.spam("AsyncListenWebSocketClient.keep_alive ENTER")

    self._logger.notice("Sending KeepAlive...")
    payload = json.dumps({"type": "KeepAlive"})
    delivered = bool(await self.send(payload))

    # report the outcome, then leave through a single exit
    if delivered:
        self._logger.notice("keep_alive succeeded")
    else:
        self._logger.error("keep_alive failed")
    self._logger.spam("AsyncListenWebSocketClient.keep_alive LEAVE")

    return delivered
Sends a KeepAlive message over the connection and returns whether the send succeeded.
async def finalize(self) -> bool:
    """
    Finalizes the Transcript connection by flushing it.

    Sends a ``Finalize`` message over the connection.

    Returns:
        bool: True if ``send`` reported success (a truthy result),
        False otherwise.
    """
    self._logger.spam("AsyncListenWebSocketClient.finalize ENTER")

    self._logger.notice("Sending Finalize...")
    payload = json.dumps({"type": "Finalize"})
    delivered = bool(await self.send(payload))

    # report the outcome, then leave through a single exit
    if delivered:
        self._logger.notice("finalize succeeded")
    else:
        self._logger.error("finalize failed")
    self._logger.spam("AsyncListenWebSocketClient.finalize LEAVE")

    return delivered
Finalizes the transcription by sending a Finalize message, which flushes the connection.
async def finish(self) -> bool:
    """
    Closes the WebSocket connection gracefully.

    Calls the parent ``finish()``, then cancels the keep-alive and auto-flush
    tasks (when they were created) and waits up to 10 seconds for them to
    wind down.

    Returns:
        bool: True once the tasks have been cancelled and awaited; False if
        waiting for them timed out or this call itself was cancelled.
    """
    self._logger.debug("AsyncListenWebSocketClient.finish ENTER")

    # stop the threads
    self._logger.verbose("cancelling tasks...")
    try:
        # call parent finish; a failure is logged but cleanup still proceeds
        if await super().finish() is False:
            self._logger.error("AsyncListenWebSocketClient.finish failed")

        # Before cancelling, check if the tasks were created
        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("before running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        tasks = []
        if self._keep_alive_thread is not None:
            self._keep_alive_thread.cancel()
            tasks.append(self._keep_alive_thread)
            self._logger.notice("processing _keep_alive_thread cancel...")

        if self._flush_thread is not None:
            self._flush_thread.cancel()
            tasks.append(self._flush_thread)
            self._logger.notice("processing _flush_thread cancel...")

        # Use asyncio.gather to wait for tasks to be cancelled
        # Prevent indefinite waiting by setting a timeout
        # return_exceptions=True: a task that was just cancelled ends with
        # CancelledError, which is the expected outcome here. Without this
        # flag gather would re-raise it and a normal shutdown would be
        # reported as a failure.
        outcomes = await asyncio.wait_for(
            asyncio.gather(*tasks, return_exceptions=True), timeout=10
        )
        for outcome in outcomes:
            # surface real task failures instead of dropping them silently
            if isinstance(outcome, BaseException) and not isinstance(
                outcome, asyncio.CancelledError
            ):
                self._logger.error("task ended with error: %s", outcome)
        self._logger.notice("threads joined")

        # debug the threads
        for thread in threading.enumerate():
            self._logger.debug("after running thread: %s", thread.name)
        self._logger.debug("number of active threads: %s", threading.active_count())

        self._logger.notice("finish succeeded")
        self._logger.spam("AsyncListenWebSocketClient.finish LEAVE")
        return True

    except asyncio.CancelledError as e:
        # reached only when this finish() call itself is cancelled
        self._logger.error("tasks cancelled error: %s", e)
        self._logger.debug("AsyncListenWebSocketClient.finish LEAVE")
        return False

    except asyncio.TimeoutError as e:
        self._logger.error("tasks cancellation timed out: %s", e)
        self._logger.debug("AsyncListenWebSocketClient.finish LEAVE")
        return False
Closes the WebSocket connection gracefully.