# -*- coding: utf-8 -*-
# Copyright 2021 Cohesity Inc.
import logging
from cohesity_management_sdk.api_helper import APIHelper
from cohesity_management_sdk.configuration import Configuration
from cohesity_management_sdk.controllers.base_controller import BaseController
from cohesity_management_sdk.http.auth.auth_manager import AuthManager
from cohesity_management_sdk.models.active_directory_entry import ActiveDirectoryEntry
from cohesity_management_sdk.models.list_centrify_zone import ListCentrifyZone
from cohesity_management_sdk.models.domain_controllers import DomainControllers
from cohesity_management_sdk.models.active_directory_principal import ActiveDirectoryPrincipal
from cohesity_management_sdk.models.added_active_directory_principal import AddedActiveDirectoryPrincipal
from cohesity_management_sdk.exceptions.request_error_error_exception import RequestErrorErrorException
[docs]class ActiveDirectoryController(BaseController):
"""A Controller to access Endpoints in the cohesity_management_sdk API."""
def __init__(self, config=None, client=None, call_back=None):
super(ActiveDirectoryController, self).__init__(client, call_back)
self.logger = logging.getLogger(__name__)
self.config = config
[docs] def delete_active_directory_entry(self, body):
"""Does a DELETE request to /public/activeDirectory.
Deletes the join of the Cohesity Cluster to the specified
Active Directory domain. After the deletion, the Cohesity Cluster
no longer has access to the principals on the Active Directory.
For example, you can no longer log in to the Cohesity Cluster
with a user defined in a principal group of the Active Directory
domain.
Args:
body (ActiveDirectoryEntry): Request to delete a join with an
Active Directory.
Returns:
void: Response from the API. No Content
Raises:
APIException: When an error occurs while fetching the data from
the remote API. This exception includes the HTTP Response
code, an error message, and the HTTP body that was received in
the request.
"""
try:
self.logger.info('delete_active_directory_entry called.')
# Validate required parameters
self.logger.info(
'Validating required parameters for delete_active_directory_entry.'
)
self.validate_parameters(body=body)
# Prepare query URL
self.logger.info(
'Preparing query URL for delete_active_directory_entry.')
_url_path = '/public/activeDirectory'
_query_builder = self.config.get_base_uri()
_query_builder += _url_path
_query_url = APIHelper.clean_url(_query_builder)
# Prepare headers
self.logger.info(
'Preparing headers for delete_active_directory_entry.')
_headers = {'content-type': 'application/json; charset=utf-8'}
# Prepare and execute request
self.logger.info(
'Preparing and executing request for delete_active_directory_entry.'
)
_request = self.http_client.delete(
_query_url,
headers=_headers,
parameters=APIHelper.json_serialize(body))
AuthManager.apply(_request, self.config)
_context = self.execute_request(
_request, name='delete_active_directory_entry')
# Endpoint and global error handling using HTTP status codes.
self.logger.info(
'Validating response for delete_active_directory_entry.')
if _context.response.status_code == 0:
raise RequestErrorErrorException('Error', _context)
self.validate_response(_context)
except Exception as e:
self.logger.error(e, exc_info=True)
raise
[docs] def get_active_directory_entry(self,
domains=None,
tenant_ids=None,
all_under_hierarchy=None):
"""Does a GET request to /public/activeDirectory.
After a Cohesity Cluster has been joined to an Active Directory
domain,
the users and groups in the domain can be authenticated on the
Cohesity Cluster
using their Active Directory credentials.
NOTE: The userName and password fields are not populated by this
operation.
Args:
domains (list of string, optional): Specifies the domains to fetch
active directory entries.
tenant_ids (list of string, optional): TenantIds contains ids of
the tenants for which objects are to be returned.
all_under_hierarchy (bool, optional): AllUnderHierarchy specifies
if objects of all the tenants under the hierarchy of the
logged in user's organization should be returned.
Returns:
list of ActiveDirectoryEntry: Response from the API. Success
Raises:
APIException: When an error occurs while fetching the data from
the remote API. This exception includes the HTTP Response
code, an error message, and the HTTP body that was received in
the request.
"""
try:
self.logger.info('get_active_directory_entry called.')
# Prepare query URL
self.logger.info(
'Preparing query URL for get_active_directory_entry.')
_url_path = '/public/activeDirectory'
_query_builder = self.config.get_base_uri()
_query_builder += _url_path
_query_parameters = {
'domains': domains,
'tenantIds': tenant_ids,
'allUnderHierarchy': all_under_hierarchy
}
_query_builder = APIHelper.append_url_with_query_parameters(
_query_builder, _query_parameters,
Configuration.array_serialization)
_query_url = APIHelper.clean_url(_query_builder)
# Prepare headers
self.logger.info(
'Preparing headers for get_active_directory_entry.')
_headers = {'accept': 'application/json'}
# Prepare and execute request
self.logger.info(
'Preparing and executing request for get_active_directory_entry.'
)
_request = self.http_client.get(_query_url, headers=_headers)
AuthManager.apply(_request, self.config)
_context = self.execute_request(_request,
name='get_active_directory_entry')
# Endpoint and global error handling using HTTP status codes.
self.logger.info(
'Validating response for get_active_directory_entry.')
if _context.response.status_code == 0:
raise RequestErrorErrorException('Error', _context)
self.validate_response(_context)
# Return appropriate type
return APIHelper.json_deserialize(
_context.response.raw_body,
ActiveDirectoryEntry.from_dictionary)
except Exception as e:
self.logger.error(e, exc_info=True)
raise
[docs] def create_active_directory_entry(self, body):
"""Does a POST request to /public/activeDirectory.
After a Cohesity Cluster has been joined to an Active Directory
domain,
the users and groups in the domain can be authenticated on the
Cohesity Cluster
using their Active Directory credentials.
Args:
body (CreateActiveDirectoryEntryParams): Request to join an Active
Directory.
Returns:
ActiveDirectoryEntry: Response from the API. Success
Raises:
APIException: When an error occurs while fetching the data from
the remote API. This exception includes the HTTP Response
code, an error message, and the HTTP body that was received in
the request.
"""
try:
self.logger.info('create_active_directory_entry called.')
# Validate required parameters
self.logger.info(
'Validating required parameters for create_active_directory_entry.'
)
self.validate_parameters(body=body)
# Prepare query URL
self.logger.info(
'Preparing query URL for create_active_directory_entry.')
_url_path = '/public/activeDirectory'
_query_builder = self.config.get_base_uri()
_query_builder += _url_path
_query_url = APIHelper.clean_url(_query_builder)
# Prepare headers
self.logger.info(
'Preparing headers for create_active_directory_entry.')
_headers = {
'accept': 'application/json',
'content-type': 'application/json; charset=utf-8'
}
# Prepare and execute request
self.logger.info(
'Preparing and executing request for create_active_directory_entry.'
)
_request = self.http_client.post(
_query_url,
headers=_headers,
parameters=APIHelper.json_serialize(body))
AuthManager.apply(_request, self.config)
_context = self.execute_request(
_request, name='create_active_directory_entry')
# Endpoint and global error handling using HTTP status codes.
self.logger.info(
'Validating response for create_active_directory_entry.')
if _context.response.status_code == 0:
raise RequestErrorErrorException('Error', _context)
self.validate_response(_context)
# Return appropriate type
return APIHelper.json_deserialize(
_context.response.raw_body,
ActiveDirectoryEntry.from_dictionary)
except Exception as e:
self.logger.error(e, exc_info=True)
raise
[docs] def list_centrify_zones(self, domain_name=None):
"""Does a GET request to /public/activeDirectory/centrifyZones.
Fetches the list centrify zones of an active directory domain.
Args:
domain_name (string, optional): Specifies the fully qualified
domain name (FQDN) of an Active Directory.
Returns:
list of ListCentrifyZone: Response from the API. Success
Raises:
APIException: When an error occurs while fetching the data from
the remote API. This exception includes the HTTP Response
code, an error message, and the HTTP body that was received in
the request.
"""
try:
self.logger.info('list_centrify_zones called.')
# Prepare query URL
self.logger.info('Preparing query URL for list_centrify_zones.')
_url_path = '/public/activeDirectory/centrifyZones'
_query_builder = self.config.get_base_uri()
_query_builder += _url_path
_query_parameters = {'domainName': domain_name}
_query_builder = APIHelper.append_url_with_query_parameters(
_query_builder, _query_parameters,
Configuration.array_serialization)
_query_url = APIHelper.clean_url(_query_builder)
# Prepare headers
self.logger.info('Preparing headers for list_centrify_zones.')
_headers = {'accept': 'application/json'}
# Prepare and execute request
self.logger.info(
'Preparing and executing request for list_centrify_zones.')
_request = self.http_client.get(_query_url, headers=_headers)
AuthManager.apply(_request, self.config)
_context = self.execute_request(_request,
name='list_centrify_zones')
# Endpoint and global error handling using HTTP status codes.
self.logger.info('Validating response for list_centrify_zones.')
if _context.response.status_code == 0:
raise RequestErrorErrorException('Error', _context)
self.validate_response(_context)
# Return appropriate type
return APIHelper.json_deserialize(_context.response.raw_body,
ListCentrifyZone.from_dictionary)
except Exception as e:
self.logger.error(e, exc_info=True)
raise
[docs] def get_active_directory_domain_controllers(self, domain_name=None):
"""Does a GET request to /public/activeDirectory/domainControllers.
List the domain controllers for a domain.
Args:
domain_name (string, optional): Specifies the domain name
Returns:
DomainControllers: Response from the API. Success
Raises:
APIException: When an error occurs while fetching the data from
the remote API. This exception includes the HTTP Response
code, an error message, and the HTTP body that was received in
the request.
"""
try:
self.logger.info('get_active_directory_domain_controllers called.')
# Prepare query URL
self.logger.info(
'Preparing query URL for get_active_directory_domain_controllers.'
)
_url_path = '/public/activeDirectory/domainControllers'
_query_builder = self.config.get_base_uri()
_query_builder += _url_path
_query_parameters = {'domainName': domain_name}
_query_builder = APIHelper.append_url_with_query_parameters(
_query_builder, _query_parameters,
Configuration.array_serialization)
_query_url = APIHelper.clean_url(_query_builder)
# Prepare headers
self.logger.info(
'Preparing headers for get_active_directory_domain_controllers.'
)
_headers = {'accept': 'application/json'}
# Prepare and execute request
self.logger.info(
'Preparing and executing request for get_active_directory_domain_controllers.'
)
_request = self.http_client.get(_query_url, headers=_headers)
AuthManager.apply(_request, self.config)
_context = self.execute_request(
_request, name='get_active_directory_domain_controllers')
# Endpoint and global error handling using HTTP status codes.
self.logger.info(
'Validating response for get_active_directory_domain_controllers.'
)
if _context.response.status_code == 0:
raise RequestErrorErrorException('Error', _context)
self.validate_response(_context)
# Return appropriate type
return APIHelper.json_deserialize(
_context.response.raw_body, DomainControllers.from_dictionary)
except Exception as e:
self.logger.error(e, exc_info=True)
raise
[docs] def search_active_directory_principals(self,
domain=None,
object_class=None,
search=None,
sids=None,
include_computers=None):
"""Does a GET request to /public/activeDirectory/principals.
Optionally limit the search results by specifying security identifiers
(SIDs),
an object class (user or group) or a substring.
You can specify SIDs or a substring but not both.
Args:
domain (string, optional): Specifies the domain name of the
principals to search. If specified the principals in that
domain are searched. Domain could be an Active Directory
domain joined by the Cluster or any one of the trusted domains
of the Active Directory domain or the LOCAL domain. If not
specified, all the domains are searched.
object_class (ObjectClassSearchActiveDirectoryPrincipalsEnum,
optional): Optionally filter by a principal object class such
as 'kGroup' or 'kUser'. If 'kGroup' is specified, only group
principals are returned. If 'kUser' is specified, only user
principals are returned. If not specified, both group and user
principals are returned. 'kUser' specifies a user object
class. 'kGroup' specifies a group object class. 'kComputer'
specifies a computer object class. 'kWellKnownPrincipal'
specifies a well known principal.
search (string, optional): Optionally filter by matching a
substring. Only principals in the with a name or
sAMAccountName that matches part or all of the specified
substring are returned. If specified, a 'sids' parameter
should not be specified.
sids (list of string, optional): Optionally filter by a list of
security identifiers (SIDs) found in the specified domain.
Only principals matching the specified SIDs are returned. If
specified, a 'search' parameter should not be specified.
include_computers (bool, optional): Specifies if Computer/GMSA
accounts need to be included in this search.
Returns:
list of ActiveDirectoryPrincipal: Response from the API. Success
Raises:
APIException: When an error occurs while fetching the data from
the remote API. This exception includes the HTTP Response
code, an error message, and the HTTP body that was received in
the request.
"""
try:
self.logger.info('search_active_directory_principals called.')
# Prepare query URL
self.logger.info(
'Preparing query URL for search_active_directory_principals.')
_url_path = '/public/activeDirectory/principals'
_query_builder = self.config.get_base_uri()
_query_builder += _url_path
_query_parameters = {
'domain': domain,
'objectClass': object_class,
'search': search,
'sids': sids,
'includeComputers': include_computers
}
_query_builder = APIHelper.append_url_with_query_parameters(
_query_builder, _query_parameters,
Configuration.array_serialization)
_query_url = APIHelper.clean_url(_query_builder)
# Prepare headers
self.logger.info(
'Preparing headers for search_active_directory_principals.')
_headers = {'accept': 'application/json'}
# Prepare and execute request
self.logger.info(
'Preparing and executing request for search_active_directory_principals.'
)
_request = self.http_client.get(_query_url, headers=_headers)
AuthManager.apply(_request, self.config)
_context = self.execute_request(
_request, name='search_active_directory_principals')
# Endpoint and global error handling using HTTP status codes.
self.logger.info(
'Validating response for search_active_directory_principals.')
if _context.response.status_code == 0:
raise RequestErrorErrorException('Error', _context)
self.validate_response(_context)
# Return appropriate type
return APIHelper.json_deserialize(
_context.response.raw_body,
ActiveDirectoryPrincipal.from_dictionary)
except Exception as e:
self.logger.error(e, exc_info=True)
raise
[docs] def add_active_directory_principals(self, body=None):
"""Does a POST request to /public/activeDirectory/principals.
After a group or user has been added to a Cohesity Cluster,
the referenced Active Directory principal can be used by the Cohesity
Cluster.
In addition, this operation maps Cohesity roles with a group or user
and
this mapping defines the privileges allowed on the Cohesity Cluster
for the
group or user.
For example if an 'management' group is created on the Cohesity
Cluster
for the Active Directory 'management' principal group and is
associated with the Cohesity 'View' role, all users in the
referenced Active Directory 'management' principal group can log in to
the
Cohesity Dashboard but will only have view-only privileges.
These users cannot create new Protection Jobs, Policies, Views, etc.
NOTE: Local Cohesity users and groups cannot be created by this
operation.
Local Cohesity users or groups do not have an associated Active
Directory
principals and are created directly in the default LOCAL domain.
Args:
body (list of ActiveDirectoryPrincipalsAddParameters, optional):
Request to add groups or users to the Cohesity Cluster.
Returns:
list of AddedActiveDirectoryPrincipal: Response from the API.
Success
Raises:
APIException: When an error occurs while fetching the data from
the remote API. This exception includes the HTTP Response
code, an error message, and the HTTP body that was received in
the request.
"""
try:
self.logger.info('add_active_directory_principals called.')
# Prepare query URL
self.logger.info(
'Preparing query URL for add_active_directory_principals.')
_url_path = '/public/activeDirectory/principals'
_query_builder = self.config.get_base_uri()
_query_builder += _url_path
_query_url = APIHelper.clean_url(_query_builder)
# Prepare headers
self.logger.info(
'Preparing headers for add_active_directory_principals.')
_headers = {
'accept': 'application/json',
'content-type': 'application/json; charset=utf-8'
}
# Prepare and execute request
self.logger.info(
'Preparing and executing request for add_active_directory_principals.'
)
_request = self.http_client.post(
_query_url,
headers=_headers,
parameters=APIHelper.json_serialize(body))
AuthManager.apply(_request, self.config)
_context = self.execute_request(
_request, name='add_active_directory_principals')
# Endpoint and global error handling using HTTP status codes.
self.logger.info(
'Validating response for add_active_directory_principals.')
if _context.response.status_code == 0:
raise RequestErrorErrorException('Error', _context)
self.validate_response(_context)
# Return appropriate type
return APIHelper.json_deserialize(
_context.response.raw_body,
AddedActiveDirectoryPrincipal.from_dictionary)
except Exception as e:
self.logger.error(e, exc_info=True)
raise
[docs] def create_enable_trusted_domain_discovery(self, body, name):
"""Does a POST request to /public/activeDirectory/{name}/enableTrustedDomainState.
Updates the states of trusted domains discovery.
Args:
body (UpdateTrustedDomainEnableParams): Request to update enable
trusted domains state of an Active Directory.
name (string): Specifies the Active Directory Domain Name.
Returns:
ActiveDirectoryEntry: Response from the API. Success
Raises:
APIException: When an error occurs while fetching the data from
the remote API. This exception includes the HTTP Response
code, an error message, and the HTTP body that was received in
the request.
"""
try:
self.logger.info('create_enable_trusted_domain_discovery called.')
# Validate required parameters
self.logger.info(
'Validating required parameters for create_enable_trusted_domain_discovery.'
)
self.validate_parameters(body=body, name=name)
# Prepare query URL
self.logger.info(
'Preparing query URL for create_enable_trusted_domain_discovery.'
)
_url_path = '/public/activeDirectory/{name}/enableTrustedDomainState'
_url_path = APIHelper.append_url_with_template_parameters(
_url_path, {'name': name})
_query_builder = self.config.get_base_uri()
_query_builder += _url_path
_query_url = APIHelper.clean_url(_query_builder)
# Prepare headers
self.logger.info(
'Preparing headers for create_enable_trusted_domain_discovery.'
)
_headers = {
'accept': 'application/json',
'content-type': 'application/json; charset=utf-8'
}
# Prepare and execute request
self.logger.info(
'Preparing and executing request for create_enable_trusted_domain_discovery.'
)
_request = self.http_client.post(
_query_url,
headers=_headers,
parameters=APIHelper.json_serialize(body))
AuthManager.apply(_request, self.config)
_context = self.execute_request(
_request, name='create_enable_trusted_domain_discovery')
# Endpoint and global error handling using HTTP status codes.
self.logger.info(
'Validating response for create_enable_trusted_domain_discovery.'
)
if _context.response.status_code == 0:
raise RequestErrorErrorException('Error', _context)
self.validate_response(_context)
# Return appropriate type
return APIHelper.json_deserialize(
_context.response.raw_body,
ActiveDirectoryEntry.from_dictionary)
except Exception as e:
self.logger.error(e, exc_info=True)
raise
[docs] def update_active_directory_id_mapping(self, body, name):
"""Does a PUT request to /public/activeDirectory/{name}/idMappingInfo.
Updates the user id mapping info of an Active Directory.
Args:
body (IdMappingInfo): Request to update user id mapping of an
Active Directory.
name (string): Specifies the Active Directory Domain Name.
Returns:
ActiveDirectoryEntry: Response from the API. Success
Raises:
APIException: When an error occurs while fetching the data from
the remote API. This exception includes the HTTP Response
code, an error message, and the HTTP body that was received in
the request.
"""
try:
self.logger.info('update_active_directory_id_mapping called.')
# Validate required parameters
self.logger.info(
'Validating required parameters for update_active_directory_id_mapping.'
)
self.validate_parameters(body=body, name=name)
# Prepare query URL
self.logger.info(
'Preparing query URL for update_active_directory_id_mapping.')
_url_path = '/public/activeDirectory/{name}/idMappingInfo'
_url_path = APIHelper.append_url_with_template_parameters(
_url_path, {'name': name})
_query_builder = self.config.get_base_uri()
_query_builder += _url_path
_query_url = APIHelper.clean_url(_query_builder)
# Prepare headers
self.logger.info(
'Preparing headers for update_active_directory_id_mapping.')
_headers = {
'accept': 'application/json',
'content-type': 'application/json; charset=utf-8'
}
# Prepare and execute request
self.logger.info(
'Preparing and executing request for update_active_directory_id_mapping.'
)
_request = self.http_client.put(
_query_url,
headers=_headers,
parameters=APIHelper.json_serialize(body))
AuthManager.apply(_request, self.config)
_context = self.execute_request(
_request, name='update_active_directory_id_mapping')
# Endpoint and global error handling using HTTP status codes.
self.logger.info(
'Validating response for update_active_directory_id_mapping.')
if _context.response.status_code == 0:
raise RequestErrorErrorException('Error', _context)
self.validate_response(_context)
# Return appropriate type
return APIHelper.json_deserialize(
_context.response.raw_body,
ActiveDirectoryEntry.from_dictionary)
except Exception as e:
self.logger.error(e, exc_info=True)
raise
[docs] def update_active_directory_ignored_trusted_domains(self, body, name):
"""Does a PUT request to /public/activeDirectory/{name}/ignoredTrustedDomains.
Updates the list of trusted domains to be ignored during trusted
domain discovery of an Active Directory.
Args:
body (UpdateIgnoredTrustedDomainsParams): Request to update the
list of ignored trusted domains of an AD.
name (string): Specifies the Active Directory Domain Name.
Returns:
ActiveDirectoryEntry: Response from the API. Success
Raises:
APIException: When an error occurs while fetching the data from
the remote API. This exception includes the HTTP Response
code, an error message, and the HTTP body that was received in
the request.
"""
try:
self.logger.info(
'update_active_directory_ignored_trusted_domains called.')
# Validate required parameters
self.logger.info(
'Validating required parameters for update_active_directory_ignored_trusted_domains.'
)
self.validate_parameters(body=body, name=name)
# Prepare query URL
self.logger.info(
'Preparing query URL for update_active_directory_ignored_trusted_domains.'
)
_url_path = '/public/activeDirectory/{name}/ignoredTrustedDomains'
_url_path = APIHelper.append_url_with_template_parameters(
_url_path, {'name': name})
_query_builder = self.config.get_base_uri()
_query_builder += _url_path
_query_url = APIHelper.clean_url(_query_builder)
# Prepare headers
self.logger.info(
'Preparing headers for update_active_directory_ignored_trusted_domains.'
)
_headers = {
'accept': 'application/json',
'content-type': 'application/json; charset=utf-8'
}
# Prepare and execute request
self.logger.info(
'Preparing and executing request for update_active_directory_ignored_trusted_domains.'
)
_request = self.http_client.put(
_query_url,
headers=_headers,
parameters=APIHelper.json_serialize(body))
AuthManager.apply(_request, self.config)
_context = self.execute_request(
_request,
name='update_active_directory_ignored_trusted_domains')
# Endpoint and global error handling using HTTP status codes.
self.logger.info(
'Validating response for update_active_directory_ignored_trusted_domains.'
)
if _context.response.status_code == 0:
raise RequestErrorErrorException('Error', _context)
self.validate_response(_context)
# Return appropriate type
return APIHelper.json_deserialize(
_context.response.raw_body,
ActiveDirectoryEntry.from_dictionary)
except Exception as e:
self.logger.error(e, exc_info=True)
raise
[docs] def update_active_directory_ldap_provider(self, body, name):
"""Does a PUT request to /public/activeDirectory/{name}/ldapProvider.
Updates the LDAP provide Id for an Active Directory domain.
Args:
body (UpdateLdapProviderParams): Request to update the LDAP
provider info.
name (string): Specifies the Active Directory Domain Name.
Returns:
ActiveDirectoryEntry: Response from the API. Success
Raises:
APIException: When an error occurs while fetching the data from
the remote API. This exception includes the HTTP Response
code, an error message, and the HTTP body that was received in
the request.
"""
try:
self.logger.info('update_active_directory_ldap_provider called.')
# Validate required parameters
self.logger.info(
'Validating required parameters for update_active_directory_ldap_provider.'
)
self.validate_parameters(body=body, name=name)
# Prepare query URL
self.logger.info(
'Preparing query URL for update_active_directory_ldap_provider.'
)
_url_path = '/public/activeDirectory/{name}/ldapProvider'
_url_path = APIHelper.append_url_with_template_parameters(
_url_path, {'name': name})
_query_builder = self.config.get_base_uri()
_query_builder += _url_path
_query_url = APIHelper.clean_url(_query_builder)
# Prepare headers
self.logger.info(
'Preparing headers for update_active_directory_ldap_provider.')
_headers = {
'accept': 'application/json',
'content-type': 'application/json; charset=utf-8'
}
# Prepare and execute request
self.logger.info(
'Preparing and executing request for update_active_directory_ldap_provider.'
)
_request = self.http_client.put(
_query_url,
headers=_headers,
parameters=APIHelper.json_serialize(body))
AuthManager.apply(_request, self.config)
_context = self.execute_request(
_request, name='update_active_directory_ldap_provider')
# Endpoint and global error handling using HTTP status codes.
self.logger.info(
'Validating response for update_active_directory_ldap_provider.'
)
if _context.response.status_code == 0:
raise RequestErrorErrorException('Error', _context)
self.validate_response(_context)
# Return appropriate type
return APIHelper.json_deserialize(
_context.response.raw_body,
ActiveDirectoryEntry.from_dictionary)
except Exception as e:
self.logger.error(e, exc_info=True)
raise
[docs] def update_active_directory_machine_accounts(self, body, name):
"""Does a POST request to /public/activeDirectory/{name}/machineAccounts.
Updates the machine accounts of an Active Directory.
Args:
body (UpdateMachineAccountsParams): Request to update machine
accounts of an Active Directory.
name (string): Specifies the Active Directory Domain Name.
Returns:
ActiveDirectoryEntry: Response from the API. Success
Raises:
APIException: When an error occurs while fetching the data from
the remote API. This exception includes the HTTP Response
code, an error message, and the HTTP body that was received in
the request.
"""
try:
self.logger.info(
'update_active_directory_machine_accounts called.')
# Validate required parameters
self.logger.info(
'Validating required parameters for update_active_directory_machine_accounts.'
)
self.validate_parameters(body=body, name=name)
# Prepare query URL
self.logger.info(
'Preparing query URL for update_active_directory_machine_accounts.'
)
_url_path = '/public/activeDirectory/{name}/machineAccounts'
_url_path = APIHelper.append_url_with_template_parameters(
_url_path, {'name': name})
_query_builder = self.config.get_base_uri()
_query_builder += _url_path
_query_url = APIHelper.clean_url(_query_builder)
# Prepare headers
self.logger.info(
'Preparing headers for update_active_directory_machine_accounts.'
)
_headers = {
'accept': 'application/json',
'content-type': 'application/json; charset=utf-8'
}
# Prepare and execute request
self.logger.info(
'Preparing and executing request for update_active_directory_machine_accounts.'
)
_request = self.http_client.post(
_query_url,
headers=_headers,
parameters=APIHelper.json_serialize(body))
AuthManager.apply(_request, self.config)
_context = self.execute_request(
_request, name='update_active_directory_machine_accounts')
# Endpoint and global error handling using HTTP status codes.
self.logger.info(
'Validating response for update_active_directory_machine_accounts.'
)
if _context.response.status_code == 0:
raise RequestErrorErrorException('Error', _context)
self.validate_response(_context)
# Return appropriate type
return APIHelper.json_deserialize(
_context.response.raw_body,
ActiveDirectoryEntry.from_dictionary)
except Exception as e:
self.logger.error(e, exc_info=True)
raise
[docs] def update_preferred_domain_controllers(self, body, name):
"""Does a PUT request to /public/activeDirectory/{name}/preferredDomainControllers.
Updates the preferred domain controllers of an Active Directory
Args:
body (list of PreferredDomainController): Request to update
preferred domain controllers of an Active Directory.
name (string): Specifies the Active Directory Domain Name.
Returns:
ActiveDirectoryEntry: Response from the API. Success
Raises:
APIException: When an error occurs while fetching the data from
the remote API. This exception includes the HTTP Response
code, an error message, and the HTTP body that was received in
the request.
"""
try:
self.logger.info('update_preferred_domain_controllers called.')
# Validate required parameters
self.logger.info(
'Validating required parameters for update_preferred_domain_controllers.'
)
self.validate_parameters(body=body, name=name)
# Prepare query URL
self.logger.info(
'Preparing query URL for update_preferred_domain_controllers.')
_url_path = '/public/activeDirectory/{name}/preferredDomainControllers'
_url_path = APIHelper.append_url_with_template_parameters(
_url_path, {'name': name})
_query_builder = self.config.get_base_uri()
_query_builder += _url_path
_query_url = APIHelper.clean_url(_query_builder)
# Prepare headers
self.logger.info(
'Preparing headers for update_preferred_domain_controllers.')
_headers = {
'accept': 'application/json',
'content-type': 'application/json; charset=utf-8'
}
# Prepare and execute request
self.logger.info(
'Preparing and executing request for update_preferred_domain_controllers.'
)
_request = self.http_client.put(
_query_url,
headers=_headers,
parameters=APIHelper.json_serialize(body))
AuthManager.apply(_request, self.config)
_context = self.execute_request(
_request, name='update_preferred_domain_controllers')
# Endpoint and global error handling using HTTP status codes.
self.logger.info(
'Validating response for update_preferred_domain_controllers.')
if _context.response.status_code == 0:
raise RequestErrorErrorException('Error', _context)
self.validate_response(_context)
# Return appropriate type
return APIHelper.json_deserialize(
_context.response.raw_body,
ActiveDirectoryEntry.from_dictionary)
except Exception as e:
self.logger.error(e, exc_info=True)
raise