deepgram.clients.common.v1.helpers
# Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved.
# Use of this source code is governed by a MIT license that can be found in the LICENSE file.
# SPDX-License-Identifier: MIT

from urllib.parse import urlparse, urlunparse, parse_qs, urlencode
from typing import Dict, Optional
import re


# This function appends query parameters to a URL
def append_query_params(url: str, params: Optional[Dict] = None) -> str:
    """
    Appends query parameters to a URL

    Args:
        url: The URL to extend. Query parameters it already carries are kept,
            including those with an empty value.
        params: Mapping of parameter names to values. ``None`` values are
            skipped. Booleans (scalar or inside a list) are rendered as
            lowercase ``true``/``false``. A list value is appended, one
            ``key=value`` pair per item, to any values already present for
            that key; any other value replaces the values for its key.

    Returns:
        The URL with the merged, re-encoded query string.
    """

    def _as_text(value) -> str:
        # str(True) is "True"; query strings use the lowercase form.
        if isinstance(value, bool):
            return str(value).lower()
        return str(value)

    parsed_url = urlparse(url)
    # keep_blank_values=True so existing "key=" pairs survive the round trip
    # instead of being silently dropped by parse_qs.
    query_params = parse_qs(parsed_url.query, keep_blank_values=True)

    if params is not None:
        for key, value in params.items():
            if value is None:
                continue
            if isinstance(value, list):
                query_params.setdefault(key, []).extend(
                    _as_text(item) for item in value
                )
            else:
                query_params[key] = [_as_text(value)]

    updated_query_string = urlencode(query_params, doseq=True)
    return parsed_url._replace(query=updated_query_string).geturl()


# This function converts a URL to a WebSocket URL
def convert_to_websocket_url(base_url: str, endpoint: str) -> str:
    """
    Converts a URL to a WebSocket URL

    Args:
        base_url: Host, optionally prefixed with ``http://``, ``https://``,
            ``ws://`` or ``wss://`` (matched case-insensitively). Anything
            after the host (path, query, fragment) is discarded.
        endpoint: Path placed after the host in the result.

    Returns:
        ``ws://<host>/<endpoint>`` when ``base_url`` starts with ``http://``
        or ``ws://``; ``wss://<host>/<endpoint>`` otherwise (secure is the
        default when no scheme is given).
    """
    ws_scheme = "wss"  # Default to a secure socket
    remainder = base_url

    # Only the *leading* scheme decides the result; "http://" appearing later
    # in the string (e.g. inside a query value) must not affect it.
    scheme_match = re.match(r"^(https?|wss?)://", base_url, re.IGNORECASE)
    if scheme_match:
        if scheme_match.group(1).lower() in ("http", "ws"):
            ws_scheme = "ws"
        remainder = base_url[scheme_match.end():]

    parsed_url = urlparse(ws_scheme + "://" + remainder)
    return urlunparse((parsed_url.scheme, parsed_url.netloc, endpoint, "", "", ""))
def
append_query_params(url: str, params: Optional[Dict] = None):
def append_query_params(url: str, params: Optional[Dict] = None) -> str:
    """
    Appends query parameters to a URL

    Args:
        url: The URL to extend. Query parameters it already carries are kept,
            including those with an empty value.
        params: Mapping of parameter names to values. ``None`` values are
            skipped. Booleans (scalar or inside a list) are rendered as
            lowercase ``true``/``false``. A list value is appended, one
            ``key=value`` pair per item, to any values already present for
            that key; any other value replaces the values for its key.

    Returns:
        The URL with the merged, re-encoded query string.
    """

    def _as_text(value) -> str:
        # str(True) is "True"; query strings use the lowercase form.
        if isinstance(value, bool):
            return str(value).lower()
        return str(value)

    parsed_url = urlparse(url)
    # keep_blank_values=True so existing "key=" pairs survive the round trip
    # instead of being silently dropped by parse_qs.
    query_params = parse_qs(parsed_url.query, keep_blank_values=True)

    if params is not None:
        for key, value in params.items():
            if value is None:
                continue
            if isinstance(value, list):
                query_params.setdefault(key, []).extend(
                    _as_text(item) for item in value
                )
            else:
                query_params[key] = [_as_text(value)]

    updated_query_string = urlencode(query_params, doseq=True)
    return parsed_url._replace(query=updated_query_string).geturl()
Appends query parameters to a URL
def
convert_to_websocket_url(base_url: str, endpoint: str):
def convert_to_websocket_url(base_url: str, endpoint: str) -> str:
    """
    Converts a URL to a WebSocket URL

    Args:
        base_url: Host, optionally prefixed with ``http://``, ``https://``,
            ``ws://`` or ``wss://`` (matched case-insensitively). Anything
            after the host (path, query, fragment) is discarded.
        endpoint: Path placed after the host in the result.

    Returns:
        ``ws://<host>/<endpoint>`` when ``base_url`` starts with ``http://``
        or ``ws://``; ``wss://<host>/<endpoint>`` otherwise (secure is the
        default when no scheme is given).
    """
    ws_scheme = "wss"  # Default to a secure socket
    remainder = base_url

    # Only the *leading* scheme decides the result; "http://" appearing later
    # in the string (e.g. inside a query value) must not affect it.
    scheme_match = re.match(r"^(https?|wss?)://", base_url, re.IGNORECASE)
    if scheme_match:
        if scheme_match.group(1).lower() in ("http", "ws"):
            ws_scheme = "ws"
        remainder = base_url[scheme_match.end():]

    parsed_url = urlparse(ws_scheme + "://" + remainder)
    return urlunparse((parsed_url.scheme, parsed_url.netloc, endpoint, "", "", ""))
Converts a URL to a WebSocket URL