Computer_status/Computer_status.py

227 lines
6.9 KiB
Python
Raw Permalink Normal View History

2024-12-18 10:37:23 +08:00
import random
import shelve
import json
from datetime import datetime
# 定义数据对象
class Computer_status:
def __init__(self, time, ram_all, ram_use, cpu_use, cpu_c, disk, web, gpu):
self.time = time # 时间戳,单位:秒
self.ram_all = ram_all # 总内存
self.ram_use = ram_use # 已占用内存
self.cpu_use = cpu_use # CPU 占用百分比
self.cpu_c = cpu_c # CPU 温度
self.disk = disk # 磁盘信息
self.web = web # 网络流量信息
self.gpu = gpu # GPU 信息
def to_dict(self):
"""将对象转换为字典"""
return {
"time": self.time,
"ram_all": self.ram_all,
"ram_use": self.ram_use,
"cpu_use": self.cpu_use,
"cpu_c": self.cpu_c,
"disk": self.disk,
"web": self.web,
"gpu": self.gpu,
}
@staticmethod
def from_dict(data):
"""从字典创建 Computer_status 对象"""
return Computer_status(
time=data["time"],
ram_all=data["ram_all"],
ram_use=data["ram_use"],
cpu_use=data["cpu_use"],
cpu_c=data["cpu_c"],
disk=data["disk"],
web=data["web"],
gpu=data["gpu"],
)
# 数据操作方法
DATABASE_FILE = "data/computer_status.db" # 数据库存储文件
def write_data(computer_status):
"""
写入一条数据到对象数据库
参数
computer_status: Computer_status 对象
"""
if not isinstance(computer_status, Computer_status):
raise ValueError("Input must be a Computer_status object")
with shelve.open(DATABASE_FILE) as db:
db[str(computer_status.time)] = computer_status
# def get_data_30s():
# """
# 获取最新30秒的数据
# 返回:
# List[Computer_status],按时间升序排列
# """
# current_time = int(datetime.now().timestamp())
# with shelve.open(DATABASE_FILE) as db:
# data = [
# db[key]
# for key in db
# if current_time - int(key) <= 30 # 过滤最近30秒数据
# ]
# return sorted(data, key=lambda x: x.time)
#
#
# def get_data(time_start, time_end):
# """
# 获取指定时间戳到指定时间戳之间的数据
# 参数:
# time_start: 开始时间戳int
# time_end: 结束时间戳int
# 返回:
# List[Computer_status],按时间升序排列
# """
# with shelve.open(DATABASE_FILE) as db:
# data = [
# db[key]
# for key in db
# if time_start <= int(key) <= time_end
# ]
# return sorted(data, key=lambda x: x.time)
#
#
# def get_data_json_30s():
# """
# 获取最新30秒数据并转为 JSON
# 返回:
# JSON 数据List[Dict]
# """
# data = get_data_30s()
# return json.dumps([cs.to_dict() for cs in data], indent=4)
#
#
# def get_data_json(time_start, time_end):
# """
# 获取指定时间范围的数据并转为 JSON
# 参数:
# time_start: 开始时间戳int
# time_end: 结束时间戳int
# 返回:
# JSON 数据List[Dict]
# """
# data = get_data(time_start, time_end)
# return json.dumps([cs.to_dict() for cs in data], indent=4)
def delete_data(time_start):
"""
删除 time_start 之前的数据
参数
time_start: 开始时间戳int保留此时间戳之后的数据
"""
with shelve.open(DATABASE_FILE, writeback=True) as db:
keys_to_delete = [key for key in db if int(key) < time_start]
for key in keys_to_delete:
del db[key]
print(f"已删除 {len(keys_to_delete)} 条记录")
# 实时数据生成函数
def generate_data(timestamp):
"""生成一个 Computer_status 对象"""
return Computer_status(
time=timestamp,
ram_all=16000,
ram_use=random.randint(8000, 16000),
cpu_use=random.randint(10, 90),
cpu_c=random.randint(40, 70),
disk={"disk1": [100000, random.randint(50000, 90000), random.randint(100, 500), random.randint(200, 600)]},
web={"eth0": [random.randint(500, 1500), random.randint(1000, 2500)]},
gpu={"GPU1": {"gpu_ram_all": 8, "gpu_ram_use": random.uniform(2, 8), "gpu_use": random.randint(10, 90), "gpu_fan": random.randint(20, 80)}}
)
# 数据操作方法
def get_data_30s():
"""
实时生成最新30秒的数据
返回
List[Computer_status]按时间升序排列
"""
current_time = int(datetime.now().timestamp())
data = [generate_data(current_time - i) for i in range(30)] # 生成过去30秒的数据
return sorted(data, key=lambda x: x.time)
def get_data(time_start, time_end):
"""
实时生成指定时间范围的数据
参数
time_start: 开始时间戳int
time_end: 结束时间戳int
返回
List[Computer_status]按时间升序排列
"""
data = [generate_data(timestamp) for timestamp in range(time_start, time_end + 1)]
return sorted(data, key=lambda x: x.time)
def get_data_json_30s():
"""
实时生成最新30秒数据并转为 JSON
返回
JSON 数据List[Dict]
"""
data = get_data_30s()
return json.dumps([cs.to_dict() for cs in data], indent=4)
def get_data_json(time_start, time_end):
"""
实时生成指定时间范围的数据并转为 JSON
参数
time_start: 开始时间戳int
time_end: 结束时间戳int
返回
JSON 数据List[Dict]
"""
data = get_data(time_start, time_end)
return json.dumps([cs.to_dict() for cs in data], indent=4)
# 示例测试
if __name__ == "__main__":
# # 创建一些测试数据
# for i in range(40):
# cs = Computer_status(
# time=int(datetime.now().timestamp()) - i,
# ram_all=16000,
# ram_use=8000 + i * 100,
# cpu_use=50 + i,
# cpu_c=60 - i,
# disk={"disk1": [100000, 50000 + i * 100, 100, 200]},
# web={"eth0": [1000 + i * 10, 2000 + i * 20]},
# gpu={"GPU1": {"gpu_ram_all": 8, "gpu_ram_use": 4 + i / 10, "gpu_use": 30 + i, "gpu_fan": 50 + i}},
# )
# write_data(cs)
#
# # 获取最新30秒数据
# print("最新30秒数据JSON:")
# print(get_data_json_30s())
#
# # 获取指定时间段的数据
# time_now = int(datetime.now().timestamp())
# print(f"指定时间段数据({time_now - 20} 到 {time_now - 10}:")
# print(get_data_json(time_now - 20, time_now - 10))
# 获取最新30秒的实时数据
print("最新30秒数据JSON:")
print(get_data_json_30s())
# 获取指定时间范围的实时数据
time_now = int(datetime.now().timestamp())
print(f"指定时间段数据({time_now - 20}{time_now - 10}:")
print(get_data_json(time_now - 20, time_now - 10))