Source code for controllers.audit_controller

# -*- coding: utf-8 -*-
# Copyright 2021 Cohesity Inc.

import logging
from cohesity_management_sdk.api_helper import APIHelper
from cohesity_management_sdk.configuration import Configuration
from cohesity_management_sdk.controllers.base_controller import BaseController
from cohesity_management_sdk.http.auth.auth_manager import AuthManager
from cohesity_management_sdk.models.cluster_audit_logs_search_result import ClusterAuditLogsSearchResult
from cohesity_management_sdk.exceptions.request_error_error_exception import RequestErrorErrorException


[docs]class AuditController(BaseController): """A Controller to access Endpoints in the cohesity_management_sdk API.""" def __init__(self, config=None, client=None, call_back=None): super(AuditController, self).__init__(client, call_back) self.logger = logging.getLogger(__name__) self.config = config
[docs] def get_audit_logs_actions(self): """Does a GET request to /public/auditLogs/actions. A string array of all the actions used to filter audit logs. Returns: list of string: Response from the API. Success Raises: APIException: When an error occurs while fetching the data from the remote API. This exception includes the HTTP Response code, an error message, and the HTTP body that was received in the request. """ try: self.logger.info('get_audit_logs_actions called.') # Prepare query URL self.logger.info('Preparing query URL for get_audit_logs_actions.') _url_path = '/public/auditLogs/actions' _query_builder = self.config.get_base_uri() _query_builder += _url_path _query_url = APIHelper.clean_url(_query_builder) # Prepare headers self.logger.info('Preparing headers for get_audit_logs_actions.') _headers = {'accept': 'application/json'} # Prepare and execute request self.logger.info( 'Preparing and executing request for get_audit_logs_actions.') _request = self.http_client.get(_query_url, headers=_headers) AuthManager.apply(_request, self.config) _context = self.execute_request(_request, name='get_audit_logs_actions') # Endpoint and global error handling using HTTP status codes. self.logger.info('Validating response for get_audit_logs_actions.') if _context.response.status_code == 0: raise RequestErrorErrorException('Error', _context) self.validate_response(_context) # Return appropriate type return APIHelper.json_deserialize(_context.response.raw_body) except Exception as e: self.logger.error(e, exc_info=True) raise
[docs] def get_audit_logs_categories(self): """Does a GET request to /public/auditLogs/categories. A string array of all the categories used to filter audit logs. Returns: list of string: Response from the API. Success Raises: APIException: When an error occurs while fetching the data from the remote API. This exception includes the HTTP Response code, an error message, and the HTTP body that was received in the request. """ try: self.logger.info('get_audit_logs_categories called.') # Prepare query URL self.logger.info( 'Preparing query URL for get_audit_logs_categories.') _url_path = '/public/auditLogs/categories' _query_builder = self.config.get_base_uri() _query_builder += _url_path _query_url = APIHelper.clean_url(_query_builder) # Prepare headers self.logger.info( 'Preparing headers for get_audit_logs_categories.') _headers = {'accept': 'application/json'} # Prepare and execute request self.logger.info( 'Preparing and executing request for get_audit_logs_categories.' ) _request = self.http_client.get(_query_url, headers=_headers) AuthManager.apply(_request, self.config) _context = self.execute_request(_request, name='get_audit_logs_categories') # Endpoint and global error handling using HTTP status codes. self.logger.info( 'Validating response for get_audit_logs_categories.') if _context.response.status_code == 0: raise RequestErrorErrorException('Error', _context) self.validate_response(_context) # Return appropriate type return APIHelper.json_deserialize(_context.response.raw_body) except Exception as e: self.logger.error(e, exc_info=True) raise
[docs] def search_cluster_audit_logs(self, user_names=None, domains=None, entity_types=None, actions=None, search=None, start_time_usecs=None, end_time_usecs=None, start_index=None, page_count=None, output_format=None, tenant_id=None, all_under_hierarchy=None): """Does a GET request to /public/auditLogs/cluster. When actions (such as a login or a Job being paused) occur on the Cohesity Cluster, the Cluster generates Audit Logs. If no parameters are specified, all logs currently on the Cohesity Cluster are returned. Specifying parameters filters the results that are returned. Args: user_names (list of string, optional): Filter by user names who cause the actions that generate Cluster Audit Logs. domains (list of string, optional): Filter by domains of users who cause the actions that trigger Cluster audit logs. entity_types (list of string, optional): Filter by entity types involved in the actions that generate the Cluster audit logs, such as User, Protection Job, View, etc. For a complete list, see the Category drop-down in the Admin > Audit Logs page of the Cohesity Dashboard. actions (list of string, optional): Filter by the actions that generate Cluster audit logs such as Activate, Cancel, Clone, Create, etc. For a complete list, see the Actions drop-down in the Admin > Audit Logs page of the Cohesity Dashboard. search (string, optional): Filter by matching a substring in entity name or details of the Cluster audit log. start_time_usecs (long|int, optional): Filter by a start time. Only Cluster audit logs that were generated after the specified time are returned. Specify the start time as a Unix epoch Timestamp (in microseconds). end_time_usecs (long|int, optional): Filter by a end time specified as a Unix epoch Timestamp (in microseconds). Only Cluster audit logs that were generated before the specified end time are returned. start_index (long|int, optional): Specifies an index number that can be used to return subsets of items in multiple requests. Break up the items to return into multiple requests by setting pageCount and startIndex to return a subsets of items in the search result. For example, set startIndex to 0 to get the first set of pageCount items for the first request. Increment startIndex by pageCount to get the next set of pageCount items for a next request. Continue until all items are returned and therefore the total number of returned items is equal to totalCount. Default value is 0. page_count (long|int, optional): Limit the number of items to return in the response for pagination purposes. Default value is 1000. output_format (string, optional): Specifies the format of the output such as csv and json. If not specified, the json format is returned. If csv is specified, a comma-separated list with a heading row is returned. tenant_id (string, optional): TenantId specifies the tenant whose action resulted in the audit log. all_under_hierarchy (bool, optional): AllUnderHierarchy specifies if logs of all the tenants under the hierarchy of tenant with id TenantId should be returned. Returns: ClusterAuditLogsSearchResult: Response from the API. Success Raises: APIException: When an error occurs while fetching the data from the remote API. This exception includes the HTTP Response code, an error message, and the HTTP body that was received in the request. """ try: self.logger.info('search_cluster_audit_logs called.') # Prepare query URL self.logger.info( 'Preparing query URL for search_cluster_audit_logs.') _url_path = '/public/auditLogs/cluster' _query_builder = self.config.get_base_uri() _query_builder += _url_path _query_parameters = { 'userNames': user_names, 'domains': domains, 'entityTypes': entity_types, 'actions': actions, 'search': search, 'startTimeUsecs': start_time_usecs, 'endTimeUsecs': end_time_usecs, 'startIndex': start_index, 'pageCount': page_count, 'outputFormat': output_format, 'tenantId': tenant_id, 'allUnderHierarchy': all_under_hierarchy } _query_builder = APIHelper.append_url_with_query_parameters( _query_builder, _query_parameters, Configuration.array_serialization) _query_url = APIHelper.clean_url(_query_builder) # Prepare headers self.logger.info( 'Preparing headers for search_cluster_audit_logs.') _headers = {'accept': 'application/json'} # Prepare and execute request self.logger.info( 'Preparing and executing request for search_cluster_audit_logs.' ) _request = self.http_client.get(_query_url, headers=_headers) AuthManager.apply(_request, self.config) _context = self.execute_request(_request, name='search_cluster_audit_logs') # Endpoint and global error handling using HTTP status codes. self.logger.info( 'Validating response for search_cluster_audit_logs.') if _context.response.status_code == 0: raise RequestErrorErrorException('Error', _context) self.validate_response(_context) # Return appropriate type return APIHelper.json_deserialize( _context.response.raw_body, ClusterAuditLogsSearchResult.from_dictionary) except Exception as e: self.logger.error(e, exc_info=True) raise