"""Connection to the ISY."""
import asyncio
import logging
import ssl
import sys
from urllib.parse import quote, urlencode
import aiohttp
from .constants import (
_LOGGER,
LOG_DATE_FORMAT,
LOG_FORMAT,
LOG_LEVEL,
METHOD_GET,
URL_CLOCK,
URL_CONFIG,
URL_DEFINITIONS,
URL_MEMBERS,
URL_NETWORK,
URL_NODES,
URL_PING,
URL_PROGRAMS,
URL_RESOURCES,
URL_STATUS,
URL_SUBFOLDERS,
URL_VARIABLES,
VAR_INTEGER,
VAR_STATE,
XML_FALSE,
XML_TRUE,
)
from .exceptions import ISYConnectionError, ISYInvalidAuthError
MAX_RETRIES = 5
MAX_HTTPS_CONNECTIONS = 2
MAX_HTTP_CONNECTIONS = 5
RETRY_BACKOFF = [0.01, 0.10, 0.25, 1, 2] # Seconds
HTTP_OK = 200 # Valid request received, will run it
HTTP_UNAUTHORIZED = 401 # User authentication failed
HTTP_NOT_FOUND = 404 # Unrecognized request received and ignored
HTTP_SERVICE_UNAVAILABLE = 503 # Valid request received, system too busy to run it
HTTP_TIMEOUT = 30
HTTP_HEADERS = {
"Connection": "keep-alive",
"Keep-Alive": "5000",
"Accept-Encoding": "gzip, deflate",
}
[docs]class Connection:
"""Connection object to manage connection to and interaction with ISY."""
[docs] def __init__(
self,
address,
port,
username,
password,
use_https=False,
tls_ver=1.1,
webroot="",
websession=None,
):
"""Initialize the Connection object."""
if not len(_LOGGER.handlers):
logging.basicConfig(
format=LOG_FORMAT, datefmt=LOG_DATE_FORMAT, level=LOG_LEVEL
)
_LOGGER.addHandler(logging.NullHandler())
logging.getLogger("urllib3").setLevel(logging.WARNING)
self._address = address
self._port = port
self._username = username
self._password = password
self._auth = aiohttp.BasicAuth(self._username, self._password)
self._webroot = webroot.rstrip("/")
self.req_session = websession
self._tls_ver = tls_ver
self.use_https = use_https
self.semaphore = asyncio.Semaphore(
MAX_HTTPS_CONNECTIONS if use_https else MAX_HTTP_CONNECTIONS
)
if websession is None:
websession = get_new_client_session(use_https, tls_ver)
self.req_session = websession
self.sslcontext = get_sslcontext(use_https, tls_ver)
[docs] async def test_connection(self):
"""Test the connection and get the config for the ISY."""
config = await self.get_config(retries=None)
if not config:
_LOGGER.error("Could not connect to the ISY with the parameters provided.")
raise ISYConnectionError()
return config
[docs] async def close(self):
"""Cleanup connections and prepare for exit."""
await self.req_session.close()
@property
def connection_info(self):
"""Return the connection info required to connect to the ISY."""
connection_info = {}
connection_info["auth"] = self._auth.encode()
connection_info["addr"] = self._address
connection_info["port"] = int(self._port)
connection_info["passwd"] = self._password
connection_info["webroot"] = self._webroot
if self.use_https and self._tls_ver:
connection_info["tls"] = self._tls_ver
return connection_info
# COMMON UTILITIES
[docs] def compile_url(self, path, query=None):
"""Compile the URL to fetch from the ISY."""
url = "https://" if self.use_https else "http://"
url += f"{self._address}:{self._port}{self._webroot}"
if path is not None:
url += "/rest/" + "/".join([quote(item) for item in path])
if query is not None:
url += "?" + urlencode(query)
return url
[docs] async def request(self, url, retries=0, ok404=False, delay=0):
"""Execute request to ISY REST interface."""
_LOGGER.debug("ISY Request: %s", url)
if delay:
await asyncio.sleep(delay)
try:
async with self.semaphore, self.req_session.get(
url,
auth=self._auth,
headers=HTTP_HEADERS,
timeout=HTTP_TIMEOUT,
ssl=self.sslcontext,
) as res:
if res.status == HTTP_OK:
_LOGGER.debug("ISY Response Received.")
results = await res.text(encoding="utf-8", errors="ignore")
return results
if res.status == HTTP_NOT_FOUND:
if ok404:
_LOGGER.debug("ISY Response Received.")
res.release()
return ""
_LOGGER.error("ISY Reported an Invalid Command Received.")
res.release()
return None
if res.status == HTTP_UNAUTHORIZED:
_LOGGER.error("Invalid credentials provided for ISY connection.")
res.release()
raise ISYInvalidAuthError(
"Invalid credentials provided for ISY connection."
)
if res.status == HTTP_SERVICE_UNAVAILABLE:
_LOGGER.warning("ISY too busy to process request.")
res.release()
except asyncio.TimeoutError:
_LOGGER.warning("Timeout while trying to connect to the ISY.")
except (
aiohttp.ClientOSError,
aiohttp.ServerDisconnectedError,
):
_LOGGER.debug("ISY not ready or closed connection.")
except aiohttp.ClientResponseError as err:
_LOGGER.error(
"Client Response Error from ISY: %s %s.", err.status, err.message
)
except aiohttp.ClientError as err:
_LOGGER.error(
"ISY Could not receive response from device because of a network issue: %s",
type(err),
)
if retries is None:
raise ISYConnectionError()
if retries < MAX_RETRIES:
_LOGGER.debug(
"Retrying ISY Request in %ss, retry %s.",
RETRY_BACKOFF[retries],
retries + 1,
)
# sleep to allow the ISY to catch up
await asyncio.sleep(RETRY_BACKOFF[retries])
# recurse to try again
retry_result = await self.request(url, retries + 1, ok404=False)
return retry_result
# fail for good
_LOGGER.error(
"Bad ISY Request: (%s) Failed after %s retries.",
url,
retries,
)
return None
[docs] async def ping(self):
"""Test connection to the ISY and return True if alive."""
req_url = self.compile_url([URL_PING])
result = await self.request(req_url, ok404=True)
return result is not None
[docs] async def get_description(self):
"""Fetch the services description from the ISY."""
url = "https://" if self.use_https else "http://"
url += f"{self._address}:{self._port}{self._webroot}/desc"
result = await self.request(url)
return result
[docs] async def get_config(self, retries=0):
"""Fetch the configuration from the ISY."""
req_url = self.compile_url([URL_CONFIG])
result = await self.request(req_url, retries=retries)
return result
[docs] async def get_programs(self, address=None):
"""Fetch the list of programs from the ISY."""
addr = [URL_PROGRAMS]
if address is not None:
addr.append(str(address))
req_url = self.compile_url(addr, {URL_SUBFOLDERS: XML_TRUE})
result = await self.request(req_url)
return result
[docs] async def get_nodes(self):
"""Fetch the list of nodes/groups/scenes from the ISY."""
req_url = self.compile_url([URL_NODES], {URL_MEMBERS: XML_FALSE})
result = await self.request(req_url)
return result
[docs] async def get_status(self):
"""Fetch the status of nodes/groups/scenes from the ISY."""
req_url = self.compile_url([URL_STATUS])
result = await self.request(req_url)
return result
[docs] async def get_variable_defs(self):
"""Fetch the list of variables from the ISY."""
req_list = [
[URL_VARIABLES, URL_DEFINITIONS, VAR_INTEGER],
[URL_VARIABLES, URL_DEFINITIONS, VAR_STATE],
]
req_urls = [self.compile_url(req) for req in req_list]
results = await asyncio.gather(
*[self.request(req_url) for req_url in req_urls], return_exceptions=True
)
return results
[docs] async def get_variables(self):
"""Fetch the variable details from the ISY to update local copy."""
req_list = [
[URL_VARIABLES, METHOD_GET, VAR_INTEGER],
[URL_VARIABLES, METHOD_GET, VAR_STATE],
]
req_urls = [self.compile_url(req) for req in req_list]
results = await asyncio.gather(
*[self.request(req_url) for req_url in req_urls], return_exceptions=True
)
results = [r for r in results if r is not None] # Strip any bad requests.
result = "".join(results)
result = result.replace(
'</vars><?xml version="1.0" encoding="UTF-8"?><vars>', ""
)
return result
[docs] async def get_network(self):
"""Fetch the list of network resources from the ISY."""
req_url = self.compile_url([URL_NETWORK, URL_RESOURCES])
result = await self.request(req_url)
return result
[docs] async def get_time(self):
"""Fetch the system time info from the ISY."""
req_url = self.compile_url([URL_CLOCK])
result = await self.request(req_url)
return result
def get_new_client_session(use_https, tls_ver=1.1):
"""Create a new Client Session for Connecting."""
if use_https:
if not can_https(tls_ver):
raise (
ValueError(
"PyISY could not connect to the ISY. "
"Check log for SSL/TLS error."
)
)
return aiohttp.ClientSession(cookie_jar=aiohttp.CookieJar(unsafe=True))
return aiohttp.ClientSession()
def get_sslcontext(use_https, tls_ver=1.1):
"""Create an SSLContext object to use for the connections."""
if not use_https:
return None
if tls_ver == 1.1:
return ssl.SSLContext(ssl.PROTOCOL_TLSv1_1)
elif tls_ver == 1.2:
return ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
def can_https(tls_ver):
"""
Verify minimum requirements to use an HTTPS connection.
Returns boolean indicating whether HTTPS is available.
"""
output = True
# check python version
if sys.version_info < (3, 7):
_LOGGER.error("PyISY cannot use HTTPS: Invalid Python version. See docs.")
output = False
# check that Python was compiled against correct OpenSSL lib
if "PROTOCOL_TLSv1_1" not in dir(ssl):
_LOGGER.error(
"PyISY cannot use HTTPS: Compiled against old OpenSSL "
+ "library. See docs."
)
output = False
# check the requested TLS version
if tls_ver not in [1.1, 1.2]:
_LOGGER.error(
"PyISY cannot use HTTPS: Only TLS 1.1 and 1.2 are supported "
+ "by the ISY controller."
)
output = False
return output