1
This commit is contained in:
parent
8f82b189a1
commit
b553734170
@ -2,7 +2,7 @@
|
|||||||
<module type="PYTHON_MODULE" version="4">
|
<module type="PYTHON_MODULE" version="4">
|
||||||
<component name="NewModuleRootManager">
|
<component name="NewModuleRootManager">
|
||||||
<content url="file://$MODULE_DIR$" />
|
<content url="file://$MODULE_DIR$" />
|
||||||
<orderEntry type="jdk" jdkName="yolov8-3" jdkType="Python SDK" />
|
<orderEntry type="jdk" jdkName="Embedded_game" jdkType="Python SDK" />
|
||||||
<orderEntry type="sourceFolder" forTests="false" />
|
<orderEntry type="sourceFolder" forTests="false" />
|
||||||
</component>
|
</component>
|
||||||
<component name="PyDocumentationSettings">
|
<component name="PyDocumentationSettings">
|
||||||
|
@ -1,4 +1,7 @@
|
|||||||
<?xml version="1.0" encoding="UTF-8"?>
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
<project version="4">
|
<project version="4">
|
||||||
<component name="ProjectRootManager" version="2" project-jdk-name="yolov8-3" project-jdk-type="Python SDK" />
|
<component name="Black">
|
||||||
|
<option name="sdkName" value="yolov8-3" />
|
||||||
|
</component>
|
||||||
|
<component name="ProjectRootManager" version="2" project-jdk-name="Embedded_game" project-jdk-type="Python SDK" />
|
||||||
</project>
|
</project>
|
@ -265,7 +265,7 @@ class Mainlen():
|
|||||||
# result = self.remove_chinese_chars(result) # 剔除中文
|
# result = self.remove_chinese_chars(result) # 剔除中文
|
||||||
print(result)
|
print(result)
|
||||||
|
|
||||||
if len(result) > 10: # 车牌
|
if len(result) == 8: # 车牌
|
||||||
result = self.jiajin(result)
|
result = self.jiajin(result)
|
||||||
result = self.binary_to_hexadecimal(result)
|
result = self.binary_to_hexadecimal(result)
|
||||||
count = 2
|
count = 2
|
||||||
|
@ -303,6 +303,8 @@ class Mainlen():
|
|||||||
|
|
||||||
# time.sleep(1)
|
# time.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# 二进制转十六进制
|
# 二进制转十六进制
|
||||||
def binary_to_hexadecimal(self, binary_str):
|
def binary_to_hexadecimal(self, binary_str):
|
||||||
if not isinstance(binary_str, str) or not all(c in '01' for c in binary_str):
|
if not isinstance(binary_str, str) or not all(c in '01' for c in binary_str):
|
||||||
|
@ -1,147 +1,38 @@
|
|||||||
# 二进制转十六进制
|
def process_qrcode_one(data):
|
||||||
def binary_to_hexadecimal( binary_str):
|
result = []
|
||||||
if not isinstance(binary_str, str) or not all(c in '01' for c in binary_str):
|
for char in data:
|
||||||
raise ValueError("输入必须是二进制字符串")
|
if char.isalpha() and char.isupper():
|
||||||
|
result.append(char)
|
||||||
|
elif char.isdigit():
|
||||||
|
result.append(char)
|
||||||
|
if len(result) >= 8:
|
||||||
|
break
|
||||||
|
return ''.join(result)
|
||||||
|
|
||||||
hex_str = ''
|
def process_qrcode_two(formula, h, n, y):
|
||||||
while len(binary_str) > 0:
|
formula = formula.replace("h", str(h))
|
||||||
# 将二进制字符串每四位分成一组,从右到左
|
formula = formula.replace("n", str(n))
|
||||||
group = binary_str[-4:]
|
formula = formula.replace("y", str(y))
|
||||||
binary_str = binary_str[:-4]
|
|
||||||
|
|
||||||
# 将每组四位的二进制数转换为相应的十六进制数
|
|
||||||
hex_str = hex_str + hex(int(group, 2))[2:]
|
|
||||||
|
|
||||||
return hex_str
|
|
||||||
|
|
||||||
def jiajin( txt):
|
|
||||||
index_j = -1
|
|
||||||
index_jian = -1
|
|
||||||
result = ""
|
|
||||||
for i in range(len(txt)):
|
|
||||||
if txt[i] == "+":
|
|
||||||
index_j = 1
|
|
||||||
elif txt[i] == "-":
|
|
||||||
index_jian = 2
|
|
||||||
else:
|
|
||||||
result += txt[i]
|
|
||||||
|
|
||||||
print(result)
|
|
||||||
print(index_j < index_jian)
|
|
||||||
# 若是加,将对整个字符串进行左移两位
|
|
||||||
if index_j < index_jian:
|
|
||||||
end = result[2:] + result[:2]
|
|
||||||
|
|
||||||
# 若是-,将对字符串进行右移三位
|
|
||||||
else:
|
|
||||||
end = result[-3:] + result[:-3]
|
|
||||||
|
|
||||||
return end
|
|
||||||
|
|
||||||
|
|
||||||
# 提取字符和数字
|
|
||||||
def extract_numbers_and_letters_combined( text):
|
|
||||||
combined_str = ''
|
|
||||||
for char in text:
|
|
||||||
# 检查字符是否是数字(0-9)或字母(A-Z 或 a-z)
|
|
||||||
if ('0' <= char <= '9') or ('A' <= char <= 'Z') or ('a' <= char <= 'z'):
|
|
||||||
combined_str += char
|
|
||||||
return combined_str
|
|
||||||
# 移除ASCII码值外的数据
|
|
||||||
def remove_non_ascii_chars(text):
|
|
||||||
"""移除字符串中的非ASCII字符,并返回新的字符串"""
|
|
||||||
# 只保留ASCII字符
|
|
||||||
return ''.join(char for char in text if ord(char) < 128)
|
|
||||||
def convert_hex(hex_str):
|
|
||||||
"""
|
|
||||||
将十六进制字符串转换为反转字节并添加 0x 前缀的格式。
|
|
||||||
|
|
||||||
例如:
|
|
||||||
输入: "e293"
|
|
||||||
输出: "0x2e,0x39"
|
|
||||||
|
|
||||||
Args:
|
|
||||||
hex_str (str): 输入的十六进制字符串。
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: 转换后的字符串,格式如 "0x2e,0x39"。
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
ValueError: 如果输入的字符串长度不是偶数或包含非十六进制字符。
|
|
||||||
"""
|
|
||||||
# 移除可能的前缀,如 "0x"
|
|
||||||
hex_str = hex_str.lower().replace("0x", "")
|
|
||||||
|
|
||||||
# 检查输入长度是否为偶数
|
|
||||||
if len(hex_str) % 2 != 0:
|
|
||||||
raise ValueError("输入的十六进制字符串长度必须是偶数。")
|
|
||||||
|
|
||||||
# 检查是否只包含有效的十六进制字符
|
|
||||||
if not all(c in '0123456789abcdef' for c in hex_str):
|
|
||||||
raise ValueError("输入的字符串包含非十六进制字符。")
|
|
||||||
|
|
||||||
# 将字符串按每两个字符分割成字节
|
|
||||||
bytes_list = [hex_str[i:i+2] for i in range(0, len(hex_str), 2)]
|
|
||||||
|
|
||||||
# 反转每个字节
|
|
||||||
reversed_bytes = [byte[::-1] for byte in bytes_list]
|
|
||||||
|
|
||||||
# 为每个反转后的字节添加 "0x" 前缀
|
|
||||||
hex_with_prefix = [f"0x{byte}" for byte in reversed_bytes]
|
|
||||||
return hex_with_prefix
|
|
||||||
def hex_list_to_ascii(hex_list):
|
|
||||||
"""
|
|
||||||
将十六进制字符串列表转换为对应的 ASCII 字符串。
|
|
||||||
|
|
||||||
例如:
|
|
||||||
输入: ['0x2e', '0x39']
|
|
||||||
输出: ".9"
|
|
||||||
|
|
||||||
Args:
|
|
||||||
hex_list (list of str): 包含十六进制字符串的列表,每个字符串应以 "0x" 开头。
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: 转换后的 ASCII 字符串。
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
ValueError: 如果列表中的字符串不是有效的十六进制格式或超出 ASCII 范围。
|
|
||||||
"""
|
|
||||||
ascii_chars = []
|
|
||||||
for hex_str in hex_list:
|
|
||||||
# 移除 "0x" 前缀并转换为整数
|
|
||||||
try:
|
try:
|
||||||
hex_value = int(hex_str, 16)
|
result = eval(formula)
|
||||||
except ValueError:
|
return result % 5
|
||||||
raise ValueError(f"'{hex_str}' 不是有效的十六进制数。")
|
except Exception as e:
|
||||||
|
raise ValueError(f"公式解析错误: {e}")
|
||||||
|
|
||||||
# 检查是否在有效的 ASCII 范围内
|
def display_result(x):
|
||||||
if hex_value < 0 or hex_value > 0x7F:
|
hex_x = hex(x)[2:].upper()
|
||||||
raise ValueError(f"十六进制数 '{hex_str}' 超出 ASCII 范围。")
|
return f"20240{hex_x}"
|
||||||
|
|
||||||
# 转换为对应的 ASCII 字符
|
# 示例数据
|
||||||
ascii_char = chr(hex_value)
|
qrcode_one = "A/145#B6"
|
||||||
ascii_chars.append(ascii_char)
|
qrcode_two_formula = "n*y+h"
|
||||||
return ascii_chars
|
h, n, y = 10, 3, 2
|
||||||
result="101110+01001-011"
|
|
||||||
print(result)
|
|
||||||
result = remove_non_ascii_chars(result) # 剔除ASCII码值外的数据
|
|
||||||
# result = remove_chinese_chars(result) # 剔除中文
|
|
||||||
print(result)
|
|
||||||
|
|
||||||
if len(result) > 10: # 车牌
|
valid_data = process_qrcode_one(qrcode_one)
|
||||||
result = jiajin(result)
|
print(valid_data)
|
||||||
result = binary_to_hexadecimal(result)
|
x = process_qrcode_two(qrcode_two_formula, h, n, y)
|
||||||
|
display = display_result(x)
|
||||||
|
|
||||||
result=hex_list_to_ascii(convert_hex(result))
|
print("二维码一有效数据:", valid_data)
|
||||||
print(len(result))
|
print("二维码二计算结果x:", x)
|
||||||
print(type(result[0]))
|
print("多功能信息显示B:", display)
|
||||||
|
|
||||||
def string_to_bytes(hex_str):
|
|
||||||
# 移除可能的空格或前缀
|
|
||||||
hex_str = hex_str.strip().lower().replace("0x", "")
|
|
||||||
# 将字符串转换为整数并转为字节
|
|
||||||
return bytes([int(hex_str, 16)])
|
|
||||||
|
|
||||||
# 示例
|
|
||||||
result = string_to_bytes("11")
|
|
||||||
print(result) # 输出:b'\x01'
|
|
||||||
|
640
002_B_Car/xdl/xdl_bcar_002.py
Normal file
640
002_B_Car/xdl/xdl_bcar_002.py
Normal file
@ -0,0 +1,640 @@
|
|||||||
|
# -*- coding:utf-8 -*-
|
||||||
|
# @Author len
|
||||||
|
# @Create 2023/11/23 11:28
|
||||||
|
|
||||||
|
import math
|
||||||
|
import sensor, image, time, lcd
|
||||||
|
import binascii
|
||||||
|
from Maix import GPIO
|
||||||
|
from machine import Timer, PWM, UART, Timer
|
||||||
|
from fpioa_manager import fm
|
||||||
|
import KPU as kpu
|
||||||
|
|
||||||
|
|
||||||
|
class Mainlen():
|
||||||
|
def __init__(self):
|
||||||
|
'''初始化摄像头和 LCD 显示屏'''
|
||||||
|
lcd.init() # lcd初始化
|
||||||
|
self.qr91=""
|
||||||
|
self.qr92=""
|
||||||
|
self.canera_init() # 摄像头初始化
|
||||||
|
# sensor.set_auto_gain(0, gain_db=17) # 设置摄像头的自动增益功能
|
||||||
|
|
||||||
|
# 映射串口引脚
|
||||||
|
fm.register(6, fm.fpioa.UART1_RX, force=True)
|
||||||
|
fm.register(7, fm.fpioa.UART1_TX, force=True)
|
||||||
|
|
||||||
|
# 初始化串口
|
||||||
|
self.uart = UART(UART.UART1, 115200, read_buf_len=4096)
|
||||||
|
|
||||||
|
# 循迹
|
||||||
|
# --------------感光芯片配置 START -------------------
|
||||||
|
self.IMG_WIDTH = 240
|
||||||
|
self.IMG_HEIGHT = 320
|
||||||
|
# 直线灰度图颜色阈值
|
||||||
|
self.LINE_COLOR_THRESHOLD = [(0, 60)] # 找黑色
|
||||||
|
self.LINE_COLOR_BAISE = [(60, 255)] # 找白色
|
||||||
|
self.ROIS = { # 找黑色
|
||||||
|
# 'left': (0, 0, 320, 50), # 纵向取样-左侧
|
||||||
|
# 'right': (0, 190, 320, 50), # 纵向取样-右侧
|
||||||
|
'left': (0, 0, 180, 50), # 纵向取样-左侧
|
||||||
|
'right': (0, 190, 180, 50), # 纵向取样-右侧
|
||||||
|
'up': (240, 0, 80, 240), # 横向取样-上方
|
||||||
|
'middle_up': (160, 0, 80, 240), # 横向取样-中上
|
||||||
|
'middle_down': (80, 0, 80, 240), # 横向取样-中下
|
||||||
|
'down': (0, 0, 80, 240), # 横向取样-下方
|
||||||
|
}
|
||||||
|
|
||||||
|
# 红黄绿的阈值
|
||||||
|
# 颜色识别阈值 (L Min, L Max, A Min, A Max, B Min, B Max) LAB模型
|
||||||
|
# 下列阈值元组用来识别 红、绿、蓝三种颜色,可通过调整数据阈值完成更多颜色的识别。
|
||||||
|
# self.thresholds = [(59, 100, 40, 127, 5, 127), # 红色阈值
|
||||||
|
# (90, 100, -5, 2, -4, 20), # 黄色阈值
|
||||||
|
# (87, 100, -59, 127, -10, 127),] # 绿色阈值
|
||||||
|
|
||||||
|
# self.thresholds = [(35, 100, 6, 127, 0, 127, "红色"), # 红色阈值
|
||||||
|
# (25, 100, -6, 127, 5, 43, "黄色"), # 黄色阈值
|
||||||
|
# (52, 100, -128, -5, 5, 127, "绿色")] # 绿色阈值
|
||||||
|
self.thresholds = [(43, 68, 7, 74, 1, 46, "红色"), # 红色阈值
|
||||||
|
(48, 100, -23, 19, 4, 55, "黄色"), # 黄色阈值
|
||||||
|
(40, 93, -57, -7, -8, 29, "绿色")] # 绿色阈值
|
||||||
|
# self.thresholds = [(40, 70, 7, 90, -1, 46, "红色"), # 红色阈值
|
||||||
|
# (34, 98, -17, 50, 7, 67, "黄色"), # 黄色阈值
|
||||||
|
# (40, 93, -57, -7, -8, 29, "绿色")] # 绿色阈值
|
||||||
|
|
||||||
|
self.is_need_send_data = False # 是否需要发送数据的信号标志
|
||||||
|
|
||||||
|
# 主函数 run
|
||||||
|
def startMain(self):
|
||||||
|
Flag_track = False # 循迹标识
|
||||||
|
Flag_qr = False # 二维码标识
|
||||||
|
Flag_qr2 = False # 二维码标识
|
||||||
|
|
||||||
|
Flag_light = False # 交通灯标识
|
||||||
|
while True:
|
||||||
|
data = self.uart.read(8)
|
||||||
|
# print(data)
|
||||||
|
# print(len(data))
|
||||||
|
# if (len(data) >= 8):
|
||||||
|
if data is not None:
|
||||||
|
if (data[1] == 0x02) and (data[7] == 0xBB) and self.verify_checksum(data):
|
||||||
|
# 巡线与控制舵机
|
||||||
|
if data[2] == 0x91:
|
||||||
|
if data[3] == 0x01: # 启动循迹
|
||||||
|
sensor.set_pixformat(sensor.GRAYSCALE) # 设置像素格式为灰色
|
||||||
|
Flag_track = True
|
||||||
|
print("开始循迹")
|
||||||
|
|
||||||
|
elif data[3] == 0x02: # 停止循迹
|
||||||
|
sensor.set_pixformat(sensor.RGB565) # 设置像素格式为彩色 RGB565
|
||||||
|
Flag_track = False
|
||||||
|
print("停止循迹")
|
||||||
|
|
||||||
|
|
||||||
|
elif data[3] == 0x03: # 调整舵机
|
||||||
|
print("调整舵机")
|
||||||
|
self.Servo(data)
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# 识别任务
|
||||||
|
elif data[2] == 0x92:
|
||||||
|
if data[3] == 0x01: # 识别二维码
|
||||||
|
sensor.set_pixformat(sensor.RGB565) # 设置像素格式为彩色 RGB565
|
||||||
|
Flag_qr = True
|
||||||
|
print("开始识别二维码任务10")
|
||||||
|
elif data[3] == 0x02: # 停止识别二维码
|
||||||
|
Flag_qr = False
|
||||||
|
print("停止识别二维码")
|
||||||
|
|
||||||
|
elif data[3] == 0x03: # 识别交通灯
|
||||||
|
sensor.set_pixformat(sensor.RGB565) # 设置像素格式为彩色 RGB565
|
||||||
|
Flag_light = True
|
||||||
|
print("开始识别交通灯")
|
||||||
|
|
||||||
|
elif data[3] == 0x04: # 停止识别交通灯
|
||||||
|
Flag_light = False
|
||||||
|
print("停止识别交通灯")
|
||||||
|
|
||||||
|
elif data[3] == 0x06: # 调整摄像头阈值
|
||||||
|
print("调整摄像头阈值")
|
||||||
|
self.canera_ash()
|
||||||
|
else:
|
||||||
|
pass
|
||||||
|
img = sensor.snapshot() # 获取图像
|
||||||
|
if Flag_track: # 循迹
|
||||||
|
print("循迹")
|
||||||
|
self.tracking(img)
|
||||||
|
elif Flag_qr: # 二维码#
|
||||||
|
print("识别二维码")
|
||||||
|
self.discem_QR(img)#r任务9
|
||||||
|
# Flag_qr = False
|
||||||
|
elif Flag_light: # 红绿灯
|
||||||
|
print("识别红绿灯")
|
||||||
|
self.discem_light()
|
||||||
|
# Flag_light = False
|
||||||
|
|
||||||
|
lcd.display(img) # 在LCD显示
|
||||||
|
|
||||||
|
# 初始化摄像头阈值
|
||||||
|
def canera_init(self):
|
||||||
|
# 摄像头模块初始化
|
||||||
|
sensor.reset() # 复位和初始化摄像头
|
||||||
|
sensor.set_pixformat(sensor.RGB565) # 设置像素格式为彩色 RGB565
|
||||||
|
# sensor.set_pixformat(sensor.GRAYSCALE) # 设置像素格式为灰色
|
||||||
|
sensor.set_framesize(sensor.QVGA) # 设置帧大小为QVGA(320×240)
|
||||||
|
sensor.set_vflip(1) # 后置模式
|
||||||
|
sensor.skip_frames(30) # # 跳过前30帧
|
||||||
|
|
||||||
|
# 调整摄像头阈值
|
||||||
|
def canera_ash(self):
|
||||||
|
sensor.reset() # 复位和初始化摄像头
|
||||||
|
sensor.set_vflip(1) # 将摄像头设置成后置方式(所见即所得)
|
||||||
|
sensor.set_pixformat(sensor.RGB565) # 设置像素格式为彩色 RGB565
|
||||||
|
# sensor.set_pixformat(sensor.GRAYSCALE) # 设置像素格式为灰色
|
||||||
|
sensor.set_framesize(sensor.QVGA) # 设置帧大小为 QVGA (320x240)
|
||||||
|
# sensor.set_windowing((224, 224)) # 设置摄像头的窗口大小
|
||||||
|
sensor.set_auto_gain(False)
|
||||||
|
sensor.set_auto_whitebal(False)
|
||||||
|
sensor.set_auto_gain(0, 0)
|
||||||
|
sensor.skip_frames(time=200) # 等待设置生效
|
||||||
|
|
||||||
|
# 红绿灯
|
||||||
|
def discem_light(self):
|
||||||
|
# 初始化
|
||||||
|
results = []
|
||||||
|
start_time = time.time()
|
||||||
|
|
||||||
|
while time.time() - start_time < 3: # 循环持续三秒
|
||||||
|
img = sensor.snapshot()
|
||||||
|
max_blob = None
|
||||||
|
max_blob_size = 0
|
||||||
|
max_blob_color = ""
|
||||||
|
max_blob_color_index = 0
|
||||||
|
|
||||||
|
# 定义右半部分的区域
|
||||||
|
right_half_roi = (self.IMG_WIDTH // 2, 0, self.IMG_WIDTH // 2, self.IMG_HEIGHT)
|
||||||
|
|
||||||
|
for index, threshold in enumerate(self.thresholds):
|
||||||
|
# 只在右半部分查找每种颜色的色块
|
||||||
|
blobs = img.find_blobs([threshold[:-1]], roi=right_half_roi, pixels_threshold=200, area_threshold=200, merge=True)
|
||||||
|
if blobs:
|
||||||
|
# 找到最大的色块
|
||||||
|
blob = max(blobs, key=lambda b: b.pixels())
|
||||||
|
if blob.pixels() > max_blob_size:
|
||||||
|
max_blob = blob
|
||||||
|
max_blob_size = blob.pixels()
|
||||||
|
max_blob_color = threshold[-1] # 颜色标签
|
||||||
|
max_blob_color_index = index
|
||||||
|
|
||||||
|
# 绘制最大色块的矩形和中心十字,并输出颜色
|
||||||
|
if max_blob:
|
||||||
|
results.append((max_blob_color, max_blob_color_index)) # 存储识别结果
|
||||||
|
img.draw_rectangle(max_blob[0:4])
|
||||||
|
img.draw_cross(max_blob[5], max_blob[6])
|
||||||
|
print("最大色块的颜色是:", max_blob_color, max_blob_color_index)
|
||||||
|
|
||||||
|
# 初始化计数字典
|
||||||
|
color_count = {}
|
||||||
|
|
||||||
|
for color, index in results:
|
||||||
|
if (color, index) in color_count:
|
||||||
|
color_count[(color, index)] += 1
|
||||||
|
else:
|
||||||
|
color_count[(color, index)] = 1
|
||||||
|
|
||||||
|
# 找出出现次数最多的颜色和下标
|
||||||
|
if color_count:
|
||||||
|
most_common_color, most_common_index = max(color_count, key=color_count.get)
|
||||||
|
print("出现最多的颜色:", most_common_color, "在下标位置:", most_common_index)
|
||||||
|
else:
|
||||||
|
most_common_index = 1
|
||||||
|
print("没有识别到有效的颜色块")
|
||||||
|
|
||||||
|
# 串口发送交通灯信息
|
||||||
|
self.uart.write(bytes([0x55]))
|
||||||
|
self.uart.write(bytes([0x02]))
|
||||||
|
self.uart.write(bytes([0x92]))
|
||||||
|
|
||||||
|
self.uart.write(bytes([0x03])) # 红绿灯
|
||||||
|
self.uart.write(bytes([most_common_index])) # 红绿灯结果
|
||||||
|
|
||||||
|
self.uart.write(bytes([0x03]))
|
||||||
|
self.uart.write(bytes([0x03]))
|
||||||
|
self.uart.write(bytes([0xbb]))
|
||||||
|
|
||||||
|
self.canera_init() # 恢复摄像头
|
||||||
|
|
||||||
|
# 二维码
|
||||||
|
def discem_QR(self, img):
|
||||||
|
# self.Tise_servo(10)
|
||||||
|
res_QR = img.find_qrcodes() # 寻找二维码
|
||||||
|
timeflag = 0
|
||||||
|
while timeflag < 39:
|
||||||
|
img = sensor.snapshot() # 获取图像
|
||||||
|
TS_QR = img.find_qrcodes() # 再次寻找二维码
|
||||||
|
for qr in TS_QR:
|
||||||
|
if all(qr.payload() != existing_qr.payload() for existing_qr in res_QR):
|
||||||
|
res_QR.append(qr)
|
||||||
|
timeflag += 1
|
||||||
|
time.sleep(0.001)
|
||||||
|
print(timeflag)
|
||||||
|
print(len(res_QR))
|
||||||
|
|
||||||
|
if len(res_QR): # 识别到
|
||||||
|
for res in range(len(res_QR)):
|
||||||
|
count = 0
|
||||||
|
if_end = 0x01
|
||||||
|
result = res_QR[res].payload()
|
||||||
|
|
||||||
|
|
||||||
|
print(result)
|
||||||
|
result = self.remove_non_ascii_chars(result) # 剔除ASCII码值外的数据
|
||||||
|
print(result)
|
||||||
|
|
||||||
|
if len(result) == 8: # 车牌
|
||||||
|
result = self.process_qrcode_one(result)
|
||||||
|
count = 1
|
||||||
|
|
||||||
|
else: # 公式
|
||||||
|
count = 2
|
||||||
|
|
||||||
|
if res == len(res_QR) - 1:
|
||||||
|
if_end = 0x00
|
||||||
|
|
||||||
|
print("count: ", count)
|
||||||
|
print("if_end: ", if_end)
|
||||||
|
|
||||||
|
|
||||||
|
# 串口发送二维码信息
|
||||||
|
self.uart.write(bytes([0x55]))
|
||||||
|
self.uart.write(bytes([0x02]))
|
||||||
|
self.uart.write(bytes([0x92]))
|
||||||
|
|
||||||
|
self.uart.write(bytes([0x01]))
|
||||||
|
print(len(result))
|
||||||
|
self.uart.write(bytes([count]))
|
||||||
|
if count==2:
|
||||||
|
for qr_data in result:
|
||||||
|
self.uart.write(bytes([ord(qr_data)]))
|
||||||
|
else:
|
||||||
|
for qr_data in result:
|
||||||
|
self.uart.write(bytes([ord(qr_data)]))
|
||||||
|
|
||||||
|
self.uart.write(bytes([0xbb]))
|
||||||
|
|
||||||
|
# time.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
|
else: # 未识别
|
||||||
|
self.uart.write(bytes([0x55]))
|
||||||
|
self.uart.write(bytes([0x02]))
|
||||||
|
self.uart.write(bytes([0x92]))
|
||||||
|
|
||||||
|
self.uart.write(bytes([0x06]))
|
||||||
|
|
||||||
|
self.uart.write(bytes([0xff]))
|
||||||
|
self.uart.write(bytes([0x00]))
|
||||||
|
self.uart.write(bytes([0x00]))
|
||||||
|
self.uart.write(bytes([0xbb]))
|
||||||
|
|
||||||
|
# time.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def process_qrcode_one(self,data):
|
||||||
|
result = []
|
||||||
|
for char in data:
|
||||||
|
if char.isalpha() and char.isupper():
|
||||||
|
result.append(char)
|
||||||
|
elif char.isdigit():
|
||||||
|
result.append(char)
|
||||||
|
if len(result) >= 8:
|
||||||
|
break
|
||||||
|
return ''.join(result)
|
||||||
|
|
||||||
|
|
||||||
|
def jiajin(self, txt):
|
||||||
|
index_j = -1
|
||||||
|
index_jian = -1
|
||||||
|
result = ""
|
||||||
|
for i in range(len(txt)):
|
||||||
|
if txt[i] == "+":
|
||||||
|
index_j = 1
|
||||||
|
elif txt[i] == "-":
|
||||||
|
index_jian = 2
|
||||||
|
else:
|
||||||
|
result += txt[i]
|
||||||
|
|
||||||
|
print(result)
|
||||||
|
print(index_j < index_jian)
|
||||||
|
# 若是加,将对整个字符串进行左移两位
|
||||||
|
if index_j < index_jian:
|
||||||
|
end = result[2:] + result[:2]
|
||||||
|
|
||||||
|
# 若是-,将对字符串进行右移三位
|
||||||
|
else:
|
||||||
|
end = result[-3:] + result[:-3]
|
||||||
|
|
||||||
|
return end
|
||||||
|
|
||||||
|
# 移除ASCII码值外的数据
|
||||||
|
def remove_non_ascii_chars(self, text):
|
||||||
|
"""移除字符串中的非ASCII字符,并返回新的字符串"""
|
||||||
|
# 只保留ASCII字符
|
||||||
|
return ''.join(char for char in text if ord(char) < 128)
|
||||||
|
|
||||||
|
# 循迹
|
||||||
|
def tracking(self, img):
|
||||||
|
print("循迹")
|
||||||
|
roi_blobs_result = self.find_blobs_in_rois(img)
|
||||||
|
down_center, state_crossing, deflection_angle = self.state_deflection_angle(roi_blobs_result)
|
||||||
|
|
||||||
|
dsd = self.data_format_wrapper(down_center, state_crossing, deflection_angle)
|
||||||
|
self.UsartSend(dsd)
|
||||||
|
print("下发指令:", dsd)
|
||||||
|
|
||||||
|
# 寻找色块 在ROIS中寻找色块,获取ROI中色块的中心区域与是否有色块的信息
|
||||||
|
def find_blobs_in_rois(self, img):
|
||||||
|
canvas = img.copy()
|
||||||
|
roi_blobs_result = {} # 在各个ROI中寻找色块的结果记录
|
||||||
|
for roi_direct in self.ROIS.keys():
|
||||||
|
roi_blobs_result[roi_direct] = {
|
||||||
|
'cx': 0,
|
||||||
|
'cy': 0,
|
||||||
|
'w': 0,
|
||||||
|
'blob_flag': False
|
||||||
|
}
|
||||||
|
for roi_direct, roi in self.ROIS.items():
|
||||||
|
blobs = canvas.find_blobs(self.LINE_COLOR_THRESHOLD, roi=roi, merge=True)
|
||||||
|
if len(blobs) != 0:
|
||||||
|
largest_blob = max(blobs, key=lambda b: b.pixels())
|
||||||
|
if largest_blob.area() > 1000:
|
||||||
|
roi_blobs_result[roi_direct]['cx'] = largest_blob.cy()
|
||||||
|
roi_blobs_result[roi_direct]['cy'] = largest_blob.cx()
|
||||||
|
roi_blobs_result[roi_direct]['w'] = largest_blob.h()
|
||||||
|
roi_blobs_result[roi_direct]['blob_flag'] = True
|
||||||
|
x, y, width, height = largest_blob[:4]
|
||||||
|
img.draw_rectangle((x, y, width, height), color=(255))
|
||||||
|
else:
|
||||||
|
# blobs=canvas.find_blobs(LINE_COLOR_THRESHOLD, roi=roi, merge=True, pixels_area=10)
|
||||||
|
continue
|
||||||
|
blobs_baise = canvas.find_blobs(self.LINE_COLOR_BAISE, roi=(0, 0, 60, 240),
|
||||||
|
merge=True) # 车载摄像头屏幕下部找白色#宽度200修改为240#用途寻卡,y示例中40修改为0
|
||||||
|
blobs_dixing = canvas.find_blobs(self.LINE_COLOR_BAISE, roi=(125, 0, 60, 240),
|
||||||
|
merge=True) # 车载摄像头屏幕中间找白色 #宽度200修改为240,y示例中40修改为0
|
||||||
|
blobs_zuo = canvas.find_blobs(self.LINE_COLOR_THRESHOLD, roi=(0, 0, 180, 50), merge=True) # 车载摄像头屏幕左下部找黑色
|
||||||
|
blobs_you = canvas.find_blobs(self.LINE_COLOR_THRESHOLD, roi=(0, 190, 180, 50), merge=True) # 车载摄像头屏幕右下部找黑色
|
||||||
|
if len(blobs_baise) != 0:
|
||||||
|
print("*********进入循环第1步*******")
|
||||||
|
largest_baise = max(blobs_baise, key=lambda b: b.pixels())
|
||||||
|
wx, wy, wwidth, wwheight = largest_baise[:4]
|
||||||
|
arc = wwidth * wwheight
|
||||||
|
if arc >= 11000:
|
||||||
|
print("*********进入循环第2步*******")
|
||||||
|
if len(blobs_dixing) != 0:
|
||||||
|
print("*********进入循环第3步*******")
|
||||||
|
largest_dixing = max(blobs_dixing, key=lambda b: b.pixels())
|
||||||
|
wx, wy, wwidth, wwheight = largest_dixing[:4]
|
||||||
|
arc = wwidth * wwheight
|
||||||
|
if arc >= 11000:
|
||||||
|
print('kapian') # 首先中部区域识别到白色进入判断地形还是卡片,接着判断下部,如果为白色判断为卡片。
|
||||||
|
self.UsartSend(self.data_format_wrapper(0, 1, 0)) # 地形停止命令
|
||||||
|
if len(blobs_zuo) != 0 and len(blobs_you) != 0:
|
||||||
|
print("ka十字路口")
|
||||||
|
self.UsartSend(self.data_format_wrapper(1, 1, 0)) # 地形停止命令
|
||||||
|
else:
|
||||||
|
print('dixing')
|
||||||
|
print(roi_blobs_result) # 返回的是黑色色块,各区域中心位置
|
||||||
|
self.UsartSend(self.data_format_wrapper(0, 1, 0)) # 地形停止命令
|
||||||
|
return roi_blobs_result # 返回的是黑色色块,各区域中心位置
|
||||||
|
|
||||||
|
# 计算偏转状态值
|
||||||
|
def state_deflection_angle(self, roi_blobs_result):
|
||||||
|
'''
|
||||||
|
说明:偏转状态值返回
|
||||||
|
'''
|
||||||
|
# ROI区域权重值
|
||||||
|
# ROIS_WEIGHT = [1, 1, 1, 1]
|
||||||
|
ROIS_WEIGHT = [1, 0, 0, 1]
|
||||||
|
state_crossing = False
|
||||||
|
deflection_angle = 0 # 偏转角
|
||||||
|
down_center = 0 # 中下值
|
||||||
|
center_num = 0 # 中间值
|
||||||
|
# print(roi_blobs_result)
|
||||||
|
|
||||||
|
# 偏转值计算,ROI中心区域X值
|
||||||
|
centroid_sum = roi_blobs_result['up']['cx'] * ROIS_WEIGHT[0] + roi_blobs_result['middle_up']['cx'] * \
|
||||||
|
ROIS_WEIGHT[1] \
|
||||||
|
+ roi_blobs_result['middle_down']['cx'] * ROIS_WEIGHT[2] + roi_blobs_result['down']['cx'] * \
|
||||||
|
ROIS_WEIGHT[3]
|
||||||
|
if roi_blobs_result['up']['blob_flag']:
|
||||||
|
center_num += ROIS_WEIGHT[0]
|
||||||
|
if roi_blobs_result['middle_up']['blob_flag']:
|
||||||
|
center_num += ROIS_WEIGHT[1]
|
||||||
|
if roi_blobs_result['middle_down']['blob_flag']:
|
||||||
|
center_num += ROIS_WEIGHT[2]
|
||||||
|
if roi_blobs_result['down']['blob_flag']:
|
||||||
|
center_num += ROIS_WEIGHT[3]
|
||||||
|
|
||||||
|
center_pos = centroid_sum / (ROIS_WEIGHT[0] + ROIS_WEIGHT[1] + ROIS_WEIGHT[2] + ROIS_WEIGHT[3])
|
||||||
|
deflection_angle = (self.IMG_WIDTH / 2) - center_pos
|
||||||
|
|
||||||
|
# 判断两侧ROI区域检测到黑色线
|
||||||
|
if roi_blobs_result['left']['blob_flag'] and roi_blobs_result['right']['blob_flag']:
|
||||||
|
# 判断两侧ROI区域检测到黑色线处于图像下方1/3处
|
||||||
|
if roi_blobs_result['left']['cy'] <= ((self.IMG_HEIGHT / 3)) or roi_blobs_result['right']['cy'] <= (
|
||||||
|
(self.IMG_HEIGHT / 3)):
|
||||||
|
# 当最下方ROI区域的黑线宽度大于140像素(检测到路口)
|
||||||
|
if roi_blobs_result['down']['w'] > 140:
|
||||||
|
down_center = 1 # 自行修改处 判断识别到十字路口
|
||||||
|
print("输出了十字路口标识")
|
||||||
|
|
||||||
|
return down_center, state_crossing, deflection_angle
|
||||||
|
|
||||||
|
# 控制舵机
|
||||||
|
def Tise_servo(self, angle):
|
||||||
|
# 判断舵机控制方向
|
||||||
|
if angle < 0:
|
||||||
|
# 限制舵机角度,防止过大损坏舵机
|
||||||
|
if angle > 80:
|
||||||
|
angle = 80
|
||||||
|
angle = -angle
|
||||||
|
elif angle > 0:
|
||||||
|
# 限制舵机角度,防止过大损坏舵机
|
||||||
|
if angle > 35:
|
||||||
|
angle = 35
|
||||||
|
angle = angle
|
||||||
|
# PWM通过定时器配置,接到IO17引脚
|
||||||
|
tim_pwm = Timer(Timer.TIMER0, Timer.CHANNEL0, mode=Timer.MODE_PWM)
|
||||||
|
S1 = PWM(tim_pwm, freq=50, duty=0, pin=17)
|
||||||
|
S1.duty((angle + 90) / 180 * 10 + 2.5)
|
||||||
|
|
||||||
|
def Servo(self, data):
|
||||||
|
'''
|
||||||
|
功能:180度舵机:angle:-90至90 表示相应的角度
|
||||||
|
360连续旋转度舵机:angle:-90至90 旋转方向和速度值。
|
||||||
|
【duty】占空比值:0-100
|
||||||
|
'''
|
||||||
|
angle = data[5]
|
||||||
|
# 判断舵机控制方向
|
||||||
|
if data[4] == ord('-'):
|
||||||
|
# 限制舵机角度,防止过大损坏舵机
|
||||||
|
if angle > 80:
|
||||||
|
angle = 80
|
||||||
|
angle = -angle
|
||||||
|
elif data[4] == ord('+'):
|
||||||
|
# 限制舵机角度,防止过大损坏舵机
|
||||||
|
if angle > 35:
|
||||||
|
angle = 35
|
||||||
|
angle = angle
|
||||||
|
# PWM通过定时器配置,接到IO17引脚
|
||||||
|
tim_pwm = Timer(Timer.TIMER0, Timer.CHANNEL0, mode=Timer.MODE_PWM)
|
||||||
|
S1 = PWM(tim_pwm, freq=50, duty=0, pin=17)
|
||||||
|
S1.duty((angle + 90) / 180 * 10 + 2.5)
|
||||||
|
|
||||||
|
# 检验校验和
|
||||||
|
def verify_checksum(self, data):
|
||||||
|
if len(data) != 8:
|
||||||
|
return False
|
||||||
|
# 计算校验和:data[2]、data[3]、data[4] 和 data[5] 的和,然后对256取模
|
||||||
|
calculated_checksum = (data[2] + data[3] + data[4] + data[5]) % 256
|
||||||
|
# 比较计算出的校验和与data[6]是否相等
|
||||||
|
if calculated_checksum == data[6]:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# 串口发送
|
||||||
|
def UsartSend(self, str_data):
|
||||||
|
'''
|
||||||
|
串口发送函数
|
||||||
|
'''
|
||||||
|
print(str_data)
|
||||||
|
self.uart.write(str_data)
|
||||||
|
|
||||||
|
# 判断符号
|
||||||
|
def get_symbol(self, num):
|
||||||
|
'''
|
||||||
|
根据数值正负,返回数值对应的符号
|
||||||
|
正数: ‘+’, 负数‘-’ 主要为了方便C语言解析待符号的数值。
|
||||||
|
'''
|
||||||
|
print("num = ", num)
|
||||||
|
if num >= 0:
|
||||||
|
return ord('+')
|
||||||
|
else:
|
||||||
|
return ord('-')
|
||||||
|
|
||||||
|
# 封装数据
|
||||||
|
def data_format_wrapper(self, down_center, state_crossing, deflection_angle):
|
||||||
|
'''
|
||||||
|
根据通信协议封装循迹数据
|
||||||
|
TODO 重新编写通信协议 与配套C解析代码
|
||||||
|
'''
|
||||||
|
send_data = [
|
||||||
|
0x55,
|
||||||
|
0x02,
|
||||||
|
0x91,
|
||||||
|
down_center, # 底部色块中心是否在中点附近#底部色块十字路口
|
||||||
|
1 if state_crossing else 0, # 是否越障 无用
|
||||||
|
self.get_symbol(deflection_angle), # 偏航角符号 print输出是43 + 45- +向左转调整 -向右转调整
|
||||||
|
abs(int(deflection_angle)), # 偏航角
|
||||||
|
0xbb]
|
||||||
|
return bytes(send_data)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def find_bracket_positions(input_str, startstr="<", endstr=">"):
|
||||||
|
"""
|
||||||
|
查找字符串中所有成对的指定括号的起始和结束位置。
|
||||||
|
|
||||||
|
功能描述:
|
||||||
|
- 遍历字符串,查找使用指定起始括号 (startstr) 和结束括号 (endstr) 的匹配对。
|
||||||
|
- 每找到一对匹配括号,记录其起始位置和结束位置。
|
||||||
|
- 返回包含所有匹配括号对的起始和结束位置的列表。
|
||||||
|
|
||||||
|
实现逻辑:
|
||||||
|
1. 使用栈记录每个起始括号 (startstr) 的位置索引。
|
||||||
|
2. 每当遇到一个结束括号 (endstr) 时,从栈中弹出最近的起始括号位置,与当前索引形成一个匹配对。
|
||||||
|
3. 将匹配的索引对存入结果列表。
|
||||||
|
|
||||||
|
边界条件:
|
||||||
|
- 如果字符串中没有指定的括号,返回空列表。
|
||||||
|
- 如果有未闭合的起始括号,不会记录未匹配的起始位置。
|
||||||
|
|
||||||
|
时间复杂度:
|
||||||
|
- O(n),其中 n 是字符串的长度。
|
||||||
|
|
||||||
|
:param input_str: 输入字符串,其中可能包含成对的指定括号。
|
||||||
|
:param startstr: 指定的起始括号(如 '<' 或 '{'),默认为 '<'。
|
||||||
|
:param endstr: 指定的结束括号(如 '>' 或 '}'),默认为 '>'。
|
||||||
|
:return: 包含所有匹配括号对的起始和结束位置的列表,每个匹配对用 [start, end] 表示。
|
||||||
|
"""
|
||||||
|
result = []
|
||||||
|
stack = []
|
||||||
|
|
||||||
|
# 遍历字符串
|
||||||
|
for i, char in enumerate(input_str):
|
||||||
|
if char == startstr:
|
||||||
|
# 如果是起始括号,记录其位置
|
||||||
|
stack.append(i)
|
||||||
|
elif char == endstr:
|
||||||
|
# 如果是结束括号,检查栈是否为空
|
||||||
|
if stack:
|
||||||
|
# 从栈中弹出起始括号位置并记录配对
|
||||||
|
start = stack.pop()
|
||||||
|
result.append([start, i])
|
||||||
|
|
||||||
|
# 返回所有成对括号的位置
|
||||||
|
return result
|
||||||
|
def extract_valid_data(input_str):
|
||||||
|
print('输入'+input_str)
|
||||||
|
u=[
|
||||||
|
is_uppercase_letter,
|
||||||
|
is_digit,
|
||||||
|
is_digit,
|
||||||
|
is_digit,
|
||||||
|
is_uppercase_letter,
|
||||||
|
is_digit,
|
||||||
|
]
|
||||||
|
i=0
|
||||||
|
result=''
|
||||||
|
for s in input_str:
|
||||||
|
if i >= len(u):
|
||||||
|
break
|
||||||
|
if u[i](s):
|
||||||
|
i+=1
|
||||||
|
result+=s
|
||||||
|
result = result.strip()
|
||||||
|
if len(result) == len(u):
|
||||||
|
print("输出"+result)
|
||||||
|
return result
|
||||||
|
print("输出None")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def a(str,i):
|
||||||
|
if i==0:
|
||||||
|
return is_uppercase_letter(str)
|
||||||
|
elif i==1:
|
||||||
|
return is_digit(str)
|
||||||
|
elif i==2:
|
||||||
|
return is_digit(str)
|
||||||
|
elif i==3:
|
||||||
|
return is_digit(str)
|
||||||
|
elif i==4:
|
||||||
|
return is_uppercase_letter(str)
|
||||||
|
elif i==5:
|
||||||
|
return is_digit(str)
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def is_uppercase_letter(char):
|
||||||
|
return 'A' <= char <= 'Z'
|
||||||
|
|
||||||
|
def is_digit(char):
|
||||||
|
return '0' <= char <= '9'
|
||||||
|
# 运行程序
|
||||||
|
myMain = Mainlen()
|
||||||
|
myMain.startMain()
|
@ -32,8 +32,8 @@ def get_color_threshold(image_path, color):
|
|||||||
return mask
|
return mask
|
||||||
|
|
||||||
# 示例用法
|
# 示例用法
|
||||||
image_path = "data/congche_hld_3.jpg" # 替换为您的图像路径
|
image_path = "../first_frame1.jpg" # 替换为您的图像路径
|
||||||
color = "green" # 替换为您想要获取阈值的颜色
|
color = "red" # 替换为您想要获取阈值的颜色
|
||||||
threshold_mask = get_color_threshold(image_path, color)
|
threshold_mask = get_color_threshold(image_path, color)
|
||||||
|
|
||||||
# 显示阈值图像
|
# 显示阈值图像
|
||||||
|
130
图片相似度.py
Normal file
130
图片相似度.py
Normal file
@ -0,0 +1,130 @@
|
|||||||
|
import os
|
||||||
|
from skimage.metrics import structural_similarity as ssim
|
||||||
|
import cv2
|
||||||
|
import concurrent.futures
|
||||||
|
|
||||||
|
|
||||||
|
def split_file_name(file_name):
|
||||||
|
"""
|
||||||
|
将文件名拆分为前缀和后缀
|
||||||
|
:param file_name: 文件名
|
||||||
|
:return: 前缀和后缀
|
||||||
|
"""
|
||||||
|
file_name = os.path.basename(file_name)
|
||||||
|
file_name = file_name.split('.')[0]
|
||||||
|
file_names = file_name.split('_')
|
||||||
|
return file_names[0] + '_' + file_names[1], file_names[2]
|
||||||
|
|
||||||
|
|
||||||
|
def get_sorted_files_group(directory):
|
||||||
|
try:
|
||||||
|
# 检查目录是否存在
|
||||||
|
if not os.path.exists(directory):
|
||||||
|
print(f"目录 {directory} 不存在!")
|
||||||
|
return {}
|
||||||
|
|
||||||
|
# 获取目录下的所有文件及其完整路径
|
||||||
|
files = [
|
||||||
|
os.path.join(directory, f) for f in os.listdir(directory)
|
||||||
|
if os.path.isfile(os.path.join(directory, f))
|
||||||
|
]
|
||||||
|
|
||||||
|
files_group = {}
|
||||||
|
for file in files:
|
||||||
|
prefix, suffix = split_file_name(file)
|
||||||
|
files_group.setdefault(prefix, []).append(file)
|
||||||
|
return files_group
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"发生错误: {e}")
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
|
def compare_images(image1_path, image2_path):
|
||||||
|
try:
|
||||||
|
# 读取两张图片
|
||||||
|
img1 = cv2.imread(image1_path, cv2.IMREAD_GRAYSCALE)
|
||||||
|
img2 = cv2.imread(image2_path, cv2.IMREAD_GRAYSCALE)
|
||||||
|
|
||||||
|
# 检查图片是否成功读取
|
||||||
|
if img1 is None or img2 is None:
|
||||||
|
print(f"无法读取图片: {image1_path} 或 {image2_path}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
# 调整两张图片为相同大小
|
||||||
|
img1 = cv2.resize(
|
||||||
|
img1, (min(img1.shape[1], img2.shape[1]), min(img1.shape[0], img2.shape[0]))
|
||||||
|
)
|
||||||
|
img2 = cv2.resize(img2, (img1.shape[1], img1.shape[0]))
|
||||||
|
|
||||||
|
# 计算相似度
|
||||||
|
similarity, _ = ssim(img1, img2, full=True)
|
||||||
|
return similarity
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"比较图片时发生错误: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def del_group(group):
|
||||||
|
"""
|
||||||
|
返回需要删除的图片列表,不在函数里直接执行删除操作。
|
||||||
|
"""
|
||||||
|
del_list = []
|
||||||
|
compared_pairs = set() # 用于记录已经比较过的图片对
|
||||||
|
|
||||||
|
for i in range(len(group)):
|
||||||
|
for j in range(len(group)):
|
||||||
|
if i != j and (i, j) not in compared_pairs and (j, i) not in compared_pairs:
|
||||||
|
# 标记为已比较
|
||||||
|
compared_pairs.add((i, j))
|
||||||
|
# group[i] 或 group[j] 已经在 del_list 中就没必要比较了
|
||||||
|
if group[i] not in del_list and group[j] not in del_list:
|
||||||
|
similarity = compare_images(group[i], group[j])
|
||||||
|
if similarity is not None and similarity > 0.965:
|
||||||
|
# 相似度大于 0.98,判定为重复
|
||||||
|
# 保留 group[j],删除 group[i],也可以根据需要调换顺序
|
||||||
|
del_list.append(group[i])
|
||||||
|
|
||||||
|
return del_list
|
||||||
|
|
||||||
|
|
||||||
|
def process_group(prefix, file_list):
|
||||||
|
"""
|
||||||
|
子进程运行函数:
|
||||||
|
1. 对指定 group 进行去重计算
|
||||||
|
2. 返回 (prefix, del_list)
|
||||||
|
"""
|
||||||
|
print(f'开始处理 {prefix}, 共 {len(file_list)} 张图片')
|
||||||
|
del_list = del_group(file_list)
|
||||||
|
print(f'需要删除 {len(del_list)}/{len(file_list)} 张图片')
|
||||||
|
return prefix, del_list
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
directory = r"C:\Users\10561\Desktop\frames"
|
||||||
|
files_group = get_sorted_files_group(directory)
|
||||||
|
# for e in files_group:
|
||||||
|
# print(e, len(files_group[e]))
|
||||||
|
# exit()
|
||||||
|
# 使用多进程来处理每一个 group
|
||||||
|
results = []
|
||||||
|
with concurrent.futures.ProcessPoolExecutor() as executor:
|
||||||
|
future_to_prefix = {}
|
||||||
|
for prefix, file_list in files_group.items():
|
||||||
|
future = executor.submit(process_group, prefix, file_list)
|
||||||
|
future_to_prefix[future] = prefix
|
||||||
|
|
||||||
|
for future in concurrent.futures.as_completed(future_to_prefix):
|
||||||
|
prefix = future_to_prefix[future]
|
||||||
|
try:
|
||||||
|
ret_prefix, del_list = future.result()
|
||||||
|
# 在主进程里进行删除
|
||||||
|
for e in del_list:
|
||||||
|
if os.path.exists(e):
|
||||||
|
# print(f'删除 {e}')
|
||||||
|
os.remove(e)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"{prefix} 处理时发生错误: {e}")
|
||||||
|
|
||||||
|
print("所有任务已完成!")
|
51
抽帧.py
Normal file
51
抽帧.py
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
import cv2
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
def extract_frames(video_path, output_dir, interval=0.5,output_name='frame'):
|
||||||
|
# 打开视频文件
|
||||||
|
cap = cv2.VideoCapture(video_path)
|
||||||
|
|
||||||
|
if not cap.isOpened():
|
||||||
|
print(f"无法打开视频文件: {video_path}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# 获取视频帧率和总时长
|
||||||
|
fps = cap.get(cv2.CAP_PROP_FPS)
|
||||||
|
frame_interval = int(fps * interval) # 每隔多少帧保存一帧
|
||||||
|
print(fps)
|
||||||
|
# 创建输出目录
|
||||||
|
os.makedirs(output_dir, exist_ok=True)
|
||||||
|
|
||||||
|
frame_count = 0
|
||||||
|
saved_count = 0
|
||||||
|
|
||||||
|
while True:
|
||||||
|
ret, frame = cap.read()
|
||||||
|
if not ret:
|
||||||
|
break
|
||||||
|
|
||||||
|
# 如果当前帧是需要保存的帧
|
||||||
|
if frame_count % frame_interval == 0:
|
||||||
|
output_image_path = os.path.join(output_dir, f"{output_name}_{saved_count}.jpg")
|
||||||
|
cv2.imwrite(output_image_path, frame)
|
||||||
|
print(f"保存帧到: {output_image_path}")
|
||||||
|
saved_count += 1
|
||||||
|
|
||||||
|
frame_count += 1
|
||||||
|
|
||||||
|
# 释放视频对象
|
||||||
|
cap.release()
|
||||||
|
print(f"共保存了 {saved_count} 帧.")
|
||||||
|
|
||||||
|
video_dir=r'C:\Users\10561\Desktop\bcar红绿灯\1'
|
||||||
|
output_directory = r"C:\Users\10561\Desktop\frames" # 替换为你希望保存的目录路径
|
||||||
|
|
||||||
|
for file in os.listdir(video_dir):
|
||||||
|
if file.endswith('.mp4'):
|
||||||
|
video_file=os.path.join(video_dir,file)
|
||||||
|
extract_frames(video_file, output_directory, interval=0.4,output_name=file.split('.')[0])
|
||||||
|
|
||||||
|
# 示例调用
|
||||||
|
# video_file = r"C:\Users\10561\Desktop\7.mp4" # 替换为你的视频文件路径
|
||||||
|
# extract_frames(video_file, output_directory, interval=0.5)
|
11
日志处理文件.py
11
日志处理文件.py
@ -10,7 +10,7 @@ rcParams['font.sans-serif'] = ['SimHei'] # 或者 ['Microsoft YaHei']
|
|||||||
rcParams['axes.unicode_minus'] = False # 解决负号显示问题
|
rcParams['axes.unicode_minus'] = False # 解决负号显示问题
|
||||||
|
|
||||||
# 指定 JSON 文件的路径
|
# 指定 JSON 文件的路径
|
||||||
file_path = r"C:\Users\10561\Desktop\2025-01-02_应用日志.json"
|
file_path = r"C:\Users\10561\Desktop\2025-01-06_应用日志.json"
|
||||||
|
|
||||||
# 打开并读取 JSON 文件
|
# 打开并读取 JSON 文件
|
||||||
with open(file_path, 'r', encoding='utf-8') as file:
|
with open(file_path, 'r', encoding='utf-8') as file:
|
||||||
@ -21,7 +21,7 @@ log_ones = []
|
|||||||
start_idx = -1
|
start_idx = -1
|
||||||
for idx, one in enumerate(data):
|
for idx, one in enumerate(data):
|
||||||
if 'Acar' == one['设备'] and '2' == one['类型']:
|
if 'Acar' == one['设备'] and '2' == one['类型']:
|
||||||
print(one)
|
# print(one)
|
||||||
if 1 == one['位置'] and 1 == one['方向']:
|
if 1 == one['位置'] and 1 == one['方向']:
|
||||||
start_idx = idx
|
start_idx = idx
|
||||||
continue
|
continue
|
||||||
@ -30,12 +30,13 @@ for idx, one in enumerate(data):
|
|||||||
continue
|
continue
|
||||||
log_ones.append(data[start_idx:idx+1])
|
log_ones.append(data[start_idx:idx+1])
|
||||||
start_idx = -1
|
start_idx = -1
|
||||||
|
print(idx)
|
||||||
# 筛选出只有 "Acar" 的日志片段
|
# 筛选出只有 "Acar" 的日志片段
|
||||||
log_ones_new = []
|
log_ones_new = []
|
||||||
for one in log_ones:
|
for one in log_ones:
|
||||||
one_only_car = [one_one for one_one in one if 'Acar' == one_one['设备']]
|
one_only_car = [one_one for one_one in one if 'Acar' == one_one['设备']]
|
||||||
log_ones_new.append(one_only_car)
|
log_ones_new.append(one_only_car)
|
||||||
|
[print(o) for o in log_ones_new]
|
||||||
|
|
||||||
# 计算时间差
|
# 计算时间差
|
||||||
time_diffs = []
|
time_diffs = []
|
||||||
@ -45,10 +46,10 @@ for one_one in log_ones_new:
|
|||||||
df['时间差'] = df['时间戳'].diff()
|
df['时间差'] = df['时间戳'].diff()
|
||||||
time_diff = df[['位置', '方向', '时间差']].dropna().reset_index(drop=True)
|
time_diff = df[['位置', '方向', '时间差']].dropna().reset_index(drop=True)
|
||||||
time_diffs.append(time_diff)
|
time_diffs.append(time_diff)
|
||||||
print(time_diffs)
|
# print(time_diffs)
|
||||||
# 找出行数最多的时间差数据集
|
# 找出行数最多的时间差数据集
|
||||||
max_len_dataset = max(time_diffs, key=len) # 找到行数最多的 DataFrame
|
max_len_dataset = max(time_diffs, key=len) # 找到行数最多的 DataFrame
|
||||||
print(max_len_dataset)
|
# print(max_len_dataset)
|
||||||
all_x_labels = max_len_dataset['位置'].astype(str) + '-' + max_len_dataset['方向'].astype(str) # 提取 X 轴标签
|
all_x_labels = max_len_dataset['位置'].astype(str) + '-' + max_len_dataset['方向'].astype(str) # 提取 X 轴标签
|
||||||
# all_x_labels=pd.DataFrame({'位置-方向': all_x_labels})
|
# all_x_labels=pd.DataFrame({'位置-方向': all_x_labels})
|
||||||
# 标准化所有数据集,缺少的补 0
|
# 标准化所有数据集,缺少的补 0
|
||||||
|
Loading…
Reference in New Issue
Block a user