新网创想网站建设,新征程启航
为企业提供网站建设、域名注册、服务器等服务
这篇文章主要讲解了“怎么用python+Element实现主机Host操作”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么用python+Element实现主机Host操作”吧!
创新互联建站长期为1000多家客户提供网站建设服务,团队拥有10年从业经验,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为广元企业提供专业的成都网站制作、网站设计、广元网站改版等技术服务。拥有十多年丰富建站经验和众多成功案例,为您定制开发。
<%inherit file="/base.html"/>{{ scope.row.is_monitored ? "移除监控":"加入监控" }} 查看性能 查看状态
urls.py
文件内容
# URL routing table for home_application.
from django.conf.urls import patterns

from home_application.temp import views as temp_view
from home_application.job import views as job_view
from home_application.host import views as host_view
from home_application.exam import views as exam_view

# The first argument to patterns() is a view prefix: routes whose view is
# given as a bare string (e.g. 'dev_guide') are looked up under
# 'home_application.views'. Routes that pass a callable use it directly.
urlpatterns = patterns(
    'home_application.views',
    # Site root is served by the job view.
    (r'^$', job_view.job),
    (r'^dev-guide/$', 'dev_guide'),
    (r'^contactus/$', 'contactus'),
    (r'^api/test/$', 'test'),
    (r'^temp/$', 'temp'),
    # Host pages (template-rendering views).
    (r'^host/$', host_view.host),
    (r'^status/$', host_view.status),
    # Host JSON endpoints: list/toggle hosts, sync hosts, fetch load data.
    (r'^host_view/$', host_view.HostView.as_view()),
    (r'^get_all_hosts/$', host_view.get_all_hosts),
    (r'^get_perform_data/$', host_view.get_perform_data),
    # NOTE(review): the ellipsis below is how the listing appears in the
    # source; presumably further routes (using temp_view / exam_view) were
    # elided -- confirm against the real urls.py.
    ...
)
host\views.py
文件内容
"""Views for the host pages: host listing, monitoring toggle and load data."""
import json
import logging

from django.views.generic import View
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django.http import JsonResponse

from common.mymako import render_mako_context
from home_application.models import Host, LoadData
from home_application.utils.job_api import FastJobApi

logger = logging.getLogger(__name__)

# Upper bound on the number of hosts returned by HostView.get.
MAX_HOSTS_RETURNED = 30

# Shell script executed on the target host. It prints a single line of the
# form "<date>|<memory %>|<disk %>|<cpu>". The shebang must be the first line
# and every statement must sit on its own line, otherwise the shell treats
# the whole text as one comment.
perform_script = """#!/bin/bash
MEMORY=$(free -m | awk 'NR==2{printf "%.2f%%",$3*100/$2}')
DISK=$(df -h| awk '$NF=="/"{printf "%s",$5}')
CPU=$(top -bn1 | grep load | awk '{printf "%.2f%%",$(NF-2)}')
DATE=$(date "+%Y-%m-%d %H:%M:%S")
echo -e "$DATE|$MEMORY|$DISK|$CPU"
"""


def host(request):
    """Render the host management page."""
    return render_mako_context(request, '/home_application/host.html')


def status(request):
    """Render the host status page."""
    return render_mako_context(request, "/home_application/status.html")


class CsrfExemptView(View):
    """Base class-based view whose handlers are all exempt from CSRF checks."""

    @method_decorator(csrf_exempt)
    def dispatch(self, request, *args, **kwargs):
        return super(CsrfExemptView, self).dispatch(request, *args, **kwargs)


def _first_or_empty(items):
    """Return the first element of a non-empty list/tuple, else an empty dict."""
    if isinstance(items, (list, tuple)) and items:
        return items[0]
    return {}


def get_all_hosts(request):
    """Fetch hosts via ``cc_search_host`` and upsert them into ``Host``.

    Each entry of the ``info`` list is expected to carry a ``host`` dict and
    a ``biz`` list. Entries with no ``bk_host_id`` are skipped, and missing
    ``biz`` / ``bk_cloud_id`` data is stored as ``None`` instead of raising.

    :return: JsonResponse ``{"result": <info list>}``
    """
    # Imported lazily, as in the original module.
    from home_application.utils.cc_by_request import cc_search_host

    res_data = cc_search_host().get("info") or []
    for item in res_data:
        host_info = item.get("host") or {}
        bk_host_id = host_info.get("bk_host_id")
        if bk_host_id is None:
            # Without a primary key there is nothing to upsert.
            continue
        biz = _first_or_empty(item.get("biz"))
        cloud = _first_or_empty(host_info.get("bk_cloud_id"))
        host_obj = {
            "ip": host_info.get("bk_host_innerip"),
            "bk_biz_id": biz.get("bk_biz_id"),
            "os_name": host_info.get("bk_os_name"),
            "host_name": host_info.get("bk_host_name"),
            "bk_biz_name": biz.get("bk_biz_name"),
            "cloud_id": cloud.get("bk_inst_id"),
            "cloud_name": cloud.get("bk_inst_name"),
        }
        Host.objects.update_or_create(host_id=bk_host_id, defaults=host_obj)
    return JsonResponse({"result": res_data})


@csrf_exempt
def get_perform_data(request):
    """Return CPU / memory / disk usage for one host.

    The JSON request body supplies ``host_id`` and, for the live path,
    ``bk_biz_id``, ``ip`` and ``cloud_id``. If a ``LoadData`` row exists for
    the host, its latest values are returned. Otherwise ``perform_script``
    is run on the host through ``FastJobApi`` and its output is parsed.

    :return: JsonResponse ``{"result": True, "data": {...}}`` on success,
             ``{"result": False}`` on any failure (including a malformed
             request body).
    """
    try:
        data = json.loads(request.body)
        host_id = data.get("host_id")

        latest = LoadData.objects.filter(host_id=host_id).last()
        if latest:
            res_data = {
                "cpu": latest.cpu,
                "mem": latest.mem,
                "disk": latest.disk,
            }
            return JsonResponse({"result": True, "data": res_data})

        bk_biz_id = int(data.get("bk_biz_id"))
        ip = data.get("ip")
        cloud_id = data.get("cloud_id")
        job = FastJobApi(bk_biz_id, cloud_id, ip, perform_script)
        job_res = job.execute_script_and_return()
        if job_res:
            log_content = job_res[0].get("log_content") or ""
            # Script output is "<date>|<mem>|<disk>|<cpu>\n"; strip so the
            # trailing newline from echo does not leak into the CPU value.
            fields = [part.strip() for part in log_content.split("|")]
            if len(fields) >= 4:
                res_data = {
                    # Kept for compatibility with existing clients.
                    "result": True,
                    "cpu": fields[3],
                    "mem": fields[1],
                    "disk": fields[2],
                }
                return JsonResponse({"result": True, "data": res_data})
        return JsonResponse({"result": False})
    except Exception:
        # View boundary: report failure to the client but keep the traceback.
        logger.exception("get_perform_data failed")
        return JsonResponse({"result": False})


class HostView(CsrfExemptView):
    """JSON endpoint for listing hosts (GET) and toggling monitoring (POST)."""

    def get(self, request, *args, **kwargs):
        """List hosts, optionally filtered.

        Query parameters:
          * ``search_biz_id`` -- keep only hosts of this business id.
          * ``query_str``     -- comma-separated IPs to keep.

        At most ``MAX_HOSTS_RETURNED`` hosts are returned.

        :return: JsonResponse ``{"result": True, "data": [host dicts]}`` or
                 ``{"result": False}`` if the query fails.
        """
        search_biz_id = request.GET.get("search_biz_id")
        query_str = request.GET.get("query_str")

        host_query = Host.objects.all()
        if search_biz_id:
            host_query = host_query.filter(bk_biz_id=search_biz_id)
        if query_str:
            ips = [ip.strip() for ip in query_str.split(",") if ip.strip()]
            host_query = host_query.filter(ip__in=ips)

        try:
            # QuerySets are lazy: the database is only hit here, so this is
            # the statement that needs protecting. A plain slice caps the
            # result without a separate COUNT query.
            res_data = [h.to_dict() for h in host_query[:MAX_HOSTS_RETURNED]]
        except Exception:
            logger.exception("HostView.get failed")
            return JsonResponse({"result": False})
        return JsonResponse({"result": True, "data": res_data})

    def post(self, request, *args, **kwargs):
        """Toggle the ``is_monitored`` flag of one host.

        The JSON body carries ``host_id`` and the host's *current*
        ``is_monitored`` value; the stored flag is set to its negation.

        :return: JsonResponse ``{"result": True}`` or ``{"result": False}``.
        """
        try:
            data = json.loads(request.body)
            host_id = data.get("host_id")
            is_monitored = data.get("is_monitored")
            # NOTE(review): the new state is derived from the client-sent
            # value, not from the stored one -- confirm this is intended.
            Host.objects.filter(host_id=host_id).update(
                is_monitored=not is_monitored
            )
            return JsonResponse({"result": True})
        except Exception:
            logger.exception("HostView.post failed")
            return JsonResponse({"result": False})
models.py
文件内容
from django.db import models

from home_application.utils.parse_time import parse_datetime_to_timestr


class Host(models.Model):
    """A host record, keyed by ``host_id``, with a monitoring flag."""

    # host_id is the primary key, so ``self.pk`` and ``self.host_id`` are
    # the same value.
    host_id = models.IntegerField(u"主机ID", primary_key=True, unique=True)
    ip = models.CharField(u"IP地址", max_length=32, blank=True, null=True)
    # Business id is stored as text, not as an integer.
    bk_biz_id = models.CharField(u"业务ID", max_length=16, blank=True, null=True)
    bk_biz_name = models.CharField(u"业务名称", max_length=512, blank=True, null=True)
    os_name = models.CharField(u"系统名", max_length=128, blank=True, null=True)
    host_name = models.CharField(u"主机名", max_length=128, blank=True, null=True)
    cloud_id = models.IntegerField(u"云区域ID", blank=True, null=True)
    cloud_name = models.CharField(u"云区域名称", max_length=32, blank=True, null=True)
    # Hosts start out unmonitored.
    is_monitored = models.BooleanField(u"是否已监控", default=False)

    def to_dict(self):
        """Return the host's fields as a plain dict.

        ``mem_use``, ``cpu_use`` and ``disk_use`` are always the placeholder
        string "-"; this method does not look up any load data.
        """
        return {
            "host_id": self.host_id,
            "ip": self.ip,
            "pk": self.pk,
            "bk_biz_id": self.bk_biz_id,
            "bk_biz_name": self.bk_biz_name,
            "os_name": self.os_name,
            "host_name": self.host_name,
            "cloud_name": self.cloud_name,
            "cloud_id": self.cloud_id,
            "is_monitored": self.is_monitored,
            "mem_use": "-",
            "cpu_use": "-",
            "disk_use": "-",
        }

    def get_load_data(self):
        """Return this host's LoadData rows as dicts, oldest first."""
        # LoadData.host_id is a plain integer column (not a ForeignKey), so
        # the rows are matched by value against this host's primary key.
        load_query = LoadData.objects.filter(host_id=self.pk).order_by("create_time")
        return [i.to_dict() for i in load_query]


class LoadData(models.Model):
    """One load sample (cpu / mem / disk) recorded for a host."""

    # Plain integer reference to Host.host_id; no database-level relation.
    host_id = models.IntegerField(u"主机ID", default=0)
    # NOTE(review): usage values are integer columns -- confirm how
    # percentage strings such as "12.34%" are converted before being stored.
    cpu = models.IntegerField(u"CPU使用率", default=0)
    mem = models.IntegerField(u"内存使用率", default=0)
    disk = models.IntegerField(u"硬盘使用率", default=0)
    # Set once, when the row is first created.
    create_time = models.DateTimeField(u"创建时间", auto_now_add=True)

    def to_dict(self):
        """Return the sample as a plain dict.

        ``create_time`` is passed through ``parse_datetime_to_timestr``
        (presumably yielding a formatted time string -- confirm against
        that helper).
        """
        return {
            "host_id": self.host_id,
            "cpu": self.cpu,
            "mem": self.mem,
            "disk": self.disk,
            "create_time": parse_datetime_to_timestr(self.create_time)
        }
FastJobApi
部分代码
import base64
import time

from conf.default import APP_ID, APP_TOKEN
from blueking.component.shortcuts import get_client_by_user, get_client_by_request

# Retained only so that existing ``from ... import count`` statements keep
# working. Polling now tracks its attempts locally; a shared module-level
# counter leaked state between instances and concurrent requests, and stayed
# dirty whenever the status call raised.
count = 0


class FastJobApi(object):
    """Run a script on a set of hosts through the job API and fetch its logs.

    :param bk_biz_id: business id the job runs under
    :param bk_cloud_id: cloud area id applied to every target IP
    :param ip_list: comma-separated string of target IPs
    :param script_content: plain-text script to execute
    """

    # After the first status query, retry at most this many more times ...
    MAX_STATUS_RETRIES = 5
    # ... sleeping this many seconds between consecutive queries.
    STATUS_POLL_INTERVAL = 2

    def __init__(self, bk_biz_id, bk_cloud_id, ip_list, script_content):
        self.username = "admin"
        self.client = get_client_by_user(self.username)
        self.biz_id = bk_biz_id
        self.script_content = script_content
        # Tolerate "a, b,,c": strip whitespace and drop empty entries so no
        # bogus {"ip": ""} targets are sent.
        self.ip_list = [
            {"bk_cloud_id": bk_cloud_id, "ip": ip.strip()}
            for ip in ip_list.split(",")
            if ip.strip()
        ]

    @staticmethod
    def _b64encode(text):
        """Base64-encode ``text`` and return the result as ASCII text.

        Accepts both byte strings and unicode text, so it works on Python 2
        and Python 3, and the result is always JSON-serialisable.
        """
        if not isinstance(text, bytes):
            text = text.encode("utf-8")
        return base64.b64encode(text).decode("ascii")

    def _fast_execute_script(self, execute_account="root", param_content='', script_timeout=1000):
        """Submit the script for fast execution.

        :param execute_account: account name used to run the script
        :param param_content: arguments passed to the script
        :param script_timeout: script execution timeout
        :return: the ``job_instance_id`` from the response, or ``False`` if
                 the API reported failure
        """
        kwargs = {
            "bk_app_code": APP_ID,
            "bk_app_secret": APP_TOKEN,
            "bk_biz_id": self.biz_id,
            "bk_username": self.username,
            "script_content": self._b64encode(self.script_content),
            "ip_list": self.ip_list,
            "script_type": 1,
            "account": execute_account,
            "script_param": self._b64encode(param_content),
            "script_timeout": script_timeout
        }
        result = self.client.job.fast_execute_script(kwargs)
        if result.get("result"):
            return (result.get("data") or {}).get("job_instance_id")
        return False

    def _get_job_instance_status(self, task_id):
        """Poll the job until it finishes or the retry budget is used up.

        The status is queried up to ``MAX_STATUS_RETRIES + 1`` times, with
        ``STATUS_POLL_INTERVAL`` seconds between queries.

        :param task_id: ``job_instance_id`` of the submitted script
        :return: ``True`` if the job reported ``is_finished``, else ``False``
        """
        for attempt in range(self.MAX_STATUS_RETRIES + 1):
            resp = self.client.job.get_job_instance_status(
                bk_username=self.username,
                bk_biz_id=self.biz_id,
                job_instance_id=task_id
            )
            if (resp.get('data') or {}).get('is_finished'):
                return True
            # No point sleeping after the final attempt.
            if attempt < self.MAX_STATUS_RETRIES:
                time.sleep(self.STATUS_POLL_INTERVAL)
        return False

    def _get_job_instance_log(self, task_id):
        """Fetch the per-IP logs of a finished job.

        :param task_id: ``job_instance_id`` of the submitted script
        :return: the ``ip_logs`` of the first step result, or ``None`` if
                 the job did not finish in time or the response lacks it
        """
        if not self._get_job_instance_status(task_id):
            return None
        resp = self.client.job.get_job_instance_log(
            job_instance_id=task_id,
            bk_biz_id=self.biz_id,
            bk_username=self.username
        )
        try:
            return resp['data'][0]['step_results'][0]['ip_logs']
        except (KeyError, IndexError, TypeError):
            # Missing or empty data: callers already treat a falsy result
            # as failure.
            return None

    def execute_script_and_return(self):
        """Execute the script and return its execution result.

        :return: the ``ip_logs`` produced by the script, or ``None`` if
                 submission failed or the logs could not be retrieved
        """
        job_instance_id = self._fast_execute_script()
        if not job_instance_id:
            return None
        return self._get_job_instance_log(job_instance_id)
实现效果
感谢各位的阅读,以上就是“怎么用python+Element实现主机Host操作”的内容了,经过本文的学习后,相信大家对怎么用python+Element实现主机Host操作这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!