227 lines
6.9 KiB
Python
227 lines
6.9 KiB
Python
import random
|
||
import shelve
|
||
import json
|
||
from datetime import datetime
|
||
|
||
# 定义数据对象
|
||
class Computer_status:
|
||
def __init__(self, time, ram_all, ram_use, cpu_use, cpu_c, disk, web, gpu):
|
||
self.time = time # 时间戳,单位:秒
|
||
self.ram_all = ram_all # 总内存
|
||
self.ram_use = ram_use # 已占用内存
|
||
self.cpu_use = cpu_use # CPU 占用百分比
|
||
self.cpu_c = cpu_c # CPU 温度
|
||
self.disk = disk # 磁盘信息
|
||
self.web = web # 网络流量信息
|
||
self.gpu = gpu # GPU 信息
|
||
|
||
def to_dict(self):
|
||
"""将对象转换为字典"""
|
||
return {
|
||
"time": self.time,
|
||
"ram_all": self.ram_all,
|
||
"ram_use": self.ram_use,
|
||
"cpu_use": self.cpu_use,
|
||
"cpu_c": self.cpu_c,
|
||
"disk": self.disk,
|
||
"web": self.web,
|
||
"gpu": self.gpu,
|
||
}
|
||
|
||
@staticmethod
|
||
def from_dict(data):
|
||
"""从字典创建 Computer_status 对象"""
|
||
return Computer_status(
|
||
time=data["time"],
|
||
ram_all=data["ram_all"],
|
||
ram_use=data["ram_use"],
|
||
cpu_use=data["cpu_use"],
|
||
cpu_c=data["cpu_c"],
|
||
disk=data["disk"],
|
||
web=data["web"],
|
||
gpu=data["gpu"],
|
||
)
|
||
|
||
|
||
# 数据操作方法
|
||
DATABASE_FILE = "data/computer_status.db" # 数据库存储文件
|
||
|
||
|
||
def write_data(computer_status):
|
||
"""
|
||
写入一条数据到对象数据库
|
||
参数:
|
||
computer_status: Computer_status 对象
|
||
"""
|
||
if not isinstance(computer_status, Computer_status):
|
||
raise ValueError("Input must be a Computer_status object")
|
||
with shelve.open(DATABASE_FILE) as db:
|
||
db[str(computer_status.time)] = computer_status
|
||
|
||
|
||
# def get_data_30s():
|
||
# """
|
||
# 获取最新30秒的数据
|
||
# 返回:
|
||
# List[Computer_status],按时间升序排列
|
||
# """
|
||
# current_time = int(datetime.now().timestamp())
|
||
# with shelve.open(DATABASE_FILE) as db:
|
||
# data = [
|
||
# db[key]
|
||
# for key in db
|
||
# if current_time - int(key) <= 30 # 过滤最近30秒数据
|
||
# ]
|
||
# return sorted(data, key=lambda x: x.time)
|
||
#
|
||
#
|
||
# def get_data(time_start, time_end):
|
||
# """
|
||
# 获取指定时间戳到指定时间戳之间的数据
|
||
# 参数:
|
||
# time_start: 开始时间戳(int)
|
||
# time_end: 结束时间戳(int)
|
||
# 返回:
|
||
# List[Computer_status],按时间升序排列
|
||
# """
|
||
# with shelve.open(DATABASE_FILE) as db:
|
||
# data = [
|
||
# db[key]
|
||
# for key in db
|
||
# if time_start <= int(key) <= time_end
|
||
# ]
|
||
# return sorted(data, key=lambda x: x.time)
|
||
#
|
||
#
|
||
# def get_data_json_30s():
|
||
# """
|
||
# 获取最新30秒数据并转为 JSON
|
||
# 返回:
|
||
# JSON 数据(List[Dict])
|
||
# """
|
||
# data = get_data_30s()
|
||
# return json.dumps([cs.to_dict() for cs in data], indent=4)
|
||
#
|
||
#
|
||
# def get_data_json(time_start, time_end):
|
||
# """
|
||
# 获取指定时间范围的数据并转为 JSON
|
||
# 参数:
|
||
# time_start: 开始时间戳(int)
|
||
# time_end: 结束时间戳(int)
|
||
# 返回:
|
||
# JSON 数据(List[Dict])
|
||
# """
|
||
# data = get_data(time_start, time_end)
|
||
# return json.dumps([cs.to_dict() for cs in data], indent=4)
|
||
|
||
|
||
|
||
def delete_data(time_start):
|
||
"""
|
||
删除 time_start 之前的数据
|
||
参数:
|
||
time_start: 开始时间戳(int),保留此时间戳之后的数据
|
||
"""
|
||
with shelve.open(DATABASE_FILE, writeback=True) as db:
|
||
keys_to_delete = [key for key in db if int(key) < time_start]
|
||
for key in keys_to_delete:
|
||
del db[key]
|
||
print(f"已删除 {len(keys_to_delete)} 条记录")
|
||
|
||
|
||
|
||
|
||
# 实时数据生成函数
|
||
def generate_data(timestamp):
|
||
"""生成一个 Computer_status 对象"""
|
||
return Computer_status(
|
||
time=timestamp,
|
||
ram_all=16000,
|
||
ram_use=random.randint(8000, 16000),
|
||
cpu_use=random.randint(10, 90),
|
||
cpu_c=random.randint(40, 70),
|
||
disk={"disk1": [100000, random.randint(50000, 90000), random.randint(100, 500), random.randint(200, 600)]},
|
||
web={"eth0": [random.randint(500, 1500), random.randint(1000, 2500)]},
|
||
gpu={"GPU1": {"gpu_ram_all": 8, "gpu_ram_use": random.uniform(2, 8), "gpu_use": random.randint(10, 90), "gpu_fan": random.randint(20, 80)}}
|
||
)
|
||
|
||
# 数据操作方法
|
||
def get_data_30s():
|
||
"""
|
||
实时生成最新30秒的数据
|
||
返回:
|
||
List[Computer_status],按时间升序排列
|
||
"""
|
||
current_time = int(datetime.now().timestamp())
|
||
data = [generate_data(current_time - i) for i in range(30)] # 生成过去30秒的数据
|
||
return sorted(data, key=lambda x: x.time)
|
||
|
||
def get_data(time_start, time_end):
|
||
"""
|
||
实时生成指定时间范围的数据
|
||
参数:
|
||
time_start: 开始时间戳(int)
|
||
time_end: 结束时间戳(int)
|
||
返回:
|
||
List[Computer_status],按时间升序排列
|
||
"""
|
||
data = [generate_data(timestamp) for timestamp in range(time_start, time_end + 1)]
|
||
return sorted(data, key=lambda x: x.time)
|
||
|
||
def get_data_json_30s():
|
||
"""
|
||
实时生成最新30秒数据并转为 JSON
|
||
返回:
|
||
JSON 数据(List[Dict])
|
||
"""
|
||
data = get_data_30s()
|
||
return json.dumps([cs.to_dict() for cs in data], indent=4)
|
||
|
||
def get_data_json(time_start, time_end):
|
||
"""
|
||
实时生成指定时间范围的数据并转为 JSON
|
||
参数:
|
||
time_start: 开始时间戳(int)
|
||
time_end: 结束时间戳(int)
|
||
返回:
|
||
JSON 数据(List[Dict])
|
||
"""
|
||
data = get_data(time_start, time_end)
|
||
return json.dumps([cs.to_dict() for cs in data], indent=4)
|
||
|
||
|
||
|
||
# 示例测试
|
||
if __name__ == "__main__":
|
||
# # 创建一些测试数据
|
||
# for i in range(40):
|
||
# cs = Computer_status(
|
||
# time=int(datetime.now().timestamp()) - i,
|
||
# ram_all=16000,
|
||
# ram_use=8000 + i * 100,
|
||
# cpu_use=50 + i,
|
||
# cpu_c=60 - i,
|
||
# disk={"disk1": [100000, 50000 + i * 100, 100, 200]},
|
||
# web={"eth0": [1000 + i * 10, 2000 + i * 20]},
|
||
# gpu={"GPU1": {"gpu_ram_all": 8, "gpu_ram_use": 4 + i / 10, "gpu_use": 30 + i, "gpu_fan": 50 + i}},
|
||
# )
|
||
# write_data(cs)
|
||
#
|
||
# # 获取最新30秒数据
|
||
# print("最新30秒数据(JSON):")
|
||
# print(get_data_json_30s())
|
||
#
|
||
# # 获取指定时间段的数据
|
||
# time_now = int(datetime.now().timestamp())
|
||
# print(f"指定时间段数据({time_now - 20} 到 {time_now - 10}):")
|
||
# print(get_data_json(time_now - 20, time_now - 10))
|
||
# 获取最新30秒的实时数据
|
||
print("最新30秒数据(JSON):")
|
||
print(get_data_json_30s())
|
||
|
||
# 获取指定时间范围的实时数据
|
||
time_now = int(datetime.now().timestamp())
|
||
print(f"指定时间段数据({time_now - 20} 到 {time_now - 10}):")
|
||
print(get_data_json(time_now - 20, time_now - 10))
|