#!/usr/bin/python3
"""Synchronise CMDB services with vCenter folders.

For every service read from the CMDB JSON-RPC API the script makes sure that,
in each vCenter that hosts one of the service's VMs, a folder named
``<service name> [TSID:<service id>]`` exists below the configured root
folder, that the service contacts hold the ``VM-Admin`` (primary contact) or
``VM-Ko-Admin`` (all other contacts) role on that folder, and that the
service's VMs are located in it.  Finally all empty folders directly below
the root folder are removed.
"""
import argparse
import datetime
import json
import re
import ssl
import time
import warnings
from atexit import register
from os.path import isfile
from sys import exit, stdout

import requests

# if used on windows its pyvim
# pip install pyvim pyvmomi
from pyVim.connect import *
from pyVmomi import vmodl, vim

# ########################## CMDB FUNCTIONS ###############################################
# Headers sent with every CMDB request. establish_session() adds the session
# id to this dict so that later api_request() calls are authenticated.
REQUEST_HEADER = {"content-type": "application/json"}


def establish_session(login, password, CMDB_API):
    """Authenticate with ZIH credentials.

    The resulting session id is stored in REQUEST_HEADER and used for any
    further requests.

    Raises:
        Exception: if the login response carries no (or an empty) result.
    """
    login_header = {
        **REQUEST_HEADER,
        "X-RPC-Auth-Username": login,
        "X-RPC-Auth-Password": password,
    }
    request = {"version": "2.0", "method": "idoit.login", "params": {}, "id": 1}
    raw_response = requests.post(
        CMDB_API, data=json.dumps(request), headers=login_header
    )
    response = raw_response.json()
    # A missing, empty or null result all mean that the login failed.
    result = response.get("result")
    if not result:
        print(response)
        raise Exception("No session could be established")
    REQUEST_HEADER["X-RPC-Auth-Session"] = result["session-id"]


def api_request(CMDB_API, method, params=None):
    """Very simple handler for cmdb requests.

    Posts a JSON-RPC call for ``method`` with ``params`` (an empty dict when
    omitted) and returns the decoded JSON response.
    """
    call = {
        "version": "2.0",
        "method": method,
        "params": {} if params is None else params,
        "id": 1,
    }
    response = requests.post(CMDB_API, data=json.dumps(call), headers=REQUEST_HEADER)
    return response.json()


def parse_category(obj_id, category, CMDB_API):
    """Execute 'cmdb.category.read' for given object and category.

    Return False if no result was received.
    """
    raw_response = api_request(
        CMDB_API,
        "cmdb.category.read",
        {
            "category": category,
            "objID": obj_id,
            "status": 2,  # Show normal entries, ignore archived and deleted
        },
    )
    if "result" not in raw_response:
        return False
    return raw_response["result"]


def parse_object(obj_id, CMDB_API):
    """Execute 'cmdb.objects.read' filtered by object type.

    ``obj_id`` is passed as the ``type`` filter of the request, i.e. it
    selects the objects to list. Return False if no result was received.
    """
    raw_response = api_request(
        CMDB_API,
        "cmdb.objects.read",
        {"filter": [{"type": obj_id}]},
    )
    if "result" not in raw_response:
        return False
    return raw_response["result"]


# an array which contains vms that already have been sorted to a service
# some vms have multiple services
HANDLED_VMS = []

# [{
#     "name": "Servicename",
#     "id": 1,
#     "contacts": [
#         {
#             "login": "kuki797d",
#             "primary": true
#         },
#         {
#             "login": "test999",
#             "primary": false
#         }
#     ],
#     "vms": [
#         {
#             "id": 82823,
#             "vcenter_id": 1
#         },
#         {
#             "id": 232332,
#             "vcenter_id": 2
#         }
#     ]
# },]
# fetch_cmdb_data returns a list as described above


def _build_contacts(raw_contacts):
    """Return the contact dicts for all person contacts with a login."""
    contacts = []
    # parse_category() returns False when the CMDB sent no result.
    for contact in raw_contacts or []:
        person = contact["contact"]
        # check if contact is really a person, can also be a group
        if person["type"] != "C__OBJTYPE__PERSON" or person["login"] == "":
            continue
        contacts.append(
            {
                "login": person["login"],
                "primary": int(contact["primary"]["value"]) == 1,
            }
        )
    return contacts


def _build_vms(service_title, components, debug):
    """Return the VM dicts for all not yet handled virtual servers."""
    vms = []
    for component in components or []:
        connected = component["connected_object"]
        if connected["type"] != "C__OBJTYPE__VIRTUAL_SERVER":
            continue
        raw_name = connected["title"]
        # vms should always be only mapped to one service
        # some vms are in multiple services, this would result in inconsistent behaviour
        # the vm would be moved around in multiple service-folders
        if raw_name in HANDLED_VMS:
            continue
        HANDLED_VMS.append(raw_name)
        # The last two "-" separated parts of the title are read as the VM id
        # and the vCenter id.
        parts = raw_name.replace("(", "").replace(")", "").split("-")
        try:
            vm_id, vm_vcenter_id = parts[-2], parts[-1]
            int(vm_vcenter_id)  # main() compares this value numerically
        except (IndexError, ValueError):
            if debug:
                print(
                    f"Warning: VM title {raw_name} of Service {service_title} cannot be parsed. Skipping."
                )
            continue
        vms.append({"id": vm_id, "vcenter_id": vm_vcenter_id})
    return vms


def fetch_cmdb_data(login, password, CMDB_API, debug):
    """Fetch all required Data from the CMDB.

    Returns a list of dictionaries, that contain the data (see the layout
    described above). The CMDB session is logged out again even if reading
    fails half way through.

    Raises:
        Exception: if no session could be established or the service list
            could not be read.
    """
    establish_session(login, password, CMDB_API)
    try:
        SERVICES = []
        # parse data from https://cmdb.zih.tu-dresden.de/?viewMode=1001&objTypeID=108
        raw_services = parse_object(108, CMDB_API)
        if raw_services is False:
            raise Exception("No services could be read from the CMDB")
        service_len = len(raw_services)
        print("Fetching CMDB Data...")
        for i, service in enumerate(raw_services, start=1):
            service_id = service["id"]
            service_map = {"name": service["title"], "id": service_id}

            # parse data from contacts
            service_map["contacts"] = _build_contacts(
                parse_category(int(service_id), "C__CATG__CONTACT", CMDB_API)
            )
            contacts = service_map["contacts"]
            if not contacts:
                if debug:
                    print(
                        f"Warning: Service {service['title']} has no or no valid contacts."
                    )
            elif not any(contact["primary"] for contact in contacts):
                # set the first contact as primary, as there should always be at least one contact that has the primary flag
                contacts[0]["primary"] = True
                if debug:
                    print(
                        f"Warning: Service {service['title']} has no primary key set. This would result in no VM-Admin Role being given. Giving this Role to the first available contact."
                    )

            # parse data from service components
            service_map["vms"] = _build_vms(
                service["title"],
                parse_category(
                    int(service_id), "C__CATG__IT_SERVICE_COMPONENTS", CMDB_API
                ),
                debug,
            )
            if not service_map["vms"] and debug:
                print(
                    f"Warning: Service {service['title']} has no VMs registered to it."
                )

            SERVICES.append(service_map)
            stdout.write(f"Service {i} / {service_len}")
            stdout.write("\r")
        return SERVICES
    finally:
        api_request(CMDB_API, "idoit.logout")


# ###################################################################################
# Authfile key -> argparse attribute. Checked in this order.
_AUTHFILE_KEYS = {
    "vcenter-username": "vcenter_username",
    "vcenter-password": "vcenter_password",
    "cmdb-username": "cmdb_username",
    "cmdb-password": "cmdb_password",
    "cmdb-api": "cmdb_api",
}

# Arguments that must be known after command line and authfile are evaluated.
_REQUIRED_ARGS = tuple(_AUTHFILE_KEYS.values())


def _read_authfile(args):
    """Copy the ``key = value`` entries of the authfile into ``args``."""
    try:
        with open(args.authfile, "r") as authfile:
            for line in authfile:
                # Split on the first "=" only, values may contain "=" too.
                key, separator, value = line.partition("=")
                if not separator:
                    continue
                for marker, attribute in _AUTHFILE_KEYS.items():
                    if marker in key:
                        setattr(args, attribute, value.strip())
                        break
    except (OSError, UnicodeError) as E:
        print(f"Error parsing authfile.\n{E}")


def _missing_required(args):
    """Tell whether any of the required credentials / the CMDB API is unset."""
    return any(getattr(args, name) is None for name in _REQUIRED_ARGS)


def arg_parser():
    """Parse and validate the command line.

    Missing credentials are read from the authfile. The process exits with
    status 1 if credentials stay incomplete or if no existing config file was
    given.

    Returns:
        The argparse namespace; ``test`` is always a bool.
    """
    my_parser = argparse.ArgumentParser(description="Metric script")
    # NOTE(review): vcenter_hostname is not read anywhere in this file, the
    # hostnames come from the config file.
    my_parser.add_argument(
        "--vcenter_hostname",
        action="store",
        type=str,
        help="The VCenter hostname. Required if not given in authfile. Required if not given in authfile.",
    )
    my_parser.add_argument(
        "--vcenter_username",
        action="store",
        type=str,
        help="The username to log into the VCenter. Required if not given in authfile.",
    )
    my_parser.add_argument(
        "--vcenter_password",
        action="store",
        type=str,
        help="The password for your VCenter username. Required if not given in authfile.",
    )
    my_parser.add_argument(
        "--cmdb_api",
        action="store",
        type=str,
        help="CMDB Api including port. Example: https://cmdb.zih.tu-dresden.de:31342",
    )
    my_parser.add_argument(
        "--cmdb_username",
        action="store",
        type=str,
        help="The username to log into the CMDB. Required if not given in authfile.",
    )
    my_parser.add_argument(
        "--cmdb_password",
        action="store",
        type=str,
        help="The CMDB password for your username. Required if not given in authfile.",
    )
    my_parser.add_argument(
        "--authfile",
        "-A",
        action="store",
        type=str,
        help="Authfile that may contain hostname, username and/or password. Authfile values will always override CMD args.",
    )
    # "-C" is the short form the error messages below refer to.
    my_parser.add_argument(
        "--config",
        "-C",
        action="store",
        type=str,
        help="Path to a config file.",
    )
    my_parser.add_argument(
        "--debug",
        "-D",
        action=argparse.BooleanOptionalAction,
        help="Enable debug mode",
    )
    my_parser.add_argument(
        "--test",
        action=argparse.BooleanOptionalAction,
        help="Test Mode, doesn't apply anything. Temporarily creates folders but deletes them after.",
    )
    args = my_parser.parse_args()

    # check if any authentication method is supplied
    if _missing_required(args):
        if args.authfile is None:
            print("No / Not Enough Authentification Methods or CMDB Api supplied.")
            exit(1)
        if not isfile(args.authfile):
            print("The path specified from -A / --authfile does not exist")
            exit(1)
        _read_authfile(args)
        # The authfile may be incomplete as well; fail here instead of later
        # with a confusing login error.
        if _missing_required(args):
            print("No / Not Enough Authentification Methods or CMDB Api supplied.")
            exit(1)

    if args.test is None:
        args.test = False

    if args.config is None:
        print("config file is required. -C / --config")
        exit(1)
    if not isfile(args.config):
        print("The path specified from -C / --config does not exist")
        exit(1)
    return args


def find_root_folder(root_folder_id, content):
    """Find the working folder by ID.

    The ID is compared with the digits of the last "-" separated part of each
    folder's string representation. Returns the folder Object, or None if no
    folder matches.
    """
    container = content.rootFolder
    container_view = content.viewManager.CreateContainerView(
        container, [vim.Folder], True
    )
    # iterate over all folders
    for folder in container_view.view:
        digits = re.sub("[^0-9]", "", str(folder).split("-")[-1])
        # Skip references without a numeric part instead of failing on int("").
        if digits and int(digits) == root_folder_id:
            return folder
    return None


def check_if_vm_in_folder(vm, folder, content):
    """Check if vm is in folder (searched recursively). Returns a Boolean."""
    container_view = content.viewManager.CreateContainerView(
        folder, [vim.VirtualMachine], True
    )
    return any(candidate == vm for candidate in container_view.view)


def check_if_folder_exist(folder_tsid, root_folder, content):
    """Check if a folder exists in the root_folder that contains the folder_tsid.

    Returns the first folder below ``root_folder`` (searched recursively)
    whose name carries ``TSID:<folder_tsid>``, or None.
    """
    # The id may arrive as int or str; compare numerically.
    wanted_tsid = int(folder_tsid)
    container_view = content.viewManager.CreateContainerView(
        root_folder, [vim.Folder], True
    )
    for folderobj in container_view.view:
        name = folderobj.name
        if "TSID:" not in name:
            continue
        digits = re.sub("[^0-9]", "", name.split("TSID:")[1])
        if digits and int(digits) == wanted_tsid:
            return folderobj
    return None


def wait_for_task(task):
    """Poll ``task`` until it succeeded or failed.

    Returns 1 if the task ended in the error state, otherwise 0.
    """
    while task.info.state not in [vim.TaskInfo.State.success, vim.TaskInfo.State.error]:
        time.sleep(0.1)
        print("Waiting for task to finish.")
    if task.info.state == vim.TaskInfo.State.error:
        return 1
    return 0


def _vcenter_context(vcenter, args, contexts):
    """Return the cached connection data of ``vcenter``, connecting on first use.

    One session per vCenter is shared by all services instead of opening a new
    one for every service. Exits with status 1 if the configured root folder
    does not exist.
    """
    hostname = vcenter["hostname"]
    if hostname in contexts:
        return contexts[hostname]

    warnings.filterwarnings("ignore", category=DeprecationWarning)
    # NOTE: certificate verification is deliberately switched off, as before.
    # check_hostname must be disabled before verify_mode may be CERT_NONE.
    ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE
    si = SmartConnect(
        host=hostname,
        user=args.vcenter_username,
        pwd=args.vcenter_password,
        sslContext=ssl_context,
        port=443,
    )
    register(Disconnect, si)
    content = si.RetrieveContent()

    # finding the 'root' folder by id
    root_folder = find_root_folder(int(vcenter["root_folder_id"]), content)
    if root_folder is None:
        print(f"Root Folder doesnt exist in {vcenter['hostname']}, Exiting")
        exit(1)

    auth_manager = content.authorizationManager
    contexts[hostname] = {
        "content": content,
        "root_folder": root_folder,
        "auth_manager": auth_manager,
        # get all roles from the system and map the name to the roleId
        "role_id_map": {role.name: role.roleId for role in auth_manager.roleList},
    }
    return contexts[hostname]


def _folder_name(service):
    """Build the folder name like ``DHCP [TSID:236148]``, cut to 79 characters."""
    name_length = len(service["name"]) + 9 + len(str(service["id"]))
    if name_length > 79:
        overflow = name_length - 79
        return f"{service['name'][:-overflow]} [TSID:{service['id']}]"
    return f"{service['name']} [TSID:{service['id']}]"


def _sync_folder_permissions(ctx, working_folder, foldername, contacts, args):
    """Grant the contact roles on the folder and drop them for everyone else."""
    auth_manager = ctx["auth_manager"]
    role_id_map = ctx["role_id_map"]
    # only interesting roles = VM-Admin and VM-Ko-Admin
    holders = {role_id_map["VM-Admin"]: [], role_id_map["VM-Ko-Admin"]: []}
    # If a user exists with the role VM-Admin and VM-Ko-Admin,
    # that is in the existing permissions but doesnt exist in the CMDB, remove this users permissions
    unknown_users = []
    # retrieve existing permissions from folder
    for permission in auth_manager.RetrieveEntityPermissions(working_folder, False):
        if permission.roleId in holders:
            holders[permission.roleId].append(permission.principal)
            unknown_users.append(str(permission.principal))

    # go over all contacts from this service
    for contact in contacts:
        # vcenter principal name like DOM\name
        vcenter_principal_name = f"DOM\\{contact['login']}"
        # primary flag -> VM-Admin, otherwise -> VM-Ko-Admin
        role_name = "VM-Admin" if contact["primary"] else "VM-Ko-Admin"
        role_id = role_id_map[role_name]

        # A contact known to the CMDB is never an unknown user, no matter
        # which of the two roles it holds right now. Otherwise a contact whose
        # role changed would get the new role and lose it again below.
        unknown_users = [
            user for user in unknown_users if user != vcenter_principal_name
        ]

        # check if user already has this permission
        if vcenter_principal_name in holders[role_id]:
            continue

        # if not create it
        vcenter_set_permission_name = f"{contact['login']}@dom.tu-dresden.de"
        perm = vim.AuthorizationManager.Permission()
        perm.group = False
        perm.principal = vcenter_set_permission_name
        perm.propagate = True
        perm.roleId = role_id
        if not args.test:
            auth_manager.SetEntityPermissions(working_folder, [perm])
        if args.debug:
            print(
                f"Adding permission {role_name} for user {vcenter_set_permission_name} on folder {foldername}"
            )

    for user in unknown_users:
        if args.debug:
            print(
                f"Removing permissions for {user} on Folder {foldername} as Users permissions were removed in the CMDB."
            )
        if not args.test:
            auth_manager.RemoveEntityPermission(working_folder, user, False)


def _collect_vms_to_move(ctx, working_folder, foldername, service, vcenter, args):
    """Return the service's VMs of this vCenter that are not in the folder yet."""
    content = ctx["content"]
    vcenter_id = int(vcenter["id"])
    # ids of the service's vms that live in the current vcenter
    wanted_ids = {
        vm["id"] for vm in service["vms"] if int(vm["vcenter_id"]) == vcenter_id
    }
    vms_to_move = []
    vms = content.viewManager.CreateContainerView(
        content.rootFolder, [vim.VirtualMachine], True
    )
    # iterate over all VMs in the VCenter and try to find the specified VMs by the ID
    for vm_obj in vms.view:
        vmid = str(vm_obj).replace("'", "").split("-")[1]
        if vmid not in wanted_ids:
            continue
        if not check_if_vm_in_folder(vm_obj, working_folder, content):
            vms_to_move.append(vm_obj)
            if args.debug:
                print(
                    f"VM {vm_obj.summary.config.name} will be moved to folder {foldername}."
                )
    return vms_to_move


def _delete_empty_folders(ctx, args):
    """Destroy every empty folder directly below the root folder."""
    content = ctx["content"]
    subfolders = content.viewManager.CreateContainerView(
        ctx["root_folder"], [vim.Folder], False
    )
    for folder in subfolders.view:
        # THIS WILL DELETE _ALL_ CHILDREN OF THIS FOLDER AS WELL WITHOUT ANY WARNING
        # SO USE A DOUBLE CHECK TO REALLY MAKE SURE THE FOLDER IS EMPTY
        if len(folder.childEntity) != 0:
            continue
        check2 = content.viewManager.CreateContainerView(
            folder, [vim.VirtualMachine, vim.Folder], True
        )
        if len(check2.view) != 0:
            continue
        # Read the name first, the object is gone once the task succeeded.
        folder_name = str(folder.name)
        # shouldnt throw exceptions but a folder was created where i have no deletion rights
        # Deletion stays best effort: report the problem and go on.
        try:
            if args.debug:
                print(f"Destroying Folder {folder_name}, as it has no contents.")
            # NOTE(review): in test mode nothing is destroyed here although
            # the start-up message announces that empty folders are deleted;
            # folders created during a test run therefore stay - confirm
            # which behaviour is wanted.
            if not args.test:
                task = folder.Destroy_Task()
                if wait_for_task(task) == 1:
                    print(f"Error removing folder {folder_name}")
        except Exception as err:
            print(f"Error removing folder {folder_name}: {err}")


def main():
    """Run the synchronisation for all services and all configured vCenters.

    The config file is a JSON list of vCenter entries; the keys ``id``,
    ``hostname`` and ``root_folder_id`` of every entry are read.
    """
    # argparser
    args = arg_parser()
    if not args.debug:
        print(
            "Started the Script without --debug. Highly recommended to see changes made."
        )
    if args.test:
        print(
            "Started in Test-Mode. No changes will be made. Folders will be temporarily created and ALL empty folders under the root folder will be deleted in the end."
        )
    else:
        print(
            "Attention! This script will delete ALL empty folders under the given root folder. (Not Recursive)"
        )

    with open(args.config) as json_file:
        VCENTERS = json.load(json_file)

    SERVICES = fetch_cmdb_data(
        args.cmdb_username, args.cmdb_password, args.cmdb_api, args.debug
    )
    print("Sucessfully fetched CMDB Data.")

    # hostname -> connection data, filled lazily by _vcenter_context()
    contexts = {}

    for service in SERVICES:
        # folder name like DHCP [TSID:236148]
        foldername = _folder_name(service)
        for vcenter in VCENTERS:
            # check if any vm from this service is located in this vcenter
            # if not go to the next vcenter
            vcenter_id = int(vcenter["id"])
            if not any(int(vm["vcenter_id"]) == vcenter_id for vm in service["vms"]):
                continue
            if args.debug:
                print(
                    f"Service: {service['name']} has VMs in VCenter {vcenter['hostname']}."
                )

            # Connect to the vcenter (or reuse the existing session)
            ctx = _vcenter_context(vcenter, args, contexts)
            missing_roles = [
                role
                for role in ("VM-Admin", "VM-Ko-Admin")
                if role not in ctx["role_id_map"]
            ]
            if missing_roles:
                print(
                    f"Roles {missing_roles} do not exist in {vcenter['hostname']}, Exiting"
                )
                exit(1)

            # try to find a folder with this TSID, create it if there is none
            working_folder = check_if_folder_exist(
                service["id"], ctx["root_folder"], ctx["content"]
            )
            if working_folder is None:
                if args.debug:
                    print(
                        f"Creating Folder in VCenter {vcenter['hostname']}: {foldername} for Service {service['name']}"
                    )
                working_folder = ctx["root_folder"].CreateFolder(foldername)

            _sync_folder_permissions(
                ctx, working_folder, foldername, service["contacts"], args
            )

            vms_to_move = _collect_vms_to_move(
                ctx, working_folder, foldername, service, vcenter, args
            )
            # finally move the vms to the service folder
            if vms_to_move and not args.test:
                task = working_folder.MoveInto(vms_to_move)
                if wait_for_task(task) == 1:
                    print(f"Error while moving vms: {str(vms_to_move)}")

    # delete all empty folders across all vcenters after everything is done
    for vcenter in VCENTERS:
        _delete_empty_folders(_vcenter_context(vcenter, args, contexts), args)


if __name__ == "__main__":
    main()