import random import shelve import json from datetime import datetime # 定义数据对象 class Computer_status: def __init__(self, time, ram_all, ram_use, cpu_use, cpu_c, disk, web, gpu): self.time = time # 时间戳,单位:秒 self.ram_all = ram_all # 总内存 self.ram_use = ram_use # 已占用内存 self.cpu_use = cpu_use # CPU 占用百分比 self.cpu_c = cpu_c # CPU 温度 self.disk = disk # 磁盘信息 self.web = web # 网络流量信息 self.gpu = gpu # GPU 信息 def to_dict(self): """将对象转换为字典""" return { "time": self.time, "ram_all": self.ram_all, "ram_use": self.ram_use, "cpu_use": self.cpu_use, "cpu_c": self.cpu_c, "disk": self.disk, "web": self.web, "gpu": self.gpu, } @staticmethod def from_dict(data): """从字典创建 Computer_status 对象""" return Computer_status( time=data["time"], ram_all=data["ram_all"], ram_use=data["ram_use"], cpu_use=data["cpu_use"], cpu_c=data["cpu_c"], disk=data["disk"], web=data["web"], gpu=data["gpu"], ) # 数据操作方法 DATABASE_FILE = "data/computer_status.db" # 数据库存储文件 def write_data(computer_status): """ 写入一条数据到对象数据库 参数: computer_status: Computer_status 对象 """ if not isinstance(computer_status, Computer_status): raise ValueError("Input must be a Computer_status object") with shelve.open(DATABASE_FILE) as db: db[str(computer_status.time)] = computer_status # def get_data_30s(): # """ # 获取最新30秒的数据 # 返回: # List[Computer_status],按时间升序排列 # """ # current_time = int(datetime.now().timestamp()) # with shelve.open(DATABASE_FILE) as db: # data = [ # db[key] # for key in db # if current_time - int(key) <= 30 # 过滤最近30秒数据 # ] # return sorted(data, key=lambda x: x.time) # # # def get_data(time_start, time_end): # """ # 获取指定时间戳到指定时间戳之间的数据 # 参数: # time_start: 开始时间戳(int) # time_end: 结束时间戳(int) # 返回: # List[Computer_status],按时间升序排列 # """ # with shelve.open(DATABASE_FILE) as db: # data = [ # db[key] # for key in db # if time_start <= int(key) <= time_end # ] # return sorted(data, key=lambda x: x.time) # # # def get_data_json_30s(): # """ # 获取最新30秒数据并转为 JSON # 返回: # JSON 数据(List[Dict]) # """ # data = get_data_30s() # return json.dumps([cs.to_dict() for cs in data], indent=4) # # # def get_data_json(time_start, time_end): # """ # 获取指定时间范围的数据并转为 JSON # 参数: # time_start: 开始时间戳(int) # time_end: 结束时间戳(int) # 返回: # JSON 数据(List[Dict]) # """ # data = get_data(time_start, time_end) # return json.dumps([cs.to_dict() for cs in data], indent=4) def delete_data(time_start): """ 删除 time_start 之前的数据 参数: time_start: 开始时间戳(int),保留此时间戳之后的数据 """ with shelve.open(DATABASE_FILE, writeback=True) as db: keys_to_delete = [key for key in db if int(key) < time_start] for key in keys_to_delete: del db[key] print(f"已删除 {len(keys_to_delete)} 条记录") # 实时数据生成函数 def generate_data(timestamp): """生成一个 Computer_status 对象""" return Computer_status( time=timestamp, ram_all=16000, ram_use=random.randint(8000, 16000), cpu_use=random.randint(10, 90), cpu_c=random.randint(40, 70), disk={"disk1": [100000, random.randint(50000, 90000), random.randint(100, 500), random.randint(200, 600)]}, web={"eth0": [random.randint(500, 1500), random.randint(1000, 2500)]}, gpu={"GPU1": {"gpu_ram_all": 8, "gpu_ram_use": random.uniform(2, 8), "gpu_use": random.randint(10, 90), "gpu_fan": random.randint(20, 80)}} ) # 数据操作方法 def get_data_30s(): """ 实时生成最新30秒的数据 返回: List[Computer_status],按时间升序排列 """ current_time = int(datetime.now().timestamp()) data = [generate_data(current_time - i) for i in range(30)] # 生成过去30秒的数据 return sorted(data, key=lambda x: x.time) def get_data(time_start, time_end): """ 实时生成指定时间范围的数据 参数: time_start: 开始时间戳(int) time_end: 结束时间戳(int) 返回: List[Computer_status],按时间升序排列 """ data = [generate_data(timestamp) for timestamp in range(time_start, time_end + 1)] return sorted(data, key=lambda x: x.time) def get_data_json_30s(): """ 实时生成最新30秒数据并转为 JSON 返回: JSON 数据(List[Dict]) """ data = get_data_30s() return json.dumps([cs.to_dict() for cs in data], indent=4) def get_data_json(time_start, time_end): """ 实时生成指定时间范围的数据并转为 JSON 参数: time_start: 开始时间戳(int) time_end: 结束时间戳(int) 返回: JSON 数据(List[Dict]) """ data = get_data(time_start, time_end) return json.dumps([cs.to_dict() for cs in data], indent=4) # 示例测试 if __name__ == "__main__": # # 创建一些测试数据 # for i in range(40): # cs = Computer_status( # time=int(datetime.now().timestamp()) - i, # ram_all=16000, # ram_use=8000 + i * 100, # cpu_use=50 + i, # cpu_c=60 - i, # disk={"disk1": [100000, 50000 + i * 100, 100, 200]}, # web={"eth0": [1000 + i * 10, 2000 + i * 20]}, # gpu={"GPU1": {"gpu_ram_all": 8, "gpu_ram_use": 4 + i / 10, "gpu_use": 30 + i, "gpu_fan": 50 + i}}, # ) # write_data(cs) # # # 获取最新30秒数据 # print("最新30秒数据(JSON):") # print(get_data_json_30s()) # # # 获取指定时间段的数据 # time_now = int(datetime.now().timestamp()) # print(f"指定时间段数据({time_now - 20} 到 {time_now - 10}):") # print(get_data_json(time_now - 20, time_now - 10)) # 获取最新30秒的实时数据 print("最新30秒数据(JSON):") print(get_data_json_30s()) # 获取指定时间范围的实时数据 time_now = int(datetime.now().timestamp()) print(f"指定时间段数据({time_now - 20} 到 {time_now - 10}):") print(get_data_json(time_now - 20, time_now - 10))