Compare commits

...

2 Commits

Author SHA1 Message Date
xdl
429c2cb14c 1 2025-01-06 09:29:02 +08:00
xdl
b553734170 1 2025-01-06 09:12:40 +08:00
13 changed files with 1045 additions and 184 deletions

View File

@ -2,7 +2,7 @@
<module type="PYTHON_MODULE" version="4"> <module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager"> <component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" /> <content url="file://$MODULE_DIR$" />
<orderEntry type="jdk" jdkName="yolov8-3" jdkType="Python SDK" /> <orderEntry type="jdk" jdkName="Embedded_game" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" /> <orderEntry type="sourceFolder" forTests="false" />
</component> </component>
<component name="PyDocumentationSettings"> <component name="PyDocumentationSettings">

View File

@ -1,4 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="yolov8-3" project-jdk-type="Python SDK" /> <component name="Black">
<option name="sdkName" value="yolov8-3" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Embedded_game" project-jdk-type="Python SDK" />
</project> </project>

View File

@ -265,7 +265,7 @@ class Mainlen():
# result = self.remove_chinese_chars(result) # 剔除中文 # result = self.remove_chinese_chars(result) # 剔除中文
print(result) print(result)
if len(result) > 10: # 车牌 if len(result) == 8: # 车牌
result = self.jiajin(result) result = self.jiajin(result)
result = self.binary_to_hexadecimal(result) result = self.binary_to_hexadecimal(result)
count = 2 count = 2

View File

@ -303,6 +303,8 @@ class Mainlen():
# time.sleep(1) # time.sleep(1)
# 二进制转十六进制 # 二进制转十六进制
def binary_to_hexadecimal(self, binary_str): def binary_to_hexadecimal(self, binary_str):
if not isinstance(binary_str, str) or not all(c in '01' for c in binary_str): if not isinstance(binary_str, str) or not all(c in '01' for c in binary_str):

View File

@ -1,147 +1,38 @@
# 二进制转十六进制 def process_qrcode_one(data):
def binary_to_hexadecimal( binary_str): result = []
if not isinstance(binary_str, str) or not all(c in '01' for c in binary_str): for char in data:
raise ValueError("输入必须是二进制字符串") if char.isalpha() and char.isupper():
result.append(char)
elif char.isdigit():
result.append(char)
if len(result) >= 8:
break
return ''.join(result)
hex_str = '' def process_qrcode_two(formula, h, n, y):
while len(binary_str) > 0: formula = formula.replace("h", str(h))
# 将二进制字符串每四位分成一组,从右到左 formula = formula.replace("n", str(n))
group = binary_str[-4:] formula = formula.replace("y", str(y))
binary_str = binary_str[:-4] try:
result = eval(formula)
return result % 5
except Exception as e:
raise ValueError(f"公式解析错误: {e}")
# 将每组四位的二进制数转换为相应的十六进制数 def display_result(x):
hex_str = hex_str + hex(int(group, 2))[2:] hex_x = hex(x)[2:].upper()
return f"20240{hex_x}"
return hex_str # 示例数据
qrcode_one = "A/145#B6"
qrcode_two_formula = "n*y+h"
h, n, y = 10, 3, 2
def jiajin( txt): valid_data = process_qrcode_one(qrcode_one)
index_j = -1 print(valid_data)
index_jian = -1 x = process_qrcode_two(qrcode_two_formula, h, n, y)
result = "" display = display_result(x)
for i in range(len(txt)):
if txt[i] == "+":
index_j = 1
elif txt[i] == "-":
index_jian = 2
else:
result += txt[i]
print(result) print("二维码一有效数据:", valid_data)
print(index_j < index_jian) print("二维码二计算结果x:", x)
# 若是加,将对整个字符串进行左移两位 print("多功能信息显示B:", display)
if index_j < index_jian:
end = result[2:] + result[:2]
# 若是-,将对字符串进行右移三位
else:
end = result[-3:] + result[:-3]
return end
# 提取字符和数字
def extract_numbers_and_letters_combined( text):
combined_str = ''
for char in text:
# 检查字符是否是数字0-9或字母A-Z 或 a-z
if ('0' <= char <= '9') or ('A' <= char <= 'Z') or ('a' <= char <= 'z'):
combined_str += char
return combined_str
# 移除ASCII码值外的数据
def remove_non_ascii_chars(text):
"""移除字符串中的非ASCII字符并返回新的字符串"""
# 只保留ASCII字符
return ''.join(char for char in text if ord(char) < 128)
def convert_hex(hex_str):
"""
将十六进制字符串转换为反转字节并添加 0x 前缀的格式
例如:
输入: "e293"
输出: "0x2e,0x39"
Args:
hex_str (str): 输入的十六进制字符串
Returns:
str: 转换后的字符串格式如 "0x2e,0x39"
Raises:
ValueError: 如果输入的字符串长度不是偶数或包含非十六进制字符
"""
# 移除可能的前缀,如 "0x"
hex_str = hex_str.lower().replace("0x", "")
# 检查输入长度是否为偶数
if len(hex_str) % 2 != 0:
raise ValueError("输入的十六进制字符串长度必须是偶数。")
# 检查是否只包含有效的十六进制字符
if not all(c in '0123456789abcdef' for c in hex_str):
raise ValueError("输入的字符串包含非十六进制字符。")
# 将字符串按每两个字符分割成字节
bytes_list = [hex_str[i:i+2] for i in range(0, len(hex_str), 2)]
# 反转每个字节
reversed_bytes = [byte[::-1] for byte in bytes_list]
# 为每个反转后的字节添加 "0x" 前缀
hex_with_prefix = [f"0x{byte}" for byte in reversed_bytes]
return hex_with_prefix
def hex_list_to_ascii(hex_list):
"""
将十六进制字符串列表转换为对应的 ASCII 字符串
例如:
输入: ['0x2e', '0x39']
输出: ".9"
Args:
hex_list (list of str): 包含十六进制字符串的列表每个字符串应以 "0x" 开头
Returns:
str: 转换后的 ASCII 字符串
Raises:
ValueError: 如果列表中的字符串不是有效的十六进制格式或超出 ASCII 范围
"""
ascii_chars = []
for hex_str in hex_list:
# 移除 "0x" 前缀并转换为整数
try:
hex_value = int(hex_str, 16)
except ValueError:
raise ValueError(f"'{hex_str}' 不是有效的十六进制数。")
# 检查是否在有效的 ASCII 范围内
if hex_value < 0 or hex_value > 0x7F:
raise ValueError(f"十六进制数 '{hex_str}' 超出 ASCII 范围。")
# 转换为对应的 ASCII 字符
ascii_char = chr(hex_value)
ascii_chars.append(ascii_char)
return ascii_chars
result="101110+01001-011"
print(result)
result = remove_non_ascii_chars(result) # 剔除ASCII码值外的数据
# result = remove_chinese_chars(result) # 剔除中文
print(result)
if len(result) > 10: # 车牌
result = jiajin(result)
result = binary_to_hexadecimal(result)
result=hex_list_to_ascii(convert_hex(result))
print(len(result))
print(type(result[0]))
def string_to_bytes(hex_str):
# 移除可能的空格或前缀
hex_str = hex_str.strip().lower().replace("0x", "")
# 将字符串转换为整数并转为字节
return bytes([int(hex_str, 16)])
# 示例
result = string_to_bytes("11")
print(result) # 输出b'\x01'

View File

@ -0,0 +1,640 @@
# -*- coding:utf-8 -*-
# @Author len
# @Create 2023/11/23 11:28
import math
import sensor, image, time, lcd
import binascii
from Maix import GPIO
from machine import Timer, PWM, UART, Timer
from fpioa_manager import fm
import KPU as kpu
class Mainlen():
def __init__(self):
'''初始化摄像头和 LCD 显示屏'''
lcd.init() # lcd初始化
self.qr91=""
self.qr92=""
self.canera_init() # 摄像头初始化
# sensor.set_auto_gain(0, gain_db=17) # 设置摄像头的自动增益功能
# 映射串口引脚
fm.register(6, fm.fpioa.UART1_RX, force=True)
fm.register(7, fm.fpioa.UART1_TX, force=True)
# 初始化串口
self.uart = UART(UART.UART1, 115200, read_buf_len=4096)
# 循迹
# --------------感光芯片配置 START -------------------
self.IMG_WIDTH = 240
self.IMG_HEIGHT = 320
# 直线灰度图颜色阈值
self.LINE_COLOR_THRESHOLD = [(0, 60)] # 找黑色
self.LINE_COLOR_BAISE = [(60, 255)] # 找白色
self.ROIS = { # 找黑色
# 'left': (0, 0, 320, 50), # 纵向取样-左侧
# 'right': (0, 190, 320, 50), # 纵向取样-右侧
'left': (0, 0, 180, 50), # 纵向取样-左侧
'right': (0, 190, 180, 50), # 纵向取样-右侧
'up': (240, 0, 80, 240), # 横向取样-上方
'middle_up': (160, 0, 80, 240), # 横向取样-中上
'middle_down': (80, 0, 80, 240), # 横向取样-中下
'down': (0, 0, 80, 240), # 横向取样-下方
}
# 红黄绿的阈值
# 颜色识别阈值 (L Min, L Max, A Min, A Max, B Min, B Max) LAB模型
# 下列阈值元组用来识别 红、绿、蓝三种颜色,可通过调整数据阈值完成更多颜色的识别。
# self.thresholds = [(59, 100, 40, 127, 5, 127), # 红色阈值
# (90, 100, -5, 2, -4, 20), # 黄色阈值
# (87, 100, -59, 127, -10, 127),] # 绿色阈值
# self.thresholds = [(35, 100, 6, 127, 0, 127, "红色"), # 红色阈值
# (25, 100, -6, 127, 5, 43, "黄色"), # 黄色阈值
# (52, 100, -128, -5, 5, 127, "绿色")] # 绿色阈值
self.thresholds = [(43, 68, 7, 74, 1, 46, "红色"), # 红色阈值
(48, 100, -23, 19, 4, 55, "黄色"), # 黄色阈值
(40, 93, -57, -7, -8, 29, "绿色")] # 绿色阈值
# self.thresholds = [(40, 70, 7, 90, -1, 46, "红色"), # 红色阈值
# (34, 98, -17, 50, 7, 67, "黄色"), # 黄色阈值
# (40, 93, -57, -7, -8, 29, "绿色")] # 绿色阈值
self.is_need_send_data = False # 是否需要发送数据的信号标志
# 主函数 run
def startMain(self):
Flag_track = False # 循迹标识
Flag_qr = False # 二维码标识
Flag_qr2 = False # 二维码标识
Flag_light = False # 交通灯标识
while True:
data = self.uart.read(8)
# print(data)
# print(len(data))
# if (len(data) >= 8):
if data is not None:
if (data[1] == 0x02) and (data[7] == 0xBB) and self.verify_checksum(data):
# 巡线与控制舵机
if data[2] == 0x91:
if data[3] == 0x01: # 启动循迹
sensor.set_pixformat(sensor.GRAYSCALE) # 设置像素格式为灰色
Flag_track = True
print("开始循迹")
elif data[3] == 0x02: # 停止循迹
sensor.set_pixformat(sensor.RGB565) # 设置像素格式为彩色 RGB565
Flag_track = False
print("停止循迹")
elif data[3] == 0x03: # 调整舵机
print("调整舵机")
self.Servo(data)
else:
pass
# 识别任务
elif data[2] == 0x92:
if data[3] == 0x01: # 识别二维码
sensor.set_pixformat(sensor.RGB565) # 设置像素格式为彩色 RGB565
Flag_qr = True
print("开始识别二维码任务10")
elif data[3] == 0x02: # 停止识别二维码
Flag_qr = False
print("停止识别二维码")
elif data[3] == 0x03: # 识别交通灯
sensor.set_pixformat(sensor.RGB565) # 设置像素格式为彩色 RGB565
Flag_light = True
print("开始识别交通灯")
elif data[3] == 0x04: # 停止识别交通灯
Flag_light = False
print("停止识别交通灯")
elif data[3] == 0x06: # 调整摄像头阈值
print("调整摄像头阈值")
self.canera_ash()
else:
pass
img = sensor.snapshot() # 获取图像
if Flag_track: # 循迹
print("循迹")
self.tracking(img)
elif Flag_qr: # 二维码#
print("识别二维码")
self.discem_QR(img)#r任务9
# Flag_qr = False
elif Flag_light: # 红绿灯
print("识别红绿灯")
self.discem_light()
# Flag_light = False
lcd.display(img) # 在LCD显示
# 初始化摄像头阈值
def canera_init(self):
# 摄像头模块初始化
sensor.reset() # 复位和初始化摄像头
sensor.set_pixformat(sensor.RGB565) # 设置像素格式为彩色 RGB565
# sensor.set_pixformat(sensor.GRAYSCALE) # 设置像素格式为灰色
sensor.set_framesize(sensor.QVGA) # 设置帧大小为QVGA320×240
sensor.set_vflip(1) # 后置模式
sensor.skip_frames(30) # # 跳过前30帧
# 调整摄像头阈值
def canera_ash(self):
sensor.reset() # 复位和初始化摄像头
sensor.set_vflip(1) # 将摄像头设置成后置方式(所见即所得)
sensor.set_pixformat(sensor.RGB565) # 设置像素格式为彩色 RGB565
# sensor.set_pixformat(sensor.GRAYSCALE) # 设置像素格式为灰色
sensor.set_framesize(sensor.QVGA) # 设置帧大小为 QVGA (320x240)
# sensor.set_windowing((224, 224)) # 设置摄像头的窗口大小
sensor.set_auto_gain(False)
sensor.set_auto_whitebal(False)
sensor.set_auto_gain(0, 0)
sensor.skip_frames(time=200) # 等待设置生效
# 红绿灯
def discem_light(self):
# 初始化
results = []
start_time = time.time()
while time.time() - start_time < 3: # 循环持续三秒
img = sensor.snapshot()
max_blob = None
max_blob_size = 0
max_blob_color = ""
max_blob_color_index = 0
# 定义右半部分的区域
right_half_roi = (self.IMG_WIDTH // 2, 0, self.IMG_WIDTH // 2, self.IMG_HEIGHT)
for index, threshold in enumerate(self.thresholds):
# 只在右半部分查找每种颜色的色块
blobs = img.find_blobs([threshold[:-1]], roi=right_half_roi, pixels_threshold=200, area_threshold=200, merge=True)
if blobs:
# 找到最大的色块
blob = max(blobs, key=lambda b: b.pixels())
if blob.pixels() > max_blob_size:
max_blob = blob
max_blob_size = blob.pixels()
max_blob_color = threshold[-1] # 颜色标签
max_blob_color_index = index
# 绘制最大色块的矩形和中心十字,并输出颜色
if max_blob:
results.append((max_blob_color, max_blob_color_index)) # 存储识别结果
img.draw_rectangle(max_blob[0:4])
img.draw_cross(max_blob[5], max_blob[6])
print("最大色块的颜色是:", max_blob_color, max_blob_color_index)
# 初始化计数字典
color_count = {}
for color, index in results:
if (color, index) in color_count:
color_count[(color, index)] += 1
else:
color_count[(color, index)] = 1
# 找出出现次数最多的颜色和下标
if color_count:
most_common_color, most_common_index = max(color_count, key=color_count.get)
print("出现最多的颜色:", most_common_color, "在下标位置:", most_common_index)
else:
most_common_index = 1
print("没有识别到有效的颜色块")
# 串口发送交通灯信息
self.uart.write(bytes([0x55]))
self.uart.write(bytes([0x02]))
self.uart.write(bytes([0x92]))
self.uart.write(bytes([0x03])) # 红绿灯
self.uart.write(bytes([most_common_index])) # 红绿灯结果
self.uart.write(bytes([0x03]))
self.uart.write(bytes([0x03]))
self.uart.write(bytes([0xbb]))
self.canera_init() # 恢复摄像头
# 二维码
def discem_QR(self, img):
# self.Tise_servo(10)
res_QR = img.find_qrcodes() # 寻找二维码
timeflag = 0
while timeflag < 39:
img = sensor.snapshot() # 获取图像
TS_QR = img.find_qrcodes() # 再次寻找二维码
for qr in TS_QR:
if all(qr.payload() != existing_qr.payload() for existing_qr in res_QR):
res_QR.append(qr)
timeflag += 1
time.sleep(0.001)
print(timeflag)
print(len(res_QR))
if len(res_QR): # 识别到
for res in range(len(res_QR)):
count = 0
if_end = 0x01
result = res_QR[res].payload()
print(result)
result = self.remove_non_ascii_chars(result) # 剔除ASCII码值外的数据
print(result)
if len(result) == 8: # 车牌
result = self.process_qrcode_one(result)
count = 1
else: # 公式
count = 2
if res == len(res_QR) - 1:
if_end = 0x00
print("count: ", count)
print("if_end: ", if_end)
# 串口发送二维码信息
self.uart.write(bytes([0x55]))
self.uart.write(bytes([0x02]))
self.uart.write(bytes([0x92]))
self.uart.write(bytes([0x01]))
print(len(result))
self.uart.write(bytes([count]))
if count==2:
for qr_data in result:
self.uart.write(bytes([ord(qr_data)]))
else:
for qr_data in result:
self.uart.write(bytes([ord(qr_data)]))
self.uart.write(bytes([0xbb]))
# time.sleep(1)
else: # 未识别
self.uart.write(bytes([0x55]))
self.uart.write(bytes([0x02]))
self.uart.write(bytes([0x92]))
self.uart.write(bytes([0x06]))
self.uart.write(bytes([0xff]))
self.uart.write(bytes([0x00]))
self.uart.write(bytes([0x00]))
self.uart.write(bytes([0xbb]))
# time.sleep(1)
def process_qrcode_one(self,data):
result = []
for char in data:
if char.isalpha() and char.isupper():
result.append(char)
elif char.isdigit():
result.append(char)
if len(result) >= 8:
break
return ''.join(result)
def jiajin(self, txt):
index_j = -1
index_jian = -1
result = ""
for i in range(len(txt)):
if txt[i] == "+":
index_j = 1
elif txt[i] == "-":
index_jian = 2
else:
result += txt[i]
print(result)
print(index_j < index_jian)
# 若是加,将对整个字符串进行左移两位
if index_j < index_jian:
end = result[2:] + result[:2]
# 若是-,将对字符串进行右移三位
else:
end = result[-3:] + result[:-3]
return end
# 移除ASCII码值外的数据
def remove_non_ascii_chars(self, text):
"""移除字符串中的非ASCII字符并返回新的字符串"""
# 只保留ASCII字符
return ''.join(char for char in text if ord(char) < 128)
# 循迹
def tracking(self, img):
print("循迹")
roi_blobs_result = self.find_blobs_in_rois(img)
down_center, state_crossing, deflection_angle = self.state_deflection_angle(roi_blobs_result)
dsd = self.data_format_wrapper(down_center, state_crossing, deflection_angle)
self.UsartSend(dsd)
print("下发指令:", dsd)
# 寻找色块 在ROIS中寻找色块获取ROI中色块的中心区域与是否有色块的信息
def find_blobs_in_rois(self, img):
canvas = img.copy()
roi_blobs_result = {} # 在各个ROI中寻找色块的结果记录
for roi_direct in self.ROIS.keys():
roi_blobs_result[roi_direct] = {
'cx': 0,
'cy': 0,
'w': 0,
'blob_flag': False
}
for roi_direct, roi in self.ROIS.items():
blobs = canvas.find_blobs(self.LINE_COLOR_THRESHOLD, roi=roi, merge=True)
if len(blobs) != 0:
largest_blob = max(blobs, key=lambda b: b.pixels())
if largest_blob.area() > 1000:
roi_blobs_result[roi_direct]['cx'] = largest_blob.cy()
roi_blobs_result[roi_direct]['cy'] = largest_blob.cx()
roi_blobs_result[roi_direct]['w'] = largest_blob.h()
roi_blobs_result[roi_direct]['blob_flag'] = True
x, y, width, height = largest_blob[:4]
img.draw_rectangle((x, y, width, height), color=(255))
else:
# blobs=canvas.find_blobs(LINE_COLOR_THRESHOLD, roi=roi, merge=True, pixels_area=10)
continue
blobs_baise = canvas.find_blobs(self.LINE_COLOR_BAISE, roi=(0, 0, 60, 240),
merge=True) # 车载摄像头屏幕下部找白色#宽度200修改为240#用途寻卡y示例中40修改为0
blobs_dixing = canvas.find_blobs(self.LINE_COLOR_BAISE, roi=(125, 0, 60, 240),
merge=True) # 车载摄像头屏幕中间找白色 #宽度200修改为240y示例中40修改为0
blobs_zuo = canvas.find_blobs(self.LINE_COLOR_THRESHOLD, roi=(0, 0, 180, 50), merge=True) # 车载摄像头屏幕左下部找黑色
blobs_you = canvas.find_blobs(self.LINE_COLOR_THRESHOLD, roi=(0, 190, 180, 50), merge=True) # 车载摄像头屏幕右下部找黑色
if len(blobs_baise) != 0:
print("*********进入循环第1步*******")
largest_baise = max(blobs_baise, key=lambda b: b.pixels())
wx, wy, wwidth, wwheight = largest_baise[:4]
arc = wwidth * wwheight
if arc >= 11000:
print("*********进入循环第2步*******")
if len(blobs_dixing) != 0:
print("*********进入循环第3步*******")
largest_dixing = max(blobs_dixing, key=lambda b: b.pixels())
wx, wy, wwidth, wwheight = largest_dixing[:4]
arc = wwidth * wwheight
if arc >= 11000:
print('kapian') # 首先中部区域识别到白色进入判断地形还是卡片,接着判断下部,如果为白色判断为卡片。
self.UsartSend(self.data_format_wrapper(0, 1, 0)) # 地形停止命令
if len(blobs_zuo) != 0 and len(blobs_you) != 0:
print("ka十字路口")
self.UsartSend(self.data_format_wrapper(1, 1, 0)) # 地形停止命令
else:
print('dixing')
print(roi_blobs_result) # 返回的是黑色色块,各区域中心位置
self.UsartSend(self.data_format_wrapper(0, 1, 0)) # 地形停止命令
return roi_blobs_result # 返回的是黑色色块,各区域中心位置
# 计算偏转状态值
def state_deflection_angle(self, roi_blobs_result):
'''
说明偏转状态值返回
'''
# ROI区域权重值
# ROIS_WEIGHT = [1, 1, 1, 1]
ROIS_WEIGHT = [1, 0, 0, 1]
state_crossing = False
deflection_angle = 0 # 偏转角
down_center = 0 # 中下值
center_num = 0 # 中间值
# print(roi_blobs_result)
# 偏转值计算ROI中心区域X值
centroid_sum = roi_blobs_result['up']['cx'] * ROIS_WEIGHT[0] + roi_blobs_result['middle_up']['cx'] * \
ROIS_WEIGHT[1] \
+ roi_blobs_result['middle_down']['cx'] * ROIS_WEIGHT[2] + roi_blobs_result['down']['cx'] * \
ROIS_WEIGHT[3]
if roi_blobs_result['up']['blob_flag']:
center_num += ROIS_WEIGHT[0]
if roi_blobs_result['middle_up']['blob_flag']:
center_num += ROIS_WEIGHT[1]
if roi_blobs_result['middle_down']['blob_flag']:
center_num += ROIS_WEIGHT[2]
if roi_blobs_result['down']['blob_flag']:
center_num += ROIS_WEIGHT[3]
center_pos = centroid_sum / (ROIS_WEIGHT[0] + ROIS_WEIGHT[1] + ROIS_WEIGHT[2] + ROIS_WEIGHT[3])
deflection_angle = (self.IMG_WIDTH / 2) - center_pos
# 判断两侧ROI区域检测到黑色线
if roi_blobs_result['left']['blob_flag'] and roi_blobs_result['right']['blob_flag']:
# 判断两侧ROI区域检测到黑色线处于图像下方1/3处
if roi_blobs_result['left']['cy'] <= ((self.IMG_HEIGHT / 3)) or roi_blobs_result['right']['cy'] <= (
(self.IMG_HEIGHT / 3)):
# 当最下方ROI区域的黑线宽度大于140像素检测到路口
if roi_blobs_result['down']['w'] > 140:
down_center = 1 # 自行修改处 判断识别到十字路口
print("输出了十字路口标识")
return down_center, state_crossing, deflection_angle
# 控制舵机
def Tise_servo(self, angle):
# 判断舵机控制方向
if angle < 0:
# 限制舵机角度,防止过大损坏舵机
if angle > 80:
angle = 80
angle = -angle
elif angle > 0:
# 限制舵机角度,防止过大损坏舵机
if angle > 35:
angle = 35
angle = angle
# PWM通过定时器配置接到IO17引脚
tim_pwm = Timer(Timer.TIMER0, Timer.CHANNEL0, mode=Timer.MODE_PWM)
S1 = PWM(tim_pwm, freq=50, duty=0, pin=17)
S1.duty((angle + 90) / 180 * 10 + 2.5)
def Servo(self, data):
'''
功能180度舵机angle:-90至90 表示相应的角度
360连续旋转度舵机angle:-90至90 旋转方向和速度值
duty占空比值0-100
'''
angle = data[5]
# 判断舵机控制方向
if data[4] == ord('-'):
# 限制舵机角度,防止过大损坏舵机
if angle > 80:
angle = 80
angle = -angle
elif data[4] == ord('+'):
# 限制舵机角度,防止过大损坏舵机
if angle > 35:
angle = 35
angle = angle
# PWM通过定时器配置接到IO17引脚
tim_pwm = Timer(Timer.TIMER0, Timer.CHANNEL0, mode=Timer.MODE_PWM)
S1 = PWM(tim_pwm, freq=50, duty=0, pin=17)
S1.duty((angle + 90) / 180 * 10 + 2.5)
# 检验校验和
def verify_checksum(self, data):
if len(data) != 8:
return False
# 计算校验和data[2]、data[3]、data[4] 和 data[5] 的和然后对256取模
calculated_checksum = (data[2] + data[3] + data[4] + data[5]) % 256
# 比较计算出的校验和与data[6]是否相等
if calculated_checksum == data[6]:
return True
else:
return False
# 串口发送
def UsartSend(self, str_data):
'''
串口发送函数
'''
print(str_data)
self.uart.write(str_data)
# 判断符号
def get_symbol(self, num):
'''
根据数值正负返回数值对应的符号
正数 + 负数- 主要为了方便C语言解析待符号的数值
'''
print("num = ", num)
if num >= 0:
return ord('+')
else:
return ord('-')
# 封装数据
def data_format_wrapper(self, down_center, state_crossing, deflection_angle):
'''
根据通信协议封装循迹数据
TODO 重新编写通信协议 与配套C解析代码
'''
send_data = [
0x55,
0x02,
0x91,
down_center, # 底部色块中心是否在中点附近#底部色块十字路口
1 if state_crossing else 0, # 是否越障 无用
self.get_symbol(deflection_angle), # 偏航角符号 print输出是43 + 45- +向左转调整 -向右转调整
abs(int(deflection_angle)), # 偏航角
0xbb]
return bytes(send_data)
def find_bracket_positions(input_str, startstr="<", endstr=">"):
"""
查找字符串中所有成对的指定括号的起始和结束位置
功能描述
- 遍历字符串查找使用指定起始括号 (startstr) 和结束括号 (endstr) 的匹配对
- 每找到一对匹配括号记录其起始位置和结束位置
- 返回包含所有匹配括号对的起始和结束位置的列表
实现逻辑
1. 使用栈记录每个起始括号 (startstr) 的位置索引
2. 每当遇到一个结束括号 (endstr) 从栈中弹出最近的起始括号位置与当前索引形成一个匹配对
3. 将匹配的索引对存入结果列表
边界条件
- 如果字符串中没有指定的括号返回空列表
- 如果有未闭合的起始括号不会记录未匹配的起始位置
时间复杂度
- O(n)其中 n 是字符串的长度
:param input_str: 输入字符串其中可能包含成对的指定括号
:param startstr: 指定的起始括号 '<' '{'默认为 '<'
:param endstr: 指定的结束括号 '>' '}'默认为 '>'
:return: 包含所有匹配括号对的起始和结束位置的列表每个匹配对用 [start, end] 表示
"""
result = []
stack = []
# 遍历字符串
for i, char in enumerate(input_str):
if char == startstr:
# 如果是起始括号,记录其位置
stack.append(i)
elif char == endstr:
# 如果是结束括号,检查栈是否为空
if stack:
# 从栈中弹出起始括号位置并记录配对
start = stack.pop()
result.append([start, i])
# 返回所有成对括号的位置
return result
def extract_valid_data(input_str):
print('输入'+input_str)
u=[
is_uppercase_letter,
is_digit,
is_digit,
is_digit,
is_uppercase_letter,
is_digit,
]
i=0
result=''
for s in input_str:
if i >= len(u):
break
if u[i](s):
i+=1
result+=s
result = result.strip()
if len(result) == len(u):
print("输出"+result)
return result
print("输出None")
return None
def a(str,i):
if i==0:
return is_uppercase_letter(str)
elif i==1:
return is_digit(str)
elif i==2:
return is_digit(str)
elif i==3:
return is_digit(str)
elif i==4:
return is_uppercase_letter(str)
elif i==5:
return is_digit(str)
else:
return None
def is_uppercase_letter(char):
return 'A' <= char <= 'Z'
def is_digit(char):
return '0' <= char <= '9'
# 运行程序
myMain = Mainlen()
myMain.startMain()

View File

@ -32,8 +32,8 @@ def get_color_threshold(image_path, color):
return mask return mask
# 示例用法 # 示例用法
image_path = "data/congche_hld_3.jpg" # 替换为您的图像路径 image_path = "../first_frame1.jpg" # 替换为您的图像路径
color = "green" # 替换为您想要获取阈值的颜色 color = "red" # 替换为您想要获取阈值的颜色
threshold_mask = get_color_threshold(image_path, color) threshold_mask = get_color_threshold(image_path, color)
# 显示阈值图像 # 显示阈值图像

Binary file not shown.

Before

Width:  |  Height:  |  Size: 218 KiB

After

Width:  |  Height:  |  Size: 87 KiB

130
图片相似度.py Normal file
View File

@ -0,0 +1,130 @@
import os
from skimage.metrics import structural_similarity as ssim
import cv2
import concurrent.futures
def split_file_name(file_name):
"""
将文件名拆分为前缀和后缀
:param file_name: 文件名
:return: 前缀和后缀
"""
file_name = os.path.basename(file_name)
file_name = file_name.split('.')[0]
file_names = file_name.split('_')
return file_names[0] + '_' + file_names[1], file_names[2]
def get_sorted_files_group(directory):
try:
# 检查目录是否存在
if not os.path.exists(directory):
print(f"目录 {directory} 不存在!")
return {}
# 获取目录下的所有文件及其完整路径
files = [
os.path.join(directory, f) for f in os.listdir(directory)
if os.path.isfile(os.path.join(directory, f))
]
files_group = {}
for file in files:
prefix, suffix = split_file_name(file)
files_group.setdefault(prefix, []).append(file)
return files_group
except Exception as e:
print(f"发生错误: {e}")
return {}
def compare_images(image1_path, image2_path):
try:
# 读取两张图片
img1 = cv2.imread(image1_path, cv2.IMREAD_GRAYSCALE)
img2 = cv2.imread(image2_path, cv2.IMREAD_GRAYSCALE)
# 检查图片是否成功读取
if img1 is None or img2 is None:
print(f"无法读取图片: {image1_path}{image2_path}")
return None
# 调整两张图片为相同大小
img1 = cv2.resize(
img1, (min(img1.shape[1], img2.shape[1]), min(img1.shape[0], img2.shape[0]))
)
img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))
# 计算相似度
similarity, _ = ssim(img1, img2, full=True)
return similarity
except Exception as e:
print(f"比较图片时发生错误: {e}")
return None
def del_group(group):
"""
返回需要删除的图片列表不在函数里直接执行删除操作
"""
del_list = []
compared_pairs = set() # 用于记录已经比较过的图片对
for i in range(len(group)):
for j in range(len(group)):
if i != j and (i, j) not in compared_pairs and (j, i) not in compared_pairs:
# 标记为已比较
compared_pairs.add((i, j))
# group[i] 或 group[j] 已经在 del_list 中就没必要比较了
if group[i] not in del_list and group[j] not in del_list:
similarity = compare_images(group[i], group[j])
if similarity is not None and similarity > 0.965:
# 相似度大于 0.98,判定为重复
# 保留 group[j],删除 group[i],也可以根据需要调换顺序
del_list.append(group[i])
return del_list
def process_group(prefix, file_list):
"""
子进程运行函数
1. 对指定 group 进行去重计算
2. 返回 (prefix, del_list)
"""
print(f'开始处理 {prefix}, 共 {len(file_list)} 张图片')
del_list = del_group(file_list)
print(f'需要删除 {len(del_list)}/{len(file_list)} 张图片')
return prefix, del_list
if __name__ == "__main__":
directory = r"C:\Users\10561\Desktop\frames"
files_group = get_sorted_files_group(directory)
# for e in files_group:
# print(e, len(files_group[e]))
# exit()
# 使用多进程来处理每一个 group
results = []
with concurrent.futures.ProcessPoolExecutor() as executor:
future_to_prefix = {}
for prefix, file_list in files_group.items():
future = executor.submit(process_group, prefix, file_list)
future_to_prefix[future] = prefix
for future in concurrent.futures.as_completed(future_to_prefix):
prefix = future_to_prefix[future]
try:
ret_prefix, del_list = future.result()
# 在主进程里进行删除
for e in del_list:
if os.path.exists(e):
# print(f'删除 {e}')
os.remove(e)
except Exception as e:
print(f"{prefix} 处理时发生错误: {e}")
print("所有任务已完成!")

51
抽帧.py Normal file
View File

@ -0,0 +1,51 @@
import cv2
import os
def extract_frames(video_path, output_dir, interval=0.5,output_name='frame'):
# 打开视频文件
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print(f"无法打开视频文件: {video_path}")
return
# 获取视频帧率和总时长
fps = cap.get(cv2.CAP_PROP_FPS)
frame_interval = int(fps * interval) # 每隔多少帧保存一帧
print(fps)
# 创建输出目录
os.makedirs(output_dir, exist_ok=True)
frame_count = 0
saved_count = 0
while True:
ret, frame = cap.read()
if not ret:
break
# 如果当前帧是需要保存的帧
if frame_count % frame_interval == 0:
output_image_path = os.path.join(output_dir, f"{output_name}_{saved_count}.jpg")
cv2.imwrite(output_image_path, frame)
print(f"保存帧到: {output_image_path}")
saved_count += 1
frame_count += 1
# 释放视频对象
cap.release()
print(f"共保存了 {saved_count} 帧.")
video_dir=r'C:\Users\10561\Desktop\bcar红绿灯\1'
output_directory = r"C:\Users\10561\Desktop\frames" # 替换为你希望保存的目录路径
for file in os.listdir(video_dir):
if file.endswith('.mp4'):
video_file=os.path.join(video_dir,file)
extract_frames(video_file, output_directory, interval=0.4,output_name=file.split('.')[0])
# 示例调用
# video_file = r"C:\Users\10561\Desktop\7.mp4" # 替换为你的视频文件路径
# extract_frames(video_file, output_directory, interval=0.5)

View File

@ -1,6 +1,5 @@
import json import json
import math import math
import pandas as pd import pandas as pd
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from matplotlib import rcParams from matplotlib import rcParams
@ -10,7 +9,7 @@ rcParams['font.sans-serif'] = ['SimHei'] # 或者 ['Microsoft YaHei']
rcParams['axes.unicode_minus'] = False # 解决负号显示问题 rcParams['axes.unicode_minus'] = False # 解决负号显示问题
# 指定 JSON 文件的路径 # 指定 JSON 文件的路径
file_path = r"C:\Users\10561\Desktop\2025-01-02_应用日志.json" file_path = r"C:\Users\10561\Desktop\2025-01-06_应用日志.json"
# 打开并读取 JSON 文件 # 打开并读取 JSON 文件
with open(file_path, 'r', encoding='utf-8') as file: with open(file_path, 'r', encoding='utf-8') as file:
@ -21,7 +20,6 @@ log_ones = []
start_idx = -1 start_idx = -1
for idx, one in enumerate(data): for idx, one in enumerate(data):
if 'Acar' == one['设备'] and '2' == one['类型']: if 'Acar' == one['设备'] and '2' == one['类型']:
print(one)
if 1 == one['位置'] and 1 == one['方向']: if 1 == one['位置'] and 1 == one['方向']:
start_idx = idx start_idx = idx
continue continue
@ -30,12 +28,14 @@ for idx, one in enumerate(data):
continue continue
log_ones.append(data[start_idx:idx+1]) log_ones.append(data[start_idx:idx+1])
start_idx = -1 start_idx = -1
print(idx)
# 筛选出只有 "Acar" 的日志片段 # 筛选出只有 "Acar" 的日志片段
log_ones_new = [] log_ones_new = []
for one in log_ones: for one in log_ones:
one_only_car = [one_one for one_one in one if 'Acar' == one_one['设备']] one_only_car = [one_one for one_one in one if 'Acar' == one_one['设备']]
log_ones_new.append(one_only_car) log_ones_new.append(one_only_car)
[print(o) for o in log_ones_new]
# 计算时间差 # 计算时间差
time_diffs = [] time_diffs = []
@ -45,44 +45,56 @@ for one_one in log_ones_new:
df['时间差'] = df['时间戳'].diff() df['时间差'] = df['时间戳'].diff()
time_diff = df[['位置', '方向', '时间差']].dropna().reset_index(drop=True) time_diff = df[['位置', '方向', '时间差']].dropna().reset_index(drop=True)
time_diffs.append(time_diff) time_diffs.append(time_diff)
print(time_diffs)
# 找出行数最多的时间差数据集 # 找出行数最多的时间差数据集,以它的 (位置-方向) 组合作为标准 X 轴标签
max_len_dataset = max(time_diffs, key=len) # 找到行数最多的 DataFrame max_len_dataset = max(time_diffs, key=len)
print(max_len_dataset) all_x_labels = max_len_dataset['位置'].astype(str) + '-' + max_len_dataset['方向'].astype(str)
all_x_labels = max_len_dataset['位置'].astype(str) + '-' + max_len_dataset['方向'].astype(str) # 提取 X 轴标签
# all_x_labels=pd.DataFrame({'位置-方向': all_x_labels}) # 标准化所有数据集(没有对应的“位置-方向”就补 0
# 标准化所有数据集,缺少的补 0
standardized_time_diffs = [] standardized_time_diffs = []
time_diffs_new = []
for time_diff in time_diffs: for time_diff in time_diffs:
time_diff['位置-方向'] = time_diff['位置'].astype(str) + '-' + time_diff['方向'].astype(str) time_diff['位置-方向'] = time_diff['位置'].astype(str) + '-' + time_diff['方向'].astype(str)
# print(time_diff)
standardized_time_diff = [] standardized_time_diff = []
# 手动补零 data_idx = 0
data_idx=0 last_is_null = 0
last_is_null=0 for idx_label, label in enumerate(all_x_labels):
for idx,label in enumerate(all_x_labels): row = time_diff.iloc[data_idx] if data_idx < len(time_diff) else None
first_row = time_diff.iloc[data_idx] if row is not None and row['位置-方向'] == label:
if first_row['位置-方向']==label: # 命中当前 label
if last_is_null==1: if last_is_null == 1:
last_is_null=0 last_is_null = 0
standardized_time_diff.append(0) standardized_time_diff.append(0) # 这里保持你原先的逻辑,先补 0后面再统一填充
else: else:
standardized_time_diff.append(first_row['时间差']) standardized_time_diff.append(row['时间差'])
data_idx+=1 data_idx += 1
else: else:
# 没有命中当前 label
standardized_time_diff.append(0) standardized_time_diff.append(0)
last_is_null=1 last_is_null = 1
print(standardized_time_diff)
standardized_time_diffs.append(standardized_time_diff)
print('-------------------')
standardized_time_diffs.append(standardized_time_diff)
# -------- 新增:填充 0 的函数 --------
def fill_zeros_with_nearest(values):
"""
将列表中的 0 用最近的非 0 值进行填充
先前向填充再后向填充
"""
# 前向填充
for i in range(1, len(values)):
if values[i] == 0 and values[i-1] != 0:
values[i] = values[i-1]
# 后向填充
for i in range(len(values) - 2, -1, -1):
if values[i] == 0 and values[i+1] != 0:
values[i] = values[i+1]
return values
# 创建子图的行列数(自动计算) # 创建子图的行列数(自动计算)
num_plots = len(all_x_labels) num_plots = len(all_x_labels)
rows = math.ceil(math.sqrt(num_plots)) # 行数 rows = math.ceil(math.sqrt(num_plots)) # 行数
cols = math.ceil(num_plots / rows) # 列数 cols = math.ceil(num_plots / rows) # 列数
# 创建大画布 # 创建大画布
fig, axes = plt.subplots(rows, cols, figsize=(16, 12)) fig, axes = plt.subplots(rows, cols, figsize=(16, 12))
@ -91,22 +103,34 @@ axes = axes.flatten() # 将子图数组展平,方便迭代
# 在每个子图中绘制折线 # 在每个子图中绘制折线
for idx, label in enumerate(all_x_labels): for idx, label in enumerate(all_x_labels):
# 获取当前 "位置-方向" 对应的 y 值 # 获取当前 "位置-方向" 对应的 y 值
y_values = [time_diff[idx] if idx < len(time_diff) else 0 for time_diff in standardized_time_diffs] y_values = []
for time_diff in standardized_time_diffs:
# 如果下标越界就补 0
y_values.append(time_diff[idx] if idx < len(time_diff) else 0)
# -------- 在绘图前,对 y_values 做一次非 0 值填充 --------
y_values_filled = fill_zeros_with_nearest(y_values)
# 绘制当前子图 # 绘制当前子图
ax = axes[idx] ax = axes[idx]
ax.plot( ax.plot(
range(len(standardized_time_diffs)), # x轴为不同的时间差数据集序号 range(len(y_values_filled)),
y_values, # y轴为对应的时间差 y_values_filled,
marker='o', # 标记点样式 marker='o',
) )
ax.set_ylim(0, 10000)
# 设置标题和轴标签 # 设置标题和轴标签
ax.set_title(f'{label} 的时间差折线图', fontsize=10) ax.set_title(f'{label} 的时间差折线图', fontsize=10)
ax.set_xlabel('标准化时间差数据集', fontsize=8) ax.set_xlabel('标准化时间差数据集', fontsize=8)
ax.set_ylabel('时间差 (ms)', fontsize=8) ax.set_ylabel('时间差 (ms)', fontsize=8)
ax.set_xticks(range(len(standardized_time_diffs))) ax.set_xticks(range(len(y_values_filled)))
ax.set_xticklabels([f'TimeDiff {i+1}' for i in range(len(standardized_time_diffs))], fontsize=6, rotation=45) ax.set_xticklabels(
[f'TimeDiff {i+1}' for i in range(len(y_values_filled))],
fontsize=6,
rotation=45
)
ax.grid(axis='y', linestyle='--', alpha=0.7) ax.grid(axis='y', linestyle='--', alpha=0.7)
# 删除多余的子图(如果子图数量多于折线图数量) # 删除多余的子图(如果子图数量多于折线图数量)
@ -115,4 +139,4 @@ for ax in axes[num_plots:]:
# 调整布局 # 调整布局
plt.tight_layout() plt.tight_layout()
plt.show() plt.show()

120
日志处理文件_真.py Normal file
View File

@ -0,0 +1,120 @@
import json
import math
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import rcParams
# 设置字体为 SimHei黑体或其他支持中文的字体
rcParams['font.sans-serif'] = ['SimHei'] # 或者 ['Microsoft YaHei']
rcParams['axes.unicode_minus'] = False # 解决负号显示问题
# 指定 JSON 文件的路径
file_path = r"C:\Users\10561\Desktop\2025-01-06_应用日志.json"
# 打开并读取 JSON 文件
with open(file_path, 'r', encoding='utf-8') as file:
data = json.load(file)
# 提取指定设备和类型的日志片段
log_ones = []
start_idx = -1
for idx, one in enumerate(data):
if 'Acar' == one['设备'] and '2' == one['类型']:
# print(one)
if 1 == one['位置'] and 1 == one['方向']:
start_idx = idx
continue
if 8 == one['位置'] and 2 == one['方向']:
if start_idx == -1:
continue
log_ones.append(data[start_idx:idx+1])
start_idx = -1
print(idx)
# 筛选出只有 "Acar" 的日志片段
log_ones_new = []
for one in log_ones:
one_only_car = [one_one for one_one in one if 'Acar' == one_one['设备']]
log_ones_new.append(one_only_car)
[print(o) for o in log_ones_new]
# 计算时间差
time_diffs = []
for one_one in log_ones_new:
df = pd.DataFrame(one_one)
df['时间戳'] = pd.to_numeric(df['时间戳'])
df['时间差'] = df['时间戳'].diff()
time_diff = df[['位置', '方向', '时间差']].dropna().reset_index(drop=True)
time_diffs.append(time_diff)
# print(time_diffs)
# 找出行数最多的时间差数据集
max_len_dataset = max(time_diffs, key=len) # 找到行数最多的 DataFrame
# print(max_len_dataset)
all_x_labels = max_len_dataset['位置'].astype(str) + '-' + max_len_dataset['方向'].astype(str) # 提取 X 轴标签
# all_x_labels=pd.DataFrame({'位置-方向': all_x_labels})
# 标准化所有数据集,缺少的补 0
standardized_time_diffs = []
time_diffs_new = []
for time_diff in time_diffs:
time_diff['位置-方向'] = time_diff['位置'].astype(str) + '-' + time_diff['方向'].astype(str)
# print(time_diff)
standardized_time_diff = []
# 手动补零
data_idx=0
last_is_null=0
for idx,label in enumerate(all_x_labels):
first_row = time_diff.iloc[data_idx]
if first_row['位置-方向']==label:
if last_is_null==1:
last_is_null=0
standardized_time_diff.append(0)
else:
standardized_time_diff.append(first_row['时间差'])
data_idx+=1
else:
standardized_time_diff.append(0)
last_is_null=1
print(standardized_time_diff)
standardized_time_diffs.append(standardized_time_diff)
print('-------------------')
# 创建子图的行列数(自动计算)
num_plots = len(all_x_labels)
rows = math.ceil(math.sqrt(num_plots)) # 行数
cols = math.ceil(num_plots / rows) # 列数
# 创建大画布
fig, axes = plt.subplots(rows, cols, figsize=(16, 12))
axes = axes.flatten() # 将子图数组展平,方便迭代
# 在每个子图中绘制折线
for idx, label in enumerate(all_x_labels):
# 获取当前 "位置-方向" 对应的 y 值
y_values = [time_diff[idx] if idx < len(time_diff) else 0 for time_diff in standardized_time_diffs]
# 绘制当前子图
ax = axes[idx]
ax.plot(
range(len(standardized_time_diffs)), # x轴为不同的时间差数据集序号
y_values, # y轴为对应的时间差
marker='o', # 标记点样式
)
ax.set_ylim(0, 10000)
# 设置标题和轴标签
ax.set_title(f'{label} 的时间差折线图', fontsize=10)
ax.set_xlabel('标准化时间差数据集', fontsize=8)
ax.set_ylabel('时间差 (ms)', fontsize=8)
ax.set_xticks(range(len(standardized_time_diffs)))
ax.set_xticklabels([f'TimeDiff {i+1}' for i in range(len(standardized_time_diffs))], fontsize=6, rotation=45)
ax.grid(axis='y', linestyle='--', alpha=0.7)
# 删除多余的子图(如果子图数量多于折线图数量)
for ax in axes[num_plots:]:
fig.delaxes(ax)
# 调整布局
plt.tight_layout()
plt.show()

View File

@ -4,8 +4,8 @@
import os import os
folder_path = r"E:\Waste\WXSC\人工智能社团\24换届\图片" # 图片地址路径 folder_path = r"C:\Users\10561\Desktop\bcar红绿灯\2" # 图片地址路径
name = "rgzns_" name = "hld2_"
count = 0 # 起始值 count = 0 # 起始值