deepgram.clients.common.v1.helpers

 1# Copyright 2023-2024 Deepgram SDK contributors. All Rights Reserved.
 2# Use of this source code is governed by a MIT license that can be found in the LICENSE file.
 3# SPDX-License-Identifier: MIT
 4
 5from urllib.parse import urlparse, urlunparse, parse_qs, urlencode
 6from typing import Dict, Optional
 7import re
 8
 9
10# This function appends query parameters to a URL
11def append_query_params(url: str, params: Optional[Dict] = None):
12    """
13    Appends query parameters to a URL
14    """
15    parsed_url = urlparse(url)
16    query_params = parse_qs(parsed_url.query)
17
18    if params is not None:
19        for key, value in params.items():
20            if value is None:
21                continue
22            if isinstance(value, bool):
23                value = str(value).lower()
24            if isinstance(value, list):
25                for item in value:
26                    query_params[key] = query_params.get(key, []) + [str(item)]
27            else:
28                query_params[key] = [str(value)]
29
30    updated_query_string = urlencode(query_params, doseq=True)
31    updated_url = parsed_url._replace(query=updated_query_string).geturl()
32    return updated_url
33
34
35# This function converts a URL to a WebSocket URL
36def convert_to_websocket_url(base_url: str, endpoint: str):
37    """
38    Converts a URL to a WebSocket URL
39    """
40    use_ssl = True  # Default to true
41    if re.match(r"^https?://", base_url, re.IGNORECASE):
42        if "http://" in base_url:
43            use_ssl = False  # Override to false if http:// is found
44        base_url = base_url.replace("https://", "").replace("http://", "")
45    if not re.match(r"^wss?://", base_url, re.IGNORECASE):
46        if use_ssl:
47            base_url = "wss://" + base_url
48        else:
49            base_url = "ws://" + base_url
50    parsed_url = urlparse(base_url)
51    domain = parsed_url.netloc
52    websocket_url = urlunparse((parsed_url.scheme, domain, endpoint, "", "", ""))
53    return websocket_url
def append_query_params(url: str, params: Optional[Dict] = None):
12def append_query_params(url: str, params: Optional[Dict] = None):
13    """
14    Appends query parameters to a URL
15    """
16    parsed_url = urlparse(url)
17    query_params = parse_qs(parsed_url.query)
18
19    if params is not None:
20        for key, value in params.items():
21            if value is None:
22                continue
23            if isinstance(value, bool):
24                value = str(value).lower()
25            if isinstance(value, list):
26                for item in value:
27                    query_params[key] = query_params.get(key, []) + [str(item)]
28            else:
29                query_params[key] = [str(value)]
30
31    updated_query_string = urlencode(query_params, doseq=True)
32    updated_url = parsed_url._replace(query=updated_query_string).geturl()
33    return updated_url

Appends query parameters to a URL

def convert_to_websocket_url(base_url: str, endpoint: str):
37def convert_to_websocket_url(base_url: str, endpoint: str):
38    """
39    Converts a URL to a WebSocket URL
40    """
41    use_ssl = True  # Default to true
42    if re.match(r"^https?://", base_url, re.IGNORECASE):
43        if "http://" in base_url:
44            use_ssl = False  # Override to false if http:// is found
45        base_url = base_url.replace("https://", "").replace("http://", "")
46    if not re.match(r"^wss?://", base_url, re.IGNORECASE):
47        if use_ssl:
48            base_url = "wss://" + base_url
49        else:
50            base_url = "ws://" + base_url
51    parsed_url = urlparse(base_url)
52    domain = parsed_url.netloc
53    websocket_url = urlunparse((parsed_url.scheme, domain, endpoint, "", "", ""))
54    return websocket_url

Converts a URL to a WebSocket URL