Computer_status/Computer_status.py
2024-12-18 10:37:23 +08:00

227 lines
6.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import shelve
import json
from datetime import datetime
# 定义数据对象
class Computer_status:
def __init__(self, time, ram_all, ram_use, cpu_use, cpu_c, disk, web, gpu):
self.time = time # 时间戳,单位:秒
self.ram_all = ram_all # 总内存
self.ram_use = ram_use # 已占用内存
self.cpu_use = cpu_use # CPU 占用百分比
self.cpu_c = cpu_c # CPU 温度
self.disk = disk # 磁盘信息
self.web = web # 网络流量信息
self.gpu = gpu # GPU 信息
def to_dict(self):
"""将对象转换为字典"""
return {
"time": self.time,
"ram_all": self.ram_all,
"ram_use": self.ram_use,
"cpu_use": self.cpu_use,
"cpu_c": self.cpu_c,
"disk": self.disk,
"web": self.web,
"gpu": self.gpu,
}
@staticmethod
def from_dict(data):
"""从字典创建 Computer_status 对象"""
return Computer_status(
time=data["time"],
ram_all=data["ram_all"],
ram_use=data["ram_use"],
cpu_use=data["cpu_use"],
cpu_c=data["cpu_c"],
disk=data["disk"],
web=data["web"],
gpu=data["gpu"],
)
# 数据操作方法
DATABASE_FILE = "data/computer_status.db" # 数据库存储文件
def write_data(computer_status):
"""
写入一条数据到对象数据库
参数:
computer_status: Computer_status 对象
"""
if not isinstance(computer_status, Computer_status):
raise ValueError("Input must be a Computer_status object")
with shelve.open(DATABASE_FILE) as db:
db[str(computer_status.time)] = computer_status
# def get_data_30s():
# """
# 获取最新30秒的数据
# 返回:
# List[Computer_status],按时间升序排列
# """
# current_time = int(datetime.now().timestamp())
# with shelve.open(DATABASE_FILE) as db:
# data = [
# db[key]
# for key in db
# if current_time - int(key) <= 30 # 过滤最近30秒数据
# ]
# return sorted(data, key=lambda x: x.time)
#
#
# def get_data(time_start, time_end):
# """
# 获取指定时间戳到指定时间戳之间的数据
# 参数:
# time_start: 开始时间戳int
# time_end: 结束时间戳int
# 返回:
# List[Computer_status],按时间升序排列
# """
# with shelve.open(DATABASE_FILE) as db:
# data = [
# db[key]
# for key in db
# if time_start <= int(key) <= time_end
# ]
# return sorted(data, key=lambda x: x.time)
#
#
# def get_data_json_30s():
# """
# 获取最新30秒数据并转为 JSON
# 返回:
# JSON 数据List[Dict]
# """
# data = get_data_30s()
# return json.dumps([cs.to_dict() for cs in data], indent=4)
#
#
# def get_data_json(time_start, time_end):
# """
# 获取指定时间范围的数据并转为 JSON
# 参数:
# time_start: 开始时间戳int
# time_end: 结束时间戳int
# 返回:
# JSON 数据List[Dict]
# """
# data = get_data(time_start, time_end)
# return json.dumps([cs.to_dict() for cs in data], indent=4)
def delete_data(time_start):
"""
删除 time_start 之前的数据
参数:
time_start: 开始时间戳int保留此时间戳之后的数据
"""
with shelve.open(DATABASE_FILE, writeback=True) as db:
keys_to_delete = [key for key in db if int(key) < time_start]
for key in keys_to_delete:
del db[key]
print(f"已删除 {len(keys_to_delete)} 条记录")
# 实时数据生成函数
def generate_data(timestamp):
"""生成一个 Computer_status 对象"""
return Computer_status(
time=timestamp,
ram_all=16000,
ram_use=random.randint(8000, 16000),
cpu_use=random.randint(10, 90),
cpu_c=random.randint(40, 70),
disk={"disk1": [100000, random.randint(50000, 90000), random.randint(100, 500), random.randint(200, 600)]},
web={"eth0": [random.randint(500, 1500), random.randint(1000, 2500)]},
gpu={"GPU1": {"gpu_ram_all": 8, "gpu_ram_use": random.uniform(2, 8), "gpu_use": random.randint(10, 90), "gpu_fan": random.randint(20, 80)}}
)
# 数据操作方法
def get_data_30s():
"""
实时生成最新30秒的数据
返回:
List[Computer_status],按时间升序排列
"""
current_time = int(datetime.now().timestamp())
data = [generate_data(current_time - i) for i in range(30)] # 生成过去30秒的数据
return sorted(data, key=lambda x: x.time)
def get_data(time_start, time_end):
"""
实时生成指定时间范围的数据
参数:
time_start: 开始时间戳int
time_end: 结束时间戳int
返回:
List[Computer_status],按时间升序排列
"""
data = [generate_data(timestamp) for timestamp in range(time_start, time_end + 1)]
return sorted(data, key=lambda x: x.time)
def get_data_json_30s():
"""
实时生成最新30秒数据并转为 JSON
返回:
JSON 数据List[Dict]
"""
data = get_data_30s()
return json.dumps([cs.to_dict() for cs in data], indent=4)
def get_data_json(time_start, time_end):
"""
实时生成指定时间范围的数据并转为 JSON
参数:
time_start: 开始时间戳int
time_end: 结束时间戳int
返回:
JSON 数据List[Dict]
"""
data = get_data(time_start, time_end)
return json.dumps([cs.to_dict() for cs in data], indent=4)
# 示例测试
if __name__ == "__main__":
# # 创建一些测试数据
# for i in range(40):
# cs = Computer_status(
# time=int(datetime.now().timestamp()) - i,
# ram_all=16000,
# ram_use=8000 + i * 100,
# cpu_use=50 + i,
# cpu_c=60 - i,
# disk={"disk1": [100000, 50000 + i * 100, 100, 200]},
# web={"eth0": [1000 + i * 10, 2000 + i * 20]},
# gpu={"GPU1": {"gpu_ram_all": 8, "gpu_ram_use": 4 + i / 10, "gpu_use": 30 + i, "gpu_fan": 50 + i}},
# )
# write_data(cs)
#
# # 获取最新30秒数据
# print("最新30秒数据JSON:")
# print(get_data_json_30s())
#
# # 获取指定时间段的数据
# time_now = int(datetime.now().timestamp())
# print(f"指定时间段数据({time_now - 20} 到 {time_now - 10}:")
# print(get_data_json(time_now - 20, time_now - 10))
# 获取最新30秒的实时数据
print("最新30秒数据JSON:")
print(get_data_json_30s())
# 获取指定时间范围的实时数据
time_now = int(datetime.now().timestamp())
print(f"指定时间段数据({time_now - 20}{time_now - 10}:")
print(get_data_json(time_now - 20, time_now - 10))