# /nova/compute/manager.py: ComputeManager --> SchedulerDependentManager
@manager.periodic_task
def _report_driver_status(self, context):
    """Refresh this host's capabilities from the virt driver, rate-limited.

    Runs as a periodic task. The driver is only queried when more than
    ``FLAGS.host_state_interval`` seconds have passed since the previous
    check; otherwise the call does nothing.

    :param context: request context supplied by the periodic-task runner
        (not used directly here).
    """
    now = time.time()
    if now - self._last_host_check <= FLAGS.host_state_interval:
        # Too soon since the last refresh; skip this tick.
        return

    self._last_host_check = now
    LOG.info(_("Updating host status"))
    # Grab fresh host info from the driver, tag it with this host's IP and
    # queue it to be sent to the schedulers.
    caps = self.driver.get_host_stats(refresh=True)
    caps['host_ip'] = FLAGS.my_ip
    self.update_service_capabilities(caps)
# /nova/manager.py: SchedulerDependentManager
@periodic_task
def _publish_service_capabilities(self, context):
    """Pass data back to the scheduler at a periodic interval.

    Sends ``self.last_capabilities`` to the schedulers through
    ``self.scheduler_rpcapi``. Nothing is sent while
    ``self.last_capabilities`` is empty/falsy.

    :param context: request context forwarded to the RPC call.
    """
    caps = self.last_capabilities
    if not caps:
        return
    LOG.debug(_('Notifying Schedulers of capabilities ...'))
    self.scheduler_rpcapi.update_service_capabilities(
        context, self.service_name, self.host, caps)
# /nova/schedule/host_manager.py: HostManager
def update_service_capabilities(self, service_name, host, capabilities):
    """Update the per-service capabilities based on this notification.

    Stores a shallow copy of ``capabilities``, stamped with the time it
    was received, at ``self.service_states[host][service_name]``.

    :param service_name: name of the reporting service.
    :param host: host the report came from.
    :param capabilities: dict of reported capabilities; it is copied, not
        modified.
    """
    # Hand the values to the logger instead of interpolating locals():
    # the inputs to the message are explicit and formatting only happens
    # if the debug record is actually emitted.
    LOG.debug(_("Received %(service_name)s service update from "
                "%(host)s."), {'service_name': service_name, 'host': host})
    service_caps = self.service_states.get(host, {})
    # Copy the capabilities, so we don't modify the original dict
    capab_copy = dict(capabilities)
    capab_copy["timestamp"] = timeutils.utcnow()  # Reported time
    service_caps[service_name] = capab_copy
    self.service_states[host] = service_caps
def get_all_host_states(self, context, topic):
    """Returns a dict of all the hosts the HostManager knows about.
    Also, each of the consumable resources in HostState are pre-populated
    and adjusted based on data in the db.

    For example:
    {'192.168.1.100': HostState(), ...}

    Note: this can be very slow with a lot of instances.
    InstanceType table isn't required since a copy is stored
    with the instance (in case the InstanceType changed since the
    instance was created).

    :param context: request context used for the DB query.
    :param topic: service topic; only 'compute' is supported.
    :returns: ``self.host_state_map`` (host -> HostState). This is the
        live map kept on the manager, not a copy.
    :raises NotImplementedError: if ``topic`` is not 'compute'.
    """
    if topic != 'compute':
        raise NotImplementedError(_(
            "host_manager only implemented for 'compute'"))

    # Get resource usage across the available compute nodes:
    compute_nodes = db.compute_node_get_all(context)
    for compute in compute_nodes:
        service = compute['service']
        if not service:
            LOG.warn(_("No service for compute ID %s") % compute['id'])
            continue
        host = service['host']
        capabilities = self.service_states.get(host)
        service_dict = dict(service.iteritems())
        host_state = self.host_state_map.get(host)
        if host_state:
            host_state.update_capabilities(topic, capabilities,
                                           service_dict)
        else:
            host_state = self.host_state_cls(host, topic,
                                             capabilities=capabilities,
                                             service=service_dict)
            self.host_state_map[host] = host_state
        host_state.update_from_compute_node(compute)

    # The docstring promises a host -> HostState dict; previously the
    # method fell off the end and returned None.
    return self.host_state_map
def _sync_compute_node(self, context, resources):
    """Create or update the compute node DB record

    :param context: request context for the DB calls.
    :param resources: dict of resource values to persist. When a new
        record is created, its 'service_id' key is filled in here.
    """
    if self.compute_node:
        # A record is already cached: just update it.
        self._update(context, resources, prune_stats=True)
        LOG.info(_('Compute_service record updated for %s ') % self.host)
        return

    # we need a copy of the ComputeNode record:
    service = self._get_service(context)
    if not service:
        # no service record, disable resource
        return

    # NOTE(review): nothing between the service lookup and here sets
    # self.compute_node, so this path always creates a new record --
    # confirm a step adopting an existing ComputeNode wasn't dropped.
    resources['service_id'] = service['id']
    self._create(context, resources)
    LOG.info(_('Compute_service record created for %s ') % self.host)


def _get_service(self, context):
    """Return the first compute service record for ``self.host``.

    Looks it up with ``db.service_get_all_compute_by_host``. If that
    raises ``exception.NotFound``, logs a warning and returns None.
    """
    try:
        first = db.service_get_all_compute_by_host(context, self.host)[0]
    except exception.NotFound:
        LOG.warn(_("No service record for host %s"), self.host)
        return None
    return first


def _create(self, context, values):
    """Create the compute node in the DB

    Calls ``db.compute_node_create`` with ``values`` and caches a plain
    dict copy of the returned record as ``self.compute_node``.
    """
    self.compute_node = dict(db.compute_node_create(context, values))


def _update(self, context, values, prune_stats=False):
    """Persist the compute node updates to the DB

    Updates the record identified by ``self.compute_node['id']`` with
    ``values`` (``prune_stats`` is passed through to
    ``db.compute_node_update``). Then refreshes the cached dict from the
    returned record.
    """
    updated = db.compute_node_update(context, self.compute_node['id'],
                                     values, prune_stats)
    self.compute_node = dict(updated)
def _update_usage_from_instances(self, resources, instances):
    """Calculate resource usage based on instance utilization.

    This is different than the hypervisor's view as it will account for
    all instances assigned to the local compute host, even if they are
    not currently powered on.

    Clears ``self.tracked_instances`` and ``self.stats``. Then re-seeds
    the usage fields of ``resources`` (modified in place) with the
    configured host reservations.
    """
    # NOTE(review): ``instances`` is never read here, although the
    # docstring describes per-instance accounting -- confirm the loop
    # over ``instances`` was not dropped.
    self.tracked_instances.clear()

    # Purge old stats.
    self.stats.clear()

    # Seed the counters, reserving room for the host/hypervisor.
    # `/` is integer division under Python 2 (this code uses iteritems).
    reserved_disk_gb = FLAGS.reserved_host_disk_mb / 1024
    reserved_ram_mb = FLAGS.reserved_host_memory_mb
    resources['local_gb_used'] = reserved_disk_gb
    resources['memory_mb_used'] = reserved_ram_mb
    resources['vcpus_used'] = 0
    resources['free_ram_mb'] = resources['memory_mb'] - reserved_ram_mb
    resources['free_disk_gb'] = resources['local_gb'] - reserved_disk_gb
    resources['current_workload'] = 0
    resources['running_vms'] = 0
# Fragment of per-instance usage accounting; `instance`, `sign`,
# `is_new_instance` and `is_deleted_instance` come from enclosing code
# that is not part of this excerpt.
if is_new_instance or is_deleted_instance:
    # Apply the instance's footprint to the node's usage. `sign`
    # presumably is +1 for a new instance and -1 for a deleted one --
    # confirm against the enclosing code.
    resources['memory_mb_used'] += sign * instance['memory_mb']
    resources['local_gb_used'] += sign * instance['root_gb']
    resources['local_gb_used'] += sign * instance['ephemeral_gb']

# Free RAM/disk are derived from totals minus usage; they may be
# negative, depending on policy.
resources['free_ram_mb'] = resources['memory_mb'] - resources['memory_mb_used']
resources['free_disk_gb'] = resources['local_gb'] - resources['local_gb_used']
""" # available size of the disk dk_sz_gb = self.get_local_gb_total() - self.get_local_gb_used()
# Disk size that all instance uses : virtual_size - disk_size instances_name = self.list_instances() instances_sz = 0 for i_name in instances_name: try: disk_infos = jsonutils.loads( self.get_instance_disk_info(i_name)) for info in disk_infos: i_vt_sz = int(info['virt_disk_size']) i_dk_sz = int(info['disk_size']) instances_sz += i_vt_sz - i_dk_sz except OSError as e: if e.errno == errno.ENOENT: LOG.error(_("Getting disk size of %(i_name)s: %(e)s") % locals()) else: raise except exception.InstanceNotFound: # Instance was deleted during the check so ignore it pass # NOTE(gtt116): give change to do other task. greenthread.sleep(0) # Disk available least size available_least_size = dk_sz_gb * (1024 ** 3) - instances_sz return (available_least_size / 1024 / 1024 / 1024)
if sys.platform.upper() not in ['LINUX2', 'LINUX3']: return 0
m = open('/proc/meminfo').read().split() idx1 = m.index('MemFree:') idx2 = m.index('Buffers:') idx3 = m.index('Cached:') if FLAGS.libvirt_type == 'xen': used = 0 for domain_id in self.list_instance_ids(): # skip dom0 dom_mem = int(self._conn.lookupByID(domain_id).info()[2]) if domain_id != 0: used += dom_mem else: # the mem reported by dom0 is be greater of what # it is being used used += (dom_mem - (int(m[idx1 + 1]) + int(m[idx2 + 1]) + int(m[idx3 + 1]))) # Convert it to MB return used / 1024 else: avail = (int(m[idx1 + 1]) + int(m[idx2 + 1]) + int(m[idx3 + 1])) # Convert it to MB return self.get_memory_mb_total() - avail / 1024
# On certain platforms, this will raise a NotImplementedError. try: return multiprocessing.cpu_count() except NotImplementedError: LOG.warn(_("Cannot get the number of cpu, because this " "function is not implemented for this platform. " "This error can be safely ignored for now.")) return 0 def get_memory_mb_total(self): """Getthetotalmemorysize(MB)ofphysicalcomputer.
total = 0 for dom_id in self.list_instance_ids(): dom = self._conn.lookupByID(dom_id) vcpus = dom.vcpus() if vcpus is None: # dom.vcpus is not implemented for lxc, but returning 0 for # a used count is hardly useful for something measuring usage total += 1 else: total += len(vcpus[1]) # NOTE(gtt116): give change to do other task. greenthread.sleep(0) return total